virt_flyのブログ -4ページ目

virt_flyのブログ

フライトシミュレーターソフトのFlightGearで仮想飛行を楽しむブログです。

FlightGearで外部表示させた速度・高度・方位データ

↑FlightGearのデータをAmbientdeで表示させることができた グラフ化する意味はないと言えばないが

 

Wi-Fi接続、可視化クライアントサービス

Generic protocol+UDP+Ambientで実現

 

眼が悪くなり、FlightGearの飛行機の計器の値やHUDの目盛りがまるで見えなくなったのが苦痛で、手元に置いたタブレットに表示させたり、計器をこしらえて表示できたなら、と思うこの頃です。

 

先ごろ電子工作で、温度センサーの値をデータ可視化クライアントサービスのAmbientに送信してグラフを描かせることができたものだから、少々欲がでてきました。

 

FlightGearのGeneric protocolやUDPを利用した例もあるようなので、試してみようかなということで、今回のテーマは「FlightGearのデータを外部表示させよう」です。

 

いろいろ試行錯誤しつつ方向を見定めるようにしてきましたが、順次FlightGearのGeneric protocolを利用して目的のプロパティの値をUDPで送信、送信先は慣れたAmbientにするとの方向ですすめ、最終的に成功しましたので報告とします。

 

Ambientでグラフを描いても仕方がないのですが、まずは外部送信を試すことが目的でしたので、今回はご勘弁を。

 

■必要なものと手順

 

ここではWindowsで説明します。

今回は、速度と高度、方位の3項目を送信することにしましたが、他にも様々な値を扱うことができるでしょう。

 

・注意事項

 

最新のFlightGearはディレクトリ構造が変更になったようで、以前ならROOTディレクトリ直下にサブディレクトリとしてあったものが、別途指定した場所に変わってしまっており、FlightGear起動時のオプション指定もややこしくなりました。

 

・Python

 

Pythonでプログラムしますので、あらかじめPythonがインストールされている必要があり、requestsなど必要なものはpip installします。

※Pythonインストール時には、「Add Python 3.x to PATH」にチェックを入れること

 

なお、Raspberry Pi PicoはMicroPythonを使用します。

 

・fgdata.xml

 

プロトコルはfgdata.xmlに記述し、FlightGearのデータファルダ内のProtocolサブフォルダに置きます。

 

<?xml version="1.0"?> <PropertyList>
 <generic>
  <output>
   <line_separator>\n</line_separator>
   <var_separator>,</var_separator>
   <chunk>
     <name>groundspeed-kt</name>
     <node>/velocities/groundspeed-kt</node>
   </chunk>
   <chunk>
     <name>altitude-ft</name>
     <node>/position/altitude-ft</node>
   </chunk>
   <chunk>
     <name>true-heading-deg</name>
     <node>/orientation/true-heading-deg</node>
   </chunk>
  </output>
 </generic>
</PropertyList>

 

・fg_ambient.py

 

データを送信させるためのPythonプログラムを記述し、fg_ambient.py名でC:/Users/自分のフォルダ名/下に置きます。

 

import socket
import json
import requests
import time
import math

# FlightGearからのUDP受信設定
UDP_IP = "127.0.0.1"
UDP_PORT = 5501 # FlightGearの送信ポートに合わせる

# Ambientの設定(自分のチャネルIDとライトキーを入れる)
AMBIENT_CHANNEL_ID = チャネルID
AMBIENT_WRITE_KEY = "ライトキー"
AMBIENT_URL = f"http://ambidata.io/api/v2/channels/{AMBIENT_CHANNEL_ID}/data"

# UDPソケットの準備
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))

print("受信開始...")

