blog

第13回|Flink Kafkaのソースを本当に知っているか?

FlinkはKafkaトピックにデータを読み書きするための特別なKafkaコネクタを提供し、Flink Kafka ConsumerはFlinkの機構と統合してジャストワンス処理のセマンティクスを提供...

Apr 3, 2020 · 28 min. read
シェア

FlinkはKafkaトピックにデータを読み書きする特別なKafkaコネクタを提供し、Flink Kafka ConsumerはFlinkのチェックポイント機構と統合して、ジャストワンス処理のセマンティクスを提供します。このため、FlinkはKafkaコンシューマーグループのオフセットの追跡だけに頼らず、内部的にオフセットを追跡してチェックします。

はじめに

Spark StreamingやFlink、その他のコンピューティングフレームワークをリアルタイムデータ処理に使用する場合、Kafkaをパブリッシュ&サブスクライブメッセージングシステムとして使用することが標準となっています。パラメータを設定し、kafkaソースを追加すれば準備完了です。本当に簡単だと感じているのであれば、もう母があなたの学習を心配する必要はないと感じています。この記事では、FlinkのKafkaソースを議論の対象として、まず初めの基本的な使い方から、ソースコードを一つずつ詳細に分析し、Flink Kafkaコネクタの謎を開くことができます。この記事は、読者がKafkaの関連知識を持っていることを前提としていることに留意する価値がある、Kafka関連の詳細は、この記事の範囲ではありません。

Flink Kafka Consumer

Flink Kafka Connectorには多くのバージョンがあり、kafkaとFlinkのバージョンに応じて適切なパッケージとクラス名を選択できます。本記事で扱うFlinkのバージョンは1.10、Kafkaのバージョンは2.3.4です。Flinkが提供するMaven依存関係は以下の表の通りです:

flink-connector-kafka-0.10_2.111.2.0FlinkKafkaConsumer010 FlinkKafkaProducer0100.10.xこのコネクタは、生産と消費のための サポートします。
flink-connector-kafka-0.11_2.111.4.0FlinkKafkaConsumer011 FlinkKafkaProducer110>= 0.11.xKafkaはバージョン0.11.xの時点でScala 2.10をサポートしていません。このコネクターは サポートし、プロデューサーにExactly Onceセマンティクスを提供します。
flink-connector-kafka_2.111.7.0FlinkKafkaConsumer FlinkKafkaProducer>= 1.0.0この汎用 Kafka コネクタは、Kafka クライアントの最新バージョンに対応するよう最善を尽くします。このコネクタが使用する Kafka クライアントのバージョンは、Flink のバージョン間で変更される可能性があります。Flinkバージョン1.9からはKafka 2.2.0クライアントを使用し、現在のKafkaクライアントはKafkaブローカー0.10.0以降と下位互換性があります。 ただし、Kafkaバージョン0.11.xおよび0.10.xについては、専用のflink-connector-kafka-を使用することを推奨します。0.11_2.11 および flink-connector-kafka-0.10_2.11 コネクタを使用することを推奨します。

Demo

Maven 依存関係の追加

<!--FlinkKafkaConsumerBaseは一般的なコネクタである。>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_2.11</artifactId>
 <version>1.10.0</version>
</dependency>

簡単なコード例

public class KafkaConnector {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
 // ミリ秒間隔でチェックポイントをオンにする
 senv.enableCheckpointing(5000L);
 // ステートバックエンドの選択
 senv.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint"));
 //senv.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint"));
 Properties props = new Properties();
 // kafka broker 
 props.put("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
 // kafka0のみ.8バージョンの設定が必要
 props.put("zookeeper.connect", "kms-2:2181,kms-3:2181,kms-4:2181");
 // コンシューマーグループ
 props.put("group.id", "test");
 // 自動オフセットコミット
 props.put("enable.auto.commit", true);
 // オフセットコミット間隔(ミリ秒
 props.put("auto.commit.interval.ms", 5000);
 // kafka メッセージのキーシリアライザー
 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 // kafka メッセージの値シリアライザー
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 // カフカコンシューマーがどこからデータを消費し始めるかを指定する
 // 全部で3つのモードがある,
 // #earliest
 // 各パーティションの下にコミットされたオフセットがある場合、コミットされたオフセットから消費が開始される;
 // コミットオフセットがない場合にゼロからコンシュームする
 // #latest
 // 各パーティションの下にコミットされたオフセットがある場合、コミットされたオフセットから消費が開始される;
 // コミットオフセットがない場合は、そのパーティションの下に新しく生成されたデータを消費する。
 // #none
 // topic各パーティションにコミットされたオフセットがある場合,
 // オフセット後に消費が始まる
 // コミットされたオフセットを持つパーティションが存在しない場合、例外がスローされる。
 props.put("auto.offset.reset", "latest");
 FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
 "qfbap_ods.code_city",
 new SimpleStringSchema(),
 props);
 //チェックポイントを設定し、オフセットをコミットする。
 // デフォルトはtrueである。,
 consumer.setCommitOffsetsOnCheckpoints(true);
 
