AnyTech Engineer Blog

AnyTech Engineer Blogは、AnyTechのエンジニアたちによる調査や成果、Tipsなどを公開するブログです。

DeepLiquid Lite開発について ~カメラサーバ実装編~

DeepLiquid Lite開発について ~カメラサーバ実装編~

関連する過去の記事

tech.anytech.co.jp

AnyTechの赤川です。

AnyTechでは、「手軽に試せる流体解析AIパッケージ」としてDeepLiquid Lite(以下、Liteと記載)というものを提供しております。DeepLiquid Liteはブラウザベースで稼働するソフトウェアとなっており、その開発メンバーとして参画しています。

今回は、Liteの推論には欠かせないカメラ機能を実装しているサーバ(以下、カメラサーバと記載)の実装を行う上で試行錯誤した内容を共有できればと思います。

技術スタック

今回解説するカメラサーバで利用されている主な技術スタックは以下になります。

  • 開発言語:Python
  • 利用ライブラリ
    • opencv-python
    • numpy
    • fastapi
    • threading

なお、Liteのバックエンドについては以下の記事で解説していますので、ご興味ある方は参照してもらえたらと思います。

tech.anytech.co.jp

そもそもLiteのカメラサーバとは?

Liteは、カメラサーバで取得したカメラのスナップショットを推論サーバにて取得して推論を実行しています。

カメラサーバが映像の取得ができなくなるとLiteの一番重要な部分である監視対象の分析ができなくなってしまうため、常に映像が取得できる状態を保証しておく必要があります。

OpenCVを用いた映像取得方法の基礎

LiteのバックエンドはPythonで実装されており、一部のカメラの種類を除いてOpenCVにより映像を取得しています。ネットワークカメラを例に挙げると、以下のように実装するとネットワークカメラから映像を取得することができます。

import cv2

NETWORK_CAMERA_URL = "..."

capture = cv2.VideoCapture(NETWORK_CAMERA_URL)

while capture.isOpened():
    ret, frame = capture.read()
    if not ret:
        break
        
    ...
    
capture.release()

OpenCVで映像を取得するためにはcv2.VideoCaptureを利用します。キャプチャが開かれている間はcv2.VideoCapture.isOpened関数の戻り値がTrueになるので、このフラグ値がTrueの間キャプチャを読み取る実装をしています。

キャプチャからフレームを取得するにはcv2.VideoCapture.read関数を利用することで取得できます。この関数の戻り値は二つあり、一つ目はフレームが読み込まれた場合はTrue、読み込めずNULLだった場合はFalseになります。そのため、一つ目の戻り値がFalseだった場合は映像取得のループを終了させるように実装します。

キャプチャの利用を終了する際はcv2.VideoCapture.release関数を呼び出してコネクションを終了させます。

以上がOpenCVでキャプチャを開いて映像をフレームとして取得する方法の基礎になります。

スレッド化によるブロッキングの解消

なぜブロッキングを解消する必要があったのか

先ほどのコードを実行すれば映像が取得できる間は撮り続けることができ、映像が取得できない状態にあると自動的にキャプチャが閉じることになります。しかし、上記コードをそのまま利用してしまうと映像取得部分の処理でブロックが発生してしまい、その他の機能にアクセスできなくなるため、スレッドを用いて映像を取得する必要がありました。

実験として、先ほどのコードがブロッキング要因となることを確認するため、以下のようにfastapiを用いたAPIを実装することにしましょう。

# no_thread_main.py
from fastapi import FastAPI
import cv2

app = FastAPI()

@app.get("/hoge")
async def hoge():
    cap = cv2.VideoCapture("...")
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

    cap.release()
    return "hoge"


@app.get("/fuga")
async def fuga():
    return "fuga"

