ImageFlux Live StreamingでAIアシスタントを作ってみた

こんにちは、テリーです。ChatGPTのライバルサービスが次々に出てきています。GeminiもClaudeもそれぞれ特徴があり、同じプロンプトで比較しても全く異なる返答が来てとても興味深いです。人間の専門家で言うところの「セカンドオピニオン」「サードオピニオン」に相当するでしょうか。これからますます多くの専門家が登場し、いつでもどこでもAIを使う状態(コモディティ化)に向かっていくことが想像できます。

さて、AIと人間が文章で会話できる時代になりました。次は音声による会話の時代がすぐにやってきます。そのあとはAIキャラクターとのビデオ通話も来年には当たり前になっているでしょう。例えば旅行や結婚式のプランニングをカップルが相談するケースです。有料通話サービス中にAIが参加し、専門家として音声でアドバイスをしてくれたら高品質で格安のサービスが実現できそうです。

音声による会話やサポートを自社サービスに入れたくなったら、どのように実装すれば良いでしょうか?ChatGPTはAIと人間が1対1で会話をするのが前提に作られています。人間が2人以上の会議中に3人目としてAIが参加するにはどうしたら良いでしょうか? 今回はこのケースの実装例を紹介します。

対象読者

  • AIチャットボットを使ったビデオ通話アプリを作りたい人
  • ビデオ会議・音声通話会議にAIアシスタントを追加したい人
  • ビデオ会議・音声通話会議にTTSを使った音声を差し込みたい人

動作確認環境

  • Ubuntu 22.04 LTS (x64)
  • VOICEVOX CORE 0.15.3 (x64)
  • Python 3.10.12
  • node.js 22.3.0
  • Sora JS SDK 2024.1.1
  • Sora Python SDK 2024.2.0

本サンプルはOS依存、CPU依存部分を複数含みます。異なるOS、異なるCPUの場合、動作しないことがあります。dockerで動作確認済みです。

VOICEVOX COREでWaveファイル作成

まず、AIアシスタントに話をさせるためにTTSのセットアップを行います。高品質なTTSの選択肢がたくさんありますが、本記事では無料で使える「VOICEVOX CORE: 春日部つむぎ」を使用します。

下記のコマンドを実行してインストールします。NVIDIA GPUがついている場合はCUDA用のバイナリを使うことができます。

curl -sSfL https://github.com/VOICEVOX/voicevox_core/releases/latest/download/download-linux-x64 -o download
chmod +x download
./download
# ./download --device cuda
rm download
pip3 install https://github.com/VOICEVOX/voicevox_core/releases/download/0.15.3/voicevox_core-0.15.3+cpu-cp38-abi3-linux_x86_64.whl
# pip3 install https://github.com/VOICEVOX/voicevox_core/releases/download/0.15.3/voicevox_core-0.15.3+cuda-cp38-abi3-linux_x86_64.whl
export LD_LIBRARY_PATH=$PWD/voicevox_core
echo "export LD_LIBRARY_PATH=$PWD/voicevox_core" >> ~/.bashrc

インストールが終わったら下記のPythonスクリプトを実行し、音声ファイルが作成できることを確認します。

from voicevox_core import VoicevoxCore
from pathlib import Path

class TTS:
    def __init__(self, speakerid: int):
        self.speakerid = speakerid
        open_jtalk_dict_dir = (
            Path(__file__).parent / "voicevox_core/open_jtalk_dic_utf_8-1.11"
        )
        self.core = VoicevoxCore(open_jtalk_dict_dir=open_jtalk_dict_dir)
        self.core.load_model(speakerid)

    def saveWav(self, wav_path: str, text: str):
        audio_query = self.core.audio_query(text, self.speakerid)
        wav = self.core.synthesis(audio_query, self.speakerid)
        Path(wav_path).write_bytes(wav)
if __name__ == "__main__":
    speakerid = 8  # 春日部つむぎ
    tts = TTS(speakerid)

    text1 = "ご利用ありがとうございます。"
    tts.saveWav(f"tts_1.wav", text1)

    text2 = "ご利用ありがとうございました。"
    tts.saveWav(f"tts_2.wav", text2)

