blog

ZooKeeper分散ロック原理の徹底解析

JDKネイティブロックは、異なるスレッドが相互に排他的な方法で共有リソースにアクセスすることを可能にしますが、異なるプロセス間で相互に排他的な方法で共有リソースにアクセスしたい場合、JDKネイティブロ...

Jul 4, 2020 · 12 min. read
シェア

I. 実装原理

JDKネイティブロックは、異なるスレッドが相互に排他的に共有リソースにアクセスすることを可能にしますが、異なるプロセス間で相互に排他的に共有リソースにアクセスしたい場合、JDKネイティブロックは役に立ちません。このとき、Zookeeperを使って分散ロックを実現することができます。具体的には以下の2つに分かれます:

臨時ノダル・プログラム

一時ノード方式の原理は以下の通りです:

  • ZooKeeper は同じノードを2つ作ることを許さないので、最初に作れるのは1つのプロセスだけです;
  • プロセスAがノードの作成に成功したと仮定すると、分散ロックを取得します。この時点で、他のプロセスはparent_nodeにリスナーを登録し、その下の全ての子ノードの変更をリスニングし、現在のスレッドをハングアップさせる必要があります;
  • parent_node の下の子ノードが変更されると、その子ノードにリスナーを登録しているすべてのプロセスに通知されます。これらのプロセスは、対応するロック・ノードの削除イベントかどうかを判断する必要があります。削除イベントである場合、保留中のスレッドは実行を続行し、ロックの再取得を試みます。

ここでテンポラリ・ノードを使用する理由は、デッドロックを回避するためです。プロセス A がビジネス・ロジックを実行し終えると、積極的にノードを削除してロックを解放します。プロセス A がビジネス・ロジックの実行を終了した後、積極的にノードを削除してロックを解放します。しかし、プロセス A が予期せずダウンした場合、ノードは一時ノードとして宣言されているため削除され、デッドロックが回避されます。

テンポラリー・ノード方式は実装が簡単ですが、欠点が目立ちます:

  • デメリット1: 親ノード配下の他のロックが変更または削除された場合、プロセスB、C、Dに通知されますが、他のロックの解放については明らかに気にしません。parent_node配下に多数のロックがあり、アプリケーションの並行性が高い場合、ZooKeeperクラスタはクライアントに頻繁に通知する必要があり、多くのネットワークオーバーヘッドが発生します;
  • 欠点2:一時ノード・スキームを使用して作成されたロックは非公平です。つまり、プロセスAがロックを解放した後、プロセスB、C、Dが再試行する順番は、ロックを最初に取得しようとした時間ではなく、通知を受け取った時間に関連しています。

プログラムの同時実行性が高くない場合は、このスキームの実装が比較的簡単なので、このスキームで実現できます。また、プログラムの同時実行性が非常に高い場合は、次のような一時的順序付きノード・スキームが必要になります:

一時的順序ノードプログラム

仮順序ノードを使用する場合、対応する処理は以下のようになります:

  • 各プロセスはparent_nodeの下に一時的な順序付きノードを作成しようとしますが、一時的な順序付きノードの特性に従って、すべてのプロセスがその作成に成功します;
  • 次に、各プロセスは、現在のparent_nodeの下にあるこのロックのすべてのテンポラリ・ノードの情報を取得し、それが最小のものであるかどうかを判断する必要があります;
  • もしそうでなければ、現在のスレッドをハングします。そして、その前のノードにリッスンを登録します;
  • 上図のように、プロセス A が処理を終了すると、プロセス B が登録した Watch イベントがトリガされます。

ここで特筆すべきは、以下のプロセス:

  • プロセスBが一時的なノードを作成し、それが最小のノードでないことを比較によって知っているが、まだリッスン登録をしていない場合;
  • この時点で、プロセスAは処理を終了し、ノード01を削除します;
  • その後、プロセス B は exist メソッドを呼び出してリスナーを登録し、IllegalArgumentException をスローします。これは例外ですが、通常は前のノードがもはや存在しないことを意味します。

この場合、プロセスBは再度ロックの取得を試み、取得できれば処理を開始することができます。これについては、以下のApache Curatorのソースコードで再度説明します。

以上の紹介を通して、一時的順序ノード方式が一時的ノード方式の2つの欠点を正確に解決していることがわかります:

  • 各一時的順序付きノードは、その前のノードだけを気にすればよく、他の追加ノードや追加イベントを気にする必要はありません;
  • 実装されたロックは公平で、最初に到着したプロセスは、より小さな値を持つ一時的な順序付きノードを作成するため、より速くロックを取得することができます。

