blog

Flink DataStream APIの多機能な部分であるProcess Function

FlinkのTimeとDetailedという記事では、FlinkのTimeとWaterlineの内容が説明されています。タイムスタンプや水位線にアクセスするにはどうすればいいのか、と思わず質問してしま...

Jan 24, 2021 · 8 min. read
シェア

、Flinkの時刻と透かしについて説明しています。どうやってタイムスタンプや透かしにアクセスするのか疑問に思うかもしれません。まず、通常のDataStream APIではアクセスできないので、Flinkが提供するAPIの一番下、Process Functionを使う必要があります。 Process Functionはタイムスタンプや透かしにアクセスできるだけでなく、将来のタイマーを登録することができます。タイマーを登録することができます。また、サイド出力により、複数の出力ストリームにデータを送信することができます。このようにして、データストリーミング機能を実装することができ、また、遅延データに対処する方法でもあります。以下では、具体的なユースケースを示しながら、Process Functionの使い方をソースコードから説明します。

はじめに

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessJoinFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction
  • BroadcastProcessFunction
  • onTimer(long timestamp, OnTimerContext ctx, Collector out)

継承関係図を以下に示します:

上記の継承関係からわかるように、すべてRichFunctionインタフェースを実装しているため、open()、close()、getRuntimeContext()やその他のメソッド呼び出しの使用をサポートしています。名前からわかるように、これらの関数は異なるアプリケーションシナリオを持っていますが、基本的な機能は似ています。

ソースコード

KeyedProcessFunction

/**
 * KeyedStreamストリームを処理するための低レベルAPI関数
 * 入力ストリームの各要素は、processElementメソッドの呼び出しをトリガーする。.このメソッドは0個以上の出力を生成する.
 * その実装クラスは、データのタイムスタンプとタイマーにContext(timers).タイマーがトリガーされると、onTimerメソッドがコールバックされる。.
 * onTimerこのメソッドは、0個以上の出力を生成し、未来のタイマーを登録する。.
 *
 * 注:キーイングされたステートとタイマーにアクセスしたい場合は、KeyedStreamのKeyedProcessFunction.
 * また、KeyedProcessFunctionの親クラスであるAbstractRichFunctionはRichFunctionインターフェイスを実装している。
 * open()closeメソッド.
 *
 * @param <K> key 
 * @param <I> 入力要素のデータ型
 * @param <O> 出力要素のデータ型
 */
@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
	private static final long serialVersionUID = 1L;
	/**
	 * 入力ストリームの各要素を処理する
	 * このメソッドは、FlatMapの機能と同様に、0個以上の出力を行う。
	 * さらに、このメソッドでは、内部の状態を更新したり、タイマーを設定したりすることもできる(timer)
	 * @param value 入力要素
	 * @param ctx Contextまた、Process Functionは、入力要素のタイムスタンプにアクセスし、タイムサーバーを取得することで、タイマーの登録や時刻の照会を行うことができる。
	 * ContextprocessElementが呼び出されたときのみ有効である。.
	 * @param out 返される結果値
	 * @throws Exception
	 */
	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
	/**
	 * はコールバック関数であり、TimerServiceに登録されたタイマーがトリガーされると、関数
	 * @param timestamp タイマーを起動するためのタイムスタンプ
	 * @param ctx OnTimerContextタイムスタンプへのアクセスを可能にするために、TimeDomain列挙クラスは2つの時間タイプを提供している:
	 * EVENT_TIME PROCESSING_TIME
	 * また、タイマーの登録や時刻の照会に使用するタイムサーバーを取得することもできる。
	 * OnTimerContextonTimerメソッドが呼ばれた時のみ有効である。
	 * @param out 出力
	 * @throws Exception
	 */
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
	/**
	 * processElement()メソッドまたはonTimerメソッド呼び出し時のみ有効。
	 */
	public abstract class Context {
		/**
		 * 現在処理中の要素のタイムスタンプ、またはタイマーがトリガーされた時のタイムスタンプを指定する。
		 * プログラムに設定されている時間セマンティクスが、例えば、NULLの場合もある:TimeCharacteristic#ProcessingTime
		 * @return
		 */
		public abstract Long timestamp();
		/**
		 * アクセス時間と登録されたタイマー(timers)
		 * @return
		 */
		public abstract TimerService timerService();
		/**
		 * サイド出力に要素を出力する
		 * @param outputTag サイド出力マーカー
		 * @param value 出力レコード
		 * @param <X>
		 */
		public abstract <X> void output(OutputTag<X> outputTag, X value);
		/**
		 * 処理対象要素のkey
		 * @return
		 */
		public abstract K getCurrentKey();
	}
	/**
	 * onTimerメソッドは、onTimerメソッドが呼び出された時のみ使用できる。OnTimerContext
	 */
	public abstract class OnTimerContext extends Context {
		/**
		 * トリガー・タイマーの時間タイプには、以下の2種類がある:EVENT_TIME PROCESSING_TIME
		 * @return
		 */
		public abstract TimeDomain timeDomain();
		/**
		 * のトリガーとなるタイマー要素を取得する。key
		 * @return
		 */
		@Override
		public abstract K getCurrentKey();
	}
}

