airpocketのアイコン画像
airpocket 2023年12月01日作成 (2023年12月01日更新) © MIT
セットアップや使用方法 セットアップや使用方法 閲覧数 598
airpocket 2023年12月01日作成 (2023年12月01日更新) © MIT セットアップや使用方法 セットアップや使用方法 閲覧数 598

うわさのYOLOをtfliteにしてPi5でもっと高速に動かす

うわさのYOLOをtfliteにしてPi5でもっと高速に動かす

はじめに

この記事は、うわさのYOLOをRaspberry Pi 5で試すで使ったモデルをonnxからtfliteに変換してさらに高速に動かしてみます。
元ネタは@PINTO03091さんのこちらです。

環境

Raspberry Pi 5
Bookworm 64bit desktop
python 3.11.2

環境を作ります。

うわさのYOLOをRaspberry Pi 5で試すで作ったvenv環境にtfliteのruntimeを入れます。

まずvenvで作った仮想環境に入っておきます。(環境名をonnxにしてしまったのはご愛敬

source onnx/bin/activate

tfliteのruntimeを入れる

適当な作業用フォルダを選んでPINTOさん謹製tfliteを入れます。

$ sudo apt install -y \
swig libjpeg-dev zlib1g-dev python3-dev \
unzip wget python3-pip curl git cmake make
$ pip3 install numpy==1.24.3
$ TFVER=2.12.0
$ PYVER=311
$ ARCH=aarch64
$ pip3 install \
--no-cache-dir \
https://github.com/PINTO0309/TensorflowLite-bin/releases/download/v${TFVER}/tflite_runtime-${TFVER/-/}-cp${PYVER}-none-linux_${ARCH}.whl

以上でtfliteが入りました。

tflite用のモデルを持ってくる

tflite用のモデルもRaspberry Pi上で作りたかったのですが難しかったのでWinPC上でonnxからtfliteへ変換しました。
変換方法はこちらの記事をご覧ください。

以下の7種類のtflite用のモデルへ変換されています、Raspberry Piの作業フォルダへこれらのモデルをコピーしてためしてみます。2つのモデルはうまく動きませんが残る5つのモデルは動きました。

動いたモデル

gold_yolo_n_body_head_hand_post_0461_0.4428_1x3x128x160_float16.tflite
gold_yolo_n_body_head_hand_post_0461_0.4428_1x3x128x160_float32.tflite
gold_yolo_n_body_head_hand_post_0461_0.4428_1x3x128x160_integer_quant.tflite
gold_yolo_n_body_head_hand_post_0461_0.4428_1x3x128x160_integer_quant_with_int16_act.tflite
gold_yolo_n_body_head_hand_post_0461_0.4428_1x3x128x160_dynamic_range_quant.tflite

デモコード

onnxのデモ用コードを修正してtflite用のdemo_goldyolo_tflite.pyとして使用します。

次の通り動かすと量子化モデル、4スレッド動作で最高5msecを切る速度です。マジか、、、爆速!

python demo_goldyolo_tflite.py -m gold_yolo_n_body_head_hand_post_0461_0.4428_1x3x128x160_integer_quant.tflite -th 4

-m で使用モデル、-th でスレッド数(1~4)指定できます。

#!/usr/bin/env python

import copy
import cv2
import time
import numpy as np
from argparse import ArgumentParser
from typing import Tuple, Optional, List
from tflite_runtime.interpreter import Interpreter
import re

class GoldYOLOONNX(object):
    def __init__(
        self,
        model_path      :Optional[str] = 'gold_yolo_n_body_head_hand_post_0461_0.4428_1x3x128x160_float16.tflite',
        class_score_th  :Optional[float] = 0.35,
        num_threads     :Optional[int] = 4,
        providers       :Optional[List] = [
            (
                'TensorrtExecutionProvider', {
                    'trt_engine_cache_enable': True,
                    'trt_engine_cache_path': '.',
                    'trt_fp16_enable': True,
                }
            ),
            'CUDAExecutionProvider',
            'CPUExecutionProvider',
        ],
    ):
        """GoldYOLOONNX

        Parameters
        ----------
        model_path: Optional[str]
            ONNX file path for GoldYOLO

        class_score_th: Optional[float]
            Score threshold. Default: 0.35

        providers: Optional[List]
            Name of onnx execution providers
            Default:
            [
                (
                    'TensorrtExecutionProvider', {
                        'trt_engine_cache_enable': True,
                        'trt_engine_cache_path': '.',
                        'trt_fp16_enable': True,
                    }
                ),
                'CUDAExecutionProvider',
                'CPUExecutionProvider',
            ]
        """
        self.interpreter = Interpreter(model_path=model_path, num_threads=num_threads)
        self.interpreter.allocate_tensors()
        # Threshold
        self.class_score_th = class_score_th

        # Model loading
        self.input_details = self.interpreter.get_input_details()
        match = re.search(r"'(.*?)'", str(self.input_details[0]["dtype"]))
        self.str_dtype = match.group(1)[6:]
        self.input_shapes = self.input_details[0]["shape"]

    def __call__(
        self,
        image: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """YOLOv7ONNX

        Parameters
        ----------
        image: np.ndarray
            Entire image

        Returns
        -------
        boxes: np.ndarray
            Predicted boxes: [N, x1, y1, x2, y2]

        scores: np.ndarray
            Predicted box scores: [N, score]
        """
        temp_image = copy.deepcopy(image)

        # PreProcess
        resized_image = self.__preprocess(
            temp_image,
        )

        # Inference
        inference_image = np.asarray([resized_image], dtype=np.dtype(self.str_dtype))   #float32
        self.interpreter.set_tensor(self.input_details[0]['index'], inference_image)
        self.interpreter.invoke()
        output_details = self.interpreter.get_output_details()
        boxes = self.interpreter.get_tensor(output_details[0]['index'])
        #print(boxes)
        
        # PostProcess
        result_boxes, result_scores = \
            self.__postprocess(
                image=temp_image,
                boxes=boxes,
            )

        return result_boxes, result_scores


    def __preprocess(
        self,
        image: np.ndarray,
        swap: Optional[Tuple[int,int,int]] = (0,1,2),
    ) -> np.ndarray:
        """__preprocess

        Parameters
        ----------
        image: np.ndarray
            Entire image

        swap: tuple
            HWC to CHW: (2,0,1)*
            CHW to HWC: (1,2,0)
            HWC to HWC: (0,1,2)
            CHW to CHW: (0,1,2)

        Returns
        -------
        resized_image: np.ndarray
            Resized and normalized image.
        """
        # Normalization + BGR->RGB
        resized_image = cv2.resize(
            image,
            (
                int(self.input_shapes[2]),
                int(self.input_shapes[1]),
            )
        )
        resized_image = np.divide(resized_image, 255.0)
        resized_image = resized_image[..., ::-1]
        resized_image = resized_image.transpose(swap)
        resized_image = np.ascontiguousarray(
            resized_image,
            dtype=np.float32,
        )
        return resized_image


    def __postprocess(
        self,
        image: np.ndarray,
        boxes: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """__postprocess

        Parameters
        ----------
        image: np.ndarray
            Entire image.

        boxes: np.ndarray
            float32[N, 7]

        Returns
        -------
        result_boxes: np.ndarray
            Predicted boxes: [N, x1, y1, x2, y2]

        result_scores: np.ndarray
            Predicted box confs: [N, score]
        """
        image_height = image.shape[0]
        image_width = image.shape[1]

        """
        Detector is
            N -> Number of boxes detected
            batchno -> always 0: BatchNo.0

        batchno_classid_x1y1x2y2_score: float32[N,7]
        """
        result_boxes = []
        result_scores = []
        if len(boxes) > 0:
            scores = boxes[:, 6:7]
            keep_idxs = scores[:, 0] > self.class_score_th
            scores_keep = scores[keep_idxs, :]
            boxes_keep = boxes[keep_idxs, :]

            if len(boxes_keep) > 0:
                for box, score in zip(boxes_keep, scores_keep):
                    class_id = int(box[1])
                    x_min = int(max(box[2], 0) * image_width / self.input_shapes[2])
                    y_min = int(max(box[3], 0) * image_height / self.input_shapes[1])
                    x_max = int(min(box[4], self.input_shapes[2]) * image_width / self.input_shapes[2])
                    y_max = int(min(box[5], self.input_shapes[1]) * image_height / self.input_shapes[1])

                    result_boxes.append(
                        [x_min, y_min, x_max, y_max, class_id]
                    )
                    result_scores.append(
                        score
                    )

        return np.asarray(result_boxes), np.asarray(result_scores)


def is_parsable_to_int(s):
    try:
        int(s)
        return True
    except ValueError:
        return False


def main():
    parser = ArgumentParser()
    parser.add_argument(
        '-th',
        '--num_threads',
        type=int,
        default=4,
    )
    parser.add_argument(
        '-m',
        '--model',
        type=str,
        default='gold_yolo_n_body_head_hand_post_0461_0.4428_1x3x128x160_float16.tflite',
    )
    parser.add_argument(
        '-v',
        '--video',
        type=str,
        default="0",
    )
    args = parser.parse_args()
    
    model = GoldYOLOONNX(
        model_path=args.model,
        num_threads=args.num_threads,
    )

    cap = cv2.VideoCapture(
        int(args.video) if is_parsable_to_int(args.video) else args.video
    )
    cap_fps = cap.get(cv2.CAP_PROP_FPS)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v')
    video_writer = cv2.VideoWriter(
        filename='output.mp4',
        fourcc=fourcc,
        fps=cap_fps,
        frameSize=(w, h),
    )

    while cap.isOpened():
        res, image = cap.read()
        if not res:
            break

        debug_image = copy.deepcopy(image)

        start_time = time.perf_counter()
        boxes, scores = model(debug_image)
        elapsed_time = time.perf_counter() - start_time
        cv2.putText(
            debug_image,
            f'{elapsed_time*1000:.2f} ms',
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (255, 255, 255),
            2,
            cv2.LINE_AA,
        )
        cv2.putText(
            debug_image,
            f'{elapsed_time*1000:.2f} ms',
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (0, 0, 255),
            1,
            cv2.LINE_AA,
        )

        for box, score in zip(boxes, scores):
            classid: int = box[4]
            color = (255,255,255)
            if classid == 0:
                color = (255,0,0)
            elif classid == 1:
                color = (0,0,255)
            elif classid == 2:
                color = (0,255,0)
            cv2.rectangle(
                debug_image,
                (box[0], box[1]),
                (box[2], box[3]),
                (255,255,255),
                2,
            )
            cv2.rectangle(
                debug_image,
                (box[0], box[1]),
                (box[2], box[3]),
                color,
                1,
            )
            cv2.putText(
                debug_image,
                f'{score[0]:.2f}',
                (
                    box[0] if box[0]+50 < debug_image.shape[1] else debug_image.shape[1]-50,
                    box[1]-10 if box[1]-25 > 0 else 20
                ),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.7,
                (255, 255, 255),
                2,
                cv2.LINE_AA,
            )
            cv2.putText(
                debug_image,
                f'{score[0]:.2f}',
                (
                    box[0] if box[0]+50 < debug_image.shape[1] else debug_image.shape[1]-50,
                    box[1]-10 if box[1]-25 > 0 else 20
                ),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.7,
                color,
                1,
                cv2.LINE_AA,
            )

        key = cv2.waitKey(1)
        if key == 27: # ESC
            break

        cv2.imshow("test", debug_image)
        video_writer.write(debug_image)

    if video_writer:
        video_writer.release()

    if cap:
        cap.release()

if __name__ == "__main__":
    main()
airpocketのアイコン画像
電子工作、プログラミング、AI、DIY、XR、IoT M5Stack / Raspberry Pi / Arduino / spresense / K210 / ESP32 / Maix / maicro:bit / oculus / Jetson Nano / minipupper etc
ログインしてコメントを投稿する