お父にゃんの電子工作 -19ページ目

お父にゃんの電子工作

暇なおじさんが、電子工作(主にラジオ製作)をして勝手な感想を書く

以前に作ったESP32S3を使ったWebラジオ↓

 

使い勝手・操作感も良いし、とても良いものが出来たと自画自賛しているのである。

今でも毎日電源を入れて使っている。

 

しかし1点だけ不満がある。ステーションによっては音が途切れてしまって まともに聞けないのである。

例えばこちら等↓

Epic Lounge - PIANO & JAZZ BAR

Jazz London Radio

Free FM 80 Tokyo

FMぎのわん

 

シリアルモニタでは

slow stream, dropouts are possible

 

と表示されて、どこかでデータ処理が滞っているようだ。

PC等で再生するには全く問題がないので、ESP32のどこかでデータ処理能力が不足しているのだろう。なにか設定とかで改善できないかと少し探ってみたが改善しなかった。

 

ESP32S3でダメならもっと強力なCPUのRaspberry Pi Zero 2 Wを使ってWebラジオを作ってみたらどうなのか、試してみたいのである。

 

そう、そのために、これまでRaspberry Pi Zero 2Wの使い方を調べてきたのである。おかげで基本的な使い方は何となくわかってきたので、何とかなるぢゃろう。

 

まず、回路図はこちら

 

ブレッドボードでこんな感じ

 

以前の記事で書いた通り、I2Sの有効化とpigpioがPWMタイマーを使う設定は必須)

 

で、早速ChatGPTに とりあえず指定したurlを再生する簡単なpythonプログラムをお願いした。いつもの通り、いろいろやり取りして、試行錯誤の結果、以下のようになった。

 

最初にWebラジオの再生ソフトのmpvをインストール

sudo apt update
sudo apt install mpv

 

で、このコードをコピー

import subprocess
 
url = "https://stream.epic-lounge.com/piano-jazz-bar?ref=radiobrowser"
 
try:
    print("再生開始...")
    proc = subprocess.Popen([
        "mpv",
        "--audio-device=alsa/hw:1,0",
        "--volume=30",   # 音量30%
        "--no-terminal", # 端末に余計な出力をしない
        url
    ])
    proc.wait()
 
except KeyboardInterrupt:
    print("停止します...")
    proc.terminate()

 

RealVNCでリモート接続し、Thonnyに貼り付けて実行

 
スタートを押してから音が鳴るまでタイムラグが有るが、問題なく綺麗に聞こえる。
最初の方に挙げた、ESP32S3のWebラジオだと音が途切れてしまうステーションを全部試してみたが、何の問題も無く再生できた。

おお、これは期待できそうだ。

しかし、このままではビットレートやストリームタイトル(曲名等)が分からない。
いつものようにChatGPTにお願いしてみる。

しかし、ここからが長かった
おそらく50回くらい、あ~でもない、こ~でもないを繰り返して何となく動くものが出来た。
ストリーム基本情報はffprobeで取得、再生はmpvと分けて処理するのが安定して良いようだ。
3分ごとにリストされているURLを切り替えるプログラム
import subprocess
import socket
import time
import json
import os
import signal
import threading
import requests
 
stations = [
    "http://www.181.fm/stream/pls/181-power.pls",
    "https://stream.epic-lounge.com/piano-jazz-bar?ref=radiobrowser",
    "https://listen.moe/stream",
    "http://playerservices.streamtheworld.com/pls/977_HITS.pls",
]
 
IPC_SOCKET = "/tmp/mpvsocket"
current_title = None
stop_flag = False
 
def resolve_playlist(url):
    """pls/m3uを実ストリームURLに変換"""
    if url.endswith(".pls") or "pls" in url:
        try:
            txt = requests.get(url, timeout=5).text
            for line in txt.splitlines():
                if line.lower().startswith("file"):
                    return line.split("=", 1)[1].strip()
        except Exception:
            pass
    elif url.endswith(".m3u") or url.endswith(".m3u8") or "m3u" in url:
        try:
            txt = requests.get(url, timeout=5).text
            for line in txt.splitlines():
                if line and not line.startswith("#"):
                    return line.strip()
        except Exception:
            pass
    return url
 
