Apache Igniteとのインメモリーコンピューティング | サイバーエージェント 公式エンジニアブログ
シュティフ ロマン@秋葉原ラボ
@rshtykh

初めに

全世界に無数に散らばったサービスやデバイスから生み出されるデータ量の急激な増大に伴って、ストリーム処理とインメモリーコンピューティングは避けられない近未来であり、重要なデータ処理パラダイムでもあると言える。ストリーム処理とインメモリーコンピューティングの融合により、多様なデータから新しい価値を発見し、新たなビジネスチャンスを引き出した利益拡大が可能である。なぜなら、データの価値はデータの新しさ(freshness)と強い相関関係を持つ。

インメモリーコンピューティングは、Hadoopのようにスポットライトを浴びるテクノロジーではないが、以前数回も期待の技術としてハイライトがされることがあり、メモリーの大容量化と低コスト化が進められる今こそ、その期待度が高まっている。
低価格DRAMは、ディスクより速い。さらに、DRAMのI/Oは一般的にフラッシュメモリーのI/Oより1000x倍速い。

データストレージ観点から、分散キャッシュ中心の以下のような素晴らしいインメモリー技術がある
─ 
Apache Ignite [1]
─ 
Hazelcast [2]
─ Apache Geode [3]
─ Infinispan [4]
上記のソリューションは、データストレージのみならず、メモリー上に格納されるデータの超高速な処理機能も提供する。


Apache Ignite

Apache Ignite In-Memory Data Fabricはリアルタイムで(トランザクションを含め)膨大なデータの処理を行うための高性能分散インメモリープラットフォームである[5]。インメモリーストレージのみならず、Igniteクラスター上でMapReduceのような様々な分散処理を行ったり、Apache Spark[6]との統合ができたりするプラットフォームでもある。

つまり、データグリッド+Key-Valueインターフェース、インデックス検索、自動データ再分配、トランザクションサポート等を備えた計算グリッド。

ApacheIgnite