ImageFlux Live StreamingのWebアプリを作成

ボットの音声を受信、再生する動作を確認するために、ImageFlux Live Streamingを使った最も簡単なWebアプリを作成します。

ImageFlux Live StreamingのWebhookを受け取るには外部からアクセスできるホストが必要なので、簡易的にngrokコマンドを使用します。ngrokについては過去記事で紹介していますのでご覧ください。

ngrok http 8080

ngrokを実行すると下記のようなホスト名が割り当てられます。本記事ではこのホスト名を例として使用しますので、適宜読み替えてください。

https://123f-123-456-789-012.ngrok-free.app

次にImageFlux Live Streamingから受け取るWebhookのURLを決めます。認証WebhookとイベントWebhookの2つです。本記事では、下記のように定めます。

https://123f-123-456-789-012.ngrok-free.app/auth_webhook
https://123f-123-456-789-012.ngrok-free.app/event_webhook

このWebhookを使用して双方向の受信をするために、ImageFlux Live StreamingのCreateMultistreamChannel APIを使用してチャンネルを作成します。ACCESS_TOKENはImageFlux Live Streamingを契約した時に受け取る文字列です。

下記のコマンドを実行します。

ACCESS_TOKEN="XXXXXXXXXXXXXXXXXXXXXXXXX"
curl -H "Content-Type: application/json" -H "X-Sora-Target: ImageFlux_20200316.CreateMultistreamChannel" -H "Authorization: Bearer ${ACCESS_TOKEN}" -d '{
"auth_webhook_url":"https://123f-123-456-789-012.ngrok-free.app/auth_webhook",
"event_webhook_url":"https://123f-123-456-789-012.ngrok-free.app/event_webhook"
}' https://live-api.imageflux.jp/

下記のようなレスポンスが帰ってきますので値をメモします。本記事では下記の値が取得できたものとして進めますので、適宜読み替えてください。

{
"channel_id": "aabbccddeeff11223344aabbccddeeff11223344aabbccddeeff11223344",
"sora_url": "wss://xxxyyy.imageflux.jp/signaling"
}

本記事ではcurlコマンドでチャンネルを作成しましたが、本番環境ではWebアプリからサーバ側のプログラミング言語でImageFluxのAPIを呼びます。

チャンネルが作成できたら、ボットの音声を受信、再生するためのシンプルなWebページを作成します。recv.htmlという名前でブラウザからアクセスできる場所に保存します。

<div id="remote_players">
	<audio id="remote_audio" autoplay controls playsinline></audio>
</div>
<button onclick="connect()">Connect</button>
<script type="text/javascript" src="https://cdn.jsdelivr.net/npm/sora-js-sdk/dist/sora.min.js"></script>
<script type="text/javascript">
	async function connect() {
		const channel_id = 'aabbccddeeff11223344aabbccddeeff11223344aabbccddeeff11223344';
		const sora_url = 'wss://xxxyyy.imageflux.jp/signaling';
		const metadata = {};
		const sora = Sora.connection(sora_url, true);
		let options = {
			multistream: true,
			audio: true,
			audioCodecType: 'OPUS',
			video: false,
		}
		const pub = sora.recvonly(channel_id, metadata, options);
		pub.on('track', (event) => {
			if (event.track.kind === 'audio') {
				const stream = event.streams[0];
				addPlayer(stream);
			}
		});
		await pub.connect();
	}
	function addPlayer(stream) {
		document.querySelector('#remote_audio').srcObject = stream;
	}
</script>

ブラウザでrecv.htmlにアクセスし、consoleを開いた後にconnectボタンを押すと大量のログが出力されます。エラーが出ないことを確認します。この時点では音声は何も再生されませんが、この状態でブラウザを閉じずに次に進みます。

ボット音声を配信

次はImageFlux Live Streamingに接続し、TTSの音声を送信するボットスクリプトです。下記のコマンドをbot_voice.pyという名前で保存します。

import json, os, time, sys
from threading import Event
from typing import Any, Dict, List
from dotenv import load_dotenv
import numpy as np
from sora_sdk import Sora, SoraConnection, SoraSignalingErrorCode

