前回、定番の圧力センサ Bosch BMP280の補償計算をなんとかやっつけてMicroPythonで気圧と温度を測れるようになりました。今回は測った結果を、例によってMQTTに載せてNode-REDへ報告し、Dashboard上にグラフ表示してみます。まずは前回プログラムのモジュール化から。
※「MicroPython的午睡」投稿順 Indexはこちら
以下の実験は、ESP32を搭載した M5Stack製 M5ATOMLite に、ESP32 “generic” port のMicroPythonを書き込んだもので行っています。
Bosch BMP280インタフェース用のMicroPythonモジュール
前回作成したBMP280から温度と圧力を読み取るためのMicroPythonスクリプトを、モジュール化したもの全文を掲げます。__init__()関数内でキメウチのPinに対してI2Cを設定しています。ESP32は端子自由度が高いので、異なる端子にBMP280を接続する場合には端子番号のみ変更すれば動作するのではないかと思います(確かめておらんケド。)また、モジュールとしてインポートせずに、これを直接実行すると、前回同様のテスト読み取り動作を行い、標準出力に測定した温度や圧力を印字します。
import time from machine import Pin, I2C class LiteBMP280: """BMP280 on I2C bus """ adr = 0x77 ID = 0xD0 CONF = 0xF5 CTRL = 0xF4 STAT = 0xF3 REST = 0xE0 TMPX = 0xFC TMPL = 0xFB TMPM = 0xFA PRSX = 0xF9 PRSL = 0xF8 PRSM = 0xF7 T1 = 0x88 T2 = 0x8A T3 = 0x8C P1 = 0x8E P2 = 0x90 P3 = 0x92 P4 = 0x94 P5 = 0x96 P6 = 0x98 P7 = 0x9A P8 = 0x9C P9 = 0x9E def __init__(self): self.i2c = I2C(1, scl=Pin(21), sda=Pin(25), freq=100000) self.TEMP = None self.PRES = None self.RAWT = None self.RAWP = None self.t_fine = None self.dig_T1 = None self.dig_T2 = None self.dig_T3 = None self.dig_P1 = None self.dig_P2 = None self.dig_P3 = None self.dig_P4 = None self.dig_P5 = None self.dig_P6 = None self.dig_P7 = None self.dig_P8 = None self.dig_P9 = None def readParamsU(self, rAdr): dig_b = self.i2c.readfrom_mem(LiteBMP280.adr, rAdr, 2) tmp = dig_b[1]<<8 | dig_b[0] return tmp def readParamsS(self, rAdr): tmp = self.readParamsU(rAdr) if (tmp & 0x8000) != 0: tmp = -((~tmp + 1) & 0xFFFF) return tmp #This function based on BOSCH reference code. def bmp280_compensate_T(self, adc_T, opt=False): var1 = (adc_T/16384.0 - self.dig_T1/1024.0) * self.dig_T2 A1=adc_T/131072.0 T1=self.dig_T1/8192.0 var2 = ((A1-T1)*(A1-T1))*self.dig_T3 self.t_fine = var1 + var2 if opt: print("var1=",var1) print("var2=",var2) print("t_fine=",self.t_fine) self.TEMP = (var1+var2)/5120.0 return self.TEMP #Following test values from BOSCH reference code. def test_bmp280_compensate_T(self): self.dig_T1 = 27504 self.dig_T2 = 26435 self.dig_T3 = -1000 adc_T = 519888 print("TEST_T(Expected 25.08)=", self.bmp280_compensate_T(adc_T, opt=True)) #This function based on BOSCH reference code. def bmp280_compensate_P(self, adc_P, opt=False): var10 = (self.t_fine/2.0) - 64000.0 var20 = var10*var10*(self.dig_P6/32768.0) var21 = var20+var10*(self.dig_P5)*2.0 var22 = (var21/4.0)+(self.dig_P4*65536.0) var11 = (self.dig_P3*var10*var10/524288.0+self.dig_P2*var10)/524288.0 var12 = (1.0+var11/32768.0)*self.dig_P1 p0 = 1048576.0 - adc_P p = (p0-(var22/4096.0))*6250.0/var12 var1 = self.dig_P9*p*p/2147483648.0 var2 = p*self.dig_P8/32768.0 if opt: print("var10=", var10) print("var11=", var11) print("var12=", var12) print("var1=", var1) print("var20=", var20) print("var21=", var21) print("var22=", var22) print("var2=", var2) print("p0=", p0) print("p=", p) self.PRES = p + (var1 + var2 + self.dig_P7)/16.0 return self.PRES #Following test values from BOSCH reference code. def test_bmp280_compensate_P(self): self.dig_P1 = 36477 self.dig_P2 = -10685 self.dig_P3 = 3024 self.dig_P4 = 2855 self.dig_P5 = 140 self.dig_P6 = -7 self.dig_P7 = 15500 self.dig_P8 = -14600 self.dig_P9 = 6000 adc_P = 415148 print("TEST_P(Expected 100653)=", self.bmp280_compensate_P(adc_P, opt=True)) def bmp280_getParamsT(self, opt=False): self.dig_T1 = self.readParamsU(LiteBMP280.T1) self.dig_T2 = self.readParamsS(LiteBMP280.T2) self.dig_T3 = self.readParamsS(LiteBMP280.T3) if opt: print("DIG_T1:", self.dig_T1) print("DIG_T2:", self.dig_T2) print("DIG_T3:", self.dig_T3) def bmp280_getParamsP(self, opt=False): self.dig_P1 = self.readParamsU(LiteBMP280.P1) self.dig_P2 = self.readParamsS(LiteBMP280.P2) self.dig_P3 = self.readParamsS(LiteBMP280.P3) self.dig_P4 = self.readParamsS(LiteBMP280.P4) self.dig_P5 = self.readParamsS(LiteBMP280.P5) self.dig_P6 = self.readParamsS(LiteBMP280.P6) self.dig_P7 = self.readParamsS(LiteBMP280.P7) self.dig_P8 = self.readParamsS(LiteBMP280.P8) self.dig_P9 = self.readParamsS(LiteBMP280.P9) if opt: print("DIG_P1:", self.dig_P1) print("DIG_P2:", self.dig_P2) print("DIG_P3:", self.dig_P3) print("DIG_P4:", self.dig_P4) print("DIG_P5:", self.dig_P5) print("DIG_P6:", self.dig_P6) print("DIG_P7:", self.dig_P7) print("DIG_P8:", self.dig_P8) print("DIG_P9:", self.dig_P9) def setup(self): # test_bmp280_compensate_T() # test_bmp280_compensate_P() buf = str(self.i2c.readfrom_mem(LiteBMP280.adr, LiteBMP280.ID, 1), "utf-8") print("BMP280 ID = 0x{0:02x}".format(ord(buf[0]))) self.i2c.writeto_mem(LiteBMP280.adr, LiteBMP280.CONF, b'\x40') # 125mS, no filter, no spi self.i2c.writeto_mem(LiteBMP280.adr, LiteBMP280.CTRL, b'\x27') # x1, x1, normal mode self.bmp280_getParamsT(False) self.bmp280_getParamsP(False) def measure(self): prslis = self.i2c.readfrom_mem(LiteBMP280.adr, LiteBMP280.PRSM, 3) tmplis = self.i2c.readfrom_mem(LiteBMP280.adr, LiteBMP280.TMPM, 3) self.RAWP = (prslis[0]<<12) | (prslis[1] << 4) self.RAWT = (tmplis[0]<<12) | (tmplis[1] << 4) return [self.bmp280_compensate_T(self.RAWT), self.bmp280_compensate_P(self.RAWP)] def main(): bmp280 = LiteBMP280() bmp280.setup() loopCounter = 0 while( loopCounter < 60 ): loopCounter += 1 result = bmp280.measure() print("TEMPERATURE: {0:3.1f} [C]".format(result[0])) print("PRESSURE : {0:7.1f} [Pa]".format(result[1])) time.sleep(10) if __name__ == "__main__": main()
Node-RED側の変更点
継ぎ足しで作成しているNode-RED側の変更は、MQTTからJSONオブジェクトを受信するトピック “ATOMLite/JSON” のフローに以下の変更を加えました。
-
- functionノードの出力先を+1して5方向にした
- 追加した出力先には受信したJSONオブジェクトの中で、”BMP280_P” というプロパティがあったら、その値をpayloadに載せて送り出すようにした。
- DHT11の温度表示用のダッシュボードのchartをBMP280の温度でも使うことにした。
- 温度をハンドルするためのfunctionノードの3番目の送り先にはDHT11とBMP280の2系列のデータを送り込む。
- 温度の第1系列は、以前から存在するDHT11の温度測定結果で、JSONオブジェクトの中で “TEMP” というプロパティ名で送られてくる。こちらの結果を送出する際には系列の識別のため topicに”DHT11″を追加するようにした。
- 温度の第2系列は、今回追加のBMP280の温度センサの測定値。JSONオブジェクトの中で “BMP280_T” というプロパティ名で送られてくる。こちらの結果を送出する際には系列の識別のため topicに”BMP280″を追加するようにした。
実際のフローはこんな感じ。
本体スクリプト
以下に本体スクリプト全文を示しますが、前述のBMP280用モジュールが、MicroPythonのモジュールサーチパス上に、LiteBMP280.py なる名前で存在する前提であります(過去記事で作製したLiteOnBoard.py、LiteNetwork.py、LiteDHT11.py なども同様です。)
機能的には、以前作成のDHT11で温度湿度を測る部分の後ろにBMP280の温度圧力測定を追加しただけです。Asyncio的には同じインターバルで測定することになります。ただし、外部ハードウエアは取ったり付けたり(今回はDHT11は接続してないです)することがあるので、接続したハードウエアのみ動作するように制御を追加しています。後日、スクリプトを修正しなくてもハード構成が反映できるようにするつもりですが、現状は、extDevControlという大域変数にハード構成を反映した数値を代入する方式です。
import sys import ujson import uasyncio from LiteOnBoard import LiteOnBoard from LiteNetwork import LiteNetwork from LiteDHT11 import LiteDHT11 from LiteBMP280 import LiteBMP280 ledColor = (0, 0, 0) OnBoardDevice = LiteOnBoard() LiteNet = LiteNetwork() DHT11 = None BMP280 = None DHT11_ON = 0x1 BMP280_ON = 0x2 extDevControl = BMP280_ON def colorChange(msg): global ledColor if msg=="R": ledColor = (255, 0, 0) elif msg=="G": ledColor = (0, 255, 0) elif msg=="B": ledColor = (0, 0, 255) def callbackSub(topic, msg): topicS = topic.decode('utf-8') msgS = msg.decode('utf-8') print(topicS, msgS) if topicS=="ATOMLite/Color": print("ATOMLite/Color Message: ", msgS) colorChange(msgS) if topicS=="ATOMLite/Settings": msgJ = ujson.loads(msgS) print("ATOMLite/Settings: ") for item in msgJ: for k,v in item.items(): if k=="key": ky = v if k=="value": vl = v print(ky, " = ", vl) def makeOBJ(num): global DHT11, BMP280 workDic = dict() if (extDevControl & DHT11_ON) != 0: tpl = DHT11.measure() workDic['TEMP'] = tpl[0] workDic['HUMI'] = tpl[1] if (extDevControl & BMP280_ON) != 0: tpl = BMP280.measure() workDic['BMP280_T'] = tpl[0] workDic['BMP280_P'] = tpl[1] workDic['A'] = num workDic['B'] = LiteNet.currentTIME() return ujson.dumps(workDic) async def reportStatus(period_s): while True: LiteNet.client.publish("ATOMLite/Status", "ATOMLite Running: " + LiteNet.currentTIME()) await uasyncio.sleep(period_s) async def sendJson(period_s): sendCounter = 0 while True: jsonSTR = makeOBJ(sendCounter) LiteNet.client.publish("ATOMLite/Json", jsonSTR) sendCounter += 1 await uasyncio.sleep(period_s) async def ledAndUserButton(period_s): global OnBoardDevice ledFlag = True buttonFlag = False while True: if ledFlag: OnBoardDevice.M5ATOMLiteLED(ledColor) ledFlag = False else: OnBoardDevice.M5ATOMLiteLED( (0, 0, 0) ) ledFlag = True if OnBoardDevice.M5ATOMLiteUserButton(): print("Button ON") LiteNet.client.publish("ATOMLite/Button", "Button ON") buttonFlag = True elif buttonFlag: buttonFlag = False LiteNet.client.publish("ATOMLite/Button", "Button OFF") await uasyncio.sleep(period_s) async def mainLoop(period_ms): global OnBoardDevice taskStatus = uasyncio.create_task(reportStatus(60)) taskJson = uasyncio.create_task(sendJson(60)) taskATOMhard = uasyncio.create_task(ledAndUserButton(1)) while( OnBoardDevice.buttonCounter < 6 ): LiteNet.client.check_msg() await uasyncio.sleep_ms(period_ms) try: taskStatus.cancel() except: print("Exception at taskStatus:", sys.exc_info()[0]) try: taskJson.cancel() except: print("Exception at taskJson:", sys.exc_info()[0]) try: taskATOMhard.cancel() except: print("Exception at taskATOMhard:", sys.exc_info()[0]) LiteNet.client.publish("ATOMLite/Button", "N/A") LiteNet.client.publish("ATOMLite/Status", "ATOMLite: out of service.") LiteNet.close() print("disconnect.") def extInit(): global DHT11, BMP280 if (extDevControl & DHT11_ON) != 0: DHT11 = LiteDHT11() if (extDevControl & BMP280_ON) != 0: BMP280 = LiteBMP280() BMP280.setup() def main(): if not LiteNet.do_connect(): print("ERROR: WiFi connection.") sys.exit(1) LiteNet.initRTC() if not LiteNet.connectMqttBroker(callbackSub): print("ERROR: MQTT Broker connection.") sys.exit(1) extInit() uasyncio.run(mainLoop(1000)) if __name__ == "__main__": main()
実行結果
ホスト(Raspberry Pi 3 model B+)上で実行されているMQTTブローカ経由でNode-REDへ報告されたBMP280の気圧と温度の測定結果をchart化したものをPC上のブラウザで見たところが以下です。
とりあえずそれらしく動いているみたい。今回はこんなところね。