OBS WHIPを使用してAV1映像を送受信するビデオプロセッシングAIサーバの構築

こんにちは、テリーです。OBS 30.2でWHIPという新機能の配信方式が追加されました。OBSというとRTMPでの配信が基本ですが、近年はそれ以外の配信プロトコル・コーデックも増えていました。今回はWHIPの紹介と、Pythonを使ったAV1コーデック対応のWHIP受信サーバの構築手順を紹介します。

対象読者

  • リモートのGPUサーバでビデオ・オーディオ処理をしたい人
  • Pythonのビデオプロセッシングサーバの構築方法を知りたい人
  • WHIP に興味がある人
  • AV1コーデックに興味がある人

動作確認環境

  • Ubuntu 22.04 LTS (x64)
  • Python 3.10.12
  • aiortc 1.9.0
  • aiohttp 3.10.9
  • VSCode 1.94.1
  • OBS 30.2.3

WHIPの紹介

WebRTCで通信する際には映像・音声コーデックを確認しあうための「シグナリング」という処理が最初に行われます。シグナリングにはWebSocketが使われてきましたが、WHIPではHTTP POSTを使い、リクエスト、レスポンスともにHTTPのボディ部で生のSDPを送ります。HTTPリクエストがoffer、そのレスポンスがanswerに相当します。

ブラウザでよく使われるWebSocketによるシグナリングとの違いは、配信開始時と配信終了時にしかやりとりをしないことです。また受信側から送信側に追加情報を伝達する手段がありません。片方向に配信する用途に機能を限定して、シンプルにしたシグナリングと言えます。まさにOBSのための仕様とも言えそうです。実際のところ、現時点でOBSはanswer SDPの多くを無視して配信を行います。

WHIP受信サーバの構築と動作確認

WHIPはSDP交換と切断のためのプロトコルですので、その後の映像・音声送受信処理は従来のWebRTCと全く変わりありません。PythonではaiortcというライブラリでWebRTCが処理できますので、それをWHIPに対応してみましょう。

受信側のサーバで下記のコマンドを実行し、ライブラリをインストールします。事前にvenvを使用するのがオススメですが解説は省略します。

pip install aiortc aiohttp

修正のベースとするプログラムは、公式サンプルのこちらです。WHIPのない時代のサンプルですが、HTTP POSTを使ってSDPを送受信しているのはほぼWHIPと同じです。下記のように修正することで、OBSの接続を受け付けることができます。

