いよいよ受信処理&DBMS記録です。

ネット上で「PostgreSQL MQTT」で検索すると、
先達が多々いらっしゃいましたので、
 やればできそう

と安心して臨めました。

 

■データの全体的流れ

複数PICO-WからPub(メッセージ発信)

→MQTTサーバーで受信
 →PCでSub(メーセージ受取)

  →受信1件ごとにPostgreSQLへ接続
   →データINSERT
    →1件ごとにPostgreSQL接続を閉鎖(同時コミット)

■機器配置

 ・Pub:PICO-W、固定IP接続、Micropython

 ・MQTTサーバー:前回同様N5105、Windows11Pro

 ・Sub:macOS、Jupyter@VScode、Python3.11

 ・PostgreSQLサーバー:i5-4000台、Windows11Pro、PostgreSQLサービス稼働

 ・PostgreSQLデータ確認:Subと同じmacOS、pgAdmin4

 機器関連ポイント:

 ・PICO-W含めて関係する機器を固定IP化

 ・MQTTサーバー、PostgreSQLサーバともにサーバー動作に専念
  サーバ上ではPythonなどのデータ処理をできるだけ実行させない。

  できないわけではなく、機能分離にて要因切り分けを容易にしておく。

 

■結果

 

先に結論・・・できました!

出典多々引用し、今回の目的用に編集しました。

ということで、いきなりコード羅列。

 

■Subコード(注:アドレスなどは例です)

 

#!usr/bin/env python
# -*- coding: utf-8 -*- 

import datetime
import paho.mqtt.client as mqtt     # MQTTライブラリインポート
import psycopg2     # PostgreSQL接続ライブラリインポート

char_del =","
# 定数定義
address_kabaN5105 = '192.168.10.24'
host = address_kabaN5105 #MQTT-Server IPv4 address
port = 1883   # MQTT-port
topic_sub = 'test_mqtt/+'
ip_postgreSQL = "192.168.10.28"
location_db = "host=" + ip_postgreSQL +" dbname=kabadb user=postgres"

# ブローカーに接続できたときの処理
def on_connect(client, userdata, flag, rc):
  print("Connected with result code " + str(rc))  # 接続できた旨表示
  client.subscribe(topic_sub)  # subするトピックを設定 

# ブローカー接続が切れたときの処理
def on_disconnect(client, userdata, rc):
  if  rc != 0:
    print("Unexpected disconnection.")

# メッセージが届いたときの処理
def on_message(client, userdata, msg):
  # msg.topicにトピック名が,msg.payloadに届いたデータ本体が入っている
  now = datetime.datetime.now()                   
  time_sub = str("{0:%Y/%m/%d %H:%M:%S}".format(now))
  msg_sub = str(msg.payload)
  topic_sub = str(msg.topic)
  qos_sub = str(msg.qos)
  print(time_sub +char_del+ msg_sub +char_del+ topic_sub +char_del+ qos_sub)
  # データベース接続
  connection_db = psycopg2.connect(location_db)
  cursor = connection_db.cursor()
  #データの取得
  with connection_db:
    with connection_db.cursor() as cursor:
        #データの追加
        sql_insert = "INSERT INTO table_sub (time_sub,topic_sub,msg_sub,qos_sub) VALUES (%s, %s, %s, %s)"
        cursor.execute(sql_insert, (time_sub,topic_sub,msg_sub,qos_sub))
        # データベース切断
        cursor.close()
  connection_db.close()

# MQTTの接続設定
client = mqtt.Client()                 # クラスのインスタンス(実体)の作成
client.on_connect = on_connect         # 接続時のコールバック関数を登録
client.on_disconnect = on_disconnect   # 切断時のコールバックを登録
client.on_message = on_message         # メッセージ到着時のコールバック
client.connect(host, port, 3600)  # 接続先はMQTT-Server
client.loop_forever()                  # 永久ループして待ち続ける

 

■Pubコード(注:アドレスなどは例です)

 

import time
import network
from umqtt.simple import MQTTClient
import machine
import sys

#  Wi-Fi接続function
def connect():
    # 自分の固定IPアドレス設定
    IPaddress_mine = '192.168.10.81'
    # Wifi起動
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    # Wifi接続       
    wlan.connect(ssid, password)
    while wlan.isconnected() == False:
        print('Waiting for connection...')
        time.sleep(1)
    wlan_status = wlan.ifconfig()
    wlan.ifconfig((IPaddress_mine, wlan_status[1], wlan_status[2], wlan_status[3]))
    ip = wlan.ifconfig()[0]
    print(f'Connected on {ip}')
    return ip

# Subメッセージ表示function
def printMessage(topic, message):    
    # 受信データをytes型からUTF-8文字列へ変換、標準出力表示
    print("topic:" + topic.decode("utf-8") )
    print("message:" + message.decode("utf-8") )

# メインルーチンーーーーーーーーーーーーーーーーーーーーーー

