この記事のコードはすべてGithubにあります:
kafka-clients
依存関係の追加
<!-- https://.com/artifact/..kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
消費者
コードは一般的に3つの部分に分けられます:
- コンシューマ・コンフィギュレーション
org.apache.kafka.clients.consumer.ConsumerConfigコンシューマ・コンフィギュレーションは、各コンフィギュレーション項目のドキュメントを含むクラスにリストされています。
- コンシューマー・インスタンスを作成し、トピックを購読します。
- メッセージの消費
コードは以下の通り:
// 1.
Properties properties = new Properties();
//bootstrap.servers kafkaクラスタアドレスhost1:port1,host2:port2 ....
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ".1:9092");
// key.deserializer メッセージキーのシリアライズ
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value.deserializer メッセージボディのシリアライズ
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// group.id 消費者グループID
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
// enable.auto.commit 自動コミットオフセットを設定する
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// auto.offset.reset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 2. コンシューマインスタンスを作成し、トピックを購読する
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
String[] topics = new String[]{"demo-topic"};
consumer.subscribe(Arrays.asList(topics));
// 3. メッセージの消費
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
}
プロデューサー
プロデューサー・ピースのコードは、基本的にコンシューマーの構造と同じです:
- プロデューサーの構成
org.apache.kafka.clients.producer.ProducerConfigすべてクラス
- プロデューサーインスタンスの作成
- トピックへのメッセージ送信
- 非同期送信メッセージ
producer.send(new ProducerRecord<>("demo-topic", data)) - メッセージを同期的に送信し、Future.get() を使用して受信をブロックします。
- コールバックを使用した非同期でのメッセージ送信
- 非同期送信メッセージ
全体的なコードは次のとおりです。
// 1.
Properties properties = new Properties();
// bootstrap.servers kafkaクラスタアドレスhost1:port1,host2:port2 ....
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ".1:9092");
// key.deserializer メッセージキーのシリアライズ
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value.deserializer メッセージボディのシリアライズ
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 2. プロデューサーのインスタンスを作成する
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 3. メッセージを送信する
// 0 非同期メッセージを送信する
for (int i = 0; i < 10; i++) {
String data = "async :" + i;
// メッセージを送信する
producer.send(new ProducerRecord<>("demo-topic", data));
}
// 1 同期メッセージ送信コールget()ブロックの結果を返す
for (int i = 0; i < 10; i++) {
String data = "sync : " + i;
try {
// メッセージを送信する
Future<RecordMetadata> send = producer.send(new ProducerRecord<>("demo-topic", data));
RecordMetadata recordMetadata = send.get();
System.out.println(recordMetadata);
} catch (Exception e) {
e.printStackTrace();
}
}
// 2 非同期メッセージのコールバック()を送信する
for (int i = 0; i < 10; i++) {
String data = "callback : " + i;
// メッセージを送信する
producer.send(new ProducerRecord<>("demo-topic", data), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
// メッセージを送信するためのコールバック
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println(metadata);
}
}
});
}
producer.close();
SpringBootの統合
依存関係の追加
<parent>
<!-- https://.com/artifact/..boot/spring-boot-starter-parent -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
....
....
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--kafka starter-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!--とのテストに便利 - 。>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
コード
# application.yml
spring:
kafka:
bootstrap-servers: .1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: test-group
//
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
//
@Component
public class Consumer {
@KafkaListener(topics = { "test-topic" })
public void receiveMessage(String message) {
System.out.println(message);
}
}
//
@Component
public class Producer {
@Resource
KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
//
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {
@Autowired
private Producer producer;
@Test
public void send() {
producer.sendMessage("test-topic", "test-message");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
要約
SpringBootのコードの統合はまだ非常に簡単ですが、まだネイティブAPIに精通する必要があるため、実際のプロジェクトが簡単に問題が発生した後。




