はじめての RabbitMQ | サイバーエージェント 公式エンジニアブログ
アメーバ事業本部 API 基盤グループでプログラマをしている @na_ga です。

API 基盤グループでは、弊社の様々なサービスから利用される共通 API の開発・運用を行なっております。今回は、私が担当した API でメッセージキューとして利用した RabbitMQ を紹介させていただきたいと思います。

はじめに

API 基盤グループで提供している API には、リクエストをリアルタイムに処理する必要がないものもあります。例えばメール配信 API や、投稿内容の有人監視 API などが挙げられます。

これらの非同期処理が可能な API では、大量のリクエストを受け取るためにメッセージキューを使用しています。

メッセージキューを使用した構成では、リクエストを受け取るプログラムが、受け取ったリクエストから生成したメッセージをキューに格納します。キューに格納されたメッセージは、メッセージを処理するプログラムから取り出され、順番に処理が行なわれます。

各プログラムの役目を別けることによって、システムの負荷にあわせて部分的にスケールしやすい構成ができます。

このようなメッセージングのオープンなプロトコルとして Advanced Message Queuing Protocol があり、代表的な実装としては Apache Project の ActiveMQ、iMatix Corporation の ZeroMQ、そして今回紹介する VMware の RabbitMQ があります。

RabbitMQ 入門

RabbitMQ はメッセージのやり取りを中継するミドルウェアです。

個人的には、ActiveMQ より速く、ZeroMQ にはない永続性を備えており、スケールしやすいメッセージキューで、公式サイトのドキュメントが豊富な点も魅力的です。またコミュニティが活発的で 2012/03 にも RabbitMQ 2.8.1 がリリースされており、今後も注目しています。

では、実際に RabbitMQ で Hello World メッセージの送受信をやってみましょう!

環境を構築してみる

RabbitMQ 環境を CentOS 5 がインストールされている rabbitmq0 サーバに構築します。

RabbitMQ は、Erlang バージョン R12B-3 以上が必須となっています。CentOS の場合は yum の DAG レポジトリからバージョン R12B-5 をインストールできますが、RabbitMQ の各種プラグインを動かすために現時点で最新バージョンの R15B01 をソースからインストールします。

★ Erlang インストール
[root@rabbitmq0]# cd /usr/local/src
[root@rabbitmq0]# wget http://www.erlang.org/download/otp_src_R15B01.tar.gz
[root@rabbitmq0]# tar xvzf otp_src_R15B01.tar.gz
[root@rabbitmq0]# yum install ncurses-devel
[root@rabbitmq0]# cd otp_src_R15B01
[root@rabbitmq0]# ./configure
[root@rabbitmq0]# make
[root@rabbitmq0]# make install

★ Erlang バージョン確認
[root@rabbitmq0]# /usr/local/bin/erl -version
Erlang (ASYNC_THREADS,HIPE) (BEAM) emulator version 5.9.1

RabbitMQ は公式サイトで配布されている最新バージョン 2.8.1 の Binary をダウンロードし、/usr/local/rabbitmq 以下に設置します。その後 RabbitMQ を起動する rabbitmq ユーザーを追加し、RabbitMQ の起動スクリプトを作成します。

★ RabbitMQ をインストールディレクトリに配置
[root@rabbitmq0]# cd /usr/local/src/
[root@rabbitmq0]# wget http://www.rabbitmq.com/releases/rabbitmq-server/v2.8.1/rabbitmq-server-generic-unix-2.8.1.tar.gz
[root@rabbitmq0]# tar xvzf rabbitmq-server-generic-unix-2.8.1.tar.gz
[root@rabbitmq0]# cp -a ./rabbitmq_server-2.8.1 /usr/local/rabbitmq_server-2.8.1
[root@rabbitmq0]# ln -s /usr/local/rabbitmq_server-2.8.1 /usr/local/rabbitmq
[root@rabbitmq0]# mkdir /var/log/rabbitmq

★ rabbitmq ユーザーを作成
[root@rabbitmq0]# /usr/sbin/groupadd -g 30105 rabbitmq
[root@rabbitmq0]# /usr/sbin/useradd -u 30105 -g rabbitmq -d /var/lib/rabbitmq -m -c 'RabbitMQ messaging server' rabbitmq

★ rabbitmq ユーザーに rabbitmq コマンドの PATH を追加
[root@rabbitmq0]# cp /var/lib/rabbitmq/.bash_profile{,.bak}
[root@rabbitmq0]# sed -i 's#PATH=$PATH:$HOME/bin#PATH=$PATH:$HOME/bin:/usr/local/bin:/usr/local/rabbitmq/sbin#g' /var/lib/rabbitmq/.bash_profile
[root@rabbitmq0]# diff /var/lib/rabbitmq/.bash_profile{,.bak}
10c10
< PATH=$PATH:$HOME/bin:/usr/local/bin:/usr/local/rabbitmq/sbin
---
> PATH=$PATH:$HOME/bin

★ rabbitmq ユーザーにパーミッションを変更
[root@rabbitmq0]# chown -R rabbitmq. /var/log/rabbitmq
[root@rabbitmq0]# chown -R rabbitmq. /usr/local/rabbitmq_server-2.8.1

★ RabbitMQ の起動スクリプトを作成
[root@rabbitmq0]# cat << 'EOF' > /etc/init.d/rabbitmq && chmod +x /etc/init.d/rabbitmq
#!/bin/bash
user=rabbitmq
run_as_user(){
su - $user -c "$*"
}
start(){
run_as_user rabbitmq-server -detached
}
stop(){
run_as_user rabbitmqctl stop
}
status(){
run_as_user rabbitmqctl status
}
case "$1" in
start)
start
;;
stop)
stop
;;
restart)
stop && start
;;
status)
status
;;
*)
echo $"Usage: $0 {start|stop|restart|status}"
exit 1
esac
exit $?
EOF

RabbitMQ を起動し、キュー情報を取得します。

★ RabbitMQ 起動
[root@rabbitmq0]# /etc/init.d/rabbitmq start
Activating RabbitMQ plugins ...
0 plugins activated:

★ キュー情報取得
[root@rabbitmq0]# su - rabbitmq -c "rabbitmqctl list_queues name messages_ready"
Listing queues ...
...done.

インストールした直後なので、まだ何もキューが作成されていないことが確認できます。たったこれだけで、とりあえずメッセージを送受信する準備が整いました!

メッセージを送信してみる

"hello" キューに対して "Hello World" メッセージを送信するプログラムを実装します。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Send {

private static final String QUEUE_NAME = "hello";

public static void main(String[] argv) throws Exception {

// Factory 生成
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("rabbitmq0");

// Conenction & Channel 接続
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// hello キューを宣言
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// hello キューに Hello World メッセージを送信
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '" + message + "'");

// Channel & Connection 切断
channel.close();
connection.close();

}
}

プログラムを実行し、キュー情報を取得します。

★ プログラム実行
[root@rabbitmq0]# java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ./Send
[x] Sent 'Hello World!'

★ キュー情報取得
[root@rabbitmq0]# su - rabbitmq -c "rabbitmqctl list_queues name messages_ready"
Listing queues ...
hello 1
...done.

インストール後は存在していなかった "hello" キューが作成され、1 件のメッセージが格納されていることが確認できます。

メッセージを受信してみる

"hello" キューからメッセージを受信するプログラムを実装します。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

private static final String QUEUE_NAME = "hello";

public static void main(String[] argv) throws Exception {

// Factory 生成
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("rabbitmq0");

// Connection & Channel 接続
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// hello キューを宣言する (Consumer 側が先に起動する場合を考慮して)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("[*] Waiting for messages. To exit press CTRL+C");

// Consumer を宣言する (これにより RabbitMQ からメッセージがプッシュ配信される)
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);

// 受け取ったメッセージを処理する
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received '" + message + "'");
}

}
}

プログラムを実行し、キュー情報を取得します。

★ プログラム実行
[root@rabbitmq0]# java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ./Recv
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'

★ キュー情報取得
[root@rabbitmq0]# su - rabbitmq -c "rabbitmqctl list_queues name messages_ready"
Listing queues ...
hello 0
...done.

さきほど格納したメッセージを受け取り "hello" キューに格納されているメッセージが 0 件になったことが確認できます。

さらに高度な使い方

RabbitMQ はキューおよびメッセージの永続化、メッセージのラウンドロビン配信・一斉配信、RPC 通信など高度な機能を備えています。

こちらに関しては公式サイトのチュートリアルやドキュメントに詳細が記載されていますので、一度ご確認ください。冒頭でも述べましたが、RabbitMQ は公式ドキュメントが非常に豊富な点も魅力のひとつです!

■ RabbitMQ ドキュメント : http://www.rabbitmq.com/documentation.html
■ RabbitMQ チュートリアル : http://www.rabbitmq.com/getstarted.html

RabbitMQ の冗長構成

実際に API に組み込んで使用するには、冗長構成が必要不可欠です。

一部の RabbitMQ に障害が発生した場合でも、API に影響が出ないよう冗長構成を取ります。また、リクエストを受け取ったプログラムは、正常な RabbitMQ に対してメッセージを送信し続ける必要があります。

しかし、RabbitMQ の公式クライアント (現時点の rabbitmq-client-2.8.1) では、1 台の RabbitMQ だけに接続を行なう実装になっています。もし接続している RabbitMQ に障害が発生した場合は、処理を続けられません。

そのため、ここでは複数の RabbitMQ に対して接続を行ない、もしメッセージが送れなかった場合は正常な RabbitMQ に対してメッセージを送信するように拡張しました。

では、拡張したクライアントを使用し、RabbitMQ の冗長構成を紹介します。

実装

拡張したソースコードは Github にて公開しています。

■ Github : https://github.com/na-ga/us-rabbitmq-client/

複数の RabbitMQ を起動してみる

さきほど構築した手順で rabbitmq1 サーバと rabbitmq2 サーバに RabbitMQ 環境を構築します。その後、各 RabbitMQ を起動し、キュー情報を取得します。

★ rabbit1 サーバ上の RabbitMQ を起動
[root@rabbitmq1]# /etc/init.d/rabbitmq start
Activating RabbitMQ plugins ...
0 plugins activated:

★ rabbit2 サーバ上の RabbitMQ を起動
[root@rabbitmq2]# /etc/init.d/rabbitmq start
Activating RabbitMQ plugins ...
0 plugins activated:

★ rabbit1 サーバ上のキュー情報を取得
[root@rabbitmq1]# su - rabbitmq -c "rabbitmqctl list_queues name messages_ready"
Listing queues ...
...done.

★ rabbit2 サーバ上のキュー情報を取得
[root@rabbitmq2]# su - rabbitmq -c "rabbitmqctl list_queues name messages_ready"
Listing queues ...
...done.

こちらもインストールした直後なので、まだ何もキューが作成されていないことが確認できます。

複数の RabbitMQ にメッセージを送信してみる

複数の "durable_queue" キューに対して "Hello World" メッセージを送信するプログラムを実装します。

このプログラムでは、キューの宣言時およびメッセージの送信時に永続性オプションを有効にしています。また、第一引数に送信回数を指定できるようにします。

import jp.ameba.us.rabbitmq.client.MqConnectorFactory;
import jp.ameba.us.rabbitmq.client.MqProducer;

public class Send {

private static final String QUEUE_NAME = "durable_queue";

public static void main(String[] args) {

// 送信回数
int count = 0;
try{
count = Integer.valueOf(args[0]);
}catch(Exception e){
System.err.println("Usage: java ./Send [count]");
return;
}

// Factory 生成
MqConnectorFactory factory = new MqConnectorFactory();
factory.addServer("rabbitmq1");
factory.addServer("rabbitmq2");

// RabbitMQ サーバに接続
factory.startup();

// Producer 取得
MqProducer producer = factory.newProducer();

// durable_queue キューに Hello World メッセージを count 回送信
for(int i = 0; i < count; i++){
String message = "Hello World! " + i;
producer.store(QUEUE_NAME, true, false, false, null, message.getBytes());
System.out.println("[x] Sent '" + message + "'");
}

// RabbitMQ サーバと切断
factory.shutdown();

}
}

引数に 5000 を指定してプログラムを実行し、プログラム実行中に rabbitmq1 サーバ上で動作している RabbitMQ を再起動します。その後 RabbitMQ のキュー情報を取得します。

★ 引数に 5000 を指定してプログラム実行
[root@rabbitmq0]# java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar:us-rabbitmq-client.jar ./Send 5000
[x] Sent 'Hello World! 1'
[x] Sent 'Hello World! 2'
[x] Sent 'Hello World! 3'
(snip) ☆ 実行中に rabbitmq1 サーバにて /etc/init.d/rabbitmq restart を実行する
[x] Sent 'Hello World! 4998'
[x] Sent 'Hello World! 4999'

★ rabbit1 サーバ上のキュー情報を取得
[root@rabbitmq1]# su - rabbitmq -c "rabbitmqctl list_queues name messages_ready"
Listing queues ...
durable_queue 1053
...done.

★ rabbit2 サーバ上のキュー情報を取得
[root@rabbitmq2]# su - rabbitmq -c "rabbitmqctl list_queues name messages_ready"
Listing queues ...
durable_queue 3947
...done.

rabbitmq1 サーバ上の "durable_queue" キューには 1053 件のメッセージが格納されており、残りの 3947 件が rabbitmq2 サーバ上の "durable_queue" キューに格納されていることが確認できました。

RabbitMQ の管理画面

RabbitMQ には多くのプラグインが提供されています。

いままでは rabbitmqctl コマンドでキュー情報を取得していましたが、rabbitmq_management プラグインを導入するとブラウザ上から閲覧できます。rabbitmq_management は、システムの負荷状況・リソース使用量や、RabbitMQ 上のキューや Exchange の管理ができる便利なプラグインです。

では、管理画面の導入手順を紹介します。

プラグインを導入してみる

rabbitmq0 サーバに導入します。

RabbitMQ 環境を構築後に rabbitmq-plugins コマンドで rabbitmq_management プラグインを有効にします。その後、プラグインを反映するために RabbitMQ を再起動します。

★ プラグイン導入
[root@rabbitmq0]# su - rabbitmq -c "rabbitmq-plugins enable rabbitmq_management"
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_mochiweb
amqp_client
rabbitmq_management_agent
rabbitmq_management
Plugin configuration has changed. Restart RabbitMQ for changes to take effect.

★ RabbitMQ 終了
[root@rabbitmq0]# /etc/init.d/rabbitmq stop
Stopping and halting node 'rabbit@rabbitmq0' ...
...done.

★ RabbitMQ 起動
[root@rabbitmq0]# /etc/init.d/rabbitmq start
Activating RabbitMQ plugins ...
6 plugins activated:
* amqp_client-2.8.1
* mochiweb-1.3-rmq2.8.1-git
* rabbitmq_management-2.8.1
* rabbitmq_management_agent-2.8.1
* rabbitmq_mochiweb-2.8.1
* webmachine-1.7.0-rmq2.8.1-hg

ブラウザで http://guest:guest@rabbitmq0:55672/ に接続しましょう。

RabbitMQ Management Console

このような画面が表示されれば、完了です!

まとめ

今回は、メッセージキューとして利用した RabbitMQ を紹介させていただきました。

RabbitMQ でメッセージをやり取りする流れの雰囲気は掴んでいただけたでしょうか。また、冗長構成を実現するには、RabbitMQ でクラスタリングを組む手法や、ロードバランサーを別途用意して実現することもできますが、シンプルな使い方では今回のような手法も使えるのではないかと思います。

最後になりますが、RabbitMQ は公式ドキュメントは充実していますが、まだ日本語ドキュメントは少ない状態です。RabbitMQ を使ってみた際には、ぜひアウトプットをしていただきたいなと思っています!