お父にゃんの電子工作 -18ページ目

お父にゃんの電子工作

暇なおじさんが、電子工作(主にラジオ製作)をして勝手な感想を書く

前回、Webラジオを再生しながら基本情報を取得するところまで調べた。

いよいよWebラジオに仕立てていこうと思う。

 

回路図はこちら

 

ブレッドボードでこんな感じ

とりあえず、アンプ無しでPCM5102基板にイヤホンを挿している

 

前回のプログラムでは、ステーションを切り替える度にwebラジオ再生ソフトのmpvを停止してから再起動するやり方となっていたので、ステーション切替が非常に遅かった。

なのでChatGPTと何度も話し合った結果、mpvは止めずに、 選局と音量調整はソケットを使って切替えるように変更。その他諸々調整して、簡単なwebラジオのコードが出来た。

(コードに色を付けると、HTML文字数が60000を超えて投稿できなくなるので色なし)
import subprocess
import json
import socket
import os
import threading
import time
import requests

IPC_SOCKET = "/tmp/mpv_socket"

# ==== グローバル変数 ====
station_name = None
streamtitle  = None
bit_rate     = None
codec        = None
sample_rate  = None

# ==== mpv起動 ====
def start_mpv():
    if os.path.exists(IPC_SOCKET):
        os.remove(IPC_SOCKET)
    cmd = [
        "mpv", "--no-terminal", "--quiet",
        f"--input-ipc-server={IPC_SOCKET}",
        "--idle=yes", "--force-window=no", "--audio-display=no",
        "--msg-level=all=no"
    ]
    subprocess.Popen(cmd)
    # IPC接続待機
    for _ in range(50):
        if os.path.exists(IPC_SOCKET):
            try:
                s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                s.connect(IPC_SOCKET)
                s.close()
                print("[mpv] connected to IPC.")
                # 起動時に音量を70%に設定
                set_volume(70)
                return
            except:
                pass
        time.sleep(0.2)
    raise RuntimeError("mpv IPC接続失敗")


# ==== 音量設定 ====
def set_volume(level: int):
    """
    mpvの音量を0〜100%に設定
    """
    if level < 0: level = 0
    if level > 100: level = 100
    mpv_command(["set_property", "volume", level])
    print(f"[VOLUME] Set to {level}%")



# ==== mpvコマンド送信 ====
def mpv_command(cmd):
    try:
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        s.connect(IPC_SOCKET)
        msg = json.dumps({"command": cmd}) + "\n"
        s.send(msg.encode("utf-8"))
        resp = s.recv(4096).decode("utf-8")
        s.close()
        return json.loads(resp)
    except Exception:
        return None


# ==== .pls / .m3u を展開して実ストリームURLを取得 ====
def resolve_stream_url(url):
    try:
        if url.endswith(".pls"):
            r = requests.get(url, timeout=10)
            for line in r.text.splitlines():
                if line.lower().startswith("file1="):
                    return line.split("=", 1)[1].strip()
        elif url.endswith(".m3u") or url.endswith(".m3u8"):
            r = requests.get(url, timeout=10)
            for line in r.text.splitlines():
                line = line.strip()
                if line and not line.startswith("#"):
                    return line
        return url
    except Exception as e:
        print(f"[resolve] failed to resolve {url}: {e}")
        return url


# ==== ffprobeでステーション情報取得 ====
def get_station_info(url):
    global station_name, bit_rate, codec, sample_rate
    try:
        cmd = [
            "ffprobe",
            "-v", "error",
            "-analyzeduration", "20000000",
            "-probesize", "20000000",
            "-show_entries", "format=format_name,bit_rate:format_tags=icy-name",
            "-show_entries", "stream=codec_name,sample_rate,channels",
            "-of", "json",
            url
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        data = json.loads(result.stdout)

        # ステーション名
        station_name = data.get("format", {}).get("tags", {}).get("icy-name", "(unknown)")

        # コーデック・サンプルレート
        streams = data.get("streams", [])
        codec, sample_rate = None, None
        for s in streams:
            if "codec_name" in s:
                codec = s.get("codec_name")
            if "sample_rate" in s and s.get("sample_rate"):
                sample_rate = int(s.get("sample_rate"))

        # ビットレート
        br = data.get("format", {}).get("bit_rate")
        bit_rate = int(br) if br else None

        print(f"[ffprobe] Station Info: {{'station': {station_name}, 'codec': {codec}, 'samplerate': {sample_rate}, 'bitrate': {bit_rate}}}")

    except Exception as e:
        print(f"[ffprobe] probe failed: {e}")
        station_name, bit_rate, codec, sample_rate = "(unknown)", None, None, None


# ==== 曲情報監視 (mpv IPC) ====
def now_playing_monitor():
    global streamtitle
    while True:
        try:
            resp = mpv_command(["get_property", "metadata"])
            if resp and "data" in resp:
                meta = resp["data"]
                title = (
                    meta.get("icy-title")
                    or meta.get("StreamTitle")
                )
                # Vorbis系 (artist + title)
                if not title:
                    artist = meta.get("artist")
                    track  = meta.get("title")
                    if artist or track:
                        title = f"{artist or ''} - {track or ''}".strip(" -")

                if title and title != streamtitle:
                    streamtitle = title
                    print(f"[Now Playing] {streamtitle}")
        except Exception as e:
            print(f"[mpv metadata error] {e}")
        time.sleep(2)


# ==== ステーション再生 ====
def play_station(url):
    global station_name, streamtitle, bit_rate, codec, sample_rate
    # 変数を初期化
    station_name, streamtitle, bit_rate, codec, sample_rate = None, None, None, None, None

    real_url = resolve_stream_url(url)
    print(f"\n=== Switching to station: {url} ===")
    mpv_command(["loadfile", real_url, "replace"])

    # ffprobe を別スレッドで実行
    threading.Thread(target=get_station_info, args=(real_url,), daemon=True).start()


# ==== メイン ====
def main():
    start_mpv()

    # Now Playing 監視スレッド起動
    threading.Thread(target=now_playing_monitor, daemon=True).start()

    # テスト用ステーション
    stations = [
        "https://somafm.com/digitalis130.pls",
        "http://stream.radioparadise.com/rock-320",
        "http://streamingp.shoutcast.com/JamendoLounge",
        "https://listen.moe/stream",
        "http://www.101smoothjazz.com/101-smoothjazz.m3u"
    ]

    for url in stations:
        play_station(url)
        time.sleep(20)  # 各局を20秒ずつ再生


if __name__ == "__main__":
    main()

 
 

ストリームタイトル、ステーション名、コーデック、サンプルレート、ビットレートも取得できる

 

次に、このベースとなるコードに以前作っておいた液晶画面表示とロータリーエンコーダーの操作のコードを追加して動かしていく。(プログラミングが苦手なおじさんはかなり苦労する)

そしたら、何故か音が出なくなった

訳が分からなくて、PCM5102のI2S端子をオシロで見ていくと、GPIO18のBCK(PCM_CLK)だけが動作しなくなっている。

ChatGPTに訊いてみると、

sudo raspi-gpio get
を実行した結果を返せというので返した。

 

電源投入時 GPIO18-21は全てPCMに割り付けられている

 

おじさん書いたソフトを走らせると・・

 

にゃんと、勝手にGPIO18がPCMからoutputに切り替わっているではないか!
で、ChatGPTとやり取りして、犯人は液晶を制御するLuma.lcdである事が判明。

LCDのバックライトを調整する目的でGPIO18を勝手にPWM機能に割り当ててしまうのである。

 

ChatGPTにどうすれば良いか訊いたら、Luma.lcdのコードを修正してGPIO18に触らないように変更するか、LCDの初期化が終わった時点で、強制的にGPIO18の設定を元に戻すのどちらかだという。

と、いうことでLCDの初期化後に強制的にGPIO18の設定を元に戻すことにした

 

start_mpv()
 
# SPI設定、LCD初期化
serial = spi(port=0, device=0, gpio_DC=24, gpio_RST=25,bus_speed_hz=52000000)
device = st7789(serial, width=320, height=240, rotate=0)
 
# ★ここで強制的にI2S BCLKに戻す
subprocess.run(["raspi-gpio", "set", "18", "a0"])

 

なんとも後味が悪いが、これでちゃんと音が出るようになった。

かなり焦ったが、何とかなって良かった。(Raspberry Piはこういうことが多くて困る)

 

とりあえず、ビットレートとストリームタイトルは表示する。

 

エンコーダーを回して選局も出来る。

 

まだ音量調整や詳細表示モード等のコードがまだ書けていない。選局中の文字色変化やも未対応で文字の表示位置とかも微調整が必要。やらなきゃいけない事が山積みなのである。

しかし、でも、まぁ、道のりは遠いが、何となく遠くにゴールは見えてきているのである。

 

一応、今回のコードは以下に置いておきます。未完成なので参考程度です。
興味のある方はどうぞ。

 

Web_Radio_V0_1

 

ちゃんと動くものは まだまだ時間がかかるので、待っていて欲しい

 

 

「待っていればちゃんと動くのかニャ?」

ちょっと自信無い