1.  import argparse
2.  import asyncio
3.  import json
4.  import logging
5.  import os
6.  import ssl
7.  import uuid
8.
9.  import cv2
10. from aiohttp import web
11. from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription
12. from aiortc.contrib.media import MediaBlackhole, MediaPlayer, MediaRecorder, MediaRelay
13. from av import VideoFrame
14.
15. ROOT = os.path.dirname(__file__)
16.
17. logger = logging.getLogger("pc")
18. pcs = dict()
19. # pcs = set()
20. relay = MediaRelay()
21.
22. async def offer(request):
23.     # params = await request.json()
24.     # offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
25.
26.     # pc = RTCPeerConnection()
27.     # pc_id = "PeerConnection(%s)" % uuid.uuid4()
28.     # pcs.add(pc)
29.     if request.content_type != "application/sdp":
30.         return web.Response(status=400)
31.     offerText = await request.text()
32.     offer = RTCSessionDescription(sdp=offerText, type="offer")
33.     pc_id = request.match_info.get("pc_id", "Anonymous")
34.     pc = RTCPeerConnection()
35.     pcs[pc_id] = pc
36.
37.     def log_info(msg, *args):
38.         logger.info(pc_id + " " + msg, *args)
39.
40.     log_info("Created for %s", request.remote)
41.
42.     # prepare local media
43.     # player = MediaPlayer(os.path.join(ROOT, "demo-instruct.wav"))
44.     if args.record_to:
45.         recorder = MediaRecorder(args.record_to)
46.     else:
47.         recorder = MediaBlackhole()
48.
49.     # @pc.on("datachannel")
50.     # def on_datachannel(channel):
51.     #     @channel.on("message")
52.     #     def on_message(message):
53.     #         if isinstance(message, str) and message.startswith("ping"):
54.     #             channel.send("pong" + message[4:])
55.
56.     @pc.on("connectionstatechange")
57.     async def on_connectionstatechange():
58.         log_info("Connection state is %s", pc.connectionState)
59.         if pc.connectionState == "failed":
60.             await pc.close()
61.             pcs.discard(pc)
62.
63.     @pc.on("track")
64.     def on_track(track):
65.         log_info("Track %s received", track.kind)
66.
67.         if track.kind == "audio":
68.             # pc.addTrack(player.audio)
69.             recorder.addTrack(track)
70.         elif track.kind == "video":
71.             # pc.addTrack(
72.             #     VideoTransformTrack(
73.             #         relay.subscribe(track), transform=params["video_transform"]
74.             #     )
75.             # )
76.             if args.record_to:
77.                 recorder.addTrack(relay.subscribe(track))
78.
79.         @track.on("ended")
80.         async def on_ended():
81.             log_info("Track %s ended", track.kind)
82.             await recorder.stop()
83.
84.     # handle offer
85.     await pc.setRemoteDescription(offer)
86.     await recorder.start()
87.
88.     # send answer
89.     answer = await pc.createAnswer()
90.     await pc.setLocalDescription(answer)
91.
92.     # return web.Response(
93.     #     content_type="application/json",
94.     #     text=json.dumps(
95.     #         {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
96.     #     ),
97.     # )
98.     return web.Response(
99.         status=201,
100.        content_type="application/sdp",
101.        headers={"Location": f"/whip/{pc_id}"},
102.        text=pc.localDescription.sdp,
103.    )
104.
105.async def on_whip_disconnect(request):
106.    pc_id = request.match_info.get("pc_id", "Anonymous")
107.    logger.info(pc_id + " whip disconnect")
108.    pc = pcs.get(pc_id)
109.    if pc is not None:
110.        await pc.close()
111.        del pcs[pc_id]
112.    return web.Response()
113.
114.async def on_shutdown(app):
115.    # close peer connections
116.    # coros = [pc.close() for pc in pcs]
117.    coros = [pc.close() for pc in pcs.values()]
118.    await asyncio.gather(*coros)
119.    pcs.clear()
120.
121.
122.if __name__ == "__main__":
123.    parser = argparse.ArgumentParser(
124.        description="WebRTC audio / video / data-channels demo"
125.    )
126.    parser.add_argument("--cert-file", help="SSL certificate file (for HTTPS)")
127.    parser.add_argument("--key-file", help="SSL key file (for HTTPS)")
128.    parser.add_argument(
129.        "--host", default="0.0.0.0", help="Host for HTTP server (default: 0.0.0.0)"
130.    )
131.    parser.add_argument(
132.        "--port", type=int, default=8080, help="Port for HTTP server (default: 8080)"
133.    )
134.    parser.add_argument("--record-to", help="Write received media to a file.")
135.    parser.add_argument("--verbose", "-v", action="count")
136.    args = parser.parse_args()
137.
138.    if args.verbose:
139.        logging.basicConfig(level=logging.DEBUG)
140.    else:
141.        logging.basicConfig(level=logging.INFO)
142.
143.    if args.cert_file:
144.        ssl_context = ssl.SSLContext()
145.        ssl_context.load_cert_chain(args.cert_file, args.key_file)
146.    else:
147.        ssl_context = None
148.
149.    app = web.Application()
150.    app.on_shutdown.append(on_shutdown)
151.    # app.router.add_get("/", index)
152.    # app.router.add_get("/client.js", javascript)
153.    # app.router.add_post("/offer", offer)
154.    app.router.add_post("/whip/{pc_id}", offer)
155.    app.router.add_delete("/whip/{pc_id}", on_whip_disconnect)
156.    web.run_app(
157.        app, access_log=None, host=args.host, port=args.port, ssl_context=ssl_context
158.    )

