L_tiemのアイコン画像
L_tiem 2026年01月31日作成 (2026年02月28日更新)
セットアップや使用方法 セットアップや使用方法 閲覧数 79
L_tiem 2026年01月31日作成 (2026年02月28日更新) セットアップや使用方法 セットアップや使用方法 閲覧数 79

Spresenseで実現する制御システム ~システム同定からMPCを見据えたマルチコア実装まで~

Spresenseで実現する制御システム ~システム同定からMPCを見据えたマルチコア実装まで~

0. 目次

  1. はじめに
  2. システムの構成・コンセプト
  3. システム同定および制御テストの流れ
  4. 設計のポイント
  5. 実際に使用したときの様子
  6. 展望
  7. ソースコード抜粋

1. はじめに
経験則に頼ったPID調整は限界が生じると考えています。本プロジェクトでは、SonyのSpresenseが持つ強力な演算能力とマルチコアアーキテクチャを活用し、実機の動特性を数式化する「システム同定」から、そのモデルを用いた「高度な制御」までを一気通貫で行えるプラットフォームを開発しました。 「デジタルツイン」の概念を組み込み開発に持ち込み、PCの計算資源とマイコンのリアルタイム性を高度に融合させることを目指しています。

2. システム構成・コンセプト
本システムでは、PCとマイコンを分離した役割分担(エッジ&コンピューティング)を採用しています。

PC(Python/CustomTkinter): 複雑な行列演算を伴う数理モデルの推定、制御パラメータの最適化、リッチなGUIによる可視化を担当。

Spresense(C++/Arduino): モーターのPWM駆動、エンコーダーの高速サンプリング、およびリアルタイムな制御ループの実行を担当。

ハードウェア構成

Main Board: Spresense Main Board

Extension Board: 信号の入出力およびPC通信用

Actuator: エンコーダー付きDCモーター(FIT0482)

MotorDriver: L298N

ハードウェア構成

ソフトウェア構成

Spresense側: メインコア(状態遷移・通信管理)、サブコア(高速計測・制御)。

PC側: データ解析、PBRS信号生成、ARX/FIR/ARARX同定アルゴリズム、PID最適化シミュレータ。

ソフトウェア構成

  1. 同定処理および制御テストの流れ
    実機の挙動を正確にモデル化するため、以下のステップを踏みます。

信号生成: PC側でM系列信号(PRBS)を生成し、Spresense経由でモーターに印加。

高速計測: Spresenseが入出力データをサンプリング。

データ転送: 独自バイナリプロトコルにより、PCへ欠落のないデータ転送を実行。

モデル推定: ARXモデルを用い、最小二乗法によってパラメータを算出。

同定結果の閲覧:同定結果の適合率および検証データに対する同定結果のグラフの表示

PID最適化:同定したモデルを基にPIDパラメーターの最適化。

制御テストデータの送信:制御テストのために最適化パラメーターをSpresenseへ送信

制御テストの実行:位置制御、速度制御、モデル予測制御を行う。

実行シーケンス

  1. 設計のポイント

① 堅牢なハイブリッド通信プロトコル
PC-Spresense間の通信には、「柔軟なJSON」と「高速なバイナリ」を組み合わせた独自プロトコルを設計しました。

  • 制御コマンド (JSON): 状態遷移やパラメータ設定に使用。

  • 高速データ転送 (バイナリ): 独自パケット構造(Magic Number + Payload + Checksum)を定義。数千点の浮動小数点データをシリアル帯域の限界まで高速に、かつ整合性を保って転送します。

② レイヤー分離とエラーハンドリング
組み込み開発におけるデバッグ効率を最大化するため、ソフトウェアアーキテクチャにレイヤー分離(Transport / Protocol / Domain)を導入しました。AboutError.hpp によりエラー発生箇所を特定し、GUIへ即座に通知する堅牢なシステムとしています。

レイヤーとエラータイプ

#pragma once enum class ErrorLayer { None, Transport, Protocol, Domain }; enum class ErrorCode { None, SerialPortNotOpened, InvalidJSON, JsonSizeExceeded, UnknownError, MissingType, UnknownType, InvalidFormat, MismatchedSize, MissingPayload, UnknownCommand, InvalidParameter, IdentificationFailed, OptimizationFailed, InvalidConfig, SDWriteFailed, SDInitFailed, MPInitFailed, MPSendFailed, WhiteNoiseGenerationFailed, MeasurementFailed, PreprocessingFailed, OutOfMemory, Success, ConfigApplyFailed, ExperimentAlreadyRunning, ExperimentStartFailed, ControlExecutionFailed, ExperimentNotRunning, InvalidStateTransition, OutlierRemovalFailed }; struct ErrorInfo { ErrorLayer layer; ErrorCode code; const char* message; };

③ マルチコア性能の活用(設計思想)
Spresenseの真価を引き出すため、タスクを各コアに独立させる設計を採っています。

Main Core: 通信スタック、UIイベント管理。

Sub Core: タイマー割り込みによるモーターPWM制御、エンコーダー信号の高速デコード。 これにより、PCとの重い通信処理中でも、モーター制御の「決定論的なリアルタイム性」を維持します。

④ MPC(モデル予測制御)の実装に向けて
本プラットフォームの最終目標は、同定したモデルをSpresense側にフィードバックし、制約条件付きの最適化問題を解くMPC(モデル予測制御)の実装です。Spresenseの強力なFPU/DSPを活用することで、小型デバイスによる高度な現代制御の民主化を目指しています。

  1. 実際に使用したときの様子
    現在はメインコアをベースに、PC側のリッチなGUIと連携してシステム同定を行う環境が整っています。

この画面から実験の時間、サンプリング周期、および、実験の信号の生成のパラメーターの設定を行います。

実験設定画面①

サイドバーを下にスライドすると実験設定画面②のように信号を生成し、設定をSpresenseに送信するボタンが表示されます。
このボタンを押すことでSpresenseに設定を送信し、Spresense側で内容が確認された場合、実験の開始ボタンを押すことができるようになり、実験開始ボタンを押すことで実験が開始されます。

実験設定画面②

実験が終了したあと、実験ログにモデル同定タブの前処理ボタンが押すように指示が表示されます。

キャプションを入力できます

モデル同定画面では前処理の実行、およびARXモデルを含むシステム同定を実行できます。

キャプションを入力できます

前処理ボタンを押した後、所望のモデルのボタンを押すことでシステム同定が実行されます。以下はARXモデルのボタンを押したときの画面です。

キャプションを入力できます

以下は、サンプリング時間を30秒として同定を行ったときのモデルの適合率と予測値のグラフです。非常に高い精度で予測がされていることがわかります。

キャプションを入力できます

同定完了後、制御テスト画面です。現在は位置制御、速度制御、モデル予測制御(まだ未完成)を選択できます。

制御テスト画面①

位置制御のPIDパラメーターを最適化した結果以下のようになりました。

位置制御の最適化結果

最適化完了後、制御テスト実行のボタンを押すことで制御テストを実行することができます。

6. 展望

Sub Coreへの完全オフロード: 制御周期の更なる高速化。

オンボードMPC: Spresense内での将来軌道予測計算の実装。

適応制御: 負荷変動に合わせたモデルのリアルタイム更新。

7. 全体のコード

SpresenseとPCのファイルのディレクトリ構成と各ファイルの概要を示す。

7.1 ディレクトリ構成

以下にSpresenseとPCのファイルそれぞれのディレクトリ構成を示す。

Spresense (Arduino Project)
├── system_identification-spresense.ino # プロジェクトのエントリーポイント(Setup/Loop)
└── src/
├── AboutError.hpp # システム全体で共通のエラーコード定義
├── SpresenseFix.h # 特定環境下での互換性維持用ヘッダ

├── communication/ # PCおよびコア間通信の抽象化層
│ ├── IErrorReporter.hpp # エラー通知用のインターフェース定義
│ ├── MPProtocol.hpp # マルチコア間通信(MPCom)用プロトコル
│ ├── BinaryGenerator/ # 計測データのバイナリパケット構築ロジック
│ ├── MPCommunicator/ # サブコアとのデータ・コマンド授受
│ └── SerialCommunicator/ # PC(Python)とのシリアル通信管理

├── control/ # 制御アルゴリズムの実装
│ ├── ControlExecutionEngine # 制御ループの実行スケジュール管理
│ ├── PIDController/ # PID制御の基本クラス
│ ├── MPCController/ # モデル予測制御(将来の拡張用)
│ ├── PIDOptimizer/ # オンボードでのPIDパラメータ最適化
│ └── controls/ # 具体的な制御則(位置・速度・台形加速)

├── experiment/ # システム同定のための実験管理
│ ├── ExperimentController # 同定実験全体のシーケンス管理
│ ├── MeasurementController # 高速サンプリングとバッファリング
│ └── SignalGenerator/ # M系列信号やPRBSなどの同定用信号生成

├── hardware/ # 物理デバイスのドライバ抽象化層
│ ├── EncoderInterface/ # エンコーダーのパルス計数と速度計算
│ └── MotorInterface/ # PWM出力とモータードライバ制御

├── managers/ # リソースおよびシステム状態の管理
│ ├── ConfigManager/ # 実験パラメータや設定値の保持
│ ├── ErrorManager/ # 発生したエラーの収集と上位層への報告
│ ├── SDCardManager/ # 大容量計測データのSD保存・読み出し
│ ├── PreprocessManager/ # 実験データの前処理(フィルタリング等)
│ └── ShutdownManager/ # システムの安全な停止プロセス管理

├── protocol/ # 通信フォーマットの解析
│ └── JsonDispatcher # PCから届いたJSONコマンドを各モジュールへ振り分け

