ブロックを積みながら(23) BBC micro:bit v2とラズパイでサウンドモニタ

Joseph Halfmoon

前回、懸案解決(いつの間にか)。ということで、早速、ペンディングにしていた実験のプロトタイプを作ってみました。「サウンドモニタ」と呼ばせていただきますが、単にドアなどの開閉音などをイベントとして検出し、bluetoothでホスト機に報告するものです。今回は「実証実験?」用のプロトタイプもどきを作成。

(末尾に micro:bit v2に書き込んだJavaScriptコードと、Raspberry Pi OS上のPython3のデータ受信コード全文を掲げました。)

今回想定しますシステムは、以下のような段取りであります。

  1. BBC micro:bit V2をどこか「ターゲット」の場所に設置
  2. micro:bit で「音」イベントを検出、BLE Uartでイベントを送信
  3. Raspberry Pi 3 model B+上で動作しているPython3スクリプトで受信

この後作成予定のnode-RED「アプリ」にて受信したデータを処理し、目論見の実験をすることにいたします。とりあえずハードに近いところの下回り部分。

ちょこまかとプロトタイプもどきを作成できたのは、先月来、折角のmicro bit v2の「拡張された」メモリが有効活用できない、という悩みがいつの間にか解決されていたためであります。以下にURLを貼り付けておりますMakeCodeエディタのお陰。

Microsoft MakeCode for micro:bit

BBC micro:bit V2側のコード

まずは、micro:bit v2に書き込んだコードから眺めてまいります。

  • 使用するBluetooth サービスは UARTサービスです
  • micro:bit v2内蔵のマイクをつかって音響イベントを検出します
  • イベント検出の閾値は soundTHという変数で調整可能としています(初期値128)
  • ホスト機にmicro:bitが生きていることを知らせるために時々 heartBeat信号を送ります。
  • ホスト機とmicro:bitがコネクトしたとき、またディスコネクトしたときはスピーカを鳴らして知らせます。

こんな感じ

soundMonitor_start

さて肝心の音響イベントの検出は、以下のようです。

  • 閾値上超えのときは、LEDマトリックスに人型?(漢字の「大」くりそつ)を表示し、BLE Uartで”L”を送信
  • 閾値下超えのときは、LEDマトリックスに控えめマーク?を表示し、BLE Uartで”Q”を送信

soundMonitor_onSoundさらに音響イベントの閾値調整は、ボタンAで上昇、ボタンBで下降できます。その際、設定閾値をBLE Uartで報告します。

soundMonitor_onButton

最後に heartBeat信号および、閾値報告の関数です。

  • heartBeat信号を送信するか否かは5秒に一回関数よびだしてチェックする。
  • 12回チェック(1分)毎に micro:bit v2の内蔵温度計で温度を測り、温度を送信することでheartBeatとする。
  • ホスト側では温度の送信が数分途切れたら micro:bitが死んだか、コネクションが切れたと判断すればよい
  • 閾値はセットされるたびに送信関数を呼び出して値をホストに送る

soundMonitor_funcs

データ受信コード(動作確認用)

末尾にデータ受信用のPython3コード全文を掲げましたが、以下の投稿で使ったもののチョイ直しです。

ブロックを積みながら(20) micro:bit、BLE UartへPython書き出し

今回追加した部分は、-sm コマンドラインオプションで呼び出される「Sound Monitor」部分です。

実行結果

以下にホストの Raspberry Pi OS 上での動作の様子を示します。 「–N 1」という部分がありますが、これはターゲットの micro:bit v2のアドレスを記す代わりにスクリプトに「覚えさせてある」値を使用させるだけのオプションです(入力メンドイので。)

micro:bit v2のBLE Uartから文字列を受信するとタイムスタンプとともにダンプします。

音響イベントはLまたQ。閾値設定は SoundTH:、ハートビート信号相当の温度測定は TEMP:となっています。スクリプトは無限ループになっているので、停止させる場合はキーボードインタラプトを入力します。するとBLEをディスコネクトして終わります。

$ python3 soundMonitor.py --N 1 -sm
soundMonitor v0.1
CONNECTED: F7:FB:7A:03:96:AA
2021-06-10 12:30:00=L
2021-06-10 12:30:01=Q
2021-06-10 12:30:01=L
2021-06-10 12:30:02=Q
2021-06-10 12:30:15=SoundTH:130
2021-06-10 12:30:17=SoundTH:129
2021-06-10 12:30:20=TEMP:31
2021-06-10 12:30:26=SoundTH:128
2021-06-10 12:30:38=L
2021-06-10 12:30:39=Q
2021-06-10 12:30:39=L
2021-06-10 12:30:39=Q
2021-06-10 12:31:21=TEMP:31
^C
Exit by User Interrupt.
DISCONNECTED.

いちおう動作しているみたい。次は、MQTTからnode-REDして「実証実験」ですな。しかし、これは「ブロックを積みながら」ではない感じ。別シリーズにての予定。

ブロックを積みながら(22) MakeCode、いつの間にmicro:bit v2対応? へ戻る

micro:bit v2に書き込んだコード(JavaScript形態)
bluetooth.onBluetoothConnected(function () {
    soundExpression.hello.playUntilDone()
})
bluetooth.onBluetoothDisconnected(function () {
    soundExpression.giggle.playUntilDone()
})
input.onButtonPressed(Button.A, function () {
    if (soundTH < 255) {
        soundTH += 1
        reportTH()
    }
})
function heartBeat () {
    counter += 1
    if (counter >= 12) {
        counter = 0
        bluetooth.uartWriteValue("TEMP", input.temperature())
    }
}
input.onSound(DetectedSound.Loud, function () {
    images.iconImage(IconNames.StickFigure).showImage(0)
    bluetooth.uartWriteLine("L")
})
input.onButtonPressed(Button.B, function () {
    if (soundTH > 0) {
        soundTH += -1
        reportTH()
    }
})
function reportTH () {
    bluetooth.uartWriteValue("SoundTH", soundTH)
}
input.onSound(DetectedSound.Quiet, function () {
    images.iconImage(IconNames.SmallDiamond).showImage(0)
    bluetooth.uartWriteLine("Q")
})
let counter = 0
let soundTH = 0
bluetooth.startUartService()
soundTH = 128
input.setSoundThreshold(SoundThreshold.Loud, soundTH)
counter = 0
basic.forever(function () {
    heartBeat()
    basic.pause(5000)
})
Raspberry Pi OS上のPython3でのデータ受信スクリプト
#! /usr/bin/python3
import argparse
import bluepy
import sys
import time

versionSTR = "soundMonitor v0.1"

class microbitBLE:

    def __init__(self, adr, vb, db):
        self.devadr = adr
        self.pobj = None
        self.uartOBJ = None
        self.verbose = vb
        self.debug = db
        self.callback = None

    def conPobj(self):
        try:
            self.pobj = bluepy.btle.Peripheral()
            self.pobj.connect(self.devadr, bluepy.btle.ADDR_TYPE_RANDOM)
        except:
            print("ERROR: connect.")
            return False
        return True

    def listCharacteristics(self):
        chrLis = self.pobj.getCharacteristics()
        for item in chrLis:
            print(" UUID: {0} Handle: 0x{1:04x} Prop: {2}".format(item.uuid, item.getHandle(), item.propertiesToString()))

    def getDeviceInfo(self):
        work = dict()
        work['DeviceName'] = str(self.pobj.readCharacteristic(0x0003), 'utf-8')
        modelNumCH = self.pobj.getCharacteristics(uuid="00002a24-0000-1000-8000-00805f9b34fb")[0]
        serialNumCH = self.pobj.getCharacteristics(uuid="00002a25-0000-1000-8000-00805f9b34fb")[0]
        firmRevCH = self.pobj.getCharacteristics(uuid="00002a26-0000-1000-8000-00805f9b34fb")[0]
        work['ModelNumber'] = str(self.pobj.readCharacteristic(modelNumCH.getHandle()), 'utf-8')
        work['SerialNumber'] = str(self.pobj.readCharacteristic(serialNumCH.getHandle()), 'utf-8')
        work['FirmwareRevision'] = str(self.pobj.readCharacteristic(firmRevCH.getHandle()), 'utf-8')
        return work

    def getServiceInfo(self):
        return self.pobj.getServices()

    def getDescInfo(self):
        return self.pobj.getDescriptors()

    def setupUart(self):
        chTX = self.pobj.getCharacteristics(uuid="6E400002-B5A3-F393-E0A9-E50E24DCCA9E")[0]
        ch_cccd=chTX.getDescriptors(forUUID=0x2902)[0]
        ch_cccd.write(b"\x03\x00", False)
        chRX = self.pobj.getCharacteristics(uuid="6E400003-B5A3-F393-E0A9-E50E24DCCA9E")[0]
        self.uartOBJ = bleDelegate(chTX, chRX, self.verbose, self.debug, self.callback)
        self.pobj.withDelegate( self.uartOBJ )

    def sendStrUart(self, arg):
        if self.uartOBJ is None:
            self.setupUart()
        barg = bytes(arg + '\n', 'utf-8')
        if len(barg) > 20:
            return None
        self.pobj.writeCharacteristic(0x002a, barg, False)
        return barg

    def uartInfoTX(self):
        uartTX = self.pobj.getCharacteristics(uuid="6E400002-B5A3-F393-E0A9-E50E24DCCA9E")[0]
        print("TX Handle: 0x{0:04x} Prop: {1}".format(uartTX.getHandle(), uartTX.propertiesToString()))
        descTX = uartTX.getDescriptors()
        for item in descTX:
            print(str(item))

    def uartInfoRX(self):
        uartRX = self.pobj.getCharacteristics(uuid="6E400003-B5A3-F393-E0A9-E50E24DCCA9E")[0]
        print("RX Handle: 0x{0:04x} Prop: {1}".format(uartRX.getHandle(), uartRX.propertiesToString()))
        descRX = uartRX.getDescriptors()
        for item in descRX:
            print(str(item))

    def wfn(self, tout):
        return self.pobj.waitForNotifications(tout)

    def close(self):
        self.pobj.disconnect()

class bleDelegate(bluepy.btle.DefaultDelegate):
    def __init__(self, TXD, RXD, vb, db, cb):
        bluepy.btle.DefaultDelegate.__init__(self)
        self.TX = TXD
        self.RX = RXD
        self.verbose = vb
        self.debug = db
        self.callback = cb

    def handleNotification(self, hdl, dat):
        if self.verbose:
            print("hdl={0}, dat={1}".format(hdl, dat))
        if self.callback is None:
            print("{0}".format(dat))
        else:
            self.callback(dat)

class soundMonitor():

        def __init__(self, bobj, itvl):
            self.bleObj = bobj
            self.waitInterval = itvl
            self.ERROR = 0
            self.runFlag = False
            self.currentTime = None

        def processData(self, arg):
            datStr = arg.strip().decode()
            self.runFlag = True
            self.currentTime = time.time()
            wtime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(self.currentTime)) 
            print("{0}={1}".format(wtime,datStr))

        def waitLoop(self):
            if self.bleObj.uartOBJ is None:
                self.bleObj.callback = self.processData
                self.bleObj.setupUart()
            else:
                self.ERROR = 1
                return False
            try:
                while True:
                    if not self.bleObj.wfn(self.waitInterval):
                        self.ERROR = 2
                        return False
            except KeyboardInterrupt:
                return True

def tryIntParse(argstr):
    try:
        return int(argstr)
    except ValueError:
        return None

def main():
    parser = argparse.ArgumentParser(description='microbitBLEutil.')
    parser.add_argument('--ADR', nargs=1, help='BLE address.')
    parser.add_argument('--N', nargs=1, help='0) Gv1.5 1) Gv2 2) Rv1.5')
    parser.add_argument('--SEND', nargs=1, help='Send string via UART service.')
    parser.add_argument('-uartinfo', dest='uartinfo', help='uart Info.', action='store_true', default=False)
    parser.add_argument('-info', dest='info', help='device Info.', action='store_true', default=False)
    parser.add_argument('-service', dest='service', help='available service list.', action='store_true', default=False)
    parser.add_argument('-desc', dest='descriptor', help='available descriptor list.', action='store_true', default=False)
    parser.add_argument('-uuid', dest='uuid', help='list all UUIDs.', action='store_true', default=False)
    parser.add_argument('-sm', dest='smonitor', help='Start Sound Monitor.', action='store_true', default=False)
    parser.add_argument('-d', dest='debug', help='print debug information.', action='store_true', default=False)
    parser.add_argument('-v', dest='verbose', help='Verbose mode.', action='store_true', default=False)
    parser.add_argument('-V', dest='VERSION', help='Show Version, then exit', action='store_true', default=False)
    args = parser.parse_args()

    print(versionSTR)
    if args.VERSION:
        sys.exit(0)

    devLis = ["F5:D1:5F:02:68:25", "F7:FB:7A:03:96:AA", "F8:5C:FE:9B:B0:86"]
    devadr =  devLis[0]
    if args.ADR is not None:
        devadr = args.ADR[0]

    if args.N is not None:
        idx = tryIntParse(args.N[0])
        if (idx is not None) and (idx < len(devLis)):
            devadr = devLis[idx]

    mb = microbitBLE(devadr, args.verbose, args.debug)

    if mb.conPobj():
        print("CONNECTED: ", devadr)
    else:
        sys.exit(1)

    if args.uuid:
        mb.listCharacteristics()

    if args.info:
        for key, value in mb.getDeviceInfo().items():
            print(key, "=", value)

    if args.service:
        for srv in mb.getServiceInfo():
            print(str(srv))

    if args.descriptor:
        for desc in mb.getDescInfo():
            print(str(desc))

    if args.uartinfo:
        mb.uartInfoTX()
        mb.uartInfoRX()

    if args.SEND is not None:
        print("SEND:", mb.sendStrUart(args.SEND[0]))
        print("Wait for Notification")
        if not mb.wfn(10):
            print("NO NOTIFICATION within 10 seconds.")
    
    if args.smonitor:
        sm = soundMonitor(mb, 180)
        if not sm.waitLoop():
            print("ERROR: sound monitor. ECODE={0}".format(sm.ERROR))
        else:
            print("\nExit by User Interrupt.")
    
    mb.close()
    print("DISCONNECTED.")

if __name__ == "__main__":
    main()