主要な修正点を解説します。
18行目ではpcs変数をset型からdict型にしています。pc_idを指定して、特定の切断処理だけを行うためです。22行目ではVideoTransformTrackクラスをコメントアウト(削除)しています。本サンプルは映像受信専用にするため、VideoTransformTrackは使いません。

18. pcs = dict()
19. # pcs = set()

29〜35行目ではoffer SDPの受け取り方法を変更しています。元のサンプルではJSON形式で送られていますが、WHIPではボディがそのままSDPのテキストです。typeは"offer"と決まっています。念の為Content-Typeが"application/sdp"であることをチェックしています。35行目ではWHIP URLを元にIDを決め、PeerConnectionを辞書型で変数にキープします。後述の切断処理で使用します。

29.     if request.content_type != "application/sdp":
30.         return web.Response(status=400)
31.     offerText = await request.text()
32.     offer = RTCSessionDescription(sdp=offerText, type="offer")
33.     pc_id = request.match_info.get("pc_id", "Anonymous")
34.     pc = RTCPeerConnection()
35.     pcs[pc_id] = pc

63〜77行目では、pc.addTrackをコメントアウトしています。この処理は映像返信なので使用しません。OBS WHIPは映像を送る専用で、映像を受け取ることはできません。

63.     @pc.on("track")
64.     def on_track(track):
65.         log_info("Track %s received", track.kind)
66.
67.         if track.kind == "audio":
68.             # pc.addTrack(player.audio)
69.             recorder.addTrack(track)
70.         elif track.kind == "video":
71.             # pc.addTrack(
72.             #     VideoTransformTrack(
73.             #         relay.subscribe(track), transform=params["video_transform"]
74.             #     )
75.             # )
76.             if args.record_to:
77.                 recorder.addTrack(relay.subscribe(track))

92〜103行目ではanswer SDPを送っています。offer SDPと同様に、Content-Typeは"application/sdp"、ボディがJSONではなく、そのままSDPのテキストです。ヘッダにはLocationヘッダをつけて、切断用URLを通知する仕様です。ステータスコードも200ではなく201にするのがWHIPの仕様です。

92.     # return web.Response(
93.     #     content_type="application/json",
94.     #     text=json.dumps(
95.     #         {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
96.     #     ),
97.     # )
98.     return web.Response(
99.         status=201,
100.        content_type="application/sdp",
101.        headers={"Location": f"/whip/{pc_id}"},
102.        text=pc.localDescription.sdp,
103.    )

105〜112行目は切断処理です。OBSで「配信終了」ボタンを押した時に呼ばれます。pcs変数から削除し、pc.close()関数を呼びます。レスポンスはステータス200でボディは空です。

105.async def on_whip_disconnect(request):
106.    pc_id = request.match_info.get("pc_id", "Anonymous")
107.    logger.info(pc_id + " whip disconnect")
108.    pc = pcs.get(pc_id)
109.    if pc is not None:
110.        await pc.close()
111.        del pcs[pc_id]
112.    return web.Response()

117行目ではpcs変数をset型からdict型に変えたために、.values()を付け足しています。

114.async def on_shutdown(app):
115.    # close peer connections
116.    # coros = [pc.close() for pc in pcs]
117.    coros = [pc.close() for pc in pcs.values()]
118.    await asyncio.gather(*coros)
119.    pcs.clear()

151行目から155行目では元のコードのHTML, JavaScriptのやりとりをコメントアウトし、WHIPの接続(POST)・切断(DELETE)処理を受け取るように追加しています。

151.    # app.router.add_get("/", index)
152.    # app.router.add_get("/client.js", javascript)
153.    # app.router.add_post("/offer", offer)
154.    app.router.add_post("/whip/{pc_id}", offer)
155.    app.router.add_delete("/whip/{pc_id}", on_whip_disconnect)

では、早速WHIP配信をしてみましょう。受信側でPythonを実行します。

python server.py

