
前回、定番の圧力センサ Bosch BMP280の補償計算をなんとかやっつけてMicroPythonで気圧と温度を測れるようになりました。今回は測った結果を、例によってMQTTに載せてNode-REDへ報告し、Dashboard上にグラフ表示してみます。まずは前回プログラムのモジュール化から。
※「MicroPython的午睡」投稿順 Indexはこちら
以下の実験は、ESP32を搭載した M5Stack製 M5ATOMLite に、ESP32 “generic” port のMicroPythonを書き込んだもので行っています。
Bosch BMP280インタフェース用のMicroPythonモジュール
前回作成したBMP280から温度と圧力を読み取るためのMicroPythonスクリプトを、モジュール化したもの全文を掲げます。__init__()関数内でキメウチのPinに対してI2Cを設定しています。ESP32は端子自由度が高いので、異なる端子にBMP280を接続する場合には端子番号のみ変更すれば動作するのではないかと思います(確かめておらんケド。)また、モジュールとしてインポートせずに、これを直接実行すると、前回同様のテスト読み取り動作を行い、標準出力に測定した温度や圧力を印字します。
import time
from machine import Pin, I2C
class LiteBMP280:
"""BMP280 on I2C bus
"""
adr = 0x77
ID = 0xD0
CONF = 0xF5
CTRL = 0xF4
STAT = 0xF3
REST = 0xE0
TMPX = 0xFC
TMPL = 0xFB
TMPM = 0xFA
PRSX = 0xF9
PRSL = 0xF8
PRSM = 0xF7
T1 = 0x88
T2 = 0x8A
T3 = 0x8C
P1 = 0x8E
P2 = 0x90
P3 = 0x92
P4 = 0x94
P5 = 0x96
P6 = 0x98
P7 = 0x9A
P8 = 0x9C
P9 = 0x9E
def __init__(self):
self.i2c = I2C(1, scl=Pin(21), sda=Pin(25), freq=100000)
self.TEMP = None
self.PRES = None
self.RAWT = None
self.RAWP = None
self.t_fine = None
self.dig_T1 = None
self.dig_T2 = None
self.dig_T3 = None
self.dig_P1 = None
self.dig_P2 = None
self.dig_P3 = None
self.dig_P4 = None
self.dig_P5 = None
self.dig_P6 = None
self.dig_P7 = None
self.dig_P8 = None
self.dig_P9 = None
def readParamsU(self, rAdr):
dig_b = self.i2c.readfrom_mem(LiteBMP280.adr, rAdr, 2)
tmp = dig_b[1]<<8 | dig_b[0]
return tmp
def readParamsS(self, rAdr):
tmp = self.readParamsU(rAdr)
if (tmp & 0x8000) != 0:
tmp = -((~tmp + 1) & 0xFFFF)
return tmp
#This function based on BOSCH reference code.
def bmp280_compensate_T(self, adc_T, opt=False):
var1 = (adc_T/16384.0 - self.dig_T1/1024.0) * self.dig_T2
A1=adc_T/131072.0
T1=self.dig_T1/8192.0
var2 = ((A1-T1)*(A1-T1))*self.dig_T3
self.t_fine = var1 + var2
if opt:
print("var1=",var1)
print("var2=",var2)
print("t_fine=",self.t_fine)
self.TEMP = (var1+var2)/5120.0
return self.TEMP
#Following test values from BOSCH reference code.
def test_bmp280_compensate_T(self):
self.dig_T1 = 27504
self.dig_T2 = 26435
self.dig_T3 = -1000
adc_T = 519888
print("TEST_T(Expected 25.08)=", self.bmp280_compensate_T(adc_T, opt=True))
#This function based on BOSCH reference code.
def bmp280_compensate_P(self, adc_P, opt=False):
var10 = (self.t_fine/2.0) - 64000.0
var20 = var10*var10*(self.dig_P6/32768.0)
var21 = var20+var10*(self.dig_P5)*2.0
var22 = (var21/4.0)+(self.dig_P4*65536.0)
var11 = (self.dig_P3*var10*var10/524288.0+self.dig_P2*var10)/524288.0
var12 = (1.0+var11/32768.0)*self.dig_P1
p0 = 1048576.0 - adc_P
p = (p0-(var22/4096.0))*6250.0/var12
var1 = self.dig_P9*p*p/2147483648.0
var2 = p*self.dig_P8/32768.0
if opt:
print("var10=", var10)
print("var11=", var11)
print("var12=", var12)
print("var1=", var1)
print("var20=", var20)
print("var21=", var21)
print("var22=", var22)
print("var2=", var2)
print("p0=", p0)
print("p=", p)
self.PRES = p + (var1 + var2 + self.dig_P7)/16.0
return self.PRES
#Following test values from BOSCH reference code.
def test_bmp280_compensate_P(self):
self.dig_P1 = 36477
self.dig_P2 = -10685
self.dig_P3 = 3024
self.dig_P4 = 2855
self.dig_P5 = 140
self.dig_P6 = -7
self.dig_P7 = 15500
self.dig_P8 = -14600
self.dig_P9 = 6000
adc_P = 415148
print("TEST_P(Expected 100653)=", self.bmp280_compensate_P(adc_P, opt=True))
def bmp280_getParamsT(self, opt=False):
self.dig_T1 = self.readParamsU(LiteBMP280.T1)
self.dig_T2 = self.readParamsS(LiteBMP280.T2)
self.dig_T3 = self.readParamsS(LiteBMP280.T3)
if opt:
print("DIG_T1:", self.dig_T1)
print("DIG_T2:", self.dig_T2)
print("DIG_T3:", self.dig_T3)
def bmp280_getParamsP(self, opt=False):
self.dig_P1 = self.readParamsU(LiteBMP280.P1)
self.dig_P2 = self.readParamsS(LiteBMP280.P2)
self.dig_P3 = self.readParamsS(LiteBMP280.P3)
self.dig_P4 = self.readParamsS(LiteBMP280.P4)
self.dig_P5 = self.readParamsS(LiteBMP280.P5)
self.dig_P6 = self.readParamsS(LiteBMP280.P6)
self.dig_P7 = self.readParamsS(LiteBMP280.P7)
self.dig_P8 = self.readParamsS(LiteBMP280.P8)
self.dig_P9 = self.readParamsS(LiteBMP280.P9)
if opt:
print("DIG_P1:", self.dig_P1)
print("DIG_P2:", self.dig_P2)
print("DIG_P3:", self.dig_P3)
print("DIG_P4:", self.dig_P4)
print("DIG_P5:", self.dig_P5)
print("DIG_P6:", self.dig_P6)
print("DIG_P7:", self.dig_P7)
print("DIG_P8:", self.dig_P8)
print("DIG_P9:", self.dig_P9)
def setup(self):
# test_bmp280_compensate_T()
# test_bmp280_compensate_P()
buf = str(self.i2c.readfrom_mem(LiteBMP280.adr, LiteBMP280.ID, 1), "utf-8")
print("BMP280 ID = 0x{0:02x}".format(ord(buf[0])))
self.i2c.writeto_mem(LiteBMP280.adr, LiteBMP280.CONF, b'\x40') # 125mS, no filter, no spi
self.i2c.writeto_mem(LiteBMP280.adr, LiteBMP280.CTRL, b'\x27') # x1, x1, normal mode
self.bmp280_getParamsT(False)
self.bmp280_getParamsP(False)
def measure(self):
prslis = self.i2c.readfrom_mem(LiteBMP280.adr, LiteBMP280.PRSM, 3)
tmplis = self.i2c.readfrom_mem(LiteBMP280.adr, LiteBMP280.TMPM, 3)
self.RAWP = (prslis[0]<<12) | (prslis[1] << 4)
self.RAWT = (tmplis[0]<<12) | (tmplis[1] << 4)
return [self.bmp280_compensate_T(self.RAWT), self.bmp280_compensate_P(self.RAWP)]
def main():
bmp280 = LiteBMP280()
bmp280.setup()
loopCounter = 0
while( loopCounter < 60 ):
loopCounter += 1
result = bmp280.measure()
print("TEMPERATURE: {0:3.1f} [C]".format(result[0]))
print("PRESSURE : {0:7.1f} [Pa]".format(result[1]))
time.sleep(10)
if __name__ == "__main__":
main()
Node-RED側の変更点
継ぎ足しで作成しているNode-RED側の変更は、MQTTからJSONオブジェクトを受信するトピック “ATOMLite/JSON” のフローに以下の変更を加えました。
-
- functionノードの出力先を+1して5方向にした
- 追加した出力先には受信したJSONオブジェクトの中で、”BMP280_P” というプロパティがあったら、その値をpayloadに載せて送り出すようにした。
- DHT11の温度表示用のダッシュボードのchartをBMP280の温度でも使うことにした。
- 温度をハンドルするためのfunctionノードの3番目の送り先にはDHT11とBMP280の2系列のデータを送り込む。
- 温度の第1系列は、以前から存在するDHT11の温度測定結果で、JSONオブジェクトの中で “TEMP” というプロパティ名で送られてくる。こちらの結果を送出する際には系列の識別のため topicに”DHT11″を追加するようにした。
- 温度の第2系列は、今回追加のBMP280の温度センサの測定値。JSONオブジェクトの中で “BMP280_T” というプロパティ名で送られてくる。こちらの結果を送出する際には系列の識別のため topicに”BMP280″を追加するようにした。
実際のフローはこんな感じ。
本体スクリプト
以下に本体スクリプト全文を示しますが、前述のBMP280用モジュールが、MicroPythonのモジュールサーチパス上に、LiteBMP280.py なる名前で存在する前提であります(過去記事で作製したLiteOnBoard.py、LiteNetwork.py、LiteDHT11.py なども同様です。)
機能的には、以前作成のDHT11で温度湿度を測る部分の後ろにBMP280の温度圧力測定を追加しただけです。Asyncio的には同じインターバルで測定することになります。ただし、外部ハードウエアは取ったり付けたり(今回はDHT11は接続してないです)することがあるので、接続したハードウエアのみ動作するように制御を追加しています。後日、スクリプトを修正しなくてもハード構成が反映できるようにするつもりですが、現状は、extDevControlという大域変数にハード構成を反映した数値を代入する方式です。
import sys
import ujson
import uasyncio
from LiteOnBoard import LiteOnBoard
from LiteNetwork import LiteNetwork
from LiteDHT11 import LiteDHT11
from LiteBMP280 import LiteBMP280
ledColor = (0, 0, 0)
OnBoardDevice = LiteOnBoard()
LiteNet = LiteNetwork()
DHT11 = None
BMP280 = None
DHT11_ON = 0x1
BMP280_ON = 0x2
extDevControl = BMP280_ON
def colorChange(msg):
global ledColor
if msg=="R":
ledColor = (255, 0, 0)
elif msg=="G":
ledColor = (0, 255, 0)
elif msg=="B":
ledColor = (0, 0, 255)
def callbackSub(topic, msg):
topicS = topic.decode('utf-8')
msgS = msg.decode('utf-8')
print(topicS, msgS)
if topicS=="ATOMLite/Color":
print("ATOMLite/Color Message: ", msgS)
colorChange(msgS)
if topicS=="ATOMLite/Settings":
msgJ = ujson.loads(msgS)
print("ATOMLite/Settings: ")
for item in msgJ:
for k,v in item.items():
if k=="key":
ky = v
if k=="value":
vl = v
print(ky, " = ", vl)
def makeOBJ(num):
global DHT11, BMP280
workDic = dict()
if (extDevControl & DHT11_ON) != 0:
tpl = DHT11.measure()
workDic['TEMP'] = tpl[0]
workDic['HUMI'] = tpl[1]
if (extDevControl & BMP280_ON) != 0:
tpl = BMP280.measure()
workDic['BMP280_T'] = tpl[0]
workDic['BMP280_P'] = tpl[1]
workDic['A'] = num
workDic['B'] = LiteNet.currentTIME()
return ujson.dumps(workDic)
async def reportStatus(period_s):
while True:
LiteNet.client.publish("ATOMLite/Status", "ATOMLite Running: " + LiteNet.currentTIME())
await uasyncio.sleep(period_s)
async def sendJson(period_s):
sendCounter = 0
while True:
jsonSTR = makeOBJ(sendCounter)
LiteNet.client.publish("ATOMLite/Json", jsonSTR)
sendCounter += 1
await uasyncio.sleep(period_s)
async def ledAndUserButton(period_s):
global OnBoardDevice
ledFlag = True
buttonFlag = False
while True:
if ledFlag:
OnBoardDevice.M5ATOMLiteLED(ledColor)
ledFlag = False
else:
OnBoardDevice.M5ATOMLiteLED( (0, 0, 0) )
ledFlag = True
if OnBoardDevice.M5ATOMLiteUserButton():
print("Button ON")
LiteNet.client.publish("ATOMLite/Button", "Button ON")
buttonFlag = True
elif buttonFlag:
buttonFlag = False
LiteNet.client.publish("ATOMLite/Button", "Button OFF")
await uasyncio.sleep(period_s)
async def mainLoop(period_ms):
global OnBoardDevice
taskStatus = uasyncio.create_task(reportStatus(60))
taskJson = uasyncio.create_task(sendJson(60))
taskATOMhard = uasyncio.create_task(ledAndUserButton(1))
while( OnBoardDevice.buttonCounter < 6 ):
LiteNet.client.check_msg()
await uasyncio.sleep_ms(period_ms)
try:
taskStatus.cancel()
except:
print("Exception at taskStatus:", sys.exc_info()[0])
try:
taskJson.cancel()
except:
print("Exception at taskJson:", sys.exc_info()[0])
try:
taskATOMhard.cancel()
except:
print("Exception at taskATOMhard:", sys.exc_info()[0])
LiteNet.client.publish("ATOMLite/Button", "N/A")
LiteNet.client.publish("ATOMLite/Status", "ATOMLite: out of service.")
LiteNet.close()
print("disconnect.")
def extInit():
global DHT11, BMP280
if (extDevControl & DHT11_ON) != 0:
DHT11 = LiteDHT11()
if (extDevControl & BMP280_ON) != 0:
BMP280 = LiteBMP280()
BMP280.setup()
def main():
if not LiteNet.do_connect():
print("ERROR: WiFi connection.")
sys.exit(1)
LiteNet.initRTC()
if not LiteNet.connectMqttBroker(callbackSub):
print("ERROR: MQTT Broker connection.")
sys.exit(1)
extInit()
uasyncio.run(mainLoop(1000))
if __name__ == "__main__":
main()
実行結果
ホスト(Raspberry Pi 3 model B+)上で実行されているMQTTブローカ経由でNode-REDへ報告されたBMP280の気圧と温度の測定結果をchart化したものをPC上のブラウザで見たところが以下です。
とりあえずそれらしく動いているみたい。今回はこんなところね。