from voicevox_core import VoicevoxCore
from pathlib import Path


class TTS:
    def __init__(self, speakerid: int):
        self.speakerid = speakerid
        open_jtalk_dict_dir = (
            Path(__file__).parent / "voicevox_core/open_jtalk_dic_utf_8-1.11"
        )
        self.core = VoicevoxCore(open_jtalk_dict_dir=open_jtalk_dict_dir)
        self.core.load_model(speakerid)

    def saveWav(self, wav_path: str, text: str):
        audio_query = self.core.audio_query(text, self.speakerid)
        wav = self.core.synthesis(audio_query, self.speakerid)
        Path(wav_path).write_bytes(wav)


class BotVoice:
    def __init__(
        self,
        channel_id: str,
        signaling_url: str,
        text: str,
    ):
        self.text = text
        self.audio_channels = 1
        self.audio_sample_rate = 24000
        self.tts = None
        self._sora: Sora = Sora()
        self._audio_source = self._sora.create_audio_source(
            self.audio_channels, self.audio_sample_rate
        )
        self._connection: SoraConnection = self._sora.create_connection(
            signaling_urls=[signaling_url],
            role="sendonly",
            channel_id=channel_id,
            audio_source=self._audio_source,
            audio=True,
            video=False,
        )
        self._connection_id = ""
        self._connected = Event()
        self._closed = False
        self._default_connection_timeout_s = 10.0
        self._connection.on_set_offer = self._on_set_offer
        self._connection.on_notify = self._on_notify
        self._connection.on_disconnect = self._on_disconnect

    def connect(self):
        self._connection.connect()
        assert self._connected.wait(
            timeout=self._default_connection_timeout_s
        ), "connection timeout"

    def disconnect(self):
        self._connection.disconnect()

    def _on_notify(self, raw_message: str):
        message: Dict[str, Any] = json.loads(raw_message)
        if (
            message["type"] == "notify"
            and message["event_type"] == "connection.created"
            and message["connection_id"] == self._connection_id
        ):
            self._connected.set()

    def _on_set_offer(self, raw_message: str):
        message: Dict[str, Any] = json.loads(raw_message)
        if message["type"] == "offer":
            self._connection_id = message["connection_id"]

    def _on_disconnect(self, error_code: SoraSignalingErrorCode, message: str):
        self._connected.clear()
        self._closed = True

    def _callback(self, indata: np.ndarray, frames: int, time, status):
        self._audio_source.on_data(indata)

    def play(self):
        with open(self.file_path, "rb") as f:
            _ = f.read(44)  # 先頭の44バイトを読み飛ばす
            data = np.fromfile(f, dtype=np.int16)
        chunk_duration = 0.02
        chunk_size = int(self.audio_sample_rate * chunk_duration)
        padding_size = chunk_size - (len(data) % chunk_size)
        if padding_size != chunk_size:
            data = np.pad(data, (0, padding_size), "constant", constant_values=0)

        t = time.time()
        for i in range(0, len(data), chunk_size):
            chunk = data[i : i + chunk_size]
            chunk = chunk.reshape((-1, 1))
            self._audio_source.on_data(chunk, t)
            t += chunk_duration
            # オーディオバッファの空き状態を取得する機能がないため、ウェイトを入れて調整する
            # ウェイトを入れすぎるとスキマを感じる
            # ウェイトが少ないと早口になり、たまにチャンク落ちする
            time.sleep(chunk_duration * 0.8)  # 0.8-1.0で調整
        time.sleep(1)  # 終了後に即切断すると最後のチャンクが再生されない

    def createVoice(self):
        if self.tts is None:
            speakerid = 8  # 春日部つむぎ
            self.tts = TTS(speakerid)
        t = time.time()
        self.file_path = f"tts_{t}.wav"
        self.tts.saveWav(self.file_path, self.text)

    def run(self):
        self.createVoice()
        self.connect()
        try:
            self.play()
        except KeyboardInterrupt:
            pass
        finally:
            self.disconnect()


