Gリポートの開発を担当している趙です。
前回は無料で使えるliveKitを構築をしてみました。
引き続き今回、このサーバーを利用して動画配信をしたいと思います。*ただし映像だけ
利用するSDK
前回ご紹介したように、LiveKitには豊富なSDKが用意されており、
さまざまな言語での実装がサポートされています。
動画配信の方法としてよく使われるのはFFmpegですが、
今後は動画をフレームごとに処理しながら配信する可能性も考慮し、今回はPythonライブラリを利用して実装します。
動画配信のサンプルとして publish_hue.py
が用意されており、それをベースに改造します。
サンプルコードの概要
ビデオの色相(Hue)を周期的に変化させることで、色が変わるビデオストリームを作成して、指定したルームに配信します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
import asyncio import colorsys import logging import os from signal import SIGINT, SIGTERM import numpy as np from livekit import api, rtc WIDTH, HEIGHT = 1280, 720 # ensure LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET are set async def main(room: rtc.Room): token = ( api.AccessToken() .with_identity("python-publisher") .with_name("Python Publisher") .with_grants( api.VideoGrants( room_join=True, room="my-room", ) ) .to_jwt() ) url = os.getenv("LIVEKIT_URL") logging.info("connecting to %s", url) try: await room.connect(url, token) logging.info("connected to room %s", room.name) except rtc.ConnectError as e: logging.error("failed to connect to the room: %s", e) return # publish a track source = rtc.VideoSource(WIDTH, HEIGHT) track = rtc.LocalVideoTrack.create_video_track("hue", source) options = rtc.TrackPublishOptions() options.source = rtc.TrackSource.SOURCE_CAMERA publication = await room.local_participant.publish_track(track, options) logging.info("published track %s", publication.sid) asyncio.ensure_future(draw_color_cycle(source)) async def draw_color_cycle(source: rtc.VideoSource): argb_frame = bytearray(WIDTH * HEIGHT * 4) arr = np.frombuffer(argb_frame, dtype=np.uint8) framerate = 1 / 30 hue = 0.0 while True: start_time = asyncio.get_event_loop().time() rgb = colorsys.hsv_to_rgb(hue, 1.0, 1.0) rgb = [(x * 255) for x in rgb] # type: ignore argb_color = np.array(rgb + [255], dtype=np.uint8) arr.flat[::4] = argb_color[0] arr.flat[1::4] = argb_color[1] arr.flat[2::4] = argb_color[2] arr.flat[3::4] = argb_color[3] frame = rtc.VideoFrame(WIDTH, HEIGHT, rtc.VideoBufferType.RGBA, argb_frame) source.capture_frame(frame) hue = (hue + framerate / 3) % 1.0 code_duration = asyncio.get_event_loop().time() - start_time await asyncio.sleep(1 / 30 - code_duration) if __name__ == "__main__": logging.basicConfig( level=logging.INFO, handlers=[logging.FileHandler("publish_hue.log"), logging.StreamHandler()], ) loop = asyncio.get_event_loop() room = rtc.Room(loop=loop) async def cleanup(): await room.disconnect() loop.stop() asyncio.ensure_future(main(room)) for signal in [SIGINT, SIGTERM]: loop.add_signal_handler(signal, lambda: asyncio.ensure_future(cleanup())) try: loop.run_forever() finally: loop.close() |
main関数とdraw_color_cycle関数に注目したいです。
main関数の役割
- トークンの生成と接続: LiveKitサーバーへの接続に必要なトークンを生成し、サーバーに接続します。
- ビデオトラックの公開: 新しいビデオソースとトラックを作成し、ルームに公開します。
- ビデオストリームの更新: 色が変わるビデオストリームを非同期で描画するタスクを開始します。
draw_color_cycle関数は以下のように機能します
- ビデオフレームの準備: ビデオフレームを格納するバッファを準備し、numpy配列を使ってアクセスします。
- 色の変化: 色相(hue)を周期的に変化させ、HSVからRGBに変換し、ビデオフレームに設定します。
- フレームのキャプチャ: 変更されたビデオフレームをビデオソースにキャプチャします。
- フレームレートの管理: フレームレートに合わせて処理を調整し、ループを継続します。
ここでやりたいことは、draw_color_cycleをまねて、色相の代わりに動画フレームをキャプチャすること
コード改造
- まずは動画のフレームをキャプチャすることから
Pythonに馴染みのある方ならよく目にするOpenCV(cv2)を使ってフレーム抽出します。
*テスト用動画はこちら
1 2 3 4 |
video_capture = cv2.VideoCapture(INPUT) while True: success, frame = video_capture.read() |
video_capture.read()が成功した場合、frameに動画のフレームデータが保存されるが、
フレームそのままでは配信できないため、rtc.VideoFrameに変換することが必要で、
まずはframeの形式を調べます。
1 2 3 4 5 6 7 8 9 10 11 12 |
num_channels = frame.shape[2] if len(frame.shape) == 3 else 1 # # Determine the frame type based on the number of channels if num_channels == 1: print("Grayscale frame") elif num_channels == 3: print("RGB frame") elif num_channels == 4: print("RGBA frame") elif num_channels == 2: print("YUV frame") else: |
結果RGB frameでした、アルファチャンネルを追加してフレーム変換
1 2 3 4 5 6 7 |
alpha_channel = np.full((frame.shape[0], frame.shape[1]), 255, dtype=np.uint8) rgba_frame = cv2.merge((frame, alpha_channel)) height, width = frame.shape[:2] # get frame size new_frame = rtc.VideoFrame(width, height, rtc.VideoBufferType.BGRA, rgba_frame.tobytes()) |
あとはできたnew_frameをキャプチャするだけ
1 |
source.capture_frame(new_frame) |
動作チェック
前回構築したLiveKitサーバーに配信してみました!
エコモットでは、WebRTCをはじめとするさまざまな技術を活用し、
IoTに関連した様々な案件に取り組んでいます。
私たちと一緒にモノづくりに挑戦していただける方を募集中です。