OBS側ではいくつか設定が必要です。「設定」ボタンを押し、「配信」ページを開きます。「サービス」から「WHIP」を選択し、「サーバー」欄には「http://(受信サーバのIPアドレス)/whip/hoge」を入れます。hogeの部分は、配信者を識別するための任意の文字列です。Bearerトークには「test」などを入れます。配信者が正規のユーザーであることを確認するための文字列です。不特定の人が使える公開サービスの場合は、Bearerトークンが正しい値かどうかの検証が必要です。

次に「出力」ページを開きます。映像エンコーダに「H264」のどれかを選びます。Macから配信する場合は「Apple VT H264 ハードウェアエンコーダ」、Windowsから配信する場合は「NVIDIA H264」を選択するとGPUを使ってエンコードされます。「x264」にするとCPUでエンコードされます。「レート制御」は回線が高速で安定している場合は品質固定の「CRF」、モバイル回線の場合はビットレート固定の「CBR」がオススメです。「キーフレーム」は2秒ごとがオススメです。0にすると10秒間隔になることもあり、パケット落ちが起こった場合に最大10秒間画面が砂まみれになることもあります。「Bフレームを使用する」のチェックは外します。Bフレームを使用するとエンコード・デコードともに追加の時間が必要になるため、遅延を0.1秒でも減らしたいライブ配信では使われません。

配信開始ボタンを押すと、Python側のログで接続が受け付けられたようなログが表示されます。「Connection state is connecting」と表示されればWHIPの送受信は正常です。その後、UDP接続が行われ、「Connection state is connected」というログが表示されればWebRTCの配信が始まります。その後に大量の赤い文字と「H264Decoder() failed to decode」というエラーが出てくることがありますが、20件程度でエラーログが止まった場合はプログラム通りの正常な動作です。OBSから送られたパケットのうち最初の方は受信側が受け取れないので、2秒に1回の次のキーフレームが検出できるまでエラーログが出続けます。キーフレームが検出できた後はログがピタリと出なくなります。

配信終了ボタンを押すと、Python側のログで切断が受け付けられたようなログが表示されます。「Connection state is closed」と表示されれば、WHIPによる切断は正常です。

次に映像として正常に受信できていることを確認します。動画保存処理に使用しているMediaRecorderクラスは実装が不十分で、送った動画のサイズと異なる640x480でエンコードされることがあります。
aiortc/contrib/media.py 446行目付近に下記のように追加し、送信するフレームサイズを指定します。

    def addTrack(self, track):
...
                stream = self.__container.add_stream("libx264", rate=30)
                stream.pix_fmt = "yuv420p"
                stream.width = 1280 # 追加
                stream.height = 720 # 追加
        self.__tracks[track] = MediaRecorderContext(stream)

server.pyのコマンドライン引数に「--record-to (ファイル名)」を追加して実行、OBSから配信し、30秒ほどして終了すると受信した映像・音声がmp4動画ファイルとしてサーバ側に保存されます。

python server.py --record-to test_h264.mp4

サーバ側では受信した映像・音声をそのままファイルに保存しているのではなく、デコードし、H264・AACコーデックでエンコードして保存しています。この動画が再生できたら、受信とデコードが正常であることを確認できます。

AV1に対応するパッチ

さて、OBSからH264コーデックで送ることができました。次にAV1コーデックで送りたいとします。AV1コーデックはH264から2世代も進化した最新のコーデックです。同じ画質でもビットレートは半分以下にできます。計算量はかなり増えますが、配信用パソコン、受信用パソコンともに余力がある場合は積極的に使いたいところです。OBS30.2ではAV1に対応していますが、サーバ側のaiortcはAV1もH265(HEVC)もVP9も実装されていません。そこでaiortcライブラリに下記のパッチを当てることで、AV1のデコードをすることができます。

pythonのライブラリフォルダから aiortc/codecs/__init__.py というファイルを探して、テキストエディタで開きます。venvを使っている場合は .venv/lib/python3.10/site-packages/aiortc/codecs/__init__.py というようなパスになります。

下記の4箇所にそれぞれ追加します。

# 16行目付近
from .vpx import Vp8Decoder, Vp8Encoder, vp8_depayload
from .av1 import AV1Decoder, av1_depayload # 追加

