はじめに
前の記事ではRaspberry Pi zero2 Wで超軽量&超高速YOLOXを実行しつつmp4ファイルとして出力しました。
しかし、リアルタイムでモニタリングできないと不便なことも多いので、ストリーミングできるように変更しました。
こんな感じで動きます。
環境
前の記事の環境に加え、Flaskをインストールします。
pip install flask
作業用ディレクトリは YOLOX_streamingとします。
mkdir ~/YOLOX_streaming
cd ~/YOLOX_streaming
前の記事で作ったtfliteモデルをコピーします。前に作業したフォルダが~/YOLOXであれば次のコマンドでコピーできます。
cp ~/YOLOX/yolox_ti_body_head_hand_n_1x3x128x160_bgr_uint8.tflite yolox_ti_body_head_hand_n_1x3x128x160_bgr_uint8.tflite
index.htmlを準備する。
mkdir templates
cd templates
nano index.html
以下のコードを書き込んで保存
<!-- Landing page of the YOLOX streaming demo; rendered with the template variables `title` and `user`. -->
<html>
<head>
<title>{{ title }} YOLOX DEMO</title>
</head>
<body>
<h3>from {{ user.username }}.</h3>
<h3>YOLOX-Body-Head-Hand Live Streaming.</h3>
<!-- The image source is the URL of the `video_feed` endpoint. -->
<img src="{{ url_for('video_feed') }}">
<h3>model:{{ user.modelname }}</h3>
</body>
</html>
実行用スクリプトを準備する
作業用フォルダにapp.pyを作成します。
cd ~/YOLOX_streaming
nano app.py
以下のコードを書き込んで保存します。
from flask import render_template, Flask, Response
import cv2
from picamera2 import Picamera2
import time
import numpy as np
from typing import List
app = Flask(__name__)
# params
# Model file handed to DetectionModel; the commented-out lines are alternative model files.
WEIGHTS = "yolox_ti_body_head_hand_n_1x3x128x160_bgr_uint8.tflite"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x256x320_bgr_uint8.tflite"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x480x640_bgr_uint8.tflite"
# Number of colours generated for drawing (one per class id).
NUM_CLASSES = 3
# Thresholds forwarded to non-maximum suppression.
SCORE_THRESHOLD = 0.50
IOU_THRESHOLD = 0.4
CAP_WIDTH = 320 # width of the output video
CAP_HEIGHT = 240 # height of the output video
# NOTE(review): "LAW" looks like a misspelling of "RAW"; the names are kept because gen_frames reads them.
LAW_WIDTH = 2304 # width of the raw image inside the camera
LAW_HEIGHT = 1296 # height of the raw image inside the camera
# NOTE(review): folder_path, movie_length and time_list are not read by any code visible in this script.
folder_path ="/tmp/img"
movie_length = 100 # number of frames to capture
time_list = []
num_threads = 4 # number of threads; choose 1-4
exposure_time = 5000 # exposure time of the image sensor (passed as the ExposureTime control)
analog_gain = 20.0 # gain of the image sensor (passed as the AnalogueGain control)
# detection model class for yolox
class DetectionModel:
    """YOLOX object detector backed by a TensorFlow Lite interpreter.

    Calling an instance with an image runs preprocessing (resize to the
    model input size), inference, box decoding and non-maximum suppression.
    """

    # constructor
    def __init__(
        self,
        *,
        weight: str,
    ):
        """Load the model file ``weight`` and precompute the decoding grids."""
        self.__initialize(weight=weight)

    # initialize
    def __initialize(
        self,
        *,
        weight: str,
    ):
        """Create the interpreter, cache tensor metadata and build the grids."""
        # Imported here rather than at module level, so tflite_runtime is
        # only required once a model is actually constructed.
        from tflite_runtime.interpreter import Interpreter  # type: ignore

        # ``num_threads`` is the module-level setting defined with the other params.
        self._interpreter = Interpreter(model_path=weight, num_threads=num_threads)
        self._input_details = self._interpreter.get_input_details()
        self._output_details = self._interpreter.get_output_details()
        self._input_shapes = [
            detail.get('shape', None) for detail in self._input_details
        ]
        self._input_names = [
            detail.get('name', None) for detail in self._input_details
        ]
        self._output_shapes = [
            detail.get('shape', None) for detail in self._output_details
        ]
        self._output_names = [
            detail.get('name', None) for detail in self._output_details
        ]
        self._model = self._interpreter.get_signature_runner()
        # Axes of the input shape that hold the height and the width.
        self._h_index = 1
        self._w_index = 2
        strides = [8, 16, 32]
        self.grids, self.expanded_strides = \
            self.__create_grids_and_expanded_strides(strides=strides)

    # create grids and expanded strides
    def __create_grids_and_expanded_strides(
        self,
        *,
        strides: List[int],
    ):
        """Build the cell offsets and per-cell strides used to decode boxes.

        For every stride the input is divided into ``height // stride`` by
        ``width // stride`` cells. Returns ``(grids, expanded_strides)`` of
        shapes ``(1, N, 2)`` and ``(1, N, 1)``, N being the total cell count;
        each grid entry is ``(x, y)`` with x varying fastest.
        """
        input_shape = self._input_shapes[0]
        height = input_shape[self._h_index]
        width = input_shape[self._w_index]
        grids = []
        expanded_strides = []
        for stride in strides:
            hsize = height // stride
            wsize = width // stride
            xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize))
            grid = np.stack((xv, yv), 2).reshape(1, -1, 2)
            grids.append(grid)
            expanded_strides.append(np.full((*grid.shape[:2], 1), stride))
        return np.concatenate(grids, 1), np.concatenate(expanded_strides, 1)

    # detect objects
    def __call__(
        self,
        *,
        image: np.ndarray,
        score_threshold: float,
        iou_threshold: float,
    ):
        """Detect objects in ``image``.

        Returns ``(class_ids, scores, boxes)`` for the detections that
        survive non-maximum suppression; each box is
        ``(left, top, width, height)`` in pixels of ``image``.
        """
        # Kept as an attribute because __preprocess reads it.
        self.image_shape = image.shape
        prep_image, resize_ratio_w, resize_ratio_h = self.__preprocess(image=image)
        batch = np.asarray([prep_image], dtype=np.uint8)
        # zip stops at the shorter sequence, so only the first input name is fed.
        datas = {
            f'{input_name}': input_data
            for input_name, input_data in zip(self._input_names, [batch])
        }
        # Only the first output of the signature runner is used.
        outputs = list(self._model(**datas).values())[0]
        boxes, scores, class_ids = \
            self.__postprocess(
                output_blob=outputs,
                resize_ratio_w=resize_ratio_w,
                resize_ratio_h=resize_ratio_h,
            )
        boxes, scores, class_ids = \
            self.__nms(
                boxes=boxes,
                scores=scores,
                class_ids=class_ids,
                score_threshold=score_threshold,
                iou_threshold=iou_threshold,
            )
        return class_ids, scores, boxes

    # preprocess
    def __preprocess(
        self,
        *,
        image: np.ndarray,
    ):
        """Resize ``image`` to the model input size.

        Returns ``(resized_image, resize_ratio_w, resize_ratio_h)`` where the
        ratios are model-input size divided by the size in ``self.image_shape``.
        """
        input_shape = self._input_shapes[0]
        model_w = input_shape[self._w_index]
        model_h = input_shape[self._h_index]
        resize_ratio_w = model_w / self.image_shape[1]
        resize_ratio_h = model_h / self.image_shape[0]
        resized_image = cv2.resize(image, dsize=(model_w, model_h))
        return resized_image, resize_ratio_w, resize_ratio_h

    # postprocess
    def __postprocess(
        self,
        *,
        output_blob: np.ndarray,
        resize_ratio_w: float,
        resize_ratio_h: float,
    ):
        """Decode raw predictions into boxes, scores and class ids.

        ``output_blob`` is left unmodified. Only its first batch entry is
        decoded. Returns ``(boxes_xywh, scores, class_ids)`` where each box
        is ``(left, top, width, height)`` scaled back by the resize ratios.
        """
        # Decode on a private copy so the caller's array is not overwritten.
        decoded = output_blob.copy()
        decoded[..., :2] = (decoded[..., :2] + self.grids) * self.expanded_strides
        decoded[..., 2:4] = np.exp(decoded[..., 2:4]) * self.expanded_strides
        predictions: np.ndarray = decoded[0]
        boxes = predictions[:, :4]
        boxes_xywh = np.ones_like(boxes)
        # yolox-ti
        # Columns 0/2 are centre-x/width, columns 1/3 are centre-y/height.
        boxes[:, 0::2] = boxes[:, 0::2] / resize_ratio_w
        boxes[:, 1::2] = boxes[:, 1::2] / resize_ratio_h
        boxes_xywh[:, 0] = (boxes[:, 0] - boxes[:, 2] * 0.5)
        boxes_xywh[:, 1] = (boxes[:, 1] - boxes[:, 3] * 0.5)
        boxes_xywh[:, 2] = ((boxes[:, 0] + boxes[:, 2] * 0.5) - boxes_xywh[:, 0])
        boxes_xywh[:, 3] = ((boxes[:, 1] + boxes[:, 3] * 0.5) - boxes_xywh[:, 1])
        # Column 4 multiplied by each of the remaining columns gives the class scores.
        scores = predictions[:, 4:5] * predictions[:, 5:]
        class_ids = scores.argmax(1)
        scores = scores[np.arange(len(class_ids)), class_ids]
        return boxes_xywh, scores, class_ids

    # non maximum suppression
    def __nms(
        self,
        *,
        boxes: np.ndarray,
        scores: np.ndarray,
        class_ids: np.ndarray,
        score_threshold: float,
        iou_threshold: float,
    ):
        """Run batched non-maximum suppression and return the kept detections.

        Returns ``(keep_boxes, keep_scores, keep_class_ids)``. ``keep_boxes``
        is an integer array when at least one box is kept, otherwise an
        empty list; the other two are lists.
        """
        indices = \
            cv2.dnn.NMSBoxesBatched(
                bboxes=boxes,
                scores=scores,
                class_ids=class_ids,
                score_threshold=score_threshold,
                nms_threshold=iou_threshold,
            )  # OpenCV 4.7.0 or later
        keep_boxes = [boxes[index] for index in indices]
        keep_scores = [scores[index] for index in indices]
        keep_class_ids = [class_ids[index] for index in indices]
        if len(keep_boxes) > 0:
            # Convert the coordinates to integers for drawing.
            keep_boxes = np.vectorize(int)(keep_boxes)
        return keep_boxes, keep_scores, keep_class_ids