while True:
  try:
    data, addr =sock.recvfrom(1024)
    decoded = data.decode("utf-8").strip()
    print("受信データ:", decoded)
    values = decoded.split(",")

     # FlightGearの送信順に合わせてインデックスを調整
     groundspeed = float(values[0]) # 速度(ノット)
     altitude = float(values[1]) # 高度(フィート)
     heading = float(values[2]) # 方位(度)

     # 無効な値をチェック
     if any(map(lambda x: x is None or math.isnan(x), [groundspeed, altitude, heading])):
      print("無効なデータを検出、送信スキップ")
      continue

     print(f"Groundspeed: {groundspeed}, Altitude: {altitude}, Heading: {heading}")

     # Ambientに送信するデータ
     payload = {
       "writeKey": AMBIENT_WRITE_KEY,
       "d1": groundspeed,
       "d2": altitude,
       "d3": heading
     }

     headers = {"Content-Type": "application/json"}
     response = requests.post(AMBIENT_URL, data=json.dumps(payload), headers=headers)

     if response.status_code == 200:
       print(" Ambientに送信成功!")
     else:
       print(f" 送信失敗: {response.status_code} - {response.text}")

     time.sleep(1) # 送信間隔を1秒に調整

     except Exception as e:
       print("エラー:", e)
       time.sleep(2) # エラー時は少し待って再試行

 

実行は、CMD(コマンドプロンプト)で以下の通り入力して行います。

 

C:/Users/自分のフォルダ名>python fg_ambient.py

 

 

・起動オプション

 

FlightGearの起動オプションでfgdata.xmlの読み込を指定します。

実行は、CMD(コマンドプロンプト)で以下の通り入力して行います。pathや飛行機、空港は例です。

 

C:/Users/自分のフォルダ名〉"C:\Program Files\FlightGear 2024.1\bin\fgfs.exe" --fg-root="C:/Users/自分のフォルダ名/FlightGear/Downloads/fgdata_2024_1" --fg-aircraft="C:/Users/自分のフォルダ名/FlightGear/Custom Aircraft" --aircraft=F1M2 --airport=RJBB --runway=24L --generic=socket,out,0.1,127.0.0.1,5501,udp,fgdata

 

下の画像は、パイソンプログラムfg_ambient.pyを実行したCMD画面です。FlightGearで飛行中のデータが表示されています。

 

FlightGearデータ、速度・高度・方位をAmbientに表示

↑Pythonプログラムfg_ambient.pyを実行したCMD画面 飛行中の速度、高度、方位の値が流れています

 

今回成功したのは、fgdata.xmlの記述に<node>を使用、<var_separator>を明示したからかと思われます。Copilotに助けられました。

 

 

《関連》

 

 

 

 

 

 

Ambientで表示された気温と湿度のグラフ

 

↑Raspberry Pi Oico Wからセンサー値を送信、クラウドサービスのAmbientでグラフ化

 

Pico Wでデータ可視化クラウドサービスを利用

 

前回は、Raspberry Pi Pico Wを使って、温湿度センサーの測定値をOLEDに表示するとともに、せっかくあるWi-Fi機能を利用したいと、測定値のWEBページ表示を試しました。

 

Webページ表示と言っても、Pico WをWebサーバーにしてイントラネット上のPCのブラウザで値を確認しようとするものでした。

 

ここはやはりWi-Fi機能をいかして、インターネット上のクラウドサービスを利用すべしと測定値の可視化を試すことにしました。

 

■Ambientで測定値のCSVファイル化も

 

Raspberry Pi Pico Wで温湿度を測定しAmbientに送信

↑温湿度センサー(Si7021)の値をOLED、Webページ、Ambientで表示するRaspberry Pi Pico W

 

データをグラフ表示など可視化するクラウドサービスはいろいろあるのでしょうが、使うのは以前にも使用したことのあるAmbient。運よくまだ登録はいきていました。

 

恥ずかしながら、他により便利なサービスがあるのかも知れませんが、一応グラフ化できれば良し、生のデータ自体もCSVファイルとしてダウンロードできるのなら、後でいくらでも加工できろのだからそれで充分と思っています。

 

CSVファイルインポート設定画面

↑Ambientでリアルタイムに受信して時刻を付したたデータはCSVファイルにDL可能

 

多少タイムラグはあるでしょうが、測定値を受信する際にAmbientの方で順次時刻も記録してくれますから、Pico W側で測定時刻をいちいち付して送信する必要がないのも楽です。

 

■Ambientの使い方ープログラミングは生成AIで

 

Ambientの使い方は、おおよそ次の通りです。

①ユーザー登録(無料)

②チャネル生成

③マイコン側プログラミング

④データー送信