# 88行目付近
def init_codecs() -> None:
...
    add_video_codec("video/AV1") # VP8の上の行に追加
    add_video_codec("video/VP8")
    for profile_level_id in ("42001f", "42e01f"):
...

# 105行目付近
def depayload(codec: RTCRtpCodecParameters, payload: bytes) -> bytes:
    if codec.name == "VP8":
        return vp8_depayload(payload)
    elif codec.name == "H264":
        return h264_depayload(payload)
    elif codec.name == "AV1": # 追加
        return av1_depayload(payload) # 追加
    else:
        return payload

# 154行目付近
def get_decoder(codec: RTCRtpCodecParameters) -> Decoder:
...
    elif mimeType == "video/h264":
        return H264Decoder()
    elif mimeType == "video/vp8":
        return Vp8Decoder()
    elif mimeType == "video/av1": # 追加
        return AV1Decoder() # 追加
    else:
        raise ValueError(f"No decoder found for MIME type `{mimeType}`")

次に、aiortc/codecs/__init__.py と同じフォルダにav1.pyというファイルを新規作成し、下記のコードをペーストします。このコードはWebRTCで受信した分割パケットのうち、余分なところを切り落としてAV1デコーダが解釈できる形に結合しています。詳しくはこちらを参照ください。

import logging
from typing import List

import av
from av.frame import Frame

from ..jitterbuffer import JitterFrame
from ..mediastreams import VIDEO_TIME_BASE
from .base import Decoder

logger = logging.getLogger(__name__)

class AV1Decoder(Decoder):
    def __init__(self) -> None:
        self.codec = av.CodecContext.create("libaom-av1", "r")

    def decode(self, encoded_frame: JitterFrame) -> List[Frame]:
        try:
            packet = av.Packet(encoded_frame.data)
            packet.pts = encoded_frame.timestamp
            packet.time_base = VIDEO_TIME_BASE
            frames = self.codec.decode(packet)
        except av.AVError as e:
            logger.warning(
                "AV1Decoder() failed to decode, skipping package: " + str(e)
            )
            return []
        return frames


def leb128(data, pos):
    val = 0
    shift = 0
    while True:
        b = data[pos]
        pos += 1
        val = val | ((b & 0x7F) << shift)
        if not b & 0x80:
            break
        shift += 7
    return pos, val


def av1_depayload(data: bytes) -> bytes:
    w_bit = int((data[0] & 0x30) >> 4)
    pos = 1
    bitstream = bytes()
    for _ in range(w_bit - 1):
        pos, val = leb128(data, pos)
        bitstream += data[pos:pos+val]
        pos += val
    val = len(data) - pos
    bitstream += data[pos:]
    return bitstream

OBSのエンコーダを「AOM AV1」に変え、レート制御を「CQP」、キーフレームを「2 s」にして配信開始ボタンを押します。

OBSの最下部に配信中のビットレートが表示されます。動きの激しいシーンは高ビットレートに、動きが少ないシーンでは低ビットレートになっているのが確認できます。

上述のH264で配信した時と同様に、mp4ファイルに保存します。念のため繰り返しますが、受信したパケットをそのまま保存しているのではなく、デコードしたものをH264・AACコーデックでエンコードして保存していますので、このmp4ファイル自体はAV1コーデックではありません。

python server.py --record-to test_av1.mp4

OBSからAV1コーデックでエンコードした映像を配信し、サーバ側で受信・デコードすることが確認できました。

リアルタイムビデオプロセッシングに対応

映像が受信できることは手段であって目的ではありません。その映像をリアルタイムに処理して何をしたいかを決めます。例えば物体検出、顔検出、異常検出などの検出系が考えられます。オーディオであれば文字起こし、文字描画もできるでしょう。ここでは顔検出のAIを追加してみましょう。

MediaPipeライブラリをインストール、モデルファイルをダウンロードします。

pip install mediapipe
wget https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/latest/face_landmarker.task

