blog

RabbitMQでメッセージの信頼性を確保するには?

正常に消費されたメッセージは、Producer->MQ->Consumer のプロセスを経るため、3 つのステップすべてでメッセージ損失の可能性があります。 AMQP プロトコルには、メッセージ配信時...

Nov 27, 2020 · 6 min. read
シェア

コンシューマされたメッセージは、Producer->MQ->Consumer を経て正常にコンシューマされるため、3つのステップすべてでメッセージが失われる可能性があります。

メッセージ・プロデューサが MQ にメッセージを正常に送信していません。

業務の遂行

AMQP プロトコルは、メッセージ配信時にトランザクション・サポートをオンにし、メッセージ配信に失敗した場合にトランザクションをロールバックするトランザクション・メカニズムを提供します。

カスタム・トランザクション・マネージャー

@Configuration
public class RabbitTranscation {
	
 @Bean
 public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
 return new RabbitTransactionManager(connectionFactory);
 }
 @Bean
 public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
 return new RabbitTemplate(connectionFactory);
 }
}

ymlの修正

spring:
 rabbitmq:
	# メッセージがキューに受信されずに返される
 publisher-returns: true

トランザクション・サポートの有効化

rabbitTemplate.setChannelTransacted(true);

メッセージ未受信時の ReturnCallback の呼び出し

rabbitTemplate.setMandatory(true);

プロデューサーの配信メッセージ

@Service
public class ProviderTranscation implements RabbitTemplate.ReturnCallback {
 @Autowired
 RabbitTemplate rabbitTemplate;
 @PostConstruct
 public void init(){
 // トランザクションを有効にするチャネルを設定する
 rabbitTemplate.setChannelTransacted(true);
 rabbitTemplate.setReturnCallback(this);
 }
 
 @Override
 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
 System.out.println("このメッセージは送信に失敗した"+message+", ");
 }
 
 @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
 public void publishMessage(String message) throws Exception {
 rabbitTemplate.setMandatory(true);
 rabbitTemplate.convertAndSend("javatrip",message);
 }
}

しかし、これは同期的な操作であるため、メッセージを送信すると、送信者は次のメッセージに進む前にRabbitMQ-Serverからの応答を待つ必要があり、メッセージを生成するプロデューサーのスループットとパフォーマンスが大幅に低下するため、ほとんど行われません。

送信者確認メカニズム

メッセージ送信時にチャネルは確認モードに設定されます。 メッセージがチャネルに入ると一意の ID が割り当てられ、メッセージがマッチしたキューに配信されると RabbitMQ はプロデューサーに確認応答を送信します。

メッセージ確認メカニズムの有効化

spring:
 rabbitmq:
 # メッセージがキューに受信されずに返される
 publisher-returns: true
 # メッセージ確認応答メカニズムを有効にする
 publisher-confirm-type: correlated

メッセージ未受信時の ReturnCallback の呼び出し

rabbitTemplate.setMandatory(true);

プロデューサーの配信メッセージ

@Service
public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
 @Autowired
 RabbitTemplate rabbitTemplate;
 @PostConstruct
 public void init() {
 rabbitTemplate.setReturnCallback(this);
 rabbitTemplate.setConfirmCallback(this);
 }
 @Override
 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
 if(ack){
 System.out.println("メッセージを確認した"+correlationData);
 }else{
 System.out.println("確認応答が失敗する:"+correlationData+"コンシューマがメッセージを正常に消費した後、手動で確認する:"+cause);
 }
 }
 @Override
 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
 System.out.println("このメッセージは送信に失敗した"+message+", ");
 }
 public void publisMessage(String message){
 rabbitTemplate.setMandatory(true);
 rabbitTemplate.convertAndSend("javatrip",message);
 }
}

メッセージのリトライ・メカニズムであるメッセージ補償は、メッセージの確認に失敗した場合に実行できます。メッセージの再配信は、確認が取れなかった場合に行われます。これは以下のコンフィギュレーションを設定することで可能です。

spring:
 rabbitmq:
 # メッセージ失敗後のキュー再参加をサポートする
 publisher-returns: true
 # メッセージ確認応答メカニズムを有効にする
 publisher-confirm-type: correlated
 listener:
 simple:
 retry:
 # リトライを有効にする
 enabled: true
 # 最大再試行回数
 max-attempts: 5
 # リトライ間隔
 initial-interval: 3000

II メッセージがMQに送信された後、MQがダウンし、メッセージがメモリから失われます。

MQではメッセージが失われる可能性があり、キューとメッセージの両方を永続化する必要があります。

Queue アノテーションは、キューに関連する以下のプロパティを提供します:

  1. name: キューの名前
  2. durable: 永続的かどうか
  3. exclusive: 排他的、排他的かどうか;
  4. autoDelete: 自動削除するかどうか;
  5. arguments: キューのその他の属性パラメータで、以下のオプションがあります:
    • x-message-ttl: メッセージの有効期限;
    • x-expires: キューの有効期限時間、キューがアクセスされない場合に削除される時間、ミリ秒単位;
    • x-max-length: キューの最大長。これを超えた場合、メッセージはキューの先頭から 削除されます;
    • x-max-length-bytes: メモリサイズによって制限される、キューのメッセージ内容によって占められる最大スペース;
    • x-overflow: キューのオーバフローの挙動を設定します。これは、キューの最大長に達した場合にメッセージがどうなるかを決定します。有効な値は drop-head、reject-publish、reject-publish-dlx のいずれかです。調停キュータイプでは drop-head のみがサポートされています;
    • x-dead-letter-exchange: 期限切れまたは削除されたメッセージが送信されるように指定できる dead-letter-exchange の名前;
    • x-dead-letter-routing-key: デッド・レター・メッセージのルーティング・キー。メッセージがデッド・レター交換に送られるときに使われます。
    • x-single-active-consumer: キューが単一のアクティブなコンシューマであるかどうか。true の場合、登録されたコンシューマグループの中の一つのコンシューマだけがメッセージを消費し、他のコンシューマは無視されます。
    • x-max-priority: キューがサポートする優先度の最大数。設定されていない場合、キューはメッセージの優先度をサポートしません;
    • x-queue-mode: RAM の使用量を減らすために、可能な限り多くのメッセージを ディスク上に保持するために、キューを遅延モードに設定します;
    • x-queue-master-locator: クラスタモードにおける Image キューのマスタノード情報を設定します。

永続キュー

キューを作成する際には、永続属性 durable を true に設定し、autoDelete を false に設定してください。

@Queue(value = "javatrip",durable = "false",autoDelete = "false")

永続的メッセージ

SpringBootのデフォルトでは永続的です。

III コンシューマがメッセージの消費を完了する前に例外を発生させてメッセージを消費

コンシューマはメッセージを消費しただけで、まだビジネスを処理していないため、例外が発生します。このような場合は、自動確認をオフにして、代わりに手動でメッセージを確認する必要があります。

ymlを手動サインオンモードに変更

spring:
 rabbitmq:
 listener:
 simple:
 # 手動サインオフ・モード
 acknowledge-mode: manual
 # 一度に1つのメッセージに署名する
 prefetch: 1

消費者マニュアルへの署名

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {
 @RabbitHandler
 public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{
 System.out.println(message);
 // 一意のメッセージID
 Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
 // メッセージを確認する
 if(...){
 	channel.basicAck(deliverTag,false);
 }else{
 // 消費に失敗した場合、メッセージはキューに戻される
 channel.basicNack(deliverTag,false,true);
 }
 
 }
}

概要

メッセージ紛失の理由

プロデューサー、MQ、およびコンシューマーはすべて、メッセージの損失を引き起こす可能性があります。

メッセージの信頼性を確保するには?

  • 送信者は送信者確認モードを採用
  • キューとメッセージの永続化のためのMQ
  • 消費者購入成功後の手動確認メッセージ
Read next

OCMockのソースコード解析

開発作業負荷の需要を増加させるが、ある程度コードの安定性を向上させるために、毎日の仕事の不可欠な部分として、ユニットテスト。特に、以前のアルゴリズムや境界が異常であるかどうかを検証するには、通常、反復はより速く、より正確であり、手動テストの間に歴史的なロジックの詳細の省略を避けることができます。 新しいコードを追加する場合、テスト容易性を確保することで、単一のテストをスムーズに書くことができます。

Nov 27, 2020 · 9 min read