blog

詳しい解説!RabbitMQでデータ消失を防ぐには、これだけ読めば十分だ!

ご覧のように、メッセージの全プロセスは2つのネットワーク伝送を経由します:プロデューサーからRabbitMQサーバーへ、そしてRabbitMQサーバーからコンシューマーへ。 メッセージはコンシューマに...

Aug 21, 2020 · 12 min. read
シェア

マインドマップ

I. データ損失の原因の分析

RabbitMQのメッセージロスを分析するには、プロデューサからコンシューマにメッセージを送信して消費するプロセスを見るのが便利です:

プロデューサからRabbitMQサーバへ、RabbitMQサーバからコンシューマへです。

消費者によって消費されるまでキューに保存されます

つまり、メッセージ・ロスが発生するシナリオは3つあるということです:

  • キューに格納されたメッセージは、キューがメッセージを永続化しない場合、RabbitMQサーバーのダウンタイムや再起動によってデータが失われます。
  • プロデューサーがRabbitMQサーバーにメッセージを送信する過程で、RabbitMQサーバーがダウンしてサービスを停止すると、メッセージは失われます。
  • コンシューマはキューに格納されたデータをRabbitMQサーバから取得して消費しますが、コンシューマプロセスのエラーやダウンタイムによってデータが失われ、正しく消費されません。

上記の3つのシナリオに対して、RabbitMQはそれぞれメッセージの永続性、確認メカニズム、ACKトランザクションメカニズムという3つの解決方法を提供しています。

メッセージの永続性

RabbitMQはメッセージの永続化をサポートしています。 メッセージの永続化には、Exchange for persistentとQueue persistentの設定が必要です。

まず、Exchange スイッチのクラス図を見てください:

このクラス図を見ると、実は前回の記事で紹介した4つのスイッチは、すべてAbstractExchange抽象クラスのサブクラスであることを説明するためのもので、javaの特性上、サブクラスのインスタンスの生成は、まず親クラスのコンストラクタを呼び出すことになり、親クラス、つまりAbstractExchangeのコンストラクタは、どのように見えるでしょうか?

上のコメントからわかるように、durableパラメータは永続的かどうかを示します。デフォルトは persistent です。永続的な Exchange を作成するには、以下のように記述します:

	@Bean
 public DirectExchange rabbitmqDemoDirectExchange() {
 //Directスイッチ
 return new DirectExchange(RabbitMQConfig).RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
 }

Queueキューに移り、まずQueueコンストラクタがどのようなものか見てみましょう:

永続化するかどうかも durable パラメータで設定します。このパラメータのデフォルトは true です:

	@Bean
 public Queue fanoutExchangeQueueA() {
 	//名前を指定するだけで、デフォルトで永続化される
 return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A);
 }

これでメッセージの永続化の設定は完了です。次にプロジェクトを起動して、ご覧のようにいくつかのメッセージを送信してみましょう:

ッセージの永続化は、ダウンタイムや再起動によってRabbitMQ Server内のメッセージが失われることを防ぎます

III.メッセージ確認メカニズム

confirm

プロデューサがRabbitMQ Serverに送信する際、ネットワークの問題により配信に失敗し、データが失われる可能性があります。確認モードを使用することで、データの損失を防ぐことができます。上図から、2つのコールバック関数 **confirm(), returnedMessage()** を使って通知していることがわかります。

プロデューサーから RabbitMQ に送信されたメッセージは、まず Exchange に送信され、コールバック関数confirm()に対応します。第二段階はExchangeからQueueにルーティングされ、対応するコールバック関数はreturnMessage()です

コードはどのように実装されているのか、デモをご覧ください:

まず、application.yml設定ファイルに以下の設定を追加します:

Spring:
 rabbitmq:
 publisher-confirms: true
# publisher-returns: true
 template:
 mandatory: true
# publisher-confirmstrue を設定する。メッセージが Exchange に配信されると、confirm() メソッドがコールバックされてプロデューサに通知される。
# publisher-returnstrueに設定されているとき。メッセージがキューにマッチして失敗した場合、returnMessage()メソッドを呼び出すことでそのメッセージが返される。
# Spring.rabbitmq.template.mandatory: trueに設定すると指定されたメッセージは、それがキューによって受信されなかった場合、コールバックのreturnMessage()メソッドによって返される。

細かいことですが次にコールバック・メソッドを定義する必要があります:

@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
 private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);
 /**
 * Exchangeに届くメッセージを待つ
 *
 * @param correlationData メッセージの一意な識別子を含むオブジェクト。
 * @param ack true マーカーアック、偽マーカー nack
 * @param cause nack 不履行の理由
 */
 @Override
 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
 if (ack) {
 logger.info("メッセージの配信に成功~メッセージID:{}", correlationData.getId());
 } else {
 logger.error("メッセージの配信に失敗した:{}エラーメッセージが表示された:{}", correlationData.getId(), cause);
 }
 }
 @Override
 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
 logger.info("返されたメッセージを取得するために、メッセージがキューにルーティングされることはない。");
 Map map = byteToObject(message.getBody(), Map.class);
 logger.info("message body: {}", map == null ? "" : map.toString());
 logger.info("replyCode: {}", replyCode);
 logger.info("replyText: {}", replyText);
 logger.info("exchange: {}", exchange);
 logger.info("routingKey: {}", exchange);
 logger.info("------------> end <------------");
 }
 @SuppressWarnings("unchecked")
 private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
 T t;
 try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
 ObjectInputStream ois = new ObjectInputStream(bis)) {
 t = (T) ois.readObject();
 } catch (Exception e) {
 e.printStackTrace();
 return null;
 }
 return t;
 }
}

ここでは、コールバックメソッドによって返されたメッセージを単純に表示することにします。 実際のプロジェクトでは、返されたメッセージをログテーブルに保存し、さらに処理を行うために定時タスクを使用することができます。

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
	@Resource
 private RabbitmqConfirmCallback rabbitmqConfirmCallback;
 @Resource
 private RabbitTemplate rabbitTemplate;
 @PostConstruct
 public void init() {
 //  ConfirmCallback
 rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
 //  ReturnCallback
 rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
 }
 
 @Override
 public String sendMsg(String msg) throws Exception {
 Map<String, Object> message = getMessage(msg);
 try {
 CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
 rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
 return "ok";
 } catch (Exception e) {
 e.printStackTrace();
 return "error";
 }
 }
 
	private Map<String, Object> getMessage(String msg) {
 String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
 CorrelationData correlationData = new CorrelationData(msgId);
 String sendTime = sdf.format(new Date());
 Map<String, Object> map = new HashMap<>();
 map.put("msgId", msgId);
 map.put("sendTime", sendTime);
 map.put("msg", msg);
 map.put("correlationData", correlationData);
 return map;
 }
}

次のテストは、コンソールにメッセージを送信することです:次のテストでは、メッセージを送信すると、コンソールすることができます:メッセージを送信すると、キューに一致するようにルーティングされていないと仮定すると、次のメッセージを見ることができます:これは確認モードです。その役割は、RabbitMQにメッセージを配信するプロデューサーが失われないようにすることです。

トランザクション・メカニズム

非常に最初の図は、すでに話している、キューからメッセージを取得する消費者は、直接、消費者がダウンしているか、プログラムが例外があると仮定して、署名が確認され、データが正しく消費されない、このような状況は、データの損失になります

そこで重要なのは、自動署名を手動署名に変更することです。通常の消費は、署名を確認するために戻り、例外が発生した場合は、キューに戻って署名を拒否するために戻ります。コードを達成する方法は、デモを参照してください:

まず、コンシューマのapplication.ymlファイルで、トランザクションコミットを手動手動モードに設定します:

Spring:
 rabbitmq:
 listener:
 simple:
		acknowledge-mode: manual # マニュアルアックモード
 concurrency: 1 # 最低消費者数
 max-concurrency: 10 # 最大消費者数

それから、消費者のリスナーを書いてください:

@Component
public class RabbitDemoConsumer {
 enum Action {
 //加工に成功
 SUCCESS,
 //再試行可能なエラー、キューへのメッセージの再参加
 RETRY,
 //再試行を必要としないエラーは、メッセージを拒否し、キューから削除する。
 REJECT
 }
 @RabbitHandler
 @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
 public void process(String msg, Message message, Channel channel) {
 long tag = message.getMessageProperties().getDeliveryTag();
 Action action = Action.SUCCESS;
 try {
 System.out.println("コンシューマーであるRabbitDemoConsumerは、RabbitMQサーバーからメッセージを消費する:" + msg);
 if ("bad".equals(msg)) {
 throw new IllegalArgumentException("テスト:リエントラントキュー例外を投げる");
 }
 if ("error".equals(msg)) {
 throw new Exception("テスト:キューに入り直す必要のない例外を投げる");
 }
 } catch (IllegalArgumentException e1) {
 e1.printStackTrace();
 //例外のタイプに応じて、アクションがリトライ可能かどうかを設定する。
 action = Action.RETRY;
 } catch (Exception e2) {
 //プリント異常
 e2.printStackTrace();
 //例外のタイプに応じて、アクションがリトライ可能かどうかを設定する。
 action = Action.REJECT;
 } finally {
 try {
 if (action == Action.SUCCESS) {
 //multiple バッチ処理を行うかどうかを示す。true はバッチ ack でタグより小さいメッセージをすべて処理することを意味する。
 channel.basicAck(tag, false);
 } else if (action == Action.RETRY) {
 //Nackメッセージはキューに戻される。
 channel.basicNack(tag, false, true);
 } else {
 //Nackポリシーは拒否され、キューから削除される。
 channel.basicNack(tag, false, false);
 }
 channel.close();
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 }
}

上記のコードを説明し、例外がなければ、手動でRabbitMQサーバーのbasicAck(消費成功)への返信を確認します。

リエントラント例外がスローされた場合は、basicNackに応答してリエントラント・キューを設定します。

返却不可能なキュー例外がスローされた場合は、basicNackに返信し、RabbitMQキューから削除するように設定します。

ackが返す3つのメソッドの意味を説明してください。

確認成功

void basicAck(long deliveryTag, boolean multiple) throws IOException;

コンシューマは、正常に処理された後にこのメソッドを呼び出してメッセージを確認します。

  • deliveryTag: メッセージのインデックス。
  • multiple: バッチするかどうか。true: deliveryTag よりも小さなメッセージを一度に受け取ります。

故障確認

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  • deliveryTag: メッセージのインデックス。
  • multiple: バッチかどうか。true: deliveryTag より小さいメッセージを一度に拒否します。
  • requeue: リジェクトされたものを再びキューに入れるかどうか。

故障確認

void basicReject(long deliveryTag, boolean requeue) throws IOException;
  • deliveryTag:メッセージのインデックス。
  • requeue: リジェクトされたものを再びキューに入れるかどうか。

basicNack()とbasicReject()の違いは、basicNack()はまとめて拒否でき、basicReject()は一度に1つのメッセージしか拒否できないことです。

IV.遭遇した甌穴

ナック機構を有効にすることで発生するデッドループ

上のコードで、テストが "bad" メッセージを送信し、再参加キュー例外を投げるというバグを意図的に書きました。問題があります。キューが再スタートした後にコンシューマが再びコンシューマを行い、コンシューマが例外を投げて再びキューを再スタートさせるため、デッドループが発生します。では、この状況を回避するにはどうすればよいのでしょうか?

nackはデッド・ループを引き起こす可能性があるため、私が提供したアイデアの1つは、basicNack()を使用せず、例外をスローしたメッセージを、スローされた例外、メッセージ・ボディ、メッセージIDを記録するテーブルに入れることです。 これは時間指定タスクで処理されます。

何か良い解決策があれば、メッセージでご相談ください。

double ack

うっかりしてAuto Ackモードをオンにしてしまい、手動でAckモードに戻すと、このエラーが報告されることがあります:

RabbitDemoConsumerコンシューマーはRabbitMQサーバーからメッセージを消費する。
 .148 ERROR 4880 --- [ .1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=-:2 INFO 4880 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@f4a3a8d: tags=[{amq.ctag-8MJeQ7el_PNbVJxGOOw7Rw=rabbitmq.demo.topic}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@.1:5672/,5), conn: Proxy@ Shared Rabbit Connection: SimpleConnection@5 [delegate=amqp://guest@.1:5672/, localPort= 56938], acknowledgeMode=AUTO local queue size=0

このエラーでは、ymlファイルに以下の設定が追加されているかどうかを確認できます:

Spring:
 rabbitmq:
 listener:
 simple:
 acknowledge-mode: manual
 concurrency: 1
 max-concurrency: 10

上記の設定を追加してもエラーが出る場合は、@Configurationを使ってSimpleRabbitListenerContainerFactoryを設定している可能性があり、SpringBootの特性上、configurationよりもcodeの方が優れており、ymlの設定よりもcodeの設定の方が優先されるため、manualモードを設定し忘れている可能性があります。マニュアルモード

@Bean
 public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
 factory.setConnectionFactory(connectionFactory);
 //マニュアルアックモードを設定する
 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return factory;
 }

それでもエラーが発生する場合は、おそらく間違った場所、プロデューサーのプロジェクトに書かれています。上記の設定はコンシューマのプロジェクトで行う必要があります。なぜなら ack パターンはコンシューマのためのものだからです。プロデューサーの間違った場所に書いて、数時間放り投げて、ボロボロにしてました~。

パフォーマンスの問題

実際、手動ACKは自動ACKに比べて確かにかなり遅く、インターネットでいくつかの情報を調べましたが、性能差は約10倍でした。ですから、一般的に実際のアプリケーションでは、手動ACKモードを開くことは推奨されません。しかし、それは絶対に開いていないわけではなく、ケースバイケースの分析、同時実行の量に応じて、データの重要性など。

そのため、特定のソリューションを決定する前に、実際のプロジェクトにおける同時実行性とデータの重要性を考慮する必要があります

手動 ack モードを有効にしてください。

手動ACKモードがオンになっているにもかかわらず、コードがバグっているためにRabbitMQサーバへの返信がない場合、このメッセージはメッセージの山のUnacked状態に置かれ、コンシューマの接続が切断されたときに初めてReadyメッセージになります。コンシューマの接続が切断されない場合、Unackedメッセージはどんどん大きくなり、メモリをどんどん消費し、最終的には例外が発生します。

V. まとめ

以上のことから、RabbitMQがデータ損失を防ぐ方法は3つあると言えます:

  • メッセージの永続性
  • プロデューサー・メッセージ確認メカニズム
  • 消費者メッセージ確認モデル
Read next

multipart/form-dataを深く掘り下げる

基本的な文字列型であれば、www-form-で転送できますが、-dataの威力はバイナリファイルを転送できることなので、バイナリファイルを含む場合の扱い方を見てみましょう。ファイルタイプの入力を追加し、画像をアップロードします。

Aug 21, 2020 · 4 min read