airpocketのアイコン画像
airpocket 2023年12月20日作成 (2023年12月22日更新) © MIT
セットアップや使用方法 セットアップや使用方法 閲覧数 594
airpocket 2023年12月20日作成 (2023年12月22日更新) © MIT セットアップや使用方法 セットアップや使用方法 閲覧数 594

zero2 Wで動かした超高速YOLOXをストリーミングしてリアルタイムモニタリングする

zero2 Wで動かした超高速YOLOXをストリーミングしてリアルタイムモニタリングする

はじめに

前の記事ではRaspberry Pi zero2 Wで超軽量&超高速YOLOXを実行しつつmp4ファイルとして出力しました。
しかし、リアルタイムモニタリングしないと不便なことも多いのでストリーミングできる様変更しました。

こんな感じで動きます。

環境

前の記事の環境に加え、Flaskをインストールします。

pip install flask

作業用ディレクトリは YOLOX_streamingとします。

mkdir ~/YOLOX_streaming
cd ~/YOLOX_streaming

前の記事で作ったtfliteモデルをコピーします。前に作業したフォルダが~/YOLOXであれば次のコマンドでコピーできます。

cp ~/YOLOX/yolox_ti_body_head_hand_n_1x3x128x160_bgr_uint8.tflite yolox_ti_body_head_hand_n_1x3x128x160_bgr_uint8.tflite

index.htmlを準備する。

mdkir templates
nano index.html

以下のコードを書き込んで保存

<html>
   <head>
       <title>{{ title }} YOLOX DEMO</title>
   </head>
   <body>
       <h3>from {{ user.username }}.</h3>
       <h3>YOLOX-Body-Head-Hand Live Streaming.</h3>
       <img src="{{ url_for('video_feed') }}">
       <h3>model:{{ user.modelname }}</h3>
   </body>
</html>

実行用スクリプトを準備する

作業用フォルダにapp.pyを作成します。

cd ~/YOLOX_streaming
nano app.py

以下のコードを書き込んで保存します。

from flask import render_template, Flask, Response
import cv2
from picamera2 import Picamera2
import time
import numpy as np
from typing import List

app = Flask(__name__)

# params
WEIGHTS = "yolox_ti_body_head_hand_n_1x3x128x160_bgr_uint8.tflite"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x256x320_bgr_uint8.tflite"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x480x640_bgr_uint8.tflite"

NUM_CLASSES = 3
SCORE_THRESHOLD = 0.50
IOU_THRESHOLD = 0.4
CAP_WIDTH   = 320                #出力動画の幅
CAP_HEIGHT = 240                #出力動画の高さ
LAW_WIDTH   = 2304              #カメラ内のraw画像の幅
LAW_HEIGHT = 1296              #カメラ内のraw画像の高さ

folder_path ="/tmp/img"
movie_length = 100           #撮影するフレーム数
time_list = []
num_threads = 4                 #スレッド数 1-4を選択
exposure_time = 5000       #イメージセンサの露出時間
analog_gain = 20.0               #イメージセンサのgain

