前回の記事ではReentrantLockの非フェアロックを例にAQSの原理についてお話しましたので、今日はフェアロックから始めましょう。
非フェアロックについてあまりよくわからない方は、前回の記事をご覧ください。
ReentrantLockの公平なロックは
前の記事で、公平でないロックはロックをキューに入れる前に2回ロックを取ろうとする、という話をしました。公平なロックの場合、キューに入っているスレッドがあるかどうかを確認し、もしあれば、いい子にしてブロッキングキューの最後にキューに入れ、先にロックを取る前にキューに入っているスレッドがない、ということが考えられます。
ソースコードの比較です。
フェアロック:
final void lock() {
acquire(1);
}
/**ロック・リソースの取得を試みる**/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非フェアロック:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
/**ロックの取得を試みる**/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
/**ロックを獲得しようとするとき、フェア・ロックはhasQueuedPredecessors()メソッドを使ってキューにスレッドがあるかどうかを検出し、あれば失敗を返す。**/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ロックを追加すると、その違いがはっきりとわかります。非フェアロックはまずロックリソースを取得しようとします。次に、ロックリソースを取得しようとするとき、公平ロックはまずキューに入っているスレッドがあるかどうかを検出し、もしあれば直接失敗を返します。
この質問に対する鍵は、キューが空になるとコンシューマースレッドがハングし、キューが一杯になるとプロデューサースレッドがハングするような、ブロッキングキューを実装することだと思います。
waitとnotifyによるProducer-Consumerモデルの実装
public class ProducerAndConsumer { List<String> apple = new ArrayList<>(); public synchronized void produce() { while (apple.size() ==5) { try { System.out.println("キューは満杯で、プロデューサーは待機している。"); wait(); } catch (InterruptedException e) { e.printStackTrace(); } } String aa = UUID.randomUUID().toString().split("-")[0]; apple.add(aa); System. out .println(Thread.currentThread().getName()+"リンゴの生成に成功!"+aa); notify(); } public synchronized void consumer() { while (apple.size() ==0) { try { System.out.println("キューは空になり、コンシューマーは待機する。"); wait(); } catch (InterruptedException e) { e.printStackTrace(); } } String aa = apple.get(apple.size()-1); apple.remove(aa); System. out.println(Thread.currentThread().getName()+"リンゴの消費に成功した!"+aa); notify(); } }ここでは、生成されたデータを保存するために List を使用し、スレッドセーフであることが保証されている produce() および consume() インターフェイスを外部で提供します。
ReentrantLockの条件付きProducer-Consumerモデルの実装
public class ProducerAndConsumer_Lock { private List<String> apples = new ArrayList<>(); private Lock lock = new ReentrantLock(); private Condition fullCondition = lock.newCondition(); private Condition emptyCondition = lock.newCondition(); //スレッドセーフなキュー追加メソッド public void produce() { lock.lock(); try { while (apples.size() == 5) { /**キューが一杯になると、プロデューサースレッドは待ち行列に追加され、キューが一杯になると起こされる。**/ System.out.println("キューが一杯になると、プロデューサーは待機し、消費するためにコンシューマーに通知を始める"); //キューが一杯になると、プロデューサーは待つ fullCondition.await(); } // String aa = UUID.randomUUID().toString().split("-")[0]; apples.add(aa); System.out.println(Thread.currentThread().getName()+"リンゴの生成に成功!"+aa); //コンシューマーの条件が満たされたら、コンシューマースレッドをウェイクアップする。 emptyCondition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } //スレッドセーフなコンシューマー・メソッド public void consume(){ lock.lock(); try { while(apples.size() == 0){ /**キューが空になると、コンシューマースレッドは待ち行列に追加され、キューにデータがあるときに起こされる。**/ System.out.println("キューが空になると、コンシューマーは待機し、プロデューサに生成を通知し始める」)。; emptyCondition.await(); } //消費開始 String aa = apples.get(apples.size()-1); apples.remove(aa); System. out.println("リンゴの消費に成功した!"+aa); //条件が満たされたらプロデューサースレッドをウェイクアップする。 fullCondition.signalAll(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } }
Condition構造解析
lock.newCondition() によって Condition キューが生成されると、実際には新しい ConditionObject オブジェクトが作成されます。
public Condition newCondition() {
return sync.newCondition();
}
/**ConditionObjectAQSの静的内部クラスである。**/
final ConditionObject newCondition() {
return new ConditionObject();
}
public class ConditionObject implements Condition, java.io.Serializable {
/**条件キューを構築するセンチネルノード**/
private transient Node firstWaiter;
private transient Node lastWaiter;
}
前回の記事でNodeノードを分析した際、Node nextWaiterというプロパティがあり、これは条件付きキューをリンクするために使用されると述べました。
static final class Node {
Node nextWaiter;
}
つまり、条件付き待ち行列は実際には一方向リンクリストであることがわかります。
- 条件に対して await() がコールされると、現在のスレッドがこの条件キューの一方向連鎖リストに追加されます。
- 他のスレッドが対応するConditionでsingal()を呼び出すと、対応するConditionの一方向リストにあるノードを、前回述べたようにブロックキュー、つまりロック資源の獲得待ちの双方向リストに移し、ロック資源の獲得競争に参加します。
ソースコード解析
コンディションはロックに依存して作成され、ロックは複数のコンディションキューを作成することができます。
await()が呼ばれたとき。
public final void await() throws InterruptedException {
// スレッドの中断を検出する
if (Thread.interrupted())
throw new InterruptedException();
//現在のスレッドが待ち行列に加わる
Node node = addConditionWaiter();
//
long savedState = fullyRelease(node);
int interruptMode = 0;
/**
* このノードのスレッドが同期キューに入っているかどうかを検出し、もし入っていなければ、そのスレッドはロックを獲得する資格がないので、待ち続ける。
* 実行は、同期キューでノードが検出されるまで続けられる。
*/
while (!isOnSyncQueue(node)) {
//スレッドがハングする
LockSupport.park(this);
//すでに中断されている場合は、awaitメソッドから抜ける。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//競合同期
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//条件待ちをしていないノードの条件待ちキューをクリアする。
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//条件キューにスレッドを追加する
private Node addConditionWaiter() {
Node t = lastWaiter;
// 条件キューの最後のノードがデキューされると、デキューされたノードを削除するクリーンアップが実行される。
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**あるポイントがすでにブロッキング・キューに入っているかどうかを検出する**/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
Conditionのsignal()メソッドを呼び出すと、待機キューで最も長い時間待機しているノードがウェイクアップされ、ウェイクアップする前にそのノードをCLH同期キューに移動させます。
condition.signal()を呼び出すとウェイクアップが発生し、このときウェイクアップはキューの先頭となり、firstWaiter(キューの先頭)に対応するコンディションはブロックキューのキューの末尾に移動し、ロックの取得を待ち、ロックが取得された後、awaitメソッドは次の実行を継続するために戻ることができます。
public final void signal() {
//現在のスレッドがロックを持つ唯一のスレッドかどうかを検出する
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//headノードは、条件キューの最初のノードである。
Node first = firstWaiter;
if (first != null)
doSignal(first); //
}
/** 1. まず必要なのは、現在のスレッドがロックを保持しているかどうかをチェックすることで、これが前提条件となる。
2. 待ち行列の最初のノードをウェイクアップする。**/
private void doSignal(Node first) {
do {
//ヘッダー・ノードを修正し、古いヘッダー・ノードを削除する。
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**ノードをブロッキング・キューに移動し、ウェイクアップする。**/
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
スレッドがブロッキング・キューの双方向チェーン・テーブルに移動すると、ウェイクアップされた後、下から実行を開始します。この時、ノードは既に同期キューに入っているので、whileループから飛び出してacquireQueued()メソッドを実行し、ロックリソースの獲得を試み、獲得できなければハングします。詳しくは前回の記事をご覧ください。
public final void await() throws InterruptedException {
// スレッドの中断を検出する
if (Thread.interrupted())
throw new InterruptedException();
//現在のスレッドが待ち行列に加わる
Node node = addConditionWaiter();
//
long savedState = fullyRelease(node);
int interruptMode = 0;
/**
* このノードのスレッドが同期キューに入っているかどうかを検出し、もし入っていなければ、そのスレッドはロックを獲得する資格がないので、待ち続ける。
* 実行は、同期キューでノードが検出されるまで続けられる。
*/
while (!isOnSyncQueue(node)) {
//スレッドがハングする
LockSupport.park(this);
//すでに中断されている場合は、awaitメソッドから抜ける。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//競合同期
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//条件待ちをしていないノードの条件待ちキューをクリアする。
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}




