Redisとハサミは使いよう | サイバーエージェント 公式エンジニアブログ

こんにちは。
Amebaの基幹システムを担当している松本と申します。

待望のCluster機能が実装された3.0が2014/2/11にBetaになり、
正式リリースされる日が近づいてきているRedis

今回はアプリ開発でRedisを使って実現できる機能の一部をご紹介させていただきます。


Redisとは

本ブログにも何度か出てきているためご存知の方も多いと思いますが、
簡単に紹介しますと、Redisは高速なデータ操作が可能なインメモリKVSです。
シングルスレッドで動作しているためアトミックな操作が可能になっています。

また、Redisは様々なデータ型に対応していて、文字列型の他にリスト型、ハッシュ型、
セット型、ソート済みセット型、さらにはPub/Subまで扱えることが最大の特徴だと思います。

それぞれのデータ型に対するコマンドも多数用意されており、
それを組み合わせることで様々なケースで利用することができます。

普通にデータ操作を行うだけでも十分魅力的なRedisですが、
複数のコマンドの組み合わせるとどのような機能が実現できるのでしょうか。

カウンターやランキング、タイムライン等が有名ですが、他にもあるのです。
簡単ではありますが、2つほどJavaを用いてご紹介していきます。


ロック

1つめは、RedisのDocumentでも少しだけふれているロック機能を実装してみます。

ロック機能はRedisがアトミックな操作を保証しているから実現できる機能で、
実装に用いるのは以下の4つのコマンドです。

SETNX キーに値をセットする。
キーが既に存在する場合は何もしない。
EXPIRE キーに有効期限を設定する。
GET キーの値を取得する。
DEL キーを削除する。

ロック機能のポイントはSETNXです。

指定したキーがなかった場合は値をセットして1を返し、
既に存在する場合は何もせず0が返ってきます。
つまり、1はロック成功、0は他からロック済みと判断することができます。

それでは実装に進みましょう。
まずロックのインタフェースを用意します。

public interface Lock {

    public void lock() throws TimeoutException;

    public void unlock();
}

最低限のロックとアンロックを用意しました。

続いて中身を実装します。

public class RedisLock implements Lock {

    private static final String LOCK_KEY_PREFIX = "lock:";
    private static final int LOCK_EXPIRE = 30;
    private static final long LOCK_SLEEP = 10L;

    private final Redis redis;
    private final String lockKey;
    private final long lockMillis;

    private volatile boolean isLocked = false;
    private volatile String lockedValue = "";

    public RedisLock(Redis redis, String lockKey, long lockMillis) {
        this.redis = redis;
        this.lockKey = LOCK_KEY_PREFIX + lockKey;
        this.lockMillis = lockMillis;
    }

    @Override
    public void lock() throws TimeoutException {
        if (isLocked) return;
       
        // ロック待機時間
        long max = System.currentTimeMillis() + lockMillis;

        while (true) {
            long now = System.currentTimeMillis();
            String value = String.valueOf(now);
            // ロック試行
            Long result = redis.setnx(lockKey, value);

            // ロックが成功したらexpireを設定してデッドロック防止
            if (result != null && result.longValue() == 1L) {
                redis.expire(lockKey, LOCK_EXPIRE);
                isLocked = true;
                lockedValue = value;
                break;
            }
           
            // ロック待機時間を過ぎたら終了
            if (max <= now) {
                String locked = redis.get(lockKey);
                if (locked != null) {
                    long elapsedTime = now - Long.parseLong(locked);
                    // expireが効いていなかったら削除してデッドロック防止
                    if (elapsedTime >= LOCK_EXPIRE * 1000) {
                        redis.del(lockKey);
                    }
                }
                throw new TimeoutException("lock timeout.");
            }

            try {
                // ロックできなかったらsleepして再試行
                Thread.sleep(LOCK_SLEEP);
            } catch (InterruptedException ignore) { }
        }
    }

    @Override
    public void unlock() {
        if (!isLocked) return;
        // ロックの値を取得
        String value = redis.get(lockKey);
        // このインスタンスでロックされたものなら削除
        if (lockedValue.equals(value)) {
            redis.del(lockKey);
        }

        isLocked = false;
        lockedValue = "";
    }
}

大分できてきました。

SETNXでロック可否を判断し、ロックできたらEXPIREで有効期限を設定します。
他からロックされていた場合は再試行し、待機時間が過ぎたらロック失敗です。
ロック失敗後はロック情報を調査し、EXPIREが効いていなかったら削除します。

アンロック時はインスタンスでロックしたものかを確認後に削除します。

最後に、この実装を返すFactoryを用意します。

public class RedisLockFactory {

    private final long lockTime;
    private final Redis redis;

    public RedisLockFactory(Redis redis, long lockTime) {
        this.redis = redis;
        this.lockTime = lockTime;
    }

    public Lock getLock(String lockKey) {
        return new RedisLock(redis, lockKey, lockTime);
    }
}

これでロック機能の完成です。

以下のようにロックを取得して利用します。

public class RedisLockExample {

    public static void main(String[] args) throws Exception {
        Redis redis = new RedisPool("localhost", 6379);
        RedisLockFactory lockFactory =
            new RedisLockFactory(redis, 10000L);

        for (int i = 1; i <= 10; i++) {
            // ロック取得
            Lock lock = lockFactory.getLock("hoge");
            // ロック実行
            lock.lock();
            try {
                // 排他制御を行いたい更新処理
                update();
                Thread.sleep(3000L);
            } finally {
                // アンロック
                lock.unlock();
            }
            Thread.sleep(1000L);
        }
    }

    private static void update() {
        System.out.println("hoge");
    }
}

ローカルでRedisを起動し、このクラスを複数実行してからMONITORコマンドで状況を
見てみると、ロックされている様子がよくわかります。

1392260782.952660 [0 127.0.0.1:56465] "SETNX" "lock:hoge" "1392260782949"
1392260782.955800 [0 127.0.0.1:56465] "EXPIRE" "lock:hoge" "30"
1392260783.955176 [0 127.0.0.1:56464] "SETNX" "lock:hoge" "1392260783951"
1392260783.968747 [0 127.0.0.1:56464] "SETNX" "lock:hoge" "1392260783965"

・・・略・・・

1392260785.940036 [0 127.0.0.1:56464] "SETNX" "lock:hoge" "1392260785936"
1392260785.953764 [0 127.0.0.1:56464] "SETNX" "lock:hoge" "1392260785949"
1392260785.960863 [0 127.0.0.1:56465] "GET" "lock:hoge"
1392260785.963952 [0 127.0.0.1:56465] "DEL" "lock:hoge"
1392260785.967388 [0 127.0.0.1:56464] "SETNX" "lock:hoge" "1392260785963"
1392260785.970122 [0 127.0.0.1:56464] "EXPIRE" "lock:hoge" "30"
1392260786.968606 [0 127.0.0.1:56465] "SETNX" "lock:hoge" "1392260786964"
1392260786.983315 [0 127.0.0.1:56465] "SETNX" "lock:hoge" "1392260786979"

このRedisを用いたロック機能はAmebaのサービスで実際に使用しているものもあります!


設定の同期

2つめは、設定変更時に通知して複数サーバ間で設定を同期する機能を実装してみます。

実装に用いるのは以下の3つのコマンドです。

PUBLISH チャンネルにメッセージを送信する。
SUBSCRIBE チャンネルに接続し、送信されたメッセージを受信する。
UNSUBSCRIBE チャンネルの接続を終了する。

RedisのPub/Subは非常に面白い機能で、簡単にObserverパターンの実装ができます。
SentinelもPub/Subを使用して情報のやりとりをしています。

今回はこのPub/Subを使って設定の同期を実装します。

public class SyncProperties {

    private static final String SYNC_PROP_CHANNEL = "sync.prop";

    private final Redis redis;
    private final Properties properties;
    private final SyncPropPubSub syncPropPubSub;
    private final ExecutorService threadPool;

    private boolean isStart = false;

    public SyncProperties(Redis redis, Properties properties) {
        this.redis = redis;
        this.properties = properties;
        this.syncPropPubSub = new SyncPropPubSub();
        this.threadPool = Executors.newSingleThreadExecutor();
    }

    public void syncStart() {
        if (isStart) return;
        isStart = true;
        // subscribeはスレッドを専有するので別スレッドでチャンネル接続
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                redis.subscribe(syncPropPubSub, SYNC_PROP_CHANNEL);
            }
        });
    }

    public void syncEnd() {
        if (!isStart) return;
        // チャンネル接続を閉じる
        syncPropPubSub.unsubscribe();
        isStart = false;
    }

    public void shutdown() {
        try {
            threadPool.shutdown();
            if (!threadPool.awaitTermination(3, TimeUnit.SECONDS)) {
                try {
                    threadPool.shutdownNow();
                } catch (Exception ignore) { }
            }
        } catch (Exception e) {
            try {
                threadPool.shutdownNow();
            } catch (Exception ignore) { }
        }
    }

    public String get(String key) {
        return properties.getProperty(key);
    }

    public String get(String key, String defaultValue) {
        return properties.getProperty(key, defaultValue);
    }

    public void put(String key, String value) {
        properties.put(key, value);
        String message = "put:" + key + ":" + value;
        // put情報をチャンネルに送信
        redis.publish(SYNC_PROP_CHANNEL, message);
    }

    public void remove(String key) {
        properties.remove(key);
        String message = "remove:" + key;
        // remove情報をチャンネルに送信
        redis.publish(SYNC_PROP_CHANNEL, message);
    }

    @Override
    public String toString() {
        return properties.toString();
    }

    private class SyncPropPubSub extends RedisPubSub {
        @Override
        public void onMessage(String channel, String message) {
            // 受信したメッセージを分解
            String[] m = message.split(":");
            String type = m[0];
            switch (type) {
            case "put":
                if (m.length < 3) break;
                // put情報を反映
                properties.put(m[1], m[2]);
                break;
            case "remove":
                if (m.length < 2) break;
                // remove情報を反映
                properties.remove(m[1]);
                break;
            default:
                break;
            }
        }
    }
}

Propertiesをラップし、putとremoveが実行された時にメッセージを送信します。
受信したメッセージは分解して解析し、Propertiesに反映します。

送信側は以下のように使用します。

public class PublisherExample {

    public static void main(String[] args) throws Exception {
        Redis redis = new RedisPool("localhost", 6379);

        Properties properties = new Properties();
        properties.put("hoge", "0");

        SyncProperties syncProperties =
            new SyncProperties(redis, properties);
        // 設定追加
        syncProperties.put("fuga", "true");
        for (int i = 1; i <= 10; i++) {
            // 設定更新
            syncProperties.put("hoge", String.valueOf(i));
            Thread.sleep(2000L);
        }
        // 設定削除
        syncProperties.remove("fuga");

        syncProperties.shutdown();
    }
}

受信側は以下のように使用します。

public class SubscriberExample {

    public static void main(String[] args) throws Exception {
        Redis redis = new RedisPool("localhost", 6379);

        Properties properties = new Properties();
        properties.put("hoge", "0");

        SyncProperties syncProperties =
            new SyncProperties(redis, properties);
        // 同期開始
        syncProperties.syncStart();
        for (int i = 1; i <= 10; i++) {
            System.out.println(syncProperties.toString());
            Thread.sleep(3000L);
        }
        // 同期終了
        syncProperties.syncEnd();
        syncProperties.shutdown();
    }
}

受信側を起動してから送信側を起動すると、受信側のコンソールで同期の様子がわかります。

{hoge=0}
{hoge=0}
{fuga=true, hoge=1}
{fuga=true, hoge=3}
{fuga=true, hoge=4}
{fuga=true, hoge=6}
{fuga=true, hoge=7}
{fuga=true, hoge=9}
{fuga=true, hoge=10}
{hoge=10}

設定のマスタもRedisに持つようにして、起動時にそこから持ってくるようにすれば、
常に設定を最新の状態にすることができますね!


まとめ

Redisは使い方次第で様々な機能を実装することができます。
特にPub/Subは設定をはじめ、チャットやゲームの同期やMQの実装まで、
とても応用できる幅が広く面白い機能だと思います。

高速なデータ操作だけではない、使い方次第で色々な可能性を秘めているRedis。
ぜひみなさんも使ってみてください。

なお、今回作成したソースコードはこちらで公開しております。
ご興味がありましたらご覧ください。

https://github.com/yosuque/redis-lock
https://github.com/yosuque/redis-message