一時順序ノード・スキームのもう1つの利点は、読み書きロックの読み取りロックのような共有ロックを実装できることです。

読み書きロック

次の図に示すように、一時的に順序付けられたノードは、読み取りロックされたノードと書き込みロックされたノードに分類されます:

  • 読み取りロック・ノードでは、前の書き込みロック・ノードの解放だけを気にすればよいのです。前の書き込みロックが解放されると、複数の読み取りロック・ノードに対応するスレッドは同時にデータを読み取ることができます;
  • 書き込みロック・ノードでは、前のノードの解放だけを気にすればよく、前のノードが書き込みロック・ノードか読み取りロック・ノードかを気にする必要はありません。なぜなら、順序性を確保するために、書き込み操作は前の読み取り操作または書き込み操作が完了するまで待たなければならないからです。

アパッチ・キュレーター

基本的な使用法

Apache Curator は ZooKeeper 用の Java クライアントで、一時順序ノードスキームに基づく分散ロック、分散読み書きロック、その他の機能を実装しています。使用するには、Apache Curator と ZooKeeper の依存関係をインポートし、ZooKeeper のバージョンがサーバ上の ZooKeeper のバージョンと同じであることを確認する必要があります:

<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-framework</artifactId>
 <version>4.3.0</version>
</dependency>
<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>4.3.0</version>
</dependency>
<dependency>
 <groupId>org.apache.zookeeper</groupId>
 <artifactId>zookeeper</artifactId>
 <version>3.4.14</version>
</dependency>

基本的な使い方は以下の通り:

RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
CuratorFramework client = CuratorFrameworkFactory.builder()
 .connectString(".0.")
 .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
 .namespace("mySpace").build();
client.start();
// 1. 分散ロックを作成する
InterProcessMutex lock = new InterProcessMutex(client, "/distributed/myLock");
// 2.分散ロックを取得しようとする
if (lock.acquire(10, TimeUnit.SECONDS)) {
 try {
 System.out.println("ビジネス時間の消費をシミュレートする");
 Thread.sleep(3 * 1000);
 } finally {
 // 3. 
 lock.release();
 }
}
client.close();

ZooKeeper のデータ構造は以下のようになっています:

指定されたパスの下に複数の一時順序ノードが順次作成され、ビジネスロジックが完了するとこれらのノードは削除されます。ここではスタンドアロン版のZooKeeperを使用していますが、クラスタ環境でも同様です。 レプリケーションの遅延によってデータの不整合が発生するRedisのマスタースレーブモードとは異なり、ZooKeeperクラスタの各ノード上のデータの一貫性はそれ自体で保証することができます。

ソースコード解析

Apache Curatorは、一時的な順序付きノードの実装スキームを使用する基礎となる、そのソースコードを見て、次の実装方法です:

ロックソースコードの解析

上記のコア・メソッドはacquire()メソッドで、以下のように定義されています:

@Override
public boolean acquire(long time, TimeUnit unit) throws Exception{
 return internalLock(time, unit);
}

ご覧のように、内部でinternalLock()メソッドを呼び出しており、internalLockメソッドのソースコードは以下のようになっています:

// threadDataThreadはロックを保持するスレッドで、LockDataはロック・データである。
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap(); 
private boolean internalLock(long time, TimeUnit unit) throws Exception{
 Thread currentThread = Thread.currentThread();
 // まず、threadDataに現在のスレッドのロックがすでにあるかどうかをチェックする。
 LockData lockData = threadData.get(currentThread);
 if ( lockData != null ){
 //ロックがすでに存在する場合、カウンタが1増加する。
 lockData.lockCount.incrementAndGet();
 return true;
 }
 // コア・メソッド:ロックの取得を試みる
 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
 // 実際にロック・ノードを削除するメソッドはreleaseLock()内に存在し、以下のソース・コードを持つ。
 if ( lockPath != null ){
 LockData newLockData = new LockData(currentThread, lockPath);
 threadData.put(currentThread, newLockData);
 return true;
 }
 return false;
 }

実際にロックの取得を試みる上記のメソッドは attemptLock() です:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception{
 final long startMillis = System.currentTimeMillis();
 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
 final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
 int retryCount = 0; // 再試行
 String ourPath = null;
 boolean hasTheLock = false;
 boolean isDone = false;
 
 // このループは、NoNodeException が発生したときに再試行するために使用される。
 while ( !isDone ){
 isDone = true;
 try{
 // コア・メソッド:ロック・パスに従って対応するノードを作成する。
 ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
 // コア・メソッド:ロックの取得
 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
 }
 catch ( KeeperException.NoNodeException e ){
 // 例外が発生し、ZooKeeper に設定された最大再試行時間または再試行回数に達していない場合、ループは継続され、ロックの取得が再度試みられる。
 if ( client.getZookeeperClient().getRetryPolicy()
 .allowRetry(retryCount++,System.currentTimeMillis() - startMillis,
 RetryLoop.getDefaultRetrySleeper()) ){
 isDone = false;
 }else{
 throw e;
 }
 }
 }
 // 実際にロック・ノードを削除するメソッドはreleaseLock()に存在し、以下のソース・コードを持つ。
 if ( hasTheLock ){
 return ourPath;
 }
 return null;
 }

createsTheLockは比較的単純な実装で、指定されたパスに基づいて順序付けられた一時的なノードを作成します:

@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception{
 String ourPath;
 // lockNodeBytesが空でない場合、データを持つ一時的な順序付きノードが作成される。
 if ( lockNodeBytes != null ){
 ourPath = client.create().creatingParentContainersIfNeeded().withProtection().
 withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
 }else{
 //そうでない場合は、空の一時的な順序付きノードが作成される。
 ourPath = client.create().creatingParentContainersIfNeeded().withProtection().
 withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
 }
 // 作成されたノードへのパスを返す
 return ourPath;
}

ここで返される一時順序ノードのパスは、引数としてinternalLockLoop()メソッドに渡されます。原理を紹介する記事の冒頭で、各スレッドが良い一時順序ノードを作成するだけでなく、それが一時順序ノードによって作成された現在の最小のノードであることを判断する必要があると述べ、internalLockLoop()メソッドは、主にこれを行うことです:

private boolean internalLockLoop ( long startMillis, Long millisToWait, String ourPath) throws Exception {
 // ロックが保持されているかどうか
 boolean haveTheLock = false;
 boolean doDelete = false;
 try {
 if (revocable.get() != null) {
 client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
 }
 // 接続されている ZooKeeper クライアントが起動状態、つまりロックを取得したいプロセスが実行中で、まだロックを取得していない場合、ループは継続する。
 while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
 // 現在のすべての子ノードを小さいものから大きいものへとソートする。
 List<String> children = getSortedChildren();
 // createTheLock メソッドによって取得された一時的な順序付きノードへのパスはインターセプトされ、ノード名の一部のみが保持される。
 String sequenceNodeName = ourPath.substring(basePath.length() + 1);
 // 現在の点が最小のノードかどうかを判断する
 PredicateResults predicateResults = driver.
 getsTheLock(client, children, sequenceNodeName, maxLeases);
 // 現在の点が最小のノードである場合、ロックはその点で取得される。
 if (predicateResults.getsTheLock()) {
 haveTheLock = true;
 } else {
 // ポイントが最小のノードでない場合、まずスプライスして前のノードの完全なパスを取得する。
 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
 synchronized (this) {
 try {
 // 次に、前のノードをリッスンする
 client.getData().usingWatcher(watcher).forPath(previousSequencePath);
 // 待ち時間を設定した場合
 if (millisToWait != null) {
 // これまでに費やした時間から待ち時間を引く。
 millisToWait -= (System.currentTimeMillis() - startMillis);
 startMillis = System.currentTimeMillis();
 // 待ち時間が0より小さい場合、費やした時間が待ち時間を超えたことになり、取得したロックは無効であり、削除する必要がある。
 if (millisToWait <= 0) {
 //削除フラグを設定し、ループを抜ける
 doDelete = true; 
 break;
 }
 // まだ時間が残っていれば、残りの時間、ロックが取得されるのを待つ。
 wait(millisToWait);
 } else {
 // 待ち時間が設定されていない場合は、ロックの取得を待つ。
 wait();
 }
 } catch (KeeperException.NoNodeException e) {
 // この例外がスローされると、リスナーが前のノードに設定されたときに、前のノードがもはや存在しないことを意味し、例外がキャッチされる。
 // しかし、余計なことをする必要はない。なぜなら、ループは継続し、再びロックの取得を試みることができるからだ。
 }
 }
 }
 }
 } catch (Exception e) {
 ThreadUtils.checkInterrupted(e);
 doDelete = true;
 throw e;
 } finally {
 // 例外またはタイムアウトがスローされた場合、プロセスによって作成されたロックは無効であり、削除する必要がある。例外またはタイムアウトがスローされた場合、プロセスによって作成されたロックは無効であり、削除する必要がある。
 if (doDelete) {
 deleteOurPath(ourPath);
 }
 }
 return haveTheLock;
}

ここでは、現在のノードがロックを保持しているかどうかを判断する、上記のgetsTheLockメソッドについて説明します:

PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);

上述したように、現在のノードがロックを保持しているかどうかの判定はロック・タイプによって異なるため、getsTheLock メソッドにはさまざまな実装があります。ここでは、StandardLockInternalsDriver を例にとって説明します。StandardLockInternalsDriver は、排他的ロックの判定ルールを使用します:

 public PredicateResults getsTheLock(CuratorFramework client, List<String> children, 
 String sequenceNodeName, int maxLeases) throws Exception {
 // 順序付きノード内の現在の点のインデックスを取得する。
 int ourIndex = children.indexOf(sequenceNodeName);
 // ourIndex が 0 より小さい場合、NoNodeException がスローされる。
 validateOurIndex(sequenceNodeName, ourIndex);
 // もしourIndexがmaxLeasesより小さければ、それは0であり、小さいものから大きいものへと並べられた集合の最初のもの、つまり最小のものである。
 boolean getsTheLock = ourIndex < maxLeases;
 // それが最小のものであれば、ロックは獲得されており、前のノードの名前を返す必要はない。そうでなければ、その後のリスニング操作のために前のノードの名前を返す必要がある。
 String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
 return new PredicateResults(pathToWatch, getsTheLock);
 }

maxLeaseパラメータの意味を説明します。デフォルト値は1であり、これは相互に排他的なロックです。デフォルト値が1より大きい場合、maxLeaseの値が5であると仮定すると、最小の5つの一時順序ノードがロックを保持できるノードであるとみなすことができ、このとき、最大5つのスレッドがクリティカルゾーンに同時にアクセスすることができ、これは機能的にJavaのセマフォ機構に似ています。

リリースロックのソースコード解析

ロックを取得するソース・コードは以上ですが、ロックを解放する処理はもう少し単純です。release()メソッドのソース・コードは以下の通りです:

public void release() throws Exception {
 Thread currentThread = Thread.currentThread();
 // 現在のスレッドに基づいてロック情報を取得する
 InterProcessMutex.LockData lockData = threadData.get(currentThread);
 // ロック・ノードを実際に削除するメソッドはreleaseLock()に存在し、そのソース・コードは以下のとおりである。
 if (lockData == null) {
 throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
 }
 // Zookeeperが実装するロックはリエントラントなので、カウンタは1つ減る。
 int newLockCount = lockData.lockCount.decrementAndGet();
 if (newLockCount > 0) {
 return;
 }
 // カウンタの値が0より小さい場合、ロックが解除された回数がロックが追加された回数より多いことになり、例外がスローされる。
 if (newLockCount < 0) {
 throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
 }
 try {
 // このステップに達すると、カウンタ値は正確に0になり、ノードが実際に削除されてロックが解放される。
 internals.releaseLock(lockData.lockPath);
 } finally {
 // threadData からロック情報を削除する。
 threadData.remove(currentThread);
 }
}

実際にロック・ノードを削除するメソッドは、以下のソース・コードを持つ releaseLock() に存在します:

final void releaseLock(String lockPath) throws Exception{
 client.removeWatchers();
 revocable.set(null);
 deleteOurPath(lockPath); //ZooKeeper上の対応するノードを削除する。
}
  • ニ・チャオ . PaxosからZookeeperへ - 分散一貫性の原則と実践 . 電子工業出版社 . 2015-02-01

その他の記事については、[Full Stack Engineer's Handbook] , GitHub address: をご覧ください。

Read next

JAVAの基本的ないくつかのよく使われるクラス

char - 文字、int - 整数に加え、ラッパークラスに対応し、最初の文字の残りを大文字にすることができます。 文字列s1 = "nice to "の場合、文字列s2 = "nice" + " to"; "nice to "の定数プールがあり、s2は、本質的にリテラル代入と等価であるため、JVMは、 ...のリターンを優先します。

Jul 4, 2020 · 2 min read