MicroPython的午睡(48) 中途半端なリファクタリング。組み込みには組み込みの?

Joseph Halfmoon

前回、内蔵のLEDとスイッチがNode-REDから使えるように復旧しました。今回はいよいよ外付けハード追加と思ったのですが、その前にスクリプトを整理することにいたしました。カッコよく言うならリファクタリングというやつ。でも作業しながら、どうするのが良いのか迷いましたです。迷える子羊(?そうは見えないが)にお導きを。

※「MicroPython的午睡」投稿順 Indexはこちら

(実験に使ったソース全文は末尾に。機能は前回とまったく同じ。)

やろうとしたことは単純です。毎度の「インクリメンタル」な作業の結果、MicroPythonのスクリプトが長くなりすぎました。今後書き換えずに済みそうな部分は「モジュール」として別ファイルに分離して忘れてしまいたい(再掲載不要とな。)幸いMicroPythonは、ローカルなファイルシステムをサポートしているので、そこにモジュールを置いておくことができます。ESP32用 “generic” ポートの場合、デフォルトのモジュール・サーチパスに以下が含まれています。

    • /  (ルートディレクトリ)
    • /lib

今回はライブラリらしく、/lib に分離したファイルを設置することにいたしました。ファイルを分割して置けばいいだけのことだろーとも思ったのですが、いろいろ迷いました。列挙すれば以下のとおり。

    1. 分割の粒度はどのくらいにしておくべきか?
    2. 装置固有のIDとか、どこに書いておくべきか?
    3. ハードウエア機能は重複起動しないように管理すべきか?
    4. ドキュメンテーション・コメントはどこまで書く?

迷いながらも、適当に決めたのは以下のとおりであります。

分割

今回の場合、asyncio を使って、複数の「タスク」を走らせています。それぞれに担当する仕事はほぼ関係がありません。現在のところそれぞれの「タスク」はとても小さいデス。

    • 全体を眺めやすいように、非同期タスクを起動する「MAINな」ところとタスクの上位関数は同じファイルに並べておく。
    • タスクの中身に大きな仕事をいれる場合には別ファイルのクラスにお願いするようにする。

今回、別ファイルに「追い出した」のは、M5ATOMLiteの内蔵ペリフェラル(といってLEDとプッシュスイッチですが)を管理するクラス(LiteOnBoard)と、WiFiネットワークに接続し、NTPで実時間タイマを使えるようにし、またMQTTのクライアントとしてMQTTブローカへのアクセスを提供するクラス(LiteNetwork)の2つとしました。もともとLEDとプッシュスイッチは別々な装置、また、WiFiとNTPとMQTTも別なネットワーク階層ですが、一緒に動いているのだし、今回のスクリプトの場合一つにまとめておいた方が見通しがいいじゃん、という考えです(アサハカ?)

後で外部装置を追加するときには、その装置を管理するクラスを別ファイルで追加することにします。そして、非同期タスクのどれかに管理を追加するか、またはタスク新設すれば、他の部分は変更しないで済むだろ~。本当か?

即座に分割。

装置固有の情報はどこに書くべきか?

装置のIDとか、アドレスとか、装置固有の情報をソースに書き込んでおくのはよろしくないだろ~とは考えました。例えば、数十台程度の装置を使って「IoTの実証実験」的なことをするとしたら、いちいち個別にソースを書き換えることはしないでしょう。ネットワーク経由で必要な情報を配れるのが一番です。最低限の情報は、ほぼ全てのマイコンが何等かのサポートをしている設定用のノンボラタイルメモリ領域か、バッテリバックアップされているRAM領域に書き込んでおく、というのが現実的な気がします。

幸いESP32にもそのような領域はあります。しかしそのような領域の使用方法はマイコン機種毎に特殊です。たった1台だけの今回ケースでそんな配慮は、正直いってメンドい、という判断に傾きました。クラスの中の変数に「とりあえず」設定情報は閉じ込めておけば、いいんじゃね。1か所に書いてあれば、後でどうとでもなるし。。。(結局、メンドイだけじゃん。)

ハードウエア固有機能の複数起動の回避

各種機能をクラスとして分離、独立させていくと、複数のインスタンスを作ることができます。スイッチやLED、無線ネットワークIFなど一つしかないものに複数インスタンスを起動するのは困るので、「シングルトン」化して複数起動を防ぐようにするべきか、と思いました。しかし、これもメンドイから止めておこう、です。そんなハードウエアを操作する輩は、もともと分かって使っているのだから、わざわざな配慮など不要、コードとメモリの無駄!という言い訳です。(結局、メンドイだけじゃん。)

ドキュメンテーション・コメント

パソコン上の「普通の」Pythonで「書き捨て」でないスクリプトを書いているときは、PYDOCでドキュメントを抽出できるようにコメントを書いています。気がつくとファイル全長の半分くらいコメント書いていたりします。

モジュール化にあたってはドキュメンテーション・コメントを充実した方が良いかな、とも思ったのです。しかしMicroPythonの場合、ささやかな容量の「ローカルファイルシステム依存です。M5ATOMLiteの場合、昔のフロッピイディスクよりはマシな容量が使えますが、micro:bit などだと30Kバイト!です。ダラダラしていると溢れる、という理由から「ほぼ無」としました。

なお、記事でも「どーせ、地の文章がコメントみたいなもんだし」という理由と、「コメントいれるとタダでさえ長くて読みずらいソース部分がダラダラと長くなりすぎる」という言い訳から、ドキュメンテーション・コメントは含めないようにしています。(結局、メンドイだけじゃん。)

リファクタリングの結果

/lib/LiteOnBoard.py

import machine, time
import neopixel

class LiteOnBoard:
    """M5ATOM Lite LED and BUTTON Class

    """
    def __init__(self):
        self.ledpin = machine.Pin(27, machine.Pin.OUT)
        self.led = neopixel.NeoPixel(self.ledpin, 8)
        self.ledColor = (0, 0, 0)
        self.buttonpin = machine.Pin(39, machine.Pin.IN, machine.Pin.PULL_UP)
        self.buttonCounter = 0
    
    def M5ATOMLiteLED(self, tpl):
        self.ledColor = tpl
        self.led[0] = tpl
        self.led.write()

    def M5ATOMLiteUserButton(self):
        if self.buttonpin.value() == 0:
            self.buttonCounter += 1
            return True
        self.buttonCounter = 0
        return False

def main():
    dut = LiteOnBoard()
    dut.M5ATOMLiteLED((255, 255, 255))
    while dut.buttonCounter < 3:
        dut.M5ATOMLiteUserButton()
        time.sleep(1)
    dut.M5ATOMLiteLED((0, 0, 0))

if __name__ == "__main__":
    main()

/lib/LiteNetwork.py

import network, ntptime, machine, time, sys
import ujson
import umqtt.simple

class LiteNetwork:
    """M5ATOM Lite WiFi and MQTT Class

    """
    def __init__(self):
        self.ssid = "Your SSID"
        self.password = "Your PASSWORD"
        self.ntpAdr = "Your NTP server"
        self.tz = Your_Time_zone(JST=9)
        self.timeout = 20
        self.brokerAdr = "Your Broker ADDRESS"
        self.brokerPort = 1883
        self.clientId = "Your MQTT CLIENT ID"
        self.wlan = None
        self.client = None
        self.callbackSub = None

    def do_connect(self, opt=True):
        self.wlan = network.WLAN(network.STA_IF)
        self.wlan.active(True)
        if not self.wlan.isconnected():
            print('connecting to network...')
            self.wlan.connect(self.ssid, self.password)
            while (not self.wlan.isconnected()) and (self.timeout > 0):
                start = time.ticks_ms()
                while time.ticks_diff(time.ticks_ms(), start) < 1000:
                    pass
                self.timeout -= 1
        if opt:
            print('timeout: ', self.timeout)
            print('network config: ', self.wlan.ifconfig())
        return self.wlan.isconnected()

    def currentTIME(self):
        t0 = machine.RTC().datetime()
        return "{0}/{1:02d}/{2:02d} {3:02d}:{4:02d}:{5:02d}".format(t0[0],t0[1],t0[2],t0[4]+self.tz,t0[5],t0[6])
    
    def initRTC(self, opt=True):
        ntptime.host = self.ntpAdr
        ntptime.settime()
        if opt:
            print(self.currentTIME())
    
    def connectMqttBroker(self, callbackFuncName):
        try:
            self.callbackSub = callbackFuncName
            self.client = umqtt.simple.MQTTClient(self.clientId, self.brokerAdr, port=self.brokerPort)
            self.client.set_callback(self.callbackSub)
            self.client.connect()
            self.client.subscribe("ATOMLite/Color")
            self.client.subscribe("ATOMLite/Settings")
        except:
            return False
        return True
    
    def close(self):
        self.client.disconnect()
        self.client = None
        self.wlan.disconnect()
        self.wlan = None

スクリプト本体

import sys
import ujson
import uasyncio
from LiteOnBoard import LiteOnBoard
from LiteNetwork import LiteNetwork

ledColor = (0, 0, 0)
OnBoardDevice = LiteOnBoard()
LiteNet = LiteNetwork()

def colorChange(msg):
    global ledColor
    if msg=="R":
        ledColor = (255, 0, 0)
    elif msg=="G":
        ledColor = (0, 255, 0)
    elif msg=="B":
        ledColor = (0, 0, 255)

def callbackSub(topic, msg):
    topicS = topic.decode('utf-8')
    msgS = msg.decode('utf-8')
    print(topicS, msgS)
    if topicS=="ATOMLite/Color":
        print("ATOMLite/Color Message: ", msgS)
        colorChange(msgS)
    if topicS=="ATOMLite/Settings":
        msgJ = ujson.loads(msgS)
        print("ATOMLite/Settings: ")
        for item in msgJ:
            for k,v in item.items():
                if k=="key":
                    ky = v
                if k=="value":
                    vl = v
            print(ky, " = ", vl)

def makeOBJ(num):
    workDic = dict()
    workDic['A'] = num
    workDic['B'] = LiteNet.currentTIME()
    return ujson.dumps(workDic)

async def reportStatus(period_s):
    while True:
        LiteNet.client.publish("ATOMLite/Status", "ATOMLite Running: " + LiteNet.currentTIME())
        await uasyncio.sleep(period_s)

async def sendJson(period_s):
    sendCounter = 0
    while True:
        jsonSTR = makeOBJ(sendCounter)
        LiteNet.client.publish("ATOMLite/Json", jsonSTR)
        sendCounter += 1
        await uasyncio.sleep(period_s)

async def ledAndUserButton(period_s):
    global OnBoardDevice
    ledFlag = True
    buttonFlag = False
    while True:
        if ledFlag:
            OnBoardDevice.M5ATOMLiteLED(ledColor)
            ledFlag = False
        else:
            OnBoardDevice.M5ATOMLiteLED( (0, 0, 0) )
            ledFlag = True
        if OnBoardDevice.M5ATOMLiteUserButton():
            print("Button ON")
            LiteNet.client.publish("ATOMLite/Button", "Button ON")
            buttonFlag = True
        elif buttonFlag:
            buttonFlag = False
            LiteNet.client.publish("ATOMLite/Button", "Button OFF")            
        await uasyncio.sleep(period_s)

async def mainLoop(period_ms):
    global OnBoardDevice
    taskStatus = uasyncio.create_task(reportStatus(60))
    taskJson = uasyncio.create_task(sendJson(5))
    taskATOMhard = uasyncio.create_task(ledAndUserButton(1))
    while( OnBoardDevice.buttonCounter < 6 ):
        LiteNet.client.check_msg()
        await uasyncio.sleep_ms(period_ms)
    try:
        taskStatus.cancel()
    except:
        print("Exception at taskStatus:", sys.exc_info()[0])
    try:       
        taskJson.cancel()
    except:
        print("Exception at taskJson:", sys.exc_info()[0])
    try:       
        taskATOMhard.cancel()
    except:
        print("Exception at taskATOMhard:", sys.exc_info()[0])
    LiteNet.client.publish("ATOMLite/Button", "N/A")            
    LiteNet.client.publish("ATOMLite/Status", "ATOMLite: out of service.")
    LiteNet.close()
    print("disconnect.")
    
def main():
    if not LiteNet.do_connect():
        print("ERROR: WiFi connection.")
        sys.exit(1)
    LiteNet.initRTC()
    if not LiteNet.connectMqttBroker(callbackSub):
        print("ERROR: MQTT Broker connection.")
        sys.exit(1)
    uasyncio.run(mainLoop(1000))
            
if __name__ == "__main__":
    main()

MicroPython的午睡(47) genericポートでもM5ATOMLiteはOKよ へ戻る

MicroPython的午睡(49) ATOMLite、温湿度を測ってNode-RED報告 へ戻る