└── state/ # システムの状態管理
├── StateManager # 現在の状態(待機、実験中、エラー等)の管理
└── SystemState.hpp # 状態遷移図に基づいた列挙型定義

以下はPC側のファイルの構成である。

PC (Python)
├── main.py (実行エントリーポイント)
├── comm/ (通信・計算ロジック層)
│ ├── identification_module.py # ARXモデル同定
│ ├── pid_optimizer.py # PIDパラメータ最適化
│ ├── serial_manager.py # バイナリ/JSONハイブリッド通信
│ └── signal_generator.py # M系列/ホワイトノイズ生成
├── ui/ (表示・インターフェース層)
│ ├── main_window.py # メイン画面レイアウト
│ ├── experiment_tab.py # 実験設定タブ
│ ├── data_tab.py # データ解析・同定タブ
│ └── control_tab.py # 制御テスト・MPCタブ
└── utils/ # 共通ユーティリティ

7.2 コードの抜粋

以下にコードの一部を抜粋して掲載する

(a)system_identification-spresense.ino-堅牢なリアルタイム製を支えるステートマシン-

本システムのメインエントリーポイントです。高度な制御システムにおいて、最も避けるべきは「状態の矛盾」による暴走です。本プログラムでは、システムの状態をenum class Stateで厳密に定義し、イベント駆動型のステートマシンとして設計しました。

  • 設計の要点

    • 状態遷移の可視化:待機(idle)から計測(Measuring)、データ送信(SendRawData)に至るまで載フローを中央管理し、予期せぬ動作を排除しています。

    • 軽量なメインループ:loop() 内では状態に応じたハンドラを呼び出すのみにとどめ、実際載制御ロジックはサブモジュールへ委譲することで高い保守性と可読性を確保しています。

    • エラーの集中管理:どのフェーズで異常が発生しても、共通のエラー状態へ遷移し、安全にモーターを停止させた上でPCへ通知する「フェイルセーフ」を実装しています

system_identification-spresense.ino

#include <Arduino.h> #ifdef pgm_read_ptr #undef pgm_read_ptr #endif #define pgm_read_ptr(addr) (*(void * const *)(addr)) #include <SDHCI.h> #include <File.h> #include "src/SpresenseFix.h" #include "src/AboutError.hpp" #include "src/state/SystemState.hpp" #include "src/state/StateManager.hpp" #include "src/managers/ErrorManager/ErrorManager.hpp" #include "src/managers/ConfigManager/ConfigManager.hpp" #include "src/managers/SDCardManager/SDCardManager.hpp" #include "src/managers/PreprocessManager/PreprocessManager.hpp" #include "src/managers/ShutdownManager/ShutdownManager.hpp" #include "src/hardware/MotorInterface/MotorInterface.hpp" #include "src/hardware/EncoderInterface/EncoderInterface.hpp" #include "src/experiment/SignalGenerator/SignalGenerator.hpp" #include "src/experiment/MeasurementController/MeasurementController.hpp" #include "src/experiment/ExperimentController/ExperimentController.hpp" #include "src/communication/SerialCommunicator/SerialCommunicator.hpp" #include "src/communication/MPCommunicator/MPCommunicator.hpp" #include "src/protocol/JsonDispatcher.hpp" #include "src/control/ControlExecutionEngine/ControlExecutionEngine.hpp" const int ENCODER_PIN_A = 2; const int ENCODER_PIN_B = 3; const uint8_t MOTOR_DIR_PIN_A = 4; const uint8_t MOTOR_DIR_PIN_B = 5; const uint8_t MOTOR_SPEED_PIN = 6; const int SERIAL_BAUD_RATE = 921600; StateManager* g_stateManager = nullptr; ErrorManager* g_errorManager = nullptr; SDCardManager* g_sdCardManager = nullptr; ConfigManager* g_configManager = nullptr; PreprocessManager* g_preprocessManager = nullptr; ShutdownManager* g_shutdownManager = nullptr; MotorInterface* g_motorInterface = nullptr; EncoderInterface* g_encoderInterface = nullptr; SignalGenerator* g_signalGenerator = nullptr; MeasurementController* g_measurementController = nullptr; ExperimentController* g_experimentController = nullptr; ControlExecutionEngine* g_controlEngine = nullptr; SerialCommunicator* g_serialCommunicator = nullptr; MPCommunicator* g_mpCommunicator = nullptr; JsonDispatcher* g_jsonDispatcher = nullptr; void setup() { pinMode(MOTOR_DIR_PIN_A, OUTPUT); pinMode(MOTOR_DIR_PIN_B, OUTPUT); pinMode(MOTOR_SPEED_PIN, OUTPUT); digitalWrite(MOTOR_DIR_PIN_A, LOW); digitalWrite(MOTOR_DIR_PIN_B, LOW); analogWrite(MOTOR_SPEED_PIN, 0); Serial.begin(SERIAL_BAUD_RATE); delay(2000); Serial.println("\n\n=== System Initialization Started ==="); Serial.println("BUILD VERSION: 2026-01-12-23:00"); Serial.println("FIX: WaitModelParameters->ExecuteControl allowed, motor=-controlSignal"); Serial.println("[Setup] Motor pins initialized and stopped IMMEDIATELY"); g_stateManager = new StateManager(); g_errorManager = new ErrorManager(); g_sdCardManager = new SDCardManager(*g_errorManager); g_configManager = new ConfigManager(*g_stateManager, *g_errorManager); g_motorInterface = new MotorInterface(MOTOR_DIR_PIN_A, MOTOR_DIR_PIN_B, MOTOR_SPEED_PIN); g_encoderInterface = new EncoderInterface(ENCODER_PIN_A, ENCODER_PIN_B); g_mpCommunicator = new MPCommunicator(*g_errorManager); if (!g_mpCommunicator->init()) { Serial.println("[ERROR] MPCommunicator initialization failed!"); g_errorManager->report(ErrorCode::MPInitFailed); } g_signalGenerator = new SignalGenerator(*g_configManager, *g_sdCardManager, *g_errorManager); g_measurementController = new MeasurementController( *g_configManager, *g_signalGenerator, *g_motorInterface, *g_encoderInterface, *g_sdCardManager, *g_stateManager, *g_errorManager, *g_mpCommunicator ); g_preprocessManager = new PreprocessManager( *g_measurementController, *g_sdCardManager, *g_errorManager ); g_experimentController = new ExperimentController( *g_stateManager, *g_signalGenerator, *g_measurementController, *g_errorManager, *g_preprocessManager ); g_controlEngine = new ControlExecutionEngine( *g_stateManager, *g_errorManager, *g_sdCardManager, *g_motorInterface, *g_encoderInterface, *g_mpCommunicator ); g_shutdownManager = new ShutdownManager( *g_sdCardManager, *g_motorInterface, *g_stateManager, *g_errorManager ); g_serialCommunicator = new SerialCommunicator(SERIAL_BAUD_RATE, *g_errorManager); g_jsonDispatcher = new JsonDispatcher( *g_errorManager, *g_configManager, *g_stateManager, *g_preprocessManager, *g_controlEngine, *g_serialCommunicator, *g_shutdownManager, *g_motorInterface, *g_signalGenerator ); g_serialCommunicator->setDispatcher(*g_jsonDispatcher); g_motorInterface->init(); g_motorInterface->stop(); Serial.println("Motor interface initialized"); g_encoderInterface->init(); EncoderInterface::instance = g_encoderInterface; Serial.println("Encoder interface initialized"); if (!g_sdCardManager->init()) { Serial.println("SD card initialization failed!"); g_errorManager->report(ErrorCode::SDInitFailed); } else { Serial.println("SD card initialized"); } g_serialCommunicator->init(); g_errorManager->setReporter(g_serialCommunicator); Serial.println("Serial communicator initialized"); g_stateManager->transitionTo(State::Idle); Serial.println("State manager initialized - System Ready"); Serial.println("=== System Ready ===\n"); } void loop() { State currentState = g_stateManager->getCurrentState(); if (currentState != State::Measuring) { g_serialCommunicator->receiveJson(); } if (g_mpCommunicator) { g_mpCommunicator->update(); } static State previousState = State::Boot; g_stateManager->update(); if (previousState != currentState) { if (currentState != State::Measuring && currentState != State::ExecuteControl) { g_motorInterface->stop(); Serial.print("[Main] State changed, motor stopped. New state: "); Serial.println((int)currentState); } else { Serial.print("[Main] State changed to: "); Serial.println((int)currentState); } previousState = currentState; } switch (currentState) { case State::GenerateWhiteNoise: case State::Measuring: g_experimentController->update(); break; case State::SendRawData: { static bool dataSent = false; if (!dataSent) { g_motorInterface->stop(); Serial.println("[Main] Motor stopped in SendRawData state"); Serial.println("[Main] Skipping measurement data transmission"); size_t sampleCount = g_measurementController->getSampleCount(); StaticJsonDocument<256> doc; doc["type"] = "status"; doc["command"] = "MEASUREMENT_DATA_COMPLETE"; doc["payload"]["total_samples"] = sampleCount; doc["payload"]["message"] = "測定完了。データ送信をスキップします"; g_serialCommunicator->sendJson(doc); StaticJsonDocument<256> expCompleteDoc; expCompleteDoc["type"] = "experiment_complete"; expCompleteDoc["samples"] = sampleCount; g_serialCommunicator->sendJson(expCompleteDoc); Serial.println("[Main] Sent experiment_complete notification to PC"); Serial.println("[Main] Transitioning to WaitOutlierSelection"); g_stateManager->transitionTo(State::WaitOutlierSelection); dataSent = true; } if (currentState != State::SendRawData) { dataSent = false; } break; } case State::WaitModelParameters: { static bool waitMessageShown = false; if (!waitMessageShown) { Serial.println("[Main] Waiting for model parameters from PC..."); waitMessageShown = true; } if (currentState != State::WaitModelParameters) { waitMessageShown = false; } delay(100); break; } case State::IdentifyModel: { delay(10); break; } case State::SendModel: { static bool modelProcessed = false; if (!modelProcessed) { Serial.println("[Main] Model sent, skipping PID optimization for now"); Serial.println("[Main] Transitioning to WaitControlCommand"); g_stateManager->transitionTo(State::WaitControlCommand); modelProcessed = true; } if (currentState != State::SendModel) { modelProcessed = false; } break; } case State::WaitControlCommand: delay(10); break; case State::ExecuteControl: { Serial.println("[Main] Control execution complete, transitioning to SendControlLog"); g_stateManager->transitionTo(State::SendControlLog); break; } case State::SendControlLog: { static bool logSent = false; if (!logSent) { Serial.println("[Main] Sending control log"); const auto& controlLog = g_controlEngine->getControlLog(); const char* modelType = g_controlEngine->getModelType(); if (controlLog.empty()) { Serial.println("[Main] WARNING: Control log is empty (SubCore execution doesn't log to MainCore yet)"); StaticJsonDocument<256> doc; doc["type"] = "control_log"; doc["status"] = "completed"; doc["message"] = "Control executed on SubCore (detailed log not available)"; doc["samples"] = 0; doc["model_type"] = modelType; g_serialCommunicator->sendJson(doc); } else { bool success = g_controlEngine->wasLastControlSuccessful(); g_serialCommunicator->sendControlLog(controlLog, modelType); StaticJsonDocument<256> statusDoc; statusDoc["type"] = "control_status"; statusDoc["model_type"] = modelType; statusDoc["success"] = success; statusDoc["message"] = success ? "Control completed successfully" : "Control timeout - partial data collected"; statusDoc["samples"] = (int)controlLog.size(); g_serialCommunicator->sendJson(statusDoc); } Serial.println("[Main] Control log sent, returning to WaitControlCommand"); g_stateManager->transitionTo(State::WaitControlCommand); logSent = true; } if (currentState != State::SendControlLog) { logSent = false; } break; } case State::Error: Serial.println("System in Error state"); g_motorInterface->stop(); delay(1000); break; default: delay(10); break; } }