# get random colors
def get_colors(num: int):
    """Return ``num`` reproducible pseudo-random colours.

    Each colour is a list of three ints in the range 0-255. The sequence is
    always the one produced by seed 0, so repeated calls return the same
    colours. A private generator is used so that the process-wide NumPy
    random state is not reseeded as a side effect.
    """
    rng = np.random.RandomState(0)
    return [
        rng.randint(0, 256, [3]).astype(np.uint8).tolist()
        for _ in range(num)
    ]
def gen_frames():
    """Yield an endless multipart stream of JPEG frames with detections drawn.

    Each iteration captures a frame from the camera, runs the detector,
    draws the inference time and the detected boxes, JPEG-encodes the frame
    and yields it as one ``--frame`` part. The camera is stopped and closed
    when the generator is closed or fails, so that it is released for the
    next consumer.
    """
    print("gen_frames")
    count = 0
    # create detection model class for yolox
    model = DetectionModel(weight=WEIGHTS)
    # The palette does not change between frames, so build it once.
    colors = get_colors(NUM_CLASSES)
    # init camera
    cap = Picamera2()
    try:
        config = cap.create_still_configuration(
            main={"size": (CAP_WIDTH, CAP_HEIGHT)},
            raw={"size": (LAW_WIDTH, LAW_HEIGHT)},
        )
        cap.configure(config)
        cap.set_controls({"ExposureTime": exposure_time, "AnalogueGain": analog_gain})
        cap.start()
        try:
            while True:
                print("count = ", count)
                start_time_frame = time.perf_counter()
                frame = cap.capture_array()
                # Swap the first and third colour channels of the captured frame.
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # detect objects
                start_time = time.perf_counter()
                class_ids, scores, boxes = \
                    model(
                        image=frame,
                        score_threshold=SCORE_THRESHOLD,
                        iou_threshold=IOU_THRESHOLD,
                    )
                elapsed_time = time.perf_counter() - start_time
                # Draw the inference time twice: a thick white pass, then a
                # thin (0, 0, 255) pass on top, which outlines the text.
                label = f'{elapsed_time*1000:.2f} ms'
                for text_color, text_thickness in (((255, 255, 255), 2), ((0, 0, 255), 1)):
                    cv2.putText(
                        frame,
                        label,
                        (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.7,
                        text_color,
                        text_thickness,
                        cv2.LINE_AA,
                    )
                # draw objects
                for box, class_id in zip(boxes, class_ids):
                    cv2.rectangle(frame, box, colors[class_id], 2, cv2.LINE_AA)
                # compress the frame to JPEG
                ret, buffer = cv2.imencode('.jpg', frame)
                if not ret:
                    # Encoding failed; skip this frame instead of sending a broken part.
                    continue
                # convert to bytes
                jpeg = buffer.tobytes()
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + jpeg + b'\r\n')
                # Measured after the yield, so it includes the time the
                # consumer took before requesting the next frame.
                elapsed_time_frame = time.perf_counter() - start_time_frame
                print("frame_number = " + str(count) + " / time = " + str(elapsed_time_frame))
                count += 1
        finally:
            cap.stop()
    finally:
        cap.close()
