0. 目次
- はじめに
- システムの構成・コンセプト
- システム同定および制御テストの流れ
- 設計のポイント
- 実際に使用したときの様子
- 展望
- ソースコード抜粋
1. はじめに
経験則に頼ったPID調整は限界が生じると考えています。本プロジェクトでは、SonyのSpresenseが持つ強力な演算能力とマルチコアアーキテクチャを活用し、実機の動特性を数式化する「システム同定」から、そのモデルを用いた「高度な制御」までを一気通貫で行えるプラットフォームを開発しました。 「デジタルツイン」の概念を組み込み開発に持ち込み、PCの計算資源とマイコンのリアルタイム性を高度に融合させることを目指しています。
2. システム構成・コンセプト
本システムでは、PCとマイコンを分離した役割分担(エッジ&コンピューティング)を採用しています。
PC(Python/CustomTkinter): 複雑な行列演算を伴う数理モデルの推定、制御パラメータの最適化、リッチなGUIによる可視化を担当。
Spresense(C++/Arduino): モーターのPWM駆動、エンコーダーの高速サンプリング、およびリアルタイムな制御ループの実行を担当。
ハードウェア構成
Main Board: Spresense Main Board
Extension Board: 信号の入出力およびPC通信用
Actuator: エンコーダー付きDCモーター(FIT0482)
MotorDriver: L298N
ソフトウェア構成
Spresense側: メインコア(状態遷移・通信管理)、サブコア(高速計測・制御)。
PC側: データ解析、PBRS信号生成、ARX/FIR/ARARX同定アルゴリズム、PID最適化シミュレータ。
- 同定処理および制御テストの流れ
実機の挙動を正確にモデル化するため、以下のステップを踏みます。
信号生成: PC側でM系列信号(PRBS)を生成し、Spresense経由でモーターに印加。
高速計測: Spresenseが入出力データをサンプリング。
データ転送: 独自バイナリプロトコルにより、PCへ欠落のないデータ転送を実行。
モデル推定: ARXモデルを用い、最小二乗法によってパラメータを算出。
同定結果の閲覧:同定結果の適合率および検証データに対する同定結果のグラフの表示
PID最適化:同定したモデルを基にPIDパラメーターの最適化。
制御テストデータの送信:制御テストのために最適化パラメーターをSpresenseへ送信
制御テストの実行:位置制御、速度制御、モデル予測制御を行う。
- 設計のポイント
① 堅牢なハイブリッド通信プロトコル
PC-Spresense間の通信には、「柔軟なJSON」と「高速なバイナリ」を組み合わせた独自プロトコルを設計しました。
-
制御コマンド (JSON): 状態遷移やパラメータ設定に使用。
-
高速データ転送 (バイナリ): 独自パケット構造(Magic Number + Payload + Checksum)を定義。数千点の浮動小数点データをシリアル帯域の限界まで高速に、かつ整合性を保って転送します。
② レイヤー分離とエラーハンドリング
組み込み開発におけるデバッグ効率を最大化するため、ソフトウェアアーキテクチャにレイヤー分離(Transport / Protocol / Domain)を導入しました。AboutError.hpp によりエラー発生箇所を特定し、GUIへ即座に通知する堅牢なシステムとしています。
レイヤーとエラータイプ
#pragma once
enum class ErrorLayer
{
None,
Transport,
Protocol,
Domain
};
enum class ErrorCode
{
None,
SerialPortNotOpened,
InvalidJSON,
JsonSizeExceeded,
UnknownError,
MissingType,
UnknownType,
InvalidFormat,
MismatchedSize,
MissingPayload,
UnknownCommand,
InvalidParameter,
IdentificationFailed,
OptimizationFailed,
InvalidConfig,
SDWriteFailed,
SDInitFailed,
MPInitFailed,
MPSendFailed,
WhiteNoiseGenerationFailed,
MeasurementFailed,
PreprocessingFailed,
OutOfMemory,
Success,
ConfigApplyFailed,
ExperimentAlreadyRunning,
ExperimentStartFailed,
ControlExecutionFailed,
ExperimentNotRunning,
InvalidStateTransition,
OutlierRemovalFailed
};
struct ErrorInfo
{
ErrorLayer layer;
ErrorCode code;
const char* message;
};
③ マルチコア性能の活用(設計思想)
Spresenseの真価を引き出すため、タスクを各コアに独立させる設計を採っています。
Main Core: 通信スタック、UIイベント管理。
Sub Core: タイマー割り込みによるモーターPWM制御、エンコーダー信号の高速デコード。 これにより、PCとの重い通信処理中でも、モーター制御の「決定論的なリアルタイム性」を維持します。
④ MPC(モデル予測制御)の実装に向けて
本プラットフォームの最終目標は、同定したモデルをSpresense側にフィードバックし、制約条件付きの最適化問題を解くMPC(モデル予測制御)の実装です。Spresenseの強力なFPU/DSPを活用することで、小型デバイスによる高度な現代制御の民主化を目指しています。
- 実際に使用したときの様子
現在はメインコアをベースに、PC側のリッチなGUIと連携してシステム同定を行う環境が整っています。
この画面から実験の時間、サンプリング周期、および、実験の信号の生成のパラメーターの設定を行います。
サイドバーを下にスライドすると実験設定画面②のように信号を生成し、設定をSpresenseに送信するボタンが表示されます。
このボタンを押すことでSpresenseに設定を送信し、Spresense側で内容が確認された場合、実験の開始ボタンを押すことができるようになり、実験開始ボタンを押すことで実験が開始されます。
実験が終了したあと、実験ログにモデル同定タブの前処理ボタンが押すように指示が表示されます。
モデル同定画面では前処理の実行、およびARXモデルを含むシステム同定を実行できます。
前処理ボタンを押した後、所望のモデルのボタンを押すことでシステム同定が実行されます。以下はARXモデルのボタンを押したときの画面です。
以下は、サンプリング時間を30秒として同定を行ったときのモデルの適合率と予測値のグラフです。非常に高い精度で予測がされていることがわかります。
同定完了後、制御テスト画面です。現在は位置制御、速度制御、モデル予測制御(まだ未完成)を選択できます。
位置制御のPIDパラメーターを最適化した結果以下のようになりました。
最適化完了後、制御テスト実行のボタンを押すことで制御テストを実行することができます。
6. 展望
Sub Coreへの完全オフロード: 制御周期の更なる高速化。
オンボードMPC: Spresense内での将来軌道予測計算の実装。
適応制御: 負荷変動に合わせたモデルのリアルタイム更新。
7. 全体のコード
SpresenseとPCのファイルのディレクトリ構成と各ファイルの概要を示す。
7.1 ディレクトリ構成
以下にSpresenseとPCのファイルそれぞれのディレクトリ構成を示す。
Spresense (Arduino Project)
├── system_identification-spresense.ino # プロジェクトのエントリーポイント(Setup/Loop)
└── src/
├── AboutError.hpp # システム全体で共通のエラーコード定義
├── SpresenseFix.h # 特定環境下での互換性維持用ヘッダ
│
├── communication/ # PCおよびコア間通信の抽象化層
│ ├── IErrorReporter.hpp # エラー通知用のインターフェース定義
│ ├── MPProtocol.hpp # マルチコア間通信(MPCom)用プロトコル
│ ├── BinaryGenerator/ # 計測データのバイナリパケット構築ロジック
│ ├── MPCommunicator/ # サブコアとのデータ・コマンド授受
│ └── SerialCommunicator/ # PC(Python)とのシリアル通信管理
│
├── control/ # 制御アルゴリズムの実装
│ ├── ControlExecutionEngine # 制御ループの実行スケジュール管理
│ ├── PIDController/ # PID制御の基本クラス
│ ├── MPCController/ # モデル予測制御(将来の拡張用)
│ ├── PIDOptimizer/ # オンボードでのPIDパラメータ最適化
│ └── controls/ # 具体的な制御則(位置・速度・台形加速)
│
├── experiment/ # システム同定のための実験管理
│ ├── ExperimentController # 同定実験全体のシーケンス管理
│ ├── MeasurementController # 高速サンプリングとバッファリング
│ └── SignalGenerator/ # M系列信号やPRBSなどの同定用信号生成
│
├── hardware/ # 物理デバイスのドライバ抽象化層
│ ├── EncoderInterface/ # エンコーダーのパルス計数と速度計算
│ └── MotorInterface/ # PWM出力とモータードライバ制御
│
├── managers/ # リソースおよびシステム状態の管理
│ ├── ConfigManager/ # 実験パラメータや設定値の保持
│ ├── ErrorManager/ # 発生したエラーの収集と上位層への報告
│ ├── SDCardManager/ # 大容量計測データのSD保存・読み出し
│ ├── PreprocessManager/ # 実験データの前処理(フィルタリング等)
│ └── ShutdownManager/ # システムの安全な停止プロセス管理
│
├── protocol/ # 通信フォーマットの解析
│ └── JsonDispatcher # PCから届いたJSONコマンドを各モジュールへ振り分け
│
└── state/ # システムの状態管理
├── StateManager # 現在の状態(待機、実験中、エラー等)の管理
└── SystemState.hpp # 状態遷移図に基づいた列挙型定義
以下はPC側のファイルの構成である。
PC (Python)
├── main.py (実行エントリーポイント)
├── comm/ (通信・計算ロジック層)
│ ├── identification_module.py # ARXモデル同定
│ ├── pid_optimizer.py # PIDパラメータ最適化
│ ├── serial_manager.py # バイナリ/JSONハイブリッド通信
│ └── signal_generator.py # M系列/ホワイトノイズ生成
├── ui/ (表示・インターフェース層)
│ ├── main_window.py # メイン画面レイアウト
│ ├── experiment_tab.py # 実験設定タブ
│ ├── data_tab.py # データ解析・同定タブ
│ └── control_tab.py # 制御テスト・MPCタブ
└── utils/ # 共通ユーティリティ
7.2 コードの抜粋
以下にコードの一部を抜粋して掲載する
(a)system_identification-spresense.ino-堅牢なリアルタイム製を支えるステートマシン-
本システムのメインエントリーポイントです。高度な制御システムにおいて、最も避けるべきは「状態の矛盾」による暴走です。本プログラムでは、システムの状態をenum class Stateで厳密に定義し、イベント駆動型のステートマシンとして設計しました。
-
設計の要点
-
状態遷移の可視化:待機(idle)から計測(Measuring)、データ送信(SendRawData)に至るまで載フローを中央管理し、予期せぬ動作を排除しています。
-
軽量なメインループ:loop() 内では状態に応じたハンドラを呼び出すのみにとどめ、実際載制御ロジックはサブモジュールへ委譲することで高い保守性と可読性を確保しています。
-
エラーの集中管理:どのフェーズで異常が発生しても、共通のエラー状態へ遷移し、安全にモーターを停止させた上でPCへ通知する「フェイルセーフ」を実装しています
-
system_identification-spresense.ino
#include <Arduino.h>
#ifdef pgm_read_ptr
#undef pgm_read_ptr
#endif
#define pgm_read_ptr(addr) (*(void * const *)(addr))
#include <SDHCI.h>
#include <File.h>
#include "src/SpresenseFix.h"
#include "src/AboutError.hpp"
#include "src/state/SystemState.hpp"
#include "src/state/StateManager.hpp"
#include "src/managers/ErrorManager/ErrorManager.hpp"
#include "src/managers/ConfigManager/ConfigManager.hpp"
#include "src/managers/SDCardManager/SDCardManager.hpp"
#include "src/managers/PreprocessManager/PreprocessManager.hpp"
#include "src/managers/ShutdownManager/ShutdownManager.hpp"
#include "src/hardware/MotorInterface/MotorInterface.hpp"
#include "src/hardware/EncoderInterface/EncoderInterface.hpp"
#include "src/experiment/SignalGenerator/SignalGenerator.hpp"
#include "src/experiment/MeasurementController/MeasurementController.hpp"
#include "src/experiment/ExperimentController/ExperimentController.hpp"
#include "src/communication/SerialCommunicator/SerialCommunicator.hpp"
#include "src/communication/MPCommunicator/MPCommunicator.hpp"
#include "src/protocol/JsonDispatcher.hpp"
#include "src/control/ControlExecutionEngine/ControlExecutionEngine.hpp"
const int ENCODER_PIN_A = 2;
const int ENCODER_PIN_B = 3;
const uint8_t MOTOR_DIR_PIN_A = 4;
const uint8_t MOTOR_DIR_PIN_B = 5;
const uint8_t MOTOR_SPEED_PIN = 6;
const int SERIAL_BAUD_RATE = 921600;
StateManager* g_stateManager = nullptr;
ErrorManager* g_errorManager = nullptr;
SDCardManager* g_sdCardManager = nullptr;
ConfigManager* g_configManager = nullptr;
PreprocessManager* g_preprocessManager = nullptr;
ShutdownManager* g_shutdownManager = nullptr;
MotorInterface* g_motorInterface = nullptr;
EncoderInterface* g_encoderInterface = nullptr;
SignalGenerator* g_signalGenerator = nullptr;
MeasurementController* g_measurementController = nullptr;
ExperimentController* g_experimentController = nullptr;
ControlExecutionEngine* g_controlEngine = nullptr;
SerialCommunicator* g_serialCommunicator = nullptr;
MPCommunicator* g_mpCommunicator = nullptr;
JsonDispatcher* g_jsonDispatcher = nullptr;
void setup() {
pinMode(MOTOR_DIR_PIN_A, OUTPUT);
pinMode(MOTOR_DIR_PIN_B, OUTPUT);
pinMode(MOTOR_SPEED_PIN, OUTPUT);
digitalWrite(MOTOR_DIR_PIN_A, LOW);
digitalWrite(MOTOR_DIR_PIN_B, LOW);
analogWrite(MOTOR_SPEED_PIN, 0);
Serial.begin(SERIAL_BAUD_RATE);
delay(2000);
Serial.println("\n\n=== System Initialization Started ===");
Serial.println("BUILD VERSION: 2026-01-12-23:00");
Serial.println("FIX: WaitModelParameters->ExecuteControl allowed, motor=-controlSignal");
Serial.println("[Setup] Motor pins initialized and stopped IMMEDIATELY");
g_stateManager = new StateManager();
g_errorManager = new ErrorManager();
g_sdCardManager = new SDCardManager(*g_errorManager);
g_configManager = new ConfigManager(*g_stateManager, *g_errorManager);
g_motorInterface = new MotorInterface(MOTOR_DIR_PIN_A, MOTOR_DIR_PIN_B, MOTOR_SPEED_PIN);
g_encoderInterface = new EncoderInterface(ENCODER_PIN_A, ENCODER_PIN_B);
g_mpCommunicator = new MPCommunicator(*g_errorManager);
if (!g_mpCommunicator->init()) {
Serial.println("[ERROR] MPCommunicator initialization failed!");
g_errorManager->report(ErrorCode::MPInitFailed);
}
g_signalGenerator = new SignalGenerator(*g_configManager, *g_sdCardManager, *g_errorManager);
g_measurementController = new MeasurementController(
*g_configManager, *g_signalGenerator, *g_motorInterface, *g_encoderInterface,
*g_sdCardManager, *g_stateManager, *g_errorManager, *g_mpCommunicator
);
g_preprocessManager = new PreprocessManager(
*g_measurementController, *g_sdCardManager, *g_errorManager
);
g_experimentController = new ExperimentController(
*g_stateManager, *g_signalGenerator, *g_measurementController, *g_errorManager, *g_preprocessManager
);
g_controlEngine = new ControlExecutionEngine(
*g_stateManager, *g_errorManager, *g_sdCardManager, *g_motorInterface,
*g_encoderInterface, *g_mpCommunicator
);
g_shutdownManager = new ShutdownManager(
*g_sdCardManager, *g_motorInterface, *g_stateManager, *g_errorManager
);
g_serialCommunicator = new SerialCommunicator(SERIAL_BAUD_RATE, *g_errorManager);
g_jsonDispatcher = new JsonDispatcher(
*g_errorManager, *g_configManager, *g_stateManager, *g_preprocessManager,
*g_controlEngine, *g_serialCommunicator, *g_shutdownManager,
*g_motorInterface, *g_signalGenerator
);
g_serialCommunicator->setDispatcher(*g_jsonDispatcher);
g_motorInterface->init();
g_motorInterface->stop();
Serial.println("Motor interface initialized");
g_encoderInterface->init();
EncoderInterface::instance = g_encoderInterface;
Serial.println("Encoder interface initialized");
if (!g_sdCardManager->init()) {
Serial.println("SD card initialization failed!");
g_errorManager->report(ErrorCode::SDInitFailed);
} else {
Serial.println("SD card initialized");
}
g_serialCommunicator->init();
g_errorManager->setReporter(g_serialCommunicator);
Serial.println("Serial communicator initialized");
g_stateManager->transitionTo(State::Idle);
Serial.println("State manager initialized - System Ready");
Serial.println("=== System Ready ===\n");
}
void loop() {
State currentState = g_stateManager->getCurrentState();
if (currentState != State::Measuring) {
g_serialCommunicator->receiveJson();
}
if (g_mpCommunicator) {
g_mpCommunicator->update();
}
static State previousState = State::Boot;
g_stateManager->update();
if (previousState != currentState) {
if (currentState != State::Measuring && currentState != State::ExecuteControl) {
g_motorInterface->stop();
Serial.print("[Main] State changed, motor stopped. New state: ");
Serial.println((int)currentState);
} else {
Serial.print("[Main] State changed to: ");
Serial.println((int)currentState);
}
previousState = currentState;
}
switch (currentState) {
case State::GenerateWhiteNoise:
case State::Measuring:
g_experimentController->update();
break;
case State::SendRawData: {
static bool dataSent = false;
if (!dataSent) {
g_motorInterface->stop();
Serial.println("[Main] Motor stopped in SendRawData state");
Serial.println("[Main] Skipping measurement data transmission");
size_t sampleCount = g_measurementController->getSampleCount();
StaticJsonDocument<256> doc;
doc["type"] = "status";
doc["command"] = "MEASUREMENT_DATA_COMPLETE";
doc["payload"]["total_samples"] = sampleCount;
doc["payload"]["message"] = "測定完了。データ送信をスキップします";
g_serialCommunicator->sendJson(doc);
StaticJsonDocument<256> expCompleteDoc;
expCompleteDoc["type"] = "experiment_complete";
expCompleteDoc["samples"] = sampleCount;
g_serialCommunicator->sendJson(expCompleteDoc);
Serial.println("[Main] Sent experiment_complete notification to PC");
Serial.println("[Main] Transitioning to WaitOutlierSelection");
g_stateManager->transitionTo(State::WaitOutlierSelection);
dataSent = true;
}
if (currentState != State::SendRawData) {
dataSent = false;
}
break;
}
case State::WaitModelParameters: {
static bool waitMessageShown = false;
if (!waitMessageShown) {
Serial.println("[Main] Waiting for model parameters from PC...");
waitMessageShown = true;
}
if (currentState != State::WaitModelParameters) {
waitMessageShown = false;
}
delay(100);
break;
}
case State::IdentifyModel: {
delay(10);
break;
}
case State::SendModel: {
static bool modelProcessed = false;
if (!modelProcessed) {
Serial.println("[Main] Model sent, skipping PID optimization for now");
Serial.println("[Main] Transitioning to WaitControlCommand");
g_stateManager->transitionTo(State::WaitControlCommand);
modelProcessed = true;
}
if (currentState != State::SendModel) {
modelProcessed = false;
}
break;
}
case State::WaitControlCommand:
delay(10);
break;
case State::ExecuteControl: {
Serial.println("[Main] Control execution complete, transitioning to SendControlLog");
g_stateManager->transitionTo(State::SendControlLog);
break;
}
case State::SendControlLog: {
static bool logSent = false;
if (!logSent) {
Serial.println("[Main] Sending control log");
const auto& controlLog = g_controlEngine->getControlLog();
const char* modelType = g_controlEngine->getModelType();
if (controlLog.empty()) {
Serial.println("[Main] WARNING: Control log is empty (SubCore execution doesn't log to MainCore yet)");
StaticJsonDocument<256> doc;
doc["type"] = "control_log";
doc["status"] = "completed";
doc["message"] = "Control executed on SubCore (detailed log not available)";
doc["samples"] = 0;
doc["model_type"] = modelType;
g_serialCommunicator->sendJson(doc);
} else {
bool success = g_controlEngine->wasLastControlSuccessful();
g_serialCommunicator->sendControlLog(controlLog, modelType);
StaticJsonDocument<256> statusDoc;
statusDoc["type"] = "control_status";
statusDoc["model_type"] = modelType;
statusDoc["success"] = success;
statusDoc["message"] = success ? "Control completed successfully" : "Control timeout - partial data collected";
statusDoc["samples"] = (int)controlLog.size();
g_serialCommunicator->sendJson(statusDoc);
}
Serial.println("[Main] Control log sent, returning to WaitControlCommand");
g_stateManager->transitionTo(State::WaitControlCommand);
logSent = true;
}
if (currentState != State::SendControlLog) {
logSent = false;
}
break;
}
case State::Error:
Serial.println("System in Error state");
g_motorInterface->stop();
delay(1000);
break;
default:
delay(10);
break;
}
}
(b) JsonDispatcher.cpp/ .hpp -柔軟な拡張をもたらすインテリジェント・インターフェース-
PC側(Python)からの命令を解釈し、適切なハードウェア処理へと橋渡しをする「知能的な窓口」です。単なるシリアル受信に留まらず、オブジェクト指向に基づいたディスパッチ(振り分け)を行っています。
-
設計の要点
-
JSONプロトコルの採用: 命令セットにJSON形式を採用することで、サンプリングレートや信号振幅といった複雑なパラメータ変更に対しても、ハードウェア側のコードを書き換えることなく柔軟に対応可能です。
-
疎結合なアーキテクチャ: 受信したコマンドに基づいて、実験管理(ExperimentController)や設定管理(ConfigManager)などの各専門モジュールへ命令を分配します。これにより、通信仕様の変更が制御ロジックに影響を与えない「疎結合」な設計を実現しました。
-
リアルタイム性の配慮: 処理負荷の高いパース処理と、極めて高い精度が求められるモーター制御処理が干渉しないよう、将来的なマルチコア展開を見据えたタスク分離のハブとして機能します。
-
JsonDispatcher.hpp
#pragma once
#include <Arduino.h>
#include "../SpresenseFix.h"
#ifdef pgm_read_ptr
#undef pgm_read_ptr
#define pgm_read_ptr(addr) (*(void * const *)(addr))
#endif
#include <ArduinoJson.h>
class ErrorManager;
class ConfigManager;
class StateManager;
class PreprocessManager;
class ControlExecutionEngine;
class SerialCommunicator;
class ShutdownManager;
class MotorInterface;
class SignalGenerator;
class JsonDispatcher {
public:
JsonDispatcher(
ErrorManager& errorManager,
ConfigManager& configManager,
StateManager& stateManager,
PreprocessManager& preprocessManager,
ControlExecutionEngine& controlEngine,
SerialCommunicator& serialCommunicator,
ShutdownManager& shutdownManager,
MotorInterface& motorInterface,
SignalGenerator& signalGenerator
);
void dispatch(const JsonDocument& doc);
private:
ErrorManager& errorManager;
ConfigManager& configManager;
StateManager& stateManager;
PreprocessManager& preprocessManager;
ControlExecutionEngine& controlEngine;
SerialCommunicator& serialCommunicator;
ShutdownManager& shutdownManager;
MotorInterface& motorInterface;
SignalGenerator& signalGenerator;
void handleCommand(const JsonDocument& doc);
void handleExecutionRequest(const JsonDocument& doc);
void handleSetConfig(const JsonDocument& doc);
void handleRemoveOutliers(const JsonDocument& doc);
void handleExecuteControl(const JsonDocument& doc);
void handleOptimizePID(const JsonDocument& doc);
void handleShutdown(const JsonDocument& doc);
void handleFetchPreprocessData(const JsonDocument& doc);
void handleSetSignal(const JsonDocument& doc);
void handleSignalComplete(const JsonDocument& doc);
void handleSetModel(const JsonDocument& doc);
void handleExecuteMPC(const JsonDocument& doc);
void handleStartExperiment(const JsonDocument& doc);
void handleTerminate(const JsonDocument& doc);
private:
static const size_t MAX_SIGNAL_SIZE = 6000;
float signalBuffer[MAX_SIGNAL_SIZE];
size_t signalSize = 0;
size_t expectedTotalSamples = 0;
size_t receivedChunks = 0;
size_t expectedTotalChunks = 0;
};
JsonDispatcher.cpp
#include "JsonDispatcher.hpp"
#include "../managers/ErrorManager/ErrorManager.hpp"
#include "../managers/ConfigManager/ConfigManager.hpp"
#include "../state/StateManager.hpp"
#include "../state/SystemState.hpp"
#include "../managers/PreprocessManager/PreprocessManager.hpp"
#include "../managers/SDCardManager/SDCardManager.hpp"
#include "../managers/ShutdownManager/ShutdownManager.hpp"
#include "../control/ControlExecutionEngine/ControlExecutionEngine.hpp"
#include "../communication/SerialCommunicator/SerialCommunicator.hpp"
#include "../hardware/MotorInterface/MotorInterface.hpp"
#include "../experiment/SignalGenerator/SignalGenerator.hpp"
#include <string.h>
#include <malloc.h>
JsonDispatcher::JsonDispatcher(
ErrorManager& errorManager,
ConfigManager& configManager,
StateManager& stateManager,
PreprocessManager& preprocessManager,
ControlExecutionEngine& controlEngine,
SerialCommunicator& serialCommunicator,
ShutdownManager& shutdownManager,
MotorInterface& motorInterface,
SignalGenerator& signalGenerator
)
: errorManager(errorManager),
configManager(configManager),
stateManager(stateManager),
preprocessManager(preprocessManager),
controlEngine(controlEngine),
serialCommunicator(serialCommunicator),
shutdownManager(shutdownManager),
motorInterface(motorInterface),
signalGenerator(signalGenerator)
{
}
void JsonDispatcher::dispatch(const JsonDocument& doc)
{
if (!doc["type"].is<const char*>()) {
errorManager.report(ErrorCode::MissingType);
return;
}
if (!doc["command"].is<const char*>()) {
errorManager.report(ErrorCode::InvalidFormat);
return;
}
if (!doc["request_id"].is<int>()) {
errorManager.report(ErrorCode::InvalidFormat);
return;
}
const char* type = doc["type"];
if (strcmp(type, "command") == 0) {
handleCommand(doc);
}
else if (strcmp(type, "execution_request") == 0) {
handleExecutionRequest(doc);
}
else {
errorManager.report(ErrorCode::UnknownType);
}
}
void JsonDispatcher::handleCommand(const JsonDocument& doc)
{
const char* command = doc["command"];
if (strcmp(command, "SET_CONFIG") == 0) {
handleSetConfig(doc);
}
else if (strcmp(command, "REMOVE_OUTLIERS") == 0) {
handleRemoveOutliers(doc);
}
else if (strcmp(command, "EXECUTE_CONTROL") == 0) {
handleExecuteControl(doc);
}
else if (strcmp(command, "OPTIMIZE_PID") == 0) {
handleOptimizePID(doc);
}
else if (strcmp(command, "SHUTDOWN") == 0) {
handleShutdown(doc);
}
else if (strcmp(command, "FETCH_PREPROCESS_DATA") == 0) {
handleFetchPreprocessData(doc);
}
else if (strcmp(command, "SET_SIGNAL") == 0) {
handleSetSignal(doc);
}
else if (strcmp(command, "SIGNAL_COMPLETE") == 0) {
handleSignalComplete(doc);
}
else if (strcmp(command, "SET_MODEL") == 0) {
handleSetModel(doc);
}
else if (strcmp(command, "EXECUTE_MPC") == 0) {
handleExecuteMPC(doc);
}
else {
errorManager.report(ErrorCode::UnknownCommand);
}
}
void JsonDispatcher::handleSetConfig(const JsonDocument& doc)
{
Serial.println("[JsonDispatcher] handleSetConfig called");
if (!doc.containsKey("payload")) {
Serial.println("[JsonDispatcher] ERROR: payload not found");
errorManager.report(ErrorCode::InvalidFormat);
return;
}
Serial.print("[JsonDispatcher] payload type: ");
Serial.println(doc["payload"].is<JsonObject>() ? "Object" : "Not Object");
JsonObjectConst payload = doc["payload"].as<JsonObjectConst>();
Serial.println("[JsonDispatcher] Applying config...");
if (!configManager.applyConfig(payload)) {
Serial.println("[JsonDispatcher] ERROR: Config apply failed");
errorManager.report(ErrorCode::ConfigApplyFailed);
return;
}
Serial.println("[JsonDispatcher] Config applied successfully, sending response");
stateManager.transitionTo(State::Configured);
StaticJsonDocument<256> response;
response["type"] = "response";
response["request_id"] = doc["request_id"];
response["status"] = "success";
response["message"] = "Config applied successfully";
serialCommunicator.sendJson(response);
Serial.println("[JsonDispatcher] Response sent");
}
void JsonDispatcher::handleRemoveOutliers(const JsonDocument& doc)
{
Serial.println("[JsonDispatcher] REMOVE_OUTLIERS command received - SKIPPING");
Serial.println("[JsonDispatcher] Initializing processed data from measurement data without outlier removal");
if (!preprocessManager.initializeFromMeasurement()) {
errorManager.report(ErrorCode::PreprocessingFailed);
return;
}
if (!preprocessManager.computeVelocityAndNormalize()) {
errorManager.report(ErrorCode::PreprocessingFailed);
return;
}
Serial.println("[JsonDispatcher] Preprocessing completed without outlier removal");
StaticJsonDocument<256> progressDoc;
progressDoc["type"] = "status";
progressDoc["command"] = "PROGRESS";
progressDoc["payload"]["stage"] = "preprocessing_complete";
progressDoc["payload"]["message"] = "前処理完了(外れ値除去なし)";
serialCommunicator.sendJson(progressDoc);
StaticJsonDocument<256> response;
response["type"] = "response";
response["request_id"] = doc["request_id"];
response["status"] = "success";
response["message"] = "Preprocessing completed without outlier removal";
serialCommunicator.sendJson(response);
Serial.println("[JsonDispatcher] Transitioning to IdentifyModel state");
stateManager.transitionTo(State::IdentifyModel);
}
void JsonDispatcher::handleExecuteControl(const JsonDocument& doc)
{
Serial.println("[JsonDispatcher] handleExecuteControl called");
State currentState = stateManager.getCurrentState();
if (currentState == State::ExecuteControl) {
Serial.println("[JsonDispatcher] Already in ExecuteControl, aborting current control");
stateManager.transitionTo(State::SendControlLog);
StaticJsonDocument<256> response;
response["type"] = "response";
response["request_id"] = doc["request_id"];
response["status"] = "pending";
response["message"] = "Previous control aborted, please retry";
serialCommunicator.sendJson(response);
return;
}
if (currentState != State::WaitControlCommand && currentState != State::ExecuteControl) {
Serial.print("[JsonDispatcher] WARNING: EXECUTE_CONTROL called from unexpected state: ");
Serial.println(static_cast<int>(currentState));
if (currentState == State::Configured) {
Serial.println("[JsonDispatcher] Transitioning from Configured to WaitControlCommand");
stateManager.transitionTo(State::WaitControlCommand);
stateManager.update();
}
}
if (!doc.containsKey("payload")) {
Serial.println("[JsonDispatcher] ERROR: payload not found");
errorManager.report(ErrorCode::InvalidFormat);
return;
}
JsonVariantConst payloadVariant = doc["payload"];
if (!payloadVariant.is<JsonObjectConst>()) {
Serial.println("[JsonDispatcher] ERROR: payload is not a JSON object");
errorManager.report(ErrorCode::InvalidFormat);
return;
}
JsonObjectConst payload = payloadVariant.as<JsonObjectConst>();
Serial.println("[JsonDispatcher] payload validated successfully");
if (!controlEngine.execute(payload)) {
errorManager.report(ErrorCode::ControlExecutionFailed);
return;
}
StaticJsonDocument<256> response;
response["type"] = "response";
response["request_id"] = doc["request_id"];
response["status"] = "success";
response["message"] = "Control executed successfully";
serialCommunicator.sendJson(response);
}
void JsonDispatcher::handleStartExperiment(const JsonDocument& doc)
{
State currentState = stateManager.getCurrentState();
if (currentState == State::GenerateWhiteNoise ||
currentState == State::Measuring ||
currentState == State::SendRawData ||
currentState == State::IdentifyModel ||
currentState == State::SendModel) {
errorManager.report(ErrorCode::ExperimentAlreadyRunning);
return;
}
stateManager.transitionTo(State::GenerateWhiteNoise);
StaticJsonDocument<256> response;
response["type"] = "response";
response["request_id"] = doc["request_id"];
response["status"] = "success";
response["message"] = "Experiment started";
serialCommunicator.sendJson(response);
}
void JsonDispatcher::handleTerminate(const JsonDocument& doc)
{
Serial.println("[JsonDispatcher] TERMINATE command received");
State currentState = stateManager.getCurrentState();
if (currentState == State::Idle || currentState == State::Error || currentState == State::Boot) {
errorManager.report(ErrorCode::ExperimentNotRunning);
return;
}
Serial.println("[JsonDispatcher] Stopping motor");
motorInterface.stop();
stateManager.transitionTo(State::Idle);
if (doc["payload"].is<JsonObject>()) {
JsonObjectConst payload = doc["payload"].as<JsonObjectConst>();
if (payload["format_sd"].is<bool>() && payload["format_sd"].as<bool>()) {
stateManager.requestSDFormat();
}
}
StaticJsonDocument<256> response;
response["type"] = "response";
response["request_id"] = doc["request_id"];
response["status"] = "success";
response["message"] = "Experiment terminated";
serialCommunicator.sendJson(response);
Serial.println("[JsonDispatcher] Experiment terminated, motor stopped");
}
void JsonDispatcher::handleExecutionRequest(const JsonDocument& doc)
{
const char* command = doc["command"];
if (strcmp(command, "START_EXPERIMENT") == 0) {
handleStartExperiment(doc);
}
else if (strcmp(command, "TERMINATE") == 0) {
handleTerminate(doc);
}
else {
errorManager.report(ErrorCode::UnknownCommand);
}
}
void JsonDispatcher::handleOptimizePID(const JsonDocument& doc)
{
if (!doc["payload"].is<JsonObject>()) {
errorManager.report(ErrorCode::InvalidFormat);
return;
}
JsonObjectConst payload = doc["payload"].as<JsonObjectConst>();
if (!controlEngine.optimizePID(payload)) {
errorManager.report(ErrorCode::ControlExecutionFailed);
return;
}
StaticJsonDocument<256> response;
response["type"] = "response";
response["request_id"] = doc["request_id"];
response["status"] = "success";
response["message"] = "PID optimization completed";
serialCommunicator.sendJson(response);
}
void JsonDispatcher::handleShutdown(const JsonDocument& doc)
{
bool formatSD = false;
if (doc["payload"].is<JsonObject>()) {
JsonObjectConst payload = doc["payload"].as<JsonObjectConst>();
if (payload["format_sd"].is<bool>()) {
formatSD = payload["format_sd"].as<bool>();
}
}
StaticJsonDocument<256> response;
response["type"] = "response";
response["request_id"] = doc["request_id"];
response["status"] = "success";
response["message"] = formatSD ? "Shutting down with SD format" : "Shutting down";
serialCommunicator.sendJson(response);
delay(100);
if (!shutdownManager.shutdown(formatSD)) {
errorManager.report(ErrorCode::SDWriteFailed);
}
}
void JsonDispatcher::handleFetchPreprocessData(const JsonDocument& doc)
{
Serial.println("[JsonDispatcher] FETCH_PREPROCESS_DATA command received");
Serial.println("[JsonDispatcher] NEW ARCHITECTURE: Sending position data to PC (velocity calculated on PC)");
StaticJsonDocument<256> progressDoc;
progressDoc["type"] = "status";
progressDoc["command"] = "PROGRESS";
progressDoc["payload"]["stage"] = "fetching_data";
progressDoc["payload"]["message"] = "前処理データをPC側に送信中...";
serialCommunicator.sendJson(progressDoc);
const auto& processedData = preprocessManager.getProcessedData();
if (processedData.input.empty() || processedData.encoder.empty()) {
Serial.println("[JsonDispatcher] ERROR: No preprocessed data available");
errorManager.report(ErrorCode::InvalidConfig);
return;
}
Serial.print("[JsonDispatcher] Sending ");
Serial.print(processedData.input.size());
Serial.println(" samples to PC");
struct mallinfo mi = mallinfo();
Serial.print("[JsonDispatcher] Free heap before sending: ");
Serial.println(mi.fordblks);
const size_t CHUNK_SIZE = 100;
size_t totalSamples = processedData.input.size();
size_t chunks = (totalSamples + CHUNK_SIZE - 1) / CHUNK_SIZE;
Serial.print("[JsonDispatcher] Sending in ");
Serial.print(chunks);
Serial.println(" chunks");
for (size_t chunk = 0; chunk < chunks; ++chunk) {
size_t start = chunk * CHUNK_SIZE;
size_t end = std::min(start + CHUNK_SIZE, totalSamples);
size_t chunkLen = end - start;
StaticJsonDocument<4096> chunkDoc;
chunkDoc["type"] = "preprocessed_data_chunk";
chunkDoc["chunk_index"] = chunk;
chunkDoc["total_chunks"] = chunks;
chunkDoc["chunk_size"] = chunkLen;
chunkDoc["total_samples"] = totalSamples;
JsonArray inputArray = chunkDoc.createNestedArray("input");
JsonArray positionArray = chunkDoc.createNestedArray("position");
JsonArray timeArray = chunkDoc.createNestedArray("time_ms");
for (size_t i = start; i < end; ++i) {
inputArray.add(processedData.input[i]);
positionArray.add(processedData.encoder[i]);
timeArray.add(processedData.time_ms[i]);
}
if (chunk == 0) {
JsonObject stats = chunkDoc.createNestedObject("stats");
stats["input_mean"] = processedData.input_mean;
stats["input_std"] = processedData.input_std;
stats["velocity_mean"] = processedData.velocity_mean;
stats["velocity_std"] = processedData.velocity_std;
}
serialCommunicator.sendJson(chunkDoc);
Serial.print("[JsonDispatcher] Sent chunk ");
Serial.print(chunk + 1);
Serial.print("/");
Serial.println(chunks);
delay(50);
}
StaticJsonDocument<256> completeDoc;
completeDoc["type"] = "status";
completeDoc["command"] = "PREPROCESS_DATA_SENT";
completeDoc["payload"]["total_samples"] = totalSamples;
completeDoc["payload"]["message"] = "前処理データ送信完了。PC側で同定を実行してください";
serialCommunicator.sendJson(completeDoc);
StaticJsonDocument<256> preprocessCompleteDoc;
preprocessCompleteDoc["type"] = "preprocess_complete";
preprocessCompleteDoc["samples"] = totalSamples;
serialCommunicator.sendJson(preprocessCompleteDoc);
Serial.println("[JsonDispatcher] Sent preprocess_complete notification to PC");
StaticJsonDocument<256> response;
response["type"] = "response";
response["request_id"] = doc["request_id"];
response["status"] = "success";
response["message"] = "Preprocessed data sent to PC";
serialCommunicator.sendJson(response);
Serial.println("[JsonDispatcher] All data sent to PC successfully");
stateManager.transitionTo(State::WaitModelParameters);
}
void JsonDispatcher::handleSetSignal(const JsonDocument& doc)
{
Serial.println("[JsonDispatcher] SET_SIGNAL command received");
if (!doc.containsKey("payload")) {
Serial.println("[JsonDispatcher] ERROR: Missing payload");
errorManager.report(ErrorCode::InvalidFormat);
return;
}
JsonVariantConst payloadVariant = doc["payload"];
if (!payloadVariant.is<JsonObjectConst>()) {
Serial.println("[JsonDispatcher] ERROR: payload is not a JSON object");
errorManager.report(ErrorCode::InvalidFormat);
return;
}
JsonObjectConst payload = payloadVariant.as<JsonObjectConst>();
int chunk_index = payload["chunk_index"] | -1;
int total_chunks = payload["total_chunks"] | -1;
int chunk_size = payload["chunk_size"] | -1;
int total_samples = payload["total_samples"] | -1;
if (chunk_index < 0 || total_chunks <= 0 || chunk_size <= 0 || total_samples <= 0) {
Serial.println("[JsonDispatcher] ERROR: Invalid chunk parameters");
errorManager.report(ErrorCode::InvalidFormat);
return;
}
if (chunk_index == 0) {
signalSize = 0;
expectedTotalSamples = total_samples;
expectedTotalChunks = total_chunks;
receivedChunks = 0;
Serial.print("[JsonDispatcher] Starting signal reception: ");
Serial.print(total_samples);
Serial.print(" samples in ");
Serial.print(total_chunks);
Serial.println(" chunks");
if (total_samples > static_cast<int>(MAX_SIGNAL_SIZE)) {
Serial.print("[JsonDispatcher] ERROR: Signal too large: ");
Serial.print(total_samples);
Serial.print(" > ");
Serial.println(MAX_SIGNAL_SIZE);
errorManager.report(ErrorCode::InvalidConfig);
return;
}
}
if (!payload.containsKey("signal_data")) {
Serial.println("[JsonDispatcher] ERROR: Missing signal_data");
errorManager.report(ErrorCode::InvalidFormat);
return;
}
JsonArrayConst signal_data = payload["signal_data"].as<JsonArrayConst>();
size_t copied = 0;
for (JsonVariantConst value : signal_data) {
if (signalSize >= MAX_SIGNAL_SIZE) {
Serial.println("[JsonDispatcher] WARNING: Signal buffer full");
break;
}
signalBuffer[signalSize++] = value.as<float>();
copied++;
}
receivedChunks++;
Serial.print("[JsonDispatcher] Received chunk ");
Serial.print(chunk_index + 1);
Serial.print("/");
Serial.print(total_chunks);
Serial.print(", copied ");
Serial.print(copied);
Serial.print(" samples, total: ");
Serial.println(signalSize);
struct mallinfo mi = mallinfo();
Serial.print("[JsonDispatcher] Free heap: ");
Serial.println(mi.fordblks);
StaticJsonDocument<128> response;
response["type"] = "status";
response["command"] = "SIGNAL_CHUNK_ACK";
response["payload"]["chunk_index"] = chunk_index;
response["payload"]["received_samples"] = signalSize;
serialCommunicator.sendJson(response);
}
void JsonDispatcher::handleSignalComplete(const JsonDocument& doc)
{
Serial.println("[JsonDispatcher] SIGNAL_COMPLETE command received");
if (!doc.containsKey("payload")) {
Serial.println("[JsonDispatcher] ERROR: Missing payload");
errorManager.report(ErrorCode::InvalidFormat);
return;
}
JsonObjectConst payload = doc["payload"].as<JsonObjectConst>();
int total_samples = payload["total_samples"] | -1;
Serial.print("[JsonDispatcher] Signal reception complete: ");
Serial.print(signalSize);
Serial.print("/");
Serial.print(total_samples);
Serial.println(" samples");
if (signalSize != static_cast<size_t>(total_samples)) {
Serial.println("[JsonDispatcher] WARNING: Sample count mismatch");
}
if (signalSize == 0) {
Serial.println("[JsonDispatcher] ERROR: No signal data received");
errorManager.report(ErrorCode::InvalidConfig);
return;
}
signalGenerator.setSignalFromBuffer(signalBuffer, signalSize);
Serial.println("[JsonDispatcher] Signal transferred to SignalGenerator");
StaticJsonDocument<256> response;
response["type"] = "status";
response["command"] = "SIGNAL_READY";
response["payload"]["total_samples"] = signalSize;
response["payload"]["message"] = "Signal received and ready";
serialCommunicator.sendJson(response);
Serial.println("[JsonDispatcher] Transitioning to Measuring state (skipping signal generation)");
stateManager.transitionTo(State::Measuring);
}
void JsonDispatcher::handleSetModel(const JsonDocument& doc)
{
Serial.println("[JsonDispatcher] SET_MODEL command received");
if (!doc.containsKey("payload")) {
Serial.println("[JsonDispatcher] ERROR: Missing payload");
errorManager.report(ErrorCode::InvalidFormat);
return;
}
JsonVariantConst payloadVariant = doc["payload"];
if (!payloadVariant.is<JsonObjectConst>()) {
Serial.println("[JsonDispatcher] ERROR: payload is not a JSON object");
errorManager.report(ErrorCode::InvalidFormat);
return;
}
JsonObjectConst payload = payloadVariant.as<JsonObjectConst>();
if (!controlEngine.setARXModel(payload)) {
Serial.println("[JsonDispatcher] ERROR: Failed to set ARX model");
return;
}
StaticJsonDocument<256> response;
response["type"] = "response";
response["request_id"] = doc["request_id"];
response["status"] = "success";
response["message"] = "ARX model set for MPC";
serialCommunicator.sendJson(response);
Serial.println("[JsonDispatcher] ARX model set successfully, MPC ready");
}
void JsonDispatcher::handleExecuteMPC(const JsonDocument& doc)
{
Serial.println("[JsonDispatcher] EXECUTE_MPC command received");
if (!doc.containsKey("payload")) {
Serial.println("[JsonDispatcher] ERROR: Missing payload");
errorManager.report(ErrorCode::InvalidFormat);
return;
}
JsonVariantConst payloadVariant = doc["payload"];
if (!payloadVariant.is<JsonObjectConst>()) {
Serial.println("[JsonDispatcher] ERROR: payload is not a JSON object");
errorManager.report(ErrorCode::InvalidFormat);
return;
}
JsonObjectConst payload = payloadVariant.as<JsonObjectConst>();
if (!controlEngine.executeMPC(payload)) {
Serial.println("[JsonDispatcher] ERROR: MPC execution failed");
return;
}
StaticJsonDocument<256> response;
response["type"] = "response";
response["request_id"] = doc["request_id"];
response["status"] = "success";
response["message"] = "MPC velocity control executed";
serialCommunicator.sendJson(response);
Serial.println("[JsonDispatcher] MPC control completed, transitioning to SendControlLog");
stateManager.transitionTo(State::SendControlLog);
}
(c) identification_module.py -同定エンジン-
- 設計の要点
- 前処理アルゴリズム:計測データには必ずノイズやサンプリングのわずかな揺らぎが含まれます。本コードでは低パスフィルタ、線形補間による等間隔サンプリング化、外れ値の除去を統合し、同定精度を高めています。
- 最小二乗法によるパラメーター推定:過去の入出力履歴から回帰行列を構成し、正規方程式を解くことで、実機の動特性を決定する係数を瞬時に算出します。
- 定量的評価の実装:同定して終わりではなく、Fit率(適合率)を算出する機能を備えています。これにより、構築されたモデルがどれだけ実機を忠実に再現できているか(データツインとしての精度)を客観的に評価できます。
identification_module.py
import numpy as np
from typing import Dict, List, Tuple, Optional, Callable
import threading
import time
from scipy.signal import butter, filtfilt
class ARXIdentifier:
@staticmethod
def lowpass_filter(data: np.ndarray, Ts: float, cutoff: float = 5.0) -> np.ndarray:
nyq = 0.5 / Ts
b, a = butter(2, cutoff / nyq)
return filtfilt(b, a, data)
@staticmethod
def interpolate_uniform_sampling(time_data: np.ndarray, data: np.ndarray, Ts_target: float) -> Tuple[np.ndarray, np.ndarray]:
if len(time_data) != len(data):
raise ValueError(f"time_data length ({len(time_data)}) != data length ({len(data)})")
if len(time_data) < 2:
raise ValueError(f"insufficient data points ({len(time_data)})")
t_uniform = np.arange(time_data[0], time_data[-1], Ts_target)
data_interp = np.interp(t_uniform, time_data, data)
return t_uniform, data_interp
@staticmethod
def remove_outliers(data: np.ndarray, n_std: float = 5.0) -> np.ndarray:
std = np.std(data)
return np.clip(data, -n_std * std, n_std * std)
@staticmethod
def create_regression_matrix(y: np.ndarray, u: np.ndarray, na: int, nb: int) -> Tuple[np.ndarray, np.ndarray]:
N = len(y)
max_lag = max(na, nb)
phi = []
y_target = y[max_lag:]
for t in range(max_lag, N):
row = []
row.extend(y[t-na:t][::-1])
row.extend(u[t-nb:t][::-1])
phi.append(row)
return np.array(phi), y_target
@staticmethod
def fit_percent(y_true: np.ndarray, y_pred: np.ndarray) -> float:
num = np.linalg.norm(y_true - y_pred)
den = np.linalg.norm(y_true - np.mean(y_true))
if den == 0:
return 0.0
fit = 100.0 * (1 - num / den)
return max(0.0, fit)
def identify(self, input_data: np.ndarray, velocity_data: np.ndarray,
time_data_ms: Optional[np.ndarray] = None,
sampling_period_ms: float = 10.0,
apply_preprocessing: bool = True,
model_type: str = "ARX",
na_range: range = range(1, 5), nb_range: range = range(1, 5),
progress_callback: Optional[Callable[[str], None]] = None) -> Dict:
def log(msg: str):
if progress_callback:
progress_callback(msg)
if apply_preprocessing:
Ts = sampling_period_ms / 1000.0
if time_data_ms is None:
time_data = np.arange(len(input_data)) * Ts
else:
time_data = time_data_ms / 1000.0
if time_data_ms is not None:
time_uniform, u_interp = self.interpolate_uniform_sampling(time_data, input_data, Ts)
_, vel_interp = self.interpolate_uniform_sampling(time_data, velocity_data, Ts)
else:
u_interp = input_data.copy()
vel_interp = velocity_data.copy()
u_filtered = self.lowpass_filter(u_interp, Ts, cutoff=5.0)
vel_filtered = self.lowpass_filter(vel_interp, Ts, cutoff=5.0)
vel_cleaned = self.remove_outliers(vel_filtered, n_std=5.0)
input_processed = u_filtered
velocity_processed = vel_cleaned
else:
input_processed = input_data
velocity_processed = velocity_data
u_mean = float(np.mean(input_processed))
u_std = float(np.std(input_processed)) if np.std(input_processed) > 0 else 1.0
vel_mean = float(np.mean(velocity_processed))
vel_std = float(np.std(velocity_processed)) if np.std(velocity_processed) > 0 else 1.0
u_norm = (input_processed - u_mean) / u_std
vel_norm = (velocity_processed - vel_mean) / vel_std
N = len(u_norm)
Ntrain = int(N * 0.8)
vel_tr = vel_norm[:Ntrain]
vel_te = vel_norm[Ntrain:]
u_tr = u_norm[:Ntrain]
u_te = u_norm[Ntrain:]
if model_type == "ARX":
best_params = self._identify_arx(vel_tr, vel_te, u_tr, u_te, na_range, nb_range, u_mean, u_std, vel_mean, vel_std, log)
elif model_type == "FIR":
best_params = self._identify_fir(vel_tr, vel_te, u_tr, u_te, nb_range, u_mean, u_std, vel_mean, vel_std, log)
elif model_type == "ARARX":
best_params = self._identify_ararx(vel_tr, vel_te, u_tr, u_te, na_range, nb_range, u_mean, u_std, vel_mean, vel_std, log)
else:
raise ValueError(f"Unknown model type: {model_type}")
best_params['model_type'] = model_type
best_params['input_processed'] = input_processed.tolist()
best_params['velocity_processed'] = velocity_processed.tolist()
return best_params
def _identify_arx(self, vel_tr, vel_te, u_tr, u_te, na_range, nb_range, u_mean, u_std, vel_mean, vel_std, log):
best_fit = -np.inf
best_params = None
total_combinations = len(na_range) * len(nb_range)
current = 0
for na in na_range:
for nb in nb_range:
current += 1
try:
phi_tr, y_tr_target = self.create_regression_matrix(vel_tr, u_tr, na, nb)
theta = np.linalg.lstsq(phi_tr, y_tr_target, rcond=None)[0]
phi_te, y_te_target = self.create_regression_matrix(
np.concatenate([vel_tr[-na:], vel_te]),
np.concatenate([u_tr[-nb:], u_te]),
na, nb
)
ysim_te = phi_te @ theta
fit = self.fit_percent(y_te_target, ysim_te)
if fit > best_fit:
best_fit = fit
best_params = {
'na': na, 'nb': nb, 'nk': 0,
'coefficients': theta.tolist(),
'fit_percent': fit,
'u_mean': u_mean, 'u_std': u_std,
'vel_mean': vel_mean, 'vel_std': vel_std
}
except Exception:
continue
if best_params is None:
raise RuntimeError("Failed to identify ARX model")
return best_params
def _identify_fir(self, vel_tr, vel_te, u_tr, u_te, nb_range, u_mean, u_std, vel_mean, vel_std, log):
best_fit = -np.inf
best_params = None
for nb in nb_range:
try:
phi_tr, y_tr_target = self.create_regression_matrix(vel_tr, u_tr, 0, nb)
theta = np.linalg.lstsq(phi_tr, y_tr_target, rcond=None)[0]
phi_te, y_te_target = self.create_regression_matrix(
vel_te,
np.concatenate([u_tr[-nb:], u_te]),
0, nb
)
ysim_te = phi_te @ theta
fit = self.fit_percent(y_te_target, ysim_te)
if fit > best_fit:
best_fit = fit
best_params = {
'na': 0, 'nb': nb, 'nk': 0,
'coefficients': theta.tolist(),
'fit_percent': fit,
'u_mean': u_mean, 'u_std': u_std,
'vel_mean': vel_mean, 'vel_std': vel_std
}
except Exception:
continue
if best_params is None:
raise RuntimeError("Failed to identify FIR model")
return best_params
def _identify_ararx(self, vel_tr, vel_te, u_tr, u_te, na_range, nb_range, u_mean, u_std, vel_mean, vel_std, log):
extended_na_range = range(max(na_range) + 1, min(max(na_range) + 3, 8))
extended_nb_range = range(max(nb_range) + 1, min(max(nb_range) + 3, 8))
best_fit = -np.inf
best_params = None
for na in extended_na_range:
for nb in extended_nb_range:
try:
phi_tr, y_tr_target = self.create_regression_matrix(vel_tr, u_tr, na, nb)
theta = np.linalg.lstsq(phi_tr, y_tr_target, rcond=None)[0]
phi_te, y_te_target = self.create_regression_matrix(
np.concatenate([vel_tr[-na:], vel_te]),
np.concatenate([u_tr[-nb:], u_te]),
na, nb
)
ysim_te = phi_te @ theta
fit = self.fit_percent(y_te_target, ysim_te)
if fit > best_fit:
best_fit = fit
best_params = {
'na': na, 'nb': nb, 'nk': 0,
'coefficients': theta.tolist(),
'fit_percent': fit,
'u_mean': u_mean, 'u_std': u_std,
'vel_mean': vel_mean, 'vel_std': vel_std
}
except Exception:
continue
if best_params is None:
raise RuntimeError("Failed to identify ARARX model")
return best_params
class IdentificationModule:
def __init__(self, serial_manager):
self.serial_manager = serial_manager
self.identifier = ARXIdentifier()
self.data_chunks = []
self.total_chunks = 0
self.stats = None
self.identification_thread: Optional[threading.Thread] = None
self.is_identifying = False
self.current_model_type = "ARX"
self.on_identification_complete: Optional[Callable[[Dict], None]] = None
self.on_identification_progress: Optional[Callable[[str], None]] = None
self.on_identification_error: Optional[Callable[[str], None]] = None
def handle_data_chunk(self, chunk_data: Dict):
chunk_index = chunk_data['chunk_index']
total_chunks = chunk_data['total_chunks']
if self.on_identification_progress:
self.on_identification_progress(f"データ受信中: {chunk_index + 1}/{total_chunks}")
self.data_chunks.append(chunk_data)
self.total_chunks = total_chunks
if 'stats' in chunk_data:
self.stats = chunk_data['stats']
if len(self.data_chunks) == self.total_chunks and self.total_chunks > 0:
if self.on_identification_progress:
self.on_identification_progress("全データ受信完了")
def _reconstruct_data(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
self.data_chunks.sort(key=lambda x: x['chunk_index'])
input_data = []
velocity_data = []
time_data = []
position_data = []
for chunk in self.data_chunks:
chunk_input = chunk.get('input', [])
chunk_time = chunk.get('time_ms', [])
chunk_position = chunk.get('position', [])
min_len = min(len(chunk_input), len(chunk_time), len(chunk_position))
if len(chunk_input) != len(chunk_time) or len(chunk_input) != len(chunk_position):
chunk_input = chunk_input[:min_len]
chunk_time = chunk_time[:min_len]
chunk_position = chunk_position[:min_len]
input_data.extend(chunk_input)
time_data.extend(chunk_time)
position_data.extend(chunk_position)
return np.array(input_data), np.array([]), np.array(time_data), np.array(position_data)
def execute_identification(self, model_type: str):
if self.is_identifying:
return
if not self.data_chunks:
if self.on_identification_error:
self.on_identification_error("前処理データがありません")
return
self.current_model_type = model_type
self.identification_thread = threading.Thread(target=self._run_identification)
self.identification_thread.daemon = True
self.identification_thread.start()
def _run_identification(self):
try:
self.is_identifying = True
input_data, _, time_data, position_data = self._reconstruct_data()
if len(input_data) == 0 or len(position_data) == 0 or len(time_data) == 0:
raise ValueError("Reconstructed data is empty")
if len(time_data) > 1:
sampling_period_ms = np.mean(np.diff(time_data))
else:
sampling_period_ms = 10.0
Ts = sampling_period_ms / 1000.0
if len(position_data) > 0:
velocity_pc = -np.diff(position_data, prepend=position_data[0]) / Ts
self.position_data = position_data
self.velocity_pc = velocity_pc
else:
raise ValueError("No position data available")
if self.current_model_type == "ARX":
na_range = range(1, 5)
nb_range = range(1, 5)
elif self.current_model_type == "FIR":
na_range = range(0, 1)
nb_range = range(5, 16)
elif self.current_model_type == "ARARX":
na_range = range(1, 6)
nb_range = range(1, 6)
else:
na_range = range(1, 5)
nb_range = range(1, 5)
params = self.identifier.identify(
input_data,
velocity_pc,
time_data_ms=time_data,
sampling_period_ms=sampling_period_ms,
apply_preprocessing=True,
model_type=self.current_model_type,
na_range=na_range,
nb_range=nb_range,
progress_callback=self.on_identification_progress
)
na = params['na']
nb = params['nb']
coefficients = params['coefficients']
if na > 0:
params['a_coeffs'] = coefficients[:na]
params['b_coeffs'] = coefficients[na:na+nb]
else:
params['a_coeffs'] = []
params['b_coeffs'] = coefficients[:nb]
params['sampling_period_ms'] = sampling_period_ms
if self.on_identification_complete:
self.on_identification_complete(params)
except Exception as e:
if self.on_identification_error:
self.on_identification_error(str(e))
finally:
self.is_identifying = False
def _send_model_parameters(self, params: Dict):
message = {
"type": "command",
"command": "SET_MODEL",
"request_id": self.serial_manager.request_id_counter,
"payload": {
"model_type": "ARX",
"parameters": {
"na": params['na'],
"nb": params['nb'],
"nk": params['nk'],
"coefficients": params['coefficients']
},
"validation": {
"fit_percent": params['fit_percent']
}
}
}
self.serial_manager.request_id_counter += 1
import json
json_str = json.dumps(message) + "\n"
self.serial_manager.serial_port.write(json_str.encode('utf-8'))
self.serial_manager.serial_port.flush()
if self.on_identification_progress:
self.on_identification_progress(f"モデルパラメータ送信完了: Fit={params['fit_percent']:.2f}%")
def reset(self):
self.data_chunks = []
self.total_chunks = 0
self.stats = None
self.is_identifying = False
(d) pid_optimizer.py -シミュレーションによるリスクフリーな自動チューニング-
-
設計の要点
-高速シミュレーション: Pythonの実行速度を補うため、一部に Numba(JITコンパイラ)を導入し、数万通りのパラメータセットによる挙動シミュレーションを数秒で完了させます。
-
台形加速プロファイルの考慮: 単なるステップ応答だけでなく、実用的な台形加速プロファイルを用いた位置決め制御のシミュレーションに対応しており、オーバーシュートや整定時間のトレードオフを自動で最適化します。
-
安全なパラメータ導出: 実機で「発散(ハンチング)」を試行錯誤するリスクを完全に排除し、デジタルツイン上で理論的に裏付けられた安全なパラメータのみをSpresenseへフィードバックするワークフローを実現しました。
-
pid_optimizer.py
import numpy as np
from typing import Dict, Tuple, Optional, Callable
import threading
try:
from numba import njit
NUMBA_AVAILABLE = True
except ImportError:
NUMBA_AVAILABLE = False
def njit(*args, **kwargs):
def decorator(func):
return func
return decorator
class TrapezoidalProfile:
@staticmethod
def generate(target_position, max_velocity, acceleration, Ts, total_time):
N = int(total_time / Ts)
t = np.arange(N) * Ts
Ta = max_velocity / acceleration
Na = int(Ta / Ts)
dist_ad = max_velocity * Ta
dist_const = target_position - dist_ad
if dist_const > 0:
Tc = dist_const / max_velocity
Nc = int(Tc / Ts)
else:
Nc = 0
max_velocity = np.sqrt(acceleration * target_position)
Ta = max_velocity / acceleration
Na = int(Ta / Ts)
N_stop = min(2 * Na + Nc, N)
vel = np.zeros(N)
for k in range(min(Na, N)):
vel[k] = acceleration * k * Ts
for k in range(Na, min(Na + Nc, N)):
vel[k] = max_velocity
for k in range(Na + Nc, N_stop):
vel[k] = max(0.0, max_velocity - acceleration * (k - Na - Nc) * Ts)
pos = np.cumsum(vel * Ts)
return t, vel, pos, N_stop
@njit(cache=True)
def simulate_arx_pid_numba(
Kp, Ki, Kd, Kff,
target_pos, target_vel, Ts,
na, nb, a, b,
u_max, u_min_abs,
lambda_u, lambda_os, lambda_dp,
diff_interval, N_stop, target_final,
u_mean, u_std, vel_mean, vel_std
):
N = len(target_pos)
y_hist = np.zeros(na)
u_hist = np.zeros(nb)
pos = np.zeros(N)
vel = np.zeros(N)
u_out = np.zeros(N)
integ = 0.0
prev_err = 0.0
sse = 0.0
u_energy = 0.0
max_pos = 0.0
diff_pen = 0.0
diff_cnt = 0
ERROR_MAX = 1e5
VEL_MAX = 1e4
for k in range(N):
p = pos[k-1] if k > 0 else 0.0
err = target_pos[k] - p
err = min(max(err, -ERROR_MAX), ERROR_MAX)
integ += err * Ts
deriv = (err - prev_err) / Ts
prev_err = err
u = Kp*err + Ki*integ + Kd*deriv + Kff*target_vel[k]
u = min(max(u, -u_max), u_max)
u_out[k] = u
if k < N_stop:
sse += err * err
u_energy += u * u
vel_n = 0.0
for i in range(na):
vel_n += a[i] * y_hist[i]
for i in range(nb):
vel_n += b[i] * u_hist[i]
vel_n = min(max(vel_n, -VEL_MAX), VEL_MAX)
vel[k] = vel_n * vel_std + vel_mean
pos[k] = vel[k] * Ts if k == 0 else pos[k-1] + vel[k] * Ts
if k >= N_stop:
max_pos = max(max_pos, pos[k])
if k >= diff_interval and k < N_stop and k % diff_interval == 0:
td = target_pos[k] - target_pos[k-diff_interval]
ad = pos[k] - pos[k-diff_interval]
diff_pen += ((td - ad) / max(abs(td), 1e-6))**2
diff_cnt += 1
for i in range(na-1, 0, -1):
y_hist[i] = y_hist[i-1]
if na > 0:
y_hist[0] = vel_n
for i in range(nb-1, 0, -1):
u_hist[i] = u_hist[i-1]
if nb > 0:
u_hist[0] = -(u - u_mean) / u_std
cost = (
sse / max(N_stop, 1)
+ lambda_u * (u_energy / max(N, 1))
+ lambda_os * max(0.0, max_pos - target_final)
+ lambda_dp * (diff_pen / max(diff_cnt, 1))
)
if not np.isfinite(cost):
cost = 1e12
return cost, pos, vel, u_out
class ARXPIDSimulator:
@staticmethod
def simulate(Kp, Ki, Kd, Kff,
target_pos, target_vel, Ts,
na, nb, theta,
u_max, u_min_abs,
lambda_u, lambda_os, lambda_dp,
diff_interval, N_stop, target_final,
u_mean, u_std, vel_mean, vel_std):
a = theta[:na].astype(np.float64)
b = theta[na:].astype(np.float64)
return simulate_arx_pid_numba(
Kp, Ki, Kd, Kff,
target_pos.astype(np.float64),
target_vel.astype(np.float64),
Ts, na, nb, a, b,
u_max, u_min_abs,
lambda_u, lambda_os, lambda_dp,
diff_interval, N_stop, target_final,
u_mean, u_std, vel_mean, vel_std
)
class PIDOptimizer:
def __init__(self, model_params: Dict):
self.na = model_params['na']
self.nb = model_params['nb']
self.theta = np.array(model_params['coefficients'], dtype=np.float64)
self.u_std = model_params.get('u_std', 1.0)
self.vel_std = model_params.get('vel_std', 1.0)
self.u_mean = model_params.get('u_mean', 0.0)
self.vel_mean = model_params.get('vel_mean', 0.0)
def optimize(
self,
target_position: float,
max_velocity: float = 80.0,
acceleration: float = 130.0,
sampling_period_ms: float = 30.0,
total_time_s: float = 2.0,
u_max: float = 255.0,
u_min_abs: float = 30.0,
lambda_u: float = 0.0005,
lambda_os: float = 10.0,
lambda_dp: float = 0.5,
max_iterations: int = 5000,
progress_callback: Optional[Callable[[str], None]] = None,
**kwargs
) -> Dict:
def log(msg: str):
if progress_callback:
progress_callback(msg)
Ts = sampling_period_ms / 1000.0
diff_interval = 10
t, vref, pref, N_stop = TrapezoidalProfile.generate(
target_position, max_velocity, acceleration, Ts, total_time_s
)
params = np.array([1.0, 0.01, 0.01, 0.5])
lr = kwargs.get('learning_rate', 0.08)
momentum = 0.5
velocity = np.zeros_like(params)
cost_history = []
best_cost = float('inf')
best_params = params.copy()
no_improve_count = 0
for i in range(max_iterations):
cost, pos_out, vel_out, u_out = ARXPIDSimulator.simulate(
params[0], params[1], params[2], params[3],
pref, vref, Ts,
self.na, self.nb, self.theta,
u_max, u_min_abs,
lambda_u, lambda_os, lambda_dp,
diff_interval, N_stop, target_position,
self.u_mean, self.u_std, self.vel_mean, self.vel_std
)
cost_history.append(cost)
if cost < best_cost:
best_cost = cost
best_params = params.copy()
no_improve_count = 0
else:
no_improve_count += 1
grad = np.zeros_like(params)
eps = 0.01
for j in range(4):
p_plus = params.copy()
p_minus = params.copy()
p_plus[j] += eps
p_minus[j] -= eps
p_plus = np.maximum(p_plus, [0.1, 0.001, 0.001, 0.1])
p_minus = np.maximum(p_minus, [0.1, 0.001, 0.001, 0.1])
c_plus, *_ = ARXPIDSimulator.simulate(
p_plus[0], p_plus[1], p_plus[2], p_plus[3],
pref, vref, Ts,
self.na, self.nb, self.theta,
u_max, u_min_abs,
lambda_u, lambda_os, lambda_dp,
diff_interval, N_stop, target_position,
self.u_mean, self.u_std, self.vel_mean, self.vel_std
)
c_minus, *_ = ARXPIDSimulator.simulate(
p_minus[0], p_minus[1], p_minus[2], p_minus[3],
pref, vref, Ts,
self.na, self.nb, self.theta,
u_max, u_min_abs,
lambda_u, lambda_os, lambda_dp,
diff_interval, N_stop, target_position,
self.u_mean, self.u_std, self.vel_mean, self.vel_std
)
grad[j] = (c_plus - c_minus) / (2 * eps)
if not np.all(np.isfinite(grad)):
continue
grad_norm = np.linalg.norm(grad)
if grad_norm > 1e-10:
grad_direction = grad / grad_norm
param_scales = np.array([0.5, 0.05, 0.05, 0.2])
scaled_grad = grad_direction * param_scales
else:
scaled_grad = np.zeros_like(grad)
velocity = momentum * velocity - lr * scaled_grad
params = params + velocity
params = np.maximum(params, [0.1, 0.001, 0.001, 0.1])
params = np.minimum(params, [50.0, 10.0, 10.0, 5.0])
if no_improve_count > 1000:
params = best_params
break
if i % 300 == 0:
log(f"Iter {i}: Cost={cost:.2f}")
final_cost, pos_out, vel_out, u_out = ARXPIDSimulator.simulate(
params[0], params[1], params[2], params[3],
pref, vref, Ts,
self.na, self.nb, self.theta,
u_max, u_min_abs,
lambda_u, lambda_os, lambda_dp,
diff_interval, N_stop, target_position,
self.u_mean, self.u_std, self.vel_mean, self.vel_std
)
return {
'Kp': float(params[0]),
'Ki': float(params[1]),
'Kd': float(params[2]),
'Kff': float(params[3]),
'cost': float(final_cost),
'iterations': len(cost_history),
'cost_history': cost_history,
'time': t.tolist(),
'position_sim': pos_out.tolist(),
'velocity_sim': vel_out.tolist(),
'control_sim': u_out.tolist(),
'target_position': pref.tolist(),
'target_velocity': vref.tolist()
}
class ModelBasedParameterEstimator:
def __init__(self, model_params: Dict):
self.na = model_params['na']
self.nb = model_params['nb']
self.theta = np.array(model_params['coefficients'], dtype=np.float64)
self.u_std = model_params.get('u_std', 1.0)
self.vel_std = model_params.get('vel_std', 1.0)
self.u_mean = model_params.get('u_mean', 0.0)
self.vel_mean = model_params.get('vel_mean', 0.0)
self.model_Ts = model_params.get('sampling_period_ms', 30.0) / 1000.0
def simulate_step_response(self, u_step: float, duration_s: float = 2.0) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
Ts = self.model_Ts
N = int(duration_s / Ts)
a = self.theta[:self.na]
b = self.theta[self.na:]
y_hist = np.zeros(self.na)
u_hist = np.zeros(self.nb)
time = np.arange(N) * Ts
velocity = np.zeros(N)
position = np.zeros(N)
for k in range(N):
vel_n = 0.0
for i in range(self.na):
vel_n += a[i] * y_hist[i]
for i in range(self.nb):
vel_n += b[i] * u_hist[i]
velocity[k] = vel_n * self.vel_std + self.vel_mean
if k == 0:
position[k] = velocity[k] * Ts
else:
position[k] = position[k-1] + velocity[k] * Ts
for i in range(self.na - 1, 0, -1):
y_hist[i] = y_hist[i-1]
if self.na > 0:
y_hist[0] = vel_n
for i in range(self.nb - 1, 0, -1):
u_hist[i] = u_hist[i-1]
if self.nb > 0:
u_hist[0] = -(u_step - self.u_mean) / self.u_std
return time, velocity, position
def estimate_system_characteristics(self, u_max: float = 100.0) -> Dict:
a = self.theta[:self.na]
b = self.theta[self.na:]
sum_a = np.sum(a)
sum_b = np.sum(b)
u_norm = -(u_max - self.u_mean) / self.u_std
if abs(1 - sum_a) > 1e-6:
vel_norm_ss = u_norm * sum_b / (1 - sum_a)
analytical_steady_velocity = vel_norm_ss * self.vel_std + self.vel_mean
else:
analytical_steady_velocity = None
time, velocity, position = self.simulate_step_response(u_max, duration_s=3.0)
steady_start = int(len(velocity) * 0.8)
simulated_steady_velocity = np.mean(np.abs(velocity[steady_start:]))
if analytical_steady_velocity is not None:
steady_velocity = abs(analytical_steady_velocity)
else:
steady_velocity = simulated_steady_velocity
target_63 = 0.632 * simulated_steady_velocity
time_constant = 0.1
for i, v in enumerate(np.abs(velocity)):
if v >= target_63:
time_constant = time[i]
break
accel_samples = min(20, len(velocity) // 4)
if accel_samples > 1:
dt = time[1] - time[0]
accelerations = np.diff(velocity[:accel_samples]) / dt
acceleration_rate = np.mean(np.abs(accelerations))
else:
acceleration_rate = steady_velocity / time_constant if time_constant > 0 else 1000.0
gain = steady_velocity / u_max if u_max > 0 else 0
return {
'max_steady_velocity': steady_velocity,
'acceleration_rate': acceleration_rate,
'time_constant': time_constant,
'gain': gain
}
def estimate_control_parameters(
self,
target_position: float,
total_time_s: float,
u_max: float = 100.0,
safety_factor: float = 0.8
) -> Dict:
chars = self.estimate_system_characteristics(u_max)
max_achievable_velocity = chars['max_steady_velocity'] * safety_factor
max_achievable_accel = chars['acceleration_rate'] * safety_factor
S = abs(target_position)
T = total_time_s
a = max_achievable_accel
discriminant = T * T * a * a - 4 * S * a
if discriminant >= 0:
V_ideal = (T * a - np.sqrt(discriminant)) / 2
V_ideal = max(V_ideal, 1.0)
else:
T_min = 2 * np.sqrt(S / a) if a > 0 else float('inf')
V_ideal = np.sqrt(S * a) if a > 0 else max_achievable_velocity
recommended_velocity = min(V_ideal, max_achievable_velocity)
t_accel_target = total_time_s * 0.3
if t_accel_target > 0 and recommended_velocity > 0:
required_accel = recommended_velocity / t_accel_target
recommended_accel = min(required_accel, max_achievable_accel)
else:
recommended_accel = max_achievable_accel
if recommended_accel > 0 and recommended_velocity > 0:
t_accel_actual = recommended_velocity / recommended_accel
dist_accel = 0.5 * recommended_accel * t_accel_actual ** 2
dist_cruise = S - 2 * dist_accel
if dist_cruise > 0:
t_cruise = dist_cruise / recommended_velocity
estimated_time = 2 * t_accel_actual + t_cruise
else:
actual_peak_vel = np.sqrt(S * recommended_accel)
t_up = actual_peak_vel / recommended_accel
estimated_time = 2 * t_up
else:
estimated_time = float('inf')
achievable = estimated_time <= total_time_s * 1.2
return {
'max_velocity': recommended_velocity,
'acceleration': recommended_accel,
'achievable': achievable,
'estimated_time': estimated_time,
'system_max_velocity': chars['max_steady_velocity'],
'system_max_acceleration': chars['acceleration_rate']
}
class PIDOptimizationModule:
def __init__(self, serial_manager):
self.serial_manager = serial_manager
self.optimizer = None
self.parameter_estimator = None
self.optimization_thread: Optional[threading.Thread] = None
self.is_optimizing = False
self.on_optimization_complete: Optional[Callable[[Dict], None]] = None
self.on_optimization_progress: Optional[Callable[[str], None]] = None
self.on_optimization_error: Optional[Callable[[str], None]] = None
def set_model(self, model_params: Dict):
self.optimizer = PIDOptimizer(model_params)
self.parameter_estimator = ModelBasedParameterEstimator(model_params)
def estimate_control_parameters(
self,
target_position: float,
total_time_s: float,
u_max: float = 100.0,
safety_factor: float = 0.7
) -> Optional[Dict]:
if self.parameter_estimator is None:
return None
return self.parameter_estimator.estimate_control_parameters(
target_position=target_position,
total_time_s=total_time_s,
u_max=u_max,
safety_factor=safety_factor
)
def get_system_characteristics(self, u_max: float = 100.0) -> Optional[Dict]:
if self.parameter_estimator is None:
return None
return self.parameter_estimator.estimate_system_characteristics(u_max)
def start_optimization(
self,
target_position: float,
max_velocity: float = 80.0,
acceleration: float = 130.0,
**kwargs
):
if self.optimizer is None:
if self.on_optimization_error:
self.on_optimization_error("Model not set")
return
if self.is_optimizing:
return
self.optimization_thread = threading.Thread(
target=self._run_optimization,
args=(target_position, max_velocity, acceleration),
kwargs=kwargs,
daemon=True
)
self.optimization_thread.start()
def _run_optimization(self, target_position: float, max_velocity: float, acceleration: float, **kwargs):
try:
self.is_optimizing = True
result = self.optimizer.optimize(
target_position=target_position,
max_velocity=max_velocity,
acceleration=acceleration,
progress_callback=self.on_optimization_progress,
**kwargs
)
if self.on_optimization_complete:
self.on_optimization_complete(result)
except Exception as e:
if self.on_optimization_error:
self.on_optimization_error(str(e))
finally:
self.is_optimizing = False
def reset(self):
self.is_optimizing = False

-
L_tiem
さんが
2026/01/31
に
編集
をしました。
(メッセージ: 初版)
-
L_tiem
さんが
前の土曜日の12:05
に
編集
をしました。
Opening
JO
前の土曜日の16:47
ログインしてコメントを投稿するこの投稿のシステムは通信によりマイコンデータを取得するが、通信した時点で「リアルタイム」ではなくなる
処理を含めて「マイコン側」での処理が必要ではなかろうか?
適応制御・MPC最適化 最適化が必要なのに実装されていません
最適化計算をPC側でやろうとすると、シリアル通信の往復レイテンシ(数ms〜数十ms)が制御周期と同じオーダーになり、位相遅れが生じて系が不安定化するリスクがあります
現在のSpresenseのサブコアの空きリソースや、制御周期の目標値はどの程度でしょうか?それに応じて、MPCの予測ホライズンや適応制御のアルゴリズムを調整する必要があります。
話は変わりますが、ドローンの飛行制御の様に、6軸センサーからモーターへのPID制御等では、更に高速性が求められますね