介護の隙間から(45) BLEデバイスで夜間頻尿モニタリングの試み

Joseph Halfmoon

1年半ぶりくらいで「介護の隙間から」再始動であります。その初回は「セルフ介護」ネタ。年齢もあり各種治療も受けてはおるのであります。その症状の一端?として夜間頻尿あり!(断言してどうなる)ということで、その様子を手元のIoTデバイスにて自動モニタリングせん、ということであります。極めて個人的な。

夜間頻尿の説明については、日本泌尿器科学会様の以下のページの説明が分かり易いかと思います。

夜間、何度も排尿で起きる

そこにも記されておりますが原因となりそうな基礎疾患、私はデパート状態、糖尿、高血圧、そして腎臓には小さな石ころも鎮座(落ちてくると七転八倒)、そしてなんといってももっとも大きいのが

睡眠時無呼吸症候群

であります。CPAP使用しておりますので、症状は抑えられているとはいいつつ、時々装着失敗したまま眠りこむとトイレに行きたくなって起きます(起きないとお陀仏の可能性あり、頻尿原因に「とりあえず」救われているという認識もありますが。)

さて上記のホームページでは現状把握のために「排尿日誌」なるものの作成をお勧めしておられます。ううむ、でもメンドイです。尿量を測定するためには毎回計量が必要です(昔、検査で一日の尿量測ったことありますがメンドイ。)しかし、トイレに行ったことくらいは自動記録ができるでしょう。こんな感じ。

  1. トイレのドアの開け閉めの「音」をマイクで拾う
  2. 音量レベルの急激な増加「イベント」を検出したら無線でサーバに連絡
  3. サーバ側でイベント時刻を記録
  4. 記録データやその処理結果はスマホやPCのブラウザで見られるようにする

ハッキリ言ってチョロい(などと言うと得てしてうまく行かないノダ。)そして既に1,2,3は以下の別シリーズ投稿にて準備済(勿論、この目的に使うつもりでした。)

ブロックを積みながら(23) BBC micro:bit v2とラズパイでサウンドモニタ

具体的な構成は以下の通り

  • お子様に人気の低価格なBluetooth Low EnergyデバイスBBC micro:bit V2を末端の検出装置といたします。(末端価格?2000円チョイ)
  • micro:bit V2はマイクロホンを搭載、サウンドレベルの変化を検出する関数があります。また、定番のNordic社のBLE UARTサービスも起動可能。これを使えばBLE対応の装置に通信可能です。なお、古いV1.xにはマイクロホンが搭載されていないです。
  • BLEパケットの受信装置、兼サーバとしては、これまた定番の Raspberry Pi 3 model B+(以下ラズパイ3と略称)を使用いたします。(末端価格?5000円チョイ)
  • ラズパイ3上ではいくつかのソフトウエアを走らせておきます。第1はPython3上で実行されるBLEパケットを受信して前処理をしMQTTブローカに接続するためのスクリプトです。今回実験に使用した仮版全文を末尾に掲げました。第2はMQTTブローカです。定番のMosquitto使わせていただいとります。第3は nodeREDです。MQTTブローカの関係トピックをサブスクライブしてフロー処理してくれます。結局スクリプト以外は定番のソフトばかりなので、適当にインストールして走らせるのは造作もないと思います。
  • ラズパイ3の上でnodeREDが走っているので、WiFiあるいは有線LANで接続できるパソコン、スマホからnodeREDのダッシュボードへアクセスすればモニタできるです。

まあ、実験設備のハードウエア費用としては確実に1万円以下。そしてmicro:bitのファームウエアとPython3スクリプトは既に原型を作ってあるので、今回新設したのは以下のnodeREDのフローのみです。
nodeRed Frequent Urination Monitor Flow

micro:bit側の処理のおさらい