(b) JsonDispatcher.cpp/ .hpp -柔軟な拡張をもたらすインテリジェント・インターフェース-

PC側(Python)からの命令を解釈し、適切なハードウェア処理へと橋渡しをする「知能的な窓口」です。単なるシリアル受信に留まらず、オブジェクト指向に基づいたディスパッチ(振り分け)を行っています。

  • 設計の要点

    • JSONプロトコルの採用: 命令セットにJSON形式を採用することで、サンプリングレートや信号振幅といった複雑なパラメータ変更に対しても、ハードウェア側のコードを書き換えることなく柔軟に対応可能です。

    • 疎結合なアーキテクチャ: 受信したコマンドに基づいて、実験管理(ExperimentController)や設定管理(ConfigManager)などの各専門モジュールへ命令を分配します。これにより、通信仕様の変更が制御ロジックに影響を与えない「疎結合」な設計を実現しました。

    • リアルタイム性の配慮: 処理負荷の高いパース処理と、極めて高い精度が求められるモーター制御処理が干渉しないよう、将来的なマルチコア展開を見据えたタスク分離のハブとして機能します。

JsonDispatcher.hpp

#pragma once #include <Arduino.h> #include "../SpresenseFix.h" #ifdef pgm_read_ptr #undef pgm_read_ptr #define pgm_read_ptr(addr) (*(void * const *)(addr)) #endif #include <ArduinoJson.h> class ErrorManager; class ConfigManager; class StateManager; class PreprocessManager; class ControlExecutionEngine; class SerialCommunicator; class ShutdownManager; class MotorInterface; class SignalGenerator; class JsonDispatcher { public: JsonDispatcher( ErrorManager& errorManager, ConfigManager& configManager, StateManager& stateManager, PreprocessManager& preprocessManager, ControlExecutionEngine& controlEngine, SerialCommunicator& serialCommunicator, ShutdownManager& shutdownManager, MotorInterface& motorInterface, SignalGenerator& signalGenerator ); void dispatch(const JsonDocument& doc); private: ErrorManager& errorManager; ConfigManager& configManager; StateManager& stateManager; PreprocessManager& preprocessManager; ControlExecutionEngine& controlEngine; SerialCommunicator& serialCommunicator; ShutdownManager& shutdownManager; MotorInterface& motorInterface; SignalGenerator& signalGenerator; void handleCommand(const JsonDocument& doc); void handleExecutionRequest(const JsonDocument& doc); void handleSetConfig(const JsonDocument& doc); void handleRemoveOutliers(const JsonDocument& doc); void handleExecuteControl(const JsonDocument& doc); void handleOptimizePID(const JsonDocument& doc); void handleShutdown(const JsonDocument& doc); void handleFetchPreprocessData(const JsonDocument& doc); void handleSetSignal(const JsonDocument& doc); void handleSignalComplete(const JsonDocument& doc); void handleSetModel(const JsonDocument& doc); void handleExecuteMPC(const JsonDocument& doc); void handleStartExperiment(const JsonDocument& doc); void handleTerminate(const JsonDocument& doc); private: static const size_t MAX_SIGNAL_SIZE = 6000; float signalBuffer[MAX_SIGNAL_SIZE]; size_t signalSize = 0; size_t expectedTotalSamples = 0; size_t receivedChunks = 0; size_t expectedTotalChunks = 0; };

JsonDispatcher.cpp

