DelayQueue
使用方法
public interface Delayed extends Comparable<Delayed> {
//DelayQueueは、期限切れまでの残り時間を返すために使われる。
long getDelay(TimeUnit unit);
}
public interface Comparable<T> {
//遅延キューでは、通常、最も短い有効期限がキューの先頭に置かれる。
public int compareTo(T o);
}
class Work implements Delayed{
String name;
//有効期限
long time;
public Work(String name, long time, TimeUnit timeUnit) {
this.name = name;
//期限を計算する
this.time = timeUnit.toMillis(time)+System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
//残りの有効期限に戻るには、与えられた時間単位によると、適切なTimeUnitを選択する時間に応じて、ストレージユニットの時間に注意を払う
return unit.convert(time-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
//有効期限の小さいものから大きいものへソートする
return Long.compare(this.time,o.getDelay(TimeUnit.MILLISECONDS));
}
}
public class TestMain {
public static void main(String[] args) throws InterruptedException {
DelayQueue<Work> q=new DelayQueue<>();
q.add(new Work("001",10,TimeUnit.SECONDS));
q.add(new Work("002",15,TimeUnit.SECONDS));
q.add(new Work("003",20,TimeUnit.SECONDS));
System.out.println("begin time:" + new Date());
for (int i = 0; !q.isEmpty(); i++) {
Work take = q.take();
if(take == null){
continue;
}
System.out.format("name:{%s}, time:{%s}\n",take.name, new Date());
}
}
}
ソースコード解析
上記の呼び出し例に従ってソースコードを見ると、まずキュー・エントリーの操作を見てください。
キュー
ソースコードを見ると、DelayQueueにはadd(E)、offer(E)、put(E)、offer(T, long, TimeUnit)という合計4つのエントリーメソッドがあることがわかりますが、実はこれらはすべてoffer(E)の実装の呼び出しです。
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
public boolean offer(E e) {
/**
* this.lock
* private final transient ReentrantLock lock = new ReentrantLock();
* 再入可能ロック
*/
final ReentrantLock lock = this.lock;
//
lock.lock();
try {
/**
* q のソースコード
* private final PriorityQueue<E> q = new PriorityQueue<E>();
* 優先順位に基づく、境界のない優先順位キュー。
*/
// 要素をキューに入れ、順序を保つ
q.offer(e);
//入ってくる要素がキューの先頭にある場合
if (q.peek() == e) {
/**
* leader
* private Thread leader = null;
* leader現在キューの先頭で待機しているスレッドのために。
*/
// リーダーを空に設定する。なぜなら、キューから出た最初の要素を、リーダーは待っていないからだ。,
leader = null;
/**
* avaiable
* private final Condition available = lock.newCondition();
* availaleスレッドをウェイクアップまたはブロックできる条件を示す。
*/
//この条件で待機しているスレッドの1つをウェイクアップし、そのスレッドは再度ロックを取得しなければならない。
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
キュー外
poll()、take()、poll(long, TimeUnit)、peek()の4つのメソッドがありますが、ここでは主にpollとtakeについて書きます。
poll()
/**
* キューの最初の期限付き要素、またはキューに期限付き要素がない場合はNULLを返し、削除する。
*
*/
public E poll() {
final ReentrantLock lock = this.lock;
//
lock.lock();
try {
//キューの最初の要素を取得する
E first = q.peek();
//最初の要素が空であるか、有効期限が切れていない場合はnullを返す。
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
take()
/**
* キューの最初の期限付き要素を返し、それが期限付きの場合は、期限付き要素が返されるまで待つ。
*
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
//キューの最初の要素が空であれば、待機してロックを解放する。
available.await();
else {
//キューの最初の要素の残りの有効期限を取得する
long delay = first.getDelay(NANOSECONDS);
//期限切れになると、キューの最初の要素が返される。
if (delay <= 0)
return q.poll();
//最初に参照を解放する,
// スレッドが待機している間、参照は解放されない。,
// マルチスレッドの状況では、要素は他のスレッドによってポップされるかもしれないが、現在のスレッドはまだ参照を解放していないため、メモリリークとなる。
first = null; // don't retain ref while waiting
// ヘッダー要素を待っているスレッドがすでにある場合、このスレッドは待つ。
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
// ヘッダー要素を待っているスレッドがない場合、このスレッドをヘッダー要素を待っているスレッドにする。
leader = thisThread;
try {
// ローカル・スレッドは、ヘッダー要素の有効期限が切れるまで待機し、自動的にウェイクアップする。
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
まとめ
- グローバル再入可能ロックによるDelayQueueの同期
- DelayQueueは時間制限のあるタスクによく使われます。
- DelayQueue時限タスクによく使われます