blog

Spring Cloud Streamメッセージドライバコンポーネント

本体MQメッセージングミドルウェアの詳細の違いは、特定のデータベースをブロックアウトHibernateのようなものです。これにより、MQの学習、開発、保守が簡単になります。Spring Cloud S...

Mar 14, 2020 · 4 min. read
シェア

Stream解決した問題

Spring Cloud Streamは、Hibernateが特定のデータベースからシールドするように、特定のメッセージングミドルウェアから切り離し、基盤となる特定のMQメッセージングミドルウェアの詳細からシールドする、優れた上位層の抽象化を実行します。このようにして、MQの学習、開発、保守が容易になります。Spring Cloud Streamは現在RabbitMQとKafkaをサポートしています。

エッセンス:基盤となるさまざまなMQメッセージング・ミドルウェア間の違いを保護し、MQプログラミング・モデルを統一し、MQの学習、開発、保守のコストを削減します。

Stream重要な概念

Spring Cloud Streamはメッセージ駆動型のマイクロサービスを構築するためのフレームワークです。アプリケーションは入力や出力を介してSpring Cloud Streamのバインダーオブジェクトとやり取りし、バインダーオブジェクトは特定のメッセージングミドルウェアから基礎となるMQの詳細を保護するために使われます。

バインダー バインダー

バインダバインダはSpring Cloud Streamでは非常に注意深い概念です。基礎となるMQメッセージングミドルウェアの詳細を遮蔽するので、別のメッセージングミドルウェアに切り替えるときに必要なのは、使用すべきロジックを一切変更せずに対応するバインダに切り替えることだけです。

III.従来のMQモデルとストリーム・メッセージ・ドリブン・モデル

ストリームメッセージ通信方式とプログラミングモデル

Streamのメッセージ通信方法は、Publish-Subscribeパターンに従います。

Spring Cloud Streamのメッセージ通信方式はPublish-Subscribeパターンに従っており、メッセージがメッセージミドルウェアに配信されると共有トピックを通じてブロードキャストされ、メッセージコンシューマーはサブスクライブされたトピックでそれを受け取り、独自のビジネスロジック処理をトリガーします。ここで言うトピックとは、Spring Cloud Streamの抽象概念で、共有メッセージがコンシューマーに公開される場所を表すのに使われます。Topicは、RabbitMQのExchangeやKakfaのKafkaのTopicのように、異なるメッセージングミドルウェアの異なる概念に対応することがあります。

ストリーム・メッセージ駆動開発、クリエイター・サイド

  • pom.xmlでの依存関係の追加
<!--eureka client クライアント側の依存入力>
<dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!--spring cloud stream 依存関係>
<dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  • 実装クラス
package com.lagou.edu.service.impl;
import com.lagou.edu.service.IMessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
// Source.class以下は出力チャネルの定義である。
@EnableBinding(Source.class)
public class MessageProducerImpl implements IMessageProducer {
 // MessageChannelラッパーオブジェクトを挿入する。
 @Autowired
 private Source source;
 @Override
 public void sendMessage(String content) {
 // mqにメッセージを送信する
 // チャネルを使った送信メッセージ
 source.output().send(MessageBuilder.withPayload(content).build());
 }
}
  • test
public class MessageProducerTest {
@Autowired
private IMessageProducer iMessageProducer;
 @Test
 public void testSendMessage() {
 iMessageProducer.sendMessage("hello world-lagou101");
 }
}

ストリーム・メッセージ主導の消費者開発

  • application.yml
  • メッセージコンシューマリスニング
@EnableBinding(Sink.class)
public class MessageConsumerService {
 @StreamListener(Sink.INPUT)
 public void recevieMessages(Message<String> message) {
 System.out.println("=========受信したメッセージ: " + message);
 }
}

V. ストリームのハイレベル・カスタム・メッセージ・チャンネル

StreamにはSourceとSinkという2つのビルトインインターフェースがあり、それぞれ入力ストリームには「input」、出力ストリームには「output」というバインディングを定義します。 また、様々な入出力ストリームを定義することが可能ですが、実際にはサービス内で複数のバインダー、複数の入力チャンネル、複数の出力チャンネルを使用することが可能です。様々な入出力ストリームを定義することも可能ですが、実際には、サービス内で複数のバインダ、複数の入力チャネル、複数の出力チャネルを使用することが可能です。では、デフォルトで入力チャネルと出力チャネルがある場合はどうすればよいのでしょうか?どうすればいいかというと、Source と Sink の例にならって、独自の名前をつけてメッセージチャネルをカスタマイズすることができますし、複数の入力チャネルと出力チャネルをひとつのクラスに記述することもできます。

  • インターフェイスの定義
interface CustomChannel {
 String INPUT_LOG = "inputLog";
 String OUTPUT_LOG = "outputLog";
 @Input(INPUT_LOG)
 SubscribableChannel inputLog();
 @Output(OUTPUT_LOG)
 MessageChannel outputLog();
}
  • 使用方法
    • EnableBindingアノテーションでカスタム接続をバインドします。

Stream高レベルのメッセージグループ化

アプリケーション・シナリオ: 2 人のコンシューマがいますが、このビジネス・シナリオでは、このトピックのメッセー ジを 1 人のコンシューマのみに消費させたいため、メッセージ・グループ化を使用できます。

問題解決:繰り返しメッセージが消費される問題を解決できます。

サービスのコンシューマ側でspring.cloud.stream.bindings.input.groupプロパティを設定し、同じグループ名で複数のコンシューマインスタンスを構成するだけです。

Read next

WSL+OCaml開発環境のWindowsインストール

Windows上で様々なOCaml開発環境を試した後、私は最終的にWSLソリューションを選びました。 ここで、今後の参考のために簡単にメモしておきます。 実は、最初はCentOSをインストールするつもりだったのですが、インストールしてみると、いろいろと問題があることがわかりました。システム全体が非常に固まりやすく、レスポンスも失われやすく、基本的に普通に使えない状況に属しています。 そこで、生き馬の目を抜く思いで死蔵していたのですが...。

Mar 14, 2020 · 4 min read