前序
生産者-消費者モデルの実装では、リソースを格納するためのキャッシュの必要性。 このようなキャッシュの実装のためのJDK:ブロッキングキューBlockingQueueは、唯一のスレッドのセキュリティ問題のマルチスレッド環境を心配することなく、ストレージ、検索操作を実装する必要があります。
インターフェイスBlockingQueueは、Javaのutil.concurrentパッケージの下で重要なデータ構造であり、通常のキューとは異なり、BlockingQueueは、キューへのスレッドセーフアクセスを提供し、多くの高レベルの同期クラスの実装では、同時実行パッケージは、BlockingQueueの実装に基づいています。
BlockingQueue詳細は
ブロッキング・キューは、要素を挿入、削除、チェックするための4つの異なるメソッド・セットを提供します:
例外のスロー: 実行しようとした操作をすぐに実行できない場合、例外がスローされます。ブロッキング・キューが満杯のときにキューに要素を挿入すると、IllegalStateException("Queue full") がスローされます。キューが空のとき、キューから要素を取得すると NoSuchElementException がスローされます。
特別な値を返します: 実行しようとした操作がすぐに実行できない場合、特別な値が返されます。
常にブロック:実行しようとした操作がすぐに実行できない場合、常にブロックするか、割り込みに応答します。
タイムアウト終了: 実行しようとした操作をすぐに実行できない場合、メソッド呼び出しは実行できるまでブロックされます。操作が成功したかどうかを示すために、特定の値が返されます。
NULLポインタ例外がスローされます。
BlockingQueueの実装クラスを参照してください。
ArrayBlockingQueue
配列構造からなる有界ブロックキュー。内部構造は配列なので、配列の特性を持ちます。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
 final Object[] items;
 final ReentrantLock lock;
 
 private final Condition notEmpty;
 private final Condition notFull;
 
 public ArrayBlockingQueue(int capacity) { this(capacity, false); }
 
 public ArrayBlockingQueue(int capacity, boolean fair){
 if (capacity <= 0)
 throw new IllegalArgumentException();
 this.items = new Object[capacity];
 lock = new ReentrantLock(fair);
 notEmpty = lock.newCondition();
 notFull = lock.newCondition();
 }
}
これはキューのサイズを初期化することができ、一度初期化すると変更することはできません。コンストラクタ・メソッドの fair は、コントロール・オブジェクトの内部ロックがfairロックであるかどうかを示します。
LinkedBlockingQueue
連鎖表構造からなる有界ブロッキング待ち行列。内部構造は連鎖表の特徴を持つ連鎖表です。デフォルトのキューのサイズはInteger.MAX_VALUEですが、サイズを指定することもできます。このキューは、先入れ先出しの原則に従って要素をソートします。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
 static class Node<E> {
 E item;
 Node<E> next;
 Node(E x) { item = x; }
 }
 transient Node<E> head;
 private transient Node<E> last;
 
 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
 public LinkedBlockingQueue(int capacity) {
 if (capacity <= 0) throw new IllegalArgumentException();
 this.capacity = capacity;
 last = head = new Node<E>(null);
 }
}
DelayQueue
このキューの要素は、指定された遅延時間が経過したときにのみ、キューから取り出すことができます。このキューに入れられる要素は、java.util.concurrent.Delayedインタフェースを実装していなければなりません。
DelayQueueはサイズ制限のないキューです。そのため、キューにデータを挿入する操作がブロックされることはありませんが、データをフェッチする操作だけがブロックされます。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
 private final transient ReentrantLock lock = new ReentrantLock();
 private final PriorityQueue<E> q = new PriorityQueue<E>();
 
 public DelayQueue() {}
 public DelayQueue(Collection<? extends E> c) {
 this.addAll(c);
 }
}
SynchronousQueue
これは要素を保存しないブロッキングキューです。各 put オペレーションは take オペレーションを待たなければなりません。キュー自身は要素を保存しません。
フェアアクセスキューをサポートしています。デフォルトでは、スレッドは非フェアアクセスキューです。
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
	public SynchronousQueue() {
 this(false);
 }
 public SynchronousQueue(boolean fair) {
 transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
 }
}
このキューのいくつかのメソッドの戻り値を紹介します:
- iterator() は常に null を返します。
- peek() 常にnullを返します。
- put() は要素をキューに入れ、他のスレッドがそれを取りに来るまで待ちます。
- offer() は、要素をキューに入れた直後に返します。 その要素がたまたま他のスレッドに取られてしまった場合は、 offer メソッドはオファーが成功したとみなして true を返し、そうでない場合は false を返します。
- isEmpty() 常にtrueを返します。
- remove()&removeAll() 常にfalseを返します。
PriorityBlockingQueue
優先度をサポートした、境界のないブロック・キューです。デフォルトでは、要素は自然昇順でソートされますが、 compareTo() メソッドを実装したカスタム・クラスを実装して要素のソート・ルールを指定したり、コンストラクタのパラメータ Comparator を指定して PriorityBlockingQueue の初期化時に要素のソートを指定することができます。キューは、同じ優先順位の要素の順序を保証しないことに注意してください。
public class Test {
 static PriorityBlockingQueue<User> queue = new PriorityBlockingQueue<>();
 static class User implements Comparable<User> {
 public User(int age,String name) {
 this.age = age;
 this.name = name;
 }
 int age;
 String name;
 @Override
 public int compareTo(User o) {
 return this.age > o.age ? -1 : 1;
 }
 }
 public static void main(String[] args) throws InterruptedException{
 queue.add(new User(1, "w1"));
 queue.add(new User(66, "w66"));
 queue.add(new User(55, "w55"));
 queue.add(new User(77, "w77"));
 queue.add(new User(3, "w3"));
 queue.add(new User(9, "w9"));
 for (User user : queue) {
 System.out.println(queue.take().age);
 }
/* 
 77
 66
 55
 9
 3
 1
*/
 }
}
キューは束縛されないので、PriorityBlockingQueueはデータ生成者をブロックせず、消費可能なデータがない場合にのみ消費者をブロックします。
注意すべき点は、プロデューサはコンシューマがデータを消費するよりも速くデータを生成すべきではないということです。デフォルト・サイズのLinkedBlockingQueueの場合も同様です。
ブロッキング・キューArrayBlockingQueueの原理
JDK 1.8のソースコードのArrayBlockingQueueを見てみましょう。
コンストラクタのパラメータ
キューのサイズとフェア・ロックかどうかの初期化に加えて、同じ内部ロックを初期化するために、notEmptyとnotFullという2つのモニターがあります。
//データ要素の配列
final Object[] items;
//次に取り出す要素のインデックス
int takeIndex;
//次に追加される要素のインデックス
int putIndex;
//要素数
int count;
// 
final ReentrantLock lock;
//コンシューマー・モニター
private final Condition notEmpty;
//プロデューサー・モニター
private final Condition notFull; 
public ArrayBlockingQueue(int capacity, boolean fair) {
 //..残りのコードは省略する
 lock = new ReentrantLock(fair);
 notEmpty = lock.newCondition();
 notFull = lock.newCondition();
}
put
プロデューサースレッドは put メソッドを使ってリソースを入れようとします:
putオペレーションを実行するすべてのスレッドがロック錠を奪い合い、ロック錠を得たスレッドが次のステップを実行し、ロック錠を得られなかったスレッドがスピンしてロック錠を奪い合います。
ブロッキング・キューが満杯かどうかを判断します。満杯の場合は、awaitメソッドを呼び出してスレッドをブロックし、notFullスレッドとしてマークします。一方、ロックは解放し、コンシューマースレッドによってウェイクアップされるのを待ちます。
もし満杯でなければ、enqueueメソッドが呼ばれ、リソースをブロッキングキューに入れます。
notEmptyとマークされたスレッドを起動します。
public void put(E e) throws InterruptedException {
 checkNotNull(e);
 final ReentrantLock lock = this.lock;
 // 1.スピンでロックを取る
 lock.lockInterruptibly();
 try {
 // 2.キューが満杯かどうかを判断する
 while (count == items.length)
 // 2.1満杯になったらスレッドをブロックし、notFullとマークする。,
 // notFullが目覚めるのを待ち、目覚めた後もwhileループを実行し続ける。
 notFull.await();
 // 3.満杯でなければ、キューに入る
 enqueue(e);
 } finally {
 lock.unlock();
 }
}
private void enqueue(E x) {
 // assert lock.getHoldCount() == 1;
 // assert items[putIndex] == null;
 final Object[] items = this.items;
 items[putIndex] = x;
 if (++putIndex == items.length)
 putIndex = 0;
 count++;
 // 4 待機中のスレッドを起動する
 notEmpty.signal();
}
take
コンシューマースレッドは、take オペレーションを使ってキューからリソースを取ろうとします:
テイク操作を実行するすべてのスレッドがロック錠を奪い合い、ロック錠を得たスレッドが次のステップを実行し、ロック錠を得られなかったスレッドがスピンしてロック錠を奪い合います。
ブロッキングキューが空であるかどうかを判断し、それが空の場合は、スレッドをブロックするawaitメソッドを呼び出し、notEmptyスレッドとしてマークし、同時に、プロデューサースレッドによって起こされるのを待って、ロックを解除します。
空でなければ、dequeueメソッドが呼ばれます。
notFullとマークされたスレッドを起動します。
public E take() throws InterruptedException {
 final ReentrantLock lock = this.lock;
// 1.スピンでロックを取る
 lock.lockInterruptibly();
 try {
 // 2.キューが空かどうかを判断し、空であればスレッドをブロックする。
 while (count == 0)
 notEmpty.await();
 //3.もしキューが空でなければ、dequeueメソッドが呼ばれる。.
 return dequeue();
 } finally {
 lock.unlock();
 }
}
private E dequeue() {
 // assert lock.getHoldCount() == 1;
 // assert items[takeIndex] != null;
 final Object[] items = this.items;
 @SuppressWarnings("unchecked")
 E x = (E) items[takeIndex];
 items[takeIndex] = null;
 if (++takeIndex == items.length)
 takeIndex = 0;
 count--;
 if (itrs != null)
 itrs.elementDequeued();
 notFull.signal();
 return x;
}
ほら
ポイント2では、put操作とtake操作の両方で、キューが空か満杯かを判断するために、whileでステートメントを実行しています。これは、起動されたスレッドが実行を続行するためにロックを取得する必要があるためです。ロックを取得するためには、キューが使用可能かどうかを判断する必要があります。if文が実行されると、ウェイクアップされてロックを取得したスレッドは、キューが使用可能かどうかを再度確認せず、直接次のステップを実行します。
使用示例
せいさんしょうひしゃモデル
キューが最大3つのリソースに配置されることを指定します:
public class Test {
 private int queueSize = 3;
 ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);
 class Consumer extends Thread {
 @Override
 public void run() {
 consume();
 }
 private void consume() {
 while(true) {
 try {
 queue.take();
 System.out.println("要素が取り出され、キューの残りの"+ queue.size() + "要素");
 }catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 }
 }
 class Producer extends Thread {
 @Override
 public void run() {
 produce();
 }
 private void produce() {
 while(true) {
 try {
 queue.put(1);
 System.out.println("要素が生成され、キューを: "に入れることもできる。+(queueSize-queue.size()) + "要素");
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 }
 }
 public static void main(String[] args) throws InterruptedException{
 Test test = new Test();
 test.new Producer().start();
 test.new Consumer().start();
 }
}
出力は次のとおりです。結果は、System.out.println文がロックされていないため、これは、スレッド1がすぐにCPUのタイムスライスを失うためにプット/テイク操作の実装後、スレッド2に取得/テイク操作を実行するために、出力文の実装とスレッド1に戻って出力、この時点でキューのサイズがスレッド2によって変更されているので、サイズの出力は、問題がある可能性があります。キューのサイズは、スレッド2によって変更されているので、出力のサイズは、スレッド1のプット/テイク操作後のキューのサイズではありませんが、それはキューのサイズが3を超えないことを保証することができます。
スレッドプールでのブロッキングキューの使用
Javaのスレッド・プールはブロッキング・キューを使って実装されています。