⑤可視化(グラフ化)

 

登録(無料)やチャネルの作成、グラフの設定は簡単ですので、具体的には当該サイトをご覧になればよいでしょう。問題になるのは、Ambient用のmicropythonプログラムの書き方です。実例をネットで探すのもよいですが、生成AIに書かせてもよいのでは。私はそうしました。生成AIも進化したもので、今ではプログラミングなら全面的に任せられます(注参照)。

 

今回のプログラムは次の通りです。前回のプログラムにAmbientへの送信部分を追加の上、若干整理しました。

 

import network
import socket
import time
import urequests
import gc
from machine import Pin, I2C
from ssd1315 import SSD1315_I2C

# ===== Wi-Fi =====
SSID = "Wi-Fiネットワーク名"
PASSWORD = "パスワード"

# ===== Ambient =====
AMBIENT_CHANNEL_ID = "チャネルID"  #←数値です ””は取る
AMBIENT_WRITE_KEY = "ライトキー"

# ===== I2C =====
i2c_oled = I2C(0, scl=Pin(1), sda=Pin(0), freq=400000)
i2c_sensor = I2C(1, scl=Pin(3), sda=Pin(2), freq=400000)

oled = SSD1315_I2C(128, 64, i2c_oled, addr=0x3C)
SENSOR_ADDR = 0x40

# ===== センサー =====
def read_si7021():
    try:
        i2c_sensor.writeto(SENSOR_ADDR, b'\xE3')
        t = i2c_sensor.readfrom(SENSOR_ADDR, 2)
        temp = (175.72 * (t[0]<<8 | t[1]) / 65536) - 46.85

        i2c_sensor.writeto(SENSOR_ADDR, b'\xE5')
        h = i2c_sensor.readfrom(SENSOR_ADDR, 2)
        hum = (125.0 * (h[0]<<8 | h[1]) / 65536) - 6

        return temp, hum
    except:
        return None, None

# ===== OLED =====
def draw(temp, hum, ip):
    oled.fill(0)
    oled.text("Env Monitor", 10, 0)
    oled.text("Temp:{:.1f}C".format(temp), 0, 18)
    oled.text("Hum :{:.1f}%".format(hum), 0, 34)
    oled.text(ip, 0, 52)
    oled.show()

# ===== Wi-Fi接続 =====
wlan = network.WLAN(network.STA_IF)
wlan.active(True)

def ensure_wifi():
    if wlan.isconnected():
        return True
    wlan.connect(SSID, PASSWORD)
    for _ in range(10):
        if wlan.isconnected():
            return True
        time.sleep(1)
    return False

print("connecting wifi...")
ensure_wifi()
ip = wlan.ifconfig()[0]
print("wifi ok:", ip)

# ===== Ambient送信 =====
def send_ambient(temp, hum):
    if not ensure_wifi():
        return
    try:
        url = "https://ambidata.io/api/v2/channels/{}/data".format(AMBIENT_CHANNEL_ID)
        data = {"writeKey": AMBIENT_WRITE_KEY, "d1": temp, "d2": hum}
        r = urequests.post(url, json=data)
        print("ambient:", r.status_code)
        r.close()
        gc.collect()
    except Exception as e:
        print("ambient error:", e)

# ===== Webサーバー =====
srv = socket.socket()
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(('0.0.0.0', 80))
srv.listen(1)
srv.settimeout(0.2)

latest_temp = 0
latest_hum = 0

def handle_web():
    try:
        cl, addr = srv.accept()
        cl.settimeout(1)

        body = """<html>
<head>
<meta charset="utf-8">
<meta http-equiv="refresh" content="5">
</head>
<body style="text-align:center;font-family:sans-serif;margin-top:40px;">
<h1>環境モニター</h1>
<h2>温度: {:.1f} ℃</h2>
<h2>湿度: {:.1f} %</h2>
<p>IP: {}</p>
</body>
</html>
""".format(latest_temp, latest_hum, ip)

        response = "HTTP/1.1 200 OK\r\n"
        response += "Content-Type: text/html\r\n"
        response += "Connection: close\r\n"
        response += "Content-Length: {}\r\n\r\n".format(len(body))
        response += body

        cl.send(response)
        cl.close()

    except:
        pass