# detection model class for yolox
class DetectionModel:
    # constructor
    def __init__(
        self,
        *,
        weight: str,
    ):
        self.__initialize(weight=weight)

    # initialize
    def __initialize(
        self,
        *,
        weight: str,
    ):
        from tflite_runtime.interpreter import Interpreter # type: ignore
        self._interpreter = Interpreter(model_path=weight, num_threads=num_threads)
        self._input_details = self._interpreter.get_input_details()
        self._output_details = self._interpreter.get_output_details()
        self._input_shapes = [
            input.get('shape', None) for input in self._input_details
        ]
        self._input_names = [
            input.get('name', None) for input in self._input_details
        ]
        self._output_shapes = [
            output.get('shape', None) for output in self._output_details
        ]
        self._output_names = [
            output.get('name', None) for output in self._output_details
        ]
        self._model = self._interpreter.get_signature_runner()
        self._h_index = 1
        self._w_index = 2
        strides = [8, 16, 32]
        self.grids, self.expanded_strides = \
            self.__create_grids_and_expanded_strides(strides=strides)

    # create grids and expanded strides
    def __create_grids_and_expanded_strides(
        self,
        *,
        strides: List[int],
    ):
        grids = []
        expanded_strides = []

        hsizes = [self._input_shapes[0][self._h_index] // stride for stride in strides]
        wsizes = [self._input_shapes[0][self._w_index] // stride for stride in strides]

        for hsize, wsize, stride in zip(hsizes, wsizes, strides):
            xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize))
            grid = np.stack((xv, yv), 2).reshape(1, -1, 2)
            grids.append(grid)
            shape = grid.shape[:2]
            expanded_strides.append(np.full((*shape, 1), stride))

        grids = np.concatenate(grids, 1)
        expanded_strides = np.concatenate(expanded_strides, 1)

        return grids, expanded_strides

    # detect objects
    def __call__(
        self,
        *,
        image: np.ndarray,
        score_threshold: float,
        iou_threshold: float,
    ):
        self.image_shape = image.shape
        prep_image, resize_ratio_w, resize_ratio_h = self.__preprocess(image=image)
        datas = {
            f'{input_name}': input_data \
                for input_name, input_data in zip(self._input_names, [np.asarray([prep_image], dtype=np.uint8)])
        }
        outputs = [
            output for output in \
                self._model(
                    **datas
                ).values()
        ][0]
        boxes, scores, class_ids = \
            self.__postprocess(
                output_blob=outputs,
                resize_ratio_w=resize_ratio_w,
                resize_ratio_h=resize_ratio_h,
            )
        boxes, scores, class_ids = \
            self.__nms(
                boxes=boxes,
                scores=scores,
                class_ids=class_ids,
                score_threshold=score_threshold,
                iou_threshold=iou_threshold,
            )
        return class_ids, scores, boxes

    # preprocess
    def __preprocess(
        self,
        *,
        image: np.ndarray,
    ):
        resize_ratio_w = self._input_shapes[0][self._w_index] / self.image_shape[1]
        resize_ratio_h = self._input_shapes[0][self._h_index] / self.image_shape[0]
        resized_image = \
            cv2.resize(
                image,
                dsize=(self._input_shapes[0][self._w_index], self._input_shapes[0][self._h_index])
            )
        return resized_image, resize_ratio_w, resize_ratio_h

    # postprocess
    def __postprocess(
        self,
        *,
        output_blob: np.ndarray,
        resize_ratio_w: float,
        resize_ratio_h: float,
    ):
        output_blob[..., :2] = (output_blob[..., :2] + self.grids) * self.expanded_strides
        output_blob[..., 2:4] = np.exp(output_blob[..., 2:4]) * self.expanded_strides

        predictions: np.ndarray = output_blob[0]
        boxes = predictions[:, :4]
        boxes_xywh = np.ones_like(boxes)

        # yolox-ti
        boxes[:, 0] = boxes[:, 0] / resize_ratio_w
        boxes[:, 1] = boxes[:, 1] / resize_ratio_h
        boxes[:, 2] = boxes[:, 2] / resize_ratio_w
        boxes[:, 3] = boxes[:, 3] / resize_ratio_h
        boxes_xywh[:, 0] = (boxes[:, 0] - boxes[:, 2] * 0.5)
        boxes_xywh[:, 1] = (boxes[:, 1] - boxes[:, 3] * 0.5)
        boxes_xywh[:, 2] = ((boxes[:, 0] + boxes[:, 2] * 0.5) - boxes_xywh[:, 0])
        boxes_xywh[:, 3] = ((boxes[:, 1] + boxes[:, 3] * 0.5) - boxes_xywh[:, 1])

        scores = predictions[:, 4:5] * predictions[:, 5:]
        class_ids = scores.argmax(1)
        scores = scores[np.arange(len(class_ids)), class_ids]

        return boxes_xywh, scores, class_ids

    # non maximum suppression
    def __nms(
        self,
        *,
        boxes: np.ndarray,
        scores: np.ndarray,
        class_ids: np.ndarray,
        score_threshold: float,
        iou_threshold: float,
    ):
        indices = \
            cv2.dnn.NMSBoxesBatched(
                bboxes=boxes,
                scores=scores,
                class_ids=class_ids,
                score_threshold=score_threshold,
                nms_threshold=iou_threshold,
            ) # OpenCV 4.7.0 or later

        keep_boxes = []
        keep_scores = []
        keep_class_ids = []
        for index in indices:
            keep_boxes.append(boxes[index])
            keep_scores.append(scores[index])
            keep_class_ids.append(class_ids[index])

        if len(keep_boxes) > 0:
            keep_boxes = np.vectorize(int)(keep_boxes)

        return keep_boxes, keep_scores, keep_class_ids

# get raudom colors
def get_colors(num: int):
    colors = []
    np.random.seed(0)
    for _ in range(num):
        color = np.random.randint(0, 256, [3]).astype(np.uint8)
        colors.append(color.tolist())
    return colors