このコードでは、/hogeへGETメソッドを実行するとキャプチャを開いて映像を取得し続けるように実装しており、/fugaへGETメソッドを実行するとfugaとテキストが返るようになっています。この時、理想的には/hoge/fugaを同時に呼び出した際にキャプチャの映像を取得しつつfugaというテキストが取得されて欲しいですが、実際には/hogeを呼び出した瞬間にキャプチャを読み込む処理の部分でブロッキングが発生し、以降APIアクセスを試みてもアクセスが実行できなくなります。なお、/hogeへのアクセスを中断した場合、しばらくすると/fugaへアクセスできるようになります。

# サーバ起動
uvicorn no_thread_main:app --reload

# GET /fuga
curl -X 'GET' 'http://localhost:8000/fuga' -H 'accept: application/json'
> "fuga"

# GET /hoge
curl -X 'GET' 'http://localhost:8000/hoge' -H 'accept: application/json'
> 待ってもレスポンスはなくタイムアウトします

# GET /fuga(/hoge呼び出し中)
curl -X 'GET' 'http://localhost:8000/fuga' -H 'accept: application/json'
> 待ってもレスポンスはなくタイムアウトします

# GET /fuga(/hoge呼び出し中断後)
curl -X 'GET' 'http://localhost:8000/fuga' -H 'accept: application/json'
> "fuga"

スレッドを用いたブロッキングの回避

キャプチャを開き映像を取得しつつ読み取り部分でのブロッキングを回避するために、スレッドを利用してキャプチャを扱うクラスを実装しました。

例えば以下のようなCameraThreadクラスを実装することで映像取得機能をスレッドとして扱うことができます。

※ terminate関数を実装している理由としては、任意のタイミングでスレッドを終了したいタイミングがあった場合を想定して記載しています。

※ temrminate関数が例えば同時に呼ばれるような状況があったと仮定すると、thread.joinが同時に呼ばれる可能性があるなどの問題が起こりうるので、例えばthreading.Lockモジュールを利用した排他制御を導入してもいいかもしれません

from threading import Thread
import cv2
import numpy as np


class CameraThread:
    def __init__(self, network_camera_url: str):
        self.thread: Thread | None = None
        self.is_running: bool = False
        self.capture: cv2.VideoCapture | None = None
        self.NETWORK_CAMERA_URL = network_camera_url
        self.frame: np.ndarray | None = None
        
    def _read_frame(self):
            self.is_running = True
            
        while self.is_running:
            ret, self.frame = self.capture.read()
            if not ret:
                break
                
            ...
                
        self.is_running = False
        
    def start(self):
        if self.is_running:
            raise Exception("Camera thread is already running")
            
        self.capture = cv2.VideoCapture(self.NETWORK_CAMERA_URL)
        self.thread = Thread(target=self._read_frame, daemon=True)
        self.thread.start()
          
    def terminate(self):
        if not self.is_running:
            raise Exception("Thread is not running")
            
        self.is_running = False
        self.thread.join()
        self.capture.release()

CameraThreadクラスはスレッドにてカメラの映像取得部分をスレッドにて実装したクラスとなります。このクラスを使うとブロッキングを発生させることなくカメラの映像を取得することが可能になります。例えば以下のように利用できます。

camera_thread = CameraThread("...")
camera_thread.start()
...

CameraThreadクラスを利用すると、例えば先ほどのAPI実装の例は以下のように書き換えることができます。

from fastapi import FastAPI
import cv2
from camera_thread import CameraThread

app = FastAPI()

@app.get("/hoge")
async def hoge():
    camera_thread = CameraThread("...")
    camera_thread.start()
    return "hoge"


@app.get("/fuga")
async def fuga():
    return "fuga"

上記のAPIを実際に実行すると、/hogeをアクセスした後に/fugaを呼び出すことが可能になります。しかし、上記コードでは実行されたスレッドの終了タイミングが明示されておらず、かつローカルスコープで定義されているスレッドのため、他の場所から該当スレッドを終了することができません。threading.Threadでスレッドを作成する際にdaemon=Trueを設定しているので、実行したPythonのコードが終了するタイミングでスレッドが終了されることを期待できる実装にはしているものの、明示的にスレッドを終了させない実装は一般的に問題のあるコードとして扱われることが多いかと思います。そのため、単純にスレッドを導入するだけではうまくいかないケースがありますので注意ください。

