MicroPython的午睡(52) ATOMLite、気圧の測定結果をNode-REDへ

Joseph Halfmoon

前回、定番の圧力センサ Bosch BMP280の補償計算をなんとかやっつけてMicroPythonで気圧と温度を測れるようになりました。今回は測った結果を、例によってMQTTに載せてNode-REDへ報告し、Dashboard上にグラフ表示してみます。まずは前回プログラムのモジュール化から。

※「MicroPython的午睡」投稿順 Indexはこちら

以下の実験は、ESP32を搭載した M5Stack製 M5ATOMLite に、ESP32 “generic” port のMicroPythonを書き込んだもので行っています。

Bosch BMP280インタフェース用のMicroPythonモジュール

前回作成したBMP280から温度と圧力を読み取るためのMicroPythonスクリプトを、モジュール化したもの全文を掲げます。__init__()関数内でキメウチのPinに対してI2Cを設定しています。ESP32は端子自由度が高いので、異なる端子にBMP280を接続する場合には端子番号のみ変更すれば動作するのではないかと思います(確かめておらんケド。)また、モジュールとしてインポートせずに、これを直接実行すると、前回同様のテスト読み取り動作を行い、標準出力に測定した温度や圧力を印字します。

import time
from machine import Pin, I2C

class LiteBMP280:
    """BMP280 on I2C bus

    """
    adr = 0x77
    ID = 0xD0
    CONF = 0xF5
    CTRL = 0xF4
    STAT = 0xF3
    REST = 0xE0
    TMPX = 0xFC
    TMPL = 0xFB
    TMPM = 0xFA
    PRSX = 0xF9
    PRSL = 0xF8
    PRSM = 0xF7
    T1 = 0x88
    T2 = 0x8A
    T3 = 0x8C
    P1 = 0x8E
    P2 = 0x90
    P3 = 0x92
    P4 = 0x94
    P5 = 0x96
    P6 = 0x98
    P7 = 0x9A
    P8 = 0x9C
    P9 = 0x9E

    def __init__(self):
        self.i2c = I2C(1, scl=Pin(21), sda=Pin(25), freq=100000)
        self.TEMP = None
        self.PRES = None
        self.RAWT = None
        self.RAWP = None
        self.t_fine = None
        self.dig_T1 = None
        self.dig_T2 = None
        self.dig_T3 = None
        self.dig_P1 = None
        self.dig_P2 = None
        self.dig_P3 = None
        self.dig_P4 = None
        self.dig_P5 = None
        self.dig_P6 = None
        self.dig_P7 = None
        self.dig_P8 = None
        self.dig_P9 = None
    
    def readParamsU(self, rAdr):
        dig_b = self.i2c.readfrom_mem(LiteBMP280.adr, rAdr, 2)
        tmp = dig_b[1]<<8 | dig_b[0]  
        return tmp

    def readParamsS(self, rAdr):
        tmp = self.readParamsU(rAdr)  
        if (tmp & 0x8000) != 0:
            tmp = -((~tmp + 1) & 0xFFFF)
        return tmp

    #This function based on BOSCH reference code.
    def bmp280_compensate_T(self, adc_T, opt=False):
        var1 = (adc_T/16384.0 - self.dig_T1/1024.0) * self.dig_T2
        A1=adc_T/131072.0
        T1=self.dig_T1/8192.0
        var2 = ((A1-T1)*(A1-T1))*self.dig_T3
        self.t_fine = var1 + var2
        if opt:
            print("var1=",var1)
            print("var2=",var2)
            print("t_fine=",self.t_fine)
        self.TEMP = (var1+var2)/5120.0
        return self.TEMP

    #Following test values from BOSCH reference code.
    def test_bmp280_compensate_T(self):
        self.dig_T1 = 27504
        self.dig_T2 = 26435
        self.dig_T3 = -1000
        adc_T  = 519888
        print("TEST_T(Expected 25.08)=", self.bmp280_compensate_T(adc_T, opt=True))

    #This function based on BOSCH reference code.
    def bmp280_compensate_P(self, adc_P, opt=False):
        var10 = (self.t_fine/2.0) - 64000.0
        var20 = var10*var10*(self.dig_P6/32768.0)
        var21 = var20+var10*(self.dig_P5)*2.0
        var22 = (var21/4.0)+(self.dig_P4*65536.0)
        var11 = (self.dig_P3*var10*var10/524288.0+self.dig_P2*var10)/524288.0
        var12 = (1.0+var11/32768.0)*self.dig_P1
        p0 = 1048576.0 - adc_P
        p = (p0-(var22/4096.0))*6250.0/var12
        var1 = self.dig_P9*p*p/2147483648.0
        var2 = p*self.dig_P8/32768.0
        if opt:
            print("var10=", var10)
            print("var11=", var11)
            print("var12=", var12)
            print("var1=", var1)
            print("var20=", var20)
            print("var21=", var21)
            print("var22=", var22)
            print("var2=", var2)
            print("p0=", p0)
            print("p=", p)
        self.PRES = p + (var1 + var2 + self.dig_P7)/16.0
        return self.PRES

    #Following test values from BOSCH reference code.
    def test_bmp280_compensate_P(self):
        self.dig_P1 = 36477
        self.dig_P2 = -10685
        self.dig_P3 = 3024
        self.dig_P4 = 2855
        self.dig_P5 = 140
        self.dig_P6 = -7
        self.dig_P7 = 15500
        self.dig_P8 = -14600
        self.dig_P9 = 6000
        adc_P = 415148
        print("TEST_P(Expected 100653)=", self.bmp280_compensate_P(adc_P, opt=True))

    def bmp280_getParamsT(self, opt=False):
        self.dig_T1 = self.readParamsU(LiteBMP280.T1)
        self.dig_T2 = self.readParamsS(LiteBMP280.T2)
        self.dig_T3 = self.readParamsS(LiteBMP280.T3)
        if opt:
            print("DIG_T1:", self.dig_T1)
            print("DIG_T2:", self.dig_T2)
            print("DIG_T3:", self.dig_T3)
    
    def bmp280_getParamsP(self, opt=False):
        self.dig_P1 = self.readParamsU(LiteBMP280.P1)
        self.dig_P2 = self.readParamsS(LiteBMP280.P2)
        self.dig_P3 = self.readParamsS(LiteBMP280.P3)
        self.dig_P4 = self.readParamsS(LiteBMP280.P4)
        self.dig_P5 = self.readParamsS(LiteBMP280.P5)
        self.dig_P6 = self.readParamsS(LiteBMP280.P6)
        self.dig_P7 = self.readParamsS(LiteBMP280.P7)
        self.dig_P8 = self.readParamsS(LiteBMP280.P8)
        self.dig_P9 = self.readParamsS(LiteBMP280.P9)
        if opt:
            print("DIG_P1:", self.dig_P1)
            print("DIG_P2:", self.dig_P2)
            print("DIG_P3:", self.dig_P3)
            print("DIG_P4:", self.dig_P4)
            print("DIG_P5:", self.dig_P5)
            print("DIG_P6:", self.dig_P6)
            print("DIG_P7:", self.dig_P7)
            print("DIG_P8:", self.dig_P8)
            print("DIG_P9:", self.dig_P9)

    def setup(self):
        #    test_bmp280_compensate_T()
        #    test_bmp280_compensate_P()
        buf = str(self.i2c.readfrom_mem(LiteBMP280.adr, LiteBMP280.ID, 1), "utf-8")
        print("BMP280 ID = 0x{0:02x}".format(ord(buf[0])))
        self.i2c.writeto_mem(LiteBMP280.adr, LiteBMP280.CONF, b'\x40') # 125mS, no filter, no spi
        self.i2c.writeto_mem(LiteBMP280.adr, LiteBMP280.CTRL, b'\x27') # x1, x1, normal mode
        self.bmp280_getParamsT(False)
        self.bmp280_getParamsP(False)
    
    def measure(self):
        prslis = self.i2c.readfrom_mem(LiteBMP280.adr, LiteBMP280.PRSM, 3)
        tmplis = self.i2c.readfrom_mem(LiteBMP280.adr, LiteBMP280.TMPM, 3)
        self.RAWP = (prslis[0]<<12) | (prslis[1] << 4)
        self.RAWT = (tmplis[0]<<12) | (tmplis[1] << 4)
        return [self.bmp280_compensate_T(self.RAWT), self.bmp280_compensate_P(self.RAWP)]  
        
