はじめに
この記事は、昨年の Spresense 活用コンテストで制作したロボット「ロスマル」の正統進化バージョンについての解説記事です。
前作の記事はこちら:
https://elchika.com/article/26101df9-e828-4ee1-aed3-6a04d3098e14/
今回は、
ROS2 で動作するロボットユーチューバー版ロスマルを紹介します。
マイク、スピーカー、カメラ、LiDAR、IMU を搭載し、Nav2 による自動走行や、AI を使った対話、シナリオベースでの即興漫才を行うことができます。
作ったもの
ロボットユーチューバーによる解説動画
ROS2による遠隔コントロール
子供と遊ぶ
展示ブースの紹介エージェント
内部写真
- Raspberry Pi 4B(メインコンピュータ)・・・Ubuntu22.04にROS2 humbleをインストール
- Raspberry Pi ポータブルディスプレイ(顔表示器)
- DDSM210(走行用ブラシレスモータ)
- makita 18Vバッテリ(メインバッテリ)
- DCDC降圧基板18V-5V(3A品)
- YDLIDAR X4 Pro
- I2Sマイクモジュール
- SPRESENSE メインボードw/6軸IMU(BMI270),HDRカメラ・・・カメラ画像、IMUデータとしてpublish
Spresenseの使い所
ソニーのSpresenseはmicro rosが使えるので、カメラノードとimuノードから定期的に画像とimuの値をpublishしています。HDRカメラを搭載しているため、暗所でも綺麗に撮影することが可能です。
IMUはROS2のnavigation2で安定して走行するために使用します。
カメラ画像とIMUをpublishするプログラムはこちらです。raspberry pi でmicro-ros-agentを起動して使います。
https://github.com/fooping-tech/Spresense_camera/tree/main/examples/05_microros_camera_imu_dualcore
処理の流れ
対話時の処理は以下のような流れで実行します。
発話音声 -> STT -> NLP -> TTS -> 音声再生,FACE(口パク),MOTION(左右小刻み動作)
プログラム
それぞれの機能を担っているノードのプログラムを列挙します。
モータ駆動
motor_node
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
from std_srvs.srv import Trigger
from geometry_msgs.msg import Twist, Pose2D
from nav_msgs.msg import Odometry
import serial
import time
import math
ENCODER_RESOLUTION = 4096
def calculate_crc8(data):
crc = 0x00
for byte in data:
crc ^= byte
for _ in range(8):
if (crc & 0x01) != 0:
crc = (crc >> 1) ^ 0x8C
else:
crc >>= 1
return crc & 0xFF
def create_velocity_command(speed_rpm):
if not -210 <= speed_rpm <= 210:
raise ValueError("Speed must be between -210 and 210 rpm.")
speed_value = int(speed_rpm * 10)
high_byte = (speed_value >> 8) & 0xFF
low_byte = speed_value & 0xFF
command = [0x01, 0x64, high_byte, low_byte, 0x00, 0x00, 0x00, 0x00, 0x00]
crc = calculate_crc8(command)
command.append(crc)
return bytes(command)
def create_rotation_count_command():
command = [0x01, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
crc = calculate_crc8(command)
command.append(crc)
return bytes(command)
def parse_rotation_count_response(response):
if len(response) < 10:
raise ValueError("Invalid response length")
rotation_count = int.from_bytes(response[2:6], byteorder='little', signed=True)
return rotation_count
def parse_velocity_response(response):
if len(response) < 10:
raise ValueError("Invalid response length")
velocity = int.from_bytes(response[2:4], byteorder='little', signed=True)
current = int.from_bytes(response[4:6], byteorder='little', signed=True)
return velocity, current
class DualMotorControlNode(Node):
def __init__(self):
super().__init__('dual_motor_control_node')
self.target_rpm1 = 0.0
self.target_rpm2 = 0.0
self.current_rpm1 = 0.0
self.current_rpm2 = 0.0
self.ser1 = None
self.ser2 = None
self.port1 = self.declare_parameter('port1', '/dev/ttyUSB_port3').value
self.port2 = self.declare_parameter('port2', '/dev/ttyUSB_port4').value
self.baudrate = self.declare_parameter('baudrate', 115200).value
self.cmd_vel_subscription = self.create_subscription(
Twist, 'cmd_vel', self.cmd_vel_callback, 10)
self.publisher1 = self.create_publisher(Float32, 'motor1_speed', 10)
self.publisher2 = self.create_publisher(Float32, 'motor2_speed', 10)
self.odom_publisher = self.create_publisher(Odometry, 'odom', 10)
self.distance1_publisher = self.create_publisher(Float32, 'distance1', 10)
self.distance2_publisher = self.create_publisher(Float32, 'distance2', 10)
self.rotation_count1_publisher = self.create_publisher(Float32, 'rotation_count1', 10)
self.delta_rotation1_publisher = self.create_publisher(Float32, 'delta_rotation1', 10)
self.wheel_radius_publisher = self.create_publisher(Float32, 'wheel_radius', 10)
self.encoder_resolution_publisher = self.create_publisher(Float32, 'encoder_resolution', 10)
self.connect_serial()
self.wheel_separation = 0.130
self.wheel_radius = 0.036
self.max_rpm = 210.0
self.acceleration = 120.0 # rpm/sec
self.recovery_service = self.create_service(Trigger, 'motor_recovery', self.recovery_callback)
self.create_timer(5.0, self.check_and_recover_connection)
# タイマー周期に合わせて dt を実際の経過時間から計算する
self.last_speed_update_time = self.get_clock().now()
self.create_timer(0.01, self.update_motor_speed)
self.last_time = self.get_clock().now()
self.create_timer(0.01, self.publish_odometry)
self.pose = Pose2D(x=0.0, y=0.0, theta=0.0)
# 最初の回転数は初回に取得する
self.last_rotation_count1 = self.get_rotation_count(self.ser1) or 0
self.last_rotation_count2 = self.get_rotation_count(self.ser2) or 0
def connect_serial(self):
max_retries = 5
retry_delay = 2.0
for _ in range(max_retries):
try:
self.ser1 = serial.Serial(self.port1, self.baudrate, timeout=1)
self.ser2 = serial.Serial(self.port2, self.baudrate, timeout=1)
self.ser1.reset_input_buffer()
self.ser2.reset_input_buffer()
self.get_logger().info(f"Connected to {self.port1} and {self.port2}")
return
except serial.SerialException as e:
self.get_logger().error(f"Serial error: {e}")
time.sleep(retry_delay)
self.get_logger().error("Failed to connect to serial ports after multiple attempts")
def cmd_vel_callback(self, msg):
linear_x = msg.linear.x
angular_z = msg.angular.z
right_velocity = (linear_x + angular_z * self.wheel_separation / 2.0) / self.wheel_radius
left_velocity = (linear_x - angular_z * self.wheel_separation / 2.0) / self.wheel_radius
self.target_rpm1 = left_velocity * 60.0 / (2.0 * math.pi)
self.target_rpm2 = -right_velocity * 60.0 / (2.0 * math.pi)
self.target_rpm1 = max(min(self.target_rpm1, self.max_rpm), -self.max_rpm)
self.target_rpm2 = max(min(self.target_rpm2, self.max_rpm), -self.max_rpm)
def update_motor_speed(self):
current_time = self.get_clock().now()
dt = (current_time - self.last_speed_update_time).nanoseconds / 1e9
self.last_speed_update_time = current_time
delta_rpm = self.acceleration * dt
# Motor 1 の加速制御
if self.target_rpm1 > self.current_rpm1:
self.current_rpm1 = min(self.current_rpm1 + delta_rpm, self.target_rpm1)
elif self.target_rpm1 < self.current_rpm1:
self.current_rpm1 = max(self.current_rpm1 - delta_rpm, self.target_rpm1)
# Motor 2 の加速制御
if self.target_rpm2 > self.current_rpm2:
self.current_rpm2 = min(self.current_rpm2 + delta_rpm, self.target_rpm2)
elif self.target_rpm2 < self.current_rpm2:
self.current_rpm2 = max(self.current_rpm2 - delta_rpm, self.target_rpm2)
self.control_motor(self.ser1, self.current_rpm1, "Motor 1")
self.control_motor(self.ser2, self.current_rpm2, "Motor 2")
def control_motor(self, ser, speed, motor_name):
if ser is None or not ser.is_open:
self.get_logger().warn(f"Serial port for {motor_name} is not open")
return
try:
command = create_velocity_command(speed)
ser.write(command)
response = ser.read(10)
if len(response) == 10 and response[0] == 0x01 and response[1] == 0x64:
velocity, current = parse_velocity_response(response)
else:
self.get_logger().warn(f"Invalid response for {motor_name} velocity command")
except ValueError as e:
self.get_logger().error(f"{motor_name}: {str(e)}")
except serial.SerialException as e:
self.get_logger().error(f"{motor_name} Serial error: {str(e)}")
self.attempt_recovery(ser, motor_name)
def attempt_recovery(self, ser, motor_name):
try:
ser.close()
time.sleep(1)
ser.open()
self.get_logger().info(f"Successfully recovered {motor_name} connection")
except serial.SerialException as e:
self.get_logger().error(f"Failed to recover {motor_name} connection: {str(e)}")
def recovery_callback(self, request, response):
self.get_logger().info("Attempting motor recovery")
self.connect_serial()
response.success = True
response.message = "Recovery attempt completed"
return response
def check_and_recover_connection(self):
if self.ser1 is None or not self.ser1.is_open:
self.get_logger().warn("Motor 1 connection lost, attempting recovery")
self.attempt_recovery(self.ser1, "Motor 1")
if self.ser2 is None or not self.ser2.is_open:
self.get_logger().warn("Motor 2 connection lost, attempting recovery")
self.attempt_recovery(self.ser2, "Motor 2")
def get_rotation_count(self, ser):
try:
command = create_rotation_count_command()
ser.write(command)
response = ser.read(10)
if len(response) == 10 and response[0] == 0x01 and response[1] == 0x74:
rotation_count = parse_rotation_count_response(response)
return rotation_count
else:
self.get_logger().warn("Invalid response for rotation count command")
return None
except serial.SerialException as e:
self.get_logger().error(f"Serial error: {str(e)}")
return None
def publish_odometry(self):
current_time = self.get_clock().now()
dt = (current_time - self.last_time).nanoseconds / 1e9
self.last_time = current_time
rotation_count1 = self.get_rotation_count(self.ser1)
rotation_count2 = self.get_rotation_count(self.ser2)
if rotation_count1 is None or rotation_count2 is None:
return
delta_rotation1 = rotation_count1 - self.last_rotation_count1
delta_rotation2 = rotation_count2 - self.last_rotation_count2
self.last_rotation_count1 = rotation_count1
self.last_rotation_count2 = rotation_count2
# 右側モーターは符号を反転
distance1 = delta_rotation1 * (2 * math.pi * self.wheel_radius) / ENCODER_RESOLUTION
distance2 = -delta_rotation2 * (2 * math.pi * self.wheel_radius) / ENCODER_RESOLUTION
distance1_msg = Float32()
distance1_msg.data = distance1
self.distance1_publisher.publish(distance1_msg)
distance2_msg = Float32()
distance2_msg.data = distance2
self.distance2_publisher.publish(distance2_msg)
linear_velocity = (distance1 + distance2) / (2.0 * dt)
angular_velocity = (distance2 - distance1) / (self.wheel_separation * dt)
self.pose.x += linear_velocity * math.cos(self.pose.theta) * dt
self.pose.y += linear_velocity * math.sin(self.pose.theta) * dt
self.pose.theta += angular_velocity * dt
odom_msg = Odometry()
odom_msg.header.stamp = current_time.to_msg()
odom_msg.header.frame_id = "odom"
odom_msg.child_frame_id = "base_link"
odom_msg.pose.pose.position.x = self.pose.x
odom_msg.pose.pose.position.y = self.pose.y
odom_msg.pose.pose.orientation.z = math.sin(self.pose.theta / 2.0)
odom_msg.pose.pose.orientation.w = math.cos(self.pose.theta / 2.0)
odom_msg.twist.twist.linear.x = linear_velocity
odom_msg.twist.twist.angular.z = angular_velocity
# self.odom_publisher.publish(odom_msg)
def main(args=None):
rclpy.init(args=args)
node = DualMotorControlNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
if node.ser1 is not None and node.ser1.is_open:
node.ser1.close()
if node.ser2 is not None and node.ser2.is_open:
node.ser2.close()
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
顔の描画
顔はpygameを用いて描画しています。
face_node
#!/usr/bin/env python3
import pygame
import sys
import random
import time
import rclpy
from rclpy.node import Node
from std_msgs.msg import Bool
import threading
import socket # IPアドレス取得用
# ROS2のサブスクライバクラス
class RecordingStatusListener(Node):
def __init__(self):
super().__init__('recording_status_listener')
# 絶対トピック名を指定してディスカバリーの問題を回避
self.subscription = self.create_subscription(Bool, '/recording_status', self.listener_callback, 10)
self.recording_status = False
# 口制御用サブスクライバの追加
self.mouth_subscription = self.create_subscription(Bool, '/mouth_control', self.mouth_callback, 10)
self.mouth_talking = False
def listener_callback(self, msg):
self.recording_status = msg.data
self.get_logger().info(f"Recording status updated: {msg.data}")
def mouth_callback(self, msg):
self.mouth_talking = msg.data
self.get_logger().info(f"Mouth control updated: {msg.data}")
# 目のクラス
class Eye:
def __init__(self, screen, x, y, radius, color, shake_intensity=2):
self.screen = screen
self.base_x = x
self.base_y = y
self.radius = radius
self.color = color
self.shake_intensity = shake_intensity
self.is_blinking = False
self.blink_start_time = 0
self.blink_duration = 0
def start_blink(self):
self.is_blinking = True
self.blink_start_time = time.time()
self.blink_duration = random.uniform(0.1, 0.2)
def update(self):
if self.is_blinking and time.time() - self.blink_start_time > self.blink_duration:
self.is_blinking = False
def draw(self, recording=False):
self.update()
if recording:
# 録画中の場合:目を細め、上を向いてキョロキョロ動く
# キョロキョロモーションを大きめにするためシェイク幅を2倍に
shake_x = random.uniform(-self.shake_intensity * 2, self.shake_intensity * 2)
shake_y = random.uniform(-self.shake_intensity * 2, self.shake_intensity * 2)
upward_offset = -self.radius // 4 # 上方向へシフト
center_x = int(self.base_x + shake_x)
center_y = int(self.base_y + shake_y + upward_offset)
# 目を角丸の横長の矩形にして描画
eye_width = int(self.radius * 2.5) # 横に少し伸ばす
eye_height = int(self.radius * 0.6) # 縦は細めに
rect = pygame.Rect(0, 0, eye_width, eye_height)
rect.center = (center_x, center_y)
pygame.draw.rect(self.screen, self.color, rect, border_radius=int(self.radius * 0.3))
else:
# 通常時:まばたき中は線、そうでなければ通常の円を描画
if self.is_blinking:
eye_height = self.radius // 3
pygame.draw.line(self.screen, self.color,
(self.base_x - self.radius, self.base_y),
(self.base_x + self.radius, self.base_y),
eye_height)
else:
shake_x = random.uniform(-self.shake_intensity, self.shake_intensity)
shake_y = random.uniform(-self.shake_intensity, self.shake_intensity)
pygame.draw.circle(self.screen, self.color,
(int(self.base_x + shake_x), int(self.base_y + shake_y)),
self.radius)
# 口のクラス(口パクのON/OFF切り替え可能)
class Mouth:
def __init__(self, screen, x_center, base_width, base_height, y, color):
self.screen = screen
self.x_center = x_center
self.base_width = base_width
self.base_height = base_height
self.y = y
self.color = color
self.is_talking = False # 口パクON/OFFフラグ
self.talk_start_time = 0
self.talk_duration = 0
self.current_width = base_width
self.current_height = base_height
def toggle_talking(self):
"""口パクのON/OFFを切り替える"""
self.is_talking = not self.is_talking
def start_talking(self):
"""口パクを開始する"""
if self.is_talking:
self.talk_start_time = time.time()
self.talk_duration = random.uniform(0.5, 1.5)
def update(self):
if self.is_talking:
if time.time() - self.talk_start_time > self.talk_duration:
self.start_talking()
# 口の幅と高さをランダムに変化
self.current_width = random.randint(int(self.base_width * 0.5), int(self.base_width * 1.5))
self.current_height = random.randint(int(self.base_height * 0.4), int(self.base_height * 2))
else:
self.current_width = self.base_width
self.current_height = self.base_height // 3
def draw(self, recording=False):
self.update()
# recording時は目と同じくらい上にシフト(ここでは基準高さの1/4分上へ)
upward_offset = -self.base_height // 4 if recording else 0
pygame.draw.rect(self.screen, self.color,
(self.x_center - self.current_width // 2, self.y + upward_offset - self.current_height // 2,
self.current_width, self.current_height))
# ROS2のスピンを専用スレッドで回す関数
def ros_spin(node):
rclpy.spin(node)
def get_ip_address():
"""現在のIPアドレスを取得する"""
try:
# ダミー接続を作成してIPアドレスを取得
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80)) # GoogleのDNSサーバーを使用
ip_address = s.getsockname()[0]
s.close()
return ip_address
except Exception:
return "IP取得失敗"
def main():
# ROS2の初期化とサブスクライバの作成
rclpy.init(args=None)
status_listener = RecordingStatusListener()
# 専用スレッドでROS2ノードのスピンを開始
ros_thread = threading.Thread(target=ros_spin, args=(status_listener,), daemon=True)
ros_thread.start()
pygame.init()
screen = pygame.display.set_mode((0, 0), pygame.FULLSCREEN)
pygame.display.set_caption("Robot Face Fullscreen (ROS2 recording_status)")
screen_width, screen_height = screen.get_size()
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
GRAY = (128, 128, 128) # グレーのRGB値
# IPアドレスを取得
ip_address = get_ip_address()
# IPアドレスを表示するためのフォント設定
font = pygame.font.Font(None, 18) # フォントサイズ
eye_radius = int(screen_width * 0.1)
eye_x_offset = int(screen_width * 0.25)
eye_y = int(screen_height * 0.4)
left_eye = Eye(screen, screen_width // 2 - eye_x_offset, eye_y, eye_radius, WHITE, shake_intensity=2)
right_eye = Eye(screen, screen_width // 2 + eye_x_offset, eye_y, eye_radius, WHITE, shake_intensity=2)
mouth_y = int(screen_height * 0.7)
base_mouth_width = int(screen_width * 0.2)
base_mouth_height = int(screen_height * 0.1)
mouth = Mouth(screen, screen_width // 2, base_mouth_width, base_mouth_height, mouth_y, WHITE)
clock = pygame.time.Clock()
next_blink_time = time.time() + random.uniform(2, 5)
next_talk_time = time.time() + random.uniform(1, 4)
running = True
while running:
screen.fill(BLACK)
current_time = time.time()
# ROSのスピンは専用スレッドが回しているので、最新の状態を取得
recording_status = status_listener.recording_status
# ROSから受け取った mouth_control の状態を口のパク状態に反映
mouth.is_talking = status_listener.mouth_talking
if current_time >= next_blink_time:
left_eye.start_blink()
right_eye.start_blink()
next_blink_time = current_time + random.uniform(2, 5)
if mouth.is_talking and current_time >= next_talk_time:
mouth.start_talking()
next_talk_time = current_time + random.uniform(1, 4)
# recording_statusに応じて目と口の描画モードを切り替え
left_eye.draw(recording=recording_status)
right_eye.draw(recording=recording_status)
mouth.draw(recording=recording_status)
# IPアドレスを左上に表示(グレー色で描画)
ip_text = font.render(f"IP: {ip_address}", True, GRAY)
screen.blit(ip_text, (10, 10)) # 左上に表示(座標: x=10, y=10)
pygame.display.flip()
clock.tick(60)
for event in pygame.event.get():
if event.type == pygame.QUIT or (event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE):
running = False
elif event.type == pygame.KEYDOWN:
# スペースキーは手動で口パク状態を切り替える(ROS以外の制御)
if event.key == pygame.K_SPACE:
mouth.toggle_talking()
pygame.quit()
rclpy.shutdown()
sys.exit()
if __name__ == '__main__':
main()
TTS(text to speech)
テキストを音声に変換するノードです。
TTSにはvoicevoxを使用しました。RaspberryPiで音声変換を行うと処理に時間がかかるため、外部リソース(パソコンなど)で変換するのがおすすめです。
tts_node
import rclpy
from rclpy.node import Node
from rcl_interfaces.msg import SetParametersResult
from std_msgs.msg import String, Bool
import requests
import json
import subprocess
import tempfile
import os
import time
from ament_index_python.packages import get_package_share_directory
class TTSNode(Node):
def __init__(self):
super().__init__('tts_node')
# パラメータ宣言(デフォルト値付き)
self.declare_parameter('voicevox.url', "http://YOUR_IP_ADDRESS:50021")
self.declare_parameter('voicevox.speaker', 1)
self.declare_parameter('voicevox.speed_scale', 1.2)
self.declare_parameter('voicevox.volume_scale', 1.0)
self.declare_parameter('voicevox.intonation_scale', 1.0)
self.declare_parameter('voicevox.pre_phoneme_length', 0.1)
self.declare_parameter('voicevox.post_phoneme_length', 0.1)
# 動的パラメータ変更用のコールバック登録
self.add_on_set_parameters_callback(self.parameter_callback)
# 設定ファイルの読み込み(エラーファイル用)
package_share_dir = get_package_share_directory('motor1')
self.error_sound_path = os.path.join(package_share_dir, 'sounds', 'error.wav')
# 初期パラメータの読み込み
self.voicevox_url = self.get_parameter('voicevox.url').get_parameter_value().string_value
self.speaker = self.get_parameter('voicevox.speaker').get_parameter_value().integer_value
self.speed_scale = self.get_parameter('voicevox.speed_scale').get_parameter_value().double_value
self.volume_scale = self.get_parameter('voicevox.volume_scale').get_parameter_value().double_value
self.intonation_scale = self.get_parameter('voicevox.intonation_scale').get_parameter_value().double_value
self.pre_phoneme_length = self.get_parameter('voicevox.pre_phoneme_length').get_parameter_value().double_value
self.post_phoneme_length = self.get_parameter('voicevox.post_phoneme_length').get_parameter_value().double_value
# サブスクライバーの設定
self.subscription = self.create_subscription(
String,
'tts/text',
self.tts_callback,
10
)
# 音声再生状態のパブリッシャー
self.playing_publisher = self.create_publisher(Bool, '/mouth_control', 10)
self.get_logger().info(f"TTSノード起動(Voicevox URL: {self.voicevox_url}, Speaker: {self.speaker})")
def parameter_callback(self, params):
"""
動的にパラメータが変更されたときに呼び出されるコールバック
"""
for param in params:
if param.name == 'voicevox.url':
self.voicevox_url = param.value
self.get_logger().info(f"voicevox.url 更新: {self.voicevox_url}")
elif param.name == 'voicevox.speaker':
self.speaker = param.value
self.get_logger().info(f"voicevox.speaker 更新: {self.speaker}")
elif param.name == 'voicevox.speed_scale':
self.speed_scale = param.value
self.get_logger().info(f"voicevox.speed_scale 更新: {self.speed_scale}")
elif param.name == 'voicevox.volume_scale':
self.volume_scale = param.value
self.get_logger().info(f"voicevox.volume_scale 更新: {self.volume_scale}")
elif param.name == 'voicevox.intonation_scale':
self.intonation_scale = param.value
self.get_logger().info(f"voicevox.intonation_scale 更新: {self.intonation_scale}")
elif param.name == 'voicevox.pre_phoneme_length':
self.pre_phoneme_length = param.value
self.get_logger().info(f"voicevox.pre_phoneme_length 更新: {self.pre_phoneme_length}")
elif param.name == 'voicevox.post_phoneme_length':
self.post_phoneme_length = param.value
self.get_logger().info(f"voicevox.post_phoneme_length 更新: {self.post_phoneme_length}")
result = SetParametersResult()
result.successful = True
result.reason = ""
return result
def tts_callback(self, msg):
text = msg.data
self.get_logger().info(f"受信テキスト: {text}")
times = {}
try:
# 1️⃣ 音声合成クエリの取得
start_time = time.time()
query_url = f"{self.voicevox_url}/audio_query?speaker={self.speaker}"
query_response = requests.post(query_url, params={"text": text}, timeout=5)
times["audio_query_request"] = time.time() - start_time
if query_response.status_code != 200:
raise requests.exceptions.HTTPError(f"HTTPエラー {query_response.status_code}")
query_data = query_response.json()
query_data["speedScale"] = self.speed_scale
query_data["volumeScale"] = self.volume_scale
query_data["intonationScale"] = self.intonation_scale
query_data["prePhonemeLength"] = self.pre_phoneme_length
query_data["postPhonemeLength"] = self.post_phoneme_length
# 2️⃣ 音声合成リクエスト
start_time = time.time()
synthesis_url = f"{self.voicevox_url}/synthesis?speaker={self.speaker}"
synthesis_response = requests.post(
synthesis_url,
headers={"Content-Type": "application/json"},
data=json.dumps(query_data),
timeout=10
)
times["synthesis_request"] = time.time() - start_time
if synthesis_response.status_code != 200:
raise requests.exceptions.HTTPError(f"HTTPエラー {synthesis_response.status_code}")
# 3️⃣ 音声を一時ファイルに保存して再生
start_time = time.time()
with tempfile.NamedTemporaryFile(suffix=".wav", delete=True) as tmp_wav:
tmp_wav.write(synthesis_response.content)
tmp_wav.flush()
times["wav_write"] = time.time() - start_time
self.get_logger().info(f"音声再生: {text}")
# 🎵 再生開始を通知(True)
self.publish_playing_status(True)
# 再生
start_time = time.time()
subprocess.run(["aplay", tmp_wav.name])
times["playback"] = time.time() - start_time
# 🎵 再生終了を通知(False)
self.publish_playing_status(False)
self.get_logger().info(f"🔹 音声クエリ取得: {times['audio_query_request']:.2f} 秒")
self.get_logger().info(f"🔹 音声合成処理: {times['synthesis_request']:.2f} 秒")
self.get_logger().info(f"🔹 WAV保存時間: {times['wav_write']:.2f} 秒")
self.get_logger().info(f"🔹 音声再生時間: {times['playback']:.2f} 秒")
except requests.exceptions.ConnectionError:
self.get_logger().error("Voicevoxエラー: 接続に失敗しました。サーバーが動作しているか確認してください。")
self.play_error_sound()
except requests.exceptions.Timeout:
self.get_logger().error("Voicevoxエラー: タイムアウトが発生しました。サーバーの負荷が高い可能性があります。")
self.play_error_sound()
except requests.exceptions.HTTPError as e:
self.get_logger().error(f"Voicevoxエラー: {str(e)}")
self.play_error_sound()
except json.JSONDecodeError:
self.get_logger().error("Voicevoxエラー: サーバーのレスポンスが不正です。")
self.play_error_sound()
except Exception as e:
self.get_logger().error(f"Voicevoxエラー: {str(e)}")
self.play_error_sound()
def play_error_sound(self):
"""エラー時に代替WAVファイルを再生"""
self.publish_playing_status(True) # エラー音の再生開始を通知
if os.path.exists(self.error_sound_path):
self.get_logger().warn(f"エラー音を再生: {self.error_sound_path}")
subprocess.run(["aplay", self.error_sound_path])
else:
self.get_logger().error(f"エラー音のWAVファイルが見つかりません: {self.error_sound_path}")
self.publish_playing_status(False) # エラー音の再生終了を通知
def publish_playing_status(self, is_playing):
"""音声再生の開始/終了をトピックとして通知"""
msg = Bool()
msg.data = is_playing
self.playing_publisher.publish(msg)
status_text = "開始" if is_playing else "終了"
self.get_logger().info(f"🔔 音声再生 {status_text} 通知を送信しました")
def main(args=None):
rclpy.init(args=args)
node = TTSNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
モーションノード
ロボットが発話中、小刻みに車輪を動かして発話感を増幅します。
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import Bool, Float32
from geometry_msgs.msg import Twist
import time
import threading
import math
class TalkingMotionNode(Node):
def __init__(self):
super().__init__('talking_motion_node')
# /mouth_control のサブスクライバー
self.subscription = self.create_subscription(
Bool,
'/mouth_control',
self.mouth_control_callback,
10
)
# /cmd_vel のパブリッシャー
self.cmd_vel_publisher = self.create_publisher(Twist, 'cmd_vel', 10)
self.is_talking = False # 会話中フラグ
self.thread = None # 動作スレッド
def mouth_control_callback(self, msg):
""" /mouth_control のメッセージを処理 """
if msg.data and not self.is_talking:
self.is_talking = True
self.thread = threading.Thread(target=self.shake_motion)
self.thread.start()
elif not msg.data:
self.is_talking = False
def shake_motion(self):
""" 会話中に±5 RPMのモーター変動を繰り返す """
base_rpm = 0.0 # 基本回転速度(停止時は0)
delta_rpm = 5.0 # 小刻みな変動幅
direction = 1 # 1: 正方向, -1: 負方向
while self.is_talking:
current_rpm = base_rpm + direction * delta_rpm
direction *= -1 # 方向を反転
# cmd_vel メッセージを作成
twist_msg = Twist()
twist_msg.angular.z = math.radians(current_rpm) # 角速度に変換
self.cmd_vel_publisher.publish(twist_msg)
time.sleep(0.2) # 200msごとに動作
# 会話終了後に停止
twist_msg = Twist()
twist_msg.angular.z = 0.0
self.cmd_vel_publisher.publish(twist_msg)
def main(args=None):
rclpy.init(args=args)
node = TalkingMotionNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
TTSHttpPublisherノード
httpサーバーへテキストを送信するとtts/textへpublishします。
Youtuberとして機能させるときは外部リソース(パソコンなど)で発話内容を生成し、このノードで発話用テキストを受け取ります。
tts_http_node
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
import json
class TTSHttpPublisher(Node):
def __init__(self):
super().__init__('tts_http_publisher')
self.publisher_ = self.create_publisher(String, 'tts/text', 10)
self.get_logger().info('HTTP TTS Publisher Node started.')
# HTTPサーバーを別スレッドで起動
self.server_thread = threading.Thread(target=self.start_http_server)
self.server_thread.daemon = True
self.server_thread.start()
def start_http_server(self):
class RequestHandler(BaseHTTPRequestHandler):
def do_POST(handler_self):
content_length = int(handler_self.headers['Content-Length'])
post_data = handler_self.rfile.read(content_length)
try:
data = json.loads(post_data.decode('utf-8'))
text = data.get("text", "")
if text:
self.get_logger().info(f"HTTPで受信: {text}")
msg = String()
msg.data = text
self.publisher_.publish(msg)
handler_self.send_response(200)
handler_self.end_headers()
handler_self.wfile.write(b"Text received and published.")
else:
handler_self.send_response(400)
handler_self.end_headers()
handler_self.wfile.write(b"Missing 'text' in JSON.")
except json.JSONDecodeError:
handler_self.send_response(400)
handler_self.end_headers()
handler_self.wfile.write(b"Invalid JSON.")
server_address = ('', 8080)
httpd = HTTPServer(server_address, RequestHandler)
self.get_logger().info("HTTPサーバー起動: ポート8080で待機中")
httpd.serve_forever()
def main(args=None):
rclpy.init(args=args)
node = TTSHttpPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
STTノード
RaspberryPiに繋いだi2sマイクから音声を抜き出しテキストに変換します
stt_node
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rcl_interfaces.msg import SetParametersResult
from std_msgs.msg import String, Bool
import pyaudio
import wave
import numpy as np
import time
import sys
import webrtcvad
import requests
from collections import deque
import threading
class VADSTTPublisher(Node):
def __init__(self):
super().__init__('vad_stt_publisher')
# パラメータの宣言(デフォルト値を指定)
self.declare_parameter("server_url", "YOUR_STT_SERVER")
self.declare_parameter("vad_mode", 3)
self.declare_parameter("silence_duration", 2.0)
self.declare_parameter("pre_record_duration", 0.5)
# 初期パラメーターを取得
self.server_url = self.get_parameter("server_url").value
self.vad_mode = int(self.get_parameter("vad_mode").value)
self.silence_duration = float(self.get_parameter("silence_duration").value)
self.pre_record_duration = float(self.get_parameter("pre_record_duration").value)
# 動的パラメータ変更を監視
self.add_on_set_parameters_callback(self.parameter_callback)
self.publisher_ = self.create_publisher(String, 'stt/text', 10)
# recording_status用のBool型パブリッシャを追加
self.recording_status_publisher = self.create_publisher(Bool, 'recording_status', 10)
self.vad = webrtcvad.Vad()
self.vad.set_mode(self.vad_mode)
self.audio = pyaudio.PyAudio()
self.get_logger().info("VAD STT Publisher ノードが起動しました")
# バックグラウンドスレッドで録音ループを開始
self.recording_thread = threading.Thread(target=self.record_loop, daemon=True)
self.recording_thread.start()
def parameter_callback(self, params):
"""パラメータの変更をリアルタイムに適用"""
for param in params:
if param.name == "server_url":
self.server_url = param.value
self.get_logger().info(f"サーバーURLを変更: {self.server_url}")
elif param.name == "vad_mode":
self.vad_mode = int(param.value)
self.vad.set_mode(self.vad_mode)
self.get_logger().info(f"VADモードを変更: {self.vad_mode}")
elif param.name == "silence_duration":
self.silence_duration = float(param.value)
self.get_logger().info(f"無音判定時間を変更: {self.silence_duration} 秒")
elif param.name == "pre_record_duration":
self.pre_record_duration = float(param.value)
self.get_logger().info(f"録音前バッファ時間を変更: {self.pre_record_duration} 秒")
return SetParametersResult(successful=True)
def publish_recording_status(self, status: bool):
"""録音状態をBool型メッセージとしてパブリッシュする"""
msg = Bool()
msg.data = status
self.recording_status_publisher.publish(msg)
self.get_logger().info(f"recording_statusを {status} に更新")
def record_loop(self):
"""録音→STT送信→パブリッシュを繰り返すループ"""
while rclpy.ok():
try:
self.get_logger().info("【新しい録音セッション開始】")
self.record_and_publish()
self.get_logger().info("【録音セッション終了、待機状態に戻ります】")
except Exception as e:
self.get_logger().error("録音ループ中に例外発生: " + str(e))
time.sleep(0.5)
def send_audio_to_server(self, filename):
"""録音した音声ファイルをSTTサーバーへ送信し、テキストを取得してパブリッシュ"""
try:
with open(filename, "rb") as audio_file:
files = {"file": audio_file}
response = requests.post(self.server_url, files=files)
text = response.json().get("text", "")
self.get_logger().info(f"STT結果: {text}")
msg = String()
msg.data = text
self.publisher_.publish(msg)
self.get_logger().info("STTテキストをパブリッシュしました")
except Exception as e:
self.get_logger().error("音声ファイルの送信に失敗: " + str(e))
def record_and_publish(self):
"""VAD 録音を実施し、録音完了後に STT 送信・パブリッシュする"""
stream = self.audio.open(format=pyaudio.paInt16, channels=1, rate=16000,
input=True, frames_per_buffer=320)
self.get_logger().info("環境ノイズ計測中... 1秒待機")
time.sleep(1)
self.get_logger().info("待機中... 話しかけると録音開始")
pre_frames = deque(maxlen=int((self.pre_record_duration * 16000) / 320))
frames = []
recording = False
silence_counter = 0
# 録音開始前は録音状態ではない
self.publish_recording_status(False)
while rclpy.ok():
data = stream.read(320, exception_on_overflow=False)
is_speech = self.vad.is_speech(data, 16000)
if is_speech:
if not recording:
self.get_logger().info("録音開始")
recording = True
frames.extend(pre_frames)
# 録音開始時に状態をTrueにパブリッシュ
self.publish_recording_status(True)
silence_counter = 0
pre_frames.append(data)
if recording:
frames.append(data)
if not is_speech:
silence_counter += 1
if silence_counter > (self.silence_duration * 16000 / 320):
self.get_logger().info("録音停止")
break
stream.stop_stream()
stream.close()
# 録音が終了したので状態をFalseにパブリッシュ
if recording:
self.publish_recording_status(False)
if frames:
output_file = "/tmp/output_vad.wav"
waveFile = wave.open(output_file, 'wb')
waveFile.setnchannels(1)
waveFile.setsampwidth(self.audio.get_sample_size(pyaudio.paInt16))
waveFile.setframerate(16000)
waveFile.writeframes(b''.join(frames))
waveFile.close()
self.get_logger().info(f"録音した音声を {output_file} に保存しました")
self.send_audio_to_server(output_file)
else:
self.get_logger().warn("録音された音声がありませんでした")
def destroy_node(self):
self.audio.terminate()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = VADSTTPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info("KeyboardInterruptを受信")
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
NLPノード
gemini apiを使用してテキストを送信し、応答テキストを生成します。
nlp_node
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import google.generativeai as genai
import yaml
import os
from ament_index_python.packages import get_package_share_directory
# パッケージのシェアディレクトリを取得
package_share_dir = get_package_share_directory('YOUR_DIRECTORY')
# secretsディレクトリからAPIキーを読み込む
secrets_path = os.path.join(package_share_dir, 'secrets', 'keys.yaml')
try:
with open(secrets_path, 'r') as f:
secrets = yaml.safe_load(f)
except Exception as e:
raise RuntimeError(f"APIキーの読み込みに失敗しました: {e}")
# secretsからAPIキーを取得
gemini_api_key = secrets.get('gemini_api_key')
if not gemini_api_key:
raise RuntimeError("gemini_api_key が secrets/keys.yaml に定義されていません。")
class NLPGeminiNode(Node):
"""Gemini Generative AI APIを利用するROS2ノード"""
def __init__(self):
super().__init__('nlp_gemini_node')
# APIキー設定
genai.configure(api_key=gemini_api_key)
# Geminiモデルを初期化(キャッシュ)
self.model = genai.GenerativeModel('gemini-2.0-flash')
# STTテキストのサブスクリプションとTTSテキストのパブリッシャーを作成
self.subscription = self.create_subscription(
String,
'stt/text',
self.stt_callback,
10
)
self.tts_publisher = self.create_publisher(String, 'tts/text', 10)
# プロンプト読み込み
prompts_path = os.path.join(package_share_dir, 'config', 'prompts.txt')
self.prompts = self.load_prompts(prompts_path)
self.get_logger().info(f"プロンプト読み込み完了: {list(self.prompts.keys())}")
def load_prompts(self, file_path: str) -> dict:
"""
プロンプトファイルを読み込み、辞書形式で返す。
ファイルは '#' で始まる行でキーが定義され、その後に続く行がプロンプト内容となる。
"""
prompts = {}
try:
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
current_key = None
current_value = []
for line in lines:
line = line.strip()
# '#'で始まる行は新しいプロンプトのキーを示す
if line.startswith('#'):
if current_key is not None:
prompts[current_key] = ' '.join(current_value).strip()
current_key = line[1:].strip()
current_value = []
elif current_key:
current_value.append(line)
if current_key is not None:
prompts[current_key] = ' '.join(current_value).strip()
return prompts
except Exception as e:
self.get_logger().error(f"プロンプト読み込みエラー: {e}")
return {}
def parse_user_input(self, input_text: str) -> str:
"""
受信したSTTメッセージから不要なプレフィックス("data:")を除去して返す。
"""
if input_text.startswith("data:"):
return input_text.split("data:", 1)[1].strip()
return input_text.strip()
def stt_callback(self, msg: String) -> None:
"""
STTテキストメッセージ受信時のコールバック。
受信テキストをパースし、Gemini APIへ問い合わせ、TTSテキストとして応答をパブリッシュする。
"""
raw_input = msg.data
user_input = self.parse_user_input(raw_input)
self.get_logger().info(f"認識テキスト: {user_input}")
# 'default' プロンプトを利用(プロンプトが存在しなければデフォルトメッセージを使用)
prompt_type = 'default'
system_prompt = self.prompts.get(prompt_type, 'あなたはフレンドリーなAIです。')
full_prompt = f"{system_prompt}\nユーザー: {user_input}"
self.get_logger().info(f"送信プロンプト: {full_prompt}")
try:
response = self.model.generate_content(full_prompt)
if response.text:
ai_response = response.text.strip()
else:
ai_response = "ごめんなさい、うまく答えられませんでした。"
self.get_logger().info(f"AI応答: {ai_response}")
# AIの応答をTTSパブリッシャーで送信
tts_msg = String()
tts_msg.data = ai_response
self.tts_publisher.publish(tts_msg)
except Exception as e:
self.get_logger().error(f"Gemini APIエラー: {e}")
tts_msg = String()
tts_msg.data = "すみません、ちょっとわかりませんでした。"
self.tts_publisher.publish(tts_msg)
def main(args=None):
rclpy.init(args=args)
node = NLPGeminiNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
おわりに
ROS2で動くロボット、ロスマルについて紹介しました。
ぜひ作ってみてくださいね!Youtubeのチャンネル登録もよろしくお願いします。
https://www.youtube.com/@rosmaru_ch
投稿者の人気記事





-
Fooping
さんが
前の土曜日の19:53
に
編集
をしました。
(メッセージ: 初版)
-
Fooping
さんが
前の土曜日の20:02
に
編集
をしました。
-
Fooping
さんが
前の土曜日の20:40
に
編集
をしました。
ログインしてコメントを投稿する