さくらのクラウド高火力プランを使ってImageFlux Live StreamingにAIライブ配信してみた
こんにちは、テリーです。2023年は画像生成AI、音声AI、LLMなど、映像配信に使用できるたくさんの技術とサービスが公開されて、10年に一度級の進歩を感じました。AIの知能はすでに大学生の平均を超えているという説もあります。疲れ知らずのAIを活用し、2024年も去年と同じような速さで、次から次へと新しい技術、進化した技術が公開されていくのだろうとワクワクが止まりません。
さて、前回の記事では、P2Pの機能を使用してクラウド上のGPUサーバでAIフィルタを適用し、その映像を送り返すサンプルを紹介しました。回線とCPUに余裕がある場合はよいのですが、スマートフォンからのモバイル配信のような限られたリソースの場合には問題が発生します。映像を配信元に送り返さずに、クラウドから直接ビデオ会議サービスに映像を送りたいです。
そこで今回は、さくらのクラウド高火力プラン上で動作するPythonプログラムで映像を受け取り、その映像を元にAIフィルタで生成した画像フレームをImageFlux Live Streamingにライブ配信するサンプルをご紹介します。
目次
対象読者
- Pythonを使ってリアルタイムのAI画像解析を行い、その出力を元にImageFlux Live Streamingで大規模ライブ配信をしたい人
- さくらのクラウド高火力プランを使ってみたい人
本記事はImageFlux Live StreamingおよびWebRTC SFU Soraについて、実装経験のある方、アカウントをお持ちの方向けに書いています。ImageFlux Live Streamingの詳細および契約についてはこちらをご参照ください。
動作確認環境
クライアントPC
- macOS 14.2.1
- Chrome 120.0.6099.234
サーバ
- Ubuntu 22.04.2 LTS
- aiortc 1.6.0
- aiohttp 3.9.1
- sora-python-sdk 2023.3.1
- nvidia-driver 545.29.06
前回の記事との違い
前回の記事で紹介したAIフィルタ用のPythonプログラムは、接続元のブラウザから映像を受け取り、加工して送り返し、さらに別のビデオ会議サイトに送信しています。図にすると下記のようになります。配信元のブラウザが接続している回線を2倍消費していることがわかります。
今回ご紹介する方法では、AIフィルタ用のPythonプログラムから直接ImageFlux Live Streamingに接続し配信するため、配信元の回線は1映像分のみです。
この方式では配信元のパソコンやマイコンへの負荷が低く、また回線がそれほど高速でなくても使用できるというメリットがある一方で、ImageFlux Live Streamingとの接続、切断イベントなどをPythonで記述する必要があります。
さくらのクラウド高火力プランを起動
さくらのクラウド高火力プランは「石狩第1ゾーン」でサーバを追加するときのみ、選択肢に現れます。はじめて使用するときには混乱するので特に要注意です。
値段を見ると、最小プランでも1時間483円、1ヶ月23.1万円という料金です。とても高額なので、しっかりと使用時間の計画を立てて準備をしてから起動しましょう。
OSは「Ubuntu 22.04.2 LTS」を選択します。ディスクは使用するAIが必要とするモデルのファイルサイズで決めます。まずは100GBで試して、足りなければ増やすのがよいでしょう。20GB,40GBでは空き容量に余裕がなく、作り直しになることが多いです。
さくらの高火力サーバには高速なバックボーン回線も付いてきます。他社のGPUクラウドサービスでは、UDPポートを使用できない、回線が細い、ストリーミングが利用規約違反、などの制限があり、ライブ映像解析・ライブ配信用途には使えないところが多いですが、さくらの高火力サーバでは安心して配信することができます。1時間483円は非常に高額ですが、高速な回線費用が含まれていると考えれば納得できるユースケースもあるでしょう。サーバ管理画面には下図のように「100Mbpsベストエフォート」と書かれていますが、実測ではそれよりも速い速度が出ることもあります。
サーバを起動し、SSHで接続ができたら、下記のコマンドを順に実行し、NVIDIAの最新のドライバをインストールします。インストールには5分から10分かかります。インストールが終わったら念のため再起動します。
wget https://us.download.nvidia.com/XFree86/Linux-x86_64/545.29.06/NVIDIA-Linux-x86_64-545.29.06.run
sudo ./NVIDIA-Linux-x86_64-545.29.06.run
sudo apt install -y software-properties-common pciutils gcc make dkms
sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-12 100 --slave /usr/bin/g++ g++ /usr/bin/g++-12 --slave /usr/bin/gcov gcov /usr/bin/gcov-12
sudo ./NVIDIA-Linux-x86_64-545.29.06.run
nvidia-smi
nvcc -V
sudo reboot
次にsora-python-sdkのインストールをします。
pip install sora_sdk
その他のインストールは、前回の記事を参照してください。
以上でセットアップは終了です。
PythonプログラムからImageFlux Live Streamingにライブ配信
まずはsora_sdkを使ってImageFlux Live Streamingに映像を配信できることを確認します。
下記のプログラムを作成します。(行頭の数字は行番号です)
1 import cv2
2 from PIL import ImageFont, ImageDraw, Image
3 import numpy as np
4 import time
5 from sora_sdk import Sora
6
7
8 class App:
9 def __init__(self):
10 self.init_colors()
11 self.init_sora()
12 self.run()
13
14 def init_colors(self):
15 font_name = "MPLUSRounded1c-Bold"
16 font_size = 72
17 self.font = ImageFont.truetype(font_name, font_size, encoding="utf-8 ")
18 hsv_colors = np.zeros((180, 1, 3), dtype=np.uint8)
19 for i in range(180):
20 hsv_colors[i] = [i, 255, 255]
21 rgb_colors = cv2.cvtColor(hsv_colors, cv2.COLOR_HSV2RGB)
22 self.rgb_array = np.squeeze(rgb_colors)
23
24 def init_sora(self):
25 signaling_url = SIGNALING_URL
26 channel_id = CHANNEL_ID
27 self.sora = Sora()
28 self.video_source = self.sora.create_video_source()
29 self.connection = self.sora.create_connection(
30 signaling_urls=[signaling_url],
31 role="sendonly",
32 channel_id=channel_id,
33 video_source=self.video_source,
34 video_bit_rate=1000,
35 )
36 self.connection.on_disconnect = self.on_disconnect
37
38 def on_disconnect(self, error_code, message):
39 print(f"on_disconnect: error_code='{error_code}' message='{message}' ")
40 self.running = False
41
42 def run(self):
43 img_pil = Image.new("RGB", (320, 240), (255, 255, 255))
44 draw = ImageDraw.Draw(img_pil)
45 position = (img_pil.width // 2, img_pil.height // 2)
46
47 self.connection.connect()
48
49 try:
50 start = time.time()
51 i = 0
52 while True:
53 draw.rectangle([(0, 0), img_pil.size], fill=tuple(self.rgb_a rray[i % 180]))
54 now = time.time() - start
55 formatted_time = "{:.3f}".format(now)
56 draw.text(
57 position,
58 formatted_time,
59 font=self.font,
60 fill="black",
61 stroke_fill="white",
62 stroke_width=6,
63 align="center",
64 anchor="mm",
65 )
66 img = np.array(img_pil)
67 # cv2.imshow("Image", img)
68 self.video_source.on_captured(img)
69 delay = max(int(((i + 1) / 30 - now) * 1000), 1)
70 # print(delay)
71 ret = cv2.waitKey(delay)
72 if ret >= 0:
73 break
74 i += 1
75 finally:
76 self.connection.disconnect()
77 cv2.destroyAllWindows()
78
79
80 if __name__ == "__main__":
81 App()
25,26行目のSIGNALING_URL, CHANNEL_IDには、ImageFlux Live StreamingのCreateMultistreamChannel APIまたはCreateMultistreamChannelWithHLS APIを呼び出して取得した値を指定します。sora-python-sdkはmultistream:trueでの配信になるため、CreateChannel APIで作成したチャンネルには配信できません。
上記のPythonプログラムを実行すると、下図のように経過時間を表示する映像が配信されます。Sora DevToolsを使用するなどして同チャンネルを受信すると、経過時間のカウントが増えていくことが確認できます。
ブラウザのWebカメラ映像をPythonプログラムを経由してImageFlux Live Streamingにライブ配信
次に、前回の記事を参考に、ブラウザからPythonプログラムにP2Pで接続してWebカメラの映像を送信し、Pythonプログラムで受け取った画像フレームを加工せずにImageFlux Live Streamingに送ります。前回の記事との違いは、映像の流れが一方通行で、Pythonプログラムからブラウザには送り返しません。
1 import asyncio, json, logging, ssl, uuid
2 import traceback
3 from aiohttp import web
4 from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescriptio n
5 from av import VideoFrame
6 import numpy as np
7 from queue import Empty
8 from torch.multiprocessing import Process, Queue, RawValue, set_start_method
9 from aiortc.mediastreams import MediaStreamError
10 from sora_sdk import Sora
11
12
13 try:
14 set_start_method("spawn")
15 except RuntimeError:
16 pass
17
18 sora_process = None
19
20
21 logger = logging.getLogger("pc")
22 pcs = set()
23
24
25 class SoraSink:
26 def __init__(self):
27 self.__tracks = {}
28 self.disconnected = False
29
30 def addTrack(self, track):
31 if track not in self.__tracks:
32 self.__tracks[track] = None
33
34 async def start(self):
35 for track, task in self.__tracks.items():
36 if task is None:
37 self.__tracks[track] = asyncio.ensure_future(self.consume(track))
38
39 async def stop(self):
40 for task in self.__tracks.values():
41 if task is not None:
42 task.cancel()
43 self.__tracks = {}
44
45 def on_disconnect(self, error_code, message: str):
46 print(f"on_disconnect: error_code: {error_code}, message: {message}")
47 self.disconnected = True
48
49 async def consume(self, track):
50 signaling_url = SIGNALING_URL
51 channel_id = CHANNEL_ID
52 sora = Sora()
53 video_source = sora.create_video_source()
54 conn = sora.create_connection(
55 signaling_urls=[signaling_url],
56 role="sendonly",
57 channel_id=channel_id,
58 video_source=video_source,
59 video_bit_rate=1000,
60 )
61 conn.on_disconnect = self.on_disconnect
62 conn.connect()
63 try:
64 while not self.disconnected:
65 try:
66 frame = await track.recv()
67 img = frame.to_ndarray(format="bgr24")
68 if img is None:
69 continue
70 video_source.on_captured(img)
71 except MediaStreamError:
72 try:
73 await asyncio.sleep(0.1)
74 except:
75 break
76 except:
77 traceback.print_exc()
78 finally:
79 conn.disconnect()
80
81
82 async def offer(request):
83 params = await request.json()
84 offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
85
86 pc = RTCPeerConnection()
87 pc_id = "PeerConnection(%s)" % uuid.uuid4()
88 pcs.add(pc)
89
90 sorasink = SoraSink()
91
92 def log_info(msg, *args):
93 logger.info(pc_id + " " + msg, *args)
94
95 log_info("Created for %s", request.remote)
96
97 @pc.on("datachannel")
98 def on_datachannel(channel):
99 @channel.on("message")
100 def on_message(message):
101 if isinstance(message, str) and message.startswith("ping"):
102 channel.send("pong" + message[4:])
103
104 @pc.on("connectionstatechange")
105 async def on_connectionstatechange():
106 log_info("Connection state is %s", pc.connectionState)
107 if pc.connectionState == "failed":
108 await pc.close()
109 await sorasink.stop()
110 pcs.discard(pc)
111
112 @pc.on("track")
113 def on_track(track):
114 log_info("Track %s received", track.kind)
115
116 if track.kind == "video":
117 sorasink.addTrack(track)
118
119 @track.on("ended")
120 async def on_ended():
121 log_info("Track %s ended", track.kind)
122 await sorasink.stop()
123
124 await pc.setRemoteDescription(offer)
125 answer = await pc.createAnswer()
126 await pc.setLocalDescription(answer)
127 await sorasink.start()
128
129 return web.Response(
130 content_type="application/json",
131 text=json.dumps(
132 {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
133 ),
134 )
135
136
137 async def index(request):
138 return web.HTTPFound("/index.html")
139
140
141 async def on_shutdown(app):
142 coros = [pc.close() for pc in pcs]
143 await asyncio.gather(*coros)
144 pcs.clear()
145
146
147 @web.middleware
148 async def cors_middleware(request, handler):
149 headers = {
150 "Access-Control-Allow-Headers": "*",
151 "Access-Control-Allow-Methods": "*",
152 "Access-Control-Allow-Origin": "*",
153 }
154 if request.method == "OPTIONS":
155 return web.Response(headers=headers)
156 try:
157 response = await handler(request)
158 for key, value in headers.items():
159 response.headers[key] = value
160 return response
161 except web.HTTPException as e:
162 for key, value in headers.items():
163 e.headers[key] = value
164 raise e
165
166
167 def main():
168 cert_file = "fullchain.pem"
169 key_file = "privkey.pem"
170 ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_SERVER)
171 ssl_context.load_cert_chain(cert_file, key_file)
172
173 app = web.Application(middlewares=[cors_middleware])
174 app.on_shutdown.append(on_shutdown)
175 app.router.add_get("/", index)
176 app.router.add_post("/offer", offer)
177 app.add_routes([web.static("/", "static", show_index=True)])
178
179 web.run_app(
180 app, access_log=None, host="0.0.0.0", port=7860, ssl_context=ssl_context
181 )
182
183
184 if __name__ == "__main__":
185 main()
上記のプログラムを実行し、ブラウザで https://p2pcamera.hopto.org:7860/index.html (前回の記事参照)にアクセスして配信を開始すると、Webカメラの映像が上記Pythonプログラムを経由してImageFlux Live Streamingにリレー配信されていることが確認できます。
AIフィルタの実装と確認
ここまで動作することが確認できれば、あとはPythonのプログラムにAIの処理を追加するだけです。ブラウザからP2Pで接続し、Pythonプログラムで受け取った映像をAI画像解析し、その結果を元に新たな画像フレームを生成してImageFlux Live Streamingに送ります。AIフィルタは前回の記事で使用したDensePoseを使用しています。
1 import asyncio, json, logging, ssl, uuid
2 import traceback
3 from aiohttp import web
4 from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription
5 from av import VideoFrame
6 import numpy as np
7 from queue import Empty
8 from torch.multiprocessing import Process, Queue, RawValue, set_start_method
9 from aiortc.mediastreams import MediaStreamError
10 from sora_sdk import Sora
11
12
13 try:
14 set_start_method("spawn")
15 except RuntimeError:
16 pass
17
18 ai_process = None
19 sora_process = None
20 input_queue = Queue()
21 output_queue = Queue()
22 processed_count = RawValue("i", 0)
23
24
25 def push_pop(frame):
26 try:
27 while not input_queue.empty():
28 input_queue.get_nowait()
29 except Empty:
30 pass
31 input_queue.put(frame.to_image())
32 try:
33 return output_queue.get_nowait()
34 except Empty:
35 return None
36
37
38 logger = logging.getLogger("pc")
39 pcs = set()
40
41
42 class SoraSink:
43 def __init__(self):
44 self.__tracks = {}
45 self.disconnected = False
46
47 def addTrack(self, track):
48 if track not in self.__tracks:
49 self.__tracks[track] = None
50
51 async def start(self):
52 for track, task in self.__tracks.items():
53 if task is None:
54 self.__tracks[track] = asyncio.ensure_future(self.consume(track))
55
56 async def stop(self):
57 for task in self.__tracks.values():
58 if task is not None:
59 task.cancel()
60 self.__tracks = {}
61
62 def on_disconnect(self, error_code, message: str):
63 print(f"on_disconnect: error_code: {error_code}, message: {message}")
64 self.disconnected = True
65
66 async def consume(self, track):
67 signaling_url = SIGNALING_URL
68 channel_id = CHANNEL_ID
69 sora = Sora()
70 video_source = sora.create_video_source()
71 conn = sora.create_connection(
72 signaling_urls=[signaling_url],
73 role="sendonly",
74 channel_id=channel_id,
75 video_source=video_source,
76 video_bit_rate=1000,
77 )
78 conn.on_disconnect = self.on_disconnect
79 conn.connect()
80 try:
81 while not self.disconnected:
82 try:
83 frame = await track.recv()
84 img = push_pop(frame)
85 if img is None:
86 continue
87 video_source.on_captured(img)
88 except MediaStreamError:
89 try:
90 await asyncio.sleep(0.1)
91 except:
92 break
93 except:
94 traceback.print_exc()
95 finally:
96 conn.disconnect()
97
98
99 async def offer(request):
100 params = await request.json()
101 offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
102
103 pc = RTCPeerConnection()
104 pc_id = "PeerConnection(%s)" % uuid.uuid4()
105 pcs.add(pc)
106
107 sorasink = SoraSink()
108
109 def log_info(msg, *args):
110 logger.info(pc_id + " " + msg, *args)
111
112 log_info("Created for %s", request.remote)
113
114 @pc.on("datachannel")
115 def on_datachannel(channel):
116 @channel.on("message")
117 def on_message(message):
118 if isinstance(message, str) and message.startswith("ping"):
119 channel.send("pong" + message[4:])
120
121 @pc.on("connectionstatechange")
122 async def on_connectionstatechange():
123 log_info("Connection state is %s", pc.connectionState)
124 if pc.connectionState == "failed":
125 await pc.close()
126 await sorasink.stop()
127 pcs.discard(pc)
128
129 @pc.on("track")
130 def on_track(track):
131 log_info("Track %s received", track.kind)
132
133 if track.kind == "video":
134 sorasink.addTrack(track)
135
136 @track.on("ended")
137 async def on_ended():
138 log_info("Track %s ended", track.kind)
139 await sorasink.stop()
140
141 await pc.setRemoteDescription(offer)
142 answer = await pc.createAnswer()
143 await pc.setLocalDescription(answer)
144 await sorasink.start()
145
146 return web.Response(
147 content_type="application/json",
148 text=json.dumps(
149 {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
150 ),
151 )
152
153
154 async def index(request):
155 return web.HTTPFound("/index.html")
156
157
158 async def on_shutdown(app):
159 coros = [pc.close() for pc in pcs]
160 await asyncio.gather(*coros)
161 pcs.clear()
162
163
164 @web.middleware
165 async def cors_middleware(request, handler):
166 headers = {
167 "Access-Control-Allow-Headers": "*",
168 "Access-Control-Allow-Methods": "*",
169 "Access-Control-Allow-Origin": "*",
170 }
171 if request.method == "OPTIONS":
172 return web.Response(headers=headers)
173 try:
174 response = await handler(request)
175 for key, value in headers.items():
176 response.headers[key] = value
177 return response
178 except web.HTTPException as e:
179 for key, value in headers.items():
180 e.headers[key] = value
181 raise e
182
183
184 def main():
185 from sub_densepose import sub_main
186
187 global ai_process
188 ai_process = Process(
189 target=sub_main,
190 args=(input_queue, output_queue, processed_count),
191 daemon=True,
192 )
193 ai_process.start()
194 logging.basicConfig(level=logging.INFO)
195
196 cert_file = "fullchain.pem"
197 key_file = "privkey.pem"
198 ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_SERVER)
199 ssl_context.load_cert_chain(cert_file, key_file)
200
201 app = web.Application(middlewares=[cors_middleware])
202 app.on_shutdown.append(on_shutdown)
203 app.router.add_get("/", index)
204 app.router.add_post("/offer", offer)
205 app.add_routes([web.static("/", "static", show_index=True)])
206
207 web.run_app(
208 app, access_log=None, host="0.0.0.0", port=7860, ssl_context=ssl_context
209 )
210
211
212 if __name__ == "__main__":
213 main()
プログラムを実行したらImageFlux Live StreamingのHLS視聴用URLにアクセスし、視聴できることを確認します。
まとめ
ブラウザからP2Pでさくらのクラウド高火力サーバに映像を転送し、AIフィルタを通した映像をImageFlux Live Streaming経由で配信するサンプルを紹介しました。セグメンテーションや物体検出、顔検出、その他AIが得意とする処理をクラウド上で実現できます。データチャンネルを使用してImageFlux Live Streamingの接続情報を送るとより汎用的に配信することができるでしょう。また、さくらの高火力サーバはWebAPIを使用して起動・停止ができるようなので、ワンクリックでサーバを起動・停止できると使い勝手もよくなります。ぜひ挑戦してください。ご質問、ご感想もお待ちしています。