関連する過去の記事
AnyTechの赤川です。
AnyTechでは、「手軽に試せる流体解析AIパッケージ」としてDeepLiquid Lite(以下、Liteと記載)というものを提供しております。DeepLiquid Liteはブラウザベースで稼働するソフトウェアとなっており、その開発メンバーとして参画しています。
今回は、Liteの推論には欠かせないカメラ機能を実装しているサーバ(以下、カメラサーバと記載)の実装を行う上で試行錯誤した内容を共有できればと思います。
技術スタック
今回解説するカメラサーバで利用されている主な技術スタックは以下になります。
- 開発言語:Python
- 利用ライブラリ
- opencv-python
- numpy
- fastapi
- threading
なお、Liteのバックエンドについては以下の記事で解説していますので、ご興味ある方は参照してもらえたらと思います。
そもそもLiteのカメラサーバとは?
Liteは、カメラサーバで取得したカメラのスナップショットを推論サーバにて取得して推論を実行しています。
カメラサーバが映像の取得ができなくなるとLiteの一番重要な部分である監視対象の分析ができなくなってしまうため、常に映像が取得できる状態を保証しておく必要があります。
OpenCVを用いた映像取得方法の基礎
LiteのバックエンドはPythonで実装されており、一部のカメラの種類を除いてOpenCVにより映像を取得しています。ネットワークカメラを例に挙げると、以下のように実装するとネットワークカメラから映像を取得することができます。
import cv2 NETWORK_CAMERA_URL = "..." capture = cv2.VideoCapture(NETWORK_CAMERA_URL) while capture.isOpened(): ret, frame = capture.read() if not ret: break ... capture.release()
OpenCVで映像を取得するためにはcv2.VideoCapture
を利用します。キャプチャが開かれている間はcv2.VideoCapture.isOpened
関数の戻り値がTrueになるので、このフラグ値がTrueの間キャプチャを読み取る実装をしています。
キャプチャからフレームを取得するにはcv2.VideoCapture.read
関数を利用することで取得できます。この関数の戻り値は二つあり、一つ目はフレームが読み込まれた場合はTrue、読み込めずNULLだった場合はFalseになります。そのため、一つ目の戻り値がFalseだった場合は映像取得のループを終了させるように実装します。
キャプチャの利用を終了する際はcv2.VideoCapture.release
関数を呼び出してコネクションを終了させます。
以上がOpenCVでキャプチャを開いて映像をフレームとして取得する方法の基礎になります。
スレッド化によるブロッキングの解消
なぜブロッキングを解消する必要があったのか
先ほどのコードを実行すれば映像が取得できる間は撮り続けることができ、映像が取得できない状態にあると自動的にキャプチャが閉じることになります。しかし、上記コードをそのまま利用してしまうと映像取得部分の処理でブロックが発生してしまい、その他の機能にアクセスできなくなるため、スレッドを用いて映像を取得する必要がありました。
実験として、先ほどのコードがブロッキング要因となることを確認するため、以下のようにfastapiを用いたAPIを実装することにしましょう。
# no_thread_main.py from fastapi import FastAPI import cv2 app = FastAPI() @app.get("/hoge") async def hoge(): cap = cv2.VideoCapture("...") while cap.isOpened(): ret, frame = cap.read() if not ret: break cap.release() return "hoge" @app.get("/fuga") async def fuga(): return "fuga"
このコードでは、/hoge
へGETメソッドを実行するとキャプチャを開いて映像を取得し続けるように実装しており、/fuga
へGETメソッドを実行するとfugaとテキストが返るようになっています。この時、理想的には/hoge
と/fuga
を同時に呼び出した際にキャプチャの映像を取得しつつfugaというテキストが取得されて欲しいですが、実際には/hoge
を呼び出した瞬間にキャプチャを読み込む処理の部分でブロッキングが発生し、以降APIアクセスを試みてもアクセスが実行できなくなります。なお、/hoge
へのアクセスを中断した場合、しばらくすると/fuga
へアクセスできるようになります。
# サーバ起動 uvicorn no_thread_main:app --reload # GET /fuga curl -X 'GET' 'http://localhost:8000/fuga' -H 'accept: application/json' > "fuga" # GET /hoge curl -X 'GET' 'http://localhost:8000/hoge' -H 'accept: application/json' > 待ってもレスポンスはなくタイムアウトします # GET /fuga(/hoge呼び出し中) curl -X 'GET' 'http://localhost:8000/fuga' -H 'accept: application/json' > 待ってもレスポンスはなくタイムアウトします # GET /fuga(/hoge呼び出し中断後) curl -X 'GET' 'http://localhost:8000/fuga' -H 'accept: application/json' > "fuga"
スレッドを用いたブロッキングの回避
キャプチャを開き映像を取得しつつ読み取り部分でのブロッキングを回避するために、スレッドを利用してキャプチャを扱うクラスを実装しました。
例えば以下のようなCameraThread
クラスを実装することで映像取得機能をスレッドとして扱うことができます。
※ terminate関数を実装している理由としては、任意のタイミングでスレッドを終了したいタイミングがあった場合を想定して記載しています。
※ temrminate関数が例えば同時に呼ばれるような状況があったと仮定すると、thread.joinが同時に呼ばれる可能性があるなどの問題が起こりうるので、例えばthreading.Lock
モジュールを利用した排他制御を導入してもいいかもしれません
from threading import Thread import cv2 import numpy as np class CameraThread: def __init__(self, network_camera_url: str): self.thread: Thread | None = None self.is_running: bool = False self.capture: cv2.VideoCapture | None = None self.NETWORK_CAMERA_URL = network_camera_url self.frame: np.ndarray | None = None def _read_frame(self): self.is_running = True while self.is_running: ret, self.frame = self.capture.read() if not ret: break ... self.is_running = False def start(self): if self.is_running: raise Exception("Camera thread is already running") self.capture = cv2.VideoCapture(self.NETWORK_CAMERA_URL) self.thread = Thread(target=self._read_frame, daemon=True) self.thread.start() def terminate(self): if not self.is_running: raise Exception("Thread is not running") self.is_running = False self.thread.join() self.capture.release()
CameraThread
クラスはスレッドにてカメラの映像取得部分をスレッドにて実装したクラスとなります。このクラスを使うとブロッキングを発生させることなくカメラの映像を取得することが可能になります。例えば以下のように利用できます。
camera_thread = CameraThread("...")
camera_thread.start()
...
CameraThread
クラスを利用すると、例えば先ほどのAPI実装の例は以下のように書き換えることができます。
from fastapi import FastAPI import cv2 from camera_thread import CameraThread app = FastAPI() @app.get("/hoge") async def hoge(): camera_thread = CameraThread("...") camera_thread.start() return "hoge" @app.get("/fuga") async def fuga(): return "fuga"
上記のAPIを実際に実行すると、/hoge
をアクセスした後に/fuga
を呼び出すことが可能になります。しかし、上記コードでは実行されたスレッドの終了タイミングが明示されておらず、かつローカルスコープで定義されているスレッドのため、他の場所から該当スレッドを終了することができません。threading.Thread
でスレッドを作成する際にdaemon=True
を設定しているので、実行したPythonのコードが終了するタイミングでスレッドが終了されることを期待できる実装にはしているものの、明示的にスレッドを終了させない実装は一般的に問題のあるコードとして扱われることが多いかと思います。そのため、単純にスレッドを導入するだけではうまくいかないケースがありますので注意ください。
後ほどクラス変数を利用した実装の項目がありますが、クラス変数を利用すると個別のスコープ内で生成されたスレッドが終了されないままプログラムが終了することを防ぎやすい実装にすることができます。
スレッドを利用する上で悩んだこと
CameraThread
クラスの実装自体はとてもシンプルだと思いますが、スレッドを実装するにあたり個人的に気をつけているところがあったため、スレッドを利用しなくても何とか実装できないかとても悩みました(結論、スレッドを用いることがベストだと判断しこのような実装を採用しました)。スレッドを利用するにあたり、普段意識している項目は以下です。
- スレッドを導入するとスレッドの状態管理をする必要があるが、管理を適切に行わないと処理が正常に行われない
- スレッドを導入すると実装や処理の流れが複雑になりやすいため、新規にコードをメンテナンスするエンジニアが該当部分を見た時に理解が難しくなりやすい
1について、スレッドを導入するとターゲット関数がどのようなステータスを持って実行しているか(実行中か完了中、エラーが起こっているかなど)を管理するための仕組みが必要になるため、スレッドを用いない場合と比べて管理対象が増える傾向があると思います。そのため、もし同じことを実現しようとした時にスレッドを利用しなくても実装できる場合はその方法についても検討した方がいいと考えています。
2について、スレッドを多用してしまうと処理の流れが追いづらくなりやすいと考えています。例えば今回の例のように単一のクラスでスレッドを扱う分にはまだ極端に煩雑なコードにはなりにくいですが、スレッドを複数箇所で利用し始めると、処理の流れが追いにくくなります。
他にも気にすべき点はあるかと思いますが、スレッドを利用する際は慎重に検討すべきだと考えます。
クラス変数を利用した実装への修正
CameraThread
クラスを用いるとブロッキングさせることなくカメラの映像を取得できると説明しました。しかし問題点としてCameraThread
を同じカメラURLに対して複数箇所で使いたい場合に、個別にインスタンスが必要になってしまう問題があるという問題があります。例えばネットワークカメラでは最大接続数が限られている場合もあり、常に任意の数のキャプチャを開けるとは限りません。そのため、同じカメラに対して複数のCameraThread
を立てて利用することには制限がある可能性があります(Liteのユースケースで言うと、カメラサーバにはスナップショットを撮影して取得するためのAPIの他にカメラ映像を配信するAPIも実装されており、同じカメラに対して複数APIからアクセスする必要があります)。
この問題に対処するために、クラス変数を利用して共通のスレッドとして複数箇所からアクセスできる実装に変更する必要がありました。そのため、CameraThread
を以下のように実装し直しました。
※ classmethod
を用いて実装していますが、staticmethod
でも実装可能です
※ Liteでは一つのカメラサーバでは一つのカメラしか取り扱わない仕様にしていたため以下の実装で問題ありません。一つのサーバで複数のカメラを扱う場合はクラス変数を利用する手法では複雑になってしまう可能性がありますが、ここでは説明は省略します。
from threading import Thread import cv2 import numpy as np NETWORK_CAMERA_URL = "..." class CameraThreadClassVariable: thread: Thread | None = None is_running: bool = False capture: cv2.VideoCapture | None = None NERTWORK_CAMERA_URL = NETWORK_CAMERA_URL frame: np.ndarray | None = None @classmethod def _read_frame(cls): cls.is_running = True while self.is_running: ret, cls.frame = cls.capture.read() if not ret: break ... cls.is_running = False @classmethod def start(cls): if cls.is_running: raise Exception("Camera thread is already running") cls.capture = cv2.VideoCapture(self.NETWORK_CAMERA_URL) cls.thread = Thread(target=cls._read_frame, daemon=True) cls.thread.start() @classmethod def terminate(cls): if not cls.is_running: raise Exception("Thread is not running") cls.is_running = False cls.thread.join() cls.capture.release()
CameraThreadClassVariable
を利用すると、例えばFastAPIで実装された二つのAPI(PNGとJPGの形式で撮影した画像を取得するAPIの実装)から同時に映像にアクセスが可能となります。
※ CameraThreadClassVariable
を初期化するために、APIの実行時にstart_camera_threadを実行しています。time.sleep(1.0)をしているのはスレッドが開始した直後は映像がCameraThreadClassVariable.frame
に格納されていない可能性があるためです。この値は1.0ではなく映像が確実に取得できる時間を最低限設定する必要があります。
from fastapi import FastAPI from fastapi.responses import Response from camera_thread import CameraThreadClassVariable import time import numpy as np import cv2 import io app = FastAPI() def start_camera_thread(): if not CameraThreadClassVariable.is_running: CameraThread.start() time.sleep(1.0) def encode_image(img: np.ndarray, _format: str) -> io.BytesIO: encoded_image = cv2.imencode(_format, img)[1] return io.BytesIO(encoded_image.tobytes()) @app.get("/image/png") async def get_image_png(): start_camera_thread() frame = CameraThreadClassVariable.frame frame_png = encode_image(frame, "png") return Response( content=frame_png, media_type="image/png" ) @app.get("/image/jpg") async def get_image_jpeg(): start_camera_thread() frame = CameraThreadClassVariable.frame frame_png = encode_image(frame, "jpeg") return Response( content=frame_png, media_type="image/jpeg" )
上記実装を用いることで、例えばGET /image/png
とGET /image/jpg
が同時にアクセスされても映像を取得することができます(実行結果は省略しますが、実際にアクセスいただければ画像のバイナリーデータが取得できます)。
常にカメラ撮影スレッドが利用されていることを保証する方法
CameraThreadClassVariable
を利用すると複数箇所から同時に呼ばれるようなユースケースは実装しました。しかし、常にスレッドが実行されている保証はなく、APIが呼ばれるタイミングでスレッドが実行されていない時だけスレッドが開始されるようになっています。
例えばキャプチャが何らかの理由によって終了してしまった後も再開させたい場合は、CameraThreadClassVariable
の状態を常に監視するプロセスが必要になります。例えば以下のようなクラスを用いてCameraThreadClassVariable
の稼働状況を常に監視することができます。
from threading import Thread from camera_thread import CameraThreadClassVariable class CameraThreadManager: camera_thread_manager_thread: Thread | None = None @classmethod def _manage_camera_thread(cls): while True: if CameraThreadClassVariable.is_running: continue CameraThreadClassVariable.start() @classmethod def start(cls): cls.camera_thread_manager_thread = Thread( target=cls._manage_camera_thread, ) cls.camra_thread_manager_thread.start()
ただし、CameraThreadManager
が正常に動いているかどうかを判定する必要があるため、CameraThreadManager
を実装すれば必ずCameraThreadClassVariable
が常にキャプチャを開いていることを保証するわけではないことに注意してください。
例えばDockerを使っている場合、CameraThreadClassVariable
がキャプチャが開いているかを取得するAPIを作成し、それをヘルスチェック用のAPIとして用意しておけば、一定回数キャプチャが開いていない場合はサーバ自体を再起動させるといった対応も取れるため、必要に応じてこのような機能を実装するのもいいかと思います。
まとめ
今回はLiteのバックエンド技術のなかでカメラサーバの実装について紹介させていただきました。OpenCVを用いてカメラ映像を取得する方法を検討中の方はぜひ参考にしていただければ幸いです。