はじめに

暗号資産の自動売買 BOT や相場データバックエンドを開発した経験がある方なら、オーダーブック配信で 2 大致命的な不具合に悩まされたことがあるはずです。 REST のポーリングを使うとすぐアクセス制限に引っ掛かり、長時間稼働するたびローカルの板情報が市場実態と徐々にズレていきます。 また、一般的な WebSocket 実装では監視銘柄を追加・削除するたび接続を切り直す仕組みのため、相場が急変し複数銘柄を一斉切り替えると再接続ストームが発生、実取引とバックテストのデータが完全に乖離します。

本記事で紹介する保存構造・差分更新ロジック・Python 実装コードの通信仕様は AllTick API を基に作成しています。メモリ保存の設計、シーケンス番号による整合性検証、本番環境向けエラー回避策、すぐ実行可能なコードまで分かりやすく解説します。クオンツ初心者から相場基盤のエンジニアまで活用できる内容です。

よく見かける 2 つの不完全なオーダーブック実装

1. REST 周期ポーリングで全スナップショット取得

初心者に多い簡易実装手法ですが、重大なデメリットが複数存在します。 頻繁なリクエストで API 制限に抵触する上、差分配信に対応していないため毎回全板データを上書き・ソートする必要があり CPU 負荷が高止まります。 長時間連続稼働させると買い板・売り板の価格帯が市場とズレ、アービトラージやマーケットメイキングのシグナルが不正確になり、バックテストの信憑性が失われます。

2. 銘柄切り替え時に WebSocket を切断・再作成

基礎的な WebSocket 購読ロジックの欠点として、監視ペアの追加・削除のたびソケット接続を破棄し再ハンドシェイクする点が挙げられます。 相場高変動時に複数銘柄を一斉切り替えると大量の再接続リクエストが発生する「再接続ストーム」が起き、サーバー側の seq シーケンス番号がリセット。ローカルに保存した深度キャッシュが全消去され新旧データが分断され、事後検証が困難になります。

本記事の核心である単一長時間接続による動的購読を採用すれば、同一接続のまま銘柄の監視範囲を変更可能、seq 番号が途切れず深度保存データが不用意に消去されないため、手動によるデータ校正工数を大幅に削減できます。

3 種類の相場取得手法比較表

Ameba Blog で見やすいシンプルな表形式で整理しました。

表格

接続手法 深度保存・更新時の課題 単一接続動的購読のメリット
REST 深度ポーリング 高頻度リクエストで制限に抵触、毎回全データ上書き、ソート処理で CPU 負荷増加、差分配信なし 変更のある価格帯のみ diff 配信、メモリの部分更新のみ実行、周期リクエスト不要
通常 WebSocket(銘柄切替時再接続) 銘柄追加削除で接続破棄、seq リセットにより深度キャッシュ消去、一斉切替時に再接続ストーム発生 同一接続内で監視リスト変更、seq が連続して更新、他銘柄の深度データは維持
複数 WebSocket 並列接続 回線ごとにハートビート・キャッシュ・更新ロジックを個別実装、メモリ・帯域を大量消費 1 回線で数十銘柄を一括管理、ハートビート・seq 検証・更新処理を統一、コードが簡素化

オーダーブック深度 保存・更新の核心アーキテクチャ

1. 2 階層辞書による軽量メモリ保存構造

相場 API はオーダーブック全体を送信せず、変更が発生した価格帯のみ配信する仕様です。

  • size > 0:価格帯を新規追加、または注文数量を更新
  • size = 0:対象の価格帯を完全削除

推奨するメモリ格納フォーマット

plaintext

{code: {"bids": {}, "asks": {}, "last_seq": シーケンス番号}}

価格をキー、注文量を値として辞書に保管。更新時は追加・削除のみ実行し、価格のソート処理は戦略が板情報を参照するタイミングまで遅延させ、高頻度 Tick 配信時の処理停滞を防ぎます。

2. cmd_id=22004 統一コマンドによる銘柄管理

購読・解除の操作は全て同一コマンドcmd_id=22004を使用し、actionパラメータで動作を分岐させます。

  • sub:銘柄を監視に追加、独立した深度保存領域を自動作成
  • unsub:銘柄の監視を解除、使用していたメモリ領域を解放

