Fooping が 2026年01月31日20時40分45秒 に編集
コメント無し
本文の変更
## はじめに この記事は、昨年の Spresense 活用コンテストで制作したロボット「ロスマル」の正統進化バージョンについての解説記事です。 前作の記事はこちら: https://elchika.com/article/26101df9-e828-4ee1-aed3-6a04d3098e14/ 今回は、 ROS2 で動作するロボットユーチューバー版ロスマルを紹介します。 マイク、スピーカー、カメラ、LiDAR、IMU を搭載し、Nav2 による自動走行や、AI を使った対話、シナリオベースでの即興漫才を行うことができます。 ## 作ったもの ロボットユーチューバーによる解説動画 @[youtube](https://www.youtube.com/watch?v=atfLCr3vYIA) ROS2による遠隔コントロール @[x](https://x.com/foopingtech/status/2015329639317790783?s=46)
子供と遊ぶ @[x](https://x.com/foopingtech/status/2017562636658086270?s=46) 展示ブースの紹介エージェント @[x](https://x.com/foopingtech/status/1974344869406769195?s=46)
## 内部写真  - Raspberry Pi 4B(メインコンピュータ)・・・Ubuntu22.04にROS2 humbleをインストール - Raspberry Pi ポータブルディスプレイ(顔表示器) - DDSM210(走行用ブラシレスモータ) - makita 18Vバッテリ(メインバッテリ) - DCDC降圧基板18V-5V(3A品) - YDLIDAR X4 Pro - I2Sマイクモジュール - SPRESENSE メインボードw/6軸IMU(BMI270),HDRカメラ・・・カメラ画像、IMUデータとしてpublish ## Spresenseの使い所 ソニーのSpresenseはmicro rosが使えるので、カメラノードとimuノードから定期的に画像とimuの値をpublishしています。HDRカメラを搭載しているため、暗所でも綺麗に撮影することが可能です。 IMUはROS2のnavigation2で安定して走行するために使用します。 カメラ画像とIMUをpublishするプログラムはこちらです。raspberry pi でmicro-ros-agentを起動して使います。 https://github.com/fooping-tech/Spresense_camera/tree/main/examples/05_microros_camera_imu_dualcore ## 処理の流れ 対話時の処理は以下のような流れで実行します。 ``` 発話音声 -> STT -> NLP -> TTS -> 音声再生,FACE(口パク),MOTION(左右小刻み動作) ``` # プログラム それぞれの機能を担っているノードのプログラムを列挙します。 ## モータ駆動 ```python:motor_node import rclpy from rclpy.node import Node from std_msgs.msg import Float32 from std_srvs.srv import Trigger from geometry_msgs.msg import Twist, Pose2D from nav_msgs.msg import Odometry import serial import time import math ENCODER_RESOLUTION = 4096 def calculate_crc8(data): crc = 0x00 for byte in data: crc ^= byte for _ in range(8): if (crc & 0x01) != 0: crc = (crc >> 1) ^ 0x8C else: crc >>= 1 return crc & 0xFF def create_velocity_command(speed_rpm): if not -210 <= speed_rpm <= 210: raise ValueError("Speed must be between -210 and 210 rpm.") speed_value = int(speed_rpm * 10) high_byte = (speed_value >> 8) & 0xFF low_byte = speed_value & 0xFF command = [0x01, 0x64, high_byte, low_byte, 0x00, 0x00, 0x00, 0x00, 0x00] crc = calculate_crc8(command) command.append(crc) return bytes(command) def create_rotation_count_command(): command = [0x01, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] crc = calculate_crc8(command) command.append(crc) return bytes(command) def parse_rotation_count_response(response): if len(response) < 10: raise ValueError("Invalid response length") rotation_count = int.from_bytes(response[2:6], byteorder='little', signed=True) return rotation_count def parse_velocity_response(response): if len(response) < 10: raise ValueError("Invalid response length") velocity = int.from_bytes(response[2:4], byteorder='little', signed=True) current = int.from_bytes(response[4:6], byteorder='little', signed=True) return velocity, current class DualMotorControlNode(Node): def __init__(self): super().__init__('dual_motor_control_node') self.target_rpm1 = 0.0 self.target_rpm2 = 0.0 self.current_rpm1 = 0.0 self.current_rpm2 = 0.0 self.ser1 = None self.ser2 = None self.port1 = self.declare_parameter('port1', '/dev/ttyUSB_port3').value self.port2 = self.declare_parameter('port2', '/dev/ttyUSB_port4').value self.baudrate = self.declare_parameter('baudrate', 115200).value self.cmd_vel_subscription = self.create_subscription( Twist, 'cmd_vel', self.cmd_vel_callback, 10) self.publisher1 = self.create_publisher(Float32, 'motor1_speed', 10) self.publisher2 = self.create_publisher(Float32, 'motor2_speed', 10) self.odom_publisher = self.create_publisher(Odometry, 'odom', 10) self.distance1_publisher = self.create_publisher(Float32, 'distance1', 10) self.distance2_publisher = self.create_publisher(Float32, 'distance2', 10) self.rotation_count1_publisher = self.create_publisher(Float32, 'rotation_count1', 10) self.delta_rotation1_publisher = self.create_publisher(Float32, 'delta_rotation1', 10) self.wheel_radius_publisher = self.create_publisher(Float32, 'wheel_radius', 10) self.encoder_resolution_publisher = self.create_publisher(Float32, 'encoder_resolution', 10) self.connect_serial() self.wheel_separation = 0.130 self.wheel_radius = 0.036 self.max_rpm = 210.0 self.acceleration = 120.0 # rpm/sec self.recovery_service = self.create_service(Trigger, 'motor_recovery', self.recovery_callback) self.create_timer(5.0, self.check_and_recover_connection) # タイマー周期に合わせて dt を実際の経過時間から計算する self.last_speed_update_time = self.get_clock().now() self.create_timer(0.01, self.update_motor_speed) self.last_time = self.get_clock().now() self.create_timer(0.01, self.publish_odometry) self.pose = Pose2D(x=0.0, y=0.0, theta=0.0) # 最初の回転数は初回に取得する self.last_rotation_count1 = self.get_rotation_count(self.ser1) or 0 self.last_rotation_count2 = self.get_rotation_count(self.ser2) or 0 def connect_serial(self): max_retries = 5 retry_delay = 2.0 for _ in range(max_retries): try: self.ser1 = serial.Serial(self.port1, self.baudrate, timeout=1) self.ser2 = serial.Serial(self.port2, self.baudrate, timeout=1) self.ser1.reset_input_buffer() self.ser2.reset_input_buffer() self.get_logger().info(f"Connected to {self.port1} and {self.port2}") return except serial.SerialException as e: self.get_logger().error(f"Serial error: {e}") time.sleep(retry_delay) self.get_logger().error("Failed to connect to serial ports after multiple attempts") def cmd_vel_callback(self, msg): linear_x = msg.linear.x angular_z = msg.angular.z right_velocity = (linear_x + angular_z * self.wheel_separation / 2.0) / self.wheel_radius left_velocity = (linear_x - angular_z * self.wheel_separation / 2.0) / self.wheel_radius self.target_rpm1 = left_velocity * 60.0 / (2.0 * math.pi) self.target_rpm2 = -right_velocity * 60.0 / (2.0 * math.pi) self.target_rpm1 = max(min(self.target_rpm1, self.max_rpm), -self.max_rpm) self.target_rpm2 = max(min(self.target_rpm2, self.max_rpm), -self.max_rpm) def update_motor_speed(self): current_time = self.get_clock().now() dt = (current_time - self.last_speed_update_time).nanoseconds / 1e9 self.last_speed_update_time = current_time delta_rpm = self.acceleration * dt # Motor 1 の加速制御 if self.target_rpm1 > self.current_rpm1: self.current_rpm1 = min(self.current_rpm1 + delta_rpm, self.target_rpm1) elif self.target_rpm1 < self.current_rpm1: self.current_rpm1 = max(self.current_rpm1 - delta_rpm, self.target_rpm1) # Motor 2 の加速制御 if self.target_rpm2 > self.current_rpm2: self.current_rpm2 = min(self.current_rpm2 + delta_rpm, self.target_rpm2) elif self.target_rpm2 < self.current_rpm2: self.current_rpm2 = max(self.current_rpm2 - delta_rpm, self.target_rpm2) self.control_motor(self.ser1, self.current_rpm1, "Motor 1") self.control_motor(self.ser2, self.current_rpm2, "Motor 2") def control_motor(self, ser, speed, motor_name): if ser is None or not ser.is_open: self.get_logger().warn(f"Serial port for {motor_name} is not open") return try: command = create_velocity_command(speed) ser.write(command) response = ser.read(10) if len(response) == 10 and response[0] == 0x01 and response[1] == 0x64: velocity, current = parse_velocity_response(response) else: self.get_logger().warn(f"Invalid response for {motor_name} velocity command") except ValueError as e: self.get_logger().error(f"{motor_name}: {str(e)}") except serial.SerialException as e: self.get_logger().error(f"{motor_name} Serial error: {str(e)}") self.attempt_recovery(ser, motor_name) def attempt_recovery(self, ser, motor_name): try: ser.close() time.sleep(1) ser.open() self.get_logger().info(f"Successfully recovered {motor_name} connection") except serial.SerialException as e: self.get_logger().error(f"Failed to recover {motor_name} connection: {str(e)}") def recovery_callback(self, request, response): self.get_logger().info("Attempting motor recovery") self.connect_serial() response.success = True response.message = "Recovery attempt completed" return response def check_and_recover_connection(self): if self.ser1 is None or not self.ser1.is_open: self.get_logger().warn("Motor 1 connection lost, attempting recovery") self.attempt_recovery(self.ser1, "Motor 1") if self.ser2 is None or not self.ser2.is_open: self.get_logger().warn("Motor 2 connection lost, attempting recovery") self.attempt_recovery(self.ser2, "Motor 2") def get_rotation_count(self, ser): try: command = create_rotation_count_command() ser.write(command) response = ser.read(10) if len(response) == 10 and response[0] == 0x01 and response[1] == 0x74: rotation_count = parse_rotation_count_response(response) return rotation_count else: self.get_logger().warn("Invalid response for rotation count command") return None except serial.SerialException as e: self.get_logger().error(f"Serial error: {str(e)}") return None def publish_odometry(self): current_time = self.get_clock().now() dt = (current_time - self.last_time).nanoseconds / 1e9 self.last_time = current_time rotation_count1 = self.get_rotation_count(self.ser1) rotation_count2 = self.get_rotation_count(self.ser2) if rotation_count1 is None or rotation_count2 is None: return delta_rotation1 = rotation_count1 - self.last_rotation_count1 delta_rotation2 = rotation_count2 - self.last_rotation_count2 self.last_rotation_count1 = rotation_count1 self.last_rotation_count2 = rotation_count2 # 右側モーターは符号を反転 distance1 = delta_rotation1 * (2 * math.pi * self.wheel_radius) / ENCODER_RESOLUTION distance2 = -delta_rotation2 * (2 * math.pi * self.wheel_radius) / ENCODER_RESOLUTION distance1_msg = Float32() distance1_msg.data = distance1 self.distance1_publisher.publish(distance1_msg) distance2_msg = Float32() distance2_msg.data = distance2 self.distance2_publisher.publish(distance2_msg) linear_velocity = (distance1 + distance2) / (2.0 * dt) angular_velocity = (distance2 - distance1) / (self.wheel_separation * dt) self.pose.x += linear_velocity * math.cos(self.pose.theta) * dt self.pose.y += linear_velocity * math.sin(self.pose.theta) * dt self.pose.theta += angular_velocity * dt odom_msg = Odometry() odom_msg.header.stamp = current_time.to_msg() odom_msg.header.frame_id = "odom" odom_msg.child_frame_id = "base_link" odom_msg.pose.pose.position.x = self.pose.x odom_msg.pose.pose.position.y = self.pose.y odom_msg.pose.pose.orientation.z = math.sin(self.pose.theta / 2.0) odom_msg.pose.pose.orientation.w = math.cos(self.pose.theta / 2.0) odom_msg.twist.twist.linear.x = linear_velocity odom_msg.twist.twist.angular.z = angular_velocity # self.odom_publisher.publish(odom_msg) def main(args=None): rclpy.init(args=args) node = DualMotorControlNode() try: rclpy.spin(node) except KeyboardInterrupt: pass finally: if node.ser1 is not None and node.ser1.is_open: node.ser1.close() if node.ser2 is not None and node.ser2.is_open: node.ser2.close() node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() ``` ## 顔の描画 顔はpygameを用いて描画しています。 ```python:face_node #!/usr/bin/env python3 import pygame import sys import random import time import rclpy from rclpy.node import Node from std_msgs.msg import Bool import threading import socket # IPアドレス取得用 # ROS2のサブスクライバクラス class RecordingStatusListener(Node): def __init__(self): super().__init__('recording_status_listener') # 絶対トピック名を指定してディスカバリーの問題を回避 self.subscription = self.create_subscription(Bool, '/recording_status', self.listener_callback, 10) self.recording_status = False # 口制御用サブスクライバの追加 self.mouth_subscription = self.create_subscription(Bool, '/mouth_control', self.mouth_callback, 10) self.mouth_talking = False def listener_callback(self, msg): self.recording_status = msg.data self.get_logger().info(f"Recording status updated: {msg.data}") def mouth_callback(self, msg): self.mouth_talking = msg.data self.get_logger().info(f"Mouth control updated: {msg.data}") # 目のクラス class Eye: def __init__(self, screen, x, y, radius, color, shake_intensity=2): self.screen = screen self.base_x = x self.base_y = y self.radius = radius self.color = color self.shake_intensity = shake_intensity self.is_blinking = False self.blink_start_time = 0 self.blink_duration = 0 def start_blink(self): self.is_blinking = True self.blink_start_time = time.time() self.blink_duration = random.uniform(0.1, 0.2) def update(self): if self.is_blinking and time.time() - self.blink_start_time > self.blink_duration: self.is_blinking = False def draw(self, recording=False): self.update() if recording: # 録画中の場合:目を細め、上を向いてキョロキョロ動く # キョロキョロモーションを大きめにするためシェイク幅を2倍に shake_x = random.uniform(-self.shake_intensity * 2, self.shake_intensity * 2) shake_y = random.uniform(-self.shake_intensity * 2, self.shake_intensity * 2) upward_offset = -self.radius // 4 # 上方向へシフト center_x = int(self.base_x + shake_x) center_y = int(self.base_y + shake_y + upward_offset) # 目を角丸の横長の矩形にして描画 eye_width = int(self.radius * 2.5) # 横に少し伸ばす eye_height = int(self.radius * 0.6) # 縦は細めに rect = pygame.Rect(0, 0, eye_width, eye_height) rect.center = (center_x, center_y) pygame.draw.rect(self.screen, self.color, rect, border_radius=int(self.radius * 0.3)) else: # 通常時:まばたき中は線、そうでなければ通常の円を描画 if self.is_blinking: eye_height = self.radius // 3 pygame.draw.line(self.screen, self.color, (self.base_x - self.radius, self.base_y), (self.base_x + self.radius, self.base_y), eye_height) else: shake_x = random.uniform(-self.shake_intensity, self.shake_intensity) shake_y = random.uniform(-self.shake_intensity, self.shake_intensity) pygame.draw.circle(self.screen, self.color, (int(self.base_x + shake_x), int(self.base_y + shake_y)), self.radius) # 口のクラス(口パクのON/OFF切り替え可能) class Mouth: def __init__(self, screen, x_center, base_width, base_height, y, color): self.screen = screen self.x_center = x_center self.base_width = base_width self.base_height = base_height self.y = y self.color = color self.is_talking = False # 口パクON/OFFフラグ self.talk_start_time = 0 self.talk_duration = 0 self.current_width = base_width self.current_height = base_height def toggle_talking(self): """口パクのON/OFFを切り替える""" self.is_talking = not self.is_talking def start_talking(self): """口パクを開始する""" if self.is_talking: self.talk_start_time = time.time() self.talk_duration = random.uniform(0.5, 1.5) def update(self): if self.is_talking: if time.time() - self.talk_start_time > self.talk_duration: self.start_talking() # 口の幅と高さをランダムに変化 self.current_width = random.randint(int(self.base_width * 0.5), int(self.base_width * 1.5)) self.current_height = random.randint(int(self.base_height * 0.4), int(self.base_height * 2)) else: self.current_width = self.base_width self.current_height = self.base_height // 3 def draw(self, recording=False): self.update() # recording時は目と同じくらい上にシフト(ここでは基準高さの1/4分上へ) upward_offset = -self.base_height // 4 if recording else 0 pygame.draw.rect(self.screen, self.color, (self.x_center - self.current_width // 2, self.y + upward_offset - self.current_height // 2, self.current_width, self.current_height)) # ROS2のスピンを専用スレッドで回す関数 def ros_spin(node): rclpy.spin(node) def get_ip_address(): """現在のIPアドレスを取得する""" try: # ダミー接続を作成してIPアドレスを取得 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) # GoogleのDNSサーバーを使用 ip_address = s.getsockname()[0] s.close() return ip_address except Exception: return "IP取得失敗" def main(): # ROS2の初期化とサブスクライバの作成 rclpy.init(args=None) status_listener = RecordingStatusListener() # 専用スレッドでROS2ノードのスピンを開始 ros_thread = threading.Thread(target=ros_spin, args=(status_listener,), daemon=True) ros_thread.start() pygame.init() screen = pygame.display.set_mode((0, 0), pygame.FULLSCREEN) pygame.display.set_caption("Robot Face Fullscreen (ROS2 recording_status)") screen_width, screen_height = screen.get_size() BLACK = (0, 0, 0) WHITE = (255, 255, 255) GRAY = (128, 128, 128) # グレーのRGB値 # IPアドレスを取得 ip_address = get_ip_address() # IPアドレスを表示するためのフォント設定 font = pygame.font.Font(None, 18) # フォントサイズ eye_radius = int(screen_width * 0.1) eye_x_offset = int(screen_width * 0.25) eye_y = int(screen_height * 0.4) left_eye = Eye(screen, screen_width // 2 - eye_x_offset, eye_y, eye_radius, WHITE, shake_intensity=2) right_eye = Eye(screen, screen_width // 2 + eye_x_offset, eye_y, eye_radius, WHITE, shake_intensity=2) mouth_y = int(screen_height * 0.7) base_mouth_width = int(screen_width * 0.2) base_mouth_height = int(screen_height * 0.1) mouth = Mouth(screen, screen_width // 2, base_mouth_width, base_mouth_height, mouth_y, WHITE) clock = pygame.time.Clock() next_blink_time = time.time() + random.uniform(2, 5) next_talk_time = time.time() + random.uniform(1, 4) running = True while running: screen.fill(BLACK) current_time = time.time() # ROSのスピンは専用スレッドが回しているので、最新の状態を取得 recording_status = status_listener.recording_status # ROSから受け取った mouth_control の状態を口のパク状態に反映 mouth.is_talking = status_listener.mouth_talking if current_time >= next_blink_time: left_eye.start_blink() right_eye.start_blink() next_blink_time = current_time + random.uniform(2, 5) if mouth.is_talking and current_time >= next_talk_time: mouth.start_talking() next_talk_time = current_time + random.uniform(1, 4) # recording_statusに応じて目と口の描画モードを切り替え left_eye.draw(recording=recording_status) right_eye.draw(recording=recording_status) mouth.draw(recording=recording_status) # IPアドレスを左上に表示(グレー色で描画) ip_text = font.render(f"IP: {ip_address}", True, GRAY) screen.blit(ip_text, (10, 10)) # 左上に表示(座標: x=10, y=10) pygame.display.flip() clock.tick(60) for event in pygame.event.get(): if event.type == pygame.QUIT or (event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE): running = False elif event.type == pygame.KEYDOWN: # スペースキーは手動で口パク状態を切り替える(ROS以外の制御) if event.key == pygame.K_SPACE: mouth.toggle_talking() pygame.quit() rclpy.shutdown() sys.exit() if __name__ == '__main__': main() ``` ## TTS(text to speech) テキストを音声に変換するノードです。 TTSにはvoicevoxを使用しました。RaspberryPiで音声変換を行うと処理に時間がかかるため、外部リソース(パソコンなど)で変換するのがおすすめです。 ```python:tts_node import rclpy from rclpy.node import Node from rcl_interfaces.msg import SetParametersResult from std_msgs.msg import String, Bool import requests import json import subprocess import tempfile import os import time from ament_index_python.packages import get_package_share_directory class TTSNode(Node): def __init__(self): super().__init__('tts_node') # パラメータ宣言(デフォルト値付き) self.declare_parameter('voicevox.url', "http://YOUR_IP_ADDRESS:50021") self.declare_parameter('voicevox.speaker', 1) self.declare_parameter('voicevox.speed_scale', 1.2) self.declare_parameter('voicevox.volume_scale', 1.0) self.declare_parameter('voicevox.intonation_scale', 1.0) self.declare_parameter('voicevox.pre_phoneme_length', 0.1) self.declare_parameter('voicevox.post_phoneme_length', 0.1) # 動的パラメータ変更用のコールバック登録 self.add_on_set_parameters_callback(self.parameter_callback) # 設定ファイルの読み込み(エラーファイル用) package_share_dir = get_package_share_directory('motor1') self.error_sound_path = os.path.join(package_share_dir, 'sounds', 'error.wav') # 初期パラメータの読み込み self.voicevox_url = self.get_parameter('voicevox.url').get_parameter_value().string_value self.speaker = self.get_parameter('voicevox.speaker').get_parameter_value().integer_value self.speed_scale = self.get_parameter('voicevox.speed_scale').get_parameter_value().double_value self.volume_scale = self.get_parameter('voicevox.volume_scale').get_parameter_value().double_value self.intonation_scale = self.get_parameter('voicevox.intonation_scale').get_parameter_value().double_value self.pre_phoneme_length = self.get_parameter('voicevox.pre_phoneme_length').get_parameter_value().double_value self.post_phoneme_length = self.get_parameter('voicevox.post_phoneme_length').get_parameter_value().double_value # サブスクライバーの設定 self.subscription = self.create_subscription( String, 'tts/text', self.tts_callback, 10 ) # 音声再生状態のパブリッシャー self.playing_publisher = self.create_publisher(Bool, '/mouth_control', 10) self.get_logger().info(f"TTSノード起動(Voicevox URL: {self.voicevox_url}, Speaker: {self.speaker})") def parameter_callback(self, params): """ 動的にパラメータが変更されたときに呼び出されるコールバック """ for param in params: if param.name == 'voicevox.url': self.voicevox_url = param.value self.get_logger().info(f"voicevox.url 更新: {self.voicevox_url}") elif param.name == 'voicevox.speaker': self.speaker = param.value self.get_logger().info(f"voicevox.speaker 更新: {self.speaker}") elif param.name == 'voicevox.speed_scale': self.speed_scale = param.value self.get_logger().info(f"voicevox.speed_scale 更新: {self.speed_scale}") elif param.name == 'voicevox.volume_scale': self.volume_scale = param.value self.get_logger().info(f"voicevox.volume_scale 更新: {self.volume_scale}") elif param.name == 'voicevox.intonation_scale': self.intonation_scale = param.value self.get_logger().info(f"voicevox.intonation_scale 更新: {self.intonation_scale}") elif param.name == 'voicevox.pre_phoneme_length': self.pre_phoneme_length = param.value self.get_logger().info(f"voicevox.pre_phoneme_length 更新: {self.pre_phoneme_length}") elif param.name == 'voicevox.post_phoneme_length': self.post_phoneme_length = param.value self.get_logger().info(f"voicevox.post_phoneme_length 更新: {self.post_phoneme_length}") result = SetParametersResult() result.successful = True result.reason = "" return result def tts_callback(self, msg): text = msg.data self.get_logger().info(f"受信テキスト: {text}") times = {} try: # 1️⃣ 音声合成クエリの取得 start_time = time.time() query_url = f"{self.voicevox_url}/audio_query?speaker={self.speaker}" query_response = requests.post(query_url, params={"text": text}, timeout=5) times["audio_query_request"] = time.time() - start_time if query_response.status_code != 200: raise requests.exceptions.HTTPError(f"HTTPエラー {query_response.status_code}") query_data = query_response.json() query_data["speedScale"] = self.speed_scale query_data["volumeScale"] = self.volume_scale query_data["intonationScale"] = self.intonation_scale query_data["prePhonemeLength"] = self.pre_phoneme_length query_data["postPhonemeLength"] = self.post_phoneme_length # 2️⃣ 音声合成リクエスト start_time = time.time() synthesis_url = f"{self.voicevox_url}/synthesis?speaker={self.speaker}" synthesis_response = requests.post( synthesis_url, headers={"Content-Type": "application/json"}, data=json.dumps(query_data), timeout=10 ) times["synthesis_request"] = time.time() - start_time if synthesis_response.status_code != 200: raise requests.exceptions.HTTPError(f"HTTPエラー {synthesis_response.status_code}") # 3️⃣ 音声を一時ファイルに保存して再生 start_time = time.time() with tempfile.NamedTemporaryFile(suffix=".wav", delete=True) as tmp_wav: tmp_wav.write(synthesis_response.content) tmp_wav.flush() times["wav_write"] = time.time() - start_time self.get_logger().info(f"音声再生: {text}") # 🎵 再生開始を通知(True) self.publish_playing_status(True) # 再生 start_time = time.time() subprocess.run(["aplay", tmp_wav.name]) times["playback"] = time.time() - start_time # 🎵 再生終了を通知(False) self.publish_playing_status(False) self.get_logger().info(f"🔹 音声クエリ取得: {times['audio_query_request']:.2f} 秒") self.get_logger().info(f"🔹 音声合成処理: {times['synthesis_request']:.2f} 秒") self.get_logger().info(f"🔹 WAV保存時間: {times['wav_write']:.2f} 秒") self.get_logger().info(f"🔹 音声再生時間: {times['playback']:.2f} 秒") except requests.exceptions.ConnectionError: self.get_logger().error("Voicevoxエラー: 接続に失敗しました。サーバーが動作しているか確認してください。") self.play_error_sound() except requests.exceptions.Timeout: self.get_logger().error("Voicevoxエラー: タイムアウトが発生しました。サーバーの負荷が高い可能性があります。") self.play_error_sound() except requests.exceptions.HTTPError as e: self.get_logger().error(f"Voicevoxエラー: {str(e)}") self.play_error_sound() except json.JSONDecodeError: self.get_logger().error("Voicevoxエラー: サーバーのレスポンスが不正です。") self.play_error_sound() except Exception as e: self.get_logger().error(f"Voicevoxエラー: {str(e)}") self.play_error_sound() def play_error_sound(self): """エラー時に代替WAVファイルを再生""" self.publish_playing_status(True) # エラー音の再生開始を通知 if os.path.exists(self.error_sound_path): self.get_logger().warn(f"エラー音を再生: {self.error_sound_path}") subprocess.run(["aplay", self.error_sound_path]) else: self.get_logger().error(f"エラー音のWAVファイルが見つかりません: {self.error_sound_path}") self.publish_playing_status(False) # エラー音の再生終了を通知 def publish_playing_status(self, is_playing): """音声再生の開始/終了をトピックとして通知""" msg = Bool() msg.data = is_playing self.playing_publisher.publish(msg) status_text = "開始" if is_playing else "終了" self.get_logger().info(f"🔔 音声再生 {status_text} 通知を送信しました") def main(args=None): rclpy.init(args=args) node = TTSNode() rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() ``` モーションノード ロボットが発話中、小刻みに車輪を動かして発話感を増幅します。 ``` #!/usr/bin/env python3 import rclpy from rclpy.node import Node from std_msgs.msg import Bool, Float32 from geometry_msgs.msg import Twist import time import threading import math class TalkingMotionNode(Node): def __init__(self): super().__init__('talking_motion_node') # /mouth_control のサブスクライバー self.subscription = self.create_subscription( Bool, '/mouth_control', self.mouth_control_callback, 10 ) # /cmd_vel のパブリッシャー self.cmd_vel_publisher = self.create_publisher(Twist, 'cmd_vel', 10) self.is_talking = False # 会話中フラグ self.thread = None # 動作スレッド def mouth_control_callback(self, msg): """ /mouth_control のメッセージを処理 """ if msg.data and not self.is_talking: self.is_talking = True self.thread = threading.Thread(target=self.shake_motion) self.thread.start() elif not msg.data: self.is_talking = False def shake_motion(self): """ 会話中に±5 RPMのモーター変動を繰り返す """ base_rpm = 0.0 # 基本回転速度(停止時は0) delta_rpm = 5.0 # 小刻みな変動幅 direction = 1 # 1: 正方向, -1: 負方向 while self.is_talking: current_rpm = base_rpm + direction * delta_rpm direction *= -1 # 方向を反転 # cmd_vel メッセージを作成 twist_msg = Twist() twist_msg.angular.z = math.radians(current_rpm) # 角速度に変換 self.cmd_vel_publisher.publish(twist_msg) time.sleep(0.2) # 200msごとに動作 # 会話終了後に停止 twist_msg = Twist() twist_msg.angular.z = 0.0 self.cmd_vel_publisher.publish(twist_msg) def main(args=None): rclpy.init(args=args) node = TalkingMotionNode() rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() ``` ## TTSHttpPublisherノード httpサーバーへテキストを送信するとtts/textへpublishします。 Youtuberとして機能させるときは外部リソース(パソコンなど)で発話内容を生成し、このノードで発話用テキストを受け取ります。 ```python:tts_http_node import rclpy from rclpy.node import Node from std_msgs.msg import String from http.server import BaseHTTPRequestHandler, HTTPServer import threading import json class TTSHttpPublisher(Node): def __init__(self): super().__init__('tts_http_publisher') self.publisher_ = self.create_publisher(String, 'tts/text', 10) self.get_logger().info('HTTP TTS Publisher Node started.') # HTTPサーバーを別スレッドで起動 self.server_thread = threading.Thread(target=self.start_http_server) self.server_thread.daemon = True self.server_thread.start() def start_http_server(self): class RequestHandler(BaseHTTPRequestHandler): def do_POST(handler_self): content_length = int(handler_self.headers['Content-Length']) post_data = handler_self.rfile.read(content_length) try: data = json.loads(post_data.decode('utf-8')) text = data.get("text", "") if text: self.get_logger().info(f"HTTPで受信: {text}") msg = String() msg.data = text self.publisher_.publish(msg) handler_self.send_response(200) handler_self.end_headers() handler_self.wfile.write(b"Text received and published.") else: handler_self.send_response(400) handler_self.end_headers() handler_self.wfile.write(b"Missing 'text' in JSON.") except json.JSONDecodeError: handler_self.send_response(400) handler_self.end_headers() handler_self.wfile.write(b"Invalid JSON.") server_address = ('', 8080) httpd = HTTPServer(server_address, RequestHandler) self.get_logger().info("HTTPサーバー起動: ポート8080で待機中") httpd.serve_forever() def main(args=None): rclpy.init(args=args) node = TTSHttpPublisher() try: rclpy.spin(node) except KeyboardInterrupt: pass node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() ``` ## STTノード RaspberryPiに繋いだi2sマイクから音声を抜き出しテキストに変換します ```python:stt_node #!/usr/bin/env python3 import rclpy from rclpy.node import Node from rcl_interfaces.msg import SetParametersResult from std_msgs.msg import String, Bool import pyaudio import wave import numpy as np import time import sys import webrtcvad import requests from collections import deque import threading class VADSTTPublisher(Node): def __init__(self): super().__init__('vad_stt_publisher') # パラメータの宣言(デフォルト値を指定) self.declare_parameter("server_url", "YOUR_STT_SERVER") self.declare_parameter("vad_mode", 3) self.declare_parameter("silence_duration", 2.0) self.declare_parameter("pre_record_duration", 0.5) # 初期パラメーターを取得 self.server_url = self.get_parameter("server_url").value self.vad_mode = int(self.get_parameter("vad_mode").value) self.silence_duration = float(self.get_parameter("silence_duration").value) self.pre_record_duration = float(self.get_parameter("pre_record_duration").value) # 動的パラメータ変更を監視 self.add_on_set_parameters_callback(self.parameter_callback) self.publisher_ = self.create_publisher(String, 'stt/text', 10) # recording_status用のBool型パブリッシャを追加 self.recording_status_publisher = self.create_publisher(Bool, 'recording_status', 10) self.vad = webrtcvad.Vad() self.vad.set_mode(self.vad_mode) self.audio = pyaudio.PyAudio() self.get_logger().info("VAD STT Publisher ノードが起動しました") # バックグラウンドスレッドで録音ループを開始 self.recording_thread = threading.Thread(target=self.record_loop, daemon=True) self.recording_thread.start() def parameter_callback(self, params): """パラメータの変更をリアルタイムに適用""" for param in params: if param.name == "server_url": self.server_url = param.value self.get_logger().info(f"サーバーURLを変更: {self.server_url}") elif param.name == "vad_mode": self.vad_mode = int(param.value) self.vad.set_mode(self.vad_mode) self.get_logger().info(f"VADモードを変更: {self.vad_mode}") elif param.name == "silence_duration": self.silence_duration = float(param.value) self.get_logger().info(f"無音判定時間を変更: {self.silence_duration} 秒") elif param.name == "pre_record_duration": self.pre_record_duration = float(param.value) self.get_logger().info(f"録音前バッファ時間を変更: {self.pre_record_duration} 秒") return SetParametersResult(successful=True) def publish_recording_status(self, status: bool): """録音状態をBool型メッセージとしてパブリッシュする""" msg = Bool() msg.data = status self.recording_status_publisher.publish(msg) self.get_logger().info(f"recording_statusを {status} に更新") def record_loop(self): """録音→STT送信→パブリッシュを繰り返すループ""" while rclpy.ok(): try: self.get_logger().info("【新しい録音セッション開始】") self.record_and_publish() self.get_logger().info("【録音セッション終了、待機状態に戻ります】") except Exception as e: self.get_logger().error("録音ループ中に例外発生: " + str(e)) time.sleep(0.5) def send_audio_to_server(self, filename): """録音した音声ファイルをSTTサーバーへ送信し、テキストを取得してパブリッシュ""" try: with open(filename, "rb") as audio_file: files = {"file": audio_file} response = requests.post(self.server_url, files=files) text = response.json().get("text", "") self.get_logger().info(f"STT結果: {text}") msg = String() msg.data = text self.publisher_.publish(msg) self.get_logger().info("STTテキストをパブリッシュしました") except Exception as e: self.get_logger().error("音声ファイルの送信に失敗: " + str(e)) def record_and_publish(self): """VAD 録音を実施し、録音完了後に STT 送信・パブリッシュする""" stream = self.audio.open(format=pyaudio.paInt16, channels=1, rate=16000, input=True, frames_per_buffer=320) self.get_logger().info("環境ノイズ計測中... 1秒待機") time.sleep(1) self.get_logger().info("待機中... 話しかけると録音開始") pre_frames = deque(maxlen=int((self.pre_record_duration * 16000) / 320)) frames = [] recording = False silence_counter = 0 # 録音開始前は録音状態ではない self.publish_recording_status(False) while rclpy.ok(): data = stream.read(320, exception_on_overflow=False) is_speech = self.vad.is_speech(data, 16000) if is_speech: if not recording: self.get_logger().info("録音開始") recording = True frames.extend(pre_frames) # 録音開始時に状態をTrueにパブリッシュ self.publish_recording_status(True) silence_counter = 0 pre_frames.append(data) if recording: frames.append(data) if not is_speech: silence_counter += 1 if silence_counter > (self.silence_duration * 16000 / 320): self.get_logger().info("録音停止") break stream.stop_stream() stream.close() # 録音が終了したので状態をFalseにパブリッシュ if recording: self.publish_recording_status(False) if frames: output_file = "/tmp/output_vad.wav" waveFile = wave.open(output_file, 'wb') waveFile.setnchannels(1) waveFile.setsampwidth(self.audio.get_sample_size(pyaudio.paInt16)) waveFile.setframerate(16000) waveFile.writeframes(b''.join(frames)) waveFile.close() self.get_logger().info(f"録音した音声を {output_file} に保存しました") self.send_audio_to_server(output_file) else: self.get_logger().warn("録音された音声がありませんでした") def destroy_node(self): self.audio.terminate() super().destroy_node() def main(args=None): rclpy.init(args=args) node = VADSTTPublisher() try: rclpy.spin(node) except KeyboardInterrupt: node.get_logger().info("KeyboardInterruptを受信") finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() ``` ## NLPノード gemini apiを使用してテキストを送信し、応答テキストを生成します。 ```python:nlp_node import rclpy from rclpy.node import Node from std_msgs.msg import String import google.generativeai as genai import yaml import os from ament_index_python.packages import get_package_share_directory # パッケージのシェアディレクトリを取得 package_share_dir = get_package_share_directory('YOUR_DIRECTORY') # secretsディレクトリからAPIキーを読み込む secrets_path = os.path.join(package_share_dir, 'secrets', 'keys.yaml') try: with open(secrets_path, 'r') as f: secrets = yaml.safe_load(f) except Exception as e: raise RuntimeError(f"APIキーの読み込みに失敗しました: {e}") # secretsからAPIキーを取得 gemini_api_key = secrets.get('gemini_api_key') if not gemini_api_key: raise RuntimeError("gemini_api_key が secrets/keys.yaml に定義されていません。") class NLPGeminiNode(Node): """Gemini Generative AI APIを利用するROS2ノード""" def __init__(self): super().__init__('nlp_gemini_node') # APIキー設定 genai.configure(api_key=gemini_api_key) # Geminiモデルを初期化(キャッシュ) self.model = genai.GenerativeModel('gemini-2.0-flash') # STTテキストのサブスクリプションとTTSテキストのパブリッシャーを作成 self.subscription = self.create_subscription( String, 'stt/text', self.stt_callback, 10 ) self.tts_publisher = self.create_publisher(String, 'tts/text', 10) # プロンプト読み込み prompts_path = os.path.join(package_share_dir, 'config', 'prompts.txt') self.prompts = self.load_prompts(prompts_path) self.get_logger().info(f"プロンプト読み込み完了: {list(self.prompts.keys())}") def load_prompts(self, file_path: str) -> dict: """ プロンプトファイルを読み込み、辞書形式で返す。 ファイルは '#' で始まる行でキーが定義され、その後に続く行がプロンプト内容となる。 """ prompts = {} try: with open(file_path, 'r', encoding='utf-8') as f: lines = f.readlines() current_key = None current_value = [] for line in lines: line = line.strip() # '#'で始まる行は新しいプロンプトのキーを示す if line.startswith('#'): if current_key is not None: prompts[current_key] = ' '.join(current_value).strip() current_key = line[1:].strip() current_value = [] elif current_key: current_value.append(line) if current_key is not None: prompts[current_key] = ' '.join(current_value).strip() return prompts except Exception as e: self.get_logger().error(f"プロンプト読み込みエラー: {e}") return {} def parse_user_input(self, input_text: str) -> str: """ 受信したSTTメッセージから不要なプレフィックス("data:")を除去して返す。 """ if input_text.startswith("data:"): return input_text.split("data:", 1)[1].strip() return input_text.strip() def stt_callback(self, msg: String) -> None: """ STTテキストメッセージ受信時のコールバック。 受信テキストをパースし、Gemini APIへ問い合わせ、TTSテキストとして応答をパブリッシュする。 """ raw_input = msg.data user_input = self.parse_user_input(raw_input) self.get_logger().info(f"認識テキスト: {user_input}") # 'default' プロンプトを利用(プロンプトが存在しなければデフォルトメッセージを使用) prompt_type = 'default' system_prompt = self.prompts.get(prompt_type, 'あなたはフレンドリーなAIです。') full_prompt = f"{system_prompt}\nユーザー: {user_input}" self.get_logger().info(f"送信プロンプト: {full_prompt}") try: response = self.model.generate_content(full_prompt) if response.text: ai_response = response.text.strip() else: ai_response = "ごめんなさい、うまく答えられませんでした。" self.get_logger().info(f"AI応答: {ai_response}") # AIの応答をTTSパブリッシャーで送信 tts_msg = String() tts_msg.data = ai_response self.tts_publisher.publish(tts_msg) except Exception as e: self.get_logger().error(f"Gemini APIエラー: {e}") tts_msg = String() tts_msg.data = "すみません、ちょっとわかりませんでした。" self.tts_publisher.publish(tts_msg) def main(args=None): rclpy.init(args=args) node = NLPGeminiNode() rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() ``` ## おわりに ROS2で動くロボット、ロスマルについて紹介しました。 ぜひ作ってみてくださいね!Youtubeのチャンネル登録もよろしくお願いします。  https://www.youtube.com/@rosmaru_ch