いよいよ受信処理&DBMS記録です。
ネット上で「PostgreSQL MQTT」で検索すると、
先達が多々いらっしゃいましたので、
やればできそう
と安心して臨めました。
■データの全体的流れ
複数PICO-WからPub(メッセージ発信)
→MQTTサーバーで受信
→PCでSub(メーセージ受取)
→受信1件ごとにPostgreSQLへ接続
→データINSERT
→1件ごとにPostgreSQL接続を閉鎖(同時コミット)
■機器配置
・Pub:PICO-W、固定IP接続、Micropython
・MQTTサーバー:前回同様N5105、Windows11Pro
・Sub:macOS、Jupyter@VScode、Python3.11
・PostgreSQLサーバー:i5-4000台、Windows11Pro、PostgreSQLサービス稼働
・PostgreSQLデータ確認:Subと同じmacOS、pgAdmin4
機器関連ポイント:
・PICO-W含めて関係する機器を固定IP化
・MQTTサーバー、PostgreSQLサーバともにサーバー動作に専念
サーバ上ではPythonなどのデータ処理をできるだけ実行させない。
できないわけではなく、機能分離にて要因切り分けを容易にしておく。
■結果
先に結論・・・できました!
出典多々引用し、今回の目的用に編集しました。
ということで、いきなりコード羅列。
■Subコード(注:アドレスなどは例です)
#!usr/bin/env python
# -*- coding: utf-8 -*-
import datetime
import paho.mqtt.client as mqtt # MQTTライブラリインポート
import psycopg2 # PostgreSQL接続ライブラリインポート
char_del =","
# 定数定義
address_kabaN5105 = '192.168.10.24'
host = address_kabaN5105 #MQTT-Server IPv4 address
port = 1883 # MQTT-port
topic_sub = 'test_mqtt/+'
ip_postgreSQL = "192.168.10.28"
location_db = "host=" + ip_postgreSQL +" dbname=kabadb user=postgres"
# ブローカーに接続できたときの処理
def on_connect(client, userdata, flag, rc):
print("Connected with result code " + str(rc)) # 接続できた旨表示
client.subscribe(topic_sub) # subするトピックを設定
# ブローカー接続が切れたときの処理
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")
# メッセージが届いたときの処理
def on_message(client, userdata, msg):
# msg.topicにトピック名が,msg.payloadに届いたデータ本体が入っている
now = datetime.datetime.now()
time_sub = str("{0:%Y/%m/%d %H:%M:%S}".format(now))
msg_sub = str(msg.payload)
topic_sub = str(msg.topic)
qos_sub = str(msg.qos)
print(time_sub +char_del+ msg_sub +char_del+ topic_sub +char_del+ qos_sub)
# データベース接続
connection_db = psycopg2.connect(location_db)
cursor = connection_db.cursor()
#データの取得
with connection_db:
with connection_db.cursor() as cursor:
#データの追加
sql_insert = "INSERT INTO table_sub (time_sub,topic_sub,msg_sub,qos_sub) VALUES (%s, %s, %s, %s)"
cursor.execute(sql_insert, (time_sub,topic_sub,msg_sub,qos_sub))
# データベース切断
cursor.close()
connection_db.close()
# MQTTの接続設定
client = mqtt.Client() # クラスのインスタンス(実体)の作成
client.on_connect = on_connect # 接続時のコールバック関数を登録
client.on_disconnect = on_disconnect # 切断時のコールバックを登録
client.on_message = on_message # メッセージ到着時のコールバック
client.connect(host, port, 3600) # 接続先はMQTT-Server
client.loop_forever() # 永久ループして待ち続ける
■Pubコード(注:アドレスなどは例です)
import time
import network
from umqtt.simple import MQTTClient
import machine
import sys
# Wi-Fi接続function
def connect():
# 自分の固定IPアドレス設定
IPaddress_mine = '192.168.10.81'
# Wifi起動
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# Wifi接続
wlan.connect(ssid, password)
while wlan.isconnected() == False:
print('Waiting for connection...')
time.sleep(1)
wlan_status = wlan.ifconfig()
wlan.ifconfig((IPaddress_mine, wlan_status[1], wlan_status[2], wlan_status[3]))
ip = wlan.ifconfig()[0]
print(f'Connected on {ip}')
return ip
# Subメッセージ表示function
def printMessage(topic, message):
# 受信データをytes型からUTF-8文字列へ変換、標準出力表示
print("topic:" + topic.decode("utf-8") )
print("message:" + message.decode("utf-8") )
# メインルーチンーーーーーーーーーーーーーーーーーーーーーー
# LED制御値設定
on_led = 1
off_led = 0
# 動作条件設定
time_delay = 2.5
time_on_led = 0.5
time_sleep = time_delay - time_on_led
# Wi-Fi接続SSID、パスワード設定
ssid = '*****'
password = '*****'
# MQTTブローカーサーバーIP設定
address_kabaN5105 = '192.168.10.24'
mqttBroker = address_kabaN5105
# ConnectID(ユニーク)設定
myId = 'rpp01'
# トピック(構造化)設定
topic= b"test_mqtt/rpp01"
# メッセージヘッダー設定
msg_header = "rpp01 send "
# 接続維持時間設定
time_keepalive_MQTTClient = 3600
# LEDオブジェクト定義
led_rpp = machine.Pin("LED",machine.Pin.OUT)
# 初動LED確認
led_rpp.value(on_led)
time.sleep(time_on_led)
led_rpp.value(off_led)
# 初期設定
# MQTTオブジェクト定義
client = MQTTClient(myId, mqttBroker, keepalive=time_keepalive_MQTTClient)
# 受信(Subscribe)時に呼ぶ関数の設定
client.set_callback(printMessage)
# Wi-Fi接続
connect()
# ブローカーに接続
try:
# ブローカー接続処理
client.connect()
# Subトピック登録
client.subscribe(topic)
except:
# ブローカー接続失敗時、プログラム終了
print("Could not connect to mqtt server.")
sys.exit()
print("mqqtt connect done ")
# Pub回数リセット
i_count = 0
while True:
i_count = i_count +1
# Pubメッセージ生成
msg = msg_header+str(i_count)
# Pub実行
client.publish(topic, msg)
# LED点滅
led_rpp.value(on_led)
time.sleep(time_on_led)
led_rpp.value(off_led)
# ブローカーPubメッセージ確認チェック
client.check_msg()
time.sleep(time_sleep)
■PostgreSQLのテーブル
idはいわゆる自動増番です。
テストだけならSmallserialで十分です。
が、、、Bigserialにしておけば将来もあまり気を使う必要がないので、、、
■関連出典、メモなど
出典元さま各位、大変参考になりました。コードも結構そのまま使いました。
ありがとうございます。
出典:PosgtreSQL で利用できるデータ型の中で連番型の使い方
https://www.javadrive.jp/postgresql/type/index5.html#section1
→MySQLのAutoincrementはPostgreSQLではSerialが該当する・・・
とまではわかっていますが、じゃ実際どうやって使うの?が
この出典に丁寧に解説されていました。
出典:Psycopg2でPostgreSQLを操作するときのトランザクションについて
https://qiita.com/fauntleroy/items/70c1bfb450bcc61c0964
・INSERT実行後、必ずconn.commit()する。
・SELECTしてfetchした後、必ずcursor.close()する。
→こういった必須ルールの解説、めっちゃ助かりました。
ちゃんとクローズしないとロックかかったままでしたし。。。
出典:psycopg2公式
https://www.psycopg.org/docs/module.html
出典:MQTTのQoS (Quality of Service) とは
https://qiita.com/emqx_japan/items/7f818cb2071183ef7253
→今回の実行例ではQosは全部0になっていました。
要するに非同期ということだそうです。
出典:PythonでPostgreSQLへ接続
→えーのんあるやん!と見てみたら・・・自分自身の今年5月の記事でした。
いまごろになって動作確認完了です。まあ、一件落着です。
出典:PythonでPostgreSQLとやりとりする
https://zenn.dev/collabostyle/articles/36e822520182d3
→とくにSQL文の記述の仕方が参考になりました。
■結果
Subの標準出力スクショ抜粋
PostgreSQLのデータ抜粋(pgAdmin4利用)
両者、反転していて見づらいですが、最後のデータを見比べますと・・・
あたりまえですが、データが一致しました。
どうやらMQTTに関わる技術的な準備は終わりになったようです。
PICO-Wの絶縁入力やロケーション設定、表示などGPIO絡みの課題はまだいっぱいありますが、それらはこれまでもI2CやRTCなどの使い方で積み上げたものを再利用すれば良く、大きな壁ではなさそうです。
最後に、そもそもの狙いのシチュエーションをおさらいしますと・・・
・1区画に設備10台程度、いずれの設備でも稼働サイクル30秒程度、
・それぞれPub機器(PICO-W)設置、
・区画ごとにMQTTサーバとSub機器(ともにPC)設置、
・PostgreSQLサーバは複数区画に1台でもいけるかも。
業種は違えどこんな製造工場はいっぱいあると思います。
いや、もっと速いサイクルで動作する設備だと困るやん?
まあそういうのはそもそもがIoTで稼働を監視・記録するものではなくて、
PLCなどからリアルタイムでSCADAへ上げなくちゃ、と思います。
保安(安全動作)機器などもそうですね。
以上、ラズパイPICO-W&Micropython&MQTT化、3回にて終わり。