server.pyの123行目、on_shutdown関数の次とif __name__ == "__main__":の間に下記のコードを追加します。このコードのほとんどはMediaPipe FaceLandmarkの公式サンプルのままです。OpenCVのカメラ読み込み部分をtrack.recv()関数に変えています。

import mediapipe as mp
from mediapipe.tasks.python import vision
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np

async def run_track(track):
    from aiortc.contrib.media import MediaStreamError
    # 'https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/latest/face_landmarker.task'
    base_options = mp.tasks.BaseOptions(model_asset_path='face_landmarker.task')
    options = vision.FaceLandmarkerOptions(base_options=base_options,
                                        # output_face_blendshapes=True,
                                        # output_facial_transformation_matrixes=True,
                                        running_mode=vision.RunningMode.VIDEO,
                                        num_faces=1)
    detector = vision.FaceLandmarker.create_from_options(options)
    count = 0
    while True:
        try:
            frame = await track.recv()
            image = frame.to_ndarray(format = "rgb24")
            rgb_frame: mp.Image = mp.Image(image_format = mp.ImageFormat.SRGB, data = image)
            timestamp_ms = int(frame.time * 1000)
            detection_result = detector.detect_for_video(rgb_frame, timestamp_ms)
            annotated_image = draw_landmarks_on_image(rgb_frame.numpy_view(), detection_result)
            count += 1
            if count % 100 == 0 and count <= 1000:
                cv2.imwrite(f'z{count:03d}.jpg', cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR))
        except MediaStreamError:
            break
        except:
            import traceback
            traceback.print_exc()


def draw_landmarks_on_image(rgb_image, detection_result):
  face_landmarks_list = detection_result.face_landmarks
  annotated_image = np.copy(rgb_image)

  # Loop through the detected faces to visualize.
  for idx in range(len(face_landmarks_list)):
    face_landmarks = face_landmarks_list[idx]

    # Draw the face landmarks.
    face_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
    face_landmarks_proto.landmark.extend([
      landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in face_landmarks
    ])

    solutions.drawing_utils.draw_landmarks(
        image=annotated_image,
        landmark_list=face_landmarks_proto,
        connections=mp.solutions.face_mesh.FACEMESH_TESSELATION,
        landmark_drawing_spec=None,
        connection_drawing_spec=mp.solutions.drawing_styles
        .get_default_face_mesh_tesselation_style())
    solutions.drawing_utils.draw_landmarks(
        image=annotated_image,
        landmark_list=face_landmarks_proto,
        connections=mp.solutions.face_mesh.FACEMESH_CONTOURS,
        landmark_drawing_spec=None,
        connection_drawing_spec=mp.solutions.drawing_styles
        .get_default_face_mesh_contours_style())
    solutions.drawing_utils.draw_landmarks(
        image=annotated_image,
        landmark_list=face_landmarks_proto,
        connections=mp.solutions.face_mesh.FACEMESH_IRISES,
          landmark_drawing_spec=None,
          connection_drawing_spec=mp.solutions.drawing_styles
          .get_default_face_mesh_iris_connections_style())

  return annotated_image

server.py 78行目に下記の2行を追加します。

    @pc.on("track")
    def on_track(track):
...
        elif track.kind == "video":
            # pc.addTrack(
            #     VideoTransformTrack(
            #         relay.subscribe(track), transform=params["video_transform"]
            #     )
            # )
            relay_track = relay.subscribe(track, False) # 追加
            asyncio.create_task(run_track(relay_track)) # 追加
            if args.record_to:
                recorder.addTrack(relay.subscribe(track))

server.pyを実行し、OBSで人の顔が映った映像を配信すると下記のような画像が保存されます。

まとめ

OBSのWHIP配信機能を使ってAV1コーデックでエンコードした映像を配信し、リモートのGPUサーバで受信した映像をAIビデオプロセッシングするサンプルを紹介しました。本記事ではAV1のデコードのみですが、エンコード処理も同様に可能です。またH265コーデックも同様に可能です。ぜひ挑戦してみてください。より詳しい実装方法に興味がある方はお気軽にご相談ください。