 // 最も早くデータが消費され始める
 // このモードでは、Kafkaのコミットオフセットは無視され、開始位置として使用されない。
 //consumer.setStartFromEarliest();
 // デフォルトでは、コンシューマーグループの最新コミットのオフセットとなる。
 // パーティショニングされたオフセットが見つからない場合は、コンフィギュレーション自動.offset.reset  
 //consumer.setStartFromGroupOffsets();
 // コンシューマーを開始する最新データ
 // このモードでは、Kafkaのコミットオフセットは無視され、開始位置として使用されない。
 //consumer.setStartFromLatest();
 // オフセットのタイムスタンプをミリ秒単位で指定する
 // 各パーティションについて、タイムスタンプが指定されたタイムスタンプ以上のレコードが開始位置として使用される。
 // パーティションの最新レコードが指定されたタイムスタンプより前の場合は、最新レコードからパーティションデータのみを読み込む。
 // このモードでは、Kafkaのコミットオフセットは無視され、開始位置として使用されない。
 //consumer.setStartFromTimestamp(1585047859000L);
 // パーティションごとにオフセットを指定する
 /*Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
 specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 0), 23L);
 specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 1), 31L);
 specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 2), 43L);
 consumer1.setStartFromSpecificOffsets(specificStartOffsets);*/
 /**
 *
 * 注意: ジョブが障害から自動的に回復するとき、または手動でセーブポイント,
 * これらの開始位置設定メソッドは、コンシューマーの開始位置には影響しない。
 * リカバリ時に、各Kafkaパーティションの開始位置は、セーブポイントまたはチェックポイントに格納されたオフセットによって決定される。
 *
 */
 DataStreamSource<String> source = senv.addSource(consumer);
 // TODO
 source.print();
 senv.execute("test kafka connector");
 }
}

パラメータ設定の説明

デモの例では、詳細なコンフィギュレーション情報が提供され、上記のパラメータ・コンフィギュレーションは、以下で1つずつ分析されます。

kakfaのプロパティパラメータ構成

  • bootstrap.serverskafkaブローカーのアドレス

  • zookeeper.connect: kafka 0.8 でのみ必要です。

  • group.idコンシューマーグループ

  • enable.auto.commit:

    自動オフセット・コミット、この値の設定は最終的なオフセット・コミット・モードではなく、ユーザーがチェックポイントを有効にしているかどうかを考慮する必要があります。

    以下のソースコード解析で解読されます。

  • auto.commit.interval.ms: オフセットコミットの間隔、ミリ秒

  • value.deserializer

  • auto.offset.reset:

    kafkaコンシューマがデータを消費し始める場所を指定するには、次の3つの方法があります。

    • 最初に:earliest 各パーティションの下にコミットされたオフセットがある場合、コミットされたオフセットから消費を開始し、コミットされたオフセットがない場合、最初から消費を開始します。
    • 2番目のタイプ:最新 各パーティションの下にコミットされたオフセットがある場合、コミットされたオフセットから消費が開始され、コミットされたオフセットがない場合、パーティションの下に新しく生成されたデータが消費されます。
    • Third: none topicコミットされたオフセットがすべてのパーティションに存在する場合、オフセットの後に消費が開始されます。コミットされたオフセットが1つのパーティションに存在しない限り、例外がスローされます。

    注:上記で指定された消費モードは最終的な消費モードではなく、Flinkアプリケーションでユーザーが設定した消費モードに依存します。

Flinkアプリケーションのユーザーが設定したパラメーター

  • consumer.setCommitOffsetsOnCheckpoints(true)

説明:オフセット送信後にチェックポイントを設定する、つまりoncheckpointモード、値はデフォルトでtrue、このパラメータは、オフセットの送信方法に影響を与えます。

  • consumer.setStartFromGroupOffsets()。

    説明: コンシューマーグループの最新コミットのオフセット、デフォルト。 パーティションのオフセットが見つからない場合は、親クラスFlinkKafkaConsumerBaseから継承したメソッドである、設定のauto.offset.reset設定が使用されます。

  • consumer.setStartFromLatest()

説明: 最新のデータの消費を開始します。 このモードでは、Kafkaのコミットオフセットは無視され、開始位置として使用されません。このメソッドは、親クラス FlinkKafkaConsumerBase から継承しています。

  • consumer.setStartFromTimestamp(1585047859000L)

説明:特定のオフセット・タイムスタンプ(ミリ秒)を指定します。各パーティションについて、指定したタイムスタンプ以上のタイムスタンプを持つレコードが開始位置として使用されます。 パーティションの最新レコードが指定されたタイムスタンプより前の場合は、最新レコードからパーティション・データのみが読み込まれます。このモードでは、Kafka のコミットされたオフセットは無視され、開始位置として使用されません。

  • consumer.setStartFromSpecificOffsets(specificStartOffsets)

