前回、uasyncioのお陰でNode-REDサーバとの通信タイミングを自由に決められるようになりました。今回からM5ATOMLiteに「手足」をつけ、前回までに整備?した「インフラ」を使ってNode-REDとの間を接続して行きたいとおもいます。外付けハードの制御の前に、最初は内蔵のLEDとスイッチだな。
※「MicroPython的午睡」投稿順 Indexはこちら
(今回実験に使ったスクリプト全文は末尾に)
MicroPythonを使う前に、CでプログラムしていたM5ATOMLiteとNode-REDとの間で接続はできていました。その折にM5ATOMLite内蔵の以下のハードウエアはNode-REDダッシュボードから操作できるようになっていました。
-
- Node-REDダッシュボードからの指令でM5ATOMLiteのフルカラーLEDの点灯色を変更(単純な赤、緑、青だけれども)
- M5ATOMLiteのユーザーボタンが押されているか否かをNode-REDダッシュボードに報告
それが M5ATOMLite にESP32用のMicroPython “generic” ポートを書き込んで仕切り直したために使えなくなっていました。今回は「失われた機能」の復活です。
M5ATOMLite のハードウエアを調べるのは以下のページからです。Schematicというよりブロックダイアグラムですが、そこにはピン番号が書いてあるので、今回目的からはそれで足ります。
さて、その制御です。M5xx系デバイスにはそれぞれのデバイスに特化したMicroPythonが用意されているものが多いですが、今回M5ATOMLiteに搭載しているMicroPythonは ESP32用の “generic” ポートで特定のハードウエア構成向けに特化しているバージョンではありません。しかし、以下のドキュメントを見ると
単純なプッシュスイッチである、M5ATOMLiteのユーザーボタンは勿論、1個しかないフルカラーLEDも、なんらモジュール等を追加インストールすることなく使えることが分かりました。”generic”ポートでも「ほぼほぼ標準」neopixel、フルカラーLEDをサポートしているためです。”generic”ポートはM5ATOMLite向きだね!
前回から追加の仕様
以下の機能を追加することにいたしました。
-
- フルカラーLEDを点滅させる(毎秒)
- 点灯色はNode-REDダッシュボードから指定された色(赤、緑、青)とする
- ユーザーボタンを押す(ON)、離す(OFF)をNode-REDダッシュボードに報告する
- ユーザーボタンの長押し(6秒)でNode-REDとの接続を切り、スクリプトを終了させる
前回まではテスト用に360秒動作後、勝手に終了していましたが、今回からはユーザーボタンの長押しをしない限りM5ATOMLiteはNode-REDとの連携動作を続けます。また動作中はLEDが点滅するので動いていることが視認できます。すこしまともになった?
ハードウエアの設定
全文は末尾に追加しましたが、M5ATOMLiteのハードウエアを反映した設定部分は以下のあたりです。
ledpin = machine.Pin(27, machine.Pin.OUT) led = neopixel.NeoPixel(ledpin, 8) ledColor = (0, 0, 0) buttonpin = machine.Pin(39, machine.Pin.IN, machine.Pin.PULL_UP) buttonCounter = 0
追加のタスクと関数
MQTTのTopicは特に追加せず以前のものをそのまま使っています。また、LEDとUserボタンのサポートのため、uasyncioのタスクを1個追加しました。以下のタスクの中でLEDの点滅とキーのチェックを行っています。
async def ledAndUserButton(period_s)
毎秒チェックが入るような設定にしています。
LEDの点灯色の設定は前回もSubscribeしていた”ATOMLite/Color”トピックの処理に追加しています。チェックしたキーの状態は”ATOMLite/Button”トピックへPublishするだけです。またキーチェック関数ではキーが押し続けられると長押しカウンタがカウントアップされるようになっています。キーの長押しによるタスクの終了は、mainLoop 内で長押しカウンタの値を読んで判断しています。
実験結果
M5ATOMLite動作中のNode-REDダッシュボードの様子を以下に示します。MQTTブローカとNode-REDはRaspberry Pi 3で動作しています。画面はパソコン上のブラウザからNode-REDダッシュボードに接続してキャプチャしたものです。
一応、内蔵ペリフェラルとの接続は「復旧」した感じ。次回は、外付け回路だな。
MicroPython的午睡(46) uasyncioでMQTT送受信、ATOMLite へ戻る
MicroPython的午睡(48) 中途半端なリファクタリング。組み込みには組み込みの? へ進む
import network, ntptime, machine, time, sys import neopixel import ujson import umqtt.simple import uasyncio global wlan, mqttBroker, client ssid = "Your SSID" password = "Your Password" ntpAdr = "Your NTP server" brokerAdr = "Your MQTT broker" brokerPort = 1883 tz = 9 ledpin = machine.Pin(27, machine.Pin.OUT) led = neopixel.NeoPixel(ledpin, 8) ledColor = (0, 0, 0) buttonpin = machine.Pin(39, machine.Pin.IN, machine.Pin.PULL_UP) buttonCounter = 0 def M5ATOMLiteLED(tpl): global led led[0] = tpl led.write() def M5ATOMLiteUserButton(): global buttonpin, buttonCounter if buttonpin.value() == 0: buttonCounter += 1 return True buttonCounter = 0 return False def colorChange(msg): global ledColor if msg=="R": ledColor = (255, 0, 0) elif msg=="G": ledColor = (0, 255, 0) elif msg=="B": ledColor = (0, 0, 255) def callbackSub(topic, msg): topicS = topic.decode('utf-8') msgS = msg.decode('utf-8') print(topicS, msgS) if topicS=="ATOMLite/Color": print("ATOMLite/Color Message: ", msgS) colorChange(msgS) if topicS=="ATOMLite/Settings": msgJ = ujson.loads(msgS) print("ATOMLite/Settings: ") for item in msgJ: for k,v in item.items(): if k=="key": ky = v if k=="value": vl = v print(ky, " = ", vl) def connectMqttBroker(bAdr, bPort, clientId): global client try: client = umqtt.simple.MQTTClient(clientId, bAdr, port=bPort) client.set_callback(callbackSub) client.connect() client.subscribe("ATOMLite/Color") client.subscribe("ATOMLite/Settings") except: return False return True def setupSubscriber(): global client def do_connect(ssid, pw, timeout, opt=True): global wlan wlan = network.WLAN(network.STA_IF) wlan.active(True) if not wlan.isconnected(): print('connecting to network...') wlan.connect(ssid,pw) while (not wlan.isconnected()) and (timeout > 0): start = time.ticks_ms() while time.ticks_diff(time.ticks_ms(), start) < 1000: pass timeout -= 1 if opt: print('timeout: ', timeout) print('network config: ', wlan.ifconfig()) return wlan.isconnected() def currentTIME(timZone): t0 = machine.RTC().datetime() return "{0}/{1:02d}/{2:02d} {3:02d}:{4:02d}:{5:02d}".format(t0[0],t0[1],t0[2],t0[4]+timZone,t0[5],t0[6]) def initRTC(timZone, opt=True): ntptime.host = ntpAdr ntptime.settime() if opt: print(currentTIME(timZone)) def makeOBJ(num): workDic = dict() workDic['A'] = num workDic['B'] = currentTIME(tz) return ujson.dumps(workDic) async def reportStatus(period_s): while True: client.publish("ATOMLite/Status", "ATOMLite Running: " + currentTIME(tz)) await uasyncio.sleep(period_s) async def sendJson(period_s): sendCounter = 0 while True: jsonSTR = makeOBJ(sendCounter) client.publish("ATOMLite/Json", jsonSTR) sendCounter += 1 await uasyncio.sleep(period_s) async def ledAndUserButton(period_s): ledFlag = True buttonFlag = False while True: if ledFlag: M5ATOMLiteLED(ledColor) ledFlag = False else: M5ATOMLiteLED( (0, 0, 0) ) ledFlag = True if M5ATOMLiteUserButton(): print("Button ON") client.publish("ATOMLite/Button", "Button ON") buttonFlag = True elif buttonFlag: buttonFlag = False client.publish("ATOMLite/Button", "Button OFF") await uasyncio.sleep(period_s) async def mainLoop(period_ms): global buttonCounter taskStatus = uasyncio.create_task(reportStatus(60)) taskJson = uasyncio.create_task(sendJson(5)) taskATOMhard = uasyncio.create_task(ledAndUserButton(1)) while( buttonCounter < 6 ): client.check_msg() await uasyncio.sleep_ms(period_ms) try: taskStatus.cancel() except: print("Exception at taskStatus:", sys.exc_info()[0]) try: taskJson.cancel() except: print("Exception at taskJson:", sys.exc_info()[0]) try: taskATOMhard.cancel() except: print("Exception at taskATOMhard:", sys.exc_info()[0]) client.publish("ATOMLite/Button", "N/A") client.publish("ATOMLite/Status", "ATOMLite: out of service.") client.disconnect() print("disconnect.") def main(): if not do_connect(ssid, password, 10): print("ERROR: WiFi connection.") sys.exit(1) initRTC(tz) if not connectMqttBroker(brokerAdr, brokerPort, "ATOMLite"): print("ERROR: MQTT Broker connection.") sys.exit(1) uasyncio.run(mainLoop(1000)) if __name__ == "__main__": main()