def main():
    bmp280 = LiteBMP280()
    bmp280.setup()
    loopCounter = 0
    while( loopCounter < 60 ):
        loopCounter += 1
        result = bmp280.measure()
        print("TEMPERATURE: {0:3.1f} [C]".format(result[0]))
        print("PRESSURE   : {0:7.1f} [Pa]".format(result[1]))        
        time.sleep(10)

if __name__ == "__main__":
    main()
Node-RED側の変更点

継ぎ足しで作成しているNode-RED側の変更は、MQTTからJSONオブジェクトを受信するトピック “ATOMLite/JSON” のフローに以下の変更を加えました。

    • functionノードの出力先を+1して5方向にした
    • 追加した出力先には受信したJSONオブジェクトの中で、”BMP280_P” というプロパティがあったら、その値をpayloadに載せて送り出すようにした。
    • DHT11の温度表示用のダッシュボードのchartをBMP280の温度でも使うことにした。
    • 温度をハンドルするためのfunctionノードの3番目の送り先にはDHT11とBMP280の2系列のデータを送り込む。
    • 温度の第1系列は、以前から存在するDHT11の温度測定結果で、JSONオブジェクトの中で “TEMP” というプロパティ名で送られてくる。こちらの結果を送出する際には系列の識別のため topicに”DHT11″を追加するようにした。
    • 温度の第2系列は、今回追加のBMP280の温度センサの測定値。JSONオブジェクトの中で “BMP280_T” というプロパティ名で送られてくる。こちらの結果を送出する際には系列の識別のため topicに”BMP280″を追加するようにした。