説明:各パーティションのオフセットを指定します。このメソッドは、親クラスFlinkKafkaConsumerBaseから継承されたメソッドです。

注意:これらの開始位置の設定方法は、ジョブが障害から自動的に回復するときや、セーブポイントを使用して手動で回復するときの消費の開始位置には影響しません。回復時、各Kafkaパーティションの開始位置は、savepointまたはcheckpointに保存されたオフセットによって決定されます。

Flink Kafka Consumerソースコードの解釈

継承関係

ソースコードの解釈

FlinkKafkaConsumer

最初にFlinkKafkaConsumerのソースコードを見て、読書の側面にするために、この記事では、具体的には次のように、より完全なソースコードスニペットを与えるためにしようとします:長いコードは、ここではまず、一般的な印象を持つことができ、以下は、詳細に分析するための重要なコードスニペットになります。

public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
	// flinkを使ってポーリングタイムアウトのタイムアウトを設定する.poll-timeoutパラメータはプロパティで設定する
	public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
	// 利用可能なデータがない場合は、ポーリングに要する時間を待つ。 0であれば、利用可能なすべてのレコードが即座に返される
	//デフォルトのポーリングタイムアウト
	public static final long DEFAULT_POLL_TIMEOUT = 100L;
	// ユーザー提供のkafkaパラメータ設定
	protected final Properties properties;
	// 利用可能なデータがない場合は、ポーリングに必要な時間待つ。 0 の場合、利用可能なすべてのレコードが即座に返される
	protected final long pollTimeout;
	/**
	 * カフカコンシューマーソースを作成する
	 * @param topic コンシューマのサブジェクト名
	 * @param valueDeserializer カフカのバイトメッセージをFlinkオブジェクトに変換するためのデシリアライズ型
	 * @param props ユーザーから渡されるkafkaパラメータ
	 */
	public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
		this(Collections.singletonList(topic), valueDeserializer, props);
	}
	/**
	 * カフカコンシューマーソースを作成する
	 * コンストラクタは、KafkaDeserialisationSchemaを渡すことができる。KafkaDeserialisationSchemaは、カフカの消費に関する追加情報へのアクセスをサポートするデシリアライゼーションクラスである。
	 * 例:キー/値ペア、オフセット、トピック
	 * @param topic コンシューマのサブジェクト名
	 * @param deserializer カフカのバイトメッセージをFlinkオブジェクトに変換するためのデシリアライズ型
	 * @param props ユーザーから渡されるkafkaパラメータ
	 */
	public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
		this(Collections.singletonList(topic), deserializer, props);
	}
	/**
	 * カフカコンシューマーソースを作成する
	 * コンストラクタでは、複数のトピックを渡すことができ、複数のトピックを消費することができる。
	 * @param topics Consumerのトピック名、Listコレクションとしての複数のトピック
	 * @param deserializer カフカのバイトメッセージをFlinkオブジェクトに変換するためのデシリアライズ型
	 * @param props ユーザーから渡されるkafkaパラメータ
	 */
	public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
		this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
	}
	/**
	 * カフカコンシューマーソースを作成する
	 * コンストラクタでは、複数のトピックを渡すことができ、複数のトピックを消費することができる。,
	 * @param topics Consumerのトピック名、Listコレクションとしての複数のトピック
	 * @param deserializer デシリアライズ型 , カフカバイトメッセージをFlinkオブジェクトに変換するために使用される。
	 * @param props ユーザーから渡されたkafkaパラメータ
	 */
	public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
		this(topics, null, deserializer, props);
	}
	/**
	 * 正規表現に基づいて複数のトピックにサブスクライブする
	 * パーティションディスカバリーが有効な場合、つまり、FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS値は非負の数値である
	 * トピックは、定期的にマッチングできる限り、作成されるとすぐにサブスクライブされる。
	 * @param subscriptionPattern トピックの正規表現
	 * @param valueDeserializer デシリアライズ型 , カフカバイトメッセージをFlinkオブジェクトに変換するために使用される , 追加情報を取得するためのサポート
	 * @param props ユーザーから渡されるkafkaパラメータ
	 */
	public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
		this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
	}
	/**
	 * 正規表現に基づいて複数のトピックにサブスクライブする
	 * パーティションディスカバリーが有効な場合、つまり、FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS値は非負の数値である
	 * トピックは、定期的にマッチングできる限り、作成と同時に購読される。
	 * @param subscriptionPattern トピックの正規表現
	 * @param deserializer デシリアライズクラスは、カフカコンシューマーに関する追加情報(キー/値ペア、オフセット、トピックなど)へのアクセスをサポートしている。
	 * @param props ユーザーから渡されたkafkaパラメータ
	 */
	public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
		this(null, subscriptionPattern, deserializer, props);
	}
	private FlinkKafkaConsumer(
		List<String> topics,
		Pattern subscriptionPattern,
		KafkaDeserializationSchema<T> deserializer,
		Properties props) {
		// 親クラスのコンストラクタを呼び出す,PropertiesUtil.getLongメソッドの最初のパラメータはProperties、2番目のパラメータはkey、3番目のパラメータはvalue default valueである。
		super(
			topics,
			subscriptionPattern,
			deserializer,
			getLong(
				checkNotNull(props, "props"),
				KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
			!getBoolean(props, KEY_DISABLE_METRICS, false));
		this.properties = props;
		setDeserializer(this.properties);
		// プロパティでKEYが設定されている場合は、ポーリングタイムアウトを設定する。_POLL_TIMEOUTパラメータを指定した場合、特定の設定値が返され、そうでない場合はデフォルト値 DEFAULT が返される。_POLL_TIMEOUT
		try {
			if (properties.containsKey(KEY_POLL_TIMEOUT)) {
				this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
			} else {
				this.pollTimeout = DEFAULT_POLL_TIMEOUT;
			}
		}
		catch (Exception e) {
			throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
		}
	}
 // フェッチャーのインスタンスを返す親クラスのメソッドオーバーライド,
	// fetcherカフカ・ブローカーに接続し、データを取り出し、デシリアライズして、データ・ストリームとして出力する。
	@Override
	protected AbstractFetcher<T, ?> createFetcher(
		SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
		SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
		SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
		StreamingRuntimeContext runtimeContext,
		OffsetCommitMode offsetCommitMode,
		MetricGroup consumerMetricGroup,
		boolean useMetrics) throws Exception {
 // オフセットコミットモードがONであることを確認する_CHECKPOINTS(条件1:チェックポイントがオンになっている。.setCommitOffsetsOnCheckpoints(true))自動コミットを無効にする
		// このメソッドは、親クラス
		// これは、ユーザーがプロパティ
		// オフセットモードがONの場合_CHECKPOINTS,ユーザーが設定したプロパティ属性をオーバーライドする場合は、DISABLEDにする。
		// 具体的には、ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"false "の値は "false "にリセットされる。
 // チェックポイントがオンで、Consumerが設定されている場合、以下のように理解できる。.setCommitOffsetsOnCheckpoints(true),デフォルトはtrueである。,
		// そして、FlinkKafkaConsumerBaseのカフカ・プロパティを有効化する。.auto.commit強制的にfalseにする
		adjustAutoCommitConfig(properties, offsetCommitMode);
		return new KafkaFetcher<>(
			sourceContext,
			assignedPartitionsWithInitialOffsets,
			watermarksPeriodic,
			watermarksPunctuated,
			runtimeContext.getProcessingTimeService(),
			runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
			runtimeContext.getUserCodeClassLoader(),
			runtimeContext.getTaskNameWithSubtasks(),
			deserializer,
			properties,
			pollTimeout,
			runtimeContext.getMetricGroup(),
			consumerMetricGroup,
			useMetrics);
	}
	//親クラスのメソッドのオーバーライド
	// パーティションディスカバリークラスを返すと、パーティションディスカバリーは、トピックとパーティションに関するメタデータを発見するために、カフカブローカの高度なコンシューマーAPIを使用することができる。
	@Override
	protected AbstractPartitionDiscoverer createPartitionDiscoverer(
		KafkaTopicsDescriptor topicsDescriptor,
		int indexOfThisSubtask,
		int numParallelSubtasks) {
		return new KafkaPartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);
	}
	/**
	 *kafkaパラメータで自動コミットが有効になっているかどうかを判断する。.auto.commit=true,
	 * そして、自動.commit.interval.ms>0,
	 * 注:有効化されていない.auto.commitデフォルトは
	 * 自動.commit.interval.msデフォルトは5000ミリ秒である。
	 * @return
	 */
	@Override
	protected boolean getIsAutoCommitEnabled() {
		//
		return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
			PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
	}
	/**
	 * カフカメッセージのキーと値のデシリアライズが設定されていることを確認する。,
	 * 設定されていない場合は、ByteArrayDeserializerシリアライザーが使用される。,
	 * このクラスのdeserializeメソッドは、何の処理もせずにデータを直接RETURNする。
	 * @param props
	 */
	private static void setDeserializer(Properties props) {
		final String deSerName = ByteArrayDeserializer.class.getName();
		Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
		Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
		if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
			LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
		}
		if (valDeSer != null && !valDeSer.equals(deSerName)) {
			LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
		}
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
	}
}