micro:bit側では、以下の処理を行っています。

  • 毎分1回、温度を測ってBLE UART経由で報告する(一種のハートビート信号のつもり。毎分1回の報告が途切れたら電池切れなどの問題発生ということ。)
  • 設定された音量を超えるサウンドイベントを検出したらBLE UART経由で報告
  • 設定音量を下回った時も報告。
  • 設定音量はキーで上下調整可能。調整した際もBLE UARTで報告

なお、micro:bitはトイレの内部の本棚(なぜかあります)の棚の上に設置してあります。トイレのドアの開閉に関して、確実にサウンドイベントが発生するように音量調整済です。

ラズパイ3はトイレからは壁を隔てて数m離れた場所に設置してあるのですが、特にBLE通信に支障は無いです。

Pythonスクリプトの処理

末尾に全文掲げたスクリプトの処理の骨子は以下のようです。

  • 特定のBBC micro:bit V2とコネクトする
  • BLE UART経由でのパケット到来を待つ。
  • 何かパケット到来すればMQTTブローカにトピックStatusはRunningと報告。
  • 今回設定では連続3分間パケットが到来しなかったらStatusにエラー発生を報告。
  • 温度のパケットであれば、トピックTemperatureに報告。
  • サウンドイベントであれば、トピックEVENTに報告。
nodeREDダッシュボード

MQTTブローカからのデータを処理するフロー図は上に示したので、結果を表示するためのダッシュボードを以下に示します。

MonitorDashBoard4Status欄は、Runningになっているので、継続的にBLEパケットが到来していることが分かります。Temperatureみるとトイレ内の温度に関係する温度(ボード上のセンサの)が分かります。西日の当たるトイレのため夕方にかなり高温になっていることが見てとれます。オンボードの温度センサの精度は「それなり」なのでもっと高精度な値が必要だったら外付けセンサを使った方が良いかもです。

肝心のイベントですが、EVENT CHARTのところの縦棒がトイレのドアを開け閉めした時刻になっています。残念なこと?に

昨晩は夜中1回しかトイレに起きませんでした。

ううむ、モニタしているだけで抑制効果があったのか?ともかく、トイレの回数は記録可能そうです。

今回の「実証実験」後の改良について

やはり実証実験すると改良点が見えてきます。今回は以下のとおり。

  1. micro:bitのLEDをやたら光らせているが、電池の持ちを考えるともっと抑制した方が良いのでないか
  2. 該当トイレに他に利用者が居ないことを確認しているが、中の本棚のための開閉回(主に昼間)も検出されてしまう。識別方法が必要。
  3. イベントチャートが鋸波的で醜い。
  4. 今回は初回の実証実験ということで生データだけにしたが、毎日の回数、インターバル時間などの集計値も表示したい
  5. 折角のデータはデータベースにも記録しておきたい

次回は(まさか1年半後ではないだろうな)、上記を改良し、再度、実証実験を行いたいと思います。

介護の隙間から(44) 血圧測れるスマートウオッチ買ってみた へ戻る

介護の隙間から(46) 山善 YDR-200AT 防犯通話録音機 へ進む

BBC micro:bit V2からのBLE通信をRaspberry Pi 上のMQTTブローカに接続するためのPython3スクリプト
#! /usr/bin/python3
import argparse
import bluepy
import sys
import time
import paho.mqtt.publish as publish

versionSTR = "soundMonitor v0.2"