(画像引用: http://ignite.apache.org/images/apache-ignite.png)

また、Apache Ignite(以下、Ignite)は高いスケーラビリティや耐障害性を実現している。計算ロジックのソースコードをJAR化し各ノードに転送したり、ノードを再起動したりすることを要しないZero DeploymentというP2Pクラスロード技術もサポートしている。

Igniteクラスタ

Igniteクラスタをセットアップするには、Igniteインスタンスを幾つか立ち上げることになる。デフォルト設定のマルチキャストでノード探索が行われる。マルチキャストを利用できない場合、IPアドレスやポート番号リストを用いたTCPディスカバリによりクラスタのネットワークトポロジー構成が可能である。しかし、TCPディスカバリはハードウェア交換、ノード追加などが行われるデータセンターネットワーク内での利用の場合柔軟性に欠けるので、Zookeeperに基づくディスカバリが有効である。

クラスタ内のデータ配置には、partitioningというテクニックが用いられる。また、Igniteは分散コンピューティングでよく使われるConsistent Hashingではなく、Rendezvousとも呼ばれるHighest Random Weight(HRW)ハッシュ化がパティショニング戦略アルゴリズムを使用している。HRWは、人気のConsistent Hashingに比べてよりuniformなデータdistributionを実現できるアルゴリズムとして知られる。
(Consistent Hashingとの比較[7])

クラスタへのリクェストが行われると、ハッシュコードが計算され、そのコードで必要なクラスタノードが発見されることになる。そしてリクェストはそのノードに転送されノード内でデータ処理が行われるのが一般的な流れである。

データ保存手法として、以下の3つがある
- スケーラビリティの高いpartitioned mode
  -- データはアサインされたクラスタのパティションに入る(バックアップ数も指定可能)

- replicated mode
  -- データはクラスタの各ノードにレプリケートされる

- local mode
  -- データはネットワーク上に配信されない。一般的にローカルハッシュに利用される。

データグリッドの用途

サービス開発に様々なIgniteの用途は考えられる。ここではすべてを述べきれないため、ウェブサービス開発に特に有用な機能をピックアップした。

HTTP sessionのキャッシュ

HTTPセッションのキャッシュやクラスタリングはデータグリッドの人気のユースケースである。それによって、サーブレットコンテナやJava EEアプリケーションサーバは
- HTTPセッションへの超高速なアクセスを得る
- アプリケーションノード間やサーブレットコンテナ間のフェイルオーバー、高可用性が可能となる

Spring Caching

Springアプリケーションコンテキスト内でSpringCacheManagerをキャッシュマネージャとして設定してから、@Cacheableアノテーションを利用してソースコード内の引数をキャッシュするのが可能になる。それによってデータグリッドのフェイルオーバー機構を通じてキャッシュされたデータ損失を防げる。

@Cacheable("products")
public Product findProduct(ProductId pid) {...}


たとえば、この例ではfindProductメソッドはproductsというキャッシュと関連付けられる。findProductメソッドは呼び出される度に、このメソッドは以前呼び出されたことがあるかチェックが入る。

データ分析のためのクエリー

IgniteはテキストクエリーやANSI-99規格に準拠するSQLをサポートしているため、人気のある上位nアイテムの発見をはじめ[8]幅広いデータ解析に用いられる。また、Apache Zeppelin[9]と統合されインタラクティブ解析やデータ可視化が可能である。

ベンチマーク

Igniteの一番大きな競合はHazelcast[2]であると思われる。2社共のIgnite対Hazelcastのベンチマークが存在しているが、どれにもグリッドのバージョンが異なるため結論が出ない。

Ignite   Hazelcast  Benchmark
1.0.0    3.5            https://ignite.apache.org/benchmarks/ignite-vs-hazelcast.html
1.3.0    3.6            https://hazelcast.com/resources/benchmark-gridgain/

Apache Igniteを利用した開発

Java、C++ や.NETがサポートされている。また、RESTやmemcachedプロトコルもサポートされているため、他プログラミング言語・環境からの簡単なデータ操作も可能である。

Apache Flumeとの統合


弊社ではBigData処理にログ収集基盤としてApache Flume[10]がよく使われるため、FlumeとIgniteの統合に目を付けた。一般的に、Igniteのストリーム処理には、内部でキーをバッチし適切なノードに配置してくれるIgniteDataStreamerが使われる。一方、Source-Channel-Sinkの概念に従ってFlumeとの統合を実現するのは一般的なので、ここで紹介する統合はIgniteに標準的ではない。

IgniteSink


本実装では、FlumeのSinkを実装しIgniteデータグリッドへのデータ投入を可能にした。最新版Apache Ignite 1.5に含まれる。

https://github.com/apache/ignite/tree/ignite-1.5/modules/flume


セットアップと利用事例


1. 先ず、Flume Channelから受け取ったデータをデータグリッドに格納したい形に変換する処理が必要となる。そのため、以下のようにEventTransformerインターフェースを実装する。

2. ${FLUME_HOME}にあるplugins.d ディレクトリ内で'ignite'ディレクトリ作成する。
3. LogEventTransformerをビルドし${FLUME_HOME}/plugins.d/ignite-sink/libにコピーする。
4. IgniteSinkのJARをはじめ他の関連モジュールを${FLUME_HOME}/plugins.d/ignite-sink/libextにコピーする。


Agent設定に於いて、Sinkの設定は以下のように記述する。

a1.sinks.k1.type = org.apache.ignite.stream.flume.IgniteSink
a1.sinks.k1.igniteCfg = /some-path/ignite.xml
a1.sinks.k1.cacheName = testCache
a1.sinks.k1.eventTransformer = my.company.MyEventTransformer
a1.sinks.k1.batchSize = 100

ソースとチャンネルを記述すれば(Flumeマニュアル参照)Flume agentをスタートし、データはtestCacheに転送されることになる。そのデータを利用して様々な分析を行うことができるが、極めて簡単な例とし て、以下のようにContinuousQueryを設けてログにあったエラー通知を行うことができる。

まとめ


この記事では、データグリッドに焦点を当てインメモリーコンピューティングを紹介し、Apache Igniteに特化した実用的な使い方を幾つか述べた。
そのなか、弊社でBigData処理にログ収集基盤として使われるApache FlumeとIgniteの統合の実装や活用例を紹介した。

今後もデータグリッドの動向を見守って他のプロジェクトや新規開発事例を紹介していきたい。


[1] Apache Ignite, https://ignite.apache.org/

[2] Hazelcast, https://hazelcast.com/

[3] Apache Geode, http://geode.incubator.apache.org/

[4] Infinispan, http://infinispan.org/

[5] What is Ignite, http://apacheignite.gridgain.org/docs/what-is-ignite

[6] Apache Spark, http://spark.apache.org/

[7] Rendezvous Hashing, https://github.com/clohfink/RendezvousHash

[8] Ignite Streaming Example, http://apacheignite.gridgain.org/docs/streaming-example

[9] Apache Zeppelin, http://zeppelin.incubator.apache.org/

[10] Apache Flume, https://flume.apache.org/