@app.route('/video_feed')
def video_feed():
    """Return the streaming Response that the page's img tag embeds."""
    stream = gen_frames()
    return Response(stream, mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/')
@app.route('/index')
def index():
    """Render index.html with the device name and the model file name."""
    page_info = dict(username='Raspberry Pi zero2 W', modelname=WEIGHTS)
    return render_template('index.html', title='home', user=page_info)
実行する
次のコマンドを実行する
flask run --host=0.0.0.0
参照アドレスが表示されるのでブラウザで見てください。
おまけ
pip install onnx を実行したうえで以下のコードを使うと、ONNXモデルで動かせます。
動作確認したのは前の記事でダウンロードした6個のモデル。
yolox_ti_body_head_hand_n_1x3x128x160.onnx
yolox_ti_body_head_hand_n_1x3x256x320.onnx
yolox_ti_body_head_hand_n_1x3x480x640.onnx
yolox_ti_body_head_hand_n_1x3x128x160_uint8.onnx
yolox_ti_body_head_hand_n_1x3x256x320_uint8.onnx
yolox_ti_body_head_hand_n_1x3x480x640_uint8.onnx
demoコードの元ネタはPINTO_model_zooのこちら
"""
code cited from: https://qiita.com/UnaNancyOwen/items/650d79c88a58a3cc30ce
"""
from flask import render_template, Flask, Response
from picamera2 import Picamera2
import cv2
import onnx
import time
import numpy as np
from typing import List
app = Flask(__name__)
# params
# Model file handed to DetectionModel; the commented-out lines are alternative model files.
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x128x160.onnx"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x256x320.onnx"
WEIGHTS = "yolox_ti_body_head_hand_n_1x3x480x640.onnx"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x128x160_uint8.onnx"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x256x320_uint8.onnx"
# WEIGHTS = "yolox_ti_body_head_hand_n_1x3x480x640_uint8.onnx"
# Number of colours generated for drawing (one per class id).
NUM_CLASSES = 3
# Thresholds forwarded to non-maximum suppression.
SCORE_THRESHOLD = 0.60
IOU_THRESHOLD = 0.4
# Size requested for the camera's main stream.
CAP_WIDTH = 640
CAP_HEIGHT = 480
# NOTE(review): "LAW" looks like a misspelling of "RAW"; the names are kept because gen_frames reads them.
LAW_WIDTH = 2304 # width of the raw image inside the camera
LAW_HEIGHT = 1296 # height of the raw image inside the camera
# NOTE(review): folder_path, movie_length and time_list are not read by any code visible in this script.
folder_path ="/tmp/img"
movie_length = 100 # number of frames to capture
time_list = []
#num_threads = 4 # number of threads; choose 1-4
exposure_time = 5000 # exposure time of the image sensor (passed as the ExposureTime control)
analog_gain = 10.0 # gain of the image sensor (passed as the AnalogueGain control)
# detection model class for yolox
class DetectionModel:
    """YOLOX object detector that runs an ONNX model through OpenCV's dnn module.

    Calling an instance with an image runs preprocessing (resize and blob
    conversion), inference, box decoding and non-maximum suppression.
    """

    # constructor
    def __init__(
        self,
        *,
        weight: str,
    ):
        """Load the model file ``weight`` and precompute the decoding grids."""
        self.__initialize(weight=weight)

    # set preferable backend
    def _setPreferableBackend(self, backend):
        """Forward ``backend`` to the network's setPreferableBackend."""
        self._interpreter.setPreferableBackend(backend)

    # set preferable target
    def _setPreferableTarget(self, target):
        """Forward ``target`` to the network's setPreferableTarget."""
        self._interpreter.setPreferableTarget(target)

    # initialize
    def __initialize(
        self,
        *,
        weight: str,
    ):
        """Read tensor metadata, create the network and build the grids."""
        # The ONNX graph is loaded only to read tensor names and shapes,
        # then released before the network itself is created.
        onnx_model = onnx.load(f=weight)
        graph = onnx_model.graph
        self._input_shapes = [
            [dim.dim_value for dim in graph.input[0].type.tensor_type.shape.dim]
        ]
        self._input_names = [tensor.name for tensor in graph.input]
        self._output_shapes = [
            [dim.dim_value for dim in graph.output[0].type.tensor_type.shape.dim]
        ]
        self._output_names = [tensor.name for tensor in graph.output]
        del graph, onnx_model
        self._interpreter = cv2.dnn.readNet(weight)
        self._setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
        self._setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
        self._model = self._interpreter
        # Axes of the input shape that hold the height and the width.
        self._h_index = 2
        self._w_index = 3
        strides = [8, 16, 32]
        self.grids, self.expanded_strides = \
            self.__create_grids_and_expanded_strides(strides=strides)

    # create grids and expanded strides
    def __create_grids_and_expanded_strides(
        self,
        *,
        strides: List[int],
    ):
        """Build the cell offsets and per-cell strides used to decode boxes.

        For every stride the input is divided into ``height // stride`` by
        ``width // stride`` cells. Returns ``(grids, expanded_strides)`` of
        shapes ``(1, N, 2)`` and ``(1, N, 1)``, N being the total cell count;
        each grid entry is ``(x, y)`` with x varying fastest.
        """
        input_shape = self._input_shapes[0]
        height = input_shape[self._h_index]
        width = input_shape[self._w_index]
        grids = []
        expanded_strides = []
        for stride in strides:
            hsize = height // stride
            wsize = width // stride
            xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize))
            grid = np.stack((xv, yv), 2).reshape(1, -1, 2)
            grids.append(grid)
            expanded_strides.append(np.full((*grid.shape[:2], 1), stride))
        return np.concatenate(grids, 1), np.concatenate(expanded_strides, 1)

    # detect objects
    def __call__(
        self,
        *,
        image: np.ndarray,
        score_threshold: float,
        iou_threshold: float,
    ):
        """Detect objects in ``image``.

        Returns ``(class_ids, scores, boxes)`` for the detections that
        survive non-maximum suppression; each box is
        ``(left, top, width, height)`` in pixels of ``image``.
        """
        # Kept as an attribute because __preprocess reads it.
        self.image_shape = image.shape
        input_blob, resize_ratio_w, resize_ratio_h = self.__preprocess(image=image)
        self._model.setInput(input_blob)
        # Only the first unconnected output layer is evaluated.
        output_layer = self._model.getUnconnectedOutLayersNames()[0]  # "output"
        outputs = self._model.forward(output_layer)
        boxes, scores, class_ids = \
            self.__postprocess(
                output_blob=outputs,
                resize_ratio_w=resize_ratio_w,
                resize_ratio_h=resize_ratio_h,
            )
        boxes, scores, class_ids = \
            self.__nms(
                boxes=boxes,
                scores=scores,
                class_ids=class_ids,
                score_threshold=score_threshold,
                iou_threshold=iou_threshold,
            )
        return class_ids, scores, boxes

    # preprocess
    def __preprocess(
        self,
        *,
        image: np.ndarray,
    ):
        """Resize ``image`` to the model input size and convert it to a blob.

        Returns ``(input_blob, resize_ratio_w, resize_ratio_h)`` where the
        ratios are model-input size divided by the size in ``self.image_shape``.
        """
        input_shape = self._input_shapes[0]
        model_size = (input_shape[self._w_index], input_shape[self._h_index])
        resize_ratio_w = model_size[0] / self.image_shape[1]
        resize_ratio_h = model_size[1] / self.image_shape[0]
        resized_image = cv2.resize(image, dsize=model_size)
        # No scaling, mean subtraction or channel swap is applied.
        input_blob = \
            cv2.dnn.blobFromImage(
                image=resized_image,
                scalefactor=1.0,
                size=model_size,
                mean=(0.0, 0.0, 0.0),
                swapRB=False,
                crop=False,
            )
        return input_blob, resize_ratio_w, resize_ratio_h

    # postprocess
    def __postprocess(
        self,
        *,
        output_blob: np.ndarray,
        resize_ratio_w: float,
        resize_ratio_h: float,
    ):
        """Decode raw predictions into boxes, scores and class ids.

        ``output_blob`` is left unmodified. Only its first batch entry is
        decoded. Returns ``(boxes_xywh, scores, class_ids)`` where each box
        is ``(left, top, width, height)`` scaled back by the resize ratios.
        """
        # Decode on a private copy so the caller's array is not overwritten.
        decoded = output_blob.copy()
        decoded[..., :2] = (decoded[..., :2] + self.grids) * self.expanded_strides
        decoded[..., 2:4] = np.exp(decoded[..., 2:4]) * self.expanded_strides
        predictions: np.ndarray = decoded[0]
        boxes = predictions[:, :4]
        boxes_xywh = np.ones_like(boxes)
        # yolox-ti
        # Columns 0/2 are centre-x/width, columns 1/3 are centre-y/height.
        boxes[:, 0::2] = boxes[:, 0::2] / resize_ratio_w
        boxes[:, 1::2] = boxes[:, 1::2] / resize_ratio_h
        boxes_xywh[:, 0] = (boxes[:, 0] - boxes[:, 2] * 0.5)
        boxes_xywh[:, 1] = (boxes[:, 1] - boxes[:, 3] * 0.5)
        boxes_xywh[:, 2] = ((boxes[:, 0] + boxes[:, 2] * 0.5) - boxes_xywh[:, 0])
        boxes_xywh[:, 3] = ((boxes[:, 1] + boxes[:, 3] * 0.5) - boxes_xywh[:, 1])
        # Column 4 multiplied by each of the remaining columns gives the class scores.
        scores = predictions[:, 4:5] * predictions[:, 5:]
        class_ids = scores.argmax(1)
        scores = scores[np.arange(len(class_ids)), class_ids]
        return boxes_xywh, scores, class_ids

    # non maximum suppression
    def __nms(
        self,
        *,
        boxes: np.ndarray,
        scores: np.ndarray,
        class_ids: np.ndarray,
        score_threshold: float,
        iou_threshold: float,
    ):
        """Run batched non-maximum suppression and return the kept detections.

        Returns ``(keep_boxes, keep_scores, keep_class_ids)``. ``keep_boxes``
        is an integer array when at least one box is kept, otherwise an
        empty list; the other two are lists.
        """
        indices = \
            cv2.dnn.NMSBoxesBatched(
                bboxes=boxes,
                scores=scores,
                class_ids=class_ids,
                score_threshold=score_threshold,
                nms_threshold=iou_threshold,
            )  # OpenCV 4.7.0 or later
        keep_boxes = [boxes[index] for index in indices]
        keep_scores = [scores[index] for index in indices]
        keep_class_ids = [class_ids[index] for index in indices]
        if len(keep_boxes) > 0:
            # Convert the coordinates to integers for drawing.
            keep_boxes = np.vectorize(int)(keep_boxes)
        return keep_boxes, keep_scores, keep_class_ids