上記のソースコードでは、主に2つのメソッドがあり、以下のように分析されています:

  • processElement(I value, Context ctx, Collector out)

このメソッドは、ストリーム内の各レコードに対して 1 回呼び出され、0 個以上の要素を出力します。これは、Collector を通じて結果を出力する FlatMap の関数と同様です。また、この関数にはContextパラメータがあり、ユーザはContextを通じてタイムスタンプ、現在のレコードのキー値、TimerServiceにアクセスできます(TimerServiceについては、以下で詳しく説明します)。さらに、output メソッドを使用してデータをサイド出力に送信し、トリアージや遅延データの処理機能を実現することもできます。

  • onTimer(long timestamp, OnTimerContext ctx, Collector out)

このメソッドは、TimerService に登録されているタイマーがトリガーされたときに呼び出されるコールバック関数です。param timestampパラメータがトリガーされたタイマーのタイムスタンプを示す場合、Collectorはレコードを発行できます。上記のメソッドは Context パラメータを渡し、onTimer メソッドは OnTimerContext パラメータを渡します。onTimerContext はトリガーされたタイマーの時間フィールドを返すこともできます。

TimerService

KeyedProcessFunctionのソースコードでは、時間とタイマーにアクセスするためにTimerServiceが使用されています:

@PublicEvolving
public interface TimerService {
	String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
	String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";
	// 現在の処理時間を返す
	long currentProcessingTime();
	// 現在のevent-time水位線(watermark)
	long currentWatermark();
	/**
	 * タイマーを登録し、処理時間がタイマークロックの時間と等しくなったときに呼び出される。
	 * @param time
	 */
	void registerProcessingTimeTimer(long time);
	/**
	 * タイマーを登録し、イベント時刻の水位がその時刻に達したときに起動する。
	 * @param time
	 */
	void registerEventTimeTimer(long time);
	/**
	 * 与えられたトリガー時間に従って削除するprocessing-time 
	 * タイマーが存在しない場合、メソッドは動作しない。
	 * つまり、タイマーは登録済みであり、廃止されることはない。
	 *
	 * @param time
	 */
	void deleteProcessingTimeTimer(long time);
 
	/**
	 * 与えられたトリガー時間に従って削除するevent-time  
	 * タイマーが存在しない場合、メソッドは動作しない。
	 * 	つまり、タイマーは登録済みであり、廃止されることはない。
	 * @param time
	 */
	void deleteEventTimeTimer(long time);
}

TimerService には以下のメソッドがあります:

  • currentProcessingTime()

現在の処理時間を返します。

  • currentWatermark()

現在のイベント時刻のウォーターラインタイムスタンプを返します。

  • deleteEventTimeTimer(long time)

現在のキーの処理時間タイマーを登録し、処理時間がタイマークロックと等しくなったときに呼び出されます。

  • RegisterEventTimeTimer(long time)

現在のキーについて、水位線のタイムスタンプがタイマークロック以上になったときに呼び出されるイベントタイムタイマーを登録します。

  • deleteEventTimeTimer(long time)

タイマーが存在しない場合、このメソッドは動作しません。

  • deleteEventTimeTimer(long time)

タイマーが存在しない場合、このメソッドは動作しません。

