メッセージを失えば、データも失うことになり、それは容認できません。
したがって、主流のMQは、実際には、ネットワークの異常があっても、メッセージが失われることなく確実に配信されることを保証する信頼性キャストメカニズムを提供します。
それでもメッセージが失われるようであれば、それは開発者の問題である可能性が高く、おそらくMQを正しく設定していないのでしょう。
紛失したメッセージの検証
大企業では一般的に、各メッセージを簡単に追跡できる分散型リンク追跡システムがあります。中小企業であれば、簡単な検証方法もあります。それは、MQの秩序性を利用することです:
- プロデューサー側では、各送信メッセージに順次増加するシリアル番号を付けてください。
- 次に、コンシューマー側でこのシリアル番号の導通を確認します。
- コンシューマーは厳密にインクリメンタルなメッセージ番号を受け取ります。
- シリアルナンバーが不連続の場合、メッセージは失われます。 シリアルナンバーの欠落により、どのメッセージが欠落しているかを判断することもできます。
- 利点 メッセージ検証コードはビジネスコードに侵入しません。また、システムが安定したら、検証ロジックをオフにしたり削除したりするのも簡単です。
分散システム下での認証方法の実装に留意:
- KafkaやRocketMQはTopicの厳密な順序を保証しているわけではなく、パーティション上のメッセージの順序を保証しているだけなので、メッセージ送信時にパーティションを指定する必要があります。また、各パーティションで個別にメッセージのシーケンス番号の連続性を検証します。
システムに複数のProducerが存在する場合、複数のProducer間で送信順序を調整するのは良くないので、各Producerのメッセージ通し番号を個別に生成し、Producer識別情報を添付して、各Producerに応じてCon側で通し番号の連続性を個別に検証する必要もあります。
コンシューマー・インスタンスの数は、理想的にはパーティションの数に対応させるべきです。そうすることで、メッセージ数の連続性をCon内で簡単に検証することができます。
メッセージの確実な配信
メッセージの紛失につながるものは一体何なのか、どうすれば紛失を避けることができるのか、という疑問をお持ちの方もいらっしゃるでしょう。
- メッセージが生産から消費まで完了する段階
生産段階
メッセージはプロデューサで作成され、ネットワーク・トランスポートを介してブローカに送信されます。
メッセージの送信メソッドを呼び出すと、MQクライアントはブローカにメッセージを送信し、ブローカはメッセージを受信して、受信したことを示す確認応答をクライアントに返します。クライアントがレスポンスを受信すると、通常のメッセージ配信が完了します。
ProがBrokerから確認応答を受け取る限り、プロダクション・フェーズでメッセージが失われることはありません。
- 一部のMQでは、送信確認応答を長時間受信しない場合、自動的に再試行が行われます。
- 再試行に失敗した場合は、戻り値または例外をユーザーに通知します。
メッセージングコードを書くときには、その段階でメッセージが失われないように、返り値を正しく処理したり、例外をキャッチしたりするように注意してください。
代表例
メッセージを確実に送信する例としてKafkaを見てください:
同期的に送信する場合は、例外をキャッチするように注意してください。
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("メッセージは正常に送信された");
} catch (Throwable e) {
System.out.println("メッセージの配信に失敗した!");
System.out.println(e);
}
非同期送信ではコールバック・メソッドでのチェックが必要です。コールバックで結果を確認せずに非同期に送信したために、多くのメッセージが失われました。
producer.send(record, (metadata, exception) -> {
if (metadata != null) {
System.out.println("メッセージは正常に送信された");
} else {
System.out.println("メッセージの配信に失敗した!");
System.out.println(exception);
}
});
ストレージ・ステージ
メッセージはブローカー側に保存され、クラスタリングの場合はその段階で他のレプリカにコピーされます。
通常、Broker が正常に動作している限り、メッセージは失われません。しかし、プロセスが停止していたり、サーバーがダウンしているなど、Brokerに異常が発生すると、メッセージが失われることがあります。
メッセージの信頼性が非常に高い場合は、Broker パラメータを設定して、ダウンタイムによるメッセージの損失を避けることができます。
シングル・ノードBrokerの場合、Brokerパラメータを設定する必要があります。メッセージを受信すると、メッセージがディスクに書き込まれ、Proに確認応答が送信されます。
- 例えば、RocketMQでは、デフォルトの非同期フラッシュメソッドflushDiskTypeを変更する必要があります。
- SYNC_FLUSH同期スワイプディスクに設定。
Brokerが複数ノードのクラスタの場合は、クライアントに確認応答を送信する前に、少なくとも2つのノードにメッセージを送信するようにBrokerクラスタを構成する必要があります。こうすることで、Brokerがダウンしても、他のBrokerがダウンしたノードを置き換えることができ、メッセージが失われることはありません。
消費ステージ
ConはBrokerからメッセージを取得し、ネットワーク経由でConに送信します。
このフェーズでは、信頼性の高い配信を保証するために、プロダクション・フェーズと同様の確認メカニズムを使用します。クライアントはブローカからメッセージをプルした後、ユーザの消費ビジネス・ロジックを実行し、成功した場合にのみ消費確認応答をブローカに送信します。Brokerが消費確認応答を受信しなかった場合、次にメッセージをプルしたときにも同じメッセージが返され、メッセージがネットワーク伝送プロセスで失われないようにし、消費ロジックの実行中にクライアントがエラーを起こしたために失われないようにします。
消費コードを記述する際は、メッセージを受信した直後に消費確認を送信するのではなく、すべての消費ビジネスロジックを実行した後に消費確認を送信することに注意してください。信頼性の高い消費コードの実装例として、以下のPython言語によるRabbitMQメッセージの消費を参照してください:
def callback(ch, method, properties, body):
print(" [x] メッセージ受信%r" % body)
# ここで受信メッセージを処理する
database.save(body)
print(" [x] 消費は完了した")
# 消費ビジネス・ロジックの完了後、消費確認応答を送信する。
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(queue='hello', on_message_callback=callback)
消費用コールバックメソッドのコールバックでは、正しい順序は次のとおりです。
- まずメッセージをデータベースに保存してください。
- その後、消費確認応答が送信されます。
こうすることで、メッセージのデータベースへの保存に失敗しても、消費確認 コードが実行されることはなく、次にメッセージが引き出されたときには、そのメッ セージが正常に消費されるまで、同じメッセージのままになっています。
まとめ
MQでは、消費へのメッセージ送信プロセス全体を通して、どのようにしてメッセージが確実に配信され、失われないようにしているのでしょうか。このプロセスは3つのフェーズに分けることができ、それぞれが正しくコード化され、MQの信頼性メカニズムで動作するように構成されている必要があります。
- 本番フェーズでは、メッセージ配信のエラーを検出し、メッセージを再送信する必要があります。
- ストレージ・フェーズでは、スワイピングとレプリケーションに関連するパラメータを設定し、複数のレプリカ・ディスクにメッセージを書き込むことができます。
- 消費フェーズでは、すべての消費ビジネスロジックが処理された後、消費確認が送信されます。
これらのフェーズの原理を理解した上で、もしまたメッセージを失うようなことがあれば、コードにログを追加することで、どのフェーズで問題が発生したかを特定し、さらに分析することができます。
二人の消費者が次々とメッセージを引きに行った場合、同じメッセージを引きますか?
メッセージをプルするとき、コンシューマ A がプルし、その後確認応答をブローカに送信せずにダウンします。この時点ではメッセージはまだブローカにあるはずですが、この時点でプルした場合、コンシューマ B はそのメッセージをコンシューマ A にプルするのでしょうか?まず、MQには一般的にこのようなことが起こらないような調整メカニズムがあります。しかし、ネットワークの不確実性により、このようなことが起こる確率は非常に小さいです。同じコンシューマ・グループ内で、コンシューマAがindex=10のメッセージをプルし、まだ確認応答を送信しておらず、その時点でこのパーティションのコンシューマの位置はまだ10で、コンシューマBが来てメッセージをプルすると、2つのシナリオが考えられます:
- タイムアウト前に、ブローカーはパーティションがまだAに占有されていると考え、Bのリクエストを拒否します。
- タイムアウト後、ブローカーはAがタイムアウト後に戻ってこなかったと考え、消費に失敗しますが、現在の消費位置はまだ10であり、Bがメッセージを引きに戻れば10を返します。
消費者による重複メッセージの処理
ネットワーク伝送中にメッセージが誤って送信された場合、送信者が確認応答を受け取らないため、メッセージが失われないように再送されます。しかし、ネットワーク伝送中に確認応答が失われると、メッセージの再送にもつながります。つまり、Broker と Consumer の両方が重複したメッセージを受信する可能性があり、消費コードを記述する際にはこの状況を考慮する必要があります。メッセージを消費するコードでは、ビジネス・ロジックの正しさに影響を与えないように、このような重複メッセージにどのように対処すればよいのでしょうか?
重複メッセージの原因を生成します:
- メッセージ送信ステージ、重複メッセージ送信
- メッセージフェーズの消費、重複メッセージの消費
通常、メッセージには一意性があります。
それが MQ 自体の msgId であろうと、ビジネス・オー ダー番号であろうと、そのようなものであろうと、この一意性に一意なインデックスを 作成するコンシューマ・テーブルが DB に存在します。各コンシューマ・ロジックが処理される前に INSERT を行い、DB で重み付けを解除します。
ソリューション:ビジネスサイドの重複排除
- メッセージ・テーブルを作成し、消費者がメッセージIDを唯一の主キーとして、挿入操作を行うためにメッセージを取得します。
- redisを使用して、メッセージにグローバルIDを割り当て、メッセージが消費される限り、K-Vの形式でredisにメッセージを書き込み、メッセージを消費し、対応するレコードがあるかどうかを照会るためにキーに基づいてredisに移動します。
ProがBrokerにメッセージを送信する場合、このメソッドはBrokerがメッセージを受信して適切に格納した後にのみ返されます。
======================= 一般にビジネスでは難しい消費側の冪等性をサポート。コンシューマ側で冗長性解消の仕組みを増やすには、成功した N 個のメッセー ジのうち、最新の消費メッセージの SN をキャッシュしておき、メッセージを 受信したら、まず消費メッセージかどうかを確認し、消費メッセージであ れば、直接 ACK して消費を断念するなどの方法が考えられます。このアイデアは問題ありません。
======================= もしidempotencyができないのであれば、消費側で消費されたメッセージのメッセージIDを保存する必要があります。
- もし消費前のセーブだとしたら、消費に失敗した場合、次に同じメッセージが消費されたら、成功したとみなされるのでしょうか?
- 消費に成功した後に保存した場合、その消費は部分的に成功したことになりますか?トランザクションのACID機能が満たされない限り。IDの連続性チェックに依存する場合、プロデューサは1つのコンシューマにしか対応しませんか?
ProducerはProducerIdと送信するパーティションを指定するメッセージを送信し、Consumerは消費します。
======================= If a Producer's message ack is lost due to network failure, then the Producer at this time to resend the message's unique identification should be the same as that message, then only in the Consumer to accept the message before determining whether there is the same identification of the message, if so, then intercept.また、ビジネスロジックのインターフェイスべき等判断の消費者側で行うことができる、前の種類は、ビジネスコードに侵入することなく行うことができます。
とても良いことです!しかし、分散環境で「同じIDのメッセージがあるかどうかを判断する前に、Consumerがメッセージを受け入れる」実装方法を検討する必要があります。
======================= Detecting message loss is a test that is not yet online, but is it possible that messages may not be inconsistent offline, but are lost online?オンラインで失われたメッセージを検出するロジックはオフになりますが、それ以外の 検出メカニズムはオンラインになるのでしょうか?この検出ロジックはオンラインで行うことができ、ビジネスには影響しません。