def gen_frames():
    print("gen_frames")
    count = 0

    # create detection model class for yolox
    model = DetectionModel(weight=WEIGHTS)

    # init camera
    cap = Picamera2()
    config = cap.create_still_configuration(main={"size":(CAP_WIDTH, CAP_HEIGHT)},raw={"size":(LAW_WIDTH,LAW_HEIGHT)}) 
    cap.configure(config)
    cap.set_controls({"ExposureTime":exposure_time, "AnalogueGain": analog_gain})
    cap.start()

    # detect objects
    score_threshold = SCORE_THRESHOLD
    iou_threshold = IOU_THRESHOLD

    while True:
        print("count = ",count)
        start_time_frame = time.perf_counter()

        frame = cap.capture_array()
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        start_time = time.perf_counter()
        class_ids, scores, boxes = \
            model(
                image=frame,
                score_threshold=score_threshold,
                iou_threshold=iou_threshold,
            )
        elapsed_time = time.perf_counter() - start_time

        cv2.putText(
            frame,
            f'{elapsed_time*1000:.2f} ms',
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (255, 255, 255),
            2,
            cv2.LINE_AA,
        )
        
        cv2.putText(
            frame,
            f'{elapsed_time*1000:.2f} ms',
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (0, 0, 255),
            1,
            cv2.LINE_AA,
        )
        
        # draw objects
        num_classes = NUM_CLASSES
        colors = get_colors(num_classes)
        for box, score, class_id in zip(boxes, scores, class_ids):
            color = colors[class_id]
            thickness = 2
            line_type = cv2.LINE_AA
            cv2.rectangle(frame, box, color, thickness, line_type)

        #フレームデータをjpgに圧縮
        ret, buffer = cv2.imencode('.jpg',frame)
        # bytesデータ化
        frame = buffer.tobytes()
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')

        elapsed_time_frame = time.perf_counter() - start_time_frame
        print("frame_number = " + str(count) + " / time = " + str(elapsed_time_frame))
        count +=1