# ===== メインループ =====
last_sensor = 0
last_send = 0

while True:
    now = time.time()

    # センサー更新(2秒)
    if now - last_sensor > 2:
        t, h = read_si7021()
        if t is not None:
            latest_temp = t
            latest_hum = h
            draw(t, h, ip)
        last_sensor = now

    # Ambient送信(60秒)
    if now - last_send > 60:
        send_ambient(latest_temp, latest_hum)
        last_send = now
        time.sleep(2)

    # Web応答
    handle_web()
    time.sleep(0.05)

 

プログラムは、ChatGPTの助けを借りて作成しています。

 

 

 

 

注)生成AIが生成したコードには著作権の問題があります。通常、ChatGPTやCopilotの生成物はユーザーに著作権があるとされます。しかし、そうしたコードのソースとの関係で著作権侵害が起こりえる可能性もあります。したがって、個人利用の範囲にとどめ、商業利用はもとより公開も慎重であるべきと考えられます。ユニークなアルゴリズムだったりすれば著作権は守られてしかるべきですが、もちろん、ごく一般的な記述、ありふれた実装パターンまで制限されることはありえません。そんなことになれば、一切プログラムは書けなくなってしまいますから、その範囲であれば問題ないことは自明です。生成AIにプログラミングさせたのなら、生成AIにありふれた実装パターンであるかたずねてみたらどうでしょう。

Raspberry Pi Pico Wで温湿度を測定しOLED表示

↑測定値をWeb上で確認する際に必要なIPアドレスをOLED画面に表示させた

温湿度の測定値表示にてこずる

PR2040 TypeC 16MBとRP2040 Zero、Raspberry Pi Pico W

 

たまにはと電子工作をしてみました。今更ながらですが、使用したのは以前購入してそのまま放置していたRaspberry Pi Pico互換機とPico  Wです(旧~! Pico 2/2 Wの世代だというのに)。互換機は、PR2040 TypeC 16MBとRP2040 Zeroの2つです。

●試用結果

 

Raspberry Pi Pico WとRP2040 Zero
↑左からPR2040 TypeC 16MB、RP2040 Zero、Raspberry Pi Pico W

 

PR2040 TypeC
PR-2040 TypeC(基板裏面にYD-RP2040 2022-V1.3 VCC-GND.COMの印刷がある)は、結果的になぜかOLEDの表示に成功しません。製品の不良あるいは眼が非常に悪くなっていて半田付けがうまくできていないせいかもしれません。

・RP2040 Zero
RP2040 Zero(基板上にRP2040 Zeroの文字とWaveShareのロゴが見える)では、温湿度センサーで測定した値をOLEDに表示できました。大きなメモリー容量が必要でなければ、小型かつ安価な点でお気に入りになりそうです。

 

RP2040 Zeroで温湿度をOLED表示
↑PR2040 Zeroを使って温湿度センサーの測定値をOLEDに表示


・Raspberry Pi Pico W
Raspberry Pi Pico Wは、温湿度センサーの測定値をOLEDに表示するばかりか、Wi-Fi機能によりWeb上での表示もできるようになりました。


●OLEDの表示がおかしかったのは

 

試用は一見順調なようですが、実はOLEDをSSD1315ではなくてSSD1306とばかり思い込み、四苦八苦していました。

0.96インチのOLEDには種類があるのは知っていたはずなのに、確認を忘れてSSD1315のものを購入していたのでした。
 
それでも、多少は動き、センサーなどをつけなければOLEDが表示できたりするので、間違いに気付きにくいところはありました。

OLEDが表示しないのは、低質な温湿度センサー(Si7021)とOLEDとで処理の速さが合わないのかと、タイミングをとったり、いちいち初期化させたり、IC2を分離し2パス構成にしたり、プルアップ抵抗を入れるなどと試行錯誤の上、ようやくOLEDの違いに気付いて動かすことができたもの。

簡単な電子工作だと思ったのにこんなにてこずるとは。簡単なはずなのに恥ずかしいと思ったからこそ、ここまで粘れたのでしょうが。
 
●測定値のWebページ表示
 
Web表示された温湿度測定値
↑測定値をWeb表示させた例
 
