
今回からようやく外付けのデバイスをM5ATOMLiteに接続して、ラズパイで走っているNode-REDとの間を繋ぐところに入ってまいります。その1は、ありがちな「温度、湿度」のご報告です。ここまでの準備で、回路も簡単、ソフトも簡単、ほとんど何もやることがないです。本当か?
※「MicroPython的午睡」投稿順 Indexはこちら
さて、今回温湿度の測定に使うのは「Arduino業界のド定番」DHT11センサであります。そして、ありがたいことに ESP32用の “generic” ポートのMicroPythonでもDHT11センサ用のライブラリ・モジュールは最初から含まれております。日本語版のクイックリファレンスはこちら。
完全ソフト処理なので、接続に使う端子(DHT11はたった1本で接続できます)を自由に選べます。またDHT11は、Arduino Unoであると5V電源で使いますが、3.3V電源からでも動作するようなので、3.3V電源を与えて使用します。
現物は下の写真です。左端の青色のデバイスがDHT11です。電源、GNDとDATA線の3本を接続します。DATA線は5kΩでプルアップせよ、と書いてあったのですが、手持ちの4.7kΩで代用してます。このくらい大丈夫だろ~。右がM5ATOMLiteをお手製の治具に載せたものです。M5ATOMLiteの裏面端子を接続しやすくしただけのもの。
自分でも端子がどうなっているのか忘れていたので、上記の全回路図を一応下記に掲げます。上の方にPCA9306が居ますが、今回は5V系のデバイスを接続しているわけでないので使ってないです(上記写真の治具の右端に載っている8ピンのDIP化ボードがAE-PCA9306です。)

