テーマ:
この記事は、CyberAgent エンジニア Advent Calendar 2014 の6日目の記事です。
5日目はnekoruriさんのAmeba等で利用しているOpenStack Swiftを利用したオブジェクトストレージ
7日目はoinumeさんのGoLangでJavaのenumっぽいライブラリ作った話です。


こんにちは、ピグ事業部のIshimura(Twitter, Github)といいます。アメーバピグのサーバサイドエンジニアをしています。ユニットテストとリファクタリングが好物です。

今回はタイトル通りなのですが、アメーバピグでGoogle BigQueryに実際にログを突っ込むまでに行った設定を記します。アメーバピグではBigQueryを各種施策の検討・評価のための分析用に利用する予定です。

BigQueryの特徴やメリットはググれば(Googleだけに)たくさん出てくるので割愛しますが、一言で言うと「はやい、やすい、うまい」です。

システム構成

アプリサーバ(約30台)からログを収集用のサーバ(2台)にFluentdのforwardプラグインで送って、そこからfluent-plugin-bigqueryを使ってBigQueryのテーブルにinsertするという鉄板パターンです。Fluentd様々です。

ログの種類

アメーバピグでの各種行動時(会話・買い物など)で発生する行動ログを送っています。フォーマットはTSVで、ピーク時でだいたい4000行/sくらい発生します。1時間ごとにログローテされていて、「activity.log.2014-12-06_01」的なファイル名でログを吐きます。

Fluentdの設定

■アプリサーバ側
<ポイント>
tailで受けてforwardで流しているだけなので特筆する点はあまり無いのですが、ファイル名が動的に変わるので、日時のフォーマットを指定して(%Y-%m-%d_%H)動的にtailするファイルを変えていたり、fluent-plugin-config-expanderを利用して、hostnameをタグにつけていたりしています。(調査時に受け取る側で特定のhostからログをちゃんと受け取れているかをチェックするため)
<source>
  type config_expander
  <config>
    type tail
    path /usr/local/app/logs/activity.log.%Y-%m-%d_%H
    format tsv
    pos_file /var/log/td-agent/activity.log.pos
    refresh_interval 2s
    keys datetime,activity_id,user_id,content
    tag bq.activity.${hostname}
  </config>
</source>

<match bq.activity.**>
  type forward

  buffer_type memory
  buffer_chunk_limit 256k
  buffer_queue_limit 1024
  retry_wait 5
  retry_limit 5
  flush_interval 1s
  num_threads 1
  <server>
    name aggr_server01
    host xx.xx.xx.xx
    port 24224
  </server>
  <server>
    name aggr_server02
    host yy.yy.yy.yy
    port 24224
  </server>
</match>
■収集用のサーバ側
<ポイント>
後述するテーブルローテーションに対応するため、fluent-plugin-rewrite-tag-filterでログ内の日付フィールド(datetime)から年月をタグにつけ、fluent-plugin-forestでタグに付与された年月に対応したテーブルにinsertできるようにしています。

insert量のチューニングでは、最初は余裕を見ているつもりでbuffer_chunk_records_limit=300, buffer_chunk_limit=768kにしていたのですが、なぜか「413:Request Entity Too Large」がたまに発生したので、更に値を下げてGoogle側の制限の半分に設定しました。
<match bq.activity.**>
  type rewrite_tag_filter
  capitalize_regex_backreference yes

  # Datetime format :=> "2014-01-23 12:34:56"
  rewriterule1 datetime ^(\d{4})-(\d{2})-.+$ rewrited.bq.activity.$1$2
</match>

