MicroPython的午睡(46) uasyncioでMQTT送受信、ATOMLite

Joseph Halfmoon

前回までで「エッジ側」M5ATOM LiteとNode-REDサーバー(Raspberry Pi 3)との間でMQTTを介してJSONオブジェクトを送受できるようになりました。しかし不満なのは送受信のタイミングです。前回まで単一のmainループにお任せだったので便利とは言えない仕組みでした。今回から uasyncioを使って改良していきたいと思います。

※「MicroPython的午睡」投稿順 Indexはこちら

(末尾に実験に使ったスクリプト全文を掲げました)

送受信する内容をJSON化することで、応用が効きやすくなりました。しかし、送受のタイミングは1個のループだけで制御しています。前回は10秒に1回転というタイミングで制御していたために、Node-RED側から送信してもそれが反映するまで結構時間が掛かったりしていました。かと言ってループの回転を無暗と上げるのも考えものです。無駄な処理や通信にリソースを食うばかりかと思います。それぞれ対象毎に適切なインターバルというか、時間がある筈。例えばゆっくり変化する気温を測るのであれば、1分に1回、あるいは10分に1回でも十分かもしれません。一方人間様の指令となると、10秒も待たせたら「壊れた」と不安に思われるでしょう。なんとか1秒の遅延くらいは許されるかもしれないですが「モッサリした反応」と言われるような気がします。結局、

対象毎、個別に動作タイミングを調整できる

のがよろしいかとおもいます。そこで今回から使いますのが、uasyncioモジュールです。PythonのasyncioのMicroPython版です。日本語ドキュメントへのリンクが以下に。

uasyncio — 非同期 I/O スケジューラ

コルーチンを使った並行動作ができるものです。以前にも以下の投稿で使ってみたことがあります(そのときはフルPythonでしたが。)

IoT何をいまさら(79) Python, threadingとasyncioなLチカ

IO待ちなどがある場合には非常に有効だと思います。MQTT送受信などまさにうってつけかと。

コルーチンなので、プリエンプティブに制御を取り上げて次の仕事を開始するわけではなく、自分のタスクで「待ち」に入ったときに自分から制御権を手放すことで、並行にいくつもの仕事が進んでいるように「みせかける」わけです。

今回の uasyncio タスクの構成

今回は、やっていることは前回とまったく同じ(Node-RED側のフローは前回とまったく同じ)で、MicroPython側の制御を以下の3つのTASKに分割してみました。

mainLoop、前回のメインループ同等ですが、余計な仕事を別Taskに追い出し、10秒毎から毎秒回転に高速化しました。もっと早くすることもできるのですが、負荷かけたくないのでこんなもんかと。このループの中でMQTTのsubscribeをチェックしてその処理にトリガをかけています。よって最悪でも2秒以内には到来したNode-RED側からの「下り」の指令が処理される筈。

reportStatus、前回はメインループの中で回っていたので毎10秒毎に「生きているよ」のハートビート信号のつもりのATOMLite内のRTC時刻を報告してました。しかし、それほど高頻度で報告は要らないように思いました。「止まっている」なら止まったと言ってくれればよい、という感じ。毎分程度の報告で十分かと考えました。止まっているときは以下のように表示されるようにしました。

sendJson、後でセンサの計測結果など「各種混載して」送信するつもりで用意したJSONオブジェクトの送信です。サーバーへの報告は5秒に1回としました。当然、それ以下のレゾリューションでの処理はATOMLite側(エッジ)で担当、結果だけをサーバーへ送るという感じですかね。

末尾のスクリプトを参照していただくと、”async” な関数が3つ見つかると思います。

    1. 初期化処理(WiFiやMQTT接続処理など)後、mainLoop()起動。「回転周期」1000ミリ秒指定(後でもっと短くできるように。)
    2. mainLoop()処理のループ前に、以下の2つのタスクを生成。
    3. reportStatusは「回転周期」60秒
    4. sendJsonは「回転周期」5秒

今回は何か受信を待ってタスクを動作させるという条件はなく、それぞれ勝手な周期で処理を進めるだけです。

実行結果

スクリプトを走らせた後の、Node-REDダッシュボード「ATOMLite」タブの様子が以下です。Statusをみると 10:45:08に Runningとありますが、ATOMLiteから送られてくるJSONオブジェクトのBに載せたタイムスタンプは10:45:53と異なるものになっています。

Dashboardまた、M5ATOM LiteのREPLに接続しているターミナル上で観察すると、Node-RED側からの「指令」はNode-RED側のボタンを押すと「間もなく」到来しているように見えました。前回の反応の鈍さにくらべると改善。いい加減な確認ですが。

connecting to network...
timeout: 2
network config: ('xxx.xxx.xxx.xxx', 'xxx.xxx.xxx.xxx', 'xxx.xxx.xxx.xxx', 'xxx.xxx.xxx.xxx')
2021/10/07 10:45:08
ATOMLite/Settings [{"key":"PARAM_A","value":"initial_A"},{"key":"PARAM_B","value":"initial_B"}]
ATOMLite/Settings: 
PARAM_A = initial_A
PARAM_B = initial_B
disconnect.

まあ、これでお仕事毎の切り分けもできそうなので、次回からはM5ATOM Liteに手足をつけるなどして、実質的な通信を行っていきたいと思います。

MicroPython的午睡(45) MQTTでJSON-OBJ受信、M5ATOMLite へ戻る

MicroPython的午睡(47) genericポートでもM5ATOMLiteはOKよ へ進む

MicroPython スクリプト全文
import network, ntptime, machine, time, sys
import ujson
import umqtt.simple
import uasyncio

global wlan, mqttBroker, client
ssid = "Your SSID"
password = "Your Password"
ntpAdr = "Your NTP server"
brokerAdr = "Your Broker Address"
brokerPort = 1883
tz = 9

def callbackSub(topic, msg):
    topicS = topic.decode('utf-8')
    msgS = msg.decode('utf-8')
    print(topicS, msgS)
    if topicS=="ATOMLite/Color":
        print("ATOMLite/Color Message: ", msgS)
    if topicS=="ATOMLite/Settings":
        msgJ = ujson.loads(msgS)
        print("ATOMLite/Settings: ")
        for item in msgJ:
            for k,v in item.items():
                if k=="key":
                    ky = v
                if k=="value":
                    vl = v
            print(ky, " = ", vl)

def connectMqttBroker(bAdr, bPort, clientId):
    global client
    try:
        client = umqtt.simple.MQTTClient(clientId, bAdr, port=bPort)
        client.set_callback(callbackSub)
        client.connect()
        client.subscribe("ATOMLite/Color")
        client.subscribe("ATOMLite/Settings")
    except:
        return False
    return True

def do_connect(ssid, pw, timeout, opt=True):
    global wlan
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('connecting to network...')
        wlan.connect(ssid,pw)
        while (not wlan.isconnected()) and (timeout > 0):
            start = time.ticks_ms()
            while time.ticks_diff(time.ticks_ms(), start) < 1000:
                pass
            timeout -= 1
    if opt:
        print('timeout: ', timeout)
        print('network config: ', wlan.ifconfig())
    return wlan.isconnected()

def currentTIME(timZone):
    t0 = machine.RTC().datetime()
    return "{0}/{1:02d}/{2:02d} {3:02d}:{4:02d}:{5:02d}".format(t0[0],t0[1],t0[2],t0[4]+timZone,t0[5],t0[6])
    
def initRTC(timZone, opt=True):
    ntptime.host = ntpAdr
    ntptime.settime()
    if opt:
        print(currentTIME(timZone))

def makeOBJ(num):
    workDic = dict()
    workDic['A'] = num
    workDic['B'] = currentTIME(tz)
    return ujson.dumps(workDic)

async def reportStatus(period_s):
    while True:
        client.publish("ATOMLite/Status", "ATOMLite Running: " + currentTIME(tz))
        await uasyncio.sleep(period_s)

async def sendJson(period_s):
    sendCounter = 0
    while True:
        jsonSTR = makeOBJ(sendCounter)
        client.publish("ATOMLite/Json", jsonSTR)
        sendCounter += 1
        await uasyncio.sleep(period_s)

async def mainLoop(period_ms):
    taskStatus = uasyncio.create_task(reportStatus(60))
    taskJson = uasyncio.create_task(sendJson(5))
    loopCounter = 0
    while( loopCounter < 360 ):
        loopCounter += 1
        client.check_msg()
        await uasyncio.sleep_ms(period_ms)
    try:
        taskStatus.cancel()
    except:
        print("Exception at taskStatus:", sys.exc_info()[0])
    try:       
        taskJson.cancel()
    except:
        print("Exception at taskJson:", sys.exc_info()[0])
    client.publish("ATOMLite/Status", "ATOMLite: out of service.")
    client.disconnect()
    print("disconnect.")
    
def main():
    if not do_connect(ssid, password, 10):
        print("ERROR: WiFi connection.")
        sys.exit(1)
    initRTC(tz)
    if not connectMqttBroker(brokerAdr, brokerPort, "ATOMLite"):
        print("ERROR: MQTT Broker connection.")
        sys.exit(1)
    uasyncio.run(mainLoop(1000))
            
if __name__ == "__main__":
    main()