MicroPythonのコード(追加部分)
前回の「リファクタリング」もどきで、新規追加の機能はすべからく別ファイルの class として追い出すことになったので、DHT11からの温度、湿度読み出し機能について以下のファイルを追加しました。DHT11とインタフェースするための「中のメンドイ」ことはMicroPythonのライブラリモジュールが皆やってくれるので、使用は簡単明快です。わざわざ屋上屋を重ねるようなクラスは要らない感じなのですが、他との釣り合いにより、ラッパ的なものを重ねましたです。ま、ハードウエア端子を隠蔽する役割ですかね。なお、単独ファイルで実行すればDHT11の読み取りテストができます。
import dht
import machine, time
class LiteDHT11:
"""DHT11 on G22 pin
"""
def __init__(self):
self.dht11 = dht.DHT11(machine.Pin(22))
self.TEMP = None
self.HUMI = None
def measure(self):
self.dht11.measure()
self.TEMP = self.dht11.temperature()
self.HUMI = self.dht11.humidity()
return (self.TEMP, self.HUMI)
def main():
dht11 = LiteDHT11()
loopCounter = 0
while( loopCounter < 60 ):
loopCounter += 1
tpl = dht11.measure()
print("TEMPERATURE: ", tpl[0])
print("HUMIDITY : ", tpl[1])
time.sleep(60)
if __name__ == "__main__":
main()
上記のファイル追加により、メインのスクリプトは以下のようになりました。すでに作ってあったJSONオブジェクトをMQTTで送り出す仕組みの中にDHT11の温度と湿度を入れ込んだので、変更箇所はごくわずかです。一番の変更は、JSONオブジクトを送信する非同期タスクの周期を60秒としたことですかね。気温や湿度などはそれほど急激に変わらないだろ~ということで。
import sys
import ujson
import uasyncio
from LiteOnBoard import LiteOnBoard
from LiteNetwork import LiteNetwork
from LiteDHT11 import LiteDHT11
ledColor = (0, 0, 0)
OnBoardDevice = LiteOnBoard()
LiteNet = LiteNetwork()
DHT11 = LiteDHT11()
def colorChange(msg):
global ledColor
if msg=="R":
ledColor = (255, 0, 0)
elif msg=="G":
ledColor = (0, 255, 0)
elif msg=="B":
ledColor = (0, 0, 255)
def callbackSub(topic, msg):
topicS = topic.decode('utf-8')
msgS = msg.decode('utf-8')
print(topicS, msgS)
if topicS=="ATOMLite/Color":
print("ATOMLite/Color Message: ", msgS)
colorChange(msgS)
if topicS=="ATOMLite/Settings":
msgJ = ujson.loads(msgS)
print("ATOMLite/Settings: ")
for item in msgJ:
for k,v in item.items():
if k=="key":
ky = v
if k=="value":
vl = v
print(ky, " = ", vl)
def makeOBJ(num):
tpl = DHT11.measure()
workDic = dict()
workDic['A'] = num
workDic['B'] = LiteNet.currentTIME()
workDic['TEMP'] = tpl[0]
workDic['HUMI'] = tpl[1]
return ujson.dumps(workDic)
async def reportStatus(period_s):
while True:
LiteNet.client.publish("ATOMLite/Status", "ATOMLite Running: " + LiteNet.currentTIME())
await uasyncio.sleep(period_s)
async def sendJson(period_s):
sendCounter = 0
while True:
jsonSTR = makeOBJ(sendCounter)
LiteNet.client.publish("ATOMLite/Json", jsonSTR)
sendCounter += 1
await uasyncio.sleep(period_s)
async def ledAndUserButton(period_s):
global OnBoardDevice
ledFlag = True
buttonFlag = False
while True:
if ledFlag:
OnBoardDevice.M5ATOMLiteLED(ledColor)
ledFlag = False
else:
OnBoardDevice.M5ATOMLiteLED( (0, 0, 0) )
ledFlag = True
if OnBoardDevice.M5ATOMLiteUserButton():
print("Button ON")
LiteNet.client.publish("ATOMLite/Button", "Button ON")
buttonFlag = True
elif buttonFlag:
buttonFlag = False
LiteNet.client.publish("ATOMLite/Button", "Button OFF")
await uasyncio.sleep(period_s)
async def mainLoop(period_ms):
global OnBoardDevice
taskStatus = uasyncio.create_task(reportStatus(60))
taskJson = uasyncio.create_task(sendJson(60))
taskATOMhard = uasyncio.create_task(ledAndUserButton(1))
while( OnBoardDevice.buttonCounter < 6 ):
LiteNet.client.check_msg()
await uasyncio.sleep_ms(period_ms)
try:
taskStatus.cancel()
except:
print("Exception at taskStatus:", sys.exc_info()[0])
try:
taskJson.cancel()
except:
print("Exception at taskJson:", sys.exc_info()[0])
try:
taskATOMhard.cancel()
except:
print("Exception at taskATOMhard:", sys.exc_info()[0])
LiteNet.client.publish("ATOMLite/Button", "N/A")
LiteNet.client.publish("ATOMLite/Status", "ATOMLite: out of service.")
LiteNet.close()
print("disconnect.")
def main():
if not LiteNet.do_connect():
print("ERROR: WiFi connection.")
sys.exit(1)
LiteNet.initRTC()
if not LiteNet.connectMqttBroker(callbackSub):
print("ERROR: MQTT Broker connection.")
sys.exit(1)
uasyncio.run(mainLoop(1000))
if __name__ == "__main__":
main()
Node-RED側の修正
Raspberry Pi 3 で動作しているMQTTブローカについては何も修正の必要はありません。Node-REDのフローについては、MQTTのトピック “ATOMLite/Json” を扱うフローにグラフ表示用のダッシュボードを2つ追加しています。温度グラフと湿度グラフ用です。
大分以前にここのフローを書いたときに、JSONオブジェクトの分解仕分けにfunctionノードを使ってしまいました。他のノードで仕分けた方がカッコ良いようにも思うのですが、今回はMicroPythonスクリプトメインなので、そのままの方法としました。
上記のフローをデプロイした後のNode-REDダッシュボードの様子をアイキャッチ画像に掲げました。グラフが2つ加わって、いかにもIoTっぽくなってきた?感じ。数時間後の温度湿度のグラフは以下です。
今回1ピンだけでインタフェースがとれるデバイスだったから、次回は2ピンか?

