やりたいことは簡単なのです
- センサーの値を読みたい
- それを、USB経由で出力したい
- センサーは、UARTとI2C
・・・まあ、今時どんなマイコンでもできそうなのですが。
敢えてのRaspberry Pi picoで!。
「純正でUSB積んでコンパイル不要で550円」てのは、魅力的だなと。
というわけで、習作がてら、がんばってみましたよと。
Co2センサーの選定
以下センサーを準備。
- 赤外線式 MH-Z19B
- マイクロホットプレート式 CCS811
- マイクロヒーター式 VME680
事前にRaspberry Pi (picoじゃない)に接続し、相関を取ったりして。
「赤外線式」なCo2センサーが良さげなので、コイツに決めました。
他のはみょーに、リニアじゃないような・・・。
まあこのへんの経緯は、以下記事そのまんまだったので。
以下記事を参照のこと。
https://pc.watch.impress.co.jp/docs/column/ali/1298772.html
ついでに、温度湿度気圧センサも、BME280を選定しました、とさ。
Raspberry Pi picoの準備
まあ普通に入手、開発環境は「Tera Term」のみで!。
このへんの経緯は、別記事にまとめてありますよ、と。
https://elchika.com/article/d5c0dd9a-8b8a-4bf5-bca0-2181b0158a6d/
I2CとUART接続
まあ、プログラムと波形をいったりきたりして、なんとか折り合いをつけました。
いろんなピンに同じ名称がついてたり(多分「使える」って意味だけど)、USBなシリアル入出力とUARTがバッティングしているかどうかいまいちだったりで、まあ左上の0、1ピンをI2Cに、6、7ピンをUARTに使う感じで納まりました。
I2Cに使う場合、インターナルプルアップが効いているような動作しておりました(が、不安なのでプルアップ外付けでつけましたとさ)。
Raspberry Pi pico側ソフト
まあ、microPythonで!。
main.py
import utime
import machine
led = machine.Pin(25, machine.Pin.OUT)
conversion_factor = 3.3 / (65535)
i2c_bus_number = 0
i2c_address = 0x76
i2c_sda=machine.Pin(0)
i2c_scl=machine.Pin(1)
i2c=machine.I2C(i2c_bus_number,sda=i2c_sda, scl=i2c_scl, freq=100000)
digT = []
digP = []
digH = []
t_fine = 0.0
def writeReg(reg,data):
i2c.writeto_mem(i2c_address, reg, data)
def get_calib_param():
calib = []
for i in range (0x88,0x88+24):
readdata = i2c.readfrom_mem(i2c_address,i,1)
calib.append(int.from_bytes(readdata,'big'))
readdata = i2c.readfrom_mem(i2c_address,0xA1,1)
calib.append(int.from_bytes(readdata,'big'))
for i in range (0xE1,0xE1+7):
readdata = i2c.readfrom_mem(i2c_address,i,1)
calib.append(int.from_bytes(readdata,'big'))
teststr = calib[1] << 8
teststr = calib[1] | calib[0]
digT.append((calib[1] << 8) | calib[0])
digT.append((calib[3] << 8) | calib[2])
digT.append((calib[5] << 8) | calib[4])
digP.append((calib[7] << 8) | calib[6])
digP.append((calib[9] << 8) | calib[8])
digP.append((calib[11]<< 8) | calib[10])
digP.append((calib[13]<< 8) | calib[12])
digP.append((calib[15]<< 8) | calib[14])
digP.append((calib[17]<< 8) | calib[16])
digP.append((calib[19]<< 8) | calib[18])
digP.append((calib[21]<< 8) | calib[20])
digP.append((calib[23]<< 8) | calib[22])
digH.append( calib[24] )
digH.append((calib[26]<< 8) | calib[25])
digH.append( calib[27] )
digH.append((calib[28]<< 4) | (0x0F & calib[29]))
digH.append((calib[30]<< 4) | ((calib[29] >> 4) & 0x0F))
digH.append( calib[31] )
for i in range(1,2):
if digT[i] & 0x8000:
digT[i] = (-digT[i] ^ 0xFFFF) + 1
for i in range(1,8):
if digP[i] & 0x8000:
digP[i] = (-digP[i] ^ 0xFFFF) + 1
for i in range(0,6):
if digH[i] & 0x8000:
digH[i] = (-digH[i] ^ 0xFFFF) + 1
def readData():
data = []
for i in range (0xF7, 0xF7+8):
readdata = i2c.readfrom_mem(i2c_address,i,1)
data.append(int.from_bytes(readdata,'big'))
pres_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
hum_raw = (data[6] << 8) | data[7]
compensate_T(temp_raw)
compensate_P(pres_raw)
compensate_H(hum_raw)
def compensate_P(adc_P):
global t_fine
pressure = 0.0
v1 = (t_fine / 2.0) - 64000.0
v2 = (((v1 / 4.0) * (v1 / 4.0)) / 2048) * digP[5]
v2 = v2 + ((v1 * digP[4]) * 2.0)
v2 = (v2 / 4.0) + (digP[3] * 65536.0)
v1 = (((digP[2] * (((v1 / 4.0) * (v1 / 4.0)) / 8192)) / 8) + ((digP[1] * v1) / 2.0)) / 262144
v1 = ((32768 + v1) * digP[0]) / 32768
if v1 == 0:
return 0
pressure = ((1048576 - adc_P) - (v2 / 4096)) * 3125
if pressure < 0x80000000:
pressure = (pressure * 2.0) / v1
else:
pressure = (pressure / v1) * 2
v1 = (digP[8] * (((pressure / 8.0) * (pressure / 8.0)) / 8192.0)) / 4096
v2 = ((pressure / 4.0) * digP[7]) / 8192.0
pressure = (pressure + ((v1 + v2 + digP[6]) / 16.0)) / 100
pressStr = "pressure: {:7.2f} hPa".format(pressure)
print (pressStr)
def compensate_T(adc_T):
global t_fine
v1 = (adc_T / 16384.0 - digT[0] / 1024.0) * digT[1]
v2 = (adc_T / 131072.0 - digT[0] / 8192.0) * (adc_T / 131072.0 - digT[0] / 8192.0) * digT[2]
t_fine = v1 + v2
temperature = t_fine / 5120.0
tempStr = "temperature: {:5.2f} c".format(temperature)
print (tempStr)
def compensate_H(adc_H):
global t_fine
var_h = t_fine - 76800.0
if var_h != 0:
var_h = (adc_H - (digH[3] * 64.0 + digH[4]/16384.0 * var_h)) * (digH[1] / 65536.0 * (1.0 + digH[5] / 67108864.0 * var_h * (1.0 + digH[2] / 67108864.0 * var_h)))
else:
return 0
var_h = var_h * (1.0 - digH[0] * var_h / 524288.0)
if var_h > 100.0:
var_h = 100.0
elif var_h < 0.0:
var_h = 0.0
humStr = "hum: {:6.2f} %".format(var_h)
print (humStr)
def readCo2():
uart=machine.UART(1,baudrate=9600)
b=bytearray([0xff, 0x01, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00, 0x79])
uart.write(b)
utime.sleep(0.1)
n=uart.read(8)
f=n[2]*256+n[3]
co2Str = "co2: {} ppm".format(str(f))
print (co2Str)
utime.sleep(0.5)
uart = None
def setup():
osrs_t = 1 #Temperature oversampling x 1
osrs_p = 1 #Pressure oversampling x 1
osrs_h = 1 #Humidity oversampling x 1
mode = 3 #Normal mode
t_sb = 5 #Tstandby 1000ms
filter = 0 #Filter off
spi3w_en = 0 #3-wire SPI Disable
ctrl_meas_reg = bytearray(1)
config_reg = bytearray(1)
ctrl_hum_reg = bytearray(1)
ctrl_meas_reg[0] = (osrs_t << 5) | (osrs_p << 2) | mode
config_reg[0] = (t_sb << 5) | (filter << 2) | spi3w_en
ctrl_hum_reg[0] = osrs_h
writeReg(0xF2,ctrl_hum_reg)
writeReg(0xF4,ctrl_meas_reg)
writeReg(0xF5,config_reg)
setup()
get_calib_param()
if __name__ == '__main__':
for i in range(40):
led.toggle()
utime.sleep(0.15)
while True:
led.toggle()
readCo2()
readData()
utime.sleep(10)
BME280周りは、以下ソースを改造。
https://github.com/SWITCHSCIENCE/BME280/blob/master/Python27/bme280_sample.py
ハマりポイントとしては、smbusの「read_byte_data」は、戻り値が「int型」なんだけど、machine.i2cの「from_bytes」は、戻り値が「bytes型」なこと。
pythonのbytes型は、読み込み専用で演算に使えないので、int変換が要りますよ、と。
動かすと、USBシリアルから、10秒ごとにデータがつらつらと出てきます。
co2: 512 ppm
temperature: 18.23 c
hum: 42.15 %
pressure: 1023.74 hPa
受け側
まあオマケ要素ですが。
どうせならサイネージに組み込もうかと。
というわけで、受け側。
会社で使ってるサイネージシステムは、raspbianで。
かつAPIサーバにデータ送る形なので。
受け渡し用のスクリプトを仕込みました。
send.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import urllib
import urllib2
import serial
import time
import sys
import os
import re
url = 'https://okuru.url'
def main():
if not (os.path.exists('/dev/ttyACM0')):
sys.exit()
dataco2 = 0
datatemp = 0
datahum = 0
datapres = 0
con=serial.Serial('/dev/ttyACM0', 115200)
while True:
measdata=con.readline()
if 'co2:' in measdata:
result = re.match('co2: (\d+) ppm', measdata)
if result != None:
dataco2 = result.group(1)
if 'temperature:' in measdata:
result = re.match('temperature: (.+) c', measdata)
if result != None:
datatemp = result.group(1)
if 'hum:' in measdata:
result = re.match('hum: (.+) %', measdata)
if result != None:
datahum = result.group(1)
if 'pressure:' in measdata:
result = re.match('pressure: (.+) hPa', measdata)
if result != None:
datapres = result.group(1)
hostname = os.uname()[1]
if len(str(dataco2)) > 2 and len(str(datatemp)) > 2 and len(str(datahum)) > 2 and len(str(datapres)) > 2:
params = {
'hostname' : hostname,
'dataco2' : dataco2,
'datatemp' : datatemp,
'datahum' : datahum,
'datapres' : datapres
}
params = urllib.urlencode(params)
req = urllib2.Request(url)
req.add_data(params)
res = urllib2.urlopen(req)
dataco2 = 0
datatemp = 0
datahum = 0
datapres = 0
sys.exit()
if __name__ == '__main__':
main()
ま、パラメータにデータくっつけて送ってるだけの単発ものです。
これをsystemdに設定して、タイマー動作、と。
/etc/systemd/system/send.service
[Unit] Description=send [Service] Type=simple ExecStart=/opt/send.py [Install] WantedBy=multi-user.target
/etc/systemd/system/send.timer
[Unit] Description=Runs send [Timer] OnBootSec=2min OnUnitActiveSec=1min Unit=send.service [Install] WantedBy=multi-user.target
あとはAPIサーバ側で適当に画面作って・・・。
ま、こんな感じに!。
基板起こし
とはいってもユニバーサルでね。
秋月C基板を使いましたとさ。
マウンタ作成
適当に作画。
いまいち寸法が合わないので、調整可能な穴具合にしてあります。
接続と設置
まあ、無事完成!。
相手がラズパイなら、まあラズパイ側のUART I2Cを使えばいいのですが、別のサイネージ用STBにも使いたいなってね。
無事会社に置いて、ソーシャルディスタンスだのを意識させる系のサイネージとして、活躍しております。
すてきすてきー!。
投稿者の人気記事
-
eucaly
さんが
2021/02/28
に
編集
をしました。
(メッセージ: 初版)
ログインしてコメントを投稿する