Na-ga Style Develop -2ページ目

Na-ga Style Develop

いろいろ試すよう

今回は RabbitMQ チュートリアルの "Work Queue" を大雑把に翻訳しました。
大体意味はあっているとは思いますが、原文と照らし合わしながらご確認ください!

Work Queues

前回 "Hello World" では、指定した Queue から Message を送受信するシンプルなプログラムを紹介しました。今回 "Work Queues" では、前回のサンプルコードを少し修正し RabbitMQ の Roundobin Dispatching・Message Acknowledgment・Message durability・Fair Dispatching 機能を紹介します。

サンプルコードは、Producer として動作する NewTask と Consumer として動作する Worker の 2 つです。

NewTask は、Task を生成し Message にカプセル化して Work Queue (実際は前回の Queue と変わりありませんが、他と区別するために Work Queue と表記します) に送信します。Work Queue は、受信した Message をすぐに Worker に割り当てを行い、割り当てた Message に対して Woker が行なう処理が完了するのを待ちません。多数の Worker は、割り当てられた Message を後で処理するようにスケジューリング (内部 Queue に格納) を行ない、やがて Message をアンカプセル化し Task を処理します。

補足

今回のサンプルコードの実装では、Task と Message が曖昧になっていますが、実際の実装では何か特定のプロトコル (例えば、Protocol Bufer や、Message Pack など..) にエンコード・デコードして扱われるでしょう。

Preparation

前回は Hello World を含んでいる Message を送りましたが、今回は複雑な Task を含んでいる Message を送ります。複雑な Task に含まれるドットは、1 秒間かかる Work とします。例えば "Hello..." は処理に 3 秒間かかる Task を表します。

上記ルールを適用するために、前回紹介した Send と Recv を一部修正します。

Send は、コマンドラインから任意の Task を送れるように修正します。Work Queue へ Message にカプセル化した Task を送信するため NewTask と名付けます。

Recv は、Task に含まれるドット毎に 1 秒間の Work を行なうように修正します。Work Queue から Message を取り出してアンカプセル化した Task を処理するため Worker と名付けます。

NewTask

コマンドライン引数から NewTask#getMessage() で生成した Message でカプセル化した Task を Work Queue に送信するように修正します。

1767826

コマンドライン引数から Task を取得する処理を追加します。

NewTask#getMessage() は引数が指定されていない場合は "Hello World" を返します。指定されている場合は、引数をスペース区切りで結合した文字列を返します。

1870004

NewTask のすべてのソースコードは Github を参照してください。

Worker

Work Queue から取得した Message を Worker#doWork() に渡します。

1768197

アンカプセル化した Task に含まれているドット毎に 1 秒間の Work を Thread.sleep(1000) でシュミレートします。

1768200

Worker のすべてのソースコードは Github を参照してください。

Roundrobin Dispatching

Work Queue を使用する利点の 1 つは、Worker を簡単に並列化できることです。まずは、実際に試してみましょう。

2 つの Worker を同時に起動し、どのように Work Queue から Message を受信するか確認します。2 つの Worker を起動した状態で、NewTask で処理時間の異なる複数の Task を Message でカプセル化して Work Queue に送信します。

1768253

それぞれの Worker は、次のように Message を受信しています。

1768285

デフォルトの設定では、RabbitMQ は次の Cosumer に順番に Message を送ります。どのような Message でも関係なく、全体の平均では、各 Consumer は同じ数の Message を取得します。この Message 分散方式をラウンドロビンと呼びます。

Worker を 3 つ以上同時に起動する場合も試してみてください。もし片方の Woker に処理時間の大きい Message ばかりが割り当てられた場合の挙動に関しては、後述 Fair Dispatching で紹介しています。

Message Acknowledgment

Task を処理するには数秒を要する場合があります。多数の Worker のうち 1 台が Task を実行中にクラッシュした場合は、どのような挙動をするでしょうか。

現状の Worker のサンプルコードでは、Worker に Message を割り当てると同時に Work Queue (具体的にはメモリー) から Message を削除します。そのため、もし Worker を強敵的に終了した場合、処理を行なっていた Message を失います。また、Worker の内部 Queue に格納されていた "まだ処理されていない Message" も全て失います。

しかし、どのような Message も失いたくない場合があると思います。もし Worker が Message を抱えている状態で終了した場合は、別の Worker に Message を割り当ててほしいです。

Message を失わない仕組みとして、RabbitMQ は Message Acknowledgment (ACK) に対応しています。Consumer は RabbitMQ から Message を受信したタイミングや、Message に対する処理が完了したタイミングで ACK を送り返します。RabbitMQ は Consumer から ACK を受け取ると、Queue から Message を削除します。

ACK が送られるずに Consumer が終了した場合は、RabbitMQ は完全に処理されなかった Message を別の Consumer に割り当てます。もし Consumer がクラッシュ (または意図的に強制終了) しても、Message を失わないことを確認できます。

RabbitMQ では、ACK のタイムアウトはありません。Consumer とのコネクションンが切れたときだけ、他の Consumer に Message の割り当てを行ないます。処理時間がとても長い Message でも (ACK が届くまで特別な処理は行なわないので) 問題ありません。

前回および現状のサンプルコードでは、Channel#basicConsume(queue, autoAck, callback) にて、明示的に autoAck フラグを true にしていました。そのため、Worker は Message を Work Queue から割り当てられた時点で ACK を自動的に送り返します。この autoAck フラグを false にする場合は、(大体の場合は Task の処理が終わった時点で) ACK を手動で送り返す必要があります。