class microbitBLE:

    def __init__(self, adr, vb, db):
        self.devadr = adr
        self.pobj = None
        self.uartOBJ = None
        self.verbose = vb
        self.debug = db
        self.callback = None

    def conPobj(self):
        try:
            self.pobj = bluepy.btle.Peripheral()
            self.pobj.connect(self.devadr, bluepy.btle.ADDR_TYPE_RANDOM)
        except:
            print("ERROR: connect.")
            return False
        return True

    def listCharacteristics(self):
        chrLis = self.pobj.getCharacteristics()
        for item in chrLis:
            print(" UUID: {0} Handle: 0x{1:04x} Prop: {2}".format(item.uuid, item.getHandle(), item.propertiesToString()))

    def getDeviceInfo(self):
        work = dict()
        work['DeviceName'] = str(self.pobj.readCharacteristic(0x0003), 'utf-8')
        modelNumCH = self.pobj.getCharacteristics(uuid="00002a24-0000-1000-8000-00805f9b34fb")[0]
        serialNumCH = self.pobj.getCharacteristics(uuid="00002a25-0000-1000-8000-00805f9b34fb")[0]
        firmRevCH = self.pobj.getCharacteristics(uuid="00002a26-0000-1000-8000-00805f9b34fb")[0]
        work['ModelNumber'] = str(self.pobj.readCharacteristic(modelNumCH.getHandle()), 'utf-8')
        work['SerialNumber'] = str(self.pobj.readCharacteristic(serialNumCH.getHandle()), 'utf-8')
        work['FirmwareRevision'] = str(self.pobj.readCharacteristic(firmRevCH.getHandle()), 'utf-8')
        return work

    def getServiceInfo(self):
        return self.pobj.getServices()

    def getDescInfo(self):
        return self.pobj.getDescriptors()

    def setupUart(self):
        chTX = self.pobj.getCharacteristics(uuid="6E400002-B5A3-F393-E0A9-E50E24DCCA9E")[0]
        ch_cccd=chTX.getDescriptors(forUUID=0x2902)[0]
        ch_cccd.write(b"\x03\x00", False)
        chRX = self.pobj.getCharacteristics(uuid="6E400003-B5A3-F393-E0A9-E50E24DCCA9E")[0]
        self.uartOBJ = bleDelegate(chTX, chRX, self.verbose, self.debug, self.callback)
        self.pobj.withDelegate( self.uartOBJ )

    def sendStrUart(self, arg):
        if self.uartOBJ is None:
            self.setupUart()
        barg = bytes(arg + '\n', 'utf-8')
        if len(barg) > 20:
            return None
        self.pobj.writeCharacteristic(0x002a, barg, False)
        return barg

    def uartInfoTX(self):
        uartTX = self.pobj.getCharacteristics(uuid="6E400002-B5A3-F393-E0A9-E50E24DCCA9E")[0]
        print("TX Handle: 0x{0:04x} Prop: {1}".format(uartTX.getHandle(), uartTX.propertiesToString()))
        descTX = uartTX.getDescriptors()
        for item in descTX:
            print(str(item))

    def uartInfoRX(self):
        uartRX = self.pobj.getCharacteristics(uuid="6E400003-B5A3-F393-E0A9-E50E24DCCA9E")[0]
        print("RX Handle: 0x{0:04x} Prop: {1}".format(uartRX.getHandle(), uartRX.propertiesToString()))
        descRX = uartRX.getDescriptors()
        for item in descRX:
            print(str(item))

    def wfn(self, tout):
        return self.pobj.waitForNotifications(tout)

    def close(self):
        self.pobj.disconnect()

class bleDelegate(bluepy.btle.DefaultDelegate):
    def __init__(self, TXD, RXD, vb, db, cb):
        bluepy.btle.DefaultDelegate.__init__(self)
        self.TX = TXD
        self.RX = RXD
        self.verbose = vb
        self.debug = db
        self.callback = cb

    def handleNotification(self, hdl, dat):
        if self.verbose:
            print("hdl={0}, dat={1}".format(hdl, dat))
        if self.callback is None:
            print("{0}".format(dat))
        else:
            self.callback(dat)