実際のフローはこんな感じ。

Dashboard_BMP280

本体スクリプト

以下に本体スクリプト全文を示しますが、前述のBMP280用モジュールが、MicroPythonのモジュールサーチパス上に、LiteBMP280.py なる名前で存在する前提であります(過去記事で作製したLiteOnBoard.py、LiteNetwork.py、LiteDHT11.py なども同様です。)

機能的には、以前作成のDHT11で温度湿度を測る部分の後ろにBMP280の温度圧力測定を追加しただけです。Asyncio的には同じインターバルで測定することになります。ただし、外部ハードウエアは取ったり付けたり(今回はDHT11は接続してないです)することがあるので、接続したハードウエアのみ動作するように制御を追加しています。後日、スクリプトを修正しなくてもハード構成が反映できるようにするつもりですが、現状は、extDevControlという大域変数にハード構成を反映した数値を代入する方式です。

import sys
import ujson
import uasyncio
from LiteOnBoard import LiteOnBoard
from LiteNetwork import LiteNetwork
from LiteDHT11 import LiteDHT11
from LiteBMP280 import LiteBMP280

ledColor = (0, 0, 0)
OnBoardDevice = LiteOnBoard()
LiteNet = LiteNetwork()
DHT11 = None
BMP280 = None
DHT11_ON = 0x1
BMP280_ON = 0x2
extDevControl = BMP280_ON

def colorChange(msg):
    global ledColor
    if msg=="R":
        ledColor = (255, 0, 0)
    elif msg=="G":
        ledColor = (0, 255, 0)
    elif msg=="B":
        ledColor = (0, 0, 255)

def callbackSub(topic, msg):
    topicS = topic.decode('utf-8')
    msgS = msg.decode('utf-8')
    print(topicS, msgS)
    if topicS=="ATOMLite/Color":
        print("ATOMLite/Color Message: ", msgS)
        colorChange(msgS)
    if topicS=="ATOMLite/Settings":
        msgJ = ujson.loads(msgS)
        print("ATOMLite/Settings: ")
        for item in msgJ:
            for k,v in item.items():
                if k=="key":
                    ky = v
                if k=="value":
                    vl = v
            print(ky, " = ", vl)