#include "JsonDispatcher.hpp" #include "../managers/ErrorManager/ErrorManager.hpp" #include "../managers/ConfigManager/ConfigManager.hpp" #include "../state/StateManager.hpp" #include "../state/SystemState.hpp" #include "../managers/PreprocessManager/PreprocessManager.hpp" #include "../managers/SDCardManager/SDCardManager.hpp" #include "../managers/ShutdownManager/ShutdownManager.hpp" #include "../control/ControlExecutionEngine/ControlExecutionEngine.hpp" #include "../communication/SerialCommunicator/SerialCommunicator.hpp" #include "../hardware/MotorInterface/MotorInterface.hpp" #include "../experiment/SignalGenerator/SignalGenerator.hpp" #include <string.h> #include <malloc.h> JsonDispatcher::JsonDispatcher( ErrorManager& errorManager, ConfigManager& configManager, StateManager& stateManager, PreprocessManager& preprocessManager, ControlExecutionEngine& controlEngine, SerialCommunicator& serialCommunicator, ShutdownManager& shutdownManager, MotorInterface& motorInterface, SignalGenerator& signalGenerator ) : errorManager(errorManager), configManager(configManager), stateManager(stateManager), preprocessManager(preprocessManager), controlEngine(controlEngine), serialCommunicator(serialCommunicator), shutdownManager(shutdownManager), motorInterface(motorInterface), signalGenerator(signalGenerator) { } void JsonDispatcher::dispatch(const JsonDocument& doc) { if (!doc["type"].is<const char*>()) { errorManager.report(ErrorCode::MissingType); return; } if (!doc["command"].is<const char*>()) { errorManager.report(ErrorCode::InvalidFormat); return; } if (!doc["request_id"].is<int>()) { errorManager.report(ErrorCode::InvalidFormat); return; } const char* type = doc["type"]; if (strcmp(type, "command") == 0) { handleCommand(doc); } else if (strcmp(type, "execution_request") == 0) { handleExecutionRequest(doc); } else { errorManager.report(ErrorCode::UnknownType); } } void JsonDispatcher::handleCommand(const JsonDocument& doc) { const char* command = doc["command"]; if (strcmp(command, "SET_CONFIG") == 0) { handleSetConfig(doc); } else if (strcmp(command, "REMOVE_OUTLIERS") == 0) { handleRemoveOutliers(doc); } else if (strcmp(command, "EXECUTE_CONTROL") == 0) { handleExecuteControl(doc); } else if (strcmp(command, "OPTIMIZE_PID") == 0) { handleOptimizePID(doc); } else if (strcmp(command, "SHUTDOWN") == 0) { handleShutdown(doc); } else if (strcmp(command, "FETCH_PREPROCESS_DATA") == 0) { handleFetchPreprocessData(doc); } else if (strcmp(command, "SET_SIGNAL") == 0) { handleSetSignal(doc); } else if (strcmp(command, "SIGNAL_COMPLETE") == 0) { handleSignalComplete(doc); } else if (strcmp(command, "SET_MODEL") == 0) { handleSetModel(doc); } else if (strcmp(command, "EXECUTE_MPC") == 0) { handleExecuteMPC(doc); } else { errorManager.report(ErrorCode::UnknownCommand); } } void JsonDispatcher::handleSetConfig(const JsonDocument& doc) { Serial.println("[JsonDispatcher] handleSetConfig called"); if (!doc.containsKey("payload")) { Serial.println("[JsonDispatcher] ERROR: payload not found"); errorManager.report(ErrorCode::InvalidFormat); return; } Serial.print("[JsonDispatcher] payload type: "); Serial.println(doc["payload"].is<JsonObject>() ? "Object" : "Not Object"); JsonObjectConst payload = doc["payload"].as<JsonObjectConst>(); Serial.println("[JsonDispatcher] Applying config..."); if (!configManager.applyConfig(payload)) { Serial.println("[JsonDispatcher] ERROR: Config apply failed"); errorManager.report(ErrorCode::ConfigApplyFailed); return; } Serial.println("[JsonDispatcher] Config applied successfully, sending response"); stateManager.transitionTo(State::Configured); StaticJsonDocument<256> response; response["type"] = "response"; response["request_id"] = doc["request_id"]; response["status"] = "success"; response["message"] = "Config applied successfully"; serialCommunicator.sendJson(response); Serial.println("[JsonDispatcher] Response sent"); } void JsonDispatcher::handleRemoveOutliers(const JsonDocument& doc) { Serial.println("[JsonDispatcher] REMOVE_OUTLIERS command received - SKIPPING"); Serial.println("[JsonDispatcher] Initializing processed data from measurement data without outlier removal"); if (!preprocessManager.initializeFromMeasurement()) { errorManager.report(ErrorCode::PreprocessingFailed); return; } if (!preprocessManager.computeVelocityAndNormalize()) { errorManager.report(ErrorCode::PreprocessingFailed); return; } Serial.println("[JsonDispatcher] Preprocessing completed without outlier removal"); StaticJsonDocument<256> progressDoc; progressDoc["type"] = "status"; progressDoc["command"] = "PROGRESS"; progressDoc["payload"]["stage"] = "preprocessing_complete"; progressDoc["payload"]["message"] = "前処理完了(外れ値除去なし)"; serialCommunicator.sendJson(progressDoc); StaticJsonDocument<256> response; response["type"] = "response"; response["request_id"] = doc["request_id"]; response["status"] = "success"; response["message"] = "Preprocessing completed without outlier removal"; serialCommunicator.sendJson(response); Serial.println("[JsonDispatcher] Transitioning to IdentifyModel state"); stateManager.transitionTo(State::IdentifyModel); } void JsonDispatcher::handleExecuteControl(const JsonDocument& doc) { Serial.println("[JsonDispatcher] handleExecuteControl called"); State currentState = stateManager.getCurrentState(); if (currentState == State::ExecuteControl) { Serial.println("[JsonDispatcher] Already in ExecuteControl, aborting current control"); stateManager.transitionTo(State::SendControlLog); StaticJsonDocument<256> response; response["type"] = "response"; response["request_id"] = doc["request_id"]; response["status"] = "pending"; response["message"] = "Previous control aborted, please retry"; serialCommunicator.sendJson(response); return; } if (currentState != State::WaitControlCommand && currentState != State::ExecuteControl) { Serial.print("[JsonDispatcher] WARNING: EXECUTE_CONTROL called from unexpected state: "); Serial.println(static_cast<int>(currentState)); if (currentState == State::Configured) { Serial.println("[JsonDispatcher] Transitioning from Configured to WaitControlCommand"); stateManager.transitionTo(State::WaitControlCommand); stateManager.update(); } } if (!doc.containsKey("payload")) { Serial.println("[JsonDispatcher] ERROR: payload not found"); errorManager.report(ErrorCode::InvalidFormat); return; } JsonVariantConst payloadVariant = doc["payload"]; if (!payloadVariant.is<JsonObjectConst>()) { Serial.println("[JsonDispatcher] ERROR: payload is not a JSON object"); errorManager.report(ErrorCode::InvalidFormat); return; } JsonObjectConst payload = payloadVariant.as<JsonObjectConst>(); Serial.println("[JsonDispatcher] payload validated successfully"); if (!controlEngine.execute(payload)) { errorManager.report(ErrorCode::ControlExecutionFailed); return; } StaticJsonDocument<256> response; response["type"] = "response"; response["request_id"] = doc["request_id"]; response["status"] = "success"; response["message"] = "Control executed successfully"; serialCommunicator.sendJson(response); } void JsonDispatcher::handleStartExperiment(const JsonDocument& doc) { State currentState = stateManager.getCurrentState(); if (currentState == State::GenerateWhiteNoise || currentState == State::Measuring || currentState == State::SendRawData || currentState == State::IdentifyModel || currentState == State::SendModel) { errorManager.report(ErrorCode::ExperimentAlreadyRunning); return; } stateManager.transitionTo(State::GenerateWhiteNoise); StaticJsonDocument<256> response; response["type"] = "response"; response["request_id"] = doc["request_id"]; response["status"] = "success"; response["message"] = "Experiment started"; serialCommunicator.sendJson(response); } void JsonDispatcher::handleTerminate(const JsonDocument& doc) { Serial.println("[JsonDispatcher] TERMINATE command received"); State currentState = stateManager.getCurrentState(); if (currentState == State::Idle || currentState == State::Error || currentState == State::Boot) { errorManager.report(ErrorCode::ExperimentNotRunning); return; } Serial.println("[JsonDispatcher] Stopping motor"); motorInterface.stop(); stateManager.transitionTo(State::Idle); if (doc["payload"].is<JsonObject>()) { JsonObjectConst payload = doc["payload"].as<JsonObjectConst>(); if (payload["format_sd"].is<bool>() && payload["format_sd"].as<bool>()) { stateManager.requestSDFormat(); } } StaticJsonDocument<256> response; response["type"] = "response"; response["request_id"] = doc["request_id"]; response["status"] = "success"; response["message"] = "Experiment terminated"; serialCommunicator.sendJson(response); Serial.println("[JsonDispatcher] Experiment terminated, motor stopped"); } void JsonDispatcher::handleExecutionRequest(const JsonDocument& doc) { const char* command = doc["command"]; if (strcmp(command, "START_EXPERIMENT") == 0) { handleStartExperiment(doc); } else if (strcmp(command, "TERMINATE") == 0) { handleTerminate(doc); } else { errorManager.report(ErrorCode::UnknownCommand); } } void JsonDispatcher::handleOptimizePID(const JsonDocument& doc) { if (!doc["payload"].is<JsonObject>()) { errorManager.report(ErrorCode::InvalidFormat); return; } JsonObjectConst payload = doc["payload"].as<JsonObjectConst>(); if (!controlEngine.optimizePID(payload)) { errorManager.report(ErrorCode::ControlExecutionFailed); return; } StaticJsonDocument<256> response; response["type"] = "response"; response["request_id"] = doc["request_id"]; response["status"] = "success"; response["message"] = "PID optimization completed"; serialCommunicator.sendJson(response); } void JsonDispatcher::handleShutdown(const JsonDocument& doc) { bool formatSD = false; if (doc["payload"].is<JsonObject>()) { JsonObjectConst payload = doc["payload"].as<JsonObjectConst>(); if (payload["format_sd"].is<bool>()) { formatSD = payload["format_sd"].as<bool>(); } } StaticJsonDocument<256> response; response["type"] = "response"; response["request_id"] = doc["request_id"]; response["status"] = "success"; response["message"] = formatSD ? "Shutting down with SD format" : "Shutting down"; serialCommunicator.sendJson(response); delay(100); if (!shutdownManager.shutdown(formatSD)) { errorManager.report(ErrorCode::SDWriteFailed); } } void JsonDispatcher::handleFetchPreprocessData(const JsonDocument& doc) { Serial.println("[JsonDispatcher] FETCH_PREPROCESS_DATA command received"); Serial.println("[JsonDispatcher] NEW ARCHITECTURE: Sending position data to PC (velocity calculated on PC)"); StaticJsonDocument<256> progressDoc; progressDoc["type"] = "status"; progressDoc["command"] = "PROGRESS"; progressDoc["payload"]["stage"] = "fetching_data"; progressDoc["payload"]["message"] = "前処理データをPC側に送信中..."; serialCommunicator.sendJson(progressDoc); const auto& processedData = preprocessManager.getProcessedData(); if (processedData.input.empty() || processedData.encoder.empty()) { Serial.println("[JsonDispatcher] ERROR: No preprocessed data available"); errorManager.report(ErrorCode::InvalidConfig); return; } Serial.print("[JsonDispatcher] Sending "); Serial.print(processedData.input.size()); Serial.println(" samples to PC"); struct mallinfo mi = mallinfo(); Serial.print("[JsonDispatcher] Free heap before sending: "); Serial.println(mi.fordblks); const size_t CHUNK_SIZE = 100; size_t totalSamples = processedData.input.size(); size_t chunks = (totalSamples + CHUNK_SIZE - 1) / CHUNK_SIZE; Serial.print("[JsonDispatcher] Sending in "); Serial.print(chunks); Serial.println(" chunks"); for (size_t chunk = 0; chunk < chunks; ++chunk) { size_t start = chunk * CHUNK_SIZE; size_t end = std::min(start + CHUNK_SIZE, totalSamples); size_t chunkLen = end - start; StaticJsonDocument<4096> chunkDoc; chunkDoc["type"] = "preprocessed_data_chunk"; chunkDoc["chunk_index"] = chunk; chunkDoc["total_chunks"] = chunks; chunkDoc["chunk_size"] = chunkLen; chunkDoc["total_samples"] = totalSamples; JsonArray inputArray = chunkDoc.createNestedArray("input"); JsonArray positionArray = chunkDoc.createNestedArray("position"); JsonArray timeArray = chunkDoc.createNestedArray("time_ms"); for (size_t i = start; i < end; ++i) { inputArray.add(processedData.input[i]); positionArray.add(processedData.encoder[i]); timeArray.add(processedData.time_ms[i]); } if (chunk == 0) { JsonObject stats = chunkDoc.createNestedObject("stats"); stats["input_mean"] = processedData.input_mean; stats["input_std"] = processedData.input_std; stats["velocity_mean"] = processedData.velocity_mean; stats["velocity_std"] = processedData.velocity_std; } serialCommunicator.sendJson(chunkDoc); Serial.print("[JsonDispatcher] Sent chunk "); Serial.print(chunk + 1); Serial.print("/"); Serial.println(chunks); delay(50); } StaticJsonDocument<256> completeDoc; completeDoc["type"] = "status"; completeDoc["command"] = "PREPROCESS_DATA_SENT"; completeDoc["payload"]["total_samples"] = totalSamples; completeDoc["payload"]["message"] = "前処理データ送信完了。PC側で同定を実行してください"; serialCommunicator.sendJson(completeDoc); StaticJsonDocument<256> preprocessCompleteDoc; preprocessCompleteDoc["type"] = "preprocess_complete"; preprocessCompleteDoc["samples"] = totalSamples; serialCommunicator.sendJson(preprocessCompleteDoc); Serial.println("[JsonDispatcher] Sent preprocess_complete notification to PC"); StaticJsonDocument<256> response; response["type"] = "response"; response["request_id"] = doc["request_id"]; response["status"] = "success"; response["message"] = "Preprocessed data sent to PC"; serialCommunicator.sendJson(response); Serial.println("[JsonDispatcher] All data sent to PC successfully"); stateManager.transitionTo(State::WaitModelParameters); } void JsonDispatcher::handleSetSignal(const JsonDocument& doc) { Serial.println("[JsonDispatcher] SET_SIGNAL command received"); if (!doc.containsKey("payload")) { Serial.println("[JsonDispatcher] ERROR: Missing payload"); errorManager.report(ErrorCode::InvalidFormat); return; } JsonVariantConst payloadVariant = doc["payload"]; if (!payloadVariant.is<JsonObjectConst>()) { Serial.println("[JsonDispatcher] ERROR: payload is not a JSON object"); errorManager.report(ErrorCode::InvalidFormat); return; } JsonObjectConst payload = payloadVariant.as<JsonObjectConst>(); int chunk_index = payload["chunk_index"] | -1; int total_chunks = payload["total_chunks"] | -1; int chunk_size = payload["chunk_size"] | -1; int total_samples = payload["total_samples"] | -1; if (chunk_index < 0 || total_chunks <= 0 || chunk_size <= 0 || total_samples <= 0) { Serial.println("[JsonDispatcher] ERROR: Invalid chunk parameters"); errorManager.report(ErrorCode::InvalidFormat); return; } if (chunk_index == 0) { signalSize = 0; expectedTotalSamples = total_samples; expectedTotalChunks = total_chunks; receivedChunks = 0; Serial.print("[JsonDispatcher] Starting signal reception: "); Serial.print(total_samples); Serial.print(" samples in "); Serial.print(total_chunks); Serial.println(" chunks"); if (total_samples > static_cast<int>(MAX_SIGNAL_SIZE)) { Serial.print("[JsonDispatcher] ERROR: Signal too large: "); Serial.print(total_samples); Serial.print(" > "); Serial.println(MAX_SIGNAL_SIZE); errorManager.report(ErrorCode::InvalidConfig); return; } } if (!payload.containsKey("signal_data")) { Serial.println("[JsonDispatcher] ERROR: Missing signal_data"); errorManager.report(ErrorCode::InvalidFormat); return; } JsonArrayConst signal_data = payload["signal_data"].as<JsonArrayConst>(); size_t copied = 0; for (JsonVariantConst value : signal_data) { if (signalSize >= MAX_SIGNAL_SIZE) { Serial.println("[JsonDispatcher] WARNING: Signal buffer full"); break; } signalBuffer[signalSize++] = value.as<float>(); copied++; } receivedChunks++; Serial.print("[JsonDispatcher] Received chunk "); Serial.print(chunk_index + 1); Serial.print("/"); Serial.print(total_chunks); Serial.print(", copied "); Serial.print(copied); Serial.print(" samples, total: "); Serial.println(signalSize); struct mallinfo mi = mallinfo(); Serial.print("[JsonDispatcher] Free heap: "); Serial.println(mi.fordblks); StaticJsonDocument<128> response; response["type"] = "status"; response["command"] = "SIGNAL_CHUNK_ACK"; response["payload"]["chunk_index"] = chunk_index; response["payload"]["received_samples"] = signalSize; serialCommunicator.sendJson(response); } void JsonDispatcher::handleSignalComplete(const JsonDocument& doc) { Serial.println("[JsonDispatcher] SIGNAL_COMPLETE command received"); if (!doc.containsKey("payload")) { Serial.println("[JsonDispatcher] ERROR: Missing payload"); errorManager.report(ErrorCode::InvalidFormat); return; } JsonObjectConst payload = doc["payload"].as<JsonObjectConst>(); int total_samples = payload["total_samples"] | -1; Serial.print("[JsonDispatcher] Signal reception complete: "); Serial.print(signalSize); Serial.print("/"); Serial.print(total_samples); Serial.println(" samples"); if (signalSize != static_cast<size_t>(total_samples)) { Serial.println("[JsonDispatcher] WARNING: Sample count mismatch"); } if (signalSize == 0) { Serial.println("[JsonDispatcher] ERROR: No signal data received"); errorManager.report(ErrorCode::InvalidConfig); return; } signalGenerator.setSignalFromBuffer(signalBuffer, signalSize); Serial.println("[JsonDispatcher] Signal transferred to SignalGenerator"); StaticJsonDocument<256> response; response["type"] = "status"; response["command"] = "SIGNAL_READY"; response["payload"]["total_samples"] = signalSize; response["payload"]["message"] = "Signal received and ready"; serialCommunicator.sendJson(response); Serial.println("[JsonDispatcher] Transitioning to Measuring state (skipping signal generation)"); stateManager.transitionTo(State::Measuring); } void JsonDispatcher::handleSetModel(const JsonDocument& doc) { Serial.println("[JsonDispatcher] SET_MODEL command received"); if (!doc.containsKey("payload")) { Serial.println("[JsonDispatcher] ERROR: Missing payload"); errorManager.report(ErrorCode::InvalidFormat); return; } JsonVariantConst payloadVariant = doc["payload"]; if (!payloadVariant.is<JsonObjectConst>()) { Serial.println("[JsonDispatcher] ERROR: payload is not a JSON object"); errorManager.report(ErrorCode::InvalidFormat); return; } JsonObjectConst payload = payloadVariant.as<JsonObjectConst>(); if (!controlEngine.setARXModel(payload)) { Serial.println("[JsonDispatcher] ERROR: Failed to set ARX model"); return; } StaticJsonDocument<256> response; response["type"] = "response"; response["request_id"] = doc["request_id"]; response["status"] = "success"; response["message"] = "ARX model set for MPC"; serialCommunicator.sendJson(response); Serial.println("[JsonDispatcher] ARX model set successfully, MPC ready"); } void JsonDispatcher::handleExecuteMPC(const JsonDocument& doc) { Serial.println("[JsonDispatcher] EXECUTE_MPC command received"); if (!doc.containsKey("payload")) { Serial.println("[JsonDispatcher] ERROR: Missing payload"); errorManager.report(ErrorCode::InvalidFormat); return; } JsonVariantConst payloadVariant = doc["payload"]; if (!payloadVariant.is<JsonObjectConst>()) { Serial.println("[JsonDispatcher] ERROR: payload is not a JSON object"); errorManager.report(ErrorCode::InvalidFormat); return; } JsonObjectConst payload = payloadVariant.as<JsonObjectConst>(); if (!controlEngine.executeMPC(payload)) { Serial.println("[JsonDispatcher] ERROR: MPC execution failed"); return; } StaticJsonDocument<256> response; response["type"] = "response"; response["request_id"] = doc["request_id"]; response["status"] = "success"; response["message"] = "MPC velocity control executed"; serialCommunicator.sendJson(response); Serial.println("[JsonDispatcher] MPC control completed, transitioning to SendControlLog"); stateManager.transitionTo(State::SendControlLog); }

