blog

遅延タスク - DelayQueue

1. 使い方 2. ソースコードの解析 3. まとめ 1. 使い方 2. ソースコードの解析 ソースコードを通して、add、offer、put、offer という合計 4 つのキューイングメソッドがあ...

Apr 7, 2020 · 4 min. read
Share this

DelayQueue

使用方法

public interface Delayed extends Comparable<Delayed> {
 //DelayQueueは、期限切れまでの残り時間を返すために使われる。
 long getDelay(TimeUnit unit);
}
public interface Comparable<T> {
 //遅延キューでは、通常、最も短い有効期限がキューの先頭に置かれる。
 public int compareTo(T o);
}
class Work implements Delayed{
 String name;
 //有効期限
 long time;
 public Work(String name, long time, TimeUnit timeUnit) {
 this.name = name;
 //期限を計算する
 this.time = timeUnit.toMillis(time)+System.currentTimeMillis();
 }
 @Override
 public long getDelay(TimeUnit unit) {
 //残りの有効期限に戻るには、与えられた時間単位によると、適切なTimeUnitを選択する時間に応じて、ストレージユニットの時間に注意を払う
 return unit.convert(time-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
 }
 @Override
 public int compareTo(Delayed o) {
 //有効期限の小さいものから大きいものへソートする
 return Long.compare(this.time,o.getDelay(TimeUnit.MILLISECONDS));
 }
}
public class TestMain {
 public static void main(String[] args) throws InterruptedException {
 
 DelayQueue<Work> q=new DelayQueue<>();
 q.add(new Work("001",10,TimeUnit.SECONDS));
 q.add(new Work("002",15,TimeUnit.SECONDS));
 q.add(new Work("003",20,TimeUnit.SECONDS));
 System.out.println("begin time:" + new Date()); 
 for (int i = 0; !q.isEmpty(); i++) {
 Work take = q.take();
 if(take == null){ 
 continue;
 }
 System.out.format("name:{%s}, time:{%s}\n",take.name, new Date());
 } 
 }
}

ソースコード解析

上記の呼び出し例に従ってソースコードを見ると、まずキュー・エントリーの操作を見てください。

キュー

ソースコードを見ると、DelayQueueにはadd(E)、offer(E)、put(E)、offer(T, long, TimeUnit)という合計4つのエントリーメソッドがあることがわかりますが、実はこれらはすべてoffer(E)の実装の呼び出しです。

 public boolean add(E e) {
 return offer(e);
 }
 public void put(E e) {
 offer(e);
 }
 public boolean offer(E e, long timeout, TimeUnit unit) {
 return offer(e);
 }
 public boolean offer(E e) {
 /**
 * this.lock  
 * private final transient ReentrantLock lock = new ReentrantLock();
 * 再入可能ロック
 */
 final ReentrantLock lock = this.lock;
 //  
 lock.lock();
 try {
 /**
 * q のソースコード
 * private final PriorityQueue<E> q = new PriorityQueue<E>();
 * 優先順位に基づく、境界のない優先順位キュー。
 */
 // 要素をキューに入れ、順序を保つ
 q.offer(e);
 
 //入ってくる要素がキューの先頭にある場合
 if (q.peek() == e) {
 /**
 * leader  
 * private Thread leader = null;
 * leader現在キューの先頭で待機しているスレッドのために。
 */
 // リーダーを空に設定する。なぜなら、キューから出た最初の要素を、リーダーは待っていないからだ。,
 leader = null;
 /**
 * avaiable  
 * private final Condition available = lock.newCondition();
 * availaleスレッドをウェイクアップまたはブロックできる条件を示す。
 */
 //この条件で待機しているスレッドの1つをウェイクアップし、そのスレッドは再度ロックを取得しなければならない。
 available.signal();
 }
 return true;
 } finally {
 lock.unlock();
 }
 }

キュー外

poll()、take()、poll(long, TimeUnit)、peek()の4つのメソッドがありますが、ここでは主にpollとtakeについて書きます。

poll()

 /**
 * キューの最初の期限付き要素、またはキューに期限付き要素がない場合はNULLを返し、削除する。
 *  
 */
 public E poll() {
 
 final ReentrantLock lock = this.lock;
 // 
 lock.lock();
 try {
 //キューの最初の要素を取得する
 E first = q.peek();
 //最初の要素が空であるか、有効期限が切れていない場合はnullを返す。
 if (first == null || first.getDelay(NANOSECONDS) > 0)
 return null;
 else
 return q.poll();
 } finally {
 lock.unlock();
 }
 }

take()

 /**
 * キューの最初の期限付き要素を返し、それが期限付きの場合は、期限付き要素が返されるまで待つ。
 *  
 */
 public E take() throws InterruptedException {
 final ReentrantLock lock = this.lock;
 // 
 lock.lockInterruptibly();
 try {
 for (;;) {
 E first = q.peek();
 if (first == null)
 //キューの最初の要素が空であれば、待機してロックを解放する。
 available.await();
 else {
 //キューの最初の要素の残りの有効期限を取得する
 long delay = first.getDelay(NANOSECONDS);
 //期限切れになると、キューの最初の要素が返される。
 if (delay <= 0)
 return q.poll();
 //最初に参照を解放する,
 // スレッドが待機している間、参照は解放されない。,
 // マルチスレッドの状況では、要素は他のスレッドによってポップされるかもしれないが、現在のスレッドはまだ参照を解放していないため、メモリリークとなる。
 first = null; // don't retain ref while waiting
 // ヘッダー要素を待っているスレッドがすでにある場合、このスレッドは待つ。
 if (leader != null)
 available.await();
 else {
 Thread thisThread = Thread.currentThread();
 // ヘッダー要素を待っているスレッドがない場合、このスレッドをヘッダー要素を待っているスレッドにする。
 leader = thisThread;
 try {
 // ローカル・スレッドは、ヘッダー要素の有効期限が切れるまで待機し、自動的にウェイクアップする。
 available.awaitNanos(delay);
 } finally { 
 if (leader == thisThread)
 leader = null;
 }
 }
 }
 }
 } finally {
 if (leader == null && q.peek() != null)
 available.signal();
 lock.unlock();
 }
 }

まとめ

  • グローバル再入可能ロックによるDelayQueueの同期
  • DelayQueueは時間制限のあるタスクによく使われます。
  • DelayQueue時限タスクによく使われます
Read next

分割、バックトラック、BFS&DFS、貪欲、二分探索

分割、バックトラック ◉ 要素の多数決 ◉ ブラケット生成問題 ◉ 島の数 ◉ pow ◉ substrの部分集合 ◉ 多要素 ◉ 電話番号の組み合わせ ◉ NqueenBFS&DFS ◉ 2分木の階層的走査 ◉ 最小遺伝子変化 ◉ ブラケット生成

Apr 7, 2020 · 3 min read