# LED制御値設定
on_led = 1
off_led = 0
# 動作条件設定
time_delay = 2.5
time_on_led = 0.5
time_sleep = time_delay - time_on_led
# Wi-Fi接続SSID、パスワード設定
ssid = '*****'
password = '*****'
# MQTTブローカーサーバーIP設定
address_kabaN5105 = '192.168.10.24'
mqttBroker = address_kabaN5105

# ConnectID(ユニーク)設定
myId = 'rpp01'
# トピック(構造化)設定
topic= b"test_mqtt/rpp01"
# メッセージヘッダー設定
msg_header = "rpp01 send "
# 接続維持時間設定
time_keepalive_MQTTClient = 3600

# LEDオブジェクト定義
led_rpp = machine.Pin("LED",machine.Pin.OUT)
# 初動LED確認
led_rpp.value(on_led)
time.sleep(time_on_led)
led_rpp.value(off_led)

# 初期設定
# MQTTオブジェクト定義
client = MQTTClient(myId, mqttBroker, keepalive=time_keepalive_MQTTClient)
# 受信(Subscribe)時に呼ぶ関数の設定
client.set_callback(printMessage)
# Wi-Fi接続
connect()

# ブローカーに接続
try:
    # ブローカー接続処理
    client.connect()
    # Subトピック登録
    client.subscribe(topic)
except:
    # ブローカー接続失敗時、プログラム終了
    print("Could not connect to mqtt server.")
    sys.exit()
print("mqqtt connect done ")

# Pub回数リセット
i_count = 0
while True:
    i_count = i_count +1
    # Pubメッセージ生成
    msg = msg_header+str(i_count)
    # Pub実行
    client.publish(topic, msg)
    # LED点滅  
    led_rpp.value(on_led)
    time.sleep(time_on_led)
    led_rpp.value(off_led)
    # ブローカーPubメッセージ確認チェック
    client.check_msg()
    time.sleep(time_sleep)

■PostgreSQLのテーブル

idはいわゆる自動増番です。
テストだけならSmallserialで十分です。
が、、、Bigserialにしておけば将来もあまり気を使う必要がないので、、、

 

■関連出典、メモなど

出典元さま各位、大変参考になりました。コードも結構そのまま使いました。
ありがとうございます。

 

出典:PosgtreSQL で利用できるデータ型の中で連番型の使い方
   https://www.javadrive.jp/postgresql/type/index5.html#section1
 →MySQLのAutoincrementはPostgreSQLではSerialが該当する・・・

  とまではわかっていますが、じゃ実際どうやって使うの?が

  この出典に丁寧に解説されていました。


出典:Psycopg2でPostgreSQLを操作するときのトランザクションについて
  https://qiita.com/fauntleroy/items/70c1bfb450bcc61c0964
 ・INSERT実行後、必ずconn.commit()する。
 ・SELECTしてfetchした後、必ずcursor.close()する。

 →こういった必須ルールの解説、めっちゃ助かりました。

  ちゃんとクローズしないとロックかかったままでしたし。。。


出典:psycopg2公式
  https://www.psycopg.org/docs/module.html

出典:MQTTのQoS (Quality of Service) とは
  https://qiita.com/emqx_japan/items/7f818cb2071183ef7253

  →今回の実行例ではQosは全部0になっていました。
   要するに非同期ということだそうです。

 

出典:PythonでPostgreSQLへ接続

  →えーのんあるやん!と見てみたら・・・自分自身の今年5月の記事でした。

   いまごろになって動作確認完了です。まあ、一件落着です。

 

出典:PythonでPostgreSQLとやりとりする
  https://zenn.dev/collabostyle/articles/36e822520182d3

  →とくにSQL文の記述の仕方が参考になりました。

 

■結果

Subの標準出力スクショ抜粋

 

PostgreSQLのデータ抜粋(pgAdmin4利用)

 

両者、反転していて見づらいですが、最後のデータを見比べますと・・・

あたりまえですが、データが一致しました。

 

どうやらMQTTに関わる技術的な準備は終わりになったようです。
 

PICO-Wの絶縁入力やロケーション設定、表示などGPIO絡みの課題はまだいっぱいありますが、それらはこれまでもI2CやRTCなどの使い方で積み上げたものを再利用すれば良く、大きな壁ではなさそうです。
 

最後に、そもそもの狙いのシチュエーションをおさらいしますと・・・

・1区画に設備10台程度、いずれの設備でも稼働サイクル30秒程度、

・それぞれPub機器(PICO-W)設置、

・区画ごとにMQTTサーバとSub機器(ともにPC)設置、
・PostgreSQLサーバは複数区画に1台でもいけるかも。

業種は違えどこんな製造工場はいっぱいあると思います。


いや、もっと速いサイクルで動作する設備だと困るやん?

まあそういうのはそもそもがIoTで稼働を監視・記録するものではなくて、
PLCなどからリアルタイムでSCADAへ上げなくちゃ、と思います。

保安(安全動作)機器などもそうですね。

 

以上、ラズパイPICO-W&Micropython&MQTT化、3回にて終わり。