1777730

このようにすると、もし Message を処理している Worker を CTRL+C で強制終了したとしても、何も Message を失いません。そして、すぐに ACK が返ってきていない Message を別の Worker に割り当てを行います。

Forgotten acknowledgment

Channel#basicAck() を忘れることはよくあるミスですが、重大な影響を及ぼします。先程も述べましたが、ACK が返ってきていないい Message は Worker とのコネクションが切れた時点で再割り当てされます。そのため、ACK が返ってきていない Message は削除できないので、Work Queue の使用メモリーが肥大化します。

このミスに気づくためには、RabbitMQ の "rabbitmqctl" コマンドで "message_unacknowledged" フィールドを確認してください。

1778000

Message durability

複数の Worker でラウンドロビンを使う場合に、ある Worker が突然終了しても他の Worker で Message を処理する方法を紹介しました。

しかし、いまの状態だと RabbitMQ 自体がクラッシュした場合は、RabbitMQ で宣言されている Queue および Queue に格納されている Message が失われます。先ほどと同様に、Queue および Message も失いたくないことが要求される場合もあると思います。もちろん RabbitMQ では Queue および Message に永続制を持たすことができます。

最初に、RabbitMQ が Queue を失わない方法を紹介しましょう。そのためには、次のように永続性フラグを Queue 宣言時に有効にする必要があります。

1769139

このコマンド自身は正しいですが、今回の NewTask では動きません。なぜなら、既に永続性を持たない "hello" という名前を持った Queue を宣言しているからです。RabbitMQ では存在している Queue に対して別のパラメータで再定義することは認めておらず、もし再定義した場合はエラーを返します。回避策として、次のように違う名前 (今回の場合は task_queue) で Queue を定義しましょう。

1769216

この Channel#queueDeclare() の修正は、NewTask と Worker の両方に適用する必要があります。
ここでの重要な点は、もし RabbitMQ を再起動しても "task_queue" という名前を持った Queue が失われないことを保証されることです。

次に、RabbitMQ が Message を失わない方法を紹介します。Message にに永続性を持たせるには、BasicProperties を実装した MessageProperties.PERSISTENT_TEXT_PLAIN を次のように設定します。

1769278

Note on message persistence

Message に永続性を持たせた場合も、 Message を失わないことを必ず保証するわけではありません。

永続性を持たせると Message を保存するようしますが、 Message を受け取った際に保存していない僅かな時間があります。また、RabbitMQ はそれぞれの Message に対して fsync(2) を行いません。つまり、Message は通常ディスクに保存されますが、メモリーキャッシュに保存されるかもしれません。

Message の永続性の保証は確実ではありませんが、使い方によっては十分な場合もあります。もし確実な永続性を求めるのであれば、 Message の送信をトランザクション処理でラップしてください。

Fair Dispatch

ラウンドロビンで述べましたが、もし片方の Woker に処理時間の長い Message ばかりが割り当てられた場合はどのような挙動を行なうでしょうか。まずは、実際に試してみましょう。

2 つの Worker を同時に起動し、どのように Work Queue から Message を受信するか確認します。2 つの Worker を起動した状態で、NewTask で奇数回の場合に処理時間の長い Message を Work Queue に送信します。

1870090

それぞれの Worker は、次のように Message を受信しています。

1870095

やはり、RabbitMQ はどのような Message でも関係なく、各 Worker に平等に割り当てを行なうことが分かります。そのため片方の Worker は常に忙しく、もう片方の Woker はほとんど処理をしない状況になっています。

この問題は、RabbitMQ が Queue に Message を受信するたびに Consumer に割り当てを行なっているため発生しています。RabbitMQ は、Consumer が ACK を返していない Message の数をみていません。それは、やみくもに N 番目の Message を N 番目の Consumer に割り当てています。

解決するためには、Channel#basicQos(prefetchCount) を設定してください。これにより RabbitMQ は prefetchCount を超える Message を Consumer に伝えないようになります。prefetchCount に 1 を設定した場合は、Consumer が何か処理を行なっている間は、新しい Message の割り当てを行ないません。代わりに、何も処理をしていない次の Consumer に Message を割り当てます。

1776042

さきほどと同じ状態を再現してみましょう。2 つの Worker を同時に起動し、どのように Work Queue から Message を受信するか確認します。2 つの Worker を起動した状態で、NewTask で奇数回の場合に処理時間の長い Message を Work Queue に送信します。

1777405

それぞれの Worker は、次のように Message を受信しています。

1777346

First Task を割り与えられた Worker は 8 秒間処理を行います。処理を行っている間は、新しい Message を割り当たらないため、もう片方の Worker に Message を割り当てられていることを確認できます。

1777395

Note about queue size

もし全ての Consumer が忙しい状態だった場合は、Queue がいっぱいになる可能性があります。黙視で監視を行い、おそらくより多くの Consumer を追加するか、あるいは他の戦略を持つでしょう。

Putting it all together

Message Acknowledgment と Channel#basicQos(prefetchCount) を使用することで、Work Queue を利用できます。永続性を持たせることで、RabbitMQ の再起動を行なっても宣言した Queue および Queue に格納されている Message を失いません。

part 3 では、多数の Cosumer に同じ Message を割り当てる (Broadcast) 配信を紹介します。