分析

  • コンストラクタの分析

FlinkKakfaConsumerは、上の図に示すように、7つのコンストラクタのメソッドを提供します。それぞれ異なる機能を持つ構成メソッドは、渡されたパラメータを介して、各構成メソッド固有の機能を大まかに分析することもできます、理解を容易にするために、本稿では、以下のように、グループごとに説明します:

シングルトピック

/**
	 * カフカコンシューマーソースを作成する
	 * @param topic コンシューマのサブジェクト名
	 * @param valueDeserializer カフカのバイトメッセージをFlinkオブジェクトに変換するためのデシリアライズ型
	 * @param props ユーザーから渡されたkafkaパラメータ
	 */
	public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
		this(Collections.singletonList(topic), valueDeserializer, props);
	}
/**
	 * カフカ・コンシューマー・ソースを作成する
	 * コンストラクタは、KafkaDeserialisationSchemaを渡すことができる。KafkaDeserialisationSchemaは、カフカの消費に関する追加情報へのアクセスをサポートするデシリアライゼーションクラスである。
	 * 例:キー/値ペア、オフセット、トピック
	 * @param topic コンシューマのサブジェクト名
	 * @param deserializer カフカのバイトメッセージをFlinkオブジェクトに変換するためのデシリアライズ型
	 * @param props ユーザーから渡されるkafkaパラメータ
	 */
	public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
		this(Collections.singletonList(topic), deserializer, props);
	}