銘柄はcodeで一意に識別(BTCUSDT、ETHUSDT など)、接続を切断せず監視対象を切り替えられるため、既存の深度データや seq 番号が破棄されません。

3. seq 連続性検証で長期稼働時の板ズレを防止

各差分パケットには自動インクリメントされる seq 番号が付与され、銘柄ごとに直前の番号last_seqを保持します。

  1. 受信した seq = last_seq + 1 の場合のみ深度データを更新
  2. 番号が飛んだ・不連続な場合、該当銘柄の bids/asks を全消去し、全スナップショットを取得して再初期化

この検証ロジックが長時間稼働時の徐々な板ズレを防ぐ鍵となります。

4. 生データ保存と表示ソートの分離

底层の辞書は生の深度データの保管・差分更新だけを担当し、買い板・売り板の価格ソートはアルゴリズムがデータを読み込む瞬間のみ実行します。更新処理の流れに重いソート演算を挟まないことで、相場急変時の遅延を抑えます。

実務シナリオ別設定一覧表

開発現場で頻出する 5 パターンをまとめました。

表格

実行シナリオ 深度保存時の課題 設定パラメータ 動作検証基準
起動時に複数銘柄を一括購読 初期化時に複数回スナップ取得が発生、保存領域作成が煩雑 cmd_id=22004,action="sub",code=["BTCUSDT","ETHUSDT"] on_open コールバック内でコマンド送信、各 code に独立した深度領域が作成
実行中に SOLUSDT を追加監視 従来手法では再接続が必要、既存深度が全消去 cmd_id=22004,action="sub",code=["SOLUSDT"] 接続維持、SOLUSDT 用の領域だけ新規作成、他銘柄のデータは保持
取引終了時 ETCUSDT の監視解除 不要な差分が継続配信されメモリを消費 cmd_id=22004,action="unsub",code=["ETCUSDT"] ローカル購読リストから削除、対象銘柄の配信が停止
同一銘柄を重複購読送信 重複配信により無駄なメモリ上書きが発生 cmd_id=22004,action="sub",code=["BTCUSDT"] ローカル集合で重複を除外、重複コマンドは送信しない
空の code 配列でコマンド送信 無効リクエストで帯域を消費 cmd_id=22004,action="sub",code=[] 配列長を事前判定、送信処理をスキップ

開発で頻出するトラブルと対策

トラブル 1:大量 Tick が一気に到達、更新キュー滞留で古い板を参照

現象:相場急変時毎秒数十件の差分が届きコールバックが追いつかず、戦略が過去の価格帯を読み取る。 検知方法:ログに 1 秒間の受信件数・単一更新の処理時間を出力し監視。 回避策:メッセージ受信とメモリ書き込みを別スレッドのキューで分離。ローカル深度は上位 50 段のみ保持しメモリ消費を制限。

トラブル 2:ネット不安定による Socket 疑似生存、深度更新が停止

現象回線揺れが起きても切断エラーが出ず、API からの配信だけ停止し板情報が静止したまま。 検知方法:10 秒間隔のハートビートを設定、連続周期内に業務データが届くか確認。 回避策:3 回連続で深度データが届かない場合自動再接続、全深度キャッシュをクリア後再購読・スナップ取得。

トラブル 3:短時間で銘柄追加・削除を連打するとゴースト購読発生

現象:sub/unsub を連続送信するとローカル購読リストとサーバー配信が不整合、監視解除した銘柄の差分が届き続ける。 検知方法:コマンド送信後に現在の監視銘柄一覧を出力、受信データの code と照合。 回避策:購読操作にスレッドロックをかけ逐次実行。メッセージ受信時に code が有効リストに存在するか事前チェック。

トラブル 4:code 記述ミスで購読が静かに失敗

現象:BTC-USDT のように区切り文字を誤記するとコマンド送信は成功するが差分が一切届かずエラーログも出ない。 検知方法:定期的にアクティブな監視銘柄と深度保存状態をログ出力。 回避策:正しい銘柄 code のホワイトリストを実装、送信前に書式検証を行い警告ログを出力。

本手法の対応・非対応機能

実装可能な機能

  1. 単一 WebSocket 長時間接続内で暗号資産銘柄を自由に追加・削除、各銘柄の深度領域を自動作成・破棄
  2. seq 番号による連続性検証、不連続時にローカル深度を自動リセット
  3. 差分 diff による軽量更新、ハートビートで接続安定維持
  4. 複数銘柄の深度領域を分離、他ペアのデータが混ざらない