def makeOBJ(num):
    global DHT11, BMP280
    workDic = dict()
    if (extDevControl & DHT11_ON) != 0:
        tpl = DHT11.measure()
        workDic['TEMP'] = tpl[0]
        workDic['HUMI'] = tpl[1]    
    if (extDevControl & BMP280_ON) != 0:
        tpl = BMP280.measure()
        workDic['BMP280_T'] = tpl[0]
        workDic['BMP280_P'] = tpl[1]            
    workDic['A'] = num
    workDic['B'] = LiteNet.currentTIME()
    return ujson.dumps(workDic)

async def reportStatus(period_s):
    while True:
        LiteNet.client.publish("ATOMLite/Status", "ATOMLite Running: " + LiteNet.currentTIME())
        await uasyncio.sleep(period_s)

async def sendJson(period_s):
    sendCounter = 0
    while True:
        jsonSTR = makeOBJ(sendCounter)
        LiteNet.client.publish("ATOMLite/Json", jsonSTR)
        sendCounter += 1
        await uasyncio.sleep(period_s)

async def ledAndUserButton(period_s):
    global OnBoardDevice
    ledFlag = True
    buttonFlag = False
    while True:
        if ledFlag:
            OnBoardDevice.M5ATOMLiteLED(ledColor)
            ledFlag = False
        else:
            OnBoardDevice.M5ATOMLiteLED( (0, 0, 0) )
            ledFlag = True
        if OnBoardDevice.M5ATOMLiteUserButton():
            print("Button ON")
            LiteNet.client.publish("ATOMLite/Button", "Button ON")
            buttonFlag = True
        elif buttonFlag:
            buttonFlag = False
            LiteNet.client.publish("ATOMLite/Button", "Button OFF")            
        await uasyncio.sleep(period_s)

async def mainLoop(period_ms):
    global OnBoardDevice
    taskStatus = uasyncio.create_task(reportStatus(60))
    taskJson = uasyncio.create_task(sendJson(60))
    taskATOMhard = uasyncio.create_task(ledAndUserButton(1))
    while( OnBoardDevice.buttonCounter < 6 ):
        LiteNet.client.check_msg()
        await uasyncio.sleep_ms(period_ms)
    try:
        taskStatus.cancel()
    except:
        print("Exception at taskStatus:", sys.exc_info()[0])
    try:       
        taskJson.cancel()
    except:
        print("Exception at taskJson:", sys.exc_info()[0])
    try:       
        taskATOMhard.cancel()
    except:
        print("Exception at taskATOMhard:", sys.exc_info()[0])
    LiteNet.client.publish("ATOMLite/Button", "N/A")            
    LiteNet.client.publish("ATOMLite/Status", "ATOMLite: out of service.")
    LiteNet.close()
    print("disconnect.")

def extInit():
    global DHT11, BMP280
    if (extDevControl & DHT11_ON) != 0:
        DHT11 = LiteDHT11()
    if (extDevControl & BMP280_ON) != 0:
        BMP280 = LiteBMP280()
        BMP280.setup()

def main():
    if not LiteNet.do_connect():
        print("ERROR: WiFi connection.")
        sys.exit(1)
    LiteNet.initRTC()
    if not LiteNet.connectMqttBroker(callbackSub):
        print("ERROR: MQTT Broker connection.")
        sys.exit(1)
    extInit()
    uasyncio.run(mainLoop(1000))
            
if __name__ == "__main__":
    main()
実行結果

ホスト(Raspberry Pi 3 model B+)上で実行されているMQTTブローカ経由でNode-REDへ報告されたBMP280の気圧と温度の測定結果をchart化したものをPC上のブラウザで見たところが以下です。

Dashboard_chartsとりあえずそれらしく動いているみたい。今回はこんなところね。

MicroPython的午睡(51) ATOMLite、BMP280の補償計算大変なのね へ戻る

MicroPython的午睡(53) ESPRESSIF、ESP-EYEでLチカを へ進む