上記2つのコンストラクタは単一のトピックのみをサポートし、違いはデシリアライズが異なることです。最初のコンストラクタは DeserialisationSchema を使用し、2 番目のコンストラクタは KafkaDeserializationSchema を使用します。KafkaDeserializationSchema パラメータを持つコンストラクタを使用すると、より依存性の高い情報を取得できます。値のペア、オフセット、トピックなどを取得したい場合は、このメソッドを使用します。これらのメソッドはどちらもプライベートコンストラクタメソッドを呼び出します。

マルチテーマ

/**
	 * カフカコンシューマーソースを作成する
	 * コンストラクタでは、複数のトピックを渡すことができ、複数のトピックを消費することができる。
	 * @param topics Consumerのトピック名、Listコレクションとしての複数のトピック
	 * @param deserializer カフカのバイトメッセージをFlinkオブジェクトに変換するためのデシリアライズ型
	 * @param props ユーザーから渡されたkafkaパラメータ
	 */
	public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
		this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
	}
	/**
	 * カフカ・コンシューマー・ソースを作成する
	 * コンストラクタでは複数のトピックを渡すことができ、複数のトピックを消費することができる。,
	 * @param topics Consumerのトピック名、Listコレクションとしての複数のトピック
	 * @param deserializer デシリアライズ型 , カフカバイトメッセージをFlinkオブジェクトに変換するために使用される , 追加情報を取得するためのサポート
	 * @param props ユーザーから渡されるkafkaパラメータ
	 */
	public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
		this(topics, null, deserializer, props);
	}

上記の2つのマルチトピック構築メソッドは、リストコレクションを使用して複数のトピックを受け取り、消費することができます。前者は DeserialisationSchema を使用し、後者は KafkaDeserializationSchema を使用します。KafkaDeserialisationSchema パラメータを指定したコンストラクタを使用すると、より詳細な情報を取得できます。値のペア、オフセット、トピックなどを取得したい場合は、このメソッドを使用します。これらのメソッドはどちらも private コンストラクタのメソッドを呼び出します。private コンストラクタのメソッドについては後述します。

定期戦トピック

/**
	 * 正規表現に基づいて複数のトピックにサブスクライブする
	 * パーティションディスカバリーがオンになっている場合、つまりFlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS値は非負の数値である
	 * トピックは、定期的にマッチングできる限り、作成されるとすぐにサブスクライブされる。
	 * @param subscriptionPattern トピックの正規表現
	 * @param valueDeserializer デシリアライズ型 , カフカバイトメッセージをFlinkオブジェクトに変換するために使用される , 追加情報を取得するためのサポート
	 * @param props ユーザーから渡されたkafkaパラメータ
	 */
	public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
		this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
	}
	/**
	 * 正規表現に基づいて複数のトピックにサブスクライブする
	 * パーティションディスカバリーが有効な場合、つまり、FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS値は非負の数値である
	 * トピックは、定期的にマッチングできる限り、作成されるとすぐにサブスクライブされる。
	 * @param subscriptionPattern トピックの正規表現
	 * @param deserializer デシリアライズクラスは、カフカコンシューマーに関する追加情報(キー/値ペア、オフセット、トピックなど)へのアクセスをサポートしている。
	 * @param props ユーザーから渡されたkafkaパラメータ
	 */
	public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
		this(null, subscriptionPattern, deserializer, props);
	}

実際の本番環境では、flinkジョブが異なるkafkaトピックに対応する多くの異なる種類のデータを集約する必要があるなど、いくつかのニーズがあるかもしれません。ビジネスの成長に伴い、新しいデータクラスが追加され、同時に新しいkafkaトピックが追加されます。FlinkKafkaConsumerを構築する際のプロパティで、flink.partition-discovery.interval-millisパラメータを非負の値に設定し、動的検出をオンにするスイッチと設定時間間隔を示します。この時点で、FLinkKafkaConsumer は別のスレッドを起動して定期的に kafka にアクセスして最新のメタ情報を取得します。呼び出し方については、次のプライベートコンストラクタを参照してください。