対応できない機能

  1. 複数 WebSocket 接続間でローカル深度データを同期
  2. リアルタイム API を通じた過去深度 Tick の遡及取得・永続保存
  3. cmd_id=22004以外の独自カスタム購読コマンド

Python 完全実行コード(Ameba 読者向けコピー OK)

import websocket
import json
import threading

# 暗号資産向けWebSocket接続先
WS_CRYPTO_URL = "wss://quote.alltick.co/quote-b-ws-api?token=YOUR_TOKEN"

# グローバル保存領域
subscriptions = set()
order_book_depth_storage = {}  # {code: {"bids": {}, "asks": {}, "last_seq": 0}}

def send_sub_cmd(ws, action: str, code_list: list):
    """購読変更コマンド送信関数 cmd_id=22004"""
    if not code_list:
        return
    cmd_payload = {
        "cmd_id": 22004,
        "action": action,
        "code": code_list
    }
    ws.send(json.dumps(cmd_payload))
    # ローカルの購読状態を同期
    if action == "sub":
        for code in code_list:
            subscriptions.add(code)
            if code not in order_book_depth_storage:
                order_book_depth_storage[code] = {"bids": {}, "asks": {}, "last_seq": 0}
    elif action == "unsub":
        for code in code_list:
            if code in subscriptions:
                subscriptions.remove(code)

def update_depth_storage(side_map: dict, price: float, size: float):
    """深度データ更新ロジック"""
    if size <= 0:
        side_map.pop(price, None)
    else:
        side_map[price] = size

def on_open(ws):
    """接続確立時 初期銘柄購読"""
    init_codes = ["BTCUSDT", "ETHUSDT"]
    send_sub_cmd(ws, "sub", init_codes)
    print("接続完了、初期購読銘柄:", init_codes)

def on_message(ws, message):
    """差分深度メッセージ処理"""
    if not message:
        return
    try:
        data = json.loads(message)
        msg_type = data.get("type")
        if msg_type != "orderbook_diff":
            return
        code = data.get("code")
        seq = data.get("seq")
        side = data.get("side")
        price = float(data.get("price", 0))
        size = float(data.get("size", 0))

        # 監視解除済み銘柄のデータは破棄
        if code not in subscriptions:
            return
        depth_cache = order_book_depth_storage[code]
        last_seq = depth_cache["last_seq"]

        # seq不連続時 深度をクリア
        if last_seq != 0 and seq != last_seq + 1:
            depth_cache["bids"].clear()
            depth_cache["asks"].clear()
            depth_cache["last_seq"] = 0
            print(f"{code} シーケンス不連続、深度データリセット")
            return

        # 買い/売り板更新
        target_map = depth_cache["bids"] if side == "bid" else depth_cache["asks"]
        update_depth_storage(target_map, price, size)
        depth_cache["last_seq"] = seq

    except Exception as e:
        print("メッセージ解析エラー:", str(e))

def on_error(ws, error):
    print("WebSocket接続エラー:", error)

def on_close(ws, close_code, close_msg):
    print("接続切断、全深度データクリア")
    subscriptions.clear()
    order_book_depth_storage.clear()

if __name__ == "__main__":
    ws_app = websocket.WebSocketApp(
        WS_CRYPTO_URL,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close
    )
    # 10秒ハートビート、5秒タイムアウト
    ws_app.run_forever(ping_interval=10, ping_timeout=5)

運用時の推奨ルール(ブログ読者向け実践 Tips)

  1. プログラム起動時にコア銘柄を一括購読、実行中の銘柄追加削除は専用関数だけ使用し Socket を切断しない
  2. 30 分に 1 回 REST 全深度スナップ API を呼び出し、メモリ内データを校正し長期稼働の微小ズレを補正
  3. 価格ソートは戦略がデータを読むタイミングまで遅延、Tick 受信時の CPU 負荷を抑える
  4. ログには seq 不連続、不正 code、ハートビートタイムアウトの 3 点を重点出力、障害調査を簡単に
  5. 複数銘柄同時監視時は銘柄ごとに深度領域を分離、異なるペアのデータが混ざらないよう管理
  6. 深度データを永続保存したい場合は定時スナップをディスクに書き込み、ライブ更新処理をブロックしない