(c) identification_module.py -同定エンジン-

  • 設計の要点
    • 前処理アルゴリズム:計測データには必ずノイズやサンプリングのわずかな揺らぎが含まれます。本コードでは低パスフィルタ、線形補間による等間隔サンプリング化、外れ値の除去を統合し、同定精度を高めています。
    • 最小二乗法によるパラメーター推定:過去の入出力履歴から回帰行列を構成し、正規方程式を解くことで、実機の動特性を決定する係数を瞬時に算出します。
    • 定量的評価の実装:同定して終わりではなく、Fit率(適合率)を算出する機能を備えています。これにより、構築されたモデルがどれだけ実機を忠実に再現できているか(データツインとしての精度)を客観的に評価できます。

identification_module.py

import numpy as np from typing import Dict, List, Tuple, Optional, Callable import threading import time from scipy.signal import butter, filtfilt class ARXIdentifier: @staticmethod def lowpass_filter(data: np.ndarray, Ts: float, cutoff: float = 5.0) -> np.ndarray: nyq = 0.5 / Ts b, a = butter(2, cutoff / nyq) return filtfilt(b, a, data) @staticmethod def interpolate_uniform_sampling(time_data: np.ndarray, data: np.ndarray, Ts_target: float) -> Tuple[np.ndarray, np.ndarray]: if len(time_data) != len(data): raise ValueError(f"time_data length ({len(time_data)}) != data length ({len(data)})") if len(time_data) < 2: raise ValueError(f"insufficient data points ({len(time_data)})") t_uniform = np.arange(time_data[0], time_data[-1], Ts_target) data_interp = np.interp(t_uniform, time_data, data) return t_uniform, data_interp @staticmethod def remove_outliers(data: np.ndarray, n_std: float = 5.0) -> np.ndarray: std = np.std(data) return np.clip(data, -n_std * std, n_std * std) @staticmethod def create_regression_matrix(y: np.ndarray, u: np.ndarray, na: int, nb: int) -> Tuple[np.ndarray, np.ndarray]: N = len(y) max_lag = max(na, nb) phi = [] y_target = y[max_lag:] for t in range(max_lag, N): row = [] row.extend(y[t-na:t][::-1]) row.extend(u[t-nb:t][::-1]) phi.append(row) return np.array(phi), y_target @staticmethod def fit_percent(y_true: np.ndarray, y_pred: np.ndarray) -> float: num = np.linalg.norm(y_true - y_pred) den = np.linalg.norm(y_true - np.mean(y_true)) if den == 0: return 0.0 fit = 100.0 * (1 - num / den) return max(0.0, fit) def identify(self, input_data: np.ndarray, velocity_data: np.ndarray, time_data_ms: Optional[np.ndarray] = None, sampling_period_ms: float = 10.0, apply_preprocessing: bool = True, model_type: str = "ARX", na_range: range = range(1, 5), nb_range: range = range(1, 5), progress_callback: Optional[Callable[[str], None]] = None) -> Dict: def log(msg: str): if progress_callback: progress_callback(msg) if apply_preprocessing: Ts = sampling_period_ms / 1000.0 if time_data_ms is None: time_data = np.arange(len(input_data)) * Ts else: time_data = time_data_ms / 1000.0 if time_data_ms is not None: time_uniform, u_interp = self.interpolate_uniform_sampling(time_data, input_data, Ts) _, vel_interp = self.interpolate_uniform_sampling(time_data, velocity_data, Ts) else: u_interp = input_data.copy() vel_interp = velocity_data.copy() u_filtered = self.lowpass_filter(u_interp, Ts, cutoff=5.0) vel_filtered = self.lowpass_filter(vel_interp, Ts, cutoff=5.0) vel_cleaned = self.remove_outliers(vel_filtered, n_std=5.0) input_processed = u_filtered velocity_processed = vel_cleaned else: input_processed = input_data velocity_processed = velocity_data u_mean = float(np.mean(input_processed)) u_std = float(np.std(input_processed)) if np.std(input_processed) > 0 else 1.0 vel_mean = float(np.mean(velocity_processed)) vel_std = float(np.std(velocity_processed)) if np.std(velocity_processed) > 0 else 1.0 u_norm = (input_processed - u_mean) / u_std vel_norm = (velocity_processed - vel_mean) / vel_std N = len(u_norm) Ntrain = int(N * 0.8) vel_tr = vel_norm[:Ntrain] vel_te = vel_norm[Ntrain:] u_tr = u_norm[:Ntrain] u_te = u_norm[Ntrain:] if model_type == "ARX": best_params = self._identify_arx(vel_tr, vel_te, u_tr, u_te, na_range, nb_range, u_mean, u_std, vel_mean, vel_std, log) elif model_type == "FIR": best_params = self._identify_fir(vel_tr, vel_te, u_tr, u_te, nb_range, u_mean, u_std, vel_mean, vel_std, log) elif model_type == "ARARX": best_params = self._identify_ararx(vel_tr, vel_te, u_tr, u_te, na_range, nb_range, u_mean, u_std, vel_mean, vel_std, log) else: raise ValueError(f"Unknown model type: {model_type}") best_params['model_type'] = model_type best_params['input_processed'] = input_processed.tolist() best_params['velocity_processed'] = velocity_processed.tolist() return best_params def _identify_arx(self, vel_tr, vel_te, u_tr, u_te, na_range, nb_range, u_mean, u_std, vel_mean, vel_std, log): best_fit = -np.inf best_params = None total_combinations = len(na_range) * len(nb_range) current = 0 for na in na_range: for nb in nb_range: current += 1 try: phi_tr, y_tr_target = self.create_regression_matrix(vel_tr, u_tr, na, nb) theta = np.linalg.lstsq(phi_tr, y_tr_target, rcond=None)[0] phi_te, y_te_target = self.create_regression_matrix( np.concatenate([vel_tr[-na:], vel_te]), np.concatenate([u_tr[-nb:], u_te]), na, nb ) ysim_te = phi_te @ theta fit = self.fit_percent(y_te_target, ysim_te) if fit > best_fit: best_fit = fit best_params = { 'na': na, 'nb': nb, 'nk': 0, 'coefficients': theta.tolist(), 'fit_percent': fit, 'u_mean': u_mean, 'u_std': u_std, 'vel_mean': vel_mean, 'vel_std': vel_std } except Exception: continue if best_params is None: raise RuntimeError("Failed to identify ARX model") return best_params def _identify_fir(self, vel_tr, vel_te, u_tr, u_te, nb_range, u_mean, u_std, vel_mean, vel_std, log): best_fit = -np.inf best_params = None for nb in nb_range: try: phi_tr, y_tr_target = self.create_regression_matrix(vel_tr, u_tr, 0, nb) theta = np.linalg.lstsq(phi_tr, y_tr_target, rcond=None)[0] phi_te, y_te_target = self.create_regression_matrix( vel_te, np.concatenate([u_tr[-nb:], u_te]), 0, nb ) ysim_te = phi_te @ theta fit = self.fit_percent(y_te_target, ysim_te) if fit > best_fit: best_fit = fit best_params = { 'na': 0, 'nb': nb, 'nk': 0, 'coefficients': theta.tolist(), 'fit_percent': fit, 'u_mean': u_mean, 'u_std': u_std, 'vel_mean': vel_mean, 'vel_std': vel_std } except Exception: continue if best_params is None: raise RuntimeError("Failed to identify FIR model") return best_params def _identify_ararx(self, vel_tr, vel_te, u_tr, u_te, na_range, nb_range, u_mean, u_std, vel_mean, vel_std, log): extended_na_range = range(max(na_range) + 1, min(max(na_range) + 3, 8)) extended_nb_range = range(max(nb_range) + 1, min(max(nb_range) + 3, 8)) best_fit = -np.inf best_params = None for na in extended_na_range: for nb in extended_nb_range: try: phi_tr, y_tr_target = self.create_regression_matrix(vel_tr, u_tr, na, nb) theta = np.linalg.lstsq(phi_tr, y_tr_target, rcond=None)[0] phi_te, y_te_target = self.create_regression_matrix( np.concatenate([vel_tr[-na:], vel_te]), np.concatenate([u_tr[-nb:], u_te]), na, nb ) ysim_te = phi_te @ theta fit = self.fit_percent(y_te_target, ysim_te) if fit > best_fit: best_fit = fit best_params = { 'na': na, 'nb': nb, 'nk': 0, 'coefficients': theta.tolist(), 'fit_percent': fit, 'u_mean': u_mean, 'u_std': u_std, 'vel_mean': vel_mean, 'vel_std': vel_std } except Exception: continue if best_params is None: raise RuntimeError("Failed to identify ARARX model") return best_params class IdentificationModule: def __init__(self, serial_manager): self.serial_manager = serial_manager self.identifier = ARXIdentifier() self.data_chunks = [] self.total_chunks = 0 self.stats = None self.identification_thread: Optional[threading.Thread] = None self.is_identifying = False self.current_model_type = "ARX" self.on_identification_complete: Optional[Callable[[Dict], None]] = None self.on_identification_progress: Optional[Callable[[str], None]] = None self.on_identification_error: Optional[Callable[[str], None]] = None def handle_data_chunk(self, chunk_data: Dict): chunk_index = chunk_data['chunk_index'] total_chunks = chunk_data['total_chunks'] if self.on_identification_progress: self.on_identification_progress(f"データ受信中: {chunk_index + 1}/{total_chunks}") self.data_chunks.append(chunk_data) self.total_chunks = total_chunks if 'stats' in chunk_data: self.stats = chunk_data['stats'] if len(self.data_chunks) == self.total_chunks and self.total_chunks > 0: if self.on_identification_progress: self.on_identification_progress("全データ受信完了") def _reconstruct_data(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: self.data_chunks.sort(key=lambda x: x['chunk_index']) input_data = [] velocity_data = [] time_data = [] position_data = [] for chunk in self.data_chunks: chunk_input = chunk.get('input', []) chunk_time = chunk.get('time_ms', []) chunk_position = chunk.get('position', []) min_len = min(len(chunk_input), len(chunk_time), len(chunk_position)) if len(chunk_input) != len(chunk_time) or len(chunk_input) != len(chunk_position): chunk_input = chunk_input[:min_len] chunk_time = chunk_time[:min_len] chunk_position = chunk_position[:min_len] input_data.extend(chunk_input) time_data.extend(chunk_time) position_data.extend(chunk_position) return np.array(input_data), np.array([]), np.array(time_data), np.array(position_data) def execute_identification(self, model_type: str): if self.is_identifying: return if not self.data_chunks: if self.on_identification_error: self.on_identification_error("前処理データがありません") return self.current_model_type = model_type self.identification_thread = threading.Thread(target=self._run_identification) self.identification_thread.daemon = True self.identification_thread.start() def _run_identification(self): try: self.is_identifying = True input_data, _, time_data, position_data = self._reconstruct_data() if len(input_data) == 0 or len(position_data) == 0 or len(time_data) == 0: raise ValueError("Reconstructed data is empty") if len(time_data) > 1: sampling_period_ms = np.mean(np.diff(time_data)) else: sampling_period_ms = 10.0 Ts = sampling_period_ms / 1000.0 if len(position_data) > 0: velocity_pc = -np.diff(position_data, prepend=position_data[0]) / Ts self.position_data = position_data self.velocity_pc = velocity_pc else: raise ValueError("No position data available") if self.current_model_type == "ARX": na_range = range(1, 5) nb_range = range(1, 5) elif self.current_model_type == "FIR": na_range = range(0, 1) nb_range = range(5, 16) elif self.current_model_type == "ARARX": na_range = range(1, 6) nb_range = range(1, 6) else: na_range = range(1, 5) nb_range = range(1, 5) params = self.identifier.identify( input_data, velocity_pc, time_data_ms=time_data, sampling_period_ms=sampling_period_ms, apply_preprocessing=True, model_type=self.current_model_type, na_range=na_range, nb_range=nb_range, progress_callback=self.on_identification_progress ) na = params['na'] nb = params['nb'] coefficients = params['coefficients'] if na > 0: params['a_coeffs'] = coefficients[:na] params['b_coeffs'] = coefficients[na:na+nb] else: params['a_coeffs'] = [] params['b_coeffs'] = coefficients[:nb] params['sampling_period_ms'] = sampling_period_ms if self.on_identification_complete: self.on_identification_complete(params) except Exception as e: if self.on_identification_error: self.on_identification_error(str(e)) finally: self.is_identifying = False def _send_model_parameters(self, params: Dict): message = { "type": "command", "command": "SET_MODEL", "request_id": self.serial_manager.request_id_counter, "payload": { "model_type": "ARX", "parameters": { "na": params['na'], "nb": params['nb'], "nk": params['nk'], "coefficients": params['coefficients'] }, "validation": { "fit_percent": params['fit_percent'] } } } self.serial_manager.request_id_counter += 1 import json json_str = json.dumps(message) + "\n" self.serial_manager.serial_port.write(json_str.encode('utf-8')) self.serial_manager.serial_port.flush() if self.on_identification_progress: self.on_identification_progress(f"モデルパラメータ送信完了: Fit={params['fit_percent']:.2f}%") def reset(self): self.data_chunks = [] self.total_chunks = 0 self.stats = None self.is_identifying = False

(d) pid_optimizer.py -シミュレーションによるリスクフリーな自動チューニング-

  • 設計の要点

    -高速シミュレーション: Pythonの実行速度を補うため、一部に Numba(JITコンパイラ)を導入し、数万通りのパラメータセットによる挙動シミュレーションを数秒で完了させます。

    • 台形加速プロファイルの考慮: 単なるステップ応答だけでなく、実用的な台形加速プロファイルを用いた位置決め制御のシミュレーションに対応しており、オーバーシュートや整定時間のトレードオフを自動で最適化します。

    • 安全なパラメータ導出: 実機で「発散(ハンチング)」を試行錯誤するリスクを完全に排除し、デジタルツイン上で理論的に裏付けられた安全なパラメータのみをSpresenseへフィードバックするワークフローを実現しました。

pid_optimizer.py

import numpy as np from typing import Dict, Tuple, Optional, Callable import threading try: from numba import njit NUMBA_AVAILABLE = True except ImportError: NUMBA_AVAILABLE = False def njit(*args, **kwargs): def decorator(func): return func return decorator class TrapezoidalProfile: @staticmethod def generate(target_position, max_velocity, acceleration, Ts, total_time): N = int(total_time / Ts) t = np.arange(N) * Ts Ta = max_velocity / acceleration Na = int(Ta / Ts) dist_ad = max_velocity * Ta dist_const = target_position - dist_ad if dist_const > 0: Tc = dist_const / max_velocity Nc = int(Tc / Ts) else: Nc = 0 max_velocity = np.sqrt(acceleration * target_position) Ta = max_velocity / acceleration Na = int(Ta / Ts) N_stop = min(2 * Na + Nc, N) vel = np.zeros(N) for k in range(min(Na, N)): vel[k] = acceleration * k * Ts for k in range(Na, min(Na + Nc, N)): vel[k] = max_velocity for k in range(Na + Nc, N_stop): vel[k] = max(0.0, max_velocity - acceleration * (k - Na - Nc) * Ts) pos = np.cumsum(vel * Ts) return t, vel, pos, N_stop @njit(cache=True) def simulate_arx_pid_numba( Kp, Ki, Kd, Kff, target_pos, target_vel, Ts, na, nb, a, b, u_max, u_min_abs, lambda_u, lambda_os, lambda_dp, diff_interval, N_stop, target_final, u_mean, u_std, vel_mean, vel_std ): N = len(target_pos) y_hist = np.zeros(na) u_hist = np.zeros(nb) pos = np.zeros(N) vel = np.zeros(N) u_out = np.zeros(N) integ = 0.0 prev_err = 0.0 sse = 0.0 u_energy = 0.0 max_pos = 0.0 diff_pen = 0.0 diff_cnt = 0 ERROR_MAX = 1e5 VEL_MAX = 1e4 for k in range(N): p = pos[k-1] if k > 0 else 0.0 err = target_pos[k] - p err = min(max(err, -ERROR_MAX), ERROR_MAX) integ += err * Ts deriv = (err - prev_err) / Ts prev_err = err u = Kp*err + Ki*integ + Kd*deriv + Kff*target_vel[k] u = min(max(u, -u_max), u_max) u_out[k] = u if k < N_stop: sse += err * err u_energy += u * u vel_n = 0.0 for i in range(na): vel_n += a[i] * y_hist[i] for i in range(nb): vel_n += b[i] * u_hist[i] vel_n = min(max(vel_n, -VEL_MAX), VEL_MAX) vel[k] = vel_n * vel_std + vel_mean pos[k] = vel[k] * Ts if k == 0 else pos[k-1] + vel[k] * Ts if k >= N_stop: max_pos = max(max_pos, pos[k]) if k >= diff_interval and k < N_stop and k % diff_interval == 0: td = target_pos[k] - target_pos[k-diff_interval] ad = pos[k] - pos[k-diff_interval] diff_pen += ((td - ad) / max(abs(td), 1e-6))**2 diff_cnt += 1 for i in range(na-1, 0, -1): y_hist[i] = y_hist[i-1] if na > 0: y_hist[0] = vel_n for i in range(nb-1, 0, -1): u_hist[i] = u_hist[i-1] if nb > 0: u_hist[0] = -(u - u_mean) / u_std cost = ( sse / max(N_stop, 1) + lambda_u * (u_energy / max(N, 1)) + lambda_os * max(0.0, max_pos - target_final) + lambda_dp * (diff_pen / max(diff_cnt, 1)) ) if not np.isfinite(cost): cost = 1e12 return cost, pos, vel, u_out class ARXPIDSimulator: @staticmethod def simulate(Kp, Ki, Kd, Kff, target_pos, target_vel, Ts, na, nb, theta, u_max, u_min_abs, lambda_u, lambda_os, lambda_dp, diff_interval, N_stop, target_final, u_mean, u_std, vel_mean, vel_std): a = theta[:na].astype(np.float64) b = theta[na:].astype(np.float64) return simulate_arx_pid_numba( Kp, Ki, Kd, Kff, target_pos.astype(np.float64), target_vel.astype(np.float64), Ts, na, nb, a, b, u_max, u_min_abs, lambda_u, lambda_os, lambda_dp, diff_interval, N_stop, target_final, u_mean, u_std, vel_mean, vel_std ) class PIDOptimizer: def __init__(self, model_params: Dict): self.na = model_params['na'] self.nb = model_params['nb'] self.theta = np.array(model_params['coefficients'], dtype=np.float64) self.u_std = model_params.get('u_std', 1.0) self.vel_std = model_params.get('vel_std', 1.0) self.u_mean = model_params.get('u_mean', 0.0) self.vel_mean = model_params.get('vel_mean', 0.0) def optimize( self, target_position: float, max_velocity: float = 80.0, acceleration: float = 130.0, sampling_period_ms: float = 30.0, total_time_s: float = 2.0, u_max: float = 255.0, u_min_abs: float = 30.0, lambda_u: float = 0.0005, lambda_os: float = 10.0, lambda_dp: float = 0.5, max_iterations: int = 5000, progress_callback: Optional[Callable[[str], None]] = None, **kwargs ) -> Dict: def log(msg: str): if progress_callback: progress_callback(msg) Ts = sampling_period_ms / 1000.0 diff_interval = 10 t, vref, pref, N_stop = TrapezoidalProfile.generate( target_position, max_velocity, acceleration, Ts, total_time_s ) params = np.array([1.0, 0.01, 0.01, 0.5]) lr = kwargs.get('learning_rate', 0.08) momentum = 0.5 velocity = np.zeros_like(params) cost_history = [] best_cost = float('inf') best_params = params.copy() no_improve_count = 0 for i in range(max_iterations): cost, pos_out, vel_out, u_out = ARXPIDSimulator.simulate( params[0], params[1], params[2], params[3], pref, vref, Ts, self.na, self.nb, self.theta, u_max, u_min_abs, lambda_u, lambda_os, lambda_dp, diff_interval, N_stop, target_position, self.u_mean, self.u_std, self.vel_mean, self.vel_std ) cost_history.append(cost) if cost < best_cost: best_cost = cost best_params = params.copy() no_improve_count = 0 else: no_improve_count += 1 grad = np.zeros_like(params) eps = 0.01 for j in range(4): p_plus = params.copy() p_minus = params.copy() p_plus[j] += eps p_minus[j] -= eps p_plus = np.maximum(p_plus, [0.1, 0.001, 0.001, 0.1]) p_minus = np.maximum(p_minus, [0.1, 0.001, 0.001, 0.1]) c_plus, *_ = ARXPIDSimulator.simulate( p_plus[0], p_plus[1], p_plus[2], p_plus[3], pref, vref, Ts, self.na, self.nb, self.theta, u_max, u_min_abs, lambda_u, lambda_os, lambda_dp, diff_interval, N_stop, target_position, self.u_mean, self.u_std, self.vel_mean, self.vel_std ) c_minus, *_ = ARXPIDSimulator.simulate( p_minus[0], p_minus[1], p_minus[2], p_minus[3], pref, vref, Ts, self.na, self.nb, self.theta, u_max, u_min_abs, lambda_u, lambda_os, lambda_dp, diff_interval, N_stop, target_position, self.u_mean, self.u_std, self.vel_mean, self.vel_std ) grad[j] = (c_plus - c_minus) / (2 * eps) if not np.all(np.isfinite(grad)): continue grad_norm = np.linalg.norm(grad) if grad_norm > 1e-10: grad_direction = grad / grad_norm param_scales = np.array([0.5, 0.05, 0.05, 0.2]) scaled_grad = grad_direction * param_scales else: scaled_grad = np.zeros_like(grad) velocity = momentum * velocity - lr * scaled_grad params = params + velocity params = np.maximum(params, [0.1, 0.001, 0.001, 0.1]) params = np.minimum(params, [50.0, 10.0, 10.0, 5.0]) if no_improve_count > 1000: params = best_params break if i % 300 == 0: log(f"Iter {i}: Cost={cost:.2f}") final_cost, pos_out, vel_out, u_out = ARXPIDSimulator.simulate( params[0], params[1], params[2], params[3], pref, vref, Ts, self.na, self.nb, self.theta, u_max, u_min_abs, lambda_u, lambda_os, lambda_dp, diff_interval, N_stop, target_position, self.u_mean, self.u_std, self.vel_mean, self.vel_std ) return { 'Kp': float(params[0]), 'Ki': float(params[1]), 'Kd': float(params[2]), 'Kff': float(params[3]), 'cost': float(final_cost), 'iterations': len(cost_history), 'cost_history': cost_history, 'time': t.tolist(), 'position_sim': pos_out.tolist(), 'velocity_sim': vel_out.tolist(), 'control_sim': u_out.tolist(), 'target_position': pref.tolist(), 'target_velocity': vref.tolist() } class ModelBasedParameterEstimator: def __init__(self, model_params: Dict): self.na = model_params['na'] self.nb = model_params['nb'] self.theta = np.array(model_params['coefficients'], dtype=np.float64) self.u_std = model_params.get('u_std', 1.0) self.vel_std = model_params.get('vel_std', 1.0) self.u_mean = model_params.get('u_mean', 0.0) self.vel_mean = model_params.get('vel_mean', 0.0) self.model_Ts = model_params.get('sampling_period_ms', 30.0) / 1000.0 def simulate_step_response(self, u_step: float, duration_s: float = 2.0) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: Ts = self.model_Ts N = int(duration_s / Ts) a = self.theta[:self.na] b = self.theta[self.na:] y_hist = np.zeros(self.na) u_hist = np.zeros(self.nb) time = np.arange(N) * Ts velocity = np.zeros(N) position = np.zeros(N) for k in range(N): vel_n = 0.0 for i in range(self.na): vel_n += a[i] * y_hist[i] for i in range(self.nb): vel_n += b[i] * u_hist[i] velocity[k] = vel_n * self.vel_std + self.vel_mean if k == 0: position[k] = velocity[k] * Ts else: position[k] = position[k-1] + velocity[k] * Ts for i in range(self.na - 1, 0, -1): y_hist[i] = y_hist[i-1] if self.na > 0: y_hist[0] = vel_n for i in range(self.nb - 1, 0, -1): u_hist[i] = u_hist[i-1] if self.nb > 0: u_hist[0] = -(u_step - self.u_mean) / self.u_std return time, velocity, position def estimate_system_characteristics(self, u_max: float = 100.0) -> Dict: a = self.theta[:self.na] b = self.theta[self.na:] sum_a = np.sum(a) sum_b = np.sum(b) u_norm = -(u_max - self.u_mean) / self.u_std if abs(1 - sum_a) > 1e-6: vel_norm_ss = u_norm * sum_b / (1 - sum_a) analytical_steady_velocity = vel_norm_ss * self.vel_std + self.vel_mean else: analytical_steady_velocity = None time, velocity, position = self.simulate_step_response(u_max, duration_s=3.0) steady_start = int(len(velocity) * 0.8) simulated_steady_velocity = np.mean(np.abs(velocity[steady_start:])) if analytical_steady_velocity is not None: steady_velocity = abs(analytical_steady_velocity) else: steady_velocity = simulated_steady_velocity target_63 = 0.632 * simulated_steady_velocity time_constant = 0.1 for i, v in enumerate(np.abs(velocity)): if v >= target_63: time_constant = time[i] break accel_samples = min(20, len(velocity) // 4) if accel_samples > 1: dt = time[1] - time[0] accelerations = np.diff(velocity[:accel_samples]) / dt acceleration_rate = np.mean(np.abs(accelerations)) else: acceleration_rate = steady_velocity / time_constant if time_constant > 0 else 1000.0 gain = steady_velocity / u_max if u_max > 0 else 0 return { 'max_steady_velocity': steady_velocity, 'acceleration_rate': acceleration_rate, 'time_constant': time_constant, 'gain': gain } def estimate_control_parameters( self, target_position: float, total_time_s: float, u_max: float = 100.0, safety_factor: float = 0.8 ) -> Dict: chars = self.estimate_system_characteristics(u_max) max_achievable_velocity = chars['max_steady_velocity'] * safety_factor max_achievable_accel = chars['acceleration_rate'] * safety_factor S = abs(target_position) T = total_time_s a = max_achievable_accel discriminant = T * T * a * a - 4 * S * a if discriminant >= 0: V_ideal = (T * a - np.sqrt(discriminant)) / 2 V_ideal = max(V_ideal, 1.0) else: T_min = 2 * np.sqrt(S / a) if a > 0 else float('inf') V_ideal = np.sqrt(S * a) if a > 0 else max_achievable_velocity recommended_velocity = min(V_ideal, max_achievable_velocity) t_accel_target = total_time_s * 0.3 if t_accel_target > 0 and recommended_velocity > 0: required_accel = recommended_velocity / t_accel_target recommended_accel = min(required_accel, max_achievable_accel) else: recommended_accel = max_achievable_accel if recommended_accel > 0 and recommended_velocity > 0: t_accel_actual = recommended_velocity / recommended_accel dist_accel = 0.5 * recommended_accel * t_accel_actual ** 2 dist_cruise = S - 2 * dist_accel if dist_cruise > 0: t_cruise = dist_cruise / recommended_velocity estimated_time = 2 * t_accel_actual + t_cruise else: actual_peak_vel = np.sqrt(S * recommended_accel) t_up = actual_peak_vel / recommended_accel estimated_time = 2 * t_up else: estimated_time = float('inf') achievable = estimated_time <= total_time_s * 1.2 return { 'max_velocity': recommended_velocity, 'acceleration': recommended_accel, 'achievable': achievable, 'estimated_time': estimated_time, 'system_max_velocity': chars['max_steady_velocity'], 'system_max_acceleration': chars['acceleration_rate'] } class PIDOptimizationModule: def __init__(self, serial_manager): self.serial_manager = serial_manager self.optimizer = None self.parameter_estimator = None self.optimization_thread: Optional[threading.Thread] = None self.is_optimizing = False self.on_optimization_complete: Optional[Callable[[Dict], None]] = None self.on_optimization_progress: Optional[Callable[[str], None]] = None self.on_optimization_error: Optional[Callable[[str], None]] = None def set_model(self, model_params: Dict): self.optimizer = PIDOptimizer(model_params) self.parameter_estimator = ModelBasedParameterEstimator(model_params) def estimate_control_parameters( self, target_position: float, total_time_s: float, u_max: float = 100.0, safety_factor: float = 0.7 ) -> Optional[Dict]: if self.parameter_estimator is None: return None return self.parameter_estimator.estimate_control_parameters( target_position=target_position, total_time_s=total_time_s, u_max=u_max, safety_factor=safety_factor ) def get_system_characteristics(self, u_max: float = 100.0) -> Optional[Dict]: if self.parameter_estimator is None: return None return self.parameter_estimator.estimate_system_characteristics(u_max) def start_optimization( self, target_position: float, max_velocity: float = 80.0, acceleration: float = 130.0, **kwargs ): if self.optimizer is None: if self.on_optimization_error: self.on_optimization_error("Model not set") return if self.is_optimizing: return self.optimization_thread = threading.Thread( target=self._run_optimization, args=(target_position, max_velocity, acceleration), kwargs=kwargs, daemon=True ) self.optimization_thread.start() def _run_optimization(self, target_position: float, max_velocity: float, acceleration: float, **kwargs): try: self.is_optimizing = True result = self.optimizer.optimize( target_position=target_position, max_velocity=max_velocity, acceleration=acceleration, progress_callback=self.on_optimization_progress, **kwargs ) if self.on_optimization_complete: self.on_optimization_complete(result) except Exception as e: if self.on_optimization_error: self.on_optimization_error(str(e)) finally: self.is_optimizing = False def reset(self): self.is_optimizing = False
  • L_tiem さんが 2026/01/31 に 編集 をしました。 (メッセージ: 初版)
  • L_tiem さんが 前の土曜日の12:05 に 編集 をしました。
  • Opening
    JOのアイコン画像 JO 前の土曜日の16:47

    この投稿のシステムは通信によりマイコンデータを取得するが、通信した時点で「リアルタイム」ではなくなる
    処理を含めて「マイコン側」での処理が必要ではなかろうか?

    適応制御・MPC最適化  最適化が必要なのに実装されていません
    最適化計算をPC側でやろうとすると、シリアル通信の往復レイテンシ(数ms〜数十ms)が制御周期と同じオーダーになり、位相遅れが生じて系が不安定化するリスクがあります
    現在のSpresenseのサブコアの空きリソースや、制御周期の目標値はどの程度でしょうか?それに応じて、MPCの予測ホライズンや適応制御のアルゴリズムを調整する必要があります。

    話は変わりますが、ドローンの飛行制御の様に、6軸センサーからモーターへのPID制御等では、更に高速性が求められますね

    0 件の返信が折りたたまれています
ログインしてコメントを投稿する