blog

Redis分散ロックの実装

日々の開発において、ロックを追加しなければならない場面に遭遇することは避けられません。例えば、商品の在庫を差し引く場合、まずデータベースから在庫を取り出し、在庫判定を行い、それから在庫を差し引かなけれ...

Aug 16, 2020 · 7 min. read
シェア

序文

日々の開発において、ロックを追加しなければならない場面に遭遇することは避けられません。例えば、商品の在庫を差し引くには、まずデータベースから在庫を取り出し、在庫判定を行い、それから在庫を差し引く必要があります。この操作の波は明らかに原子性に沿っていません。もしコードブロックがロックされないと、並行性のために売れすぎの問題につながりやすくなります。私たちのシステムは、モノリシックアーキテクチャの場合は、ローカルロックを使用すると、問題を解決することができます。分散アーキテクチャの場合は、分散ロックを使用する必要があります。

プログラム

SETNXコマンドとEXPIREコマンドの使用法

if (setnx("item_1_lock", 1)) {
 expire("item_1_lock", 30);
 try {
 ...  
 } catch {
 ...
 } finally {
 del("item_1_lock");
 }
}

SETNXとEXPIRE操作の波が非原子であるため、SETNXが成功した場合、EXPIREが実行されない、その結果、ロックがデッドロックを形成するタイムアウト時間を設定しないエラーが発生し、このメソッドは、問題を解決するように見えるが、一定のリスクがあります。

この場合、SETNXとEXPIREの両方のオペレーションが成功するか、または成功しないように、luaスクリプトを使用してオペレーションをアトミックに保つことができます。

if (redis.call('setnx', KEYS[1], ARGV[1]) < 1)
then return 0;
end;
redis.call('expire', KEYS[1], tonumber(ARGV[2]));
return 1;

そうすることで、競合するロックのアトミック性は最初に解決され、他の機能はまだ実装されていませんが、デッドロック。

Redis 2.6.12以上では、SETコマンドを柔軟に使用できます。

if (set("item_1_lock", 1, "NX", "EX", 30)) {
 try {
 ...  
 } catch {
 ...
 } finally {
 del("item_1_lock");
 }
}

この改良されたアプローチでは、SETNXとEXPIREのアトミック性の問題が、luaスクリプトを使用することなく解決されます。ここで、Aがロックを取得し、ロジックを実行するコードブロックに入ったが、様々な理由でタイムアウトし、ロックが自動的に解放されたとします。その後、Bがロックの取得に成功し、コードブロックに入ってロジックを実行しますが、Aがロジックの実行を終えてロックを解除すると、Bが取得したばかりのロックを解除してしまいます。自分の鍵を使って別のドアを開けるようなもので、受け入れられません。

この問題を解決するには、SET時にロック識別子を設定し、DEL時に現在のロックが自分のロックであることを確認します。

String value = UUID.randomUUID().toString().replaceAll("-", "");
if (set("item_1_lock", value, "NX", "EX", 30)) {
 try {
 ...  
 } catch {
 ...
 } finally {
 ... lua 原子性のためのスクリプト
 }
}
if (redis.call('get', KEYS[1]) == ARGV[1])
then return redis.call('del', KEYS[1])
else return 0
end

この時点で、競合するロックの原子性の問題と、誤ってロックを削除してしまう問題はようやく解決されました。しかし、ロックは一般的に、再連続性、ループ待ち、タイムアウトの自動更新といった機能をサポートする必要があります。以下の

Redissonを始める

Redissionのロック、再入力、タイムアウト自動更新機能は、カプセル化に役立っており、APIを呼び出す限り、独自のニーズに応じて、簡単に上記のポイントを達成することができます。詳細は Redisson参照してください。

プロジェクトへのRedissonのインストール

<dependency>
 <groupId>org.redisson</groupId>
 <artifactId>redisson</artifactId>
 <version>3.13.2</version>
</dependency>
implementation 'org.redisson:redisson:3.13.2'

最新バージョンは3.13.2、または Redisson 必要なバージョンを見つけることができます。

簡単な試み

RedissonClient redissonClient = Redisson.create();
RLock lock = redissonClient.getLock("lock");
boolean res = lock.lock();
if (res) {
 try {
 ...  
 } finally {
 lock.unlock();
 }
}

Redissonは、パッケージを行うためのすべての基本的なロジックになります、特定の実装を気にする必要はありません、数行のコードは完璧なロックを使用することができます。ここでは、単純なソースコードを投げるです。

ロック

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
 // 現在のスレッドIDを取得する
 long threadId = Thread.currentThread().getId();
 // ロックを取得しようとする
 Long ttl = tryAcquire(leaseTime, unit, threadId);
 // 成功を直接得る
 if (ttl == null) {
 return;
 }
 // ロックの取得に失敗した。
 RFuture<RedissonLockEntry> future = subscribe(threadId);
 if (interruptibly) {
 commandExecutor.syncSubscriptionInterrupted(future);
 } else {
 commandExecutor.syncSubscription(future);
 }
 try {
 while (true) {
 // ロックの再取得を試みる
 ttl = tryAcquire(leaseTime, unit, threadId);
 // 成功を直接得る
 if (ttl == null) {
 break;
 }
 // フェッチを続けるためにttl時間を待つ
 if (ttl >= 0) {
 try {
 future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
 } catch (InterruptedException e) {
 if (interruptibly) {
 throw e;
 }
 future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
 }
 } else {
 if (interruptibly) {
 future.getNow().getLatch().acquire();
 } else {
 future.getNow().getLatch().acquireUninterruptibly();
 }
 }
 }
 } finally {
 // チャンネル登録を解除する
 unsubscribe(future, threadId);
 }
}

ロックの取得

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
 // ロックの有効期限が設定されている場合、ロックは通常の方法で取得される。
 if (leaseTime != -1) {
 return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
 }
 
 // ロックの有効期限が設定されていない場合は、自動更新機能をオンにし、まず30秒の有効期限を設定する。
 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
 ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
 // エラーを返す
 if (e != null) {
 return;
 }
 //  
 if (ttlRemaining == null) {
 // 自動更新を有効にする
 scheduleExpirationRenewal(threadId);
 }
 });
 return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
 internalLockLeaseTime = unit.toMillis(leaseTime);
 return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
 /**
 * ロックが存在しない場合、hincrbyを使用して新しいハッシュテーブルを作成し、ロックカウントを1増加させ、有効期限を設定する。
 * ロックが存在し、現在のスレッドに属している場合、ロック・カウントを1インクリメントし、有効期限を設定する。
 * ロックは存在するが現在のスレッドに属していない場合、ロックの有効期限を返す。
 **/
 "if (redis.call('exists', KEYS[1]) == 0) then " +
 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
 "return nil; " +
 "end; " +
 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
 "return nil; " +
 "end; " +
 "return redis.call('pttl', KEYS[1]);",
 Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

ロックの削除

public RFuture<Void> unlockAsync(long threadId) {
 RPromise<Void> result = new RedissonPromise<Void>();
 
 // ロジックのロック解除
 RFuture<Boolean> future = unlockInnerAsync(threadId);
 future.onComplete((opStatus, e) -> {
 // 有効期限をリフレッシュする時限タスクを削除する
 cancelExpirationRenewal(threadId);
 
 if (e != null) {
 result.tryFailure(e);
 return;
 }
 
 // ロック解除スレッドとロックが同じスレッドになく、エラーを投げている。
 if (opStatus == null) {
 IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
 + id + " thread-id: " + threadId);
 result.tryFailure(cause);
 return;
 }
 result.trySuccess(null);
 });
 return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
 return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
 /**
 * ロックが現在のスレッドに属しているかどうかを判断する。
 * ロックカウントが1から引かれ、それでもロックカウントが0より大きければ有効期限が設定され、そうでなければロックが解放され、ロック解放メッセージが発行される。
 **/
 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
 "return nil;" +
 "end; " +
 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
 "if (counter > 0) then " +
 "redis.call('pexpire', KEYS[1], ARGV[2]); " +
 "return 0; " +
 "else " +
 "redis.call('del', KEYS[1]); " +
 "redis.call('publish', KEYS[2], ARGV[1]); " +
 "return 1; " +
 "end; " +
 "return nil;",
 Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

まとめ

Redisを分散ロックとして使って並行性の問題を解決するには、まだ難しい点がありますし、注意しなければならない点もたくさんあります。 システムのボリュームを正しく評価する必要がありますし、特定の技術をただ使いたいがために使うべきではありません。並行性の問題を完全に解決するには、やはりデータベースレベルでの作業が必要です。

Read next

アプレットクラウド開発-寮の修理アシスタント

ディレクトリを検索し、マウスの右ボタン、それぞれ、アップロードしてクラウド機能をデプロイします。 1クラウド開発コンソールを開き、データベースを選択し、それぞれ次の新しいコレクションを作成applyData、建物、...

Aug 15, 2020 · 2 min read