def main():
    load_dotenv()
    channel_id = sys.argv[1]
    signaling_url = sys.argv[2]
    text = sys.argv[3]
    bot = BotVoice(channel_id, signaling_url, text)
    bot.run()


if __name__ == "__main__":
    main()

コマンドライン引数が3つです。第一引数がchannel_id、第二引数がsignaling_url(sora_url)、第三引数がボットにしゃべらせたい文字列です。

下記のコマンドを実行すると、WebRTCに接続し、ボットの音声がブラウザ経由で再生されます。

python3 bot_voice.py aabbccddeeff11223344aabbccddeeff11223344aabbccddeeff11223344 wss://xxxyyy.imageflux.jp/signaling おはようございます

VOICEVOXは24kHzモノラルの音声を出力します。他のTTSを使用する場合はサンプリングレートの違いに注意が必要です。

通話開始判定

ここまででボットからWebRTCに音声を送信することができました。次はユーザーがWebRTCに接続したらボットから音声が再生されるようにします。WebRTCに接続されたかどうかの判定はイベントWebhookのconnection.createdタイプを使用します。ポイントはWebアプリとは独立した外部プロセスとして実行するところです。TTSは比較的重たい処理のため、Webアプリがブロックしないように注意する必要があります。

下記の例はNuxtを使用しています。role変数の値によって、ブラウザ(recvonly,sendrecv)かボット(sendonly)かを区別しています。

import { spawn } from 'child_process';
export default defineEventHandler(async (event) => {
    const request = await readBody(event);
    if (request.role !== 'sendonly' && request.type == 'connection.created') {
        const signaling_url = 'wss://xxxyyy.imageflux.jp/signaling';
        const cmd = "python3";
        const args = ["bot_voice.py", request.channel_id, signaling_url, "こんにちは、ご利用ありがとうございます"];
        const child = spawn(cmd, args, {
            stdio: 'ignore',
            detached: true,
            env: process.env,
        });
        child.unref();
    }
    return '';
})

イベントWebhookが実装できたらブラウザのrecv.htmlをリロードしてconnectボタンを押します。イベントWebhookが呼び出され、ブラウザから「こんにちは、ご利用ありがとうございます」という音声が再生されれば成功です。

サンプル動画

以上を踏まえて、音声AIアシスタントボットをWebRTCの会議に参加させるサンプルアプリを作成しました。時間制限付きの有料通話を想定したサンプルです。3つのrole(sendrecv, sendonly, recvonly)をそれぞれ、人間の話者、AIアシスタントボット、モニター兼レコーダーとして使っています。2人目の参加者が接続すると同時にボットが起動し「ご利用ありがとうございます。...」と再生。15秒後に別の音声「ご利用ありがとうございました」を再生。20秒後に通話を全員強制切断しています。

0:00 - 0:05 CreateMultistreamChannel API を呼び出してチャンネル作成
0:05 - 0:08 作成したチャンネルにsendrecvで接続
0:08 - 0:11 別ブラウザで最新のチャンネルリスト取得・表示
0:11 - 0:13 一番最新のチャンネルにrecvonlyで接続
0:13 - 0:16 別ブラウザで最新のチャンネルリスト取得・表示
0:16 - 0:19 一番最新のチャンネルにsendrecvで接続
0:19 - 0:27 sendrecvが2接続始まったタイミングから0秒後にボットがバックグラウンドで起動し、sendonlyで接続して音声を送信し、即切断
0:35 - 0:39 同上15秒後にボットがバックグラウンドで起動し、sendonlyで接続して音声を送信し、即切断
0:39 - 0:41 同上20秒後にバックグラウンドタスクからこのチャンネルの全ての接続を強制切断
0:42 - 0:46 DeleteChannel API を呼び出してチャンネル削除

まとめ

音声AIアシスタントボットをImageFlux Live Streamingの音声通話会議に参加させる実装方法を紹介しました。本記事では固定の文字列ですが、ChatGPTを使用して生成した文章を再生させることを想定しています。ぜひ挑戦してみてください。より詳しい実装方法に興味がある方はお気軽にご相談ください。