前回別件に行ってしまいましたが、今回は前々回の流れに戻ってMQTTのSubscribeを行ってみたいと思います。使用するMicroPythonは ESP32用の”generic” portです。M5ATOM Lite上での運用ですが、”generic”なので他のESP32デバイスでも動くと思います。
※「MicroPython的午睡」投稿順 Indexはこちら
(末尾に実験で使用したMicroPythonスクリプト全文を掲げました。前々回のものの「チョイ直し」です。)
Node-Redフロー側の設定
Subscribeに対応するNode-REDのフローは以下のようなシンプルなものです(かなり前にCで実験したときに作成したままのフロー。)左側のLED Colorと書かれているのは、Node-REDダッシュボード上に配置されたドロップダウンメニューです。これに接続している右側のノードが MQTToutノードです。フローからながれてきたメッセージを指定のMQTTトピックに対して発行(Publish)するのがお役目です。
まずドロップダウンメニューの設定を復習してみます。このフローはNode-REDエディタ上、ATOMLiteというタブに置かれているため [ATOMLite]となります。Optionsをつかって、ドロップダウンメニューで表示される文字列(右側)と、そのときに送られる実際のメッセージ(左側)を指定します。M5ATOM Liteは、微妙な色を表現できるRGB-3色LEDを搭載しています。以前に実験したときには単にR、G、Bの頭文字1文字を送信して各単色でLEDを光らせていました(今回は “generic” なのでLEDも光らせませんが。)
次に、MQTToutノードですが、既に設定済のMQTTブローカをサーバに設定します。重要なのは「トピック」に設定する文字列です。M5ATOM Lite側でSubscribeするにあたっては、これを目当てに握ることになるのでこの文字列をM5ATOM Lite側のMicroPythonにコピペしました。
MicroPythonのスクリプト
今回のスクリプト(末尾掲載)は前々回にPublishだけしたときのスクリプトのチョイ変です。変更した部分をかいつまむと、
-
- set_callback()関数でサブスクリプトしたメッセージを処理するコールバック関数を指定
- subscribe()関数でサブスクリプトするTopicを指定
- メインループの中のcheck_msg()関数でメッセージの到着をチェック
コールバック関数自体は、引数にtopicとmsgの2つを取ります。今回設定であると、topicには”ATOMList/Color”、msgにはR, G, Bいずれかの文字が入っている筈です。今回は特段の処理はせず、M5ATOM Liteの標準出力に受信内容を垂れ流すだけです。なお、サブスクリプションの結果として渡されるのは byte型であるので、関数内で通常の文字列(utf-8)に変換しています。
subscribeの場合、いつメッセージが到着するか分からないので、ブロッキングのwait_msg()関数か、ノンブロッキングのcheck_msg()関数でメッセージチェックをする必要があります。今回はノンブロッキングのcheck_msg()関数を約10秒に1回のペースで回っている緩いループの中で使っています。反応遅いですが、テストなので。。。
なお、詳細については、以下のumqtt.simpleのドキュメントの下の”Attension”に説明があります。
umqtt.simple — MQTT client function
細かい話なのですが一個気づきました。uosモジュールとか、usysモジュールとか、頭にMicroPython用であることを示す u の文字があるモジュールの多くは、実際には u をつけずに osとか sys としても問題なく使えます。しかし、umqttはダメなタイプでした。ちゃんと umqtt.simpleとしないとならないようです。
動作確認
前々回同様、スクリプトを起動するとWiFiコネクションを接続し、NTPでM5ATOM Lite内蔵のRTCに正確な時刻(UTC)を設定します。そして毎10秒のゆるゆるのループの中でNode-REDダッシュボードのATOMLite/Statusにむけて時刻を報告します(その時のダッシュボードの様子は先頭のアイキャッチ画像に掲げました。)
その後、Subscribeしているトピックのチェックをして、メッセージが到着していたら標準出力に報告します。こんな感じ。
connecting to network... timeout: 8 network config: ('xxx.xxx.xxx.xxx', 'xxx.xxx.xxx.xxx', 'xxx.xxx.xxx.xxx', 'xxx.xxx.xxx.xxx') 2021/09/16 10:20:14 ATOMLite/Color R Message: R ATOMLite/Color B Message: B ATOMLite/Color G Message: G
パソコン上のブラウザで眺めているNode-REDダッシュボードのメニューからRGB選択すると、10秒以内に(10秒は長すぎて大分イライラ待ちますが)M5ATOM Lite側の標準出力に報告があがります。まあ、一応、上り下り両方できたということで(どういうことだ?)
MicroPython的午睡(42) array.array、意外に小さい使える領域 へ戻る
MicroPython的午睡(44) MQTTでJSON-OBJ送信、M5ATOMLite へ進む
実験に使ったMicroPythonコード全文
実験用なのでテスト時間約1分でループ部分を抜けてしまうので御注意。
import network, ntptime, machine, time, sys import umqtt.simple global wlan, mqttBroker, client ssid = "Your SSID" password = "Your Password" ntpAdr = "Your NTP server" brokerAdr = "Your Broker address" brokerPort = 1883 tz = 9 def callbackColor(topic, msg): topicS = topic.decode('utf-8') msgS = msg.decode('utf-8') print(topicS, msgS) if topicS=="ATOMLite/Color": print("Message: ", msgS) def connectMqttBroker(bAdr, bPort, clientId): global client try: client = umqtt.simple.MQTTClient(clientId, bAdr, port=bPort) client.set_callback(callbackColor) client.connect() client.subscribe("ATOMLite/Color") except: return False return True def setupSubscriber(): global client def do_connect(ssid, pw, timeout, opt=True): global wlan wlan = network.WLAN(network.STA_IF) wlan.active(True) if not wlan.isconnected(): print('connecting to network...') wlan.connect(ssid,pw) while (not wlan.isconnected()) and (timeout > 0): start = time.ticks_ms() while time.ticks_diff(time.ticks_ms(), start) < 1000: pass timeout -= 1 if opt: print('timeout: ', timeout) print('network config: ', wlan.ifconfig()) return wlan.isconnected() def currentTIME(timZone): t0 = machine.RTC().datetime() return "{0}/{1:02d}/{2:02d} {3:02d}:{4:02d}:{5:02d}".format(t0[0],t0[1],t0[2],t0[4]+timZone,t0[5],t0[6]) def initRTC(timZone, opt=True): ntptime.host = ntpAdr ntptime.settime() if opt: print(currentTIME(timZone)) def main(): if not do_connect(ssid, password, 10): print("ERROR: WiFi connection.") sys.exit(1) initRTC(tz) if not connectMqttBroker(brokerAdr, brokerPort, "ATOMLite"): print("ERROR: MQTT Broker connection.") sys.exit(1) #loop loopCounter = 0 while( loopCounter < 6 ): loopCounter += 1 client.publish("ATOMLite/Status", "FROM ATOMLite: " + currentTIME(tz)) client.check_msg() time.sleep(10) client.publish("ATOMLite/Status", "ATOMLite: out of service.") client.disconnect() if __name__ == "__main__": main()