@app.route('/video_feed')
def video_feed():
    #imgタグに埋め込まれるResponseオブジェクトを返す
    return Response(gen_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/')
@app.route('/index')
def index():
   
    user = {'username' : 'Raspberry Pi zero2 W',
            'modelname': WEIGHTS}
    return render_template('index.html', title='home', user=user)

実行する

次のコマンドを実行する

flask run --host=0.0.0.0

参照アドレスが表示されるのでブラウザで見てください。

おまけ

pip install onnxしてからこちらのコードだと、onnxモデルが動きます。
動作確認したのは前の記事でダウンロードした6個のモデル。
yolox_ti_body_head_hand_n_1x3x128x160.onnx
yolox_ti_body_head_hand_n_1x3x256x320.onnx
yolox_ti_body_head_hand_n_1x3x480x640.onnx
yolox_ti_body_head_hand_n_1x3x128x160_uint8.onnx
yolox_ti_body_head_hand_n_1x3x256x320_uint8.onnx
yolox_ti_body_head_hand_n_1x3x480x640_uint8.onnx
demoコードの元ネタはPINTO_model_zooのこちら

"""
code cited from: https://qiita.com/UnaNancyOwen/items/650d79c88a58a3cc30ce
"""

from flask import render_template, Flask, Response
from picamera2 import Picamera2
import cv2
import onnx
import time
import numpy as np
from typing import List

app = Flask(__name__)

# params

# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x128x160.onnx"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x256x320.onnx"
WEIGHTS = "yolox_ti_body_head_hand_n_1x3x480x640.onnx"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x128x160_uint8.onnx"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x256x320_uint8.onnx"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x480x640_uint8.onnx"
NUM_CLASSES = 3
SCORE_THRESHOLD = 0.60
IOU_THRESHOLD = 0.4
CAP_WIDTH = 640
CAP_HEIGHT = 480
LAW_WIDTH   = 2304              #カメラ内のraw画像の幅
LAW_HEIGHT = 1296              #カメラ内のraw画像の高さ

folder_path ="/tmp/img"
movie_length = 100           #撮影するフレーム数
time_list = []
#num_threads = 4                 #スレッド数 1-4を選択
exposure_time = 5000       #イメージセンサの露出時間
analog_gain = 10.0               #イメージセンサのgain


# detection model class for yolox
class DetectionModel:
    # constructor
    def __init__(
        self,
        *,
        weight: str,
    ):
        self.__initialize(weight=weight)

    # set preferable backend
    def _setPreferableBackend(self, backend):
        self._interpreter.setPreferableBackend(backend)

    # set preferable target
    def _setPreferableTarget(self, target):
        self._interpreter.setPreferableTarget(target)

    # initialize
    def __initialize(
        self,
        *,
        weight: str,
    ):
        onnx_model = onnx.load(f=weight)
        self._input_shapes = [
            [dim.dim_value for dim in onnx_model.graph.input[0].type.tensor_type.shape.dim]
        ]
        self._input_names = [
            input.name for input in onnx_model.graph.input
        ]
        self._output_shapes = [
            [dim.dim_value for dim in onnx_model.graph.output[0].type.tensor_type.shape.dim]
        ]
        self._output_names = [
            output.name for output in onnx_model.graph.output
        ]
        del onnx_model

        self._interpreter = cv2.dnn.readNet(weight)
        self._setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
        self._setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
        self._model = self._interpreter
        self._h_index = 2
        self._w_index = 3
        strides = [8, 16, 32]
        self.grids, self.expanded_strides = \
            self.__create_grids_and_expanded_strides(strides=strides)

    # create grids and expanded strides
    def __create_grids_and_expanded_strides(
        self,
        *,
        strides: List[int],
    ):
        grids = []
        expanded_strides = []

        hsizes = [self._input_shapes[0][self._h_index] // stride for stride in strides]
        wsizes = [self._input_shapes[0][self._w_index] // stride for stride in strides]

        for hsize, wsize, stride in zip(hsizes, wsizes, strides):
            xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize))
            grid = np.stack((xv, yv), 2).reshape(1, -1, 2)
            grids.append(grid)
            shape = grid.shape[:2]
            expanded_strides.append(np.full((*shape, 1), stride))

        grids = np.concatenate(grids, 1)
        expanded_strides = np.concatenate(expanded_strides, 1)

        return grids, expanded_strides

    # detect objects
    def __call__(
        self,
        *,
        image: np.ndarray,
        score_threshold: float,
        iou_threshold: float,
    ):
        self.image_shape = image.shape
        input_blob, resize_ratio_w, resize_ratio_h = self.__preprocess(image=image)
        self._model.setInput(input_blob)
        output_layer = self._model.getUnconnectedOutLayersNames()[0] # "output"
        outputs = self._model.forward(output_layer)

        boxes, scores, class_ids = \
            self.__postprocess(
                output_blob=outputs,
                resize_ratio_w=resize_ratio_w,
                resize_ratio_h=resize_ratio_h,
            )
        boxes, scores, class_ids = \
            self.__nms(
                boxes=boxes,
                scores=scores,
                class_ids=class_ids,
                score_threshold=score_threshold,
                iou_threshold=iou_threshold,
            )
        return class_ids, scores, boxes

    # preprocess
    def __preprocess(
        self,
        *,
        image: np.ndarray,
    ):
        resize_ratio_w = self._input_shapes[0][self._w_index] / self.image_shape[1]
        resize_ratio_h = self._input_shapes[0][self._h_index] / self.image_shape[0]
        resized_image = \
            cv2.resize(
                image,
                dsize=(self._input_shapes[0][self._w_index], self._input_shapes[0][self._h_index])
            )
        input_blob = \
            cv2.dnn.blobFromImage(
                image=resized_image,
                scalefactor=1.0,
                size=(self._input_shapes[0][self._w_index], self._input_shapes[0][self._h_index]),
                mean=(0.0, 0.0, 0.0),
                swapRB=False,
                crop=False,
            )
        return input_blob, resize_ratio_w, resize_ratio_h

    # postprocess
    def __postprocess(
        self,
        *,
        output_blob: np.ndarray,
        resize_ratio_w: float,
        resize_ratio_h: float,
    ):
        output_blob[..., :2] = (output_blob[..., :2] + self.grids) * self.expanded_strides
        output_blob[..., 2:4] = np.exp(output_blob[..., 2:4]) * self.expanded_strides

        predictions: np.ndarray = output_blob[0]
        boxes = predictions[:, :4]
        boxes_xywh = np.ones_like(boxes)

        # yolox-ti
        boxes[:, 0] = boxes[:, 0] / resize_ratio_w
        boxes[:, 1] = boxes[:, 1] / resize_ratio_h
        boxes[:, 2] = boxes[:, 2] / resize_ratio_w
        boxes[:, 3] = boxes[:, 3] / resize_ratio_h
        boxes_xywh[:, 0] = (boxes[:, 0] - boxes[:, 2] * 0.5)
        boxes_xywh[:, 1] = (boxes[:, 1] - boxes[:, 3] * 0.5)
        boxes_xywh[:, 2] = ((boxes[:, 0] + boxes[:, 2] * 0.5) - boxes_xywh[:, 0])
        boxes_xywh[:, 3] = ((boxes[:, 1] + boxes[:, 3] * 0.5) - boxes_xywh[:, 1])

        scores = predictions[:, 4:5] * predictions[:, 5:]
        class_ids = scores.argmax(1)
        scores = scores[np.arange(len(class_ids)), class_ids]

        return boxes_xywh, scores, class_ids

    # non maximum suppression
    def __nms(
        self,
        *,
        boxes: np.ndarray,
        scores: np.ndarray,
        class_ids: np.ndarray,
        score_threshold: float,
        iou_threshold: float,
    ):
        indices = \
            cv2.dnn.NMSBoxesBatched(
                bboxes=boxes,
                scores=scores,
                class_ids=class_ids,
                score_threshold=score_threshold,
                nms_threshold=iou_threshold,
            ) # OpenCV 4.7.0 or later

        keep_boxes = []
        keep_scores = []
        keep_class_ids = []
        for index in indices:
            keep_boxes.append(boxes[index])
            keep_scores.append(scores[index])
            keep_class_ids.append(class_ids[index])

        if len(keep_boxes) > 0:
            keep_boxes = np.vectorize(int)(keep_boxes)

        return keep_boxes, keep_scores, keep_class_ids

# get raudom colors
def get_colors(num: int):
    colors = []
    np.random.seed(0)
    for _ in range(num):
        color = np.random.randint(0, 256, [3]).astype(np.uint8)
        colors.append(color.tolist())
    return colors

# main
def gen_frames():
    count = 0

    # create detection model class for yolox
    model = DetectionModel(weight=WEIGHTS)

    # init camera
    cap = Picamera2()
    config = cap.create_still_configuration(main={"size":(CAP_WIDTH, CAP_HEIGHT)},raw={"size":(LAW_WIDTH,LAW_HEIGHT)}) 
    cap.configure(config)
    cap.set_controls({"ExposureTime":exposure_time, "AnalogueGain": analog_gain})
    cap.start()

    # detect objects
    score_threshold = SCORE_THRESHOLD
    iou_threshold = IOU_THRESHOLD

    while True:
        start_time_frame = time.perf_counter() 

        # get image
        image = cap.capture_array()
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # inference
        start_time = time.perf_counter()
        class_ids, scores, boxes = \
            model(
                image=image,
                score_threshold=score_threshold,
                iou_threshold=iou_threshold,
            )
        elapsed_time = time.perf_counter() - start_time

        # draw elapsed time
        cv2.putText(
            image,
            f'{elapsed_time*1000:.2f} ms',
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (255, 255, 255),
            2,
            cv2.LINE_AA,
        )
        cv2.putText(
            image,
            f'{elapsed_time*1000:.2f} ms',
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (0, 0, 255),
            1,
            cv2.LINE_AA,
        )

        # draw objects
        num_classes = NUM_CLASSES
        colors = get_colors(num_classes)
        for box, score, class_id in zip(boxes, scores, class_ids):
            color = colors[class_id]
            thickness = 2
            line_type = cv2.LINE_AA
            cv2.rectangle(image, box, color, thickness, line_type)

        # encode image
        ret, buffer = cv2.imencode('.jpg',image)
        frame = buffer.tobytes()
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')

        elapsed_time_frame = time.perf_counter() - start_time_frame
        print("frame_number = " + str(count) + " / time = " + str(elapsed_time_frame))
        count +=1

@app.route('/video_feed')
def video_feed():
    #imgタグに埋め込まれるResponseオブジェクトを返す
    return Response(gen_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/')
@app.route('/index')
def index():
   
    user = {'username' : 'Raspberry Pi zero2 W',
            'modelname': WEIGHTS}
    return render_template('index.html', title='home', user=user)
airpocketのアイコン画像
電子工作、プログラミング、AI、DIY、XR、IoT M5Stack / Raspberry Pi / Arduino / spresense / K210 / ESP32 / Maix / maicro:bit / oculus / Jetson Nano / minipupper etc
ログインしてコメントを投稿する