後ほどクラス変数を利用した実装の項目がありますが、クラス変数を利用すると個別のスコープ内で生成されたスレッドが終了されないままプログラムが終了することを防ぎやすい実装にすることができます。

スレッドを利用する上で悩んだこと

CameraThreadクラスの実装自体はとてもシンプルだと思いますが、スレッドを実装するにあたり個人的に気をつけているところがあったため、スレッドを利用しなくても何とか実装できないかとても悩みました(結論、スレッドを用いることがベストだと判断しこのような実装を採用しました)。スレッドを利用するにあたり、普段意識している項目は以下です。

  1. スレッドを導入するとスレッドの状態管理をする必要があるが、管理を適切に行わないと処理が正常に行われない
  2. スレッドを導入すると実装や処理の流れが複雑になりやすいため、新規にコードをメンテナンスするエンジニアが該当部分を見た時に理解が難しくなりやすい

1について、スレッドを導入するとターゲット関数がどのようなステータスを持って実行しているか(実行中か完了中、エラーが起こっているかなど)を管理するための仕組みが必要になるため、スレッドを用いない場合と比べて管理対象が増える傾向があると思います。そのため、もし同じことを実現しようとした時にスレッドを利用しなくても実装できる場合はその方法についても検討した方がいいと考えています。

2について、スレッドを多用してしまうと処理の流れが追いづらくなりやすいと考えています。例えば今回の例のように単一のクラスでスレッドを扱う分にはまだ極端に煩雑なコードにはなりにくいですが、スレッドを複数箇所で利用し始めると、処理の流れが追いにくくなります。

他にも気にすべき点はあるかと思いますが、スレッドを利用する際は慎重に検討すべきだと考えます。

クラス変数を利用した実装への修正

CameraThread クラスを用いるとブロッキングさせることなくカメラの映像を取得できると説明しました。しかし問題点としてCameraThreadを同じカメラURLに対して複数箇所で使いたい場合に、個別にインスタンスが必要になってしまう問題があるという問題があります。例えばネットワークカメラでは最大接続数が限られている場合もあり、常に任意の数のキャプチャを開けるとは限りません。そのため、同じカメラに対して複数のCameraThreadを立てて利用することには制限がある可能性があります(Liteのユースケースで言うと、カメラサーバにはスナップショットを撮影して取得するためのAPIの他にカメラ映像を配信するAPIも実装されており、同じカメラに対して複数APIからアクセスする必要があります)。

この問題に対処するために、クラス変数を利用して共通のスレッドとして複数箇所からアクセスできる実装に変更する必要がありました。そのため、CameraThreadを以下のように実装し直しました。

classmethodを用いて実装していますが、staticmethodでも実装可能です

※ Liteでは一つのカメラサーバでは一つのカメラしか取り扱わない仕様にしていたため以下の実装で問題ありません。一つのサーバで複数のカメラを扱う場合はクラス変数を利用する手法では複雑になってしまう可能性がありますが、ここでは説明は省略します。

from threading import Thread
import cv2
import numpy as np


NETWORK_CAMERA_URL = "..."


class CameraThreadClassVariable:
    thread: Thread | None = None
    is_running: bool = False
    capture: cv2.VideoCapture | None = None
    NERTWORK_CAMERA_URL = NETWORK_CAMERA_URL
    frame: np.ndarray | None = None
    
    @classmethod
    def _read_frame(cls):
            cls.is_running = True
            
        while self.is_running:
            ret, cls.frame = cls.capture.read()
            if not ret:
                break
                
            ...
                
        cls.is_running = False
        
    @classmethod
    def start(cls):
        if cls.is_running:
            raise Exception("Camera thread is already running")
            
        cls.capture = cv2.VideoCapture(self.NETWORK_CAMERA_URL)
        cls.thread = Thread(target=cls._read_frame, daemon=True)
        cls.thread.start()
          
    @classmethod
    def terminate(cls):
        if not cls.is_running:
            raise Exception("Thread is not running")
            
        cls.is_running = False
        cls.thread.join()
        cls.capture.release()