プライベートコンストラクタ

	private FlinkKafkaConsumer(
		List<String> topics,
		Pattern subscriptionPattern,
		KafkaDeserializationSchema<T> deserializer,
		Properties props) {
		// 親クラスのコンストラクタを呼び出す,PropertiesUtil.getLongメソッドの最初のパラメータはProperties、2番目のパラメータはkey、3番目のパラメータは値のdefault value.KEYである。_PARTITION_DISCOVERY_INTERVAL_MILLIS値は、パーティションディスカバリーを有効にするための設定パラメータであり、プロパティ内で configure flink.partition-discovery.interval-millis=5000(の数が0より大きい場合)、設定されていない場合はPARTITIONを使用する。_DISCOVERY_DISABLED=Long.MIN_VALUE(はパーティションディスカバリーが無効であることを示す)
		super(
			topics,
			subscriptionPattern,
			deserializer,
			getLong(
				checkNotNull(props, "props"),
				KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
			!getBoolean(props, KEY_DISABLE_METRICS, false));
		this.properties = props;
		setDeserializer(this.properties);
		// プロパティでKEYが設定されている場合は、ポーリングタイムアウトを設定する。_POLL_TIMEOUTパラメータが指定された場合、特定の設定値が返され、そうでない場合はデフォルト値 DEFAULT が返される。_POLL_TIMEOUT
		try {
			if (properties.containsKey(KEY_POLL_TIMEOUT)) {
				this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
			} else {
				this.pollTimeout = DEFAULT_POLL_TIMEOUT;
			}
		}
		catch (Exception e) {
			throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
		}
	}
  • その他のメソッド

KafkaFetcher オブジェクトの作成

 // フェッチャーのインスタンスを返す親クラスのメソッドオーバーライド,
	// fetcherカフカ・ブローカーに接続し、データを取り出し、デシリアライズして、データストリームとして出力する。
	@Override
	protected AbstractFetcher<T, ?> createFetcher(
		SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
		SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
		SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
		StreamingRuntimeContext runtimeContext,
		OffsetCommitMode offsetCommitMode,
		MetricGroup consumerMetricGroup,
		boolean useMetrics) throws Exception {
 // オフセットコミットモードがONの場合_CHECKPOINTS(条件1:チェックポイントがオンになっている。.setCommitOffsetsOnCheckpoints(true))自動コミットを無効にする
		// このメソッドは、親クラスである
		// これは、ユーザーがプロパティ
		// オフセットモードがONの場合_CHECKPOINTS,ユーザーが設定したプロパティを上書きする場合は、DISABLED にする。
		// 具体的には、ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"false "の値は "false "にリセットされる。
 // チェックポイントがオンで、コンシューマーがセットされている場合、以下のように理解できる。.setCommitOffsetsOnCheckpoints(true),デフォルトはtrueである。,
		// そして、FlinkKafkaConsumerBaseのカフカ・プロパティを有効化する。.auto.commit強制的にfalseにする
		adjustAutoCommitConfig(properties, offsetCommitMode);
		return new KafkaFetcher<>(
			sourceContext,
			assignedPartitionsWithInitialOffsets,
			watermarksPeriodic,
			watermarksPunctuated,
			runtimeContext.getProcessingTimeService(),
			runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
			runtimeContext.getUserCodeClassLoader(),
			runtimeContext.getTaskNameWithSubtasks(),
			deserializer,
			properties,
			pollTimeout,
			runtimeContext.getMetricGroup(),
			consumerMetricGroup,
			useMetrics);
	}

このメソッドの機能は、フェッチャーのインスタンスを返すことです。フェッチャーの役割は、kafka ブローカーに接続してデータを取得し、デシリアライズしてストリームとして出力することです。チェックポイント、条件 2: consumer.setCommitOffsetsOnCheckpoints(true)) の場合、自動コミットが無効になります。チェックポイントが有効で、consumer.setCommitOffsetsOnCheckpoints(true)が設定されている場合(デフォルトではtrue)、kafkaプロパティの自動コミットが無効になります。enable.auto.commitは強制的にfalseになります。 オフセットコミットモードの詳細については、以下の「オフセットコミットモード分析」を参照してください。

自動投稿が設定されているかどうか

 @Override
	protected boolean getIsAutoCommitEnabled() {
		//
		return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
			PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
	}

つまり、enable.auto.commit=trueかつauto.commit.interval.ms>0です。注意: enable.auto.commitが設定されていないパラメータがない場合、デフォルトはtrueになります。commit.interval.msパラメータが設定されていない場合、デフォルトは5000ミリ秒です。このメソッドは、FlinkKafkaConsumerBase の open メソッドが初期化されたときに呼び出されます。

デシリアライズ