タイマーがトリガーされると、onTimer()関数がコールバックされ、システムはProcessElement()メソッドとonTimer()メソッドの間で同期されます。

注意: 上記のソース・コードには2つのエラー・メッセージがありますが、これはタイマーがキー付きストリームでのみ使用できることを意味します。 タイマーの一般的な使用方法としては、キー値が使用されなくなった後にキー付き状態をクリアしたり、時間に基づくカスタム・ウィンドウ・ロジックを実装したりすることが挙げられます。キー設定されていないストリームでタイマーを使用する場合は、KeySelectorを使用して固定パーティション値を返すことで、すべてのデータが1つのパーティションにのみ送信されるようにすることができます。

使用例

以下は、プロセスファンクション側のシャント処理用出力ファンクションを使用するもので、具体的なコードは以下の通りです:

public class ProcessFunctionExample {
 // サイド出力タグの定義
 static final OutputTag<UserBehaviors> buyTags = new OutputTag<UserBehaviors>("buy") {
 };
 static final OutputTag<UserBehaviors> cartTags = new OutputTag<UserBehaviors>("cart") {
 };
 static final OutputTag<UserBehaviors> favTags = new OutputTag<UserBehaviors>("fav") {
 };
 static class SplitStreamFunction extends ProcessFunction<UserBehaviors, UserBehaviors> {
 @Override
 public void processElement(UserBehaviors value, Context ctx, Collector<UserBehaviors> out) throws Exception {
 switch (value.behavior) {
 case "buy":
 ctx.output(buyTags, value);
 break;
 case "cart":
 ctx.output(cartTags, value);
 break;
 case "fav":
 ctx.output(favTags, value);
 break;
 default:
 out.collect(value);
 }
 }
 }
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
 // シミュレーションデータソース[userId,behavior,product]
 SingleOutputStreamOperator<UserBehaviors> splitStream = env.fromElements(
 new UserBehaviors(1L, "buy", "iphone"),
 new UserBehaviors(1L, "cart", "huawei"),
 new UserBehaviors(1L, "buy", "logi"),
 new UserBehaviors(1L, "fav", "oppo"),
 new UserBehaviors(2L, "buy", "huawei"),
 new UserBehaviors(2L, "buy", "onemore"),
 new UserBehaviors(2L, "fav", "iphone")).process(new SplitStreamFunction());
 //トリアージ後の購買行動データを取得する
 splitStream.getSideOutput(buyTags).print("data_buy");
 //トリアージ後の購買行動データを取得する
 splitStream.getSideOutput(cartTags).print("data_cart");
 //トリアージ後の収集行動のデータを取得する
 splitStream.getSideOutput(favTags).print("data_fav");
 env.execute("ProcessFunctionExample");
 }
}

まとめ

この記事では、まずFlinkが提供するいくつかの基本的なプロセス関数APIを紹介します。これらのAPIは、タイムスタンプやウォーターマークへのアクセスや、コールバック関数onTimer()を呼び出すためのタイマー登録のサポートを提供します。次に、ソースコードの観点から、これらのAPIの共通部分を説明し、各メソッドの具体的な意味と使用方法について詳しく説明します。最後に、Process Functionの一般的な使用例として、シャント処理を実現するためにProcess Functionを使用する例を示します。また、ユーザーはこれらの関数を、タイマーの登録を通じて、コールバック関数で処理ロジックを定義することもでき、非常に柔軟な使い方ができます。

公共ビッグデータ技術とデジタルウェアハウス、ビッグデータ情報パッケージの受信情報を返信します。

Read next

知る|TypeScriptとegg.jsの基本的な使い方を組み合わせる

Xiao Xiaoは再び学習状態に入り、この時Xiao Xiaoは最近jsのコンテンツに接触し、その後tsに関連するいくつかのコンテンツに接触したので、Xiao Xiao今回の主な学習コンテンツはtsです。 ここで、最初にnpm関連ツールがインストールされていることを確認する必要があります。 ここではeggのグローバルインストール、依存プロジェクトの初期化を実現します。 ここでは、関連するサービスレイヤーを設定します。 ミドルウェアは一般的にjwt認証関連のコンテンツに使用されます。ここでは ...

Jan 23, 2021 · 5 min read