CameraThreadClassVariableを利用すると、例えばFastAPIで実装された二つのAPI(PNGとJPGの形式で撮影した画像を取得するAPIの実装)から同時に映像にアクセスが可能となります。

CameraThreadClassVariableを初期化するために、APIの実行時にstart_camera_threadを実行しています。time.sleep(1.0)をしているのはスレッドが開始した直後は映像がCameraThreadClassVariable.frameに格納されていない可能性があるためです。この値は1.0ではなく映像が確実に取得できる時間を最低限設定する必要があります。

from fastapi import FastAPI
from fastapi.responses import Response
from camera_thread import CameraThreadClassVariable
import time
import numpy as np
import cv2
import io

app = FastAPI()

def start_camera_thread():
    if not CameraThreadClassVariable.is_running:
        CameraThread.start()
        time.sleep(1.0)
        
        
def encode_image(img: np.ndarray, _format: str) -> io.BytesIO:
    encoded_image = cv2.imencode(_format, img)[1]
    return io.BytesIO(encoded_image.tobytes())
    

@app.get("/image/png")
async def get_image_png():
    start_camera_thread()
    frame = CameraThreadClassVariable.frame
    frame_png = encode_image(frame, "png")
    return Response(
        content=frame_png,
        media_type="image/png"
    )


@app.get("/image/jpg")
async def get_image_jpeg():
    start_camera_thread()
    frame = CameraThreadClassVariable.frame
    frame_png = encode_image(frame, "jpeg")
    return Response(
        content=frame_png,
        media_type="image/jpeg"
    )

上記実装を用いることで、例えばGET /image/pngGET /image/jpgが同時にアクセスされても映像を取得することができます(実行結果は省略しますが、実際にアクセスいただければ画像のバイナリーデータが取得できます)。

常にカメラ撮影スレッドが利用されていることを保証する方法

CameraThreadClassVariableを利用すると複数箇所から同時に呼ばれるようなユースケースは実装しました。しかし、常にスレッドが実行されている保証はなく、APIが呼ばれるタイミングでスレッドが実行されていない時だけスレッドが開始されるようになっています。

例えばキャプチャが何らかの理由によって終了してしまった後も再開させたい場合は、CameraThreadClassVariableの状態を常に監視するプロセスが必要になります。例えば以下のようなクラスを用いてCameraThreadClassVariableの稼働状況を常に監視することができます。

from threading import Thread
from camera_thread import CameraThreadClassVariable


class CameraThreadManager:
    camera_thread_manager_thread: Thread | None = None
        
    @classmethod
    def _manage_camera_thread(cls):
        while True:
        if CameraThreadClassVariable.is_running:
            continue
                        
        CameraThreadClassVariable.start()
            
    @classmethod     
    def start(cls):
        cls.camera_thread_manager_thread = Thread(
        target=cls._manage_camera_thread,
    )
    cls.camra_thread_manager_thread.start()

ただし、CameraThreadManagerが正常に動いているかどうかを判定する必要があるため、CameraThreadManagerを実装すれば必ずCameraThreadClassVariableが常にキャプチャを開いていることを保証するわけではないことに注意してください。

例えばDockerを使っている場合、CameraThreadClassVariableがキャプチャが開いているかを取得するAPIを作成し、それをヘルスチェック用のAPIとして用意しておけば、一定回数キャプチャが開いていない場合はサーバ自体を再起動させるといった対応も取れるため、必要に応じてこのような機能を実装するのもいいかと思います。

まとめ

今回はLiteのバックエンド技術のなかでカメラサーバの実装について紹介させていただきました。OpenCVを用いてカメラ映像を取得する方法を検討中の方はぜひ参考にしていただければ幸いです。