private static void setDeserializer(Properties props) {
 // デフォルトのデシリアライズ方法 
		final String deSerName = ByteArrayDeserializer.class.getName();
 //キーと値のデシリアライズパターンに関するユーザー設定プロパティを取得する
		Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
		Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
 // 設定されている場合は、ユーザーが設定した値が使用される
		if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
			LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
		}
		if (valDeSer != null && !valDeSer.equals(deSerName)) {
			LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
		}
 // 設定なしで、デシリアライズはByteArrayDeserializerを使用して行われる。
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
	}

カフカメッセージのキーと値のデシリアライズメソッドが設定されていることを確認します。設定されていない場合は、ByteArrayDeserializerシリアライザーを使用します。ByteArrayDeserializerクラスのdeserializeメソッドは、何の処理もせずにデータを直接RETURNします。

FlinkKafkaConsumerBase

@Internal
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
		CheckpointListener,
		ResultTypeQueryable<T>,
		CheckpointedFunction {
	public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
	public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
	public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
	private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
	private boolean enableCommitOnCheckpoints = true;
	/**
	 * オフセットコミットモードにアクセスするには、FlinkKafkaConsumerBaseの#open(Configuration)設定を行う
	 * チェックポイントを有効にしているかどうかで値が変わる
	 */
	private OffsetCommitMode offsetCommitMode;
	/**
	 * どの位置からカフカメッセージの消費を開始するかを設定する,
	 * デフォルトはStartupMode#GROUP_OFFSETS,つまり、現在のコミットのオフセットから消費が始まる
	 */
	private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
	private Map<KafkaTopicPartition, Long> specificStartupOffsets;
	private Long startupOffsetsTimestamp;
	/**
	 * オフセットコミットモードがONの場合_CHECKPOINTS自動コミットを無効にする,
	 * これは、ユーザーがプロパティで設定したあらゆる設定をカバーする。
	 * オフセットモードがONの場合_CHECKPOINTS,ユーザー設定プロパティを上書きする場合は、DISABLED にする。
	 * 具体的には、ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"false "の値は、自動コミットを無効にするために "false "にリセットされる。
	 * @param properties kafka設定されたプロパティは、このメソッドでオーバーライドされる
	 * @param offsetCommitMode offsetコミットモード
	 */
	static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
			properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
		}
	}
	/**
	 * チェックポイントをオンにした場合、チェックポイン後にオフセットをコミットするかどうかを決める,
	 * このパラメータは、ユーザーがチェックポイントを有効にしている場合にのみ機能する。
	 * チェックポイントが有効になっていない場合は、kafkaの設定パラメータenableを使用する。.auto.commit
	 * @param commitOnCheckpoints
	 * @return
	 */
	public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
		this.enableCommitOnCheckpoints = commitOnCheckpoints;
		return this;
	}
	/**
	 * 最も古いオフセットから消費する,
	 *このモードでは、Kafkaですでにコミットされたオフセットは無視され、開始位置として使用されない。
	 *これは、consumer1.setStartFromEarliest()セットアップを行う
	 */
	public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
		this.startupMode = StartupMode.EARLIEST;
		this.startupOffsetsTimestamp = null;
		this.specificStartupOffsets = null;
		return this;
	}
	/**
	 * 消費は最新のデータから始まる,
	 * このモードでは、Kafkaのコミットオフセットは無視され、開始位置として使用されない。
	 *
	 */
	public FlinkKafkaConsumerBase<T> setStartFromLatest() {
		this.startupMode = StartupMode.LATEST;
		this.startupOffsetsTimestamp = null;
		this.specificStartupOffsets = null;
		return this;
	}
	
	/**
	 *オフセットのタイムスタンプをミリ秒単位で指定する
	 *各パーティションについて、タイムスタンプが指定されたタイムスタンプ以上のレコードが開始位置として使用される。
	 * パーティションの最新レコードが指定されたタイムスタンプより前の場合は、最新レコードからパーティションデータのみを読み込む。
	 * このモードでは、Kafkaのコミットオフセットは無視され、開始位置として使用されない。
	 */
	protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
		checkArgument(startupOffsetsTimestamp >= 0, "The provided value for the startup offsets timestamp is invalid.");
		long currentTimestamp = System.currentTimeMillis();
		checkArgument(startupOffsetsTimestamp <= currentTimestamp,
			"Startup time[%s] must be before current time[%s].", startupOffsetsTimestamp, currentTimestamp);
		this.startupMode = StartupMode.TIMESTAMP;
		this.startupOffsetsTimestamp = startupOffsetsTimestamp;
		this.specificStartupOffsets = null;
		return this;
	}
	/**
	 *
	 * 特定のコンシューマーグループの直近にコミットされたオフセットからの消費がデフォルトモードである。
	 * パーティショニングされたオフセットが見つからない場合は、auto.offset.resetパラメータ設定
	 * @return
	 */
	public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
		this.startupMode = StartupMode.GROUP_OFFSETS;
		this.startupOffsetsTimestamp = null;
		this.specificStartupOffsets = null;
		return this;
	}
	/**
	 *消費するパーティションごとにオフセットを指定する
	 */
	public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
		this.startupMode = StartupMode.SPECIFIC_OFFSETS;
		this.startupOffsetsTimestamp = null;
		this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
		return this;
	}
	@Override
	public void open(Configuration configuration) throws Exception {
		// determine the offset commit mode
		// オフセットを決定するコミットモード,
		// 最初のパラメータは、自動コミットが有効かどうかである。,
		// 2つ目のパラメータは、CommitOnCheckpointモードが有効かどうかである。
		// 3つ目のパラメータは、チェックポイントが有効かどうかである。
		this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
				getIsAutoCommitEnabled(),
				enableCommitOnCheckpoints,
				((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
 
	 // 省略したコード
	}
// 省略されたコード
	/**
	 * カフカ・ブローカーに接続し、データを取り込んでデシリアライズし、データをデータ・ストリームとして出力するフェッチャーを作る
	 * @param sourceContext データ出力のコンテキスト
	 * @param subscribedPartitionsToStartOffsets 現在のSUB TASKが処理するTOPICのパーティションのセット、すなわちTOPICのPARTITIONSとOFFSETSのMAPのセットである。
	 * @param watermarksPeriodic オプションで、定期的な透かしを生成するシリアル化されたタイムスタンプ抽出器を提供する。
	 * @param watermarksPunctuated オプションで、句読点型の透かしを生成するシリアル化されたタイムスタンプ抽出器も用意されている。
	 * @param runtimeContext taskのランタイムコンテキストコンテキストは
	 * @param offsetCommitMode offsetコミットモードは3つある。,ON_CHECKPOINTS(オフセットコミットは、チェックポイントが完了したときのみカフカにコミットする)
	 * KAFKA_PERIODIC(kafka autocommit関数を使用した定期的な自動オフセットコミット)
	 * @param kafkaMetricGroup Flinkのメトリックは
	 * @param useMetrics メトリックを使用するかどうか
	 * @return フェッチャーのインスタンスを返す
	 * @throws Exception
	 */
	protected abstract AbstractFetcher<T, ?> createFetcher(
			SourceContext<T> sourceContext,
			Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
			StreamingRuntimeContext runtimeContext,
			OffsetCommitMode offsetCommitMode,
			MetricGroup kafkaMetricGroup,
			boolean useMetrics) throws Exception;
	protected abstract boolean getIsAutoCommitEnabled();
	// 省略したコード
}

オフセット提出モード解析

オフセットコミットの動作の設定は、ジョブでチェックポイントが有効になっているかどうかに応じて同じです。コミットモードの具体的な結論はまずここで示し、2つのアプローチについては以下で詳しく分析します。基本的な結論は

  • チェックポイントのオン

コミットモードのソースコード解析

  • offsetコミットモードは
public enum OffsetCommitMode {
	// 自動オフセットコミットを無効にする
	DISABLED,
	// チェックポイントが完了したときだけオフセットをkafkaにコミットする
	ON_CHECKPOINTS,
	// kafka autocommit関数を使った定期的な自動オフセットコミット
	KAFKA_PERIODIC;
}
  • コミットモードの呼び出し
public class OffsetCommitModes {
	public static OffsetCommitMode fromConfiguration(
			boolean enableAutoCommit,
			boolean enableCommitOnCheckpoint,
			boolean enableCheckpointing) {
		// チェックインポイントが有効になっている場合は、以下の判定を行う。
		if (enableCheckpointing) {
			// チェックポイントがオンになっている場合は、チェックポインが有効になっているときにコミットするかどうかを判断する)。_CHECKPOINTS 
			// それ以外の場合はDISABLEDモードが使用される
			return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
		} else {
			// Kafkaプロパティパラメータが "enable "に設定されている場合.auto.commit" = "true",次に、KAFKAを使用する。_PERIODICモードコミットオフセット
			// それ以外の場合はDISABLEDモードが使用される
			return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
		}
	}
}

まとめ

この記事では、Flink Kafka Consumer を紹介し、最初に FlinkKafkaConsumer の異なるバージョンを比較し、次に完全なデモケースを提供し、ケースの設定パラメータを詳細に説明します。FlinkKafkaConsumerとその親クラスFlinkKafkaConsumerBaseのソースコードを解釈し、最後にFlink Kafka Consumerのオフセットコミットモードをソースコードレベルで分析し、それぞれのコミットモードを整理します。

Read next

データ構造とアルゴリズム - 貪欲アルゴリズム

局所最適:最小の消費で子供を満足させます。小さいビスケット」は「食欲が最も小さい」子供に最初に与えられます。アルゴリズムの手順:前提条件:神の視点、将来の価格を知っていること。局所最適:取れるものは取る、取れるものは残す、長期的な計画は立てない。アルゴリズムのステップ

Apr 2, 2020 · 2 min read