airpocketのアイコン画像
airpocket 2023年12月20日作成 (2023年12月22日更新) © MIT
セットアップや使用方法 セットアップや使用方法 閲覧数 894
airpocket 2023年12月20日作成 (2023年12月22日更新) © MIT セットアップや使用方法 セットアップや使用方法 閲覧数 894

zero2 Wで動かした超高速YOLOXをストリーミングしてリアルタイムモニタリングする

zero2 Wで動かした超高速YOLOXをストリーミングしてリアルタイムモニタリングする

はじめに

前の記事ではRaspberry Pi zero2 Wで超軽量&超高速YOLOXを実行しつつmp4ファイルとして出力しました。
しかし、リアルタイムモニタリングしないと不便なことも多いのでストリーミングできる様変更しました。

こんな感じで動きます。

環境

前の記事の環境に加え、Flaskをインストールします。

pip install flask

作業用ディレクトリは YOLOX_streamingとします。

mkdir ~/YOLOX_streaming
cd ~/YOLOX_streaming

前の記事で作ったtfliteモデルをコピーします。前に作業したフォルダが~/YOLOXであれば次のコマンドでコピーできます。

cp ~/YOLOX/yolox_ti_body_head_hand_n_1x3x128x160_bgr_uint8.tflite yolox_ti_body_head_hand_n_1x3x128x160_bgr_uint8.tflite

index.htmlを準備する。

mkdir templates
nano templates/index.html

以下のコードを書き込んで保存

<html> <head> <title>{{ title }} YOLOX DEMO</title> </head> <body> <h3>from {{ user.username }}.</h3> <h3>YOLOX-Body-Head-Hand Live Streaming.</h3> <img src="{{ url_for('video_feed') }}"> <h3>model:{{ user.modelname }}</h3> </body> </html>

実行用スクリプトを準備する

作業用フォルダにapp.pyを作成します。

cd ~/YOLOX_streaming
nano app.py

以下のコードを書き込んで保存します。

from flask import render_template, Flask, Response
import cv2
from picamera2 import Picamera2
import time
import numpy as np
from typing import List

app = Flask(__name__)

# params
WEIGHTS = "yolox_ti_body_head_hand_n_1x3x128x160_bgr_uint8.tflite"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x256x320_bgr_uint8.tflite"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x480x640_bgr_uint8.tflite"
NUM_CLASSES = 3
SCORE_THRESHOLD = 0.50
IOU_THRESHOLD = 0.4
CAP_WIDTH = 320  # width of the output video
CAP_HEIGHT = 240  # height of the output video
LAW_WIDTH = 2304  # width of the raw image inside the camera
LAW_HEIGHT = 1296  # height of the raw image inside the camera
# NOTE: folder_path, movie_length and time_list are not referenced by the
# streaming code below; they are kept so the module's names stay unchanged.
folder_path = "/tmp/img"
movie_length = 100  # number of frames to capture
time_list = []
num_threads = 4  # number of interpreter threads; choose 1-4
exposure_time = 5000  # image sensor exposure time
analog_gain = 20.0  # image sensor gain


