概要
今回は、JUCパッケージのいくつかのツールについてお話します。
| Semaphore | 共有リソースにアクセスするスレッド数の制限 |
| CountDownLatch | スレッドはカウンターが0になるまで待ち、それから作業を開始します。 |
| CyclicBarrier | CountDownLatch |
Semaphore
セマフォは、特定のリソースに同時にアクセスするスレッド数を制御するために使用されるため、特にデータベース接続のようなパブリックリソースが制限されたアプリケーションシナリオでは、フロー制御に使用できます。例えば、何万ものファイルからデータを読み込む要件がある場合、IO集約的なタスクであるため、何十ものスレッドを起動して同時に読み込むことができますが、メモリに読み込んだ後、データベースに保存する必要があり、データベースへの接続が10個しかない場合、同時にデータベース接続を取得するスレッドが10個しかないように制御してデータを保存しなければ、データベースへの接続を取得できないと報告されます。コンストラクタで初期リソースの総数を渡したり、公平な同期を使用するかどうかを指定することができます。
// デフォルトでは、公平でない
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
主な方法は、獲得方法と放出方法です。
SemaphoreはAQSを継承した内部同期システムSyncを持っており、acquire()メソッドがpermitを要求するたびに、permitの数が0より少なくなるまで、状態が1ずつデクリメントされ、その後ブロックして待機します;
acquire()
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) //を呼び出す前に、スレッドが中断されたかどうかをチェックする。
throw new InterruptedException();
// 共有リソースのロックを取得しようとするが、0より小さいと取得に失敗する。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// FairSync 公正なロックのためのtryAcquireSharedメソッド
protected int tryAcquireShared(int acquires) {
for (;;) { // スピンのデッドループでの操作方法
// スレッドがブロッキング・キューを持っているかどうかをチェックする。もし持っていれば、共有リソースのライセンス数が使い切られたことを意味し、キューイングの場合は-1を返す。
if (hasQueuedPredecessors())
return -1;
int available = getState(); // ロックリソースの最新のメモリ値を取得する
int remaining = available - acquires; // 残り許可数を計算する
if (remaining < 0 || // 残り許可数が0未満の場合、キューに入るために負の数を返す
compareAndSetState(available, remaining))
// 共有リソースが0以上の場合、最後の共有リソースはcompareAndSetState操作によって占有される
return remaining;
// 残りを取得した後、どのようなロジックが入力されようとも、処理が実行され、残りが返され、残りの値からキューイングが必要かどうか上位が判断する。
}
}
// NonfairSync 非フェアロック用のtryAcquireSharedメソッド
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// NonfairSync 非フェアロックの親クラスである Sync クラスの nonfairTryAcquireShared メソッド
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // スピンのデッドループでの操作方法
int available = getState(); // 最新のライセンスを取得する
int remaining = available - acquires; // 残り許可数
if (remaining < 0 || // 残り許可数が0未満の場合、負の数を返し、キューイングに進む
compareAndSetState(available, remaining)) // 共有リソースが0以上の場合
//CAS操作で最後の共有リソースを占有する
return remaining;
// getremainingの後にどのようなロジックが入力されるかにかかわらず
//オペレーションを実行し、残りを返す。上位レベルは、残りの値に基づいて、オペレーションをキューに入れる必要性を判断する。
}
}
release() メソッドは以下のとおりです。パーミットを解放すると、スレッドは、後継ノードが確実に起動されることによって、保持しているセマフォを確実に解放します;
public void release() {
sync.releaseShared(1); // ライセンスされたリソースを解放する
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 共有ロックリソースの解放を試みる。 このメソッドは、AQSの具象サブクラスであるsyncによって実装されている。
doReleaseShared(); // 後継ノードをウェイクアップするスピン操作
return true;
}
return false;
}
// NonfairSync と、FairSyncの親クラスであるSyncのtryReleaseSharedメソッドを使用している。
protected final boolean tryReleaseShared(int releases) {
for (;;) { // スピンのデッドループでの操作方法
int current = getState(); // 最新の共有ロックリソース値を取得する
int next = current + releases; // 許可数に対して加算処理を行う
// int型のステート値がオーバーフローしている可能性がある。
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) //
return true; // は成功フラグを返し、スレッドが共有ロックリソースを解放したことを上位レベルに伝える
}
}
使用例
この例では、最大3スレッドが10スレッドで動作することを想定しています。
public class Test {
static class MyThread extends Thread {
private int value;
private Semaphore semaphore;
public MyThread(int value, Semaphore semaphore) {
this.value = value;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("スレッド"+ value + "でライセンスを取得する。+ semaphore.availablePermits() +
"ライセンスは、"+ semaphore.getQueueLength() + "スレッドが待機している");
Random random =new Random();
Thread.sleep(random.nextInt(1000));
semaphore.release();
System.out.println("スレッド"+ value + "はライセンスを解放した");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException{
Semaphore semaphore = new Semaphore(3);
for(int i = 0; i < 10; i++) {
new Thread(new MyThread(i, semaphore)).start();
}
}
}
出力:
Semaphoreのデフォルトのacquireメソッドは、スレッドを待ち行列に入れ、割り込み例外を投げるものです。しかし、割り込みを無視したり、ブロックキューに入らないメソッドもあります:
// 割り込みを無視する
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)
// は待ち行列に入らない。
public boolean tryAcquire
public boolean tryAcquire(int permits)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException
public boolean tryAcquire(long timeout, TimeUnit unit)
CountDownLatch
CountDownLatch は同期ヘルパーで、他のスレッドで実行された一連の処理が完了するまで、1つ以上のスレッドが待機できるようにします。CountDownLatch は内部的に Sync も定義しており、AQS を継承した同期器です。
工法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count); //countには
}
よく使われる方法は以下の通りです。
public void await() throws InterruptedException { }; //await()メソッドを呼び出したスレッドはハングし、実行を続ける前にカウント値が0になるまで待つ
//一定時間後にカウントが0になっていなければ、実行を続行する。
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
public void countDown() { }; //カウント値を1減らす
CountDownLatchの流れを要約すると、次のようになります:
メインのタスクはawait経由でwaitに入ります。
CountDownLatchは、0より大きいカウンタ値のカウントを管理します。スレッドがcountDownメソッドを実行するたびに、カウンタ値が0になるまでカウンタ値が1ずつ減らされ、その後、キューで待機しているスレッドが解放されます。
使用例
例えば、ゲーム(メインクエスト)に入るとき、通常、「マップデータのロード」、「キャラクターモデルのロード」、「BGMのロード」などのプレタスクがあります。".これらのタスクが完了して初めてゲームに入ることができます。CountDownLatchを使って、このシナリオをシミュレートしてみましょう。
public class Test {
//先行タスク
static class MyThread extends Thread {
private String taskName;
private CountDownLatch countDownLatch;
public MyThread(String taskName, CountDownLatch countDownLatch) {
this.taskName = taskName;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
Random random =new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(taskName + "タスク完了");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException{
CountDownLatch countDownLatch = new CountDownLatch(3);
//
new Thread(new Runnable() {
@Override
public void run() {
try{
System.out.println("進行中の他のタスクを待つ");
System.out.println("と"+ countDownLatch.getCount() + "を呼び出す");
countDownLatch.await(); //メインタスクは、他のタスクの完了を待つ
System.out.println("すべての先行タスクが完了し、メインタスクを開始する");
}catch (InterruptedException e) {
e.printStackTrace();
}
}}).start();
// 先行タスク
new Thread(new MyThread("マップデータをロードする ", countDownLatch)).start();
new Thread(new MyThread("キャラクターモデルをロードする ", countDownLatch)).start();
new Thread(new MyThread("BGMをロードする ", countDownLatch)).start();
}
}
出力:
進行中の他のクエストを待機中 あと3つのクエストを保留中 マップデータをロード クエスト完了 キャラクターモデルをロード クエスト完了 背景音楽をロード クエスト完了 すべてのプリクエスト完了、メインクエスト開始
await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// が呼び出される前にスレッドが割り込まれたかどうかを検出する。
if (Thread.interrupted())
throw new InterruptedException(); // は、割り込まれた場合に割り込み例外をスローする。
// 共有リソースのロック取得を試み、0より小さい場合は取得に失敗する。このメソッドは、CountDownLatchの具象サブクラスであるSyncによって実装されている。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg); // は、ロックリソースを取得しようとしたスレッドをキューに入れる。
}
//このメソッドは、カウンターの値が0かどうかを判定する
protected int tryAcquireShared(int acquires) {
// カウンターの値がゼロと比較され、ゼロであればロック獲得に成功し、そうでなければロック獲得に失敗する。
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 与えられたモードパターン(Node)に従って新しいノードを作成する。.EXCLUSIVE排他モード、ノード.SHARED共有パターン;
final Node node = addWaiter(Node.SHARED); // 共有パターンノードの作成
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); // ノードの前任者ノードを取得する
if (p == head) { //先行ノードがheadの場合、ロックの獲得を試みる。
int r = tryAcquireShared(arg);
/*
>=0これは、共有ロックリソースの獲得に成功したことを示す.
は現在のノードをヘッド・ノードとして設定し、このメソッドはdoReleaseSharedを呼び出して無駄なノードを解放する。
*/
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
/*
2番目のノードがロック獲得に成功しなかった場合、最初のループのこの時点で
ノードのwaitStatus=0を呼び出すことで、一旦SIGNAL状態に変更される。次に2番目のループで
shouldParkAfterFailedAcquireメソッドを呼び出している。
parkメソッドは待ちをブロックする。
*/
if (shouldParkAfterFailedAcquire(p, node) && //先行ノードに基づいてブレークが必要かどうかを判断する
// 通常、共有ロックは取得されず、コードはそのメソッドで停止する。
parkAndCheckInterrupt())
throw new InterruptedException();
// ウェイクアップされた後、parkAndCheckInterrupt()で割り込みが検出されると、例外がスローされる。
}
} finally {
if (failed)
cancelAcquire(node);
}
}
countDown()
public void countDown() {
sync.releaseShared(1); // ライセンスされたリソースを解放する
}
//を解放するメソッドである。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 共有ロックリソースの解放を試みる。このメソッドはSyncで実装されている
doReleaseShared(); // 後継ノードをウェイクアップするスピン操作
return true; // は、すべてのスレッドが解放されたことを示すtrueを返す。
}
return false; // は、カウンターの値がゼロでない限り、まだ完全に解放されていないことを示すためにfalseを返す。
}
//このメソッドは、カウンターを1つ減らそうとするもので、CountDownLatchの静的内部クラスであるSyncクラスで実装されている。
protected boolean tryReleaseShared(int releases) {
for (;;) {
// 最新のカウンタ値を取得し、それが0であれば、CAS操作によってゼロになったことを意味する。
int c = getState();
if (c == 0)
return false;
int nextc = c-1; // カウンターの値から1を引いた値
if (compareAndSetState(c, nextc)) // CAS比較は、セットアップが成功した場合に真を返す。
return nextc == 0;
//CAS操作が成功し、カウント値の状態が0になると、待機していたすべてのスレッドがキューから解放される。
// CASが失敗した場合、次のループで、他のスレッドですでに処理されていないかチェックする
}
}
//このメソッドは、先頭ノードを空にして、後継ノードをウェイクアップしようとする
private void doReleaseShared() {
for (;;) {
Node h = head; // は毎回キューの先頭ノードを取り出す。
if (h != null && h != tail) { // 先頭ノードが空でなく、キューの末尾ノードでもない場合
int ws = h.waitStatus; //ヘッドノードのwaitStatusステータス値を取得する
if (ws == Node.SIGNAL) { // ヘッドノードがSIGNALの場合、ヘッドノードの後継ノードをウェイクアップする必要があることを意味する。
//CASを介してヘッドノードの状態を空に設定しようとし、失敗した場合は、他のスレッドも解放している可能性があるため、ループを継続する。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h); // 先頭ノードの後継ノードをウェイクアップする
}
//headノードが空の場合、PROPAGATE状態に変更するが、他のスレッドが変更したため失敗する。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// ヘッドノードに何も起こらなければ、上記のセットアップは完了だ。
// 何か変更があった場合、おそらく操作中に新しいヘッドノードが追加された場合、ウェイクアップアクションが引き続き
if (h == head)
break;
}
}
CyclicBarrier
すなわち、複数のスレッドは同期ポイントに達するとブロックされ、最後のスレッドが同期ポイントに達するまでバリアは開かれません。一方、CyclicBarrier は CountDownLatch のすべての機能を持つだけでなく、reset() メソッドを使用してバリアをリセットすることもできます。
工法
// コンストラクタ I
public CyclicBarrier(int parties) {
this(parties, null);
}
// コンストラクタ
//partiesパラメーターは、参加できるスレッドの最大数であり、最後にブロックされたスレッドが解放されると、そのスレッドはタスク barrierAction
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
使用例
CountDownLatchの例でも、先行タスクは3つしかありませんが、メインタスクは4つあります。
プレタスク
public class MyTask implements Runnable {
private String taskName;
private CyclicBarrier cyclicBarrier;
public MyTask(String taskName, CyclicBarrier cyclicBarrier) {
this.taskName = taskName;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
//バリアは全部で4つあり、各バリアiは、現在の前タスクスレッドtaskNameをawaitに入れる責任を持つ。
for(int i = 1; i < 5; i++) {
try {
Random random =new Random();
Thread.sleep(random.nextInt(1000));
System.out.println("バリア"+ i + " + taskName + "タスク完了");
cyclicBarrier.await(); //先行タスクtaskNameはwaitに入り、バリアiの他のすべての先行タスクがawaitに入るまで待つ。
} catch (Exception e) {
e.printStackTrace();
}
cyclicBarrier.reset(); //バリアをリセットし、次のレベルiに進む+ 1前タスクのタスク名
}
}
}
テスト:
@Test
public void testCycleBarrier() throws Exception {
//は、ここで出力タスクを実行する前に、3つの先行スレッドがawaitするのを待たなければならない。
CyclicBarrier cb = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.print("が選択されている。"+ Thread.currentThread().getName() + "終了文を出力する: ");
System.out.println("このレベルのすべての先行タスクが完了した。);
}
});
//3プリエンプティブなタスク
new Thread(new MyTask("マップデータのロード", cb), "マップデータのロードスレッド").start();
new Thread(new MyTask("キャラクタモデルのロード", cb), "キャラクタデータスレッドのロード").start();
new Thread(new MyTask("バックグラウンド音楽をロード", cb), "バックグラウンドデータスレッドをロード").start();
Thread.sleep(5000); //mainスレッドはしばらくスリープし、そうでなければスレッドの実行を待たない。
}
輸出
レベル1のキャラクターモデルをロードするタスクが完了した。
レベル 1 のマップ・データをロードするタスクが完了する
レベル1のBGMをロードするタスクが完了した
終了ステートメントを出力するために、Load Background Data スレッドが選択される!
レベル2のBGMロード・タスクが完了する
レベル2のロード・マップ・データ・タスクが完了した。
レベル2のロード・キャラクタ・モデル・タスクが完了する
ロードキャラクターデータスレッドが選択され、終了ステートメントが出力される!
レベル3のロード・マップ・データ・タスクが完了した。
レベル3のロード・キャラクタ・モデル・タスクが完了した。
レベル3のBGMロード・タスクが完了した。
終了ステートメントを出力するために、Load Background Data スレッドが選択される!
レベル4のロード・マップ・データ・タスクが完了した。
レベル4のロード・キャラクタ・モデル・タスクが完了した。
レベル4のBGMロード・タスクが完了した。
終了ステートメントを出力するために、Load Background Data スレッドが選択される!
CyclicBarrier には await メソッドが 1 つしかありません。CyclicBarrierでは、コンストラクタ・メソッドにRunnable型のオブジェクトを渡すことで、バリアに達したときにタスクを実行することができます。上記の例では、バリアに達した後、プリエンプティブ・タスク・スレッドが選択され、終了文
await()
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
/*
dowaitメソッドは、CyclicBarrierのブロッキング待ち実装の核となるメソッドであり、awaitメソッドが呼ばれると、ブロッキング待ちがキャンセルされる。
キュー・トリップは.
スレッドがawaitからジャンプアウトすると、通常であればセマフォが送られ、ブロッキングが解除される。
はAQSの待ち行列に転送される。その後、徐々にロックが解放され、最後にCyclicBarrierは次の呼び出しのための初期値の状態になる;
そのため、CyclicBarrierはプロセス一式を使い切るたびに初期状態の値に戻り、また別の場所で新しく生成されたオブジェクトのように扱うことができる。
を使うことで、ループバリアになっている;
*/
private int dowait(boolean timed, long nanos)throws InterruptedException,BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock; // 排他ロックの取得
lock.lock(); // は、親クラスAQSのCLHキューによってブロックされているが、クリティカルゾーンに降りてtryメソッドを実行し続ける理由は、トリップの.await()このコード
try {
final Generation g = generation;
if (g.broken) //均衡が破れると、他のすべてのスレッドが例外をスローし
throw new BrokenBarrierException();
//は、スレッドが他の場所で中断されたかどうかを検出し、いずれかのスレッドが中断された場合、バランスを崩し、バランスを崩すフラグである
//を呼び出すことで、ブロックされているすべてのスレッドをウェイクアップする前に初期状態の値を復元する。
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//countは、ロック・ブロック・キューにいくつのスレッドが残っているかを示す
int index = --count;
if (index == 0) { // カウント値が0になったら、すべてのスレッドの実行が終了したことを示す。
boolean ranAction = false;
try {
final Runnable command = barrierCommand; // コンストラクタ・メソッドに渡されるインターフェース・コールバック・オブジェクト
if (command != null) // インターフェイスが空でない場合、最後に実行されたスレッドがタスクを実行する機会を持つ。
command.run();
ranAction = true;
nextGeneration(); // を初期状態に戻すことで、次回も再利用できるようにする。
return 0;
} finally {
if (!ranAction) // 最後のスレッドが例外をスローすると、全体のバランスが崩れる。
breakBarrier();
}
}
for (;;) { // スピンのデッドループでの操作方法
try {
// タイムアウト待ちセマフォを使う必要がない場合は、次のように直接tripを呼び出す。.await()はブロッキング待機に入る
if (!timed)
// 通常、コードの実行はここで停止し、メソッドはすでに内部でparkメソッドを呼び出しているため、スレッドはブロックして待機することになる
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos); // 指定された時間だけセマフォを待つ
} catch (InterruptedException ie) { // の割り込みによるブロッキング待ちの間にタスクが中断された場合、そのタスクの実行は中断されない。
// 識別子ビットがまだ置き換えられていない場合は
//を実行し、残高フラグがまだfalseの場合、残高が壊れ続け、割り込み例外がスローされる。
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken) // を実行し、もしバランスが崩れたら、他のすべてのスレッドが例外を投げる。
throw new BrokenBarrierException();
if (g != generation) // すでにリダイレクトされている場合は、インデックス値が直接返される
return index;
// タイムアウトフラグが設定され、nanosの値が渡されるか、待機後にnanosが返されると
//は、残高がゼロ以下になるたびに残高を減らす。
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
//は、すべてのブロックされたスレッドを目覚めさせる前に、均衡を破り、均衡を破るフラグを設定する;
private void breakBarrier() {
generation.broken = true; // バランスを崩すフラグを設定する
count = parties; // カウントを初期値に戻す
trip.signalAll(); // はセマフォを送り、すべてのコンディションで待ち行列をウェイクアップする。
}
//は、コンディションで待機しているすべてのキューをウェイクアップし、その後
//を実行し、世代への参照をリフレッシュして変更し、次の処理に備える;
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}