Webページ表示というのは、クラウドサービス利用ではなく、Pico WがWi-FiでWEBサーバーとなり、センサーの測定値をブラウザを開いて確認できるようにしたもののことです。ChatGPTに教えられるまで知りませんでした。
 
温湿度センサーの値をOLEDとWebページに表示させるプログラムにしては、OLEDの勘違いでたいそうなものになってしまいました。本来なら整理が必要なんでしょうが、全文を以下に示します。
 
import network
import socket
import time
from machine import Pin, I2C
from ssd1315 import SSD1315_I2C

# ===== Wi-Fi設定 =====
SSID = "Wi-Fiネットワーク名"
PASSWORD = "パスワード"

# ===== I2C初期化 =====
i2c_oled = I2C(0, scl=Pin(1), sda=Pin(0), freq=40000)
i2c_sensor = I2C(1, scl=Pin(3), sda=Pin(2), freq=40000)

print("OLED I2C:", i2c_oled.scan())
print("SENSOR I2C:", i2c_sensor.scan())  # 0x40が見えるはず

# ===== OLED初期化 =====
oled = SSD1315_I2C(128, 64, i2c_oled, addr=0x3C)
oled.fill(0)
oled.text("Booting...", 0, 20)
oled.show()

# ===== SI7021クラス (Hold Master) =====
class SI7021:
    def __init__(self, i2c, addr=0x40):
        self.i2c = i2c
        self.addr = addr

    def read(self):
        try:
            self.i2c.writeto(self.addr, b'\xE3')
            t = self.i2c.readfrom(self.addr, 2)
            temp = (175.72 * (t[0]<<8 | t[1]) / 65536) - 46.85

            self.i2c.writeto(self.addr, b'\xE5')
            h = self.i2c.readfrom(self.addr, 2)
            hum = (125.0 * (h[0]<<8 | h[1]) / 65536) - 6
            return temp, hum
        except OSError:
            return None, None

sensor = SI7021(i2c_sensor)

# ===== Wi-Fi接続 =====
oled.fill(0)
oled.text("WiFi connecting...", 0, 20)
oled.show()

wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)
while not wlan.isconnected():
    time.sleep(1)
ip = wlan.ifconfig()[0]
print("IP:", ip)

# ===== グローバル =====
latest_temp = None
latest_hum = None
last_update = 0

# ===== Webサーバー準備 =====
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('0.0.0.0', 80))
s.listen(1)
s.settimeout(0.1)  # 非ブロッキング
print("Server running at http://{}".format(ip))

# ===== メインループ =====
while True:
    # ---- センサー更新 ----
    if time.time() - last_update > 2:
        t, h = sensor.read()
        if t is not None:
            latest_temp = t
            latest_hum = h

            # OLED表示
            oled.fill(0)
            oled.text("Temp:{:.1f} C".format(latest_temp), 0, 0)
            oled.text("Hum: {:.1f} %".format(latest_hum), 0, 15)
            oled.text("IP:" + ip, 0, 50)
            oled.show()
        last_update = time.time()

    # ---- Web応答(非ブロッキング) ----
    try:
        cl, addr = s.accept()
        cl.settimeout(1.0)
        try:
            request = cl.recv(1024)
        except OSError:
            request = b""

        if latest_temp is None:
            body = '{"temp":null,"hum":null}'
        else:
            body = '{{"temp":{:.1f},"hum":{:.1f}}}'.format(latest_temp, latest_hum)

        # HTMLページで自動更新
        html = f"""
        <html>
        <head>
            <meta charset="utf-8">
            <meta http-equiv="refresh" content="2">
        </head>
        <body style="text-align:center;font-family:sans-serif;margin-top:40px;">
            <h1>環境モニター</h1>
            <h2>温度: {latest_temp:.1f} ℃</h2>
            <h2>湿度: {latest_hum:.1f} %</h2>
            <p>IP: {ip}</p>
        </body>
        </html>
        """
        cl.send("HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n")
        cl.send(html)
        cl.close()
    except OSError:
        # 接続なしでもループ継続
        pass

 

プログラムの作成には、ChatGPTの力を借りましたので、そのことを付記します。