
前回別件に行ってしまいましたが、今回は前々回の流れに戻ってMQTTのSubscribeを行ってみたいと思います。使用するMicroPythonは ESP32用の”generic” portです。M5ATOM Lite上での運用ですが、”generic”なので他のESP32デバイスでも動くと思います。
※「MicroPython的午睡」投稿順 Indexはこちら
(末尾に実験で使用したMicroPythonスクリプト全文を掲げました。前々回のものの「チョイ直し」です。)
Node-Redフロー側の設定
Subscribeに対応するNode-REDのフローは以下のようなシンプルなものです(かなり前にCで実験したときに作成したままのフロー。)左側のLED Colorと書かれているのは、Node-REDダッシュボード上に配置されたドロップダウンメニューです。これに接続している右側のノードが MQTToutノードです。フローからながれてきたメッセージを指定のMQTTトピックに対して発行(Publish)するのがお役目です。
まずドロップダウンメニューの設定を復習してみます。このフローはNode-REDエディタ上、ATOMLiteというタブに置かれているため [ATOMLite]となります。Optionsをつかって、ドロップダウンメニューで表示される文字列(右側)と、そのときに送られる実際のメッセージ(左側)を指定します。M5ATOM Liteは、微妙な色を表現できるRGB-3色LEDを搭載しています。以前に実験したときには単にR、G、Bの頭文字1文字を送信して各単色でLEDを光らせていました(今回は “generic” なのでLEDも光らせませんが。)
次に、MQTToutノードですが、既に設定済のMQTTブローカをサーバに設定します。重要なのは「トピック」に設定する文字列です。M5ATOM Lite側でSubscribeするにあたっては、これを目当てに握ることになるのでこの文字列をM5ATOM Lite側のMicroPythonにコピペしました。
MicroPythonのスクリプト
今回のスクリプト(末尾掲載)は前々回にPublishだけしたときのスクリプトのチョイ変です。変更した部分をかいつまむと、
-
- set_callback()関数でサブスクリプトしたメッセージを処理するコールバック関数を指定
- subscribe()関数でサブスクリプトするTopicを指定
- メインループの中のcheck_msg()関数でメッセージの到着をチェック
コールバック関数自体は、引数にtopicとmsgの2つを取ります。今回設定であると、topicには”ATOMList/Color”、msgにはR, G, Bいずれかの文字が入っている筈です。今回は特段の処理はせず、M5ATOM Liteの標準出力に受信内容を垂れ流すだけです。なお、サブスクリプションの結果として渡されるのは byte型であるので、関数内で通常の文字列(utf-8)に変換しています。
subscribeの場合、いつメッセージが到着するか分からないので、ブロッキングのwait_msg()関数か、ノンブロッキングのcheck_msg()関数でメッセージチェックをする必要があります。今回はノンブロッキングのcheck_msg()関数を約10秒に1回のペースで回っている緩いループの中で使っています。反応遅いですが、テストなので。。。
なお、詳細については、以下のumqtt.simpleのドキュメントの下の”Attension”に説明があります。
umqtt.simple — MQTT client function
細かい話なのですが一個気づきました。uosモジュールとか、usysモジュールとか、頭にMicroPython用であることを示す u の文字があるモジュールの多くは、実際には u をつけずに osとか sys としても問題なく使えます。しかし、umqttはダメなタイプでした。ちゃんと umqtt.simpleとしないとならないようです。
動作確認
前々回同様、スクリプトを起動するとWiFiコネクションを接続し、NTPでM5ATOM Lite内蔵のRTCに正確な時刻(UTC)を設定します。そして毎10秒のゆるゆるのループの中でNode-REDダッシュボードのATOMLite/Statusにむけて時刻を報告します(その時のダッシュボードの様子は先頭のアイキャッチ画像に掲げました。)
その後、Subscribeしているトピックのチェックをして、メッセージが到着していたら標準出力に報告します。こんな感じ。
connecting to network...
timeout: 8
network config: ('xxx.xxx.xxx.xxx', 'xxx.xxx.xxx.xxx', 'xxx.xxx.xxx.xxx', 'xxx.xxx.xxx.xxx')
2021/09/16 10:20:14
ATOMLite/Color R
Message: R
ATOMLite/Color B
Message: B
ATOMLite/Color G
Message: G
パソコン上のブラウザで眺めているNode-REDダッシュボードのメニューからRGB選択すると、10秒以内に(10秒は長すぎて大分イライラ待ちますが)M5ATOM Lite側の標準出力に報告があがります。まあ、一応、上り下り両方できたということで(どういうことだ?)
MicroPython的午睡(42) array.array、意外に小さい使える領域 へ戻る
MicroPython的午睡(44) MQTTでJSON-OBJ送信、M5ATOMLite へ進む
実験に使ったMicroPythonコード全文
実験用なのでテスト時間約1分でループ部分を抜けてしまうので御注意。
import network, ntptime, machine, time, sys
import umqtt.simple
global wlan, mqttBroker, client
ssid = "Your SSID"
password = "Your Password"
ntpAdr = "Your NTP server"
brokerAdr = "Your Broker address"
brokerPort = 1883
tz = 9
def callbackColor(topic, msg):
topicS = topic.decode('utf-8')
msgS = msg.decode('utf-8')
print(topicS, msgS)
if topicS=="ATOMLite/Color":
print("Message: ", msgS)
def connectMqttBroker(bAdr, bPort, clientId):
global client
try:
client = umqtt.simple.MQTTClient(clientId, bAdr, port=bPort)
client.set_callback(callbackColor)
client.connect()
client.subscribe("ATOMLite/Color")
except:
return False
return True
def setupSubscriber():
global client
def do_connect(ssid, pw, timeout, opt=True):
global wlan
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('connecting to network...')
wlan.connect(ssid,pw)
while (not wlan.isconnected()) and (timeout > 0):
start = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), start) < 1000:
pass
timeout -= 1
if opt:
print('timeout: ', timeout)
print('network config: ', wlan.ifconfig())
return wlan.isconnected()
def currentTIME(timZone):
t0 = machine.RTC().datetime()
return "{0}/{1:02d}/{2:02d} {3:02d}:{4:02d}:{5:02d}".format(t0[0],t0[1],t0[2],t0[4]+timZone,t0[5],t0[6])
def initRTC(timZone, opt=True):
ntptime.host = ntpAdr
ntptime.settime()
if opt:
print(currentTIME(timZone))
def main():
if not do_connect(ssid, password, 10):
print("ERROR: WiFi connection.")
sys.exit(1)
initRTC(tz)
if not connectMqttBroker(brokerAdr, brokerPort, "ATOMLite"):
print("ERROR: MQTT Broker connection.")
sys.exit(1)
#loop
loopCounter = 0
while( loopCounter < 6 ):
loopCounter += 1
client.publish("ATOMLite/Status", "FROM ATOMLite: " + currentTIME(tz))
client.check_msg()
time.sleep(10)
client.publish("ATOMLite/Status", "ATOMLite: out of service.")
client.disconnect()
if __name__ == "__main__":
main()

