blog

KafkaのJavaクライアントの基本的な使用とSpringBootの統合

コードの統合後はまだ非常に簡単ですが、まだネイティブAPIに精通する必要があるので、簡単に実際のプロジェクトで問題に遭遇することができるように。...

Aug 6, 2020 · 5 min. read
シェア

この記事のコードはすべてGithubにあります:

kafka-clients

依存関係の追加

 <!-- https://.com/artifact/..kafka/kafka-clients -->
<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>2.5.0</version>
</dependency>

消費者

コードは一般的に3つの部分に分けられます:

  1. コンシューマ・コンフィギュレーション
    1. org.apache.kafka.clients.consumer.ConsumerConfig コンシューマ・コンフィギュレーションは、各コンフィギュレーション項目のドキュメントを含むクラスにリストされています。
  2. コンシューマー・インスタンスを作成し、トピックを購読します。
  3. メッセージの消費

コードは以下の通り:

// 1.  
Properties properties = new Properties();
//bootstrap.servers kafkaクラスタアドレスhost1:port1,host2:port2 ....
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ".1:9092");
// key.deserializer メッセージキーのシリアライズ
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value.deserializer メッセージボディのシリアライズ
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// group.id 消費者グループID
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
// enable.auto.commit 自動コミットオフセットを設定する
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// auto.offset.reset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 2. コンシューマインスタンスを作成し、トピックを購読する
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
String[] topics = new String[]{"demo-topic"};
consumer.subscribe(Arrays.asList(topics));
// 3. メッセージの消費
while (true) {
 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 for (ConsumerRecord<String, String> record : records) {
 System.out.println(record);
 }
}

プロデューサー

プロデューサー・ピースのコードは、基本的にコンシューマーの構造と同じです:

  1. プロデューサーの構成
    1. org.apache.kafka.clients.producer.ProducerConfig すべてクラス
  2. プロデューサーインスタンスの作成
  3. トピックへのメッセージ送信
    1. 非同期送信メッセージproducer.send(new ProducerRecord<>("demo-topic", data))
    2. メッセージを同期的に送信し、Future.get() を使用して受信をブロックします。
    3. コールバックを使用した非同期でのメッセージ送信

全体的なコードは次のとおりです。

// 1.  
Properties properties = new Properties();
// bootstrap.servers kafkaクラスタアドレスhost1:port1,host2:port2 ....
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ".1:9092");
// key.deserializer メッセージキーのシリアライズ
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value.deserializer メッセージボディのシリアライズ
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 2. プロデューサーのインスタンスを作成する
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 3. メッセージを送信する
// 0 非同期メッセージを送信する
for (int i = 0; i < 10; i++) {
 String data = "async :" + i;
 // メッセージを送信する
 producer.send(new ProducerRecord<>("demo-topic", data));
}
// 1 同期メッセージ送信コールget()ブロックの結果を返す
for (int i = 0; i < 10; i++) {
 String data = "sync : " + i;
 try {
 // メッセージを送信する
 Future<RecordMetadata> send = producer.send(new ProducerRecord<>("demo-topic", data));
 RecordMetadata recordMetadata = send.get();
 System.out.println(recordMetadata);
 } catch (Exception e) {
 e.printStackTrace();
 }
}
// 2 非同期メッセージのコールバック()を送信する
for (int i = 0; i < 10; i++) {
 String data = "callback : " + i;
 // メッセージを送信する
 producer.send(new ProducerRecord<>("demo-topic", data), new Callback() {
 @Override
 public void onCompletion(RecordMetadata metadata, Exception exception) {
 // メッセージを送信するためのコールバック
 if (exception != null) {
 exception.printStackTrace();
 } else {
 System.out.println(metadata);
 }
 }
 });
}
producer.close();

SpringBootの統合

依存関係の追加

<parent>
 <!-- https://.com/artifact/..boot/spring-boot-starter-parent -->
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-parent</artifactId>
 <version>2.3.2.RELEASE</version>
 <relativePath/> <!-- lookup parent from repository -->
</parent>
....
....
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter</artifactId>
</dependency>
<!--kafka starter-->
<dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
</dependency>
<!--とのテストに便利 - 。>
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
</dependency>
<dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka-test</artifactId>
 <scope>test</scope>
</dependency>

コード

# application.yml
spring:
 kafka:
 bootstrap-servers: .1:9092
 producer:
 key-serializer: org.apache.kafka.common.serialization.StringSerializer
 value-serializer: org.apache.kafka.common.serialization.StringSerializer
 consumer:
 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
 group-id: test-group
//  
@SpringBootApplication
public class DemoApplication {
 public static void main(String[] args) {
 SpringApplication.run(DemoApplication.class, args);
 }
}
//  
@Component
public class Consumer {
	@KafkaListener(topics = { "test-topic" })
	public void receiveMessage(String message) {
		System.out.println(message);
	}
}
//  
@Component
public class Producer {
	@Resource
	KafkaTemplate<String, String> kafkaTemplate;
	public void sendMessage(String topic, String message) {
		kafkaTemplate.send(topic, message);
	}
}
//  
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {
	@Autowired
	private Producer producer;
	@Test
	public void send() {
		producer.sendMessage("test-topic", "test-message");
		try {
			Thread.sleep(10000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

要約

SpringBootのコードの統合はまだ非常に簡単ですが、まだネイティブAPIに精通する必要があるため、実際のプロジェクトが簡単に問題が発生した後。

Read next

Apache Kylin 4.0 完全重複排除グローバル辞書の原則

OLAP データ分析の分野では、重複排除カウントは非常に一般的な要件であり、重複排除結果の要件に 応じて、近似重複排除と厳密重複排除に分けられます。 大規模なデータセットでは、正確な重複排除を達成し、高速なクエリ応答を保証することは依然として困難です。厳密な重複排除の処理方法としては、ビットマップ法がよく用いられます。整数データに対して ...

Aug 6, 2020 · 4 min read