# detection model class for yolox
class DetectionModel:
    """YOLOX detector that runs a TFLite model through tflite_runtime."""

    # constructor
    def __init__(
        self,
        *,
        weight: str,
    ):
        """Load the model file at path *weight* and precompute decode grids."""
        self.__initialize(weight=weight)

    # initialize
    def __initialize(
        self,
        *,
        weight: str,
    ):
        """Create the interpreter, cache tensor metadata and build the grids."""
        # Imported lazily so the module can be imported without tflite_runtime.
        from tflite_runtime.interpreter import Interpreter  # type: ignore

        self._interpreter = Interpreter(model_path=weight, num_threads=num_threads)
        self._input_details = self._interpreter.get_input_details()
        self._output_details = self._interpreter.get_output_details()
        self._input_shapes = [
            detail.get('shape', None) for detail in self._input_details
        ]
        self._input_names = [
            detail.get('name', None) for detail in self._input_details
        ]
        self._output_shapes = [
            detail.get('shape', None) for detail in self._output_details
        ]
        self._output_names = [
            detail.get('name', None) for detail in self._output_details
        ]
        self._model = self._interpreter.get_signature_runner()
        # Positions of height and width inside the input shape
        # (presumably an NHWC tensor -- confirm against the model).
        self._h_index = 1
        self._w_index = 2
        strides = [8, 16, 32]
        self.grids, self.expanded_strides = \
            self.__create_grids_and_expanded_strides(strides=strides)

    # create grids and expanded strides
    def __create_grids_and_expanded_strides(
        self,
        *,
        strides: List[int],
    ):
        """Build per-prediction grid offsets and strides for box decoding.

        Returns:
            ``(grids, expanded_strides)`` with shapes ``(1, N, 2)`` and
            ``(1, N, 1)``, where N is the total number of grid cells over
            all *strides*.
        """
        input_shape = self._input_shapes[0]
        in_h = input_shape[self._h_index]
        in_w = input_shape[self._w_index]

        grids = []
        expanded_strides = []
        for stride in strides:
            hsize = in_h // stride
            wsize = in_w // stride
            xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize))
            grid = np.stack((xv, yv), 2).reshape(1, -1, 2)
            grids.append(grid)
            shape = grid.shape[:2]
            expanded_strides.append(np.full((*shape, 1), stride))

        grids = np.concatenate(grids, 1)
        expanded_strides = np.concatenate(expanded_strides, 1)
        return grids, expanded_strides

    # detect objects
    def __call__(
        self,
        *,
        image: np.ndarray,
        score_threshold: float,
        iou_threshold: float,
    ):
        """Run detection on *image*.

        Returns:
            ``(class_ids, scores, boxes)`` for the detections kept by NMS;
            boxes are ``[x, y, w, h]`` in the coordinates of *image*.
        """
        self.image_shape = image.shape
        prep_image, resize_ratio_w, resize_ratio_h = self.__preprocess(image=image)

        # The signature runner takes its inputs as keyword arguments.
        batch = [np.asarray([prep_image], dtype=np.uint8)]
        datas = {
            f'{input_name}': input_data
            for input_name, input_data in zip(self._input_names, batch)
        }
        # Only the first output tensor is used.
        outputs = list(self._model(**datas).values())[0]

        boxes, scores, class_ids = \
            self.__postprocess(
                output_blob=outputs,
                resize_ratio_w=resize_ratio_w,
                resize_ratio_h=resize_ratio_h,
            )
        boxes, scores, class_ids = \
            self.__nms(
                boxes=boxes,
                scores=scores,
                class_ids=class_ids,
                score_threshold=score_threshold,
                iou_threshold=iou_threshold,
            )
        return class_ids, scores, boxes

    # preprocess
    def __preprocess(
        self,
        *,
        image: np.ndarray,
    ):
        """Resize *image* to the model input size.

        Returns:
            ``(resized_image, resize_ratio_w, resize_ratio_h)`` where the
            ratios are model size divided by source size.
        """
        input_shape = self._input_shapes[0]
        in_w = input_shape[self._w_index]
        in_h = input_shape[self._h_index]
        resize_ratio_w = in_w / self.image_shape[1]
        resize_ratio_h = in_h / self.image_shape[0]
        resized_image = cv2.resize(image, dsize=(in_w, in_h))
        return resized_image, resize_ratio_w, resize_ratio_h

    # postprocess
    def __postprocess(
        self,
        *,
        output_blob: np.ndarray,
        resize_ratio_w: float,
        resize_ratio_h: float,
    ):
        """Decode the raw model output into boxes, scores and class ids.

        NOTE: *output_blob* is modified in place.

        Returns:
            ``(boxes_xywh, scores, class_ids)`` with one entry per
            prediction; boxes are top-left x, y plus width, height in
            source-image coordinates.
        """
        # Decode grid-relative centers and log-scale sizes to model pixels.
        output_blob[..., :2] = (output_blob[..., :2] + self.grids) * self.expanded_strides
        output_blob[..., 2:4] = np.exp(output_blob[..., 2:4]) * self.expanded_strides

        predictions: np.ndarray = output_blob[0]
        boxes = predictions[:, :4]  # view: the scaling below edits predictions
        boxes_xywh = np.ones_like(boxes)

        # yolox-ti
        # Map from model-input coordinates back to source-image coordinates.
        boxes[:, 0] = boxes[:, 0] / resize_ratio_w
        boxes[:, 1] = boxes[:, 1] / resize_ratio_h
        boxes[:, 2] = boxes[:, 2] / resize_ratio_w
        boxes[:, 3] = boxes[:, 3] / resize_ratio_h

        # Convert center/size to top-left corner/size.
        boxes_xywh[:, 0] = (boxes[:, 0] - boxes[:, 2] * 0.5)
        boxes_xywh[:, 1] = (boxes[:, 1] - boxes[:, 3] * 0.5)
        boxes_xywh[:, 2] = ((boxes[:, 0] + boxes[:, 2] * 0.5) - boxes_xywh[:, 0])
        boxes_xywh[:, 3] = ((boxes[:, 1] + boxes[:, 3] * 0.5) - boxes_xywh[:, 1])

        # Column 4 scales the per-class columns 5+; keep the best class.
        scores = predictions[:, 4:5] * predictions[:, 5:]
        class_ids = scores.argmax(1)
        scores = scores[np.arange(len(class_ids)), class_ids]
        return boxes_xywh, scores, class_ids

    # non maximum suppression
    def __nms(
        self,
        *,
        boxes: np.ndarray,
        scores: np.ndarray,
        class_ids: np.ndarray,
        score_threshold: float,
        iou_threshold: float,
    ):
        """Apply per-class non-maximum suppression.

        Returns:
            ``(keep_boxes, keep_scores, keep_class_ids)``; ``keep_boxes`` is
            an integer array when anything is kept, otherwise an empty list.
        """
        indices = \
            cv2.dnn.NMSBoxesBatched(
                bboxes=boxes,
                scores=scores,
                class_ids=class_ids,
                score_threshold=score_threshold,
                nms_threshold=iou_threshold,
            )  # OpenCV 4.7.0 or later

        keep_boxes = [boxes[index] for index in indices]
        keep_scores = [scores[index] for index in indices]
        keep_class_ids = [class_ids[index] for index in indices]

        if len(keep_boxes) > 0:
            # Integer pixel coordinates for drawing.
            keep_boxes = np.vectorize(int)(keep_boxes)
        return keep_boxes, keep_scores, keep_class_ids


