keiske-hongyo が 2023年05月02日00時05分58秒 に編集
コメント無し
本文の変更
## はじめに 以前から部屋の温度や湿度、Co2濃度観測するもの作成したいと思っていました。ただ、コンソールで出力するのではあまりおもしろくないので、MQTTを使用してネットワークに定期的にセンサの状態を配信するようにしてみました。 ## 構成 今回はマイコンにSeeed Xiao ESP32Cを使用しました。データの配信にはMQTTを使用して配信をしています。MQTTのBrokerには、Mosquittoを使用しています。Mosquittoは公開サーバを使用せず、ローカルでサーバを立ち上げテストをしています。Mosquittoのインストールや稼働についてはインターネットに情報がたくさんあるので調べてみてください。
![構成図](https://camo.elchika.com/3fd16380bb29c4c0eff5c907c4046ef059bac54a/687474703a2f2f73746f726167652e676f6f676c65617069732e636f6d2f656c6368696b612f76312f757365722f33646164353039642d616133622d343132302d616433622d6638623565376137366139332f35313535393765332d386637392d343538392d393835332d343835633936643335653138/)
![構成図](https://camo.elchika.com/6dbd11d509e1a1c29a29d724aaaea96d7801907c/687474703a2f2f73746f726167652e676f6f676c65617069732e636f6d2f656c6368696b612f76312f757365722f33646164353039642d616133622d343132302d616433622d6638623565376137366139332f61343665353437302d386336632d343961362d626263652d346532633763386234656639/)
## 使用部品 使用部品は下の表に示す部品を使用しました。 |使用部品|型名|備考| |---|---|---|
|マイコン|Seeed Xaio ESP32C |秋月電子で購入|
|マイコン|Seeed Xiao ESP32C |秋月電子で購入|
|温度・湿度・気圧センサ|BME280|秋月電子で購入| |二酸化炭素濃度センサ|MHZ-19C|秋月電子で購入| ## 回路図 下の図は今回の回路図になります。BME280はI2CでMHZ-19CはUARTで通信を行い、データを取得しています。 ![回路図](https://camo.elchika.com/2ded4b0a9d4ed1d987fd9a25ad85bc42a75107e1/687474703a2f2f73746f726167652e676f6f676c65617069732e636f6d2f656c6368696b612f76312f757365722f33646164353039642d616133622d343132302d616433622d6638623565376137366139332f64643435323338652d343234322d343766382d396239662d396138386661646462373530/) ## プログラムについて マイコン側については、ネットワークでデータを配信すること、手早く作成することを目標としたので、Micro Pythonを使用して作成しました。また、購読側は 、Golangを使用して作成しました。 ### 配信側側のプログラム センサデータを配信するマイコンのプログラムはMicropythonを使用しています。BME280のセンサの取得は[こちらのサイト](https://qiita.com/sirius1000/items/e536b42099dbaae7e1a2)を利用しました。 ```python:main.py import time import ujson as json from umqtt.simple import MQTTClient from machine import Pin from machine import I2C from sensor.bme280 import BME280 from sensor.mhz_19c import mhz19C if __name__ == "__main__": led = Pin(2,Pin.OUT) i2c = I2C(0, scl=Pin(7), sda=Pin(6), freq=10000) sensor = mhz19C() bme = BME280(i2c=i2c) c = MQTTClient("umqtt_client", "<Brokerのアドレス>", port=<ポート番号>) while True: c.connect() led.off() ppm = sensor.getdata() temp = bme.temperature hum = bme.humidity press = bme.pressure if ppm is None: print('Data None') ppm = 9999 else: print(ppm, end='[ppm]\n') print(temp,end='[℃]\n') print(hum,end='[%]\n') print(press,end='[hPa]\n') d = {'temp': temp, 'hum': hum, 'press':press} data = {'co2': ppm, 'bme280': d} c.publish("test/sensor", json.dumps(data)) c.disconnect() time.sleep(1) led.on() time.sleep(1) ``` ```python:bme280.py import time # BME280 default address. BME280_I2CADDR = 0x76 # Operating Modes BME280_OSAMPLE_1 = 1 BME280_OSAMPLE_2 = 2 BME280_OSAMPLE_4 = 3 BME280_OSAMPLE_8 = 4 BME280_OSAMPLE_16 = 5 # BME280 Registers BME280_REGISTER_DIG_T1 = 0x88 # Trimming parameter registers BME280_REGISTER_DIG_T2 = 0x8A BME280_REGISTER_DIG_T3 = 0x8C BME280_REGISTER_DIG_P1 = 0x8E BME280_REGISTER_DIG_P2 = 0x90 BME280_REGISTER_DIG_P3 = 0x92 BME280_REGISTER_DIG_P4 = 0x94 BME280_REGISTER_DIG_P5 = 0x96 BME280_REGISTER_DIG_P6 = 0x98 BME280_REGISTER_DIG_P7 = 0x9A BME280_REGISTER_DIG_P8 = 0x9C BME280_REGISTER_DIG_P9 = 0x9E BME280_REGISTER_DIG_H1 = 0xA1 BME280_REGISTER_DIG_H2 = 0xE1 BME280_REGISTER_DIG_H3 = 0xE3 BME280_REGISTER_DIG_H4 = 0xE4 BME280_REGISTER_DIG_H5 = 0xE5 BME280_REGISTER_DIG_H6 = 0xE6 BME280_REGISTER_DIG_H7 = 0xE7 BME280_REGISTER_CHIPID = 0xD0 BME280_REGISTER_VERSION = 0xD1 BME280_REGISTER_SOFTRESET = 0xE0 BME280_REGISTER_CONTROL_HUM = 0xF2 BME280_REGISTER_CONTROL = 0xF4 BME280_REGISTER_CONFIG = 0xF5 BME280_REGISTER_PRESSURE_DATA = 0xF7 BME280_REGISTER_TEMP_DATA = 0xFA BME280_REGISTER_HUMIDITY_DATA = 0xFD class Device: """Class for communicating with an I2C device. Allows reading and writing 8-bit, 16-bit, and byte array values to registers on the device.""" def __init__(self, address, i2c): """Create an instance of the I2C device at the specified address using the specified I2C interface object.""" self._address = address self._i2c = i2c def writeRaw8(self, value): """Write an 8-bit value on the bus (without register).""" value = value & 0xFF self._i2c.writeto(self._address, value) def write8(self, register, value): """Write an 8-bit value to the specified register.""" b = bytearray(1) b[0] = value & 0xFF self._i2c.writeto_mem(self._address, register, b) def write16(self, register, value): """Write a 16-bit value to the specified register.""" value = value & 0xFFFF b = bytearray(2) b[0] = value & 0xFF b[1] = (value >> 8) & 0xFF self._i2c.writeto_mem(self._address, register, value) def readRaw8(self): """Read an 8-bit value on the bus (without register).""" return int.from_bytes(self._i2c.readfrom(self._address, 1), 'little') & 0xFF def readU8(self, register): """Read an unsigned byte from the specified register.""" return int.from_bytes( self._i2c.readfrom_mem(self._address, register, 1), 'little') & 0xFF def readS8(self, register): """Read a signed byte from the specified register.""" result = self.readU8(register) if result > 127: result -= 256 return result def readU16(self, register, little_endian=True): result = int.from_bytes( self._i2c.readfrom_mem(self._address, register, 2), 'little') & 0xFFFF if not little_endian: result = ((result << 8) & 0xFF00) + (result >> 8) return result def readS16(self, register, little_endian=True): """Read a signed 16-bit value from the specified register, with the specified endianness (default little endian, or least significant byte first).""" result = self.readU16(register, little_endian) if result > 32767: result -= 65536 return result def readU16LE(self, register): """Read an unsigned 16-bit value from the specified register, in little endian byte order.""" return self.readU16(register, little_endian=True) def readU16BE(self, register): """Read an unsigned 16-bit value from the specified register, in big endian byte order.""" return self.readU16(register, little_endian=False) def readS16LE(self, register): """Read a signed 16-bit value from the specified register, in little endian byte order.""" return self.readS16(register, little_endian=True) def readS16BE(self, register): """Read a signed 16-bit value from the specified register, in big endian byte order.""" return self.readS16(register, little_endian=False) class BME280: def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None, **kwargs): # Check that mode is valid. if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4, BME280_OSAMPLE_8, BME280_OSAMPLE_16]: raise ValueError( 'Unexpected mode value {0}. Set mode to one of ' 'BME280_ULTRALOWPOWER, BME280_STANDARD, BME280_HIGHRES, or ' 'BME280_ULTRAHIGHRES'.format(mode)) self._mode = mode # Create I2C device. if i2c is None: raise ValueError('An I2C object is required.') self._device = Device(address, i2c) # Load calibration values. self._load_calibration() self._device.write8(BME280_REGISTER_CONTROL, 0x3F) self.t_fine = 0 def _load_calibration(self): self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1) self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2) self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3) self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1) self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2) self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3) self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4) self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5) self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6) self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7) self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8) self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9) self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1) self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2) self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3) self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7) h4 = self._device.readS8(BME280_REGISTER_DIG_H4) h4 = (h4 << 24) >> 20 self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F) h5 = self._device.readS8(BME280_REGISTER_DIG_H6) h5 = (h5 << 24) >> 20 self.dig_H5 = h5 | ( self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F) def read_raw_temp(self): """Reads the raw (uncompensated) temperature from the sensor.""" meas = self._mode self._device.write8(BME280_REGISTER_CONTROL_HUM, meas) meas = self._mode << 5 | self._mode << 2 | 1 self._device.write8(BME280_REGISTER_CONTROL, meas) sleep_time = 1250 + 2300 * (1 << self._mode) sleep_time = sleep_time + 2300 * (1 << self._mode) + 575 sleep_time = sleep_time + 2300 * (1 << self._mode) + 575 time.sleep_us(sleep_time) # Wait the required time msb = self._device.readU8(BME280_REGISTER_TEMP_DATA) lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1) xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2) raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4 return raw def read_raw_pressure(self): """Reads the raw (uncompensated) pressure level from the sensor.""" """Assumes that the temperature has already been read """ """i.e. that enough delay has been provided""" msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA) lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1) xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2) raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4 return raw def read_raw_humidity(self): """Assumes that the temperature has already been read """ """i.e. that enough delay has been provided""" msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA) lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1) raw = (msb << 8) | lsb return raw def read_temperature(self): """Get the compensated temperature in 0.01 of a degree celsius.""" adc = self.read_raw_temp() var1 = ((adc >> 3) - (self.dig_T1 << 1)) * (self.dig_T2 >> 11) var2 = (( (((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) * self.dig_T3) >> 14 self.t_fine = var1 + var2 return (self.t_fine * 5 + 128) >> 8 def read_pressure(self): """Gets the compensated pressure in Pascals.""" adc = self.read_raw_pressure() var1 = self.t_fine - 128000 var2 = var1 * var1 * self.dig_P6 var2 = var2 + ((var1 * self.dig_P5) << 17) var2 = var2 + (self.dig_P4 << 35) var1 = (((var1 * var1 * self.dig_P3) >> 8) + ((var1 * self.dig_P2) >> 12)) var1 = (((1 << 47) + var1) * self.dig_P1) >> 33 if var1 == 0: return 0 p = 1048576 - adc p = (((p << 31) - var2) * 3125) // var1 var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25 var2 = (self.dig_P8 * p) >> 19 return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4) def read_humidity(self): adc = self.read_raw_humidity() # print 'Raw humidity = {0:d}'.format (adc) h = self.t_fine - 76800 h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) + 16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h * self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) * self.dig_H2 + 8192) >> 14)) h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4) h = 0 if h < 0 else h h = 419430400 if h > 419430400 else h return h >> 12 @property def temperature(self): """Return the temperature in degrees.""" t = self.read_temperature() # ti = t // 100 # td = t - ti * 100 return t / 100 # return "{}.{:02d}C".format(ti, td) @property def pressure(self): "Return the temperature in hPa." p = self.read_pressure() // 256 # pi = p // 100 # pd = p - pi * 100 return p / 100 # return "{}.{:02d}hPa".format(pi, pd) @property def humidity(self): "Return the humidity in percent." h = self.read_humidity() # hi = h // 1024 # hd = h * 100 // 1024 - hi * 100 return h / 1024 # return "{}.{:02d}%".format(hi, hd) ``` ```python:mhz_19c.py from machine import UART class mhz19C: """ Co2センサーMHZ-19Cクラス """ # コンストラクタ def __init__(self): self.uart = UART(1, 9600, 8, None, 1, tx=21, rx=20) # センサーからのデータ取得 def getdata(self): cmd = bytearray([0xff, 0x01, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00, 0x79]) self.uart.write(cmd) if self.uart.any(): data = self.uart.read() ppm = int.from_bytes(data[2:4], "big") return ppm ``` ### 購読側のプログラム 購読側のプログラムです。Publisherで配信されるとデータを受信し、表示します。 ```golang:購読側のプログラム package main import ( "encoding/json" "fmt" "log" "os" "os/signal" mqtt "github.com/eclipse/paho.mqtt.golang" ) type Bme280 struct { Temp float64 `json:"temp"` Hum float64 `json:"hum"` Press float64 `json:"press"` } // SendData JSON用構造体 type RecvData struct { Co2 int `json:"co2"` Sensor Bme280 `json:"bme280"` } // --------------------------------------------------------------- func main() { var rData RecvData //var c Counter fmt.Fprintf(os.Stderr, "*** 開始 ***\n") msgCh := make(chan mqtt.Message) var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { msgCh <- msg } // MQTTブローカに接続 opts := mqtt.NewClientOptions().AddBroker("tcp://<Brokerのアドレス>:<ポート番号>") cc := mqtt.NewClient(opts) token := cc.Connect() if token.Wait() && token.Error() != nil { log.Fatalf("Mqtt error: %s", token.Error()) } // Topicを設定 subscribeToken := cc.Subscribe("test/sensor", 0, f) if subscribeToken.Wait() && subscribeToken.Error() != nil { log.Fatal(subscribeToken.Error()) } // OSから割込を検出 signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt) for { select { // メッセージ到着 case m := <-msgCh: // JSONデータをデコードして表示 json.Unmarshal(m.Payload(), &rData) fmt.Printf("Co2 : %d[ppm]\n", rData.Co2) fmt.Printf("Temp : %5.2f[℃]\n", rData.Sensor.Temp) fmt.Printf("Hum : %5.2f[per]\n", rData.Sensor.Hum) fmt.Printf("Press: %5.2f[hPa]\n", rData.Sensor.Press) // 終了割込処理 case <-signalCh: fmt.Printf("Interrupt detected.\n") cc.Disconnect(1000) return } } } ``` 下の図は実際に動作させて、センサの状態を購読している様子です。 ![センサ状態を購読](https://camo.elchika.com/1548e10448652a956ef1e5dcc94580926386fd95/687474703a2f2f73746f726167652e676f6f676c65617069732e636f6d2f656c6368696b612f76312f757365722f33646164353039642d616133622d343132302d616433622d6638623565376137366139332f32393439646665342d396537622d343337652d623730612d623434393833616538393535/) # 最後に 今回、ESP32CとCo2センサ、温度・湿度・気圧センサを使用してCo2濃度や温度などの部屋の状況を配信するものを製作してみました。MQTTを使えば簡単に配信したり、購読できるので応用の範囲が広がると思います。製作した記事が少しでも参考になれば良いかなぁと思っています。