class soundMonitor():

        def __init__(self, bobj, itvl):
            self.bleObj = bobj
            self.waitInterval = itvl
            self.ERROR = 0
            self.runFlag = False
            self.currentTime = None
            self.eventCount = 0

        def processData(self, arg):
            datStr = arg.strip().decode()
            datLis = datStr.split(":")
            self.runFlag = True
            self.currentTime = time.time()
            wtime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(self.currentTime))
            print("{0}={1}".format(wtime,datStr))
            publish.single("MBV2/Status", "Running", hostname="127.0.0.1")
            if (len(datLis) > 1) and (datLis[0]=="TEMP"):
                publish.single("MBV2/Temperature", datLis[1], hostname="127.0.0.1")
            if datLis[0].startswith("L"):
                self.eventCount += 1
                publish.single("MBV2/EVENT", "1", hostname="127.0.0.1")
            if datLis[0].startswith("Q"):
                publish.single("MBV2/EVENT", "0", hostname="127.0.0.1")

        def waitLoop(self):
            if self.bleObj.uartOBJ is None:
                self.bleObj.callback = self.processData
                self.bleObj.setupUart()
            else:
                self.ERROR = 1
                return False
            try:
                while True:
                    if not self.bleObj.wfn(self.waitInterval):
                        self.ERROR = 2
                        return False
            except KeyboardInterrupt:
                return True

def tryIntParse(argstr):
    try:
        return int(argstr)
    except ValueError:
        return None

def main():
    parser = argparse.ArgumentParser(description='microbitBLEutil.')
    parser.add_argument('--ADR', nargs=1, help='BLE address.')
    parser.add_argument('--N', nargs=1, help='0) Gv1.5 1) Gv2 2) Rv1.5')
    parser.add_argument('--SEND', nargs=1, help='Send string via UART service.')
    parser.add_argument('-uartinfo', dest='uartinfo', help='uart Info.', action='store_true', default=False)
    parser.add_argument('-info', dest='info', help='device Info.', action='store_true', default=False)
    parser.add_argument('-service', dest='service', help='available service list.', action='store_true', default=False)
    parser.add_argument('-desc', dest='descriptor', help='available descriptor list.', action='store_true', default=False)
    parser.add_argument('-uuid', dest='uuid', help='list all UUIDs.', action='store_true', default=False)
    parser.add_argument('-sm', dest='smonitor', help='Start Sound Monitor.', action='store_true', default=False)
    parser.add_argument('-d', dest='debug', help='print debug information.', action='store_true', default=False)
    parser.add_argument('-v', dest='verbose', help='Verbose mode.', action='store_true', default=False)
    parser.add_argument('-V', dest='VERSION', help='Show Version, then exit', action='store_true', default=False)
    args = parser.parse_args()

    print(versionSTR)
    if args.VERSION:
        sys.exit(0)

    devLis = ["F5:D1:5F:02:68:25", "F7:FB:7A:03:96:AA", "F8:5C:FE:9B:B0:86"]
    devadr =  devLis[0]
    if args.ADR is not None:
        devadr = args.ADR[0]

    if args.N is not None:
        idx = tryIntParse(args.N[0])
        if (idx is not None) and (idx < len(devLis)):
            devadr = devLis[idx]

    mb = microbitBLE(devadr, args.verbose, args.debug)

    if mb.conPobj():
        print("CONNECTED: ", devadr)
    else:
        sys.exit(1)

    if args.uuid:
        mb.listCharacteristics()

    if args.info:
        for key, value in mb.getDeviceInfo().items():
            print(key, "=", value)

    if args.service:
        for srv in mb.getServiceInfo():
            print(str(srv))

    if args.descriptor:
        for desc in mb.getDescInfo():
            print(str(desc))

    if args.uartinfo:
        mb.uartInfoTX()
        mb.uartInfoRX()

    if args.SEND is not None:
        print("SEND:", mb.sendStrUart(args.SEND[0]))
        print("Wait for Notification")
        if not mb.wfn(10):
            print("NO NOTIFICATION within 10 seconds.")

    if args.smonitor:
        sm = soundMonitor(mb, 180)
        if not sm.waitLoop():
            msg = "ECODE={0}".format(sm.ERROR)
            print("ERROR: ", msg)
            publish.single("MBV2/Status", msg, hostname="127.0.0.1")
        else:
            print("\nExit by User Interrupt.")
            publish.single("MBV2/Status", "Terminated by User", hostname="127.0.0.1")

    mb.close()
    print("DISCONNECTED.")

if __name__ == "__main__":
    main()