blog

ブロックキューのSynchronousQueue

この記事では、SynchronousQueueの実態を一行一行、コードごとに分析します コード構造 フェアモードでの転送キュー 規約は上記のとおりです コード例:消費スレッドのスリープ時間、スレッドオ...

Aug 10, 2023 · 13 min. read
シェア

この記事では、SynchronousQueue の実際の動作原理を1行ずつ分析します。

コード構造

public class SynchronousQueue extends AbstractQueue implements BlockingQueue, Serializable { // コンストラクタのパラメータは、公平性戦略を決定します。 public SynchronousQueue() { this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack();} // 抽象内部クラスで、抽象リソース転送メソッドを提供し、特定のサブクラスで実装されます abstract static class Transferer { abstract E transfer(E e, boolean timed, long nanos); } // CPU の数 static final int NCPUS = Runtime.getRuntime().availableProcessors(); // タイムドスピンとアンタイムドスピンの最大数 static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; static final int maxUntimedSpins = maxTimedSpins * 16; // スピンタイムアウトのしきい値 static final long spinForTimeoutThreshold = 1000L; // 非フェアモードでのクラス転送スタックの実装 static final class TransferStack extends Transferer {...} // フェアモードでのクラス転送キューの実装 static final class TransferQueue extends Transferer {...} // volatile修飾子付きの転送者 private transient volatile Transferer transferer; // 外部に公開されるメソッド public boolean offer(E e, long timeout, TimeUnit unit){} public void put(E e){} public E take(){} public E poll(long timeout, TimeUnit unit){}}

SynchronousQueueは、synchronizedやreentrantlockなどのロックを使用しません。スレッドの安全性を確保するためにCASスピンを使用し、スレッドを直接ブロックおよびウェイクアップします。

公平モードの転送キュー

従来の図:

コード例:

コンシューマスレッドのスリープ時間、プロデューサスレッドのオファーのタイムアウト、コンシューマスレッドのポーリングのタイムアウトを変更して、転送キューの1対1転送の効果を観察することができます。

public class SynchronousQueueDemo {
 private static final ThreadPoolExecutor PRODUCT_THREAD = new ThreadPoolExecutor(2,12,30,TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000), new NamedThreadFactory("オークション・スレッド");
 private static final ThreadPoolExecutor CONSUMER_THREAD = new ThreadPoolExecutor(2,12,30,TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000), new NamedThreadFactory("入札スレッド"));
 private static final Integer SIZE = 5;
 private static final String[] WP = {"初心者から始めるJava」「初心者から始めるPython」「C++初心者から入門者まで"、"初心者から入門者までGo"、"初心者から入門者までJS"、"初心者から入門者までPhp"};
 public static void main(String[] args) throws InterruptedException {
 SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>(true);
 for (int i = 0 ; i < SIZE; i ++) {
 int finalI = i;
 PRODUCT_THREAD.execute(() -> {
 try {
 System.out.println(Thread.currentThread().getName() + "オークション開始:"+ WP[finalI] + ":" + synchronousQueue.offer(WP[finalI], 5, TimeUnit.SECONDS));
 } catch (InterruptedException e) {
 }
 });
 }
 for (int i = 0 ; i < SIZE; i ++) {
 CONSUMER_THREAD.execute(() -> {
 try {
 TimeUnit.SECONDS.sleep(1);
 System.out.println(Thread.currentThread().getName() + ": アイテムに入札する:"+ synchronousQueue.poll(5, TimeUnit.SECONDS));
 } catch (InterruptedException e) {
 }
 });
 }
 }
}

private static final String[] WP = {「《Java从入门到入院》」, 「《Python从入门到入院》」, 「《C++从入门到入院》」, 「《Go从入门到入院》」, 「《JS从入门到入院》」, 「《Php从入门到入院》」};

public static void main(String[] args) throws InterruptedException {

SynchronousQueue synchronousQueue = new SynchronousQueue<>(true);

for (int i = 0 ; i < SIZE; i ++) {

int finalI = i;

PRODUCT_THREAD.execute(() -> {

try {

System.out.println(Thread.currentThread().getName() + 「: Starting auction for item: 」 + WP[finalI] + 「:」 + synchronousQueue.offer(WP[finalI], 5, TimeUnit.SECONDS));

} catch (InterruptedException e) {

}

});

}

for (int i = 0 ; i < SIZE; i ++) {

CONSUMER_THREAD.execute(() -> {

try {

TimeUnit.SECONDS.sleep(1);

System.out.println(Thread.currentThread().getName() + 「: Bidding for item:」 + synchronousQueue.poll(5, TimeUnit.SECONDS));

} catch (InterruptedException e) {

}

});

}

}

}

ソースコードの解釈

static final class TransferQueue<E> extends Transferer<E> {
 static final class QNode {} // ノード情報
 transient volatile QNode head; // ヘッダー・ノード・ポインター
 transient volatile QNode tail; // テール・ノード・ポインタ
 transient volatile QNode cleanMe; // キャンセルされたノードへの参照
 TransferQueue() { // この空のノードを指すヘッド・ポインタとテール・ポインタを持つ空のノードを初期化する。
 QNode h = new QNode(null, false); 
 head = h;
 tail = h;
 }
}

java

E transfer(E e, boolean timed, long nanos) {

QNode s = null; // 後で構築するための QNode を定義します

boolean isData = (e != null); // プロデューサまたはコンシューマであるかを判断します

for (;;) { // ループ

QNode t = tail;

QNode h = head;

if (t == null

h == null) {continue;} // ヘッドまたはテールが空の場合、処理を続行します

// キューが空であるか、キュー内のスレッドが現在のスレッドと同じモードである場合

if (h == t

t.isData == isData) {

QNode tn = t.next;

if (t != tail) {continue;} // ヘッドとテールが等しくない場合、別のスレッドが連結リストを変更した可能性があるため、ループを続行します

if (tn != null) { // 末尾ノードがnullでなければ、別のスレッドがこの処理中にキューにノードを挿入したことになります。

advanceTail(t, tn); // CASを使用して末尾を新しいノードを指すように変更しようとします

continue; // ループを続行します

}

if (timed && nanos <= 0L) {return null;} // 操作が非ブロック処理の場合、一致するスレッドがないため、nullを直接返してループを終了します

// 新しいノードを初期化し、CAS により新たに追加されたノードをテールの次のノードに設定します。 これが失敗した場合、tail.next!=null であり、別のスレッドが tail.next プロパティを変更している場合は、ループを続行します。

if (s == null) {s = new QNode(e, isData);}

if (!t.casNext(null, s)) {continue;}

// CAS によりテールを新しいノードを指すように変更しようとします

advanceTail(t, s);

// 受信データが現在のデータでなくなるまでスピンまたはブロックします

Object x = awaitFulfill(s, e, timed, nanos);

// リクエストがキャンセルされた場合

if (x == s) {

// t は cleanMe で保存され、別のスレッドが clean() を実行するのを待ち、状況に応じてクリアされます

clean(t, s);

return null; // 戻り値

}

// マッチングに成功しました。現在のリクエストがキューに残っている場合

if (!s.isOffList()) {

advanceHead(t, s); // ヘッドポインタを更新

if (x != null) {

s.item = s; // テールノード情報を更新

s.waiter = null;

}

}

//x != null: コンシューマーがプロデューサーのアイテムを要求し、取得したアイテムを返す。x == null: プロデューサーのアイテムが取得され、自身のアイテムを返す

return (x != null) ? (E)x : e;

} else { // キューは空ではありません

QNode m = h.next; // ヘッドノードを取得します

if (t != tail

m == null

h != head) {continue;} // ヘッドノードとテールノードが変更されたか、取得したヘッドノードが空の場合、ループを続行します

Object x = m.item; // ヘッドノードのデータを取得します

1. 現在のノードはプロデューサーリクエストであり、isData = true、ヘッドノードはコンシューマーリクエストですが、すでにキャンセルされています。x != null

2. 現在のノードはコンシューマーリクエストであり、isData = false、ヘッドノードはプロデューサーリクエストですが、すでに一致しています。x == null

x == m ヘッドノードはキャンセルされています。

// 1. m がプロデューサーのリクエストの場合、x == プロデューサーのデータ、現在のコンシューマーのリクエスト e == null、プロデューサーのリクエスト項目を null に設定

// 2. m がコンシューマーのリクエストの場合、x == null、現在のプロデューサーのリクエスト e == プロデューサーのデータ、コンシューマーのリクエスト項目をプロデューサーのデータに設定

if (isData == (x != null)

x == m

!m.casItem(x, e)) {

advanceHead(h, m);

continue;

}

// ヘッドポインタを更新し、一致するノードのスレッドを起動します。

advanceHead(h, m);

LockSupport.unpark(m.waiter);

//x != null: コンシューマーがプロデューサーのアイテムを要求し、要求されたアイテムを返しました。 x == null: プロデューサーのアイテムが取得され、コンシューマーのアイテムが返されました。

return (x != null) ? (E)x : e;

}

}

}

int spins = (head.next == s) ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS) : 0;

for (;;) {

if (w.isInterrupted()) { // 現在のスレッドが中断されました

s.tryCancel(e); // このノードのリクエストをキャンセルします

}

Object x = s.item;

s は現在、e に包まれた末尾ノードです。現在のリクエストが成功またはキャンセルされた場合、末尾ノードの情報が返されます。

if (x != e) {return x;}

タイムアウトを設定するかどうか

if (timed) {

時間を計算します。タイムアウトに達した場合、このノードのリクエストをキャンセルします。キャンセルされた場合、x および s.item は null になります。

nanos = deadline - System.nanoTime();

if (nanos <= 0L) {

s.tryCancel(e);

continue;

}

}

// スピン数、スピンを継続

if (spins > 0) {

--spins;

Thread.onSpinWait();

} else if (s.waiter == null) { // スピンなし、末尾ノードの waiter が null の場合、末尾ノードの waiter を現在のスレッドに設定

s.waiter = w;

} else if (!timed) {

LockSupport.park(this); // 現在のスレッドを一時停止

} else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) {

LockSupport.parkNanos(this, nanos); // 指定された時間に従って現在のスレッドを一時停止

}

}

}

アンフェアモードの転送スタック

static final class TransferStack extends Transferer { static final class SNode {} // ノード情報 volatile SNode head; // ヘッドノード static final int REQUEST = 0; // 消費モード static final int DATA = 1; // 生産モード static final int FULFILLING = 2; // 現在のノードがマッチングを要求している static final class SNode { volatile SNode next; // スタックの次のノード volatile SNode match; // 現在のノードに一致するノード volatile Thread waiter; // 現在待機中のスレッド Object item; // データ int mode; // 現在のノードのモード:DATA/REQUEST/FULFILLING }}

従来の図:

コード例:

public class SynchronousStackDemo {
 private static final ThreadPoolExecutor PRODUCT_THREAD = new ThreadPoolExecutor(5,12,30,TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000), new NamedThreadFactory("オークション・スレッド");
 private static final ThreadPoolExecutor CONSUMER_THREAD = new ThreadPoolExecutor(1,12,30,TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000), new NamedThreadFactory("入札スレッド"));
 private static final Integer SIZE = 5;
 private static final String[] WP = {"初心者から始めるJava」「初心者から始めるPython」「C++初心者から入門者まで"、"初心者から入門者までGo"、"初心者から入門者までJS"、"初心者から入門者までPhp"};
 public static void main(String[] args) throws InterruptedException {
 SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>(false);
 for (int i = 0 ; i < SIZE; i ++) {
 int finalI = i;
 PRODUCT_THREAD.execute(() -> {
 try {
 System.out.println(Thread.currentThread().getName() + "オークション開始:"+ WP[finalI]);
 synchronousQueue.offer(WP[finalI], 200, TimeUnit.SECONDS);
 } catch (InterruptedException e) {
 }
 });
 }
 TimeUnit.SECONDS.sleep(10);
 for (int i = 0 ; i < SIZE; i ++) {
 CONSUMER_THREAD.execute(() -> {
 try {
 System.out.println(Thread.currentThread().getName() + ": アイテムに入札する:"+ synchronousQueue.poll(500, TimeUnit.SECONDS));
 } catch (InterruptedException e) {
 }
 });
 }
 }
}

private static final String[] WP = {「《Java从入门到入院》」, 「《Python从入门到入院》」, 「《C++从入门到入院》」, 「《Go从入门到入院》」, 「《JS从入门到入院》」, 「《Php从入门到入院》」};

public static void main(String[] args) throws InterruptedException {

SynchronousQueue synchronousQueue = new SynchronousQueue<>(false);

for (int i = 0 ; i < SIZE; i ++) {

int finalI = i;

PRODUCT_THREAD.execute(() -> {

try {

System.out.println(Thread.currentThread().getName() + 「: Starting auction for item: 」 + WP[finalI]);

synchronousQueue.offer(WP[finalI], 200, TimeUnit.SECONDS);

} catch (InterruptedException e) {

}

});

}

TimeUnit.SECONDS.sleep(10);

for (int i = 0 ; i < SIZE; i ++) {

CONSUMER_THREAD.execute(() -> {

try {

System.out.println(Thread.currentThread().getName() + 「: Bidding for item: 」 + synchronousQueue.poll(500, TimeUnit.SECONDS));

} catch (InterruptedException e) {

}

});

}

}

}

ソースコードの解釈

E transfer(E e, boolean timed, long nanos) {
 SNode s = null; 
 int mode = (e == null) ? REQUEST : DATA; // 生産または消費モード
 for (; ; ) { //  
 SNode h = head; // ヘッド・ノードの取得
 // ヘッダーが空であるか、要求スレッドがヘッダー・ノードの要求パターンに一致する場合
 if (h == null 
 h.mode == mode) {
 // タイムアウトが設定され、タイムアウトした
 if (timed && nanos <= 0L) {
 // 
 if (h != null && h.isCancelled()) { // ヘッダー・ノードがキャンセルされたかどうかを判断する。.match = h
 casHead(h, h.next); // ヘッダーの次のノードをheadeノードに設定する
 } else {
 return null;
 }
 } else if (casHead(h, s = snode(s, e, h, mode))) { // ラッパーeのsノードをヘッド・ノードに置き換えるためにCASを試みる。
 // ノードsがレスポンスのデータと一致するかタイムアウトするまでスピン/ブロックする
 SNode m = awaitFulfill(s, timed, nanos);
 if (m == s) { //  
 clean(s); // sに対してアウトオブスタック操作を実行し、NULLを返し、スレッドプールを終了する。
 return null;
 }
 // レスポンスが一致した場合、ヘッドノードから次のノードを設定する
 if ((h = head) != null && h.next == s) {
 casHead(h, s.next); 
 }
 return (E) ((mode == REQUEST) ? m.item : s.item); // 生産/消費モードに応じて異なるアイテムデータを返す
 }
 // スタック・ヘッドは、他のスレッドが異なる操作モードを行うのを待つ h.mode != FULFILLING
 } else if (!isFulfilling(h.mode)) { 
 if (h.isCancelled()) { // ヘッダー・ノードがキャンセルされたかどうかを判断する。.match = h 
 casHead(h, h.next); // ヘッダーの次のノードをheadeノードに設定する
 } else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // ラッパーeのsノードをheadノードに置き換えるCASを試し、FULFILLINGとマークし、失敗したらループを続ける
 for (; ; ) {
 // mは上のsnodeコピーのスタック・ヘッダである。
 SNode m = s.next;
 if (m == null) { // スタック・ヘッダが空の場合、他のスレッドがそれを掴んだことを意味し、ヘッダをリセットする。
 casHead(s, null);
 s = null; 
 break; 
 }
 // mに一致するノードを取得する。
 SNode mn = m.next;
 // はマッチングを試み、マッチングに成功すると待機中のスレッドをウェイクアップする。
 if (m.tryMatch(s)) {
 casHead(s, mn); // // 成功したら、アウトオブスタック操作を実行する
 return (E) ((mode == REQUEST) ? m.item : s.item); // 異なる
 } else {
 s.casNext(m, mn);
 } 
 }
 }
 // ペアになっている他のスレッドがあることを示す(m& FULFILLING) != 0, をマッチングさせるために、スタックアウト操作を実行する
 } else { 
 SNode m = h.next; 
 if (m == null) { 
 casHead(h, null); 
 } else {
 SNode mn = m.next;
 if (m.tryMatch(h)) { 
 casHead(h, mn); 
 } else { 
 h.casNext(m, mn); 
 } 
 }
 }
 }
}
// ブロックされたスタック・ヘッドmをウェイクアップし、現在のノードsをmのマッチ属性に割り当て、スタック・ヘッドmがウェイクアップされたときに、現在のsの属性を取得できるようにする。.item(e)
boolean tryMatch(SNode s) {
 if (match == null && SMATCH.compareAndSet(this, null, s)) {
 Thread w = waiter;
 if (w != null) { // waiters need at most one unpark
 waiter = null;
 LockSupport.unpark(w);
 }
 return true;
 }
 return match == s;
}

m は、上記の snode からコピーしたスタックのヘッドです。

SNode m = s.next;

if (m == null) { // スタックのヘッドが空の場合は、他のスレッドが取得していることを意味します。そのため、ヘッダーをリセットします。

casHead(s, null);

s = null;

break;

}

// m に一致するノードを取得します。

SNode mn = m.next;

一致を試みます。成功した場合、待機中のスレッドを起動します。

if (m.tryMatch(s)) {

casHead(s, mn); // // 成功した一致。スタック外の操作を実行します。

return (E) ((mode == REQUEST) ? m.item : s.item); // 生産/消費モードに応じて異なる値を返します。

} else {

s.casNext(m, mn);

}

}

}

// 別のスレッドが一致処理を行っていることを示します (m & FULFILLING) != 0、一致処理、スタック外の操作を実行

} else {

SNode m = h.next;

if (m == null) {

casHead(h, null);

} else {

SNode mn = m.next;

if (m.tryMatch(h)) {

casHead(h, mn);

} else {

h.casNext(m, mn);

}

}

}

// ブロックされたスタックヘッドmを解除し、mのmatch属性に現在のノードsを割り当てます。これにより、スタックヘッドmが解除された際に、現在のsの属性を取得できるようになります。つまり、s.item(e)を取得できます。

boolean tryMatch(SNode s) {

if (match == null && SMATCH.compareAndSet(this, null, s)) {

Thread w = waiter;

if (w != null) { // ウェイターは最大でも1つだけアンパークする必要があります

waiter = null;

LockSupport.unpark(w);

}

return true;

}

return match == s;

}
/**

* ノード s が操作によって一致するまでスピン/ブロックします。

* @param s 待ちたいノード

* @param timed true タイムアウト待ちの場合

* @param nanos タイムアウトをナノ秒単位で指定

* @return 一致したノード、またはキャンセルされた場合は s

*/

SNode awaitFulfill(SNode s, boolean timed, long nanos) {

// 期限

最終的な長さの期限 = timed ? System.nanoTime() + nanos : 0L;

スレッド w = Thread.currentThread();

// スピン回数を取得します

int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);

for (;;) {

if (w.isInterrupted()) {

s.tryCancel(); // 現在のスレッドが中断された場合、sノードの一致ノードを自身に設定します

}

SNode m = s.match; // sの一致ノードを取得します。設定はせず、デフォルトはnullです

// nullでなければ一致ノードが存在するため、一致ノード情報を直接返します

if (m != null) {

return m;

}

if (timed) { // タイムアウトが設定され、タイムアウトが切れた場合、sノードの一致ノードを自身に設定し、ループを継続します

nanos = deadline - System.nanoTime();

if (nanos <= 0L) {

s.tryCancel();

continue;

}

}

// // スピン回数、スピンを継続します

if (spins > 0)

spins = shouldSpin(s) ? (spins-1) : 0;

// 主にスレッドによるブロックの完了とウェイクアップを完了させるため、現在のスレッドを待ち手として設定します

else if (s.waiter == null)

s.waiter = w; // スピンカウントなし。sノードの待ち手がnullの場合、sノードの待ち手を現在のスレッドに設定します

else if (!timed)

LockSupport.park(this); // 現在のスレッドを一時停止

else if (nanos > spinForTimeoutThreshold)

LockSupport.parkNanos(this, nanos); // 指定の時間に従って現在のスレッドを一時停止

}

}

まとめ

  • ゼロバッファ:
    • SynchronousQueue は要素を格納する内部容量を持たないため、メモリー使用量が非常に少なくなります。
    • キャッシュがないため、データのコピーや移動が不要となり、SynchronousQueue は非常に効率的です。
  • ロックフリーの競合:
    • マルチスレッド環境では、ほとんどのキューはスレッドの安全性を確保するために何らかのロック機構を必要とします。
    • SynchronousQueueは、スピンロックやCAS操作などのノンブロッキング同期技術を使用して、従来のロック競合のオーバーヘッドを回避します。
  • 効率的なスレッドの連携:
    • プロデューサスレッドがキューに要素を追加しようとした際に、待機中のコンシューマスレッドが存在しない場合、そのスレッドはブロックされ、コンシューマスレッドが現れるまで待ちます。

      同様に、コンシューマスレッドがキューから要素を取得しようとする際に、利用可能な要素がなく、待ち状態のプロデューサスレッドもない場合、そのスレッドもブロックし、要素を提供するプロデューサスレッドを待ちます。

      このメカニズムにより、スレッド間のやり取りが非常に効率的になり、不要なコンテキストスイッチが排除されます。

      低レイテンシ:
      • SynchronousQueue は生産と消費がほぼ同時に発生するように設計されているため、待ち時間が非常に短くなります。
      • SynchronousQueue は高いスループットと短い待ち時間が必要なアプリケーションに最適です。
    • スケーラビリティ:
      • SynchronousQueue 自体は無制限ですが、スレッドプールの制限にも使用できます。
      • スレッドプールが最大容量に達した場合、新しいタスクはキューでブロックされるのではなく拒否されるため、リソースの枯渇や過剰なCPU時間の消費を防ぐことができます。

      • 適応的な最適化:
        • SynchronousQueueは現在の負荷状況に応じて動作を動的に調整することができます。
        • 負荷が高い場合、SynchronousQueueはスピンオフをより頻繁に行うことでコンテキストスイッチとロックの競合を減らすことができます。

      アプリケーション

      • NewCachedThreadPool
Read next