序文
前回までの記事はFutureTaskのソースコードを見て、タスクの戻り値を取得する方法を知り、今日はThreadPoolExecutorを見てください。
スレッドプール
上記のスレッドは、また、スレッドは、システム内の非常に貴重なリソースであることを言った、それは彼を合理的に使用する必要があるので、スレッドプールの出現は、スレッドプールは、どのような利点をもたらすことができますか?
- リソース消費量の削減:すでに作成されたスレッドを再利用することで、スレッドの作成と破棄による消費量を削減します。
- 応答性を提供: キャラクタの作成が到着すると、スレッドの作成を待たずにタスクを即座に実行できます。
- スレッド管理性の向上:スレッドは希少なリソースであり、無限に作成することはできないため、スレッドプールを使用して、同じ方法でスレッドを管理し、割り当て、チューニング、監視などを行います。
ソースコード解析
相続の仕組み
まず、ThreadPoolExecutorの継承を見てみましょう。
public class ThreadPoolExecutor extends AbstractExecutorService{}
public abstract class AbstractExecutorService implements ExecutorService{}
public interface ExecutorService extends Executor {
<!--スレッドプールを停止し、状態をSHUTDOWNに設定し、新しいタスクを受け付けないようにする。>
void shutdown();
<!--スレッドプールを停止し、状態をSTOPに設定し、最初のタスクを受け付けないようにし、実行中のタスクを中断して、実行されていないタスクに戻るようにする。>
List<Runnable> shutdownNow();
<!--SHUTDOWN状態であろうとなかろうと......。>
boolean isShutdown();
<!--すべてのタスクが終了したかどうか>
boolean isTerminated();
<!--タイムアウトの時間、タスクを実行するためにタスクを待ちに行く-。>
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<!--Callable ミッションを提出せよ>
<T> Future<T> submit(Callable<T> task);
<!--Runnable ミッションを提出せよ>
<T> Future<T> submit(Runnable task, T result);
<!--Runnable ミッションを提出せよ>
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
public interface Executor {
void execute(Runnable command);
}
まず、インターフェイスの下からExecutorを参照してくださいには、このインターフェイスの実装では、実行メソッドの実装では、このインターフェイスは、スレッドの実行のエントリポイントです。
ExecutorServiceインターフェイスは、より多くのメソッドを持っているExecutorインターフェイスを継承し、一般的なshutdownNowは、シャットダウンは、このインターフェイスでは、また、スレッドプール内のタスクをサブミットメソッドに提出する共通です。機能
AbstractExecutorServiceは、インターフェイスを実装する抽象クラス内のJavaソースコードの数が多い理由ちなみに、ExecutorServiceインターフェイスを実装する抽象クラスであり、その後、クラスは、抽象クラスを継承し、なぜクラスが直接インターフェイスを実装していないのですか?しかし、また、それの層を設定し、私は、抽象クラスは、インターフェイスを実装するために、つまり、パブリックインターフェイスメソッドのいくつかを実装するために、クラスが再びインターフェイスを実装するように、私は別の良いの実装を懸念している限り、私は1つ以上のクラスのインターフェイスの実装を知っているので、抽象クラスは、インターフェイスの実装は、特に、リスト、セットインターフェイスを繰り返し実装の数を避けるために、再び抽出するインターフェイスのクラスの公開実装を達成するためにです。リスト、セットインターフェイスは、ほとんどすべての抽象クラスの実装への応答を持ってダウンしてください!
主な変数
<!--ctl スレッドプールの状態とスレッド数を保存した。>
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;//32-3=29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;//2第29回-1の力
// runState is stored in the high-order bits
<!--スレッドプールが実行中であり、タスクを受け入れることができることを示す。>
private static final int RUNNING = -1 << COUNT_BITS;
<!--新しい仕事は引き受けないが、キューにある仕事は処理する。>
private static final int SHUTDOWN = 0 << COUNT_BITS;
<!--新たな任務を引き受けない、チーム内のタスクに対処しない、進行中の任務の中断--。>
private static final int STOP = 1 << COUNT_BITS;
<!--タスクは中断され、仕上げの状態にある。>
private static final int TIDYING = 2 << COUNT_BITS;
<!--最終状態を示す>
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
<!--スレッドプールの現在の稼働状況を見る>
private static int runStateOf(int c) { return c & ~CAPACITY; }
<!--現在のスレッドプールの稼働スレッド数を取得する>
private static int workerCountOf(int c) { return c & CAPACITY; }
<!--ctlの価値を知る>
private static int ctlOf(int rs, int wc) { return rs | wc; }
コンストラクター
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
このコンストラクタは、すべてのコンストラクタから呼び出される最後のメソッドです。
Worker
なぜ最初にWorkerの話をする必要があるのでしょうか?なぜなら、Runnabaleタスクはパッケージング後に実行するためにWorkerオブジェクトにサブミットされるからです。
まずWorkerのコードを見てみましょう:
/** Worker AQSを継承し、Runnableインターフェイスを実装している。*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** worker 走りの本糸は、課題を走らせる糸である。*/
final Thread thread;
/** 実行すべき課題*/
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);//ここでいうthisとは、現在のワーカーの対象である。
}
/** Run 現在のタスク runWorker は、Thread PoolExecutor 内のメソッドである。*/
public void run() {
runWorker(this);
}
// Lock methods
// 0 ロックされていないことを示す
// 1 ロックされていることを示す
protected boolean isHeldExclusively() {
return getState() != 0;
}
<!--この方法について話したとき、私はAQSになるので、このメソッドは非常に精通している必要があります、この側を行うには、状態の状態を変更しようとすることですので、ワーカーがロックされた状態であることをロックの意味を示すことである、他のスレッドを実行することはできません!,-->
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {//CAS Stateの値を変更するには、1がロックされていることを意味する。
setExclusiveOwnerThread(Thread.currentThread());現在のロック占有スレッドを現在のスレッドに設定する
return true;
}
return false;
}
<!--ロックを解除する、つまり、Stateの値を未使用の0に変更する。このフィールドもなかなか面白い名前になっている。>
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);//現在のロック占有スレッドをnullに設定する
setState(0);
return true;
}
<!--現在のワーカーにロックを追加し、取得できなければ待ち行列に追加し、現在の実行スレッドをブロックする。>
public void lock() { acquire(1); }
<!--フェアでないロックの実施に相当するこちら側で、ロックしようとする-。>
public boolean tryLock() { return tryAcquire(1); }
<!--ロックを解除する>
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
<!--実行中のスレッドタスクを中断させるには、-が実行されたときにshutdownNowを呼び出すことである。>
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
execute
executeはExecutorインターフェイスを実装したメソッドで、次に投入されるタスクがどのように実行されるかを見るためのエントリメソッドです。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();//現在のctl値を取得する
/*
* workerCountOfまた、私が上記で述べた方法は、現在の稼働スレッド数を把握することである。
* 現在のワーカースレッド数が設定されたコア・スレッド数より少ない場合、addWorkerをコールして新しいワーカースレッドを追加する。
* addWorker addが成功すればそのまま戻るが、addが失敗すれば次のctlの後に行き続けるようになり、addWorkerのctlの過程で防ぐために順番に取得するようにこの書き換えが変更された!
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/*
* このステップは、現在のワーカースレッド数がコアスレッド数より多いか、addWorkerが失敗したことを意味する。
* まず、現在のスレッドのステータスがRunningかどうかを判断し、ブロックキューworkQueueに現在のタスクを追加した。
* すべてがうまくいったら、またctlを取得しよう。Runnableを提供するとctlが変わるかもしれないのだから。
*多重検証のこちら側 同時性が高い場合を考えると、コードロジックは非常に厳密である。
* ロジックを続けるには、スレッドプールの状態を再度判断することである 非実行中であれば、現在のタスクを削除し、最後に拒否メソッドを実行する 異なる拒否戦略に従って、異なる動作を行う
* 最後に、現在のスレッド数が0であるか、またはaddWorkerメソッドを呼び出すに戻り、空のRunnalbe,falseを渡す場合は、コア以外の作業スレッドの作成であることを判断するようになった
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* この判断に至るには、スレッドプールの現在の状態が非実行中か、キューのタスク失敗中であることを意味する。
* こちら側は、そのタスクを処理するために非中核スレッドを作成し、それが失敗した場合は拒否ポリシーを実行する。
*/
else if (!addWorker(command, false))
reject(command);
}
これを読んで自分のことを振り返ってみて、いつコアスレッドを作るのか?非中核スレッドを作成するタイミングは?タスクはいつブロッキング・キューに入りますか?そして最後に、拒否ストラテジーが実行される状況とは?これらの質問の答えがわかっていれば、executeメソッドは自明なはずです!
addWorker
次のキーとなるメソッドは、頻繁に呼び出されます。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//これはスピンであり、スピンを設定することであり、その目的は新しいスレッドプールの数をCASすることである。
for (;;) {
int c = ctl.get();//ctlの価値を知る
int rs = runStateOf(c);//現在のスレッド状況を見る
// この症状の側面は、非常に丸く見えるが、よく見てみると、次のようなことがわかる。
// 第一条件>= SHUTDOWN スレッドプールの状態が正常でないことを示す
// 奥に無批判があるが、実はその中の括弧である。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 以下は、スレッド内部で動作しているスレッドを取得するためのものである。 最大値より大きかったり、しきい値を設定した場合は、そのまま返す return false メソッドの終了
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//これが意味するのは、CASがworkerCountをうまく修正すれば、一番外側のスピンがすべて終了するということだ。
if (compareAndIncrementWorkerCount(c))
break retry;
// なぜここで2つのスピンが必要なのか。主な理由は、こちら側と現在のスピンCASの修正WorkerCountの失敗を判断し、ctlが変更されるからだ
//不等号の外層、スピンの外層に戻るために、書き直しに行く。
だからこそ、continue retryを使うのである。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;//workerの実行を開始するかどうか。
boolean workerAdded = false;//worker 成功を加えるかどうか
Worker w = null;
try {
w = new Worker(firstTask);//前述のようにワーカーのコンストラクタに渡されるRunnableは、実際には最初のThreadを構築するためのfirstTaskである。
final Thread t = w.thread;//現在のtは、ワーカーで作成されたRunnableを実行するスレッドである。
if (t != null) {
final ReentrantLock mainLock = this.mainLock;//
mainLock.lock();//ワークダーを追加する際は、スレッドの安全性を確保すること
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);//に保存されている労働者HashSetのコレクションに労働者を追加する。
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();//
}
if (workerAdded) {//うまく加われば
t.start();//真の仕事人が実行する場所、それがここなのだ。
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);//最後のWorkerが実行されない場合は、それをクリーンアップする 対応するWorkerCountを修正する
}
return workerStarted;
}
メソッドは、CASの変更workerCountの失敗の同時実行のケースを解決するために2つのスピンを持つ場所の一番初めに、すべての細部のこちら側は、あらゆる状況が場所で考慮され、判断の状態は特に厳密であり、本当に理解し、プログラミングのマルチスレッドの場合は、パッケージのおかげで、どのように面倒な感じです!
t.Start()メソッドのこちら側を見て、tはスレッドの本体を作成するWorkerであることを知って、スレッドに渡されたタスクを所有することです、スタートは、スレッドの実行を開始することであり、最終的にrunメソッドに呼び出されることを知って、それはrunメソッド内のWorkerに呼び出されると言うことです。
public void run() {
runWorker(this);//ThreadPoolExecutorメソッドの中身
}
runWorker
上で述べたように、runメソッドはスレッドの開始後に呼び出されます。つまり、runWorkerメソッドが呼び出されることになります。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;//仕事人の中の仕事を手に入れる
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//いつもループしながら
while (task != null || (task = getTask()) != null) {
w.lock();//ロックされた労働者
//スレッドプールの現在の状態が停止しているかどうかを判断し、現在のスレッドの割り込みステータスを検出する もしそれが偽であれば、現在のスレッドが割り込みを実行するのを助ける 割り込みコール()
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//タスクを実行する前に行動する
Throwable thrown = null;
try {
task.run();//最後のRunnableタスクを実行する
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);///ポスト・アクションのタスクを実行する
}
} finally {
task = null;
w.completedTasks++;//Worker完了したタスク+1
w.unlock();//
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);//Worker実施終了のお知らせ
}
}
RunWorkerメソッドは、タスクのコアメソッドを実行するために全体のスレッドプールであり、スレッドは、タスクを取得するために、内部のブロッキングキューから常にWhileループを使用し、タスクを実行しに行く、ブロッキングキュー内のタスクがない場合は、この時間getTask()メソッドは、新しいタスクの到着までブロックされますので、単体テストを行う際に、スレッドプーリングの使用は、Shutdownメソッドを呼び出さない場合は、赤いドットが実行されている理由がデバッグされます!Shutdownメソッドを呼び出さない場合は、デバッグドットが実行され続ける、それが理由です!
getTask
このメソッドは、ブロッキング・キューからタスクをフェッチします。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//スレッドプールの状態を判断し、SHUTDOWNでキューが空であるか、直接nullの状態であれば、ブロッキングキューからタスクを取り出さない 直接nullを返す
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//timed は、タスクのブロックキューの取得を制御するために使用される 待ち時間があるかどうか、keepAliveTimeの値を設定すると、この側で使用される、設定値以上のタスクを待っている作業スレッドが待機を終了する場合は、スレッドをリサイクルする
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))//スレッド数マイナス1
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();// ミッションを手に入れる
if (r != null)
return r;
timedOut = true;//待機タイムアウトフラグを設定する スピンの中にあるはずで、次の判定ではこの値が使われる
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
submit
executeメソッドをマスターしたら、submitメソッドを見るのはとても簡単です。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);//RunnableをFutureTaskタスクとしてラップし、スレッドに実行させる。
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);//RunnableをFutureTaskタスクとしてラップし、スレッドに実行させる。
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
コードのこの側面を参照してください、味に少し慣れている必要があり、最後の記事チャットFutureTaskである必要がありますすでにされている多くの場合、変換する方法RunnableとCallableを含む、Futureは戻り値を取得する方法ですか?私の記事を見に行くことができるかどうかはわかりません!
FutureTaskコンストラクタに対応する上記の3つのコンストラクタは、率直に言って、実行の使用は、FutureTaskが渡すために使用されるとき、FutureTaskもRunableインターフェイスの実装であるためです。
実行フローチャート
最後に、次のタスクが追加されてから実行終了までにどのようなメソッドを通ったかを示すフローチャート!
まとめ
ThreadPoolExecutor 実行メソッドはたくさんありますが、ソースコードを読むための大前提となる一般的な論理演算子、AQS、スレッド、FutureTaskなどの関連知識をマスターしていれば、それほど疲れることはないでしょう。最後に描いたフローチャートは、タスクがスレッドプールに追加されて実行されるまでの全体の流れです!
危機とは何か?
本当の危機は、適切なタイミングで間違ったことをすることから生まれます。適切なタイミングで次のステップへの積み重ねができなかったことが、危機の根本原因なのです。
もしあなたがこの成長の道を歩んでいるのなら、早く目覚めるよりも遅く目覚める方がいい、それが私が言いたいことです。あなたが自分の堀を確立していないことを見つけるために中年まで待ってはいけない、この時間だけ努力を知っています。ステージでの独自の努力では、危機の根源である自分自身を甘やかすための選択肢に対して懸命に働かないだけではありません。
あなたが何かを得て、その時間に恥じない生き方をしてくれることを願っています!





