はじめに
synchronized欠点
Javaのネイティブ・ロックであるオブジェクト・ベース・ロックが導入されました。これは一般に synchronized キーワードと組み合わせて使用されます。しかし、synchronized キーワードにはいくつかの欠点があります:
クリティカルゾーンの読み取り操作だけであれば、複数のスレッドが一緒にクリティカル ゾーンを実行することも可能です。しかし、synchrnoizedでは一度にクリティカルゾーンを実行できるスレッドは1つだけです。
synchrnoizedは、スレッドがロックの取得に成功したかどうかを知る方法がありません。
オブジェクト・ロックを取得したスレッドが、クリティカル・エリアのIOやスリープ・メソッドなどのためにブロックしても、ロックを解放しないので、他のスレッドが待たされることになります。
JavaはJUCパッケージの下でロックに関する多くのクラスとインターフェイスを提供しており、上記の問題はすべてJUCパッケージのロックによって解決できます。
ロックにはいくつかの分類があります。
再入可能ロックと非再入可能ロック
リエントラント・ロックとは、あるスレッドがオブジェクトのロックを取得した後、そのスレッドはそのオブジェクトのロックを再度取得できますが、他のスレッドは取得できないということです。
継承された AQS で synchroniser を実装する場合、ロックを所有するスレッドが再びロックを取得するシナリオを考慮する必要があります。
フェアなロックとアンフェアなロック
ReentrantLock は非フェアロックとフェアロックの両方をサポートしています。
フェアロックの利点は、ロックを待っているスレッドが飢え死にしないことです。欠点は、全体的なスループット効率が非フェアロックに比べて相対的に低いということです、最初のスレッドに加えてキュー内のすべてのスレッドを待っていると、非フェアロックのオーバーヘッドよりも、CPUのウェイクアップスレッドがブロックされます。
一般的に、非フェアロックは、スレッドがブロッキングせずに直接それを取得できる可能性があるため、効率がいくらか向上します。しかし、スレッド飢餓が発生する可能性があります。
読み取り/書き込みロックと除外ロック
synchronizedキーワードとReentrantLockクラスは排他的ロックで、同時にアクセスできるスレッドは1つだけです。一方、read/writeロックは複数のスレッドが同時にアクセスできますが、書き込みスレッドがアクセスすると、すべての読み取りスレッドと他の書き込みスレッドはブロックされます。
Javaは、読み取り/書き込みロックのデフォルト実装としてReentrantReadWriteLockクラスを提供し、2つのロックの内部保守:読み取りロック、書き込みロック。読み取りと書き込みのロックを分離することで、"read more write less "環境、大幅にパフォーマンスを向上させます。
AQS
属性
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
static final class Node {
/*
waitStatusにはいくつかの列挙値がある:
(1)0 ノードが初期化されたときのデフォルト値。
(2)CANCELLED 値 1 は、そのスレッドのロック取得要求がキャンセルされたことを意味する。
(3)CONDITION 値 -2 は、ノードが待ち行列にあり、ノードのスレッドがウェイクアップされるのを待っていることを意味する。
(4)PROPAGATE が-3の場合、このフィールドは現在のスレッドがSHAREDの場合にのみ使用される。
(5)SIGNAL が-1である場合、スレッドは準備ができており、リソースの解放を待っているだけであることを意味する。
*/
volatile int waitStatus; //ポイントがキューにあるとき
volatile Node prev;
volatile Node next;
volatile Thread thread; //このノードのスレッド
Node nextWaiter;
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
}
AQS は内部的に、リソースを識別するために volatile 変数 waitStatus を使用します。また、状態を取得し再フォーマットするためのいくつかの protected メソッドを定義しており、サブクラスでオーバーライドして独自のロジックを実装することができます:
getState() //現在の同期状態を取得する。
setState() //現在の同期状態を設定する。
compareAndSetState() //CASを使用して現在の状態を設定する。 このメソッドは、状態がアトミックに設定されることを保証する。
3つのメソッドはすべてアトミック操作で、compareAndSetState()メソッドの実装はUnsafeのcompareAndSwapInt()メソッドに依存しています。
AQSクラス自身は多くのキューイングとブロッキングのメカニズムを実装しており、内部的にはキューの先頭と末尾を識別するためにheadとtailの2つのポインタを持つFIFOダブルエンド・キューを使用しています。スレッドがキューに追加されると、そのプロセスはスレッドセーフである必要があるため、同期器は、compareAndSetTail(Node expect, Node update)というCASベースのテール・ノード設定メソッドを提供します。
キュー内の先頭ノードは同期状態の獲得に成功したノードであり、同期状態を解放した後、同期状態の獲得に成功したときに自身を先頭ノードに設定する後継ノードをウェイクアップします。先頭ノードの設定は同期状態の取得に成功したスレッドが行い、取得に成功するスレッドは1つだけなので、先頭ノードを設定する方法はCASを保証する必要がありません。
メインロジックの一部のみを実装した抽象クラスで、いくつかのメソッドはサブクラスが実装します;
これに加えて、シンクロナイザーは多くのテンプレート・メソッドを実装しています:
AQSでは、リソースの共有に2つのモードがあります:
- 排他モード:リソースは排他的で、一度に1つのスレッドしか取得できません。ReentrantLock など。
- 共有モード:複数のスレッドから同時にアクセス可能で、特定のリソース数をパラメータで指定できます。Semaphore/CountDownLatchなど。
一般的に、サブクラスは要件に従って特定のモードだけを実装すればよいのですが、もちろんReadWriteLockのように両方のモードを実装する同期クラスもあります。
共有された同期状態の獲得と解放
共有同期状態取得
共有獲得ロックの使用では、複数のスレッドが同時に同期状態を獲得することが許可されます。たとえば、ファイルに対する読み取り操作では、複数のロックが許可されます。
同期状態は、synchroniserによって提供されるテンプレート・メソッドのacquireShared(int arg)によって共有方式で取得できます。このメソッドのソース・コードを以下に示します。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) //tryAcquireSharedサブクラスによる書き込み
doAcquireShared(arg);
}
tryAcquireSharedメソッドで同期状態の取得を試み、その戻り値が0以上の場合、同期状態を取得できることを示します。これは、shared-acquireスピンプロセスで同期状態の取得に成功し、スピンを終了する条件でもあります。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/*
スピンプロセス。現在のポイントの先行ノードがヘッドノードである場合、同期状態の取得を試みる。,
戻り値が0以上の場合、取得に成功し、スピンプロセスから抜ける。
*/
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
共有取得リリース
共有フェッチは、releaseShared(int arg) メソッドを呼び出すことでリリースされた同期状態を解放し、解放時に待機状態の後続ノードをウェイクアップします。複数のスレッドが同期状態を解放する操作であるため、このメソッドは、同期状態が tryReleaseShared(int arg) メソッドを介して解放されるスレッドセーフであることを保証します。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
カスタム同期コンポーネント
**要件:**同期ツールのロック TwinsLock を設計し、同時にアクセスできるスレッドは 2 つまでで、2 つ以上のスレッドによるアクセスはブロックされます。
ロックを作成する場合、Lock インターフェースを継承する必要があります:
複数のスレッドが同時にアクセスすることをサポートするアクセスパターンであり、これは明らかに共有アクセスであるため、synchroniserが提供するacquireShared(int args)メソッドなどの共有関連メソッドを使用する必要があります。
次に、同期リソースの数は2、つまりステータスは2です。スレッドがフェッチするとステータスは1減少し、スレッドがリリースするとステータスは1増加します。ステータスが0の場合、同期ステータスをフェッチしようとするスレッドはブロックされるだけです。compareAndSet(int expect,int update)メソッドは、同期ステータスが変更されたときにアトミック性を保証するために使用する必要があります。
通常、カスタム同期コンポーネントは、カスタム同期コンポーネント TwinsLock の内部クラスとして定義されます。
カスタムロックとシンクロナイザー TwinsLock :
public class TwinsLock implements Lock {
private static final class Sync extends AbstractQueuedSynchronizer {
public Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("countは0より大きくなければならない");
}
setState(2); //初期状態は2である。
}
/*
スレッドがリソースを取得すると、状態curStateが1減少する。,
同期された状態変更には、原子性を保証するためにCASが必要である。
*/
@Override
protected int tryAcquireShared(int reduceCount) {
for(;;) {
int curState = getState();
int newState = curState - reduceCount;
if(newState < 0 || compareAndSetState(curState, newState)) {
return newState;
}
}
}
/*
スレッドがリソースを解放すると、状態curStateが1増加する。,
同期された状態変更には、原子性を保証するためにCASが必要である。
*/
@Override
protected boolean tryReleaseShared(int reduceCount) {
for(;;) {
int curState = getState();
int newState = curState + reduceCount;
if(compareAndSetState(curState, newState)) {
return true;
}
}
}
}
private final Sync sync = new Sync(2); //カスタムコンポーネントの同期
@Override
public void lock() {
//カスタム・コンポーネントのオーバーライドされたacquireSharedメソッドで状態を変更する。
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
//Lockインターフェイスの残りのメソッドはデフォルトなので、ここでは省略する。
}
タスク
public class Task extends Thread {
final static Lock twinsLock = new TwinsLock();
@Override
public void run() {
while (true) {
twinsLock.lock();
System.out.println(Thread.currentThread().getName() + "AcquireLock");
try {
Thread.sleep(1);
System.out.println(Thread.currentThread().getName() + "タスクを実行する ");
Thread.sleep(1);
}catch(Exception e){
e.printStackTrace();
}finally{
System.out.println(Thread.currentThread().getName() + "ロックを解除する");
twinsLock.unlock();
}
}
}
}
テストでは、最大2つのスレッドがロックを保持していることがわかります。
public class Test {
public static void main(String[] args) throws InterruptedException{
new Task().start();
new Task().start();
new Task().start();
}
}
JDKのロック・インターフェースとクラスは
インターフェイス 条件/ロック
Condition newCondition();
Condition インターフェースは、従来の Object の wait () や notify () を置き換えて、await/signalメソッドによるスレッド間連携を実現するために使用されます。
void await()
現在のスレッドは、他のスレッドが同じConditionオブジェクトのsignal/signalAllメソッドを呼び出すか、他のスレッドがinterruptメソッドを呼び出して現在のスレッドに割り込むまで待機状態になります;
void awaitUninterruptibly()
現在のスレッドは通知されるまで待機状態になり、その間は割り込みシグナルに鈍感で、現在のスレッドへの割り込みはサポートされません。
long awaitNanos(long nanosTimeout)
現在のスレッドは、通知されるか、割り込まれるか、タイムアウトするまで待機状態になります。戻り値が0以下の場合、タイムアウトしたとみなすことができます。
boolean awaitUntil(締め切り日)
現在のスレッドは、通知されるか、中断されるか、タイムアウトするまで待機状態になります。指定した時間に達することなく通知されなかった場合はtrueを返し、そうでない場合はfalseを返します。
void signal()
Conditionで待機しているスレッドをウェイクアップします。ウェイクアップされたスレッドは、メソッドが戻る前にConditionオブジェクトに関連付けられたロックを保持する必要があります。
void signalAll()
コンディションで待機しているすべてのスレッドをウェイクアップします。 await() などのメソッドからリターンできるスレッドは、まずコンディション・オブジェクトに関連付けられたロックを取得する必要があります。
オブジェクトモニターとコンディションの比較
待ち行列の数:オブジェクトモニターには1つ、コンディションモニターには複数あります。
Obejctのwaitメソッドは割り込みに遭遇すると、まずスレッドを待ちキューからブロックキューに移動し、ロックが解放されるのを待ち、解放後にInterruptedExceptionをスローします。状態をサポートします。
待機時間の設定:ConditionのawaitNanosメソッドは、現在のスレッドがロックを解放してから一定時間後まで待機することをサポートしていますが、Objectにはありません。
ReentrantLock
大まかな構造
ReentrantLock は Lock インターフェースのデフォルト実装で、再入可能な排他ロックです。このクラスの一般的な構造は次のとおりです:
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
//実装
}
protected final boolean tryRelease(int releases) {
//実装
}
}
static final class NonfairSync extends Sync {
//プロパティとメソッドの実装を省略する
}
static final class FairSync extends Sync {
//プロパティとメソッドの実装を省略する
protected final boolean tryAcquire(int acquires) {
//省略されたメソッドの実装
}
}
}
このクラスの内部には、カスタム同期装置である AQS を継承した抽象クラス Sync があります。また、Sync を継承した非抽象クラス NonfairSync と FairSync があり、それぞれ "非フェア同期" と "フェア同期" です。つまり、ReentrantLock は"fair lock" と "non-fair lock" の両方をサポートできるということです。
使用例
// 1.公平なロックと公平でないロックの選択を初期化する
ReentrantLock lock = new ReentrantLock(true);
// 2.利用可能なコードブロック
lock.lock();
try {
try {
// 3.複数のロック・メソッドをサポートし、より柔軟にする; 再入可能
if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
} finally {
// 4.手動でロックを解放する
lock.unlock()
}
} finally {
lock.unlock();
}
非フェア・ロックによる同期状態の取得
ReentrantLock のコンストラクタに boolean 型のパラメータを渡すことで、公平なロックかどうかを指定できます。
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); //acquireメソッドをタップしてAQSにジャンプする。
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
acquire
同期状態を取得するAQSの方法、acquireを見てみましょう。
public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquire(int arg) {
//同期ステータスの取得が試みられ、これが失敗すると、addWaiterがコンストラクタ・ノードを同期キューの末尾に追加する。その後、acquireQueuedによって、ノードがデッドエンド・ループで同期状態を取得する。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//キューにノードを追加する
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CASを使ってノードの末尾への挿入を試み、成功すれば追加されたノードを返す。
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//待ち行列が空であるか、上記のCASが失敗した場合、スピンCASは以下を挿入する。
enq(node);
return node;
}
/*
デッド・ループによってノードが正しく追加され、CASによってノードがテール・ノードとして設定された後にのみ、現在のスレッドがそのノードから書き込みできるようにする。
メソッドが返す
*/
private Node enq(final Node node) {
for (;;) { //デッド・ループは、ノードが常に追加されることを保証する。
Node t = tail;
if (t == null) { //CASは、テールノードが空の場合、ノードをヘッドノードに設定する。
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t; //CASノードをテールノードに設定する
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
an}
}
//tailAQSオブジェクト上の変数のメモリオフセットで、テール変数がメモリ上で直接変更される。
private static final long tailOffset;
static {
try {
//...
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
} catch (Exception ex) { throw new Error(ex); }
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
}
acquireQueued
キューに追加されたノードが同期ステータスを取得するために使用するacquireQueuedメソッドを見てみましょう。
//最初に注意すべきことは、ノードヘッドの次のノードは、現在リソースを獲得しているノードかNULLであるということである。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { //スピン:同期状態を取得するか、ブロックされる。
final Node p = node.predecessor();
//ノードの前任ノードpがヘッドノードである場合、ノードはリソースを取得しようとする
if (p == head && tryAcquire(arg)) {
//nodeノードがリソースを取得し、nodeをheadに設定する。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/*
pがヘッド・ノードでない場合、またはpがヘッド・ノードだがロックを取得していない場合。,
現在のノードがブロックされるべきかどうかを判断する。
*/
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //ノードのステータスを CANCELLED としてマークする。
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
// 現在のスレッドがブロックされるべきかどうかを判断するために、先行ノードに依存する。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// ノードの前任者のノード・ステータスを取得する。
int ws = pred.waitStatus;
// ヘッドノードが起きていることを示す。
if (ws == Node.SIGNAL)
return true;
// waitStatus>0がキャンセルされる
if (ws > 0) {
do {
// ループフォワードしてキャンセル・ノードを見つけ、キャンセル・ノードをキューから削除する。
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 先行ノードの待機状態をSIGNALに設定する。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
キュー内のノードが同期のステータスを取得しようとするプロセスを要約します:
デッド・ループによる CPU リソースの浪費を防ぐため、shouldParkAfterFailedAcquire メソッドは、先行ノードの状態を判断して現在のスレッドをハングするかどうかを決定します:
cancelAcquire
先のacquireQueuedメソッドでは、Nodeの状態をCANCELLEDとマークしています。
private void cancelAcquire(Node node) {
if (node == null)
return;
// ノードをどのスレッドとも関連付けない、つまり仮想ノードに設定する。
node.thread = null;
Node pred = node.prev;
// 先行ノードによるキャンセル状態のノードをスキップする
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// この時点で、predはキャンセルされていない状態のノードであり、nodeはキャンセルされた状態のノードである。>node
Node predNext = pred.next;
// 現在のノードの状態をCANCELLEDに設定する。
node.waitStatus = Node.CANCELLED;
// 現在のポイントが末尾ノードである場合、後ろから前に向かって最初のキャンセルされていないノードを末尾ノードに設定しようとする
// 更新が成功した場合、predの後続ノードをnullに設定する。
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else { //更新に失敗した
int ws;
//1.キャンセルされていないノードpredはヘッドノードではなく、predはwoken状態にある。,2.ノードをウェイクアップするためにpredを設定してみる。
//1と2のいずれかが真の場合、現在のスレッドがNULLかどうかを判断する。
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 現在のポイントがheadの後継であるか、上記の条件を満たさない場合、現在のポイントの後継をウェイクアップする。
unparkSuccessor(node);
}
node.next = node;
}
}
非フェア・ロック・リリース同期
ReentrantLockは、ロックを解除する際に、公平なロックと不公平なロックを区別しません。
public void unlock() {
sync.release(1);
}
次に、AQSクラスのリリース・メソッドが呼び出されます。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
//Trueは、ロックを保持するスレッドがもうないことを意味する。,
if (tryRelease(arg)) {
Node h = head;
// ヘッド・ノードが空でなく、ヘッド・ノードのwaitStatusが初期化されていない場合、スレッドは保留状態から解放される。
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/*
初期状態では、headは空であり、最初のノードがキューに入れられると、headは仮想ノードに初期化される。
*/
tryRelease メソッドは、synchronized 状態のリリースを解放した後に、まだ状態を保持しているスレッドがあるかどうかを返します。
// java.util.concurrent.locks.ReentrantLock.Sync
// メソッドは、現在のロックがどのスレッドによっても保持されていないかどうかを返す。
protected final boolean tryRelease(int releases) {
// 状態を解放した後の状態量
int c = getState() - releases;
// 現在のスレッドがロックを保持しているスレッドでない場合は、例外をスローする。
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// すべてのスレッドが解放された場合、すべてのスレッドをnullに設定し、状態を更新する。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
フェアロック獲得同期ステータス
前述のnonfairTryAcquire(int acquires)メソッドでは、非フェア・ロックの場合、CASが正常に同期状態をセットすれば、現在のスレッドがロックを取得したことになります。しかし、フェアロックは違います。下図に示すように、nonfairTryAcquireメソッドと比較して、tryAcquireメソッドの判定条件hasQueuedPredecessors()メソッド、つまり、同期キュー内の現在のノードに先行ノードがあるかどうかの判定が追加されています。判定メソッドがtrueを返した場合、現在のスレッドよりも早くロックを要求しているスレッドが存在することを意味するので、先行スレッドがロックを取得するのを待つ必要があります。ロックの取得を続行する前に、先行スレッドがロックを取得し解放するのを待つ必要があります。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
/*
まず、現在のポイントがある待ち行列に先行ノードがあるかどうかを判断し、なければCASによる同期状態の更新を試みる。
*/
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
再入可能リードライトロック
ReentrantReadWriteLockは、ReadWriteLockインターフェイスのデフォルトのJDKの実装では、ReentrantLockの機能に似ていますが、それはまた、"読み取りと書き込みロック "をサポートしていますが、書き込み操作は、操作を書き込み、操作を読み取ることはできません。
クラスの一般的な内部構造は以下のとおりで、やはり2つのシンクロナイザーと2つのLock実装クラスReadLockとWriteLockを保持しています。
// 内部構造
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
// 実装
}
static final class NonfairSync extends Sync {
// 実装
}
static final class FairSync extends Sync {
// 実装
}
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 実装
}
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 実装
}
// 2つのロックを初期化するコンストラクタ
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
// 読み取りロックと書き込みロックを取得するメソッド
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
書き込みロックの取得と解除
書き込みロックは、再入力をサポートする排他的ロックです。 現在のスレッドがすでに書き込みロックを獲得している場合は、書き込み状態が増加します。読み取りロックがすでに獲得されているときに現在のスレッドが書き込みロックを獲得している場合、またはスレッドがすでに書き込みロックを獲得しているスレッドでない場合は、現在のスレッドは待機状態になります。
同期状態を取得するための書き込みロックの方法は以下のとおりです。 この方法では、現在のスレッドが書き込みロックを取得しているかどうかの判断に加えて、読み取りロックが存在するかどうかの判断も追加されます:読み取りロックが存在する場合、書き込みロックを取得することはできません。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); //ロックの取得回数。
if (c != 0) {
// 読み取りロックがあるか、現在のスレッドが書き込みロックを取得したスレッドでない。
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
return false;
}
setExclusiveOwnerThread(current);
return true;
}
書き込みロック解除
書き込みロックの解放は、基本的にReentrantLockの解放処理と似ています。 解放のたびに書き込み状態が減少し、書き込み状態が0になるとロックが解放されたことになり、待機している読み取り/書き込みスレッドは引き続きロックにアクセスできるようになり、書き込みスレッドによる修正は後続の読み取り/書き込みスレッドから見えるようになります。
リード・ロックの取得と解除
リード・ロック・アクイジション
読み取りロックは再エントリーをサポートする共有ロックで、複数のスレッドが同時に取得することができ、他の書き込みスレッドがアクセスしていないときは、常に読み取り状態を増やすために正常に取得されます。
tryAcquireSharedメソッドのコードの一部を以下に示します。他のスレッドが書き込みロックを取得した場合、現在のスレッドは読み取りロックの取得に失敗し、待機状態に入ります。現在のスレッドが書き込みロックを取得した場合、または書き込みロックが取得されなかった場合、現在のスレッドは読み取り状態を増加させ、読み取りロックの取得に成功します。
protected final int tryAcquireShared(int unused) {
for (;;) {
int c = getState();
int nextc = c + (1 << 16);
if (nextc < c)
throw new Error("Maximum lock count exceeded");
if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
return -1;
if (compareAndSetState(c, nextc))
return 1;
}
ロック解除を読む
読み取りロックが解除されるたびに、読み取り状態は以下の値だけ減少します。
ロックの降格
ロックの降格とは、書き込みロックが読み取りロックに降格されるプロセス、つまり、現在所有されている書き込みロックが再び取得され、読み取りロックが取得され、その後に以前所有されていた書き込みロックが解放されるプロセスを指します。
以下はロック降格のサンプルコードです。このとき、processData()メソッドにアクセスするすべてのスレッドがそのことを知ることができますが、書き込みロックを取得できるのは1つのスレッドだけで、他のスレッドは読み取りロックと書き込みロックのlock()メソッドでブロックされます。書き込みロックを取得したスレッドは、データの準備を終えてから読み取りロックを取得し、書き込みロックを解放してロック降格を終了します。
public void processData() {
readLock.lock();
if (!update) {
// 最初に読み取りロックを解放しなければならない。
readLock.unlock();
// 書き込みロックが取得されると、ロックの降格が始まる
writeLock.lock();
try {
if (!update) {
// データ・フローを準備する
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
// ロックのアップグレードが完了し、書き込みロックが読み取りロックにダウングレードされる。
}
try {
// データ利用の流れ
} finally {
readLock.unlock();
}
}
ロック降格では、読み取りロックの取得が必要です。現在のスレッドが読み取りロックを取得せず、書き込みロックを直接解放した場合、別のスレッドTが書き込みロックを取得し、その時点でデータを変更したと仮定すると、現在のスレッドはスレッドTによるデータの更新に気づくことができません。したがって、現在のスレッドが読み取りロックを取得する限り、スレッド T はブロックされてデータを変更することはなく、スレッド T が書き込みロックを取得してデータを更新する前に、現在のスレッドが読み取りロックを解放したことを知ることができます。
RentrantReadWriteLockは、ロックのアップグレードをサポートしていません。読み取りロックが複数のスレッドによって取得され、それらのスレッドのいずれかが書き込みロックの取得に成功し、データを更新した場合、この更新は読み取りロックを取得した他のスレッドからは見えません。
使用例使用例
public class Test {
class MyTask {
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public void readTask() {
try {
try {
lock.readLock().lock();
System.out.println(Thread.currentThread().getName() +" 読み取りロックの取得");
Thread.sleep(3000);
}finally {
System.out.println(Thread.currentThread().getName() +" 読み取りロックを解放する");
lock.readLock().unlock(); // 書き込みロックは.writeLock().unlock()
}
}catch (InterruptedException e) {
e.printStackTrace();
}
}
public void writeTask() {
try {
try {
lock.writeLock().lock();
System.out.println(Thread.currentThread().getName() +" WriteLockを取得する");
Thread.sleep(3000);
}finally {
System.out.println(Thread.currentThread().getName() +" WriteLockを解放する");
lock.writeLock().unlock(); // 書き込みロックは.writeLock().unlock()
}
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class WriteThread extends Thread {
private MyTask task;
public WriteThread(MyTask task, String name) {
this.task = task;
setName(name);
}
@Override
public void run() {
task.writeTask();
}
}
class ReadThread extends Thread {
private MyTask task = new MyTask();
public ReadThread(MyTask task, String name) {
this.task = task;
setName(name);
}
@Override
public void run() {
task.readTask();
}
}
public static void main(String[] args) {
Test test = new Test();
MyTask task = test.new MyTask();
test.new WriteThread(task, "WriteLock").start();
test.new ReadThread(task, "ReadLockA").start();
}
}