def start_mpv(url):
    if os.path.exists(IPC_SOCKET):
        os.remove(IPC_SOCKET)
 
    cmd = [
        "mpv",
        url,
        "--no-video",
        f"--input-ipc-server={IPC_SOCKET}",
        "--http-header-fields=Icy-MetaData:1",
        "--metadata-codepage=utf-8",
        "--quiet"
    ]
    return subprocess.Popen(cmd)
 
def stop_mpv(proc):
    if proc:
        proc.send_signal(signal.SIGTERM)
        proc.wait()
 
def send_command(cmd_dict):
    try:
        client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        client.connect(IPC_SOCKET)
        msg = json.dumps(cmd_dict) + "\n"
        client.send(msg.encode("utf-8"))
        data = client.recv(4096).decode("utf-8")
        client.close()
        return json.loads(data)
    except Exception:
        return None
 
def get_metadata():
    """icy-title, あるいは Artist+Title を返す"""
    resp = send_command({"command": ["get_property", "metadata"]})
    if resp and "data" in resp and isinstance(resp["data"], dict):
        data = resp["data"]
        if "icy-title" in data and data["icy-title"]:
            return data["icy-title"]
        # Vorbis/Opus の場合
        artist = data.get("Artist")
        title = data.get("Title")
        if artist or title:
            return f"{artist or ''} - {title or ''}".strip(" -")
    return None
 
def poll_metadata():
    """別スレッドでタイトルを監視"""
    global current_title, stop_flag
    while not stop_flag:
        title = get_metadata()
        if title and title != current_title:
            current_title = title
            print("Now Playing:", title)
        time.sleep(2)  # 2秒間隔でポーリング
 
def ffprobe_info(url):
    """ffprobe でストリーム情報を取得"""
    try:
        cmd = [
            "ffprobe",
            "-v", "error",
            "-select_streams", "a:0",
            "-show_entries", "stream=codec_name,bit_rate,sample_rate",
            "-of", "json",
            url
        ]
        res = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
        if res.returncode == 0:
            data = json.loads(res.stdout)
            if "streams" in data and data["streams"]:
                s = data["streams"][0]
                codec = s.get("codec_name", "unknown")
                br = s.get("bit_rate", "unknown")
                sr = s.get("sample_rate", "unknown")
                print("---- ストリーム基本情報 ----")
                print(f"Codec       : {codec}")
                print(f"Bitrate     : {br}")
                print(f"Sample Rate : {sr}")
                print("----------------------------")
    except subprocess.TimeoutExpired:
        print("ffprobe timeout")
    except Exception as e:
        print("ffprobe error:", e)
 
def async_ffprobe(url):
    """別スレッドで ffprobe 実行"""
    th = threading.Thread(target=ffprobe_info, args=(url,), daemon=True)
    th.start()
 
if __name__ == "__main__":
    idx = 0
    while True:
        url = stations[idx]
        real_url = resolve_playlist(url)
 
        print(f"\n=== Switching to station {idx} ===")
        print("元URL:", url)
        print("実URL:", real_url)
 
        proc = start_mpv(real_url)
 
        # mpv IPC ソケット準備待ち
        for _ in range(20):
            if os.path.exists(IPC_SOCKET):
                break
            time.sleep(0.2)
 
        # ffprobe は別スレッドで実行
        async_ffprobe(real_url)
 
        # タイトル監視スレッド開始
        stop_flag = False
        current_title = None
        t = threading.Thread(target=poll_metadata, daemon=True)
        t.start()
 
        # 1分待機
        time.sleep(180)
 
        # スレッド停止
        stop_flag = True
        t.join(timeout=2)
 
        stop_mpv(proc)
 
        # 次の局へ(最後なら先頭に戻る)
        idx = (idx + 1) % len(stations)
 
このように、基本情報とストリームタイトルが取得できるようになった。
 
これで、基本機能は揃ったので、今までやってきたことを全て統合してWebラジオに仕立ててみたい。

言うのは簡単だが、上手くいくのだろうか?
プログラミングが苦手なおじさんに、ちゃんと全部丸く収められるだろうか?
不安なのである。


「このようにちゃんと、丸く収めるのニャ」
確かに丸くなっとりますな・・・