# get random colors
def get_colors(num: int):
    """Return ``num`` reproducible pseudo-random colours.

    Each colour is a list of three ints in the range 0-255. The sequence is
    always the one produced by seed 0, so repeated calls return the same
    colours. A private generator is used so that the process-wide NumPy
    random state is not reseeded as a side effect.
    """
    rng = np.random.RandomState(0)
    return [
        rng.randint(0, 256, [3]).astype(np.uint8).tolist()
        for _ in range(num)
    ]
# main
def gen_frames():
    """Yield an endless multipart stream of JPEG frames with detections drawn.

    Each iteration captures a frame from the camera, runs the detector,
    draws the inference time and the detected boxes, JPEG-encodes the frame
    and yields it as one ``--frame`` part. The camera is stopped and closed
    when the generator is closed or fails, so that it is released for the
    next consumer.
    """
    count = 0
    # create detection model class for yolox
    model = DetectionModel(weight=WEIGHTS)
    # The palette does not change between frames, so build it once.
    colors = get_colors(NUM_CLASSES)
    # init camera
    cap = Picamera2()
    try:
        config = cap.create_still_configuration(
            main={"size": (CAP_WIDTH, CAP_HEIGHT)},
            raw={"size": (LAW_WIDTH, LAW_HEIGHT)},
        )
        cap.configure(config)
        cap.set_controls({"ExposureTime": exposure_time, "AnalogueGain": analog_gain})
        cap.start()
        try:
            while True:
                start_time_frame = time.perf_counter()
                # get image
                image = cap.capture_array()
                # Swap the first and third colour channels of the captured frame.
                image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                # inference
                start_time = time.perf_counter()
                class_ids, scores, boxes = \
                    model(
                        image=image,
                        score_threshold=SCORE_THRESHOLD,
                        iou_threshold=IOU_THRESHOLD,
                    )
                elapsed_time = time.perf_counter() - start_time
                # draw elapsed time: a thick white pass, then a thin
                # (0, 0, 255) pass on top, which outlines the text.
                label = f'{elapsed_time*1000:.2f} ms'
                for text_color, text_thickness in (((255, 255, 255), 2), ((0, 0, 255), 1)):
                    cv2.putText(
                        image,
                        label,
                        (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.7,
                        text_color,
                        text_thickness,
                        cv2.LINE_AA,
                    )
                # draw objects
                for box, class_id in zip(boxes, class_ids):
                    cv2.rectangle(image, box, colors[class_id], 2, cv2.LINE_AA)
                # encode image
                ret, buffer = cv2.imencode('.jpg', image)
                if not ret:
                    # Encoding failed; skip this frame instead of sending a broken part.
                    continue
                frame = buffer.tobytes()
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
                # Measured after the yield, so it includes the time the
                # consumer took before requesting the next frame.
                elapsed_time_frame = time.perf_counter() - start_time_frame
                print("frame_number = " + str(count) + " / time = " + str(elapsed_time_frame))
                count += 1
        finally:
            cap.stop()
    finally:
        cap.close()
@app.route('/video_feed')
def video_feed():
    """Return the streaming Response that the page's img tag embeds."""
    stream = gen_frames()
    return Response(stream, mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/')
@app.route('/index')
def index():
    """Render index.html with the device name and the model file name."""
    page_info = dict(username='Raspberry Pi zero2 W', modelname=WEIGHTS)
    return render_template('index.html', title='home', user=page_info)
投稿者の人気記事
-
airpocket
さんが
2023/12/20
に
編集
をしました。
(メッセージ: 初版)
-
airpocket
さんが
2023/12/22
に
編集
をしました。
-
airpocket
さんが
2023/12/22
に
編集
をしました。
-
airpocket
さんが
2023/12/22
に
編集
をしました。
-
airpocket
さんが
2024/11/27
に
編集
をしました。
ログインしてコメントを投稿する