# get random colors
def get_colors(num: int):
    """Return *num* reproducible random colors as 3-element int lists.

    A private ``RandomState(0)`` produces the same sequence as seeding the
    global NumPy RNG with 0, without resetting the global RNG as a side
    effect.
    """
    rng = np.random.RandomState(0)
    return [
        rng.randint(0, 256, [3]).astype(np.uint8).tolist()
        for _ in range(num)
    ]


def gen_frames():
    """Yield an endless multipart MJPEG stream of annotated camera frames.

    The camera is released in a ``finally`` block, which runs when the
    generator is closed (for example after the client disconnects), so a
    later request can open the camera again.
    """
    print("gen_frames")
    count = 0

    # create detection model class for yolox
    model = DetectionModel(weight=WEIGHTS)

    # The palette is loop-invariant, so build it once per stream.
    colors = get_colors(NUM_CLASSES)

    # detect objects
    score_threshold = SCORE_THRESHOLD
    iou_threshold = IOU_THRESHOLD

    # init camera
    cap = Picamera2()
    try:
        config = cap.create_still_configuration(
            main={"size": (CAP_WIDTH, CAP_HEIGHT)},
            raw={"size": (LAW_WIDTH, LAW_HEIGHT)},
        )
        cap.configure(config)
        cap.set_controls({"ExposureTime": exposure_time, "AnalogueGain": analog_gain})
        cap.start()

        while True:
            print("count = ", count)
            start_time_frame = time.perf_counter()

            frame = cap.capture_array()
            # Swap the R and B channels of the captured array.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            start_time = time.perf_counter()
            class_ids, scores, boxes = \
                model(
                    image=frame,
                    score_threshold=score_threshold,
                    iou_threshold=iou_threshold,
                )
            elapsed_time = time.perf_counter() - start_time

            # Inference time, drawn twice: a thick white pass under a thin
            # colored pass so the text stays legible on any background.
            label = f'{elapsed_time*1000:.2f} ms'
            cv2.putText(
                frame, label, (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2, cv2.LINE_AA,
            )
            cv2.putText(
                frame, label, (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 1, cv2.LINE_AA,
            )

            # draw objects
            thickness = 2
            line_type = cv2.LINE_AA
            for box, class_id in zip(boxes, class_ids):
                cv2.rectangle(frame, box, colors[class_id], thickness, line_type)

            # compress the frame to JPEG
            ok, buffer = cv2.imencode('.jpg', frame)
            if not ok:
                # Skip a frame that failed to encode rather than send garbage.
                continue

            # convert to bytes
            jpeg_bytes = buffer.tobytes()
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + jpeg_bytes + b'\r\n')

            elapsed_time_frame = time.perf_counter() - start_time_frame
            print("frame_number = " + str(count) + " / time = " + str(elapsed_time_frame))
            count += 1
    finally:
        # Release the camera so the next stream can acquire it.
        cap.stop()
        cap.close()


@app.route('/video_feed')
def video_feed():
    """Return the streaming Response that the page's <img> tag embeds."""
    return Response(gen_frames(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route('/')
@app.route('/index')
def index():
    """Render the landing page with the device name and model file name."""
    user = {'username': 'Raspberry Pi zero2 W', 'modelname': WEIGHTS}
    return render_template('index.html', title='home', user=user)

実行する

次のコマンドを実行する

flask run --host=0.0.0.0

参照アドレスが表示されるのでブラウザで見てください。

おまけ

pip install onnxしてからこちらのコードだと、onnxモデルが動きます。
動作確認したのは前の記事でダウンロードした6個のモデル。
yolox_ti_body_head_hand_n_1x3x128x160.onnx
yolox_ti_body_head_hand_n_1x3x256x320.onnx
yolox_ti_body_head_hand_n_1x3x480x640.onnx
yolox_ti_body_head_hand_n_1x3x128x160_uint8.onnx
yolox_ti_body_head_hand_n_1x3x256x320_uint8.onnx
yolox_ti_body_head_hand_n_1x3x480x640_uint8.onnx
demoコードの元ネタはPINTO_model_zooのこちら

"""
code cited from: https://qiita.com/UnaNancyOwen/items/650d79c88a58a3cc30ce
"""
from flask import render_template, Flask, Response
from picamera2 import Picamera2
import cv2
import onnx
import time
import numpy as np
from typing import List

app = Flask(__name__)

# params
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x128x160.onnx"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x256x320.onnx"
WEIGHTS = "yolox_ti_body_head_hand_n_1x3x480x640.onnx"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x128x160_uint8.onnx"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x256x320_uint8.onnx"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x480x640_uint8.onnx"
NUM_CLASSES = 3
SCORE_THRESHOLD = 0.60
IOU_THRESHOLD = 0.4
CAP_WIDTH = 640
CAP_HEIGHT = 480
LAW_WIDTH = 2304  # width of the raw image inside the camera
LAW_HEIGHT = 1296  # height of the raw image inside the camera
# NOTE: folder_path, movie_length and time_list are not referenced by the
# streaming code below; they are kept so the module's names stay unchanged.
folder_path = "/tmp/img"
movie_length = 100  # number of frames to capture
time_list = []
# num_threads = 4  # number of threads; choose 1-4
exposure_time = 5000  # image sensor exposure time
analog_gain = 10.0  # image sensor gain


# detection model class for yolox
class DetectionModel:
    """YOLOX detector that runs an ONNX model through OpenCV's dnn module."""

    # constructor
    def __init__(
        self,
        *,
        weight: str,
    ):
        """Load the model file at path *weight* and precompute decode grids."""
        self.__initialize(weight=weight)

    # set preferable backend
    def _setPreferableBackend(self, backend):
        """Forward *backend* to the underlying cv2.dnn network."""
        self._interpreter.setPreferableBackend(backend)

    # set preferable target
    def _setPreferableTarget(self, target):
        """Forward *target* to the underlying cv2.dnn network."""
        self._interpreter.setPreferableTarget(target)

    # initialize
    def __initialize(
        self,
        *,
        weight: str,
    ):
        """Read tensor metadata with onnx, then load the net with cv2.dnn."""
        # The onnx graph is only used to read shapes and names.
        onnx_model = onnx.load(f=weight)
        self._input_shapes = [
            [dim.dim_value for dim in onnx_model.graph.input[0].type.tensor_type.shape.dim]
        ]
        self._input_names = [
            graph_input.name for graph_input in onnx_model.graph.input
        ]
        self._output_shapes = [
            [dim.dim_value for dim in onnx_model.graph.output[0].type.tensor_type.shape.dim]
        ]
        self._output_names = [
            graph_output.name for graph_output in onnx_model.graph.output
        ]
        del onnx_model  # drop the parsed graph before loading the network

        self._interpreter = cv2.dnn.readNet(weight)
        self._setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
        self._setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
        self._model = self._interpreter
        # Positions of height and width inside the input shape
        # (presumably an NCHW tensor -- confirm against the model).
        self._h_index = 2
        self._w_index = 3
        strides = [8, 16, 32]
        self.grids, self.expanded_strides = \
            self.__create_grids_and_expanded_strides(strides=strides)

    # create grids and expanded strides
    def __create_grids_and_expanded_strides(
        self,
        *,
        strides: List[int],
    ):
        """Build per-prediction grid offsets and strides for box decoding.

        Returns:
            ``(grids, expanded_strides)`` with shapes ``(1, N, 2)`` and
            ``(1, N, 1)``, where N is the total number of grid cells over
            all *strides*.
        """
        input_shape = self._input_shapes[0]
        in_h = input_shape[self._h_index]
        in_w = input_shape[self._w_index]

        grids = []
        expanded_strides = []
        for stride in strides:
            hsize = in_h // stride
            wsize = in_w // stride
            xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize))
            grid = np.stack((xv, yv), 2).reshape(1, -1, 2)
            grids.append(grid)
            shape = grid.shape[:2]
            expanded_strides.append(np.full((*shape, 1), stride))

        grids = np.concatenate(grids, 1)
        expanded_strides = np.concatenate(expanded_strides, 1)
        return grids, expanded_strides

    # detect objects
    def __call__(
        self,
        *,
        image: np.ndarray,
        score_threshold: float,
        iou_threshold: float,
    ):
        """Run detection on *image*.

        Returns:
            ``(class_ids, scores, boxes)`` for the detections kept by NMS;
            boxes are ``[x, y, w, h]`` in the coordinates of *image*.
        """
        self.image_shape = image.shape
        input_blob, resize_ratio_w, resize_ratio_h = self.__preprocess(image=image)

        self._model.setInput(input_blob)
        output_layer = self._model.getUnconnectedOutLayersNames()[0]  # "output"
        outputs = self._model.forward(output_layer)

        boxes, scores, class_ids = \
            self.__postprocess(
                output_blob=outputs,
                resize_ratio_w=resize_ratio_w,
                resize_ratio_h=resize_ratio_h,
            )
        boxes, scores, class_ids = \
            self.__nms(
                boxes=boxes,
                scores=scores,
                class_ids=class_ids,
                score_threshold=score_threshold,
                iou_threshold=iou_threshold,
            )
        return class_ids, scores, boxes

    # preprocess
    def __preprocess(
        self,
        *,
        image: np.ndarray,
    ):
        """Resize *image* to the model input size and pack it into a blob.

        Returns:
            ``(input_blob, resize_ratio_w, resize_ratio_h)`` where the
            ratios are model size divided by source size.
        """
        input_shape = self._input_shapes[0]
        in_w = input_shape[self._w_index]
        in_h = input_shape[self._h_index]
        resize_ratio_w = in_w / self.image_shape[1]
        resize_ratio_h = in_h / self.image_shape[0]
        resized_image = cv2.resize(image, dsize=(in_w, in_h))
        # No scaling, mean subtraction or channel swap is applied.
        input_blob = \
            cv2.dnn.blobFromImage(
                image=resized_image,
                scalefactor=1.0,
                size=(in_w, in_h),
                mean=(0.0, 0.0, 0.0),
                swapRB=False,
                crop=False,
            )
        return input_blob, resize_ratio_w, resize_ratio_h

    # postprocess
    def __postprocess(
        self,
        *,
        output_blob: np.ndarray,
        resize_ratio_w: float,
        resize_ratio_h: float,
    ):
        """Decode the raw model output into boxes, scores and class ids.

        NOTE: *output_blob* is modified in place.

        Returns:
            ``(boxes_xywh, scores, class_ids)`` with one entry per
            prediction; boxes are top-left x, y plus width, height in
            source-image coordinates.
        """
        # Decode grid-relative centers and log-scale sizes to model pixels.
        output_blob[..., :2] = (output_blob[..., :2] + self.grids) * self.expanded_strides
        output_blob[..., 2:4] = np.exp(output_blob[..., 2:4]) * self.expanded_strides

        predictions: np.ndarray = output_blob[0]
        boxes = predictions[:, :4]  # view: the scaling below edits predictions
        boxes_xywh = np.ones_like(boxes)

        # yolox-ti
        # Map from model-input coordinates back to source-image coordinates.
        boxes[:, 0] = boxes[:, 0] / resize_ratio_w
        boxes[:, 1] = boxes[:, 1] / resize_ratio_h
        boxes[:, 2] = boxes[:, 2] / resize_ratio_w
        boxes[:, 3] = boxes[:, 3] / resize_ratio_h

        # Convert center/size to top-left corner/size.
        boxes_xywh[:, 0] = (boxes[:, 0] - boxes[:, 2] * 0.5)
        boxes_xywh[:, 1] = (boxes[:, 1] - boxes[:, 3] * 0.5)
        boxes_xywh[:, 2] = ((boxes[:, 0] + boxes[:, 2] * 0.5) - boxes_xywh[:, 0])
        boxes_xywh[:, 3] = ((boxes[:, 1] + boxes[:, 3] * 0.5) - boxes_xywh[:, 1])

        # Column 4 scales the per-class columns 5+; keep the best class.
        scores = predictions[:, 4:5] * predictions[:, 5:]
        class_ids = scores.argmax(1)
        scores = scores[np.arange(len(class_ids)), class_ids]
        return boxes_xywh, scores, class_ids

    # non maximum suppression
    def __nms(
        self,
        *,
        boxes: np.ndarray,
        scores: np.ndarray,
        class_ids: np.ndarray,
        score_threshold: float,
        iou_threshold: float,
    ):
        """Apply per-class non-maximum suppression.

        Returns:
            ``(keep_boxes, keep_scores, keep_class_ids)``; ``keep_boxes`` is
            an integer array when anything is kept, otherwise an empty list.
        """
        indices = \
            cv2.dnn.NMSBoxesBatched(
                bboxes=boxes,
                scores=scores,
                class_ids=class_ids,
                score_threshold=score_threshold,
                nms_threshold=iou_threshold,
            )  # OpenCV 4.7.0 or later

        keep_boxes = [boxes[index] for index in indices]
        keep_scores = [scores[index] for index in indices]
        keep_class_ids = [class_ids[index] for index in indices]

        if len(keep_boxes) > 0:
            # Integer pixel coordinates for drawing.
            keep_boxes = np.vectorize(int)(keep_boxes)
        return keep_boxes, keep_scores, keep_class_ids


# get random colors
def get_colors(num: int):
    """Return *num* reproducible random colors as 3-element int lists.

    A private ``RandomState(0)`` produces the same sequence as seeding the
    global NumPy RNG with 0, without resetting the global RNG as a side
    effect.
    """
    rng = np.random.RandomState(0)
    return [
        rng.randint(0, 256, [3]).astype(np.uint8).tolist()
        for _ in range(num)
    ]


# main
def gen_frames():
    """Yield an endless multipart MJPEG stream of annotated camera frames.

    The camera is released in a ``finally`` block, which runs when the
    generator is closed (for example after the client disconnects), so a
    later request can open the camera again.
    """
    count = 0

    # create detection model class for yolox
    model = DetectionModel(weight=WEIGHTS)

    # The palette is loop-invariant, so build it once per stream.
    colors = get_colors(NUM_CLASSES)

    # detect objects
    score_threshold = SCORE_THRESHOLD
    iou_threshold = IOU_THRESHOLD

    # init camera
    cap = Picamera2()
    try:
        config = cap.create_still_configuration(
            main={"size": (CAP_WIDTH, CAP_HEIGHT)},
            raw={"size": (LAW_WIDTH, LAW_HEIGHT)},
        )
        cap.configure(config)
        cap.set_controls({"ExposureTime": exposure_time, "AnalogueGain": analog_gain})
        cap.start()

        while True:
            start_time_frame = time.perf_counter()

            # get image
            image = cap.capture_array()
            # Swap the R and B channels of the captured array.
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            # inference
            start_time = time.perf_counter()
            class_ids, scores, boxes = \
                model(
                    image=image,
                    score_threshold=score_threshold,
                    iou_threshold=iou_threshold,
                )
            elapsed_time = time.perf_counter() - start_time

            # draw elapsed time
            # Drawn twice: a thick white pass under a thin colored pass so
            # the text stays legible on any background.
            label = f'{elapsed_time*1000:.2f} ms'
            cv2.putText(
                image, label, (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2, cv2.LINE_AA,
            )
            cv2.putText(
                image, label, (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 1, cv2.LINE_AA,
            )

            # draw objects
            thickness = 2
            line_type = cv2.LINE_AA
            for box, class_id in zip(boxes, class_ids):
                cv2.rectangle(image, box, colors[class_id], thickness, line_type)

            # encode image
            ok, buffer = cv2.imencode('.jpg', image)
            if not ok:
                # Skip a frame that failed to encode rather than send garbage.
                continue

            frame = buffer.tobytes()
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')

            elapsed_time_frame = time.perf_counter() - start_time_frame
            print("frame_number = " + str(count) + " / time = " + str(elapsed_time_frame))
            count += 1
    finally:
        # Release the camera so the next stream can acquire it.
        cap.stop()
        cap.close()


@app.route('/video_feed')
def video_feed():
    """Return the streaming Response that the page's <img> tag embeds."""
    return Response(gen_frames(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route('/')
@app.route('/index')
def index():
    """Render the landing page with the device name and model file name."""
    user = {'username': 'Raspberry Pi zero2 W', 'modelname': WEIGHTS}
    return render_template('index.html', title='home', user=user)
airpocketのアイコン画像
電子工作、プログラミング、AI、DIY、XR、IoT M5Stack / Raspberry Pi / Arduino / spresense / K210 / ESP32 / Maix / micro:bit / oculus / Jetson Nano / minipupper etc
ログインしてコメントを投稿する