前回ラズパイのPython3で、micro:bitが送信するBluetooth LEのアドバタイズパケット、Eddystoneを受信することができました。今回は離れた場所においたラズパイ3とラズパイ4で同じパケットを受信、それぞれの受信信号強度をNodeREDダッシュボード上で比較してみました。ロケーション推定もどき。
※「ブロックを積みながら」投稿順 index はこちら
GW前に懸案だったRaspberry Pi 4 を入手しました。既設のRaspberry Pi 3 model B+とあわせ、Bluetoothを受信可能なラズパイ2台を「基地局」に見立て、Eddystone パケットを垂れ流す micro:bit を「ロケーション・タグ」に見立てて、RSSI(受信信号強度)を比較してみたいと思います。
ラズパイ3上では、Mosquitto MQTTブローカ および Node-Red サーバも動作しています。Node-RED接続については以前の投稿で何度もやっていますが、今回のケースではPythonスクリプトで得た生のRSSI値をMQTTブローカに向けて一方通行でPublishすればよしとしました。ラズパイ3上のスクリプトでは自分のRSSIを自分の上で走っているMQTTブローカに送信、ラズパイ4のスクリプトではRSSIをラズパイ3のMQTTブローカに送信です。ラズパイは当然LANに接続されているので2台の間の通信は問題ありません。後の処理はNode-Red側で処理すればよい、という考えであります。今回は、とりあえず、生のRSSI値をNode-REDのダッシュボード上で2つ並べて表示し比較できれば良いとしました。
Node-REDのフローは以下のとおりです。ラズパイ3はMQTTのトピック “Eddystone/Pi3″に向けてRSSI値を文字列で送りこんでくる想定。同様にラズパイ4はトピック”Eddystone/Pi4″に向けてPublishです。両方とも受信データをそのままテキストで表示するとともに、数値に変換し、折れ線グラフにも接続するようにしてあります。
Node-REDのダッシュボードには、Eddystoneという名のタブを新設。上記のダッシュボードのui要素はそこに並べてあります。
ラズパイ3、4ともPython3で動作するスクリプト(末尾に全文を掲げました。装置によってアドレスを変更する必要があります。)を動作させてます。なお、動作には、paho(MQTTクライアント)とbluepyが必要です。それらが未インストールの場合、以下のような感じでインストールしておく必要があります。(bluepyについてはPython以外のライブラリも必要なようなのでGithubのbluepyのインストール手順を確認したほうが良いです。)
$ sudo pip3 paho-mqtt $ sudo pip3 bluepy
末尾のスクリプトはテスト用なので約1分ほどスキャンした後は実行を終えます。
micro:bit については、今回もまた新たなブロックは積んでおらず、以前に作ったEddystone送信プログラムの使いまわしです。
ラズパイ3とラズパイ4は約4mほどの間隔をあけて設置してあります。ラズパイ3から50cmくらい、ラズパイ4からは4mチョイくらいの位置の作業台に micro:bitを置いてみたときのNode-REDダッシュボードの様子を以下に掲げます。ラズパイ4用のスクリプトを後から編集して走らせたので時刻範囲は大分異なります。またmicro:bitの電源を何度かOFFっているので、データが飛んでいるところもありです。
たまたま最後のデータでは10dBくらいの差があり、micro:bitはラズパイ3により近い位置にいることと整合的です。平均値的にもPi3とPi4の間には差がありそうです。しかし、グラフを見るとRSSIの変動幅が結構大きいので、1点の単純な比較では間違うと思います。ラズパイ3と4の間の距離をもっと離すとメリハリついて良いような気もします。また、2台だと位置を絞り込めないので、もう一台追加して形の上で位置を確定できそうな感じにするのもありかと思いました。そういえば、ラズパイZeroもBluetooth使えるのだった。。。
ブロックをつみながら(15) micro:bitのBLEをラズパイPythonで捕捉 へ戻る
ブロックをつみながら(17) micro:bitとラズパイ、ペアリング+Wireshark へ進む
BLE ADVパケットをスキャンしてRSSIをMQTTブローカに送信するPythonサンプル
#! /usr/bin/python3 import paho.mqtt.publish as publish from bluepy.btle import Scanner, DefaultDelegate verbose = True class ScanDelegate(DefaultDelegate): def __init__(self): DefaultDelegate.__init__(self) def handleDiscovery(self, dev, isNewDev, isNewData): if verbose and isNewDev: print("Discoverd device", dev.addr) elif verbose and isNewData: print("Received new data from", dev.addr) def main(): scanner = Scanner().withDelegate(ScanDelegate()) for i in range(5): device = scanner.scan(10.0) for dev in device: if dev.addr == "YOUR BLE ADDRESS": print("Device {0} ({1}), RSSI={2} dB".format(dev.addr, dev.addrType, dev.rssi)) publish.single("Eddystone/Pi4", dev.rssi, hostname="YOUR MQTT BROKER ADDRESS") if verbose: print("Device {0} ({1}), RSSI={2} dB".format(dev.addr, dev.addrType, dev.rssi)) for (adtype, desc, value) in dev.getScanData(): print("{0} = {1}".format(desc, value)) if __name__ == '__main__': verbose = True main()