<match rewrited.bq.activity.**>
  type forest
  subtype bigquery

  <template>
    method insert

    flush_interval 0
    try_flush_interval 0.05          
    queued_chunk_flush_interval 0.01

    buffer_chunk_records_limit 250   # BQ側の最大値が500行
    buffer_chunk_limit 512k          # BQ側の最大値が1mb
    buffer_queue_limit 1024          # チャンクをいくつ保持しておくか(メモリの許す限りなるべく多めに)

    retry_limit 5                    
    retry_wait 1s                    
    num_threads 32                   # 送信処理を行うスレッド数(多めにしておく)

    auth_method private_key
    email **********@developer.gserviceaccount.com
    private_key_path /etc/bigquery/**********.p12

    project **********
    dataset pigg
    tables pigg_activity_${tag_parts[-1]}

    schema_path /etc/bigquery/bq_schema_pigg_activity.json
  </template>
</match>

テーブルローテーション

BigQueryはスキャンしたデータのサイズで課金額が増えるので余計な課金を防ぐために月ごとにテーブルを分ける事にしました。ただ、毎月テーブルを手動で作るのはプログラマの美徳に反するので以下のRubyスクリプトをcronで定期実行して翌月分のテーブルを作成しています。

※注意

google-api-ruby-clientは実装時点(2014/11)で、普通にgem  installした場合、依存ライブラリ(retriable)のバージョンアップの関係で実行時にエラーになるので、gituhubのmasterのコードをspecific_installを使ってインストールしました。
<参考>:  https://github.com/google/google-api-ruby-client/issues/164
require 'date'

class Pigg
  class BigQueryApiClient
    def initialize(private_key_path, email)
      require 'google/api_client'
      require 'google/api_client/client_secrets'
      require 'google/api_client/auth/installed_app'
      require 'json'

      client = Google::APIClient.new(
        :application_name => 'pigg-activity-table-checker',
        :application_version => '1.0.0'
      )

      key = Google::APIClient::KeyUtils.load_from_pkcs12(private_key_path, 'notasecret')

      client.authorization = Signet::OAuth2::Client.new(
        :token_credential_uri => 'https://accounts.google.com/o/oauth2/token',
        :audience => 'https://accounts.google.com/o/oauth2/token',
        :scope => 'https://www.googleapis.com/auth/bigquery',
        :issuer => email,
        :signing_key => key)
      client.authorization.fetch_access_token!

      bq_api = client.discovered_api('bigquery', 'v2')

      @client = client
      @bq_api = bq_api
    end

    def table_exists?(project, dataset, table)
      res = @client.execute(
                :api_method => @bq_api.tables.get,
                :parameters => {
                    :projectId => project,
                    :datasetId => dataset,
                    :tableId => table
                }
      )
      # 404(NotFound)も正常と見なす
      unless res.success? || res.status == 404
        raise "#{res.status} #{res.body}"
      end
      res.success?
    end

    def insert_table(project, dataset, table, schema_file_path)
      body = { :tableReference => { :tableId => table } }
      schema = open(schema_file_path) do |io|
        JSON.load(io)
      end
      body['schema'] = { :fields => schema }

      res = @client.execute(
          :api_method => @bq_api.tables.insert,
          :parameters => {
              :projectId => project,
              :datasetId => dataset
          },
          :body_object => body
      )

      unless res.success?
        raise "#{res.status} #{res.body}"
      end
    end
  end
end

# 翌月のactivityログ用のBigQueryのテーブルがあるかをチェックし、無ければ作成する
# 書式は(pigg_activity_yyyymm)
private_key_path = '/etc/bigquery/**********.p12'
email = '**********@developer.gserviceaccount.com'
project = '**********'
dataset = 'pigg'
schema_file_path = '/etc/bigquery/bq_schema_pigg_activity.json'
table_id_format = 'pigg_activity_%Y%m'
# 来月のテーブルIDを生成する
table = Date.today.next_month.to_time.strftime(table_id_format)

client = Pigg::BigQueryApiClient.new(private_key_path, email)
# テーブルが無ければ生成
unless client.table_exists?(project, dataset, table)
  client.insert_table(project, dataset, table, schema_file_path)
  puts "#{Time.now} Table(#{table}) created."
end
こんな感じで今は安定してBigQueryにログを送れている状態です。実際クエリを叩いてみた感想はやっぱり噂通りの速度でした。。思い立ってから2週間程度でこの環境が手に入るという先人たちの努力に感謝ですねっ。

あわせて読みたいアメーバピグ関連の記事たち

アメーバピグにおけるDB構成&対応記
Node.js Cluster+Socket.IO+Redisによるリアルタイム通知システム
ピグ麻雀のアルゴリズム
アメーバピグのソケットサーバーたち
いいね!した人  |  リブログ(0)