前回、懸案解決(いつの間にか)。ということで、早速、ペンディングにしていた実験のプロトタイプを作ってみました。「サウンドモニタ」と呼ばせていただきますが、単にドアなどの開閉音などをイベントとして検出し、bluetoothでホスト機に報告するものです。今回は「実証実験?」用のプロトタイプもどきを作成。
※「ブロックを積みながら」投稿順 index はこちら
(末尾に micro:bit v2に書き込んだJavaScriptコードと、Raspberry Pi OS上のPython3のデータ受信コード全文を掲げました。)
今回想定しますシステムは、以下のような段取りであります。
- BBC micro:bit V2をどこか「ターゲット」の場所に設置
- micro:bit で「音」イベントを検出、BLE Uartでイベントを送信
- Raspberry Pi 3 model B+上で動作しているPython3スクリプトで受信
この後作成予定のnode-RED「アプリ」にて受信したデータを処理し、目論見の実験をすることにいたします。とりあえずハードに近いところの下回り部分。
ちょこまかとプロトタイプもどきを作成できたのは、先月来、折角のmicro bit v2の「拡張された」メモリが有効活用できない、という悩みがいつの間にか解決されていたためであります。以下にURLを貼り付けておりますMakeCodeエディタのお陰。
Microsoft MakeCode for micro:bit
BBC micro:bit V2側のコード
まずは、micro:bit v2に書き込んだコードから眺めてまいります。
- 使用するBluetooth サービスは UARTサービスです
- micro:bit v2内蔵のマイクをつかって音響イベントを検出します
- イベント検出の閾値は soundTHという変数で調整可能としています(初期値128)
- ホスト機にmicro:bitが生きていることを知らせるために時々 heartBeat信号を送ります。
- ホスト機とmicro:bitがコネクトしたとき、またディスコネクトしたときはスピーカを鳴らして知らせます。
こんな感じ
さて肝心の音響イベントの検出は、以下のようです。
- 閾値上超えのときは、LEDマトリックスに人型?(漢字の「大」くりそつ)を表示し、BLE Uartで”L”を送信
- 閾値下超えのときは、LEDマトリックスに控えめマーク?を表示し、BLE Uartで”Q”を送信
さらに音響イベントの閾値調整は、ボタンAで上昇、ボタンBで下降できます。その際、設定閾値をBLE Uartで報告します。
最後に heartBeat信号および、閾値報告の関数です。
- heartBeat信号を送信するか否かは5秒に一回関数よびだしてチェックする。
- 12回チェック(1分)毎に micro:bit v2の内蔵温度計で温度を測り、温度を送信することでheartBeatとする。
- ホスト側では温度の送信が数分途切れたら micro:bitが死んだか、コネクションが切れたと判断すればよい
- 閾値はセットされるたびに送信関数を呼び出して値をホストに送る
データ受信コード(動作確認用)
末尾にデータ受信用のPython3コード全文を掲げましたが、以下の投稿で使ったもののチョイ直しです。
ブロックを積みながら(20) micro:bit、BLE UartへPython書き出し
今回追加した部分は、-sm コマンドラインオプションで呼び出される「Sound Monitor」部分です。
実行結果
以下にホストの Raspberry Pi OS 上での動作の様子を示します。 「–N 1」という部分がありますが、これはターゲットの micro:bit v2のアドレスを記す代わりにスクリプトに「覚えさせてある」値を使用させるだけのオプションです(入力メンドイので。)
micro:bit v2のBLE Uartから文字列を受信するとタイムスタンプとともにダンプします。
音響イベントはLまたQ。閾値設定は SoundTH:、ハートビート信号相当の温度測定は TEMP:となっています。スクリプトは無限ループになっているので、停止させる場合はキーボードインタラプトを入力します。するとBLEをディスコネクトして終わります。
$ python3 soundMonitor.py --N 1 -sm soundMonitor v0.1 CONNECTED: F7:FB:7A:03:96:AA 2021-06-10 12:30:00=L 2021-06-10 12:30:01=Q 2021-06-10 12:30:01=L 2021-06-10 12:30:02=Q 2021-06-10 12:30:15=SoundTH:130 2021-06-10 12:30:17=SoundTH:129 2021-06-10 12:30:20=TEMP:31 2021-06-10 12:30:26=SoundTH:128 2021-06-10 12:30:38=L 2021-06-10 12:30:39=Q 2021-06-10 12:30:39=L 2021-06-10 12:30:39=Q 2021-06-10 12:31:21=TEMP:31 ^C Exit by User Interrupt. DISCONNECTED.
いちおう動作しているみたい。次は、MQTTからnode-REDして「実証実験」ですな。しかし、これは「ブロックを積みながら」ではない感じ。別シリーズにての予定。
ブロックを積みながら(22) MakeCode、いつの間にmicro:bit v2対応? へ戻る
ブロックを積みながら(24) BBC micro:bit v1.5、消費電力測定その1 へ進む
micro:bit v2に書き込んだコード(JavaScript形態)
bluetooth.onBluetoothConnected(function () { soundExpression.hello.playUntilDone() }) bluetooth.onBluetoothDisconnected(function () { soundExpression.giggle.playUntilDone() }) input.onButtonPressed(Button.A, function () { if (soundTH < 255) { soundTH += 1 reportTH() } }) function heartBeat () { counter += 1 if (counter >= 12) { counter = 0 bluetooth.uartWriteValue("TEMP", input.temperature()) } } input.onSound(DetectedSound.Loud, function () { images.iconImage(IconNames.StickFigure).showImage(0) bluetooth.uartWriteLine("L") }) input.onButtonPressed(Button.B, function () { if (soundTH > 0) { soundTH += -1 reportTH() } }) function reportTH () { bluetooth.uartWriteValue("SoundTH", soundTH) } input.onSound(DetectedSound.Quiet, function () { images.iconImage(IconNames.SmallDiamond).showImage(0) bluetooth.uartWriteLine("Q") }) let counter = 0 let soundTH = 0 bluetooth.startUartService() soundTH = 128 input.setSoundThreshold(SoundThreshold.Loud, soundTH) counter = 0 basic.forever(function () { heartBeat() basic.pause(5000) })
Raspberry Pi OS上のPython3でのデータ受信スクリプト
#! /usr/bin/python3 import argparse import bluepy import sys import time versionSTR = "soundMonitor v0.1" class microbitBLE: def __init__(self, adr, vb, db): self.devadr = adr self.pobj = None self.uartOBJ = None self.verbose = vb self.debug = db self.callback = None def conPobj(self): try: self.pobj = bluepy.btle.Peripheral() self.pobj.connect(self.devadr, bluepy.btle.ADDR_TYPE_RANDOM) except: print("ERROR: connect.") return False return True def listCharacteristics(self): chrLis = self.pobj.getCharacteristics() for item in chrLis: print(" UUID: {0} Handle: 0x{1:04x} Prop: {2}".format(item.uuid, item.getHandle(), item.propertiesToString())) def getDeviceInfo(self): work = dict() work['DeviceName'] = str(self.pobj.readCharacteristic(0x0003), 'utf-8') modelNumCH = self.pobj.getCharacteristics(uuid="00002a24-0000-1000-8000-00805f9b34fb")[0] serialNumCH = self.pobj.getCharacteristics(uuid="00002a25-0000-1000-8000-00805f9b34fb")[0] firmRevCH = self.pobj.getCharacteristics(uuid="00002a26-0000-1000-8000-00805f9b34fb")[0] work['ModelNumber'] = str(self.pobj.readCharacteristic(modelNumCH.getHandle()), 'utf-8') work['SerialNumber'] = str(self.pobj.readCharacteristic(serialNumCH.getHandle()), 'utf-8') work['FirmwareRevision'] = str(self.pobj.readCharacteristic(firmRevCH.getHandle()), 'utf-8') return work def getServiceInfo(self): return self.pobj.getServices() def getDescInfo(self): return self.pobj.getDescriptors() def setupUart(self): chTX = self.pobj.getCharacteristics(uuid="6E400002-B5A3-F393-E0A9-E50E24DCCA9E")[0] ch_cccd=chTX.getDescriptors(forUUID=0x2902)[0] ch_cccd.write(b"\x03\x00", False) chRX = self.pobj.getCharacteristics(uuid="6E400003-B5A3-F393-E0A9-E50E24DCCA9E")[0] self.uartOBJ = bleDelegate(chTX, chRX, self.verbose, self.debug, self.callback) self.pobj.withDelegate( self.uartOBJ ) def sendStrUart(self, arg): if self.uartOBJ is None: self.setupUart() barg = bytes(arg + '\n', 'utf-8') if len(barg) > 20: return None self.pobj.writeCharacteristic(0x002a, barg, False) return barg def uartInfoTX(self): uartTX = self.pobj.getCharacteristics(uuid="6E400002-B5A3-F393-E0A9-E50E24DCCA9E")[0] print("TX Handle: 0x{0:04x} Prop: {1}".format(uartTX.getHandle(), uartTX.propertiesToString())) descTX = uartTX.getDescriptors() for item in descTX: print(str(item)) def uartInfoRX(self): uartRX = self.pobj.getCharacteristics(uuid="6E400003-B5A3-F393-E0A9-E50E24DCCA9E")[0] print("RX Handle: 0x{0:04x} Prop: {1}".format(uartRX.getHandle(), uartRX.propertiesToString())) descRX = uartRX.getDescriptors() for item in descRX: print(str(item)) def wfn(self, tout): return self.pobj.waitForNotifications(tout) def close(self): self.pobj.disconnect() class bleDelegate(bluepy.btle.DefaultDelegate): def __init__(self, TXD, RXD, vb, db, cb): bluepy.btle.DefaultDelegate.__init__(self) self.TX = TXD self.RX = RXD self.verbose = vb self.debug = db self.callback = cb def handleNotification(self, hdl, dat): if self.verbose: print("hdl={0}, dat={1}".format(hdl, dat)) if self.callback is None: print("{0}".format(dat)) else: self.callback(dat) class soundMonitor(): def __init__(self, bobj, itvl): self.bleObj = bobj self.waitInterval = itvl self.ERROR = 0 self.runFlag = False self.currentTime = None def processData(self, arg): datStr = arg.strip().decode() self.runFlag = True self.currentTime = time.time() wtime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(self.currentTime)) print("{0}={1}".format(wtime,datStr)) def waitLoop(self): if self.bleObj.uartOBJ is None: self.bleObj.callback = self.processData self.bleObj.setupUart() else: self.ERROR = 1 return False try: while True: if not self.bleObj.wfn(self.waitInterval): self.ERROR = 2 return False except KeyboardInterrupt: return True def tryIntParse(argstr): try: return int(argstr) except ValueError: return None def main(): parser = argparse.ArgumentParser(description='microbitBLEutil.') parser.add_argument('--ADR', nargs=1, help='BLE address.') parser.add_argument('--N', nargs=1, help='0) Gv1.5 1) Gv2 2) Rv1.5') parser.add_argument('--SEND', nargs=1, help='Send string via UART service.') parser.add_argument('-uartinfo', dest='uartinfo', help='uart Info.', action='store_true', default=False) parser.add_argument('-info', dest='info', help='device Info.', action='store_true', default=False) parser.add_argument('-service', dest='service', help='available service list.', action='store_true', default=False) parser.add_argument('-desc', dest='descriptor', help='available descriptor list.', action='store_true', default=False) parser.add_argument('-uuid', dest='uuid', help='list all UUIDs.', action='store_true', default=False) parser.add_argument('-sm', dest='smonitor', help='Start Sound Monitor.', action='store_true', default=False) parser.add_argument('-d', dest='debug', help='print debug information.', action='store_true', default=False) parser.add_argument('-v', dest='verbose', help='Verbose mode.', action='store_true', default=False) parser.add_argument('-V', dest='VERSION', help='Show Version, then exit', action='store_true', default=False) args = parser.parse_args() print(versionSTR) if args.VERSION: sys.exit(0) devLis = ["F5:D1:5F:02:68:25", "F7:FB:7A:03:96:AA", "F8:5C:FE:9B:B0:86"] devadr = devLis[0] if args.ADR is not None: devadr = args.ADR[0] if args.N is not None: idx = tryIntParse(args.N[0]) if (idx is not None) and (idx < len(devLis)): devadr = devLis[idx] mb = microbitBLE(devadr, args.verbose, args.debug) if mb.conPobj(): print("CONNECTED: ", devadr) else: sys.exit(1) if args.uuid: mb.listCharacteristics() if args.info: for key, value in mb.getDeviceInfo().items(): print(key, "=", value) if args.service: for srv in mb.getServiceInfo(): print(str(srv)) if args.descriptor: for desc in mb.getDescInfo(): print(str(desc)) if args.uartinfo: mb.uartInfoTX() mb.uartInfoRX() if args.SEND is not None: print("SEND:", mb.sendStrUart(args.SEND[0])) print("Wait for Notification") if not mb.wfn(10): print("NO NOTIFICATION within 10 seconds.") if args.smonitor: sm = soundMonitor(mb, 180) if not sm.waitLoop(): print("ERROR: sound monitor. ECODE={0}".format(sm.ERROR)) else: print("\nExit by User Interrupt.") mb.close() print("DISCONNECTED.") if __name__ == "__main__": main()