ブロックを積みながら(25) BBC micro:bitとラズパイでサウンドモニタ その2

Joseph Halfmoon

前々回、本シリーズ側で作成した「システム?」をアプリ運用側「夜間頻尿モニタリングの試み」で使った結果、フィードバックがかかったので、その修正対応を記していきます。ぶっちゃけMakeCodeエディタに遡ってプログラム修正したのでこちらのシリーズで取り扱うべきであろうかと。

※「ブロックを積みながら」投稿順 index はこちら

さて、アプリ運用側もシステム製造側も同一人物であるので、多分、何も言わずとも心の中では分かりあっていると思われるのでありますが、言葉にして書き留めておきます。まずは実証実験?システムの狙いですが

  1. 夜間頻尿の「頻度」をモニタリングする
  2. 手段としてトイレドア付近に設置したBBC micro:bit v2を使ってドアの開閉音を検出し、Bluetooth無線にてイベント発生を報告する
  3. 報告はRaspberry Pi 上で動作しているnodeREDサーバにて処理し、ダッシュボード上で各種PCやスマホから確認することができる

実際に1週間弱運用して見えてきた問題を整理すると以下のようであります。

  1. micro:bit を単三電池x2本(使いかけのもの)で動作させたが約5日で電池切れとなった。LEDマトリックスをやたら光らせているけれど、電池的には無駄じゃね。
  2. トイレの中に本棚がある、という特殊事情?に鑑み、本来目的外使用時を識別する手段が提供されるべきである!
  3. micro:bitの電池切れたら、ラズパイ上のPythonスクリプトで予想しないエラーがRaiseされて落ちたっす
  4. ドアの開閉イベントの表示が鋸波上で見難い、もっと見やすくしろ
  5. いったい何度トイレに行ったのか集計がない
  6. 頻尿といって間隔はどのくらいか数字が欲しい
  7. 取得データをデータベースに記録しておきたい(ビッグじゃないデータ)
MakeCodeブロックの修正

修正点はLEDの「輝かしい」表示の抑圧と、想定外のイベントの手動による例外処理機能の追加です。まずは、LEDの brightnessを低下させました。BBC micro:bitの特徴であるボード表面のLEDマトリックスがPWM波形で駆動されており、そのデューティによりbrightnessを手加減できるようになっています。ともあれ、数値と見た目の印象は線形でないので、視認性を損なわない程度ということで32(最大255)としてあります。

MC2_0LEDの問題は、「音が大きい」時と「音が小さい」ときでそれぞれを象徴するアイコンを表示するという仕様にもかかわります。音が大きいイベントは待ち受け対象でもあり確認したいですが、音が小さい方は「どうでもよい」感じです。常時「小さい」アイコンを点灯するのは電気の無駄なので「空」のアイコンに替えました。もっと「小さい」消し方ないのかいな。。。

MC2_1キーボードイベント関係では、AとBの2つの物理キーを音響スレショルドの上げ下げに使っています。第3のキーとしてmicro:bitロゴマークを「タッチ」した場合は、目的外使用を示すBLEパケットを送信するような機能を追加しました。v2で拡張されている部分。

MC2_2ハートビート信号として温度を測って報告する関数と、音響スレショルドの変更報告関数については変更なし。念のため再掲します。

MC2_3

Raspberry Pi上のPython3スクリプトの修正

BBC micro:bit V2側の修正を受けて、送信されてくるBLEパケットを受け止めて処理し、MQTTブローカにパブリッシュするPython3スクリプトにも修正が入っています。全文を末尾に再掲載してあります。修正はsoundMonitorクラス内がほとんどです。micro:bitから上がってくるイベントをどう取り扱うべきなのか未だ試行錯誤中です。再度の実証実験結果をまって再々改訂の予定。

NodeRED フローの修正

NodeRED側は大筋に変更はないのですが、フローレベルで回数とか間隔を記録するための「大域変数」を追加、集計値を適宜表示するようになっています。まあこれも再度の実証実験用の暫定実装。MonitorFlow2

データベースへの接続以外は、ほぼフィードバック事項に対応済。昨晩から再度の実証実験中であります。しかしね、トイレにmicro:bitを仕掛けると夜間頻尿収まる感じなのはなぜ?

ブロックを積みながら(24) BBC micro:bit v1.5、消費電流測定その1 へ戻る

ブロックを積みながら(26) BBC micro:bitとラズパイでサウンドモニタ その3 へ進む

micro:bitとnode-REDの橋渡しをするRaspberry Pi 上のPython3スクリプト
#! /usr/bin/python3
import argparse
import bluepy
import sys
import time
import paho.mqtt.publish as publish

versionSTR = "soundMonitor v0.3"

class microbitBLE:

    def __init__(self, adr, vb, db):
        self.devadr = adr
        self.pobj = None
        self.uartOBJ = None
        self.verbose = vb
        self.debug = db
        self.callback = None

    def conPobj(self):
        try:
            self.pobj = bluepy.btle.Peripheral()
            self.pobj.connect(self.devadr, bluepy.btle.ADDR_TYPE_RANDOM)
        except:
            print("ERROR: connect.")
            return False
        return True

    def listCharacteristics(self):
        chrLis = self.pobj.getCharacteristics()
        for item in chrLis:
            print(" UUID: {0} Handle: 0x{1:04x} Prop: {2}".format(item.uuid, item.getHandle(), item.propertiesToString()))

    def getDeviceInfo(self):
        work = dict()
        work['DeviceName'] = str(self.pobj.readCharacteristic(0x0003), 'utf-8')
        modelNumCH = self.pobj.getCharacteristics(uuid="00002a24-0000-1000-8000-00805f9b34fb")[0]
        serialNumCH = self.pobj.getCharacteristics(uuid="00002a25-0000-1000-8000-00805f9b34fb")[0]
        firmRevCH = self.pobj.getCharacteristics(uuid="00002a26-0000-1000-8000-00805f9b34fb")[0]
        work['ModelNumber'] = str(self.pobj.readCharacteristic(modelNumCH.getHandle()), 'utf-8')
        work['SerialNumber'] = str(self.pobj.readCharacteristic(serialNumCH.getHandle()), 'utf-8')
        work['FirmwareRevision'] = str(self.pobj.readCharacteristic(firmRevCH.getHandle()), 'utf-8')
        return work

    def getServiceInfo(self):
        return self.pobj.getServices()

    def getDescInfo(self):
        return self.pobj.getDescriptors()

    def setupUart(self):
        chTX = self.pobj.getCharacteristics(uuid="6E400002-B5A3-F393-E0A9-E50E24DCCA9E")[0]
        ch_cccd=chTX.getDescriptors(forUUID=0x2902)[0]
        ch_cccd.write(b"\x03\x00", False)
        chRX = self.pobj.getCharacteristics(uuid="6E400003-B5A3-F393-E0A9-E50E24DCCA9E")[0]
        self.uartOBJ = bleDelegate(chTX, chRX, self.verbose, self.debug, self.callback)
        self.pobj.withDelegate( self.uartOBJ )

    def sendStrUart(self, arg):
        if self.uartOBJ is None:
            self.setupUart()
        barg = bytes(arg + '\n', 'utf-8')
        if len(barg) > 20:
            return None
        self.pobj.writeCharacteristic(0x002a, barg, False)
        return barg

    def uartInfoTX(self):
        uartTX = self.pobj.getCharacteristics(uuid="6E400002-B5A3-F393-E0A9-E50E24DCCA9E")[0]
        print("TX Handle: 0x{0:04x} Prop: {1}".format(uartTX.getHandle(), uartTX.propertiesToString()))
        descTX = uartTX.getDescriptors()
        for item in descTX:
            print(str(item))

    def uartInfoRX(self):
        uartRX = self.pobj.getCharacteristics(uuid="6E400003-B5A3-F393-E0A9-E50E24DCCA9E")[0]
        print("RX Handle: 0x{0:04x} Prop: {1}".format(uartRX.getHandle(), uartRX.propertiesToString()))
        descRX = uartRX.getDescriptors()
        for item in descRX:
            print(str(item))

    def wfn(self, tout):
        return self.pobj.waitForNotifications(tout)

    def close(self):
        self.pobj.disconnect()

class bleDelegate(bluepy.btle.DefaultDelegate):
    def __init__(self, TXD, RXD, vb, db, cb):
        bluepy.btle.DefaultDelegate.__init__(self)
        self.TX = TXD
        self.RX = RXD
        self.verbose = vb
        self.debug = db
        self.callback = cb

    def handleNotification(self, hdl, dat):
        if self.verbose:
            print("hdl={0}, dat={1}".format(hdl, dat))
        if self.callback is None:
            print("{0}".format(dat))
        else:
            self.callback(dat)

class soundMonitor():

        def __init__(self, bobj, itvl):
            self.bleObj = bobj
            self.waitInterval = itvl
            self.ERROR = 0
            self.runFlag = False
            self.currentTime = None
            self.lastEventTime = None
            self.Qtime = None
            self.eventCount = 0
            self.eventStat = 0 #0..normal 1..L 2..LL(EVENT)
            self.QMAX = 300

        def processData(self, arg):
            datStr = arg.strip().decode()
            datLis = datStr.split(":")
            self.runFlag = True
            self.currentTime = time.time()
            command = ""
            wtime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(self.currentTime))
            publish.single("MBV2/Status", "Running", hostname="127.0.0.1")
            if (len(datLis) > 1) and (datLis[0]=="TEMP"):
                publish.single("MBV2/Temperature", datLis[1], hostname="127.0.0.1")
                command += "T "
            if self.Qtime is not None:
                if int(self.currentTime - self.Qtime) > self.QMAX:
                    self.eventStat = 0
                    self.Qtime = None
                    command += "Qtime "
            if datLis[0].startswith("L"):
                self.eventCount += 1
                self.eventStat += 1
                if self.eventStat >=2:
                    publish.single("MBV2/EVENT", "0", hostname="127.0.0.1")
                    if self.lastEventTime is None:
                        publish.single("MBV2/EVENT", "180", hostname="127.0.0.1")
                        command += "180 "
                    else:
                        tInterval = str(int(self.currentTime - self.lastEventTime))
                        publish.single("MBV2/EVENT", tInterval, hostname="127.0.0.1")
                        command += tInterval + " "
                    publish.single("MBV2/EVENT", "0", hostname="127.0.0.1")
                    self.lastEventTime = self.currentTime
                    self.eventStat = 0
            if datLis[0].startswith("Q"):
                self.Qtime = self.currentTime
            if datLis[0].startswith("C"):
                self.eventStat -= 1
                command += "CAN "
            print("{0}={1}, {2}".format(wtime,datStr,command))

        def waitLoop(self):
            if self.bleObj.uartOBJ is None:
                self.bleObj.callback = self.processData
                self.bleObj.setupUart()
            else:
                self.ERROR = 1
                return False
            try:
                while True:
                    if not self.bleObj.wfn(self.waitInterval):
                        self.ERROR = 2
                        return False
            except KeyboardInterrupt:
                return True
            except:
                self.ERROR = 3
                return False

def tryIntParse(argstr):
    try:
        return int(argstr)
    except ValueError:
        return None

def main():
    parser = argparse.ArgumentParser(description='microbitBLEutil.')
    parser.add_argument('--ADR', nargs=1, help='BLE address.')
    parser.add_argument('--N', nargs=1, help='0) Gv1.5 1) Gv2 2) Rv1.5')
    parser.add_argument('--SEND', nargs=1, help='Send string via UART service.')
    parser.add_argument('-uartinfo', dest='uartinfo', help='uart Info.', action='store_true', default=False)
    parser.add_argument('-info', dest='info', help='device Info.', action='store_true', default=False)
    parser.add_argument('-service', dest='service', help='available service list.', action='store_true', default=False)
    parser.add_argument('-desc', dest='descriptor', help='available descriptor list.', action='store_true', default=False)
    parser.add_argument('-uuid', dest='uuid', help='list all UUIDs.', action='store_true', default=False)
    parser.add_argument('-sm', dest='smonitor', help='Start Sound Monitor.', action='store_true', default=False)
    parser.add_argument('-d', dest='debug', help='print debug information.', action='store_true', default=False)
    parser.add_argument('-v', dest='verbose', help='Verbose mode.', action='store_true', default=False)
    parser.add_argument('-V', dest='VERSION', help='Show Version, then exit', action='store_true', default=False)
    args = parser.parse_args()

    print(versionSTR)
    if args.VERSION:
        sys.exit(0)

    devLis = ["F5:D1:5F:02:68:25", "F7:FB:7A:03:96:AA", "F8:5C:FE:9B:B0:86"]
    devadr =  devLis[0]
    if args.ADR is not None:
        devadr = args.ADR[0]

    if args.N is not None:
        idx = tryIntParse(args.N[0])
        if (idx is not None) and (idx < len(devLis)):
            devadr = devLis[idx]

    mb = microbitBLE(devadr, args.verbose, args.debug)

    if mb.conPobj():
        print("CONNECTED: ", devadr)
    else:
        sys.exit(1)

    if args.uuid:
        mb.listCharacteristics()

    if args.info:
        for key, value in mb.getDeviceInfo().items():
            print(key, "=", value)

    if args.service:
        for srv in mb.getServiceInfo():
            print(str(srv))

    if args.descriptor:
        for desc in mb.getDescInfo():
            print(str(desc))

    if args.uartinfo:
        mb.uartInfoTX()
        mb.uartInfoRX()

    if args.SEND is not None:
        print("SEND:", mb.sendStrUart(args.SEND[0]))
        print("Wait for Notification")
        if not mb.wfn(10):
            print("NO NOTIFICATION within 10 seconds.")

    if args.smonitor:
        sm = soundMonitor(mb, 180)
        if not sm.waitLoop():
            msg = "ECODE={0}".format(sm.ERROR)
            print("ERROR: ", msg)
            publish.single("MBV2/Status", msg, hostname="127.0.0.1")
        else:
            print("\nExit by User Interrupt.")
            publish.single("MBV2/Status", "Terminated by User", hostname="127.0.0.1")

    mb.close()
    print("DISCONNECTED.")

if __name__ == "__main__":
    main()