Stormは、すべてのタプルが正常に処理されることを保証するためにアンカーとackメカニズムを使用して、分散ストリーム処理システムです。タプルのエラーの場合、それは再送信することができますが、どのようにエラータプルが一度だけ処理されることを確認するには、Stormは、トランザクションコンポーネントのトランザクショントポロジーのセットを提供し、この問題を解決するために使用されます。
5.1 整合性トランザクションの設計
嵐どのようにそのタプル並列処理を実現するだけでなく、トランザクションを確保するためです。このセクションでは、単純なトランザクションの実装方法から始まり、徐々にトランザクショントポロジーの原則につながります。
5.1.1 シンプルなデザインI:強い順次フロー
タプルが一度だけ処理されることを保証する最も簡単な方法は、タプル・ストリームを強く順次にし、一度に1つのタプルのみを処理し、各タプルに1から始まる順次IDを与えることです。タプルが処理されると、処理に成功したタプルのIDと計算結果がデータベースに格納されます。次のタプルが来たら、そのidとデータベースのidを比較します。もし同じなら、そのタプルは正常に処理されたことを意味するので無視し、異なるなら、強い順次性に従って、そのタプルは処理されていないことを意味するので、そのタプルのidと計算結果をデータベースに更新します。
メッセージの総数をカウントする例を見てみましょう。各タプルについて、データベースに保存されているidが現在のタプルidと異なる場合、データベース内のメッセージの総数が1増加し、データベース内の現在のタプルidの値が更新されます。図に示すように
しかし、このメカニズムではシステムは一度に1つのタプルのみを処理することになり、分散コンピューティングを実現することはできません。
5.1.2 シンプルな設計 II: 強順次バッチフロー
バッチ内のタプルは並列処理できます。
バッチが一度だけ処理されるようにするため、バッチIDがデータベースに格納されることを除けば、メカニズムは前節と同様。バッチIDがデータベースに格納される以外は、まずバッチの中間計算結果がローカル変数に格納され、バッチ内のすべてのタプルが処理された後にバッチIDが決定され、中間計算結果がデータベースIDと異なる場合はデータベースに更新されます。
バッチ内のすべてのタプルが処理されたことを確認するには?図のように、Stormが提供するCoordinateBoltを利用できます:
しかし、強力な逐次バッチフローにも限界があり、一度に処理できるバッチは1つだけで、バッチを並列化することはできません。真に分散トランザクション処理を実現するには、stormが提供するTransactional Topologyを使うことができます。ここではまず、CoordinateBoltの原理を詳しく説明します。
5.1.3 座標ボルトの原理
CoordinateBoltの正確な原理は以下の通りです:
- 実際に計算を行うボルトは、外側でCoordinateBoltをカプセル化します。実際にタスクを実行するボルトは、リアルボルトと呼ばれます。
- それぞれのCoordinateBoltは2つの値を記録します:どのタスクが私にタプルを送ってきたか、そしてどのタプルに情報を送りたいか。
- すべてのタプルが送信された後、CoordinateBoltは、emitDirectと呼ばれる特別なストリームを介して、タスクに送信したすべてのタプルに、このタスクに送信したタプルの数を伝えます。下流のタスクは、この数と受信したタプルの数を比較し、等しければ、すべてのタプルの処理を終了したことになります。下流のタスクはこの数と受信したタプルの数を比較し、同じであればすべてのタプルの処理を終了したことになります。
- 下流のCoordinateBoltは、上記の手順を繰り返して下流に通知します。
プロセス全体を図に示します:
CoordinateBoltは主に2つのシナリオで使用されます:
- Transactional Topology
- トランザクション・トポロジー
#p#
5.1.4 トラザクショントポロジー
Stormが提供するトランザクショントポロジーでは、バッチ計算をprocessとcommitの2つのフェーズに分割します。processフェーズでは、連続性を保証することなく複数のバッチを同時に処理することができます。バッチが正常に投入されると、2番目のバッチは投入できません。
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA,new Fields("word"), PARTITION_TAKE_PER_BATCH)PARTITION_TAKE_PER_BATCH
TransactionalTopologyBuilder builder = newTransactionalTopologyBuilder("global-count","spout", spout, 3);
builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
TransactionalTopologyBuilder は、合計 4 つのパラメータを受け取ります。
- Spoutこのトポロジーの id
- TransactionalSpout並列処理。
- TransactionalSpoutの並列性。
以下はBatchCountの定義です:
public static class BatchCount extends BaseBatchBolt {
Object _id;
BatchOutputCollector _collector;
int _count = 0;
@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_count++;
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(“id“, “count“));
}
}
executeメソッドはバッチ内の各タプルに対して1回実行されるので、バッチの計算状態をローカル変数に保持しておく必要があります。この例では、executeメソッドでタプルの数をインクリメントしています。
*** finishBatchメソッドは、ボルトが特定のバッチのすべてのタプルを受信したときに呼び出されます。この例のBatchCountクラスは、この時にローカルカウントを出力ストリームに出します。
#p#
以下は UpdateGlobalCount クラスの定義です:
public static class UpdateGlobalCount extends BaseTransactionalBolt
implements ICommitter {
TransactionAttempt _attempt;
BatchOutputCollector _collector;
int _sum = 0;
@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, TransactionAttempt attempt) {
_collector = collector;
_attempt = attempt;
}
@Override
public void execute(Tuple tuple) {
_sum+=tuple.getInteger(1);
}
@Override
public void finishBatch() {
Value val = DATABASE.get(GLOBAL_COUNT_KEY);
Value newval;
if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
newval = new Value();
newval.txid = _attempt.getTransactionId();
if(val==null) {
newval.count = _sum;
} else {
newval.count = _sum + val.count;
}
DATABASE.put(GLOBAL_COUNT_KEY, newval);
} else {
newval = val;
}
_collector.emit(new Values(_attempt, newval.count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(“id“, “sum“));
}
}
UpdateGlobalCount の finishBatch メソッドでは、現在のトランザクション ID がデータベースに保存されている ID と比較されます。同じ場合は、バッチは無視され、異なる場合は、このバッチの計算結果が合計結果に追加され、データベースが更新されます。
Transactional Topolgyは以下のように実行されます:
トランザクショナルトポロジーの特徴を以下にまとめます。
- BatchBoltは、バッチ化されたタプルを処理し、各タプルに対してexecuteメソッドを呼び出し、バッチ全体が処理されるとfinishBatchメソッドを呼び出します。
- BatchBolt が Committer としてマークされている場合、finishBolt メソッドを呼び出せるのはコミットフェーズの間だけです。バッチのコミットフェーズは、前のバッチが正常にコミットされた後にのみ実行されることがストームによって保証されています。そして、トポロジー内のすべてのボルトがコミットされるまで再試行されます。
5.2 Tridentの紹介
Tridentはタプルを処理のためにバッチにストリーミングし、APIはこれらのバッチの処理をカプセル化し、タプルが一度だけ処理されるようにします。バッチ処理の結果はTridentStateオブジェクトに格納されます。
Tridentの取引原則はここでは詳述しませんので、関心のある読者は各自の情報を参照してください。
/-----!
https://.////-al