blog

CountDownLatch、Semaphore、CyclicBarrier、Conditionソースコード解析

他のスレッドで実行されている一連の処理が完了するまで、1つ以上のスレッドが待機できるようにする同期ヘルパークラスです。カウントダウン計算の概念。 最初にある整数のパラメータ値が与えられると、count...

Jun 24, 2020 · 10 min. read
シェア

CountDownLatch

定義

他のスレッドで実行される一連の処理が完了するまで、1つまたは複数のスレッドが待機できるようにする同期ヘルパークラスです。カウントダウン計算の概念。最初にある整数のパラメータ値が与えられ、カウントダウン関数を実装するためにcountDown()、この整数の0へのカウントダウンでは、await()メソッドを呼び出したプログラムは、それが0に達すると、すべての待機スレッドを解放するために、待機する必要があります。

ソースコード解析

CountDownLatchには、2つのメソッドしかなく、1つは countDown メソッドです。

countDown()

CountDownLatchには同期内部クラスSyncがあり、AQSステートを使用してカウントを表現し、同期制御を実装します。

/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
 private static final class Sync extends AbstractQueuedSynchronizer {
 private static final long serialVersionUID = 4982264981922014374L;
 Sync(int count) {
 setState(count);
 }
 int getCount() {
 return getState();
 }
 protected int tryAcquireShared(int acquires) {
 return (getState() == 0) ? 1 : -1;
 }
 protected boolean tryReleaseShared(int releases) {
 // Decrement count; signal when transition to zero
 for (;;) {
 int c = getState();
 if (c == 0)
 return false;
 int nextc = c-1;
 if (compareAndSetState(c, nextc))
 return nextc == 0;
 }
 }
 }
 

countDownメソッドは、SyncのreleaseShared()メソッドを呼び出します。

public void countDown() {
 sync.releaseShared(1);
}

すべてのスレッドはawait()メソッドの呼び出しによってブロックされ、countDown()がstate=0になるまで待つことができます。

protected boolean tryReleaseShared(int releases) {
 // Decrement count; signal when transition to zero
 //スピンを使用してstate-1を実装する
 for (;;) {
 int c = getState();
 if (c == 0)
 return false;
 int nextc = c-1;
 if (compareAndSetState(c, nextc))
 return nextc == 0;
 }
}

tryReleaseSharedは、stateが0までデクリメントされるとtrueを返し、そうでなければstate-1の値を返します。state=0の場合は、doReleaseShared()メソッドを呼び出して待機中のスレッドをウェイクアップします。

private void doReleaseShared() {
 for (;;) {
 Node h = head;
 if (h != null && h != tail) {
 int ws = h.waitStatus;
 if (ws == Node.SIGNAL) {
 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 continue; // loop to recheck cases
 unparkSuccessor(h);
 }
 //PROPAGATE共有モードであることを示すノードの状態は、スレッドのウェイクアップを伝播する
 else if (ws == 0 &&
 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
 continue; // loop on failed CAS
 }
 //先にウェイクアップしたスレッドがHEADを占有している場合、ループが再度実行され、HEADノードが変更されたかどうかをチェックし、変更された場合はループを継続する。
 if (h == head) // loop if head changed
 break;
 }
}

を、スレッドがウェイクアップされる実行順に並べます:

  • h == head はヘッド・ノードが使用されていないことを示します;
  • unparkSuccessor(h) ウェイクアップするスレッドを示します;
  • h != head 先頭ノードが今起きたスレッドによって占有されていることを示します。

await()

public void await() throws InterruptedException {
 sync.acquireSharedInterruptibly(1);
}

state < 0の場合、doAcquireSharedInterruptibly()メソッドを実行して、現在のスレッドを共有ロック・キューに追加する必要があります。

public final void acquireSharedInterruptibly(int arg)
 throws InterruptedException {
 if (Thread.interrupted())
 throw new InterruptedException();
 if (tryAcquireShared(arg) < 0)
 doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
 throws InterruptedException {
 //SHARED共有モードの場合は、キューに共有モード・ノードを作成する。
 final Node node = addWaiter(Node.SHARED);
 boolean failed = true;
 try {
 for (;;) {
 final Node p = node.predecessor();
 if (p == head) {
 //ロックの取得を試みる
 int r = tryAcquireShared(arg);
 //ロックが獲得され、状態!=0,次のコードは実行されない。
 if (r >= 0) {
 //ウェイクアップしたノードをHEADノードに設定する。
 setHeadAndPropagate(node, r);
 p.next = null; // help GC
 failed = false;
 return;
 }
 }
 if (shouldParkAfterFailedAcquire(p, node) &&
 parkAndCheckInterrupt())
 throw new InterruptedException();
 }
 } finally {
 if (failed)
 cancelAcquire(node);
 }
}

最初のスレッドが起動してHEADノードに設定されると、今度は2番目のスレッドが起動します......。

private void setHeadAndPropagate(Node node, int propagate) {
 Node h = head; // Record old head for check below
 setHead(node);
 /*
 * Try to signal next queued node if:
 * Propagation was indicated by caller,
 * or was recorded (as h.waitStatus either before
 * or after setHead) by a previous operation
 * (note: this uses sign-check of waitStatus because
 * PROPAGATE status may transition to SIGNAL.)
 * and
 * The next node is waiting in shared mode,
 * or we don't know, because it appears null
 *
 * The conservatism in both of these checks may cause
 * unnecessary wake-ups, but only when there are multiple
 * racing acquires/releases, so most need signals now or soon
 * anyway.
 */
 if (propagate > 0 || h == null || h.waitStatus < 0 ||
 (h = head) == null || h.waitStatus < 0) {
 Node s = node.next;
 if (s == null || s.isShared())
 doReleaseShared();
 }
}

Semaphore

定義

単語の意味から信号灯として理解され、それは同時にプログラムにアクセスするスレッドの数を制御することができます、例えば、駐車場は100台の駐車スペースの合計を持って、その後、突然150台の車が駐車場に駐車する必要があるようになった、あなたは駐車場に残りの車が車の完全な駐車場に入ることを許可する空きスポットができるまで待たなければなりません。このような利用シナリオは、交通の流れを制限するために使用することができます。

acquire()はライセンスを取得し、release()はライセンスを解放します。

ソースコード解析

FairSync 公平な戦略

static final class FairSync extends Sync {
 private static final long serialVersionUID = 2014338818796000944L;
 FairSync(int permits) {
 super(permits);
 }
 protected int tryAcquireShared(int acquires) {
 for (;;) {
 //キューにスレッドがあるかどうかを判断し、CAS処理を行う。
 if (hasQueuedPredecessors())
 return -1;
 int available = getState();
 int remaining = available - acquires;
 if (remaining < 0 ||
 compareAndSetState(available, remaining))
 return remaining;
 }
 }
}

NonFairSync 非フェア戦略

フェアな戦略とフェアでない戦略は、hasQueuedPredecessors()の判断の一つに過ぎません。

static final class NonfairSync extends Sync {
 private static final long serialVersionUID = -2694183684443567898L;
 NonfairSync(int permits) {
 super(permits);
 }
 protected int tryAcquireShared(int acquires) {
 return nonfairTryAcquireShared(acquires);
 }
}

nonfairTryAcquireShared() メソッドのソースコード:

final int nonfairTryAcquireShared(int acquires) {
 for (;;) {
 int available = getState();
 int remaining = available - acquires;
 if (remaining < 0 ||
 compareAndSetState(available, remaining))
 return remaining;
 }
}

残りのソースコードは CountDownLatchの ものと同じで、共有ロック実装に基づいています。

CyclicBarrier

定義

フォーメーションという言葉の意味は、円形の障壁と理解されます。いわゆるバリアは同期ポイントであり、この同期ポイントに到達するスレッド群がブロックされると、この同期ポイントに到達する最後のスレッドだけがバリアのドアを開け、ドアの外にブロックされたすべてのスレッドがドアに入って作業を続行します。これは、すべてのサブスレッドがタスクを完了してからメインスレッドが実行されるというシナリオに適用できます。

ソースコード解析

public CyclicBarrier(int parties, Runnable barrierAction) {
 if (parties <= 0) throw new IllegalArgumentException();
 this.parties = parties;
 this.count = parties;
 this.barrierCommand = barrierAction;
}

コンストラクタのパラメータである parts は、関係するスレッドの数を示します。 parts は、各スレッドが await() メソッドを呼び出すたびに 1 ずつ減算され、バリアのゲートを通過するとリセットされます。2番目のパラメータ barrierAction は、最後に到着したスレッドによって実行される Runnable インスタンスです。

Condition

定義

これは、マルチスレッド通信を調整するために使用されるツールクラスであり、スレッドが特定の条件を待ってブロックするとき、条件が満たされたときに起動されます。

ソースコード解析

await()とsignal()という2つの重要なメソッドがあります。

await()

このメソッドを呼び出すと、スレッドは待ち行列に入ってロックを解放し、スレッドの状態は待ち状態になります。

public final void await() throws InterruptedException {
 //スレッドの割り込みを許可する
 if (Thread.interrupted())
 throw new InterruptedException();
 //状態をconditionとするノードを作成し、データを格納するためにリンクリストを使用する。
 Node node = addConditionWaiter();
 //現在のロックを解放し、ロックの状態を取得し、待ち行列のスレッドを解放する。
 int savedState = fullyRelease(node);
 int interruptMode = 0;
 //ポイントがキューにあるかないかを判断する
 while (!isOnSyncQueue(node)) {
 //現在のスレッドをハングする
 LockSupport.park(this);
 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
 break;
 }
 //acquireQueuedfalseの場合、ロックは獲得される。
 //interruptMode != THROW_IEこのスレッドはノードのキューイングに成功しなかったが、signalがenqメソッドを実行してキューイングしたことを意味する。
 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
 //この変数をREINTERRUPTに設定する。
 interruptMode = REINTERRUPT;
 //ノードの次の待ち行列が空でなければ、クリーンアップ処理が開始され、条件ノードがクリーンアップされる
 if (node.nextWaiter != null) // clean up if cancelled
 unlinkCancelledWaiters();
 //スレッドが中断された場合は、例外をスローする必要がある。
 if (interruptMode != 0)
 reportInterruptAfterWait(interruptMode);
}
  • addConditionWaiter()
private Node addConditionWaiter() {
 Node t = lastWaiter;
 // If lastWaiter is cancelled, clean out.
 //lastWaiterがnullでなく、waitStatusがconditionでない場合、そのノードをチェーンから外す。
 if (t != null && t.waitStatus != Node.CONDITION) {
 unlinkCancelledWaiters();
 t = lastWaiter;
 }
 //状態条件を持つ一方向リストの作成
 Node node = new Node(Thread.currentThread(), Node.CONDITION);
 if (t == null)
 firstWaiter = node;
 else
 t.nextWaiter = node;
 lastWaiter = node;
 return node;
}
  • fullyRelease()メソッドのソースコード:
final int fullyRelease(Node node) {
 boolean failed = true;
 try {
 //獲得までの再入力回数
 int savedState = getState();
 //同期キュー内のスレッドを解放してウェイクアップする
 if (release(savedState)) {
 failed = false;
 return savedState;
 } else {
 throw new IllegalMonitorStateException();
 }
 } finally {
 if (failed)
 node.waitStatus = Node.CANCELLED;
 }
}
  • isOnSyncQueue()メソッドのソースコード:
final boolean isOnSyncQueue(Node node) {
 //falseはキューに入っていないことを意味し、trueはキューに入っていることを意味する。
 if (node.waitStatus == Node.CONDITION || node.prev == null)
 return false;
 if (node.next != null) // If has successor, it must be on queue
 return true;
 /*
 * node.prev can be non-null, but not yet on queue because
 * the CAS to place it on queue can fail. So we have to
 * traverse from tail to make sure it actually made it. It
 * will always be near the tail in calls to this method, and
 * unless the CAS failed (which is unlikely), it will be
 * there, so we hardly ever traverse much.
 */
 /AQS待ち行列を末尾ノードから順にスキャンし、AQS待ち行列内のノードが現在点と等しいことが判明した場合、そのノードは待ち行列に存在しなければならないことを意味する。
 return findNodeFromTail(node);
}

signal()

このメソッドを呼び出すと、AQSキュー内のノードがウェイクアップされます。

public final void signal() {
 //現在のスレッドがロックを獲得したかどうかを判断する
 if (!isHeldExclusively())
 throw new IllegalMonitorStateException();
 //AQSキュー内の最初のノード
 Node first = firstWaiter;
 if (first != null)
 doSignal(first);
}
  • doSignal()メソッドのソースコード:
private void doSignal(Node first) {
 do {
 //条件キューから最初のノードを削除する
 if ( (firstWaiter = first.nextWaiter) == null)
 lastWaiter = null;
 first.nextWaiter = null;
 } while (!transferForSignal(first) &&
 (first = firstWaiter) != null);
}
  • transferForSignal()メソッドのソースコード:
final boolean transferForSignal(Node node) {
 /*
 * If cannot change waitStatus, the node has been cancelled.
 */
 //ノードの状態を0に更新する
 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
 return false;
 /*
 * Splice onto queue and try to set waitStatus of predecessor to
 * indicate that thread is (probably) waiting. If cancelled or
 * attempt to set waitStatus fails, wake up to resync (in which
 * case the waitStatus can be transiently and harmlessly wrong).
 */
 //enqを呼び出し、現在のポイントをAQSキューに追加する。そして、現在のノードの前のノード、つまり元の末尾ノードに戻る。
 Node p = enq(node);
 int ws = p.waitStatus;
 //前のノードがキャンセルされた場合、セットポイントの試行状態はSIGNALとなる。
 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
 //ノードのスレッドをウェイクアップする
 LockSupport.unpark(node.thread);
 return true;
}
  • ブロッキング: await()メソッドでは、スレッドがロック・リソースを解放した後、ノードがAQS待ち行列になければ、現在のスレッドをブロックし、待ち行列にあれば、ロックを取得しようとスピンして待機します;
  • リリース:signal()の後、ノードは条件キューからAQS待ちキューに移動し、通常のロック獲得プロセスに入ります。
Read next

iOS:謎のKVO

1.オリジン Aspectsがオープンソース化された後、多くの友人からAspectsとObjective-Cの違いを聞かれました。 GitHubでAspectsを見つけ、勉強した結果、Aspectsもisa交換原則に基づいたフック操作であることを知りました。...

Jun 24, 2020 · 19 min read