マインドマップ
I. データ損失の原因の分析
RabbitMQのメッセージロスを分析するには、プロデューサからコンシューマにメッセージを送信して消費するプロセスを見るのが便利です:
プロデューサからRabbitMQサーバへ、RabbitMQサーバからコンシューマへです。
消費者によって消費されるまでキューに保存されます。
つまり、メッセージ・ロスが発生するシナリオは3つあるということです:
- キューに格納されたメッセージは、キューがメッセージを永続化しない場合、RabbitMQサーバーのダウンタイムや再起動によってデータが失われます。
- プロデューサーがRabbitMQサーバーにメッセージを送信する過程で、RabbitMQサーバーがダウンしてサービスを停止すると、メッセージは失われます。
- コンシューマはキューに格納されたデータをRabbitMQサーバから取得して消費しますが、コンシューマプロセスのエラーやダウンタイムによってデータが失われ、正しく消費されません。
上記の3つのシナリオに対して、RabbitMQはそれぞれメッセージの永続性、確認メカニズム、ACKトランザクションメカニズムという3つの解決方法を提供しています。
メッセージの永続性
RabbitMQはメッセージの永続化をサポートしています。 メッセージの永続化には、Exchange for persistentとQueue persistentの設定が必要です。
まず、Exchange スイッチのクラス図を見てください:
このクラス図を見ると、実は前回の記事で紹介した4つのスイッチは、すべてAbstractExchange抽象クラスのサブクラスであることを説明するためのもので、javaの特性上、サブクラスのインスタンスの生成は、まず親クラスのコンストラクタを呼び出すことになり、親クラス、つまりAbstractExchangeのコンストラクタは、どのように見えるでしょうか?
上のコメントからわかるように、durableパラメータは永続的かどうかを示します。デフォルトは persistent です。永続的な Exchange を作成するには、以下のように記述します:
@Bean
public DirectExchange rabbitmqDemoDirectExchange() {
//Directスイッチ
return new DirectExchange(RabbitMQConfig).RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
}
Queueキューに移り、まずQueueコンストラクタがどのようなものか見てみましょう:
永続化するかどうかも durable パラメータで設定します。このパラメータのデフォルトは true です:
@Bean
public Queue fanoutExchangeQueueA() {
//名前を指定するだけで、デフォルトで永続化される
return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A);
}
これでメッセージの永続化の設定は完了です。次にプロジェクトを起動して、ご覧のようにいくつかのメッセージを送信してみましょう:
メ
ッセージの永続化は、ダウンタイムや再起動によってRabbitMQ Server内のメッセージが失われることを防ぎます。
III.メッセージ確認メカニズム
confirm
プロデューサがRabbitMQ Serverに送信する際、ネットワークの問題により配信に失敗し、データが失われる可能性があります。確認モードを使用することで、データの損失を防ぐことができます。上図から、2つのコールバック関数 **confirm(), returnedMessage()** を使って通知していることがわかります。
プロデューサーから RabbitMQ に送信されたメッセージは、まず Exchange に送信され、コールバック関数confirm()に対応します。第二段階はExchangeからQueueにルーティングされ、対応するコールバック関数はreturnMessage()です。
コードはどのように実装されているのか、デモをご覧ください:
まず、application.yml設定ファイルに以下の設定を追加します:
Spring:
rabbitmq:
publisher-confirms: true
# publisher-returns: true
template:
mandatory: true
# publisher-confirmstrue を設定する。メッセージが Exchange に配信されると、confirm() メソッドがコールバックされてプロデューサに通知される。
# publisher-returnstrueに設定されているとき。メッセージがキューにマッチして失敗した場合、returnMessage()メソッドを呼び出すことでそのメッセージが返される。
# Spring.rabbitmq.template.mandatory: trueに設定すると指定されたメッセージは、それがキューによって受信されなかった場合、コールバックのreturnMessage()メソッドによって返される。
細かいことですが次にコールバック・メソッドを定義する必要があります:
@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);
/**
* Exchangeに届くメッセージを待つ
*
* @param correlationData メッセージの一意な識別子を含むオブジェクト。
* @param ack true マーカーアック、偽マーカー nack
* @param cause nack 不履行の理由
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("メッセージの配信に成功~メッセージID:{}", correlationData.getId());
} else {
logger.error("メッセージの配信に失敗した:{}エラーメッセージが表示された:{}", correlationData.getId(), cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("返されたメッセージを取得するために、メッセージがキューにルーティングされることはない。");
Map map = byteToObject(message.getBody(), Map.class);
logger.info("message body: {}", map == null ? "" : map.toString());
logger.info("replyCode: {}", replyCode);
logger.info("replyText: {}", replyText);
logger.info("exchange: {}", exchange);
logger.info("routingKey: {}", exchange);
logger.info("------------> end <------------");
}
@SuppressWarnings("unchecked")
private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
T t;
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis)) {
t = (T) ois.readObject();
} catch (Exception e) {
e.printStackTrace();
return null;
}
return t;
}
}
ここでは、コールバックメソッドによって返されたメッセージを単純に表示することにします。 実際のプロジェクトでは、返されたメッセージをログテーブルに保存し、さらに処理を行うために定時タスクを使用することができます。
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
@Resource
private RabbitmqConfirmCallback rabbitmqConfirmCallback;
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
// ConfirmCallback
rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
// ReturnCallback
rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
}
@Override
public String sendMsg(String msg) throws Exception {
Map<String, Object> message = getMessage(msg);
try {
CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
private Map<String, Object> getMessage(String msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
CorrelationData correlationData = new CorrelationData(msgId);
String sendTime = sdf.format(new Date());
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("sendTime", sendTime);
map.put("msg", msg);
map.put("correlationData", correlationData);
return map;
}
}
次のテストは、コンソールにメッセージを送信することです:次のテストでは、メッセージを送信すると、コンソールすることができます:メッセージを送信すると、キューに一致するようにルーティングされていないと仮定すると、次のメッセージを見ることができます:これは確認モードです。その役割は、RabbitMQにメッセージを配信するプロデューサーが失われないようにすることです。
トランザクション・メカニズム
非常に最初の図は、すでに話している、キューからメッセージを取得する消費者は、直接、消費者がダウンしているか、プログラムが例外があると仮定して、署名が確認され、データが正しく消費されない、このような状況は、データの損失になります。
そこで重要なのは、自動署名を手動署名に変更することです。通常の消費は、署名を確認するために戻り、例外が発生した場合は、キューに戻って署名を拒否するために戻ります。コードを達成する方法は、デモを参照してください:
まず、コンシューマのapplication.ymlファイルで、トランザクションコミットを手動手動モードに設定します:
Spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # マニュアルアックモード
concurrency: 1 # 最低消費者数
max-concurrency: 10 # 最大消費者数
それから、消費者のリスナーを書いてください:
@Component
public class RabbitDemoConsumer {
enum Action {
//加工に成功
SUCCESS,
//再試行可能なエラー、キューへのメッセージの再参加
RETRY,
//再試行を必要としないエラーは、メッセージを拒否し、キューから削除する。
REJECT
}
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public void process(String msg, Message message, Channel channel) {
long tag = message.getMessageProperties().getDeliveryTag();
Action action = Action.SUCCESS;
try {
System.out.println("コンシューマーであるRabbitDemoConsumerは、RabbitMQサーバーからメッセージを消費する:" + msg);
if ("bad".equals(msg)) {
throw new IllegalArgumentException("テスト:リエントラントキュー例外を投げる");
}
if ("error".equals(msg)) {
throw new Exception("テスト:キューに入り直す必要のない例外を投げる");
}
} catch (IllegalArgumentException e1) {
e1.printStackTrace();
//例外のタイプに応じて、アクションがリトライ可能かどうかを設定する。
action = Action.RETRY;
} catch (Exception e2) {
//プリント異常
e2.printStackTrace();
//例外のタイプに応じて、アクションがリトライ可能かどうかを設定する。
action = Action.REJECT;
} finally {
try {
if (action == Action.SUCCESS) {
//multiple バッチ処理を行うかどうかを示す。true はバッチ ack でタグより小さいメッセージをすべて処理することを意味する。
channel.basicAck(tag, false);
} else if (action == Action.RETRY) {
//Nackメッセージはキューに戻される。
channel.basicNack(tag, false, true);
} else {
//Nackポリシーは拒否され、キューから削除される。
channel.basicNack(tag, false, false);
}
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
上記のコードを説明し、例外がなければ、手動でRabbitMQサーバーのbasicAck(消費成功)への返信を確認します。
リエントラント例外がスローされた場合は、basicNackに応答してリエントラント・キューを設定します。
返却不可能なキュー例外がスローされた場合は、basicNackに返信し、RabbitMQキューから削除するように設定します。
ackが返す3つのメソッドの意味を説明してください。
確認成功
void basicAck(long deliveryTag, boolean multiple) throws IOException;
コンシューマは、正常に処理された後にこのメソッドを呼び出してメッセージを確認します。
- deliveryTag: メッセージのインデックス。
- multiple: バッチするかどうか。true: deliveryTag よりも小さなメッセージを一度に受け取ります。
故障確認
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
- deliveryTag: メッセージのインデックス。
- multiple: バッチかどうか。true: deliveryTag より小さいメッセージを一度に拒否します。
- requeue: リジェクトされたものを再びキューに入れるかどうか。
故障確認
void basicReject(long deliveryTag, boolean requeue) throws IOException;
- deliveryTag:メッセージのインデックス。
- requeue: リジェクトされたものを再びキューに入れるかどうか。
basicNack()とbasicReject()の違いは、basicNack()はまとめて拒否でき、basicReject()は一度に1つのメッセージしか拒否できないことです。
IV.遭遇した甌穴
ナック機構を有効にすることで発生するデッドループ
上のコードで、テストが "bad" メッセージを送信し、再参加キュー例外を投げるというバグを意図的に書きました。問題があります。キューが再スタートした後にコンシューマが再びコンシューマを行い、コンシューマが例外を投げて再びキューを再スタートさせるため、デッドループが発生します。では、この状況を回避するにはどうすればよいのでしょうか?
nackはデッド・ループを引き起こす可能性があるため、私が提供したアイデアの1つは、basicNack()を使用せず、例外をスローしたメッセージを、スローされた例外、メッセージ・ボディ、メッセージIDを記録するテーブルに入れることです。 これは時間指定タスクで処理されます。
何か良い解決策があれば、メッセージでご相談ください。
double ack
うっかりしてAuto Ackモードをオンにしてしまい、手動でAckモードに戻すと、このエラーが報告されることがあります:
RabbitDemoConsumerコンシューマーはRabbitMQサーバーからメッセージを消費する。
.148 ERROR 4880 --- [ .1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=-:2 INFO 4880 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@f4a3a8d: tags=[{amq.ctag-8MJeQ7el_PNbVJxGOOw7Rw=rabbitmq.demo.topic}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@.1:5672/,5), conn: Proxy@ Shared Rabbit Connection: SimpleConnection@5 [delegate=amqp://guest@.1:5672/, localPort= 56938], acknowledgeMode=AUTO local queue size=0
このエラーでは、ymlファイルに以下の設定が追加されているかどうかを確認できます:
Spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
concurrency: 1
max-concurrency: 10
上記の設定を追加してもエラーが出る場合は、@Configurationを使ってSimpleRabbitListenerContainerFactoryを設定している可能性があり、SpringBootの特性上、configurationよりもcodeの方が優れており、ymlの設定よりもcodeの設定の方が優先されるため、manualモードを設定し忘れている可能性があります。マニュアルモード
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//マニュアルアックモードを設定する
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
それでもエラーが発生する場合は、おそらく間違った場所、プロデューサーのプロジェクトに書かれています。上記の設定はコンシューマのプロジェクトで行う必要があります。なぜなら ack パターンはコンシューマのためのものだからです。プロデューサーの間違った場所に書いて、数時間放り投げて、ボロボロにしてました~。
パフォーマンスの問題
実際、手動ACKは自動ACKに比べて確かにかなり遅く、インターネットでいくつかの情報を調べましたが、性能差は約10倍でした。ですから、一般的に実際のアプリケーションでは、手動ACKモードを開くことは推奨されません。しかし、それは絶対に開いていないわけではなく、ケースバイケースの分析、同時実行の量に応じて、データの重要性など。
そのため、特定のソリューションを決定する前に、実際のプロジェクトにおける同時実行性とデータの重要性を考慮する必要があります。
手動 ack モードを有効にしてください。
手動ACKモードがオンになっているにもかかわらず、コードがバグっているためにRabbitMQサーバへの返信がない場合、このメッセージはメッセージの山のUnacked状態に置かれ、コンシューマの接続が切断されたときに初めてReadyメッセージになります。コンシューマの接続が切断されない場合、Unackedメッセージはどんどん大きくなり、メモリをどんどん消費し、最終的には例外が発生します。
V. まとめ
以上のことから、RabbitMQがデータ損失を防ぐ方法は3つあると言えます:
- メッセージの永続性
- プロデューサー・メッセージ確認メカニズム
- 消費者メッセージ確認モデル





