4.1 はじめに
stormは、spoutが送信するすべてのメッセージが完全に処理されることを保証します。本章では、stormシステムがどのようにこの目標を達成するかについて説明し、開発者が信頼性の高いデータ処理を実現するためにこれらのstormメカニズムをどのように使用すべきかについて詳しく説明します。
4.2 メッセージが全体として処理されることの理解
スパウトから送信された1つのメッセージは、そのメッセージに基づいて何百ものメッセージが作成される可能性があります。
ストリーミングの「ワード数」の例を考えてみましょう:
Stormタスクは、データソースから一度に1つの完全な英文を読み込み、この文を個別の単語に分解し***、各単語とそれが出現した回数をリアルタイムで出力します。
この例では、スパウトから送り出されるメッセージのひとつひとつが、多くのメッセージを生み出す引き金となり、文から切り離された単語が新たに生み出されるメッセージとなります。
これらのメッセージは、図1のような「タプル・ツリー」と呼ばれるツリー構造を形成します:
図1 タプル・ツリーの例
ストームは、スパウトから送信されたメッセージをどのような条件で完全に処理したと見なしますか?答えは、以下の条件が同時に満たされた場合です:
- tuple treeこれ以上の成長
- ツリー内のメッセージはすべて "processed "とマークされます。
メッセージから派生したタプル・ツリーが指定されたタイムアウト時間内に正常に完全 に処理されない場合、メッセージは不完全に処理されたと見なされます。このタイムアウト値はタスクレベルのConfig.TOPOLOGY_MESSAGE_TIMEOUT_SECS パラメータで設定可能で、デフォルトのタイムアウト値は30秒です。
4.3 メッセージのライフサイクル
メッセージが完全に処理された場合、または完全に処理されなかった場合、Stormは次にどのように処理を進めるのでしょうか?これを知るには、spoutから送信されるメッセージのライフサイクルを調べます。以下は、spoutが実装すべきインターフェースのリストです:
まず、StormはspoutインスタンスのnextTuple()メソッドを使用してspoutにメッセージを要求します。 リクエストを受信すると、spoutはopenメソッドで提供されるSpoutOutputCollectorを使用して、1つ以上のメッセージを出力ストリームに送信します。送信された各メッセージに対して、Spoutはメッセージを識別するために使用されるメッセージIDを提供します。
メッセージが kestrel キューから読み込まれた場合、Spout は kestrel キューがこのメッセージに対して設定した ID を、このメッセージのメッセージ ID として使用します。 SpoutOutputCollector へのメッセージの送信形式は以下の通りです:
次に、これらのメッセージは後続のビジネス処理ボルトに送信され、Stormはメッセージから派生した新しいメッセージを追跡します。メッセージから派生したタプルツリーが完全に処理されたことを検出すると、StormはSpoutのackメソッドを呼び出し、引数としてメッセージのmessageIDを渡します。同様に、メッセージの処理がタイムアウトした場合、対応するSpoutのfailメソッドが引数として渡されたメッセージのmessageIDとともに呼び出されます。
注:メッセージは、それを送信したspoutタスクによってのみackまたはfailと呼ばれ、システム内の複数のタスクによってspoutが実行されている場合、メッセージはそれを作成したspoutタスクによってのみ応答され、他のspoutタスクによって応答されることはありません。
引き続き、kestrelのキューからメッセージを読み込む例を用いて、spoutが高い信頼性を持って行う必要があることを説明します。
kestrel メッセージキューの簡単な説明から始めましょう:
KestrelSpout が kestrel キューからメッセージを読み込むと、キュー内のメッセージを "オープン" します。これは、メッセージが実際にキューから削除されるのではなく、"pending "に設定され、クライアントからの回答を待ち、その後メッセージが実際にキューから削除されることを意味します。pending "状態のメッセージは、他のクライアントからは見えません。また、クライアントが予期せず切断した場合、そのクライアントによって "開かれた "メッセージはすべてキューに追加されます。kestrelのキューは、"open "されたメッセージに一意な識別子を与えます。
KestrelSpout は、この一意な識別子をタプルの messageID として使用します。その後、ack や fail が呼び出されると、KestrelSpout は messageID とともに ack や fail を kestrel キューに送信し、kestrel は実際にキューからメッセージを削除したり、キューに戻したりします。
#p#
4.4 信頼性関連API
ストームが提供する信頼性の高い処理機能を使用するには、2つのことを行う必要があります:
- タプル・ツリーに新しいノードが作成されるたびに、Stormに明示的に通知する必要があります;
- 1つのメッセージが処理されると、Stormはタプル・ツリーの変更された状態を伝える必要があります。
上記の2つのステップを通して、stormはタプル木が完全に処理されたことを検出し、関連するackまたはfailメソッドを呼び出します。
タプル木の指定されたノードに新しいノードを追加することをアンカリングと呼びます。アンカリングはメッセージの送信中に行われます。説明しやすいように、次のコードを例として使用します。この例のボルトは、文全体を含むメッセージを、それぞれが1つの単語を含む一連のサブメッセージに分割します。
入力メッセージは emit メソッドの *** 引数として使用されます。単語メッセージは、spoutによって送信されるタプル ツリーのルートノードである入力メッセージに固定されているため、単語メッセージのいずれかが処理に失敗すると、タプル ツリーが派生したspoutメッセージが再送されます。
代わりに、Stormがどのようにemitメッセージを処理するか、以下のアプローチで見てみましょう:
この方法でメッセージを送信すると、このメッセージはアンカリングされません。このタプル・ツリーのメッセージの処理が失敗すると、このタプル・ツリーから派生するルート・メッセージは再送されません。タスクのフォールト・トレランス・レベルによっては、アンカリングされていないメッセージを送信することが適切な場合もあります。
出力メッセージは、1 つ以上の入力メッセージにアンカリングすることができます。多重アンカリングされたメッセージの処理に失敗すると、そのメッセージに関連する複数の spout メッセージが再送されます。多重アンカリングは、emit メソッドで複数の入力メッセージを指定することで実現します:
マルチアンカリングは、アンカリングされたメッセージを複数のタプル木に追加します。
注:複数のバインディングは、図2に示すように、DAGを構成する伝統的なツリー構造を乱す可能性があります:
図2 複数のアンカーで構成された菱形の構造体
ストームの実装では、DAGをツリーのように扱うことができます。
アンカリングでは、指定されたタプル・ツリーにメッセージを追加する方法を示し、高信頼処理APIの次の部分では、タプル・ツリー内の1つのメッセージが処理されたときに何を行うかを説明します。これは OutputCollector の ack メソッドと fail メソッドで行います。例の SplitSentence を振り返ると、すべての単語メッセージが送信されたときに、文を表す着信メッセージに .
処理された各メッセージは、成功か失敗かを示さなければなりません。ストームは、各メッセージの処理を追跡するためにメモリーを使用しています!>
多くのボルトは、メッセージを読み、その派生サブメッセージを送信し、実行の最後にメッセージに応答するという、特定の処理フローに従います。一般的なフィルターや単純な処理関数は、このタイプのアプリケーションです。 Stormには、これらの処理をカプセル化するBasicBoltインターフェースがあります。例の SplitSentence は BasicBolt を使って書き換えることができます:
このアプローチでは、コードは若干シンプルになりますが、実装は同じです。BasicOutputCollectorに送られたメッセージは自動的に入力メッセージに固定され、実行が終了すると入力メッセージに自動的に応答します。
多くの場合、メッセージは集約や結合のような遅延された答えを必要とし、入力メッセー ジの集合から結果が得られて初めてすべての入力メッセージに答えられます。また、集約や結合はほとんどの場合、出力メッセージにマルチアンカーされます。しかし、これらの機能はIBasicBoltでは扱えません。
#p#
4.5 タプル木の効率的な実装
Storm システムには、DAG 内の各メッセージを追跡する "ackers" と呼ばれる特別なタスクがあります。DAGが完全に処理されると、ルートメッセージを作成したspoutタスクにシグナルを送ります。トポロジー内のアッカータスクの並列度は設定パラメータConfig.TOPOLOGY_ACKERSで設定できます。システム内のメッセージ数が多い場合は、アッカータスクの並列度を適切に上げる必要があります。
Stormの信頼性メカニズムを理解するために、まずメッセージのライフサイクルとタプ ルツリーの管理について説明します。メッセージが作成されると、システムは64ビットのランダムな値をidとしてメッセージに割り当てます。 これらのランダムなidは、ackerがspoutメッセージから派生したタプルツリーを追跡するために使用されます。
各メッセージはそれが存在するタプルツリーのルートメッセージのidを知っており、ボルトが新しいメッセージを生成するたびに、タプルツリーのルートメッセージのmessageIdがそのメッセージにコピーされます。例えば、「このメッセージは処理されましたが、新しいメッセージがいくつか生まれました。
例として、メッセージ D と E がメッセージ C から派生したものだとします。
Cがツリーから削除されるのと同時にDとEがタプル・ツリーに追加されるため、タプル・ツリーは完全に処理されたと早合点されません。
Stormがどのようにタプルツリーを追跡しているかについて、もう少し詳しく説明します。前述のように、システムにはいくつでもアッカが存在する可能性があります。では、メッセージが作成されたり返信されたりするたびに、どのアッカに通知すべきかをどうやって知るのでしょうか?
各メッセージはそのメッセージに対応するルートメッセージの messageId を知っているので、どの acker と通信すべきかがわかります。
spoutがメッセージを送信すると、対応するackerに新しいルートメッセージが生成されたことが通知され、その時点でackerは新しいタプルツリーを作成します。
タプルはどのように追跡されるのですか?システムには何千ものメッセージがあり、スパウトから送信されるメッセージごとにツリーを構築していたら、すぐにメモリが尽きてしまいます。そのため、各メッセージを追跡するために異なる戦略を使用する必要があります。新しい追跡アルゴリズムのおかげで、ストームがツリーを追跡するのに必要なメモリは一定量だけです。このアルゴリズムはストームを正しく機能させる核心であり、ストーム***にとって画期的なものです。
ackerタスクはspoutメッセージのidを2つの値にマッピングします。***つ目の値は "ack val "と呼ばれる64ビットの数値で、ツリー内の全メッセージのランダムなidです。ackvalはツリー全体の状態を表すので、ツリー全体がどんなに大きくても、固定サイズの数値があれば十分です。メッセージが作成され、それに応答すると、同じメッセージ ID が並べ替えの結果としてツリーに送られます。
アッカは ack val が 0 のツリーを見つけるたびに、そのツリーは完全に処理されたことを知ります。メッセージのランダム ID は 64bit の値なので、ツリーの処理が終わったときに ack val が 0 になる確率は非常に小さいのです。1秒間に10,000のメッセージを送信すると仮定すると、エラーが発生するまでに確率的には少なくとも50,000,000年かかります。それでもデータが失われるのは、メッセージが実際に処理に失敗した場合だけです!
4.6 適切な信頼性レベルの選択
Ackerタスクは軽量なので、トポロジーにあまり多くのAckerが存在する必要はありません。acker タスクのスループットは Storm UI で確認することができ、スループットが十分でないようであれば acker を追加する必要があります。
すべてのメッセージを処理する必要がない場合は、メッセージの信頼できる 処理をオフにして、パフォーマンスを向上させることができます。メッセージの信頼できる処理をオフにするということは、システム内のメッセー ジの数が半分になるということです。さらに、メッセージの信頼できる処理をオフにすると、メッセージの サイズが小さくなり、帯域幅を節約できます。
メッセージの信頼できる処理メカニズムには3つの方法があります:
- Config.TOPOLOGY_ACKERSパラメータを0に設定します。このメソッドでは、Spoutがメッセージを送信すると、すぐにackメソッドが呼び出されます;
- 2つ目の方法は、SpoutがメッセージのmessageIDを指定せずにメッセージを送信する方法です;
- *** 特定のメッセージから派生する子孫メッセージの信頼性を気にしない場合、このメッセー ジから派生する子孫メッセージはアンカリングなしで送信されます。これらの子孫メッセージはどのタプル・ツリーにもアンカリングされていないため、そのメッセージの送信に失敗しても、spout がメッセージを再送信することはありません。
4.7 すべてのレベルのクラスタリングにおける耐障害性
ここまでで、Stormの信頼性メカニズムを理解し、要件を満たすためにさまざまな信頼性レベルを選択する方法を知ることができました。次に、Stormがさまざまな状況でデータを失わないようにする方法を調べます。
3.7.1 ミッションレベルの故障
- ボルト・タスクがクラッシュしたため、メッセージに応答できません。この時点で、このボルトタスクに関連するアッカ内のメッセージはすべてタイムアウトで失敗し、対応するスパウトの fail メソッドが呼び出されます。
- acker タスクが失敗します。acker タスク自体が失敗した場合、 失敗ホールド内のメッセージはタイムアウトですべて失敗します。
- Spoutタスクは失敗します。この場合、Spoutタスクがドッキングされている外部デバイスがメッセージの整合性に責任を持ちます。例えば、クライアントの例外が発生した場合、kestrel のキューは保留状態のすべてのメッセージをキューに戻します。
4.7.2 タスクスロットの失敗
- Worker は失敗します。各 Worker には複数のボルトタスクが含まれています。supervisor はこれらのタスクを監視する責任があり、Worker が失敗すると、supervisor はローカルで再起動を試みます。
- supervisorはステートレスであるため、supervisorがタイムリーに再起動する限り、supervisorの障害が現在実行中のタスクに影響することはありません。
- nimbusはステートレスなので、nimbusが故障しても、タイムリーに再起動される限り、現在実行中のタスクには影響しません。nimbusはブートストラップされていないので、タイムリーに再起動するには外部からの監視が必要です。
4.7.3. クラスタノードの障害
- ストームクラスターのノードに障害が発生しました。この時点で、nimbusはこのマシンで実行中のすべてのタスクを他の利用可能なマシンで実行するように移動します。
- zookeeper クラスタ内のノードに障害が発生した場合、zookeeper はマシンの半分以下がダウンしても正常に機能することを保証します。
4.8 まとめ
この章では、stormクラスタがどのようにデータの信頼性の高い処理を可能にするかについて説明します。革新的なタプル・ツリー・トラッキング技術により、stormはデータ応答メカニズムを通じてデータが失われないことを効率的に保証します。
nimbusはステートレスに設計されているため、タイムリーに再起動できる限り、実行中のタスクに影響を与えることはありません。




