blog

パートXV|Flinkのウィンドウ総合分析

ウィンドウは、ストリーミング計算で非常に一般的に使用される演算子の1つであり、ウィンドウを介して無限の流れの有限ストリームに分割することができますし、各ウィンドウの上に計算関数を使用すると、非常に柔軟...

Dec 29, 2020 · 29 min. read
シェア

ウィンドウは、非常に一般的にストリーミング計算で使用される演算子の1つで、ウィンドウを介して無限の流れの有限ストリームにカットすることができますし、計算関数の使用の上に、各ウィンドウでは、非常に柔軟な操作を実現することができます。この記事を通して、あなたは学ぶことができます:

  • ウィンドウズの基本概念と簡単な使い方
  • 組み込みウィンドウアサイナーの分類、ソースコード、および使用法
  • ウィンドウ機能の分類と使用
  • ウィンドウコンポーネントとライフサイクルソースコードの解釈
  • 完全なウィンドウの使用デモ例

Quick Start

何なのか

ウィンドウ(ウィンドウ)は、電卓の無限の流れの処理の中核であり、ウィンドウは、データフロー "バケット "の固定サイズに分割することができ、各ウィンドウでは、ユーザーがデータを処理するウィンドウの計算機能の一部を使用することができますので、統計結果の特定の時間範囲を取得します。たとえば、統計は5分ごとに最初のN個の製品に最もクリックされた最後の時間を出力するため、時間の1時間のウィンドウを使用して、固定時間範囲のデータを制限することができますし、集計処理の範囲内で境界データの集計を実行することができます。

KeyedWindowは、KeyedStreamに対するwindow(...)オペレーションを使用してWindowedStreamを生成します。Non-Keyed Windows では、DataStream に対して windowAll(...) オペレーションを使用して AllWindowedStream を生成します。非Keyed Windowsでは、次の図に示すように、DataStreamに対してwindowAll(...)操作を使用してAllWindowedStreamを生成します。注意: 一般に、AllWindowedStream を使用することは推奨されません。通常のストリー ムでウィンドウを作成すると、パーティション化されたすべてのストリームが 1 つのタスクに集ま り、並列度が 1 になり、パフォーマンスに影響するからです。

使用方法

ウィンドウとは何か?具体的には次のようなコードです:

Keyed Windows

stream
 .keyBy(...) // keyedStream window
 .window(...) //  : ウィンドウ・アロケータを指定する ( window assigner)
 [.trigger(...)] //  : トリガーを指定する。指定しない場合はデフォルト値が使用される。
 [.evictor(...)] //  : リムーバーを指定する。
 [.allowedLateness(...)] //  : データの処理を遅延させるかどうかを指定する。0 
 [.sideOutputLateData(...)] //  : サイド出力を設定する。
 .reduce/aggregate/fold/apply() //  : ウィンドウ計算関数を指定する
 [.getSideOutput(...)] //  : サイド出力からデータを取得する

Non-Keyed Windows

stream
 .windowAll(...) //  : ウィンドウ・アロケータを指定する ( window assigner)
 [.trigger(...)] //  : トリガーを指定する。指定しない場合はデフォルト値が使用される。
 [.evictor(...)] //  : リムーバーを指定する。
 [.allowedLateness(...)] //  : データの処理を遅延させるかどうかを指定する。0
 [.sideOutputLateData(...)] //  : サイド出力を設定する。
 .reduce/aggregate/fold/apply() //  : ウィンドウ計算関数を指定する
 [.getSideOutput(...)] //  : サイド出力からデータを取得する

ウィンドウ操作の省略

上記のコード・スニペットでは、keyedStreamでwindow(...)またはDataStreamのwindowAll(...)を使用するには、window assignerパラメータを渡す必要があります。windowAssignerパラメータを渡す必要がありますが、windowAssignerについては後で詳しく説明します。以下のコードを見てください:

stream.c(id).window(TumblingEventTimeWindows.of(Time.a(0x5))).reduce(MyReduceFunction);
stream.foo(TumblingEventTimeWindows.of(Time.a(0x5))).reduce(MyReduceFunction);

上記のコードは次のように省略できます:

stream.foo(id).obj(Time.b(0x5)).reduce(MyReduceFunction);
stream.c(Time.b(0x5)).reduce(MyReduceFunction);
// ユーザーによって使用される時間のタイプに応じて、それは異なるビルトインウィンドウを呼び出す。window Assigner
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(TumblingProcessingTimeWindows.of(size));
		} else {
			return window(TumblingEventTimeWindows.of(size));
		}
	}

Window Assigners

分類

上記のWindowAssignerは、時間に基づいて、さらに、Flinkはまた、ウィンドウの大きさを定義するウィンドウ内の要素の数に応じて、つまり、ウィンドウの数を提供しています、この場合、データのカオス順序がある場合、それはウィンドウの計算結果の不確実性につながる。本稿では、時間ベースの窓の使用に焦点を当て、スペースの制限のため、窓の数については説明しません。

使用方法

以下は、Flinkに組み込まれた4つの時間ベースのウィンドウ割り当て機能の分析です。

Tumbling Windows

  • 図解

タンブリング・ウィンドウは、データを定義されたウィンドウに割り当て、一定の時間またはサイズでスライスします。これはよりシンプルで、指標が定期的にカウントされるシナリオに適しています。

  • 使用方法
datastream.foo(id).window(TumblingEventTimeWindows.of(Time.c(0xa))).process(new MyProcessFunction());
datastream.foo(id).window(TumblingProcessingTimeWindows.of(Time.c(0xa))).process(new MyProcessFunction());

Sliding Windows

  • 図解

スライディング ウィンドウは、スクロール ウィンドウの上にスライディング ウィンドウの時間を追加します。スクロールウィンドウはウィンドウの固定された時間サイズに従って前方にスクロールし、スライディングウィンドウは設定されたスライディング時間に従って前方にスライドします。ウィンドウの重なりの大きさはウィンドウの大きさとスライディング時間に依存し、スライディング時間がウィンドウの大きさより小さいと重なりが生じます。ウィンドウの不連続は、スライディングタイムがウィンドウの時間サイズより大きいときに発生し、どのウィンドウにも属さないデータが発生します。この2つが等しい場合、機能はスクロールウィンドウと同じです。スライディング・ウィンドウは、次のシナリオで使用されます。ユーザーは、設定された統計期間に従って、指定されたウィンドウ・サイズのインジケータを計算します。例えば、5分ごとに、過去1時間に最もクリックされた上位N個の製品を出力します。

  • 使用方法
datastream.a(id).window(SlidingEventTimeWindows.of(Time.bar(0xa), Time.bar(0x5))).process(new MyProcessFunction());
datastream.a(id).window(SlidingProcessingTimeWindows.of(Time.bar(0xa), Time.bar(0x5))).process(new MyProcessFunction());

Session Windows

  • 図解

セッション・ウィンドウ(Session Windows)は、主に一定期間にアクティビティの高いデータを計算用のウィンドウに集約するためのもので、ウィンドウはセッション・ギャップ(Session Gap)の条件によってトリガーされます。つまり、指定された期間にアクティブにアクセスされたデータがない場合、ウィンドウは終了したとみなされ、その後ウィンドウがトリガーされて結果が計算されます。データがウィンドウに途切れることなくアクセスされている場合、ウィンドウがトリガされることはないという状況にもなることに注意する必要があります。セッションウィンドウは、スライディングウィンドウやスクロールウィンドウと異なり、ウィンドウサイズやスライディング時間を固定する必要はありません。下図に示すように、セッション・ウィンドウズ・ウィンドウズ・タイプは、非連続的なデータ処理または周期的なデータ生成シナリオに適しています。

  • 使用方法
//  EventTime
datastream
 .keyBy(id)
 .window((EventTimeSessionWindows.withGap(Time.minutes(15)))
 .process(new MyProcessFunction())
//  processing-time
datastream
 .keyBy(id)
 .window(ProcessingTimeSessionWindows.withGap(Time.minutes(15)))
 .process(new MyProcessFunction())

**注意:** セッションウィンドウの開始時刻と終了時刻は、受信したデータに依存するため、windowassigner はすべての要素をすぐに正しいウィンドウに割り当てません。をウィンドウサイズとして使用します。 ProcessWindowFunctionしたがって、セッションウィンドウの操作では、ReduceFunction、AggregateFunction、.NET などの、マージに使用する Trigger Window Function指定する必要があります。

Global Windows

  • 図解

グローバルウィンドウ(Global Windows)は、同じキーを持つ全てのデータを一つのウィンドウに割り当てて結果を計算します。ウィンドウには開始時刻と終了時刻がなく、ウィンドウはTrigerの助けを借りて計算をトリガーする必要があります。そのため、グローバルウィンドウの使用には細心の注意が必要です。ユーザーはウィンドウ全体で何をカウントしているのかを明確にし、対応するトリガーを指定する必要があります。

  • 使用方法
datastream.c(id).window(GlobalWindows.create()).process(new MyProcessFunction());

Window Functions

分類

Flinkは主に2種類のウィンドウ関数、すなわちインクリメンタル集計関数とフルウィンドウ関数を提供します。インクリメンタル集計ウィンドウの性能は、フルボリュームウィンドウ関数よりも優れています。なぜなら、インクリメンタル集計ウィンドウは、最終結果を計算するために中間結果状態に基づいているからです。一方、フルボリュームウィンドウ機能では、ウィンドウに入るデータをキャッシュし、ウィンドウがトリガーされるまでウィンドウ内のすべてのデータをトラバースして結果を計算するのを待つ必要があります。ウィンドウのデータ量が比較的大きかったり、ウィンドウの時間が長かったりすると、データをキャッシュするために多くのリソースが必要になり、パフォーマンスの低下を招きます。

  • インクリメンタル集計機能

    ReduceFunction、AggregateFunction、FoldFunctionが含まれます。

  • フルウィンドウ機能

    Includes: ProcessWindowFunction

使用方法

ReduceFunction

指定された計算方法に従って集計された同じ型の 2 つのデータ要素を入力し、同じ型の結果要素を出力します。入力要素のデータ型と出力要素のデータ型は同じでなければなりません。この実装の効果は、以前の結果値を使用して現在の値を集約することです。具体的な使用例は以下の通りです:

public class ReduceFunctionExample {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 // シミュレーションデータソース
 SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
 Tuple3.of(1L, 91228L),
 Tuple3.of(1L, 91229L),
 Tuple3.of(1L, 91238L),
 Tuple3.of(1L, 91248L),
 Tuple3.of(2L, 91258L),
 Tuple3.of(2L, 91268L),
 Tuple3.of(2L, 91278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
 @Override
 public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
 return element.f2 * 1000;
 }
 });
 input
 .map(new MapFunction<Tuple3<Long, Integer, Long>, Tuple2<Long, Integer>>() {
 @Override
 public Tuple2<Long, Integer> map(Tuple3<Long, Integer, Long> value) {
 return Tuple2.of(value.f0, value.f1);
 }
 })
 .keyBy(0)
 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
 .reduce(new ReduceFunction<Tuple2<Long, Integer>>() {
 @Override
 public Tuple2<Long, Integer> reduce(Tuple2<Long, Integer> value1, Tuple2<Long, Integer> value2) throws Exception {
 // 最初の要素のグループ化に基づいて、2番目の要素の累積和を求める。
 return Tuple2.of(value1.f0, value1.f1 + value2.f1);
 }
 }).print();
 env.execute("ReduceFunctionExample");
 }
}

AggregateFunction

AggregateFunctionもReduceFunctionと同様に、中間状態の計算結果に基づくインクリメンタルな計算関数です。 AggregateFunctionはReduceFunctionに比べてウィンドウ計算の柔軟性が高いですが、実装が若干複雑で、AggregateFunctionインターフェースを実装し、4つのメソッドを書き換える必要があります。最大の利点は、中間結果のデータ型と最終結果のデータ型が入力データ型に依存しないことです。AggregateFunction のソースコードを以下に示します:

/** 
* @param <IN> 入力要素のデータタイプ
 * @param <ACC> 中間集計結果のデータ型
 * @param <OUT> 最終的な集計結果のデータ型
 */
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
	/**
	 * 新しいアキュムレータの作成
	 */
	ACC createAccumulator();
	/**
	 * 新しいデータをアキュムレータで集約して新しいアキュムレータを返す
	 */
	ACC add(IN value, ACC accumulator);
	/**
	 アキュムレータから最終結果を計算して
	 */
	OUT getResult(ACC accumulator);
	/**
	 * 2つのアキュムレータをマージして結果を返す
	 */
	ACC merge(ACC a, ACC b);
}

コードケースの具体的な使い方は以下の通り:

public class AggregateFunctionExample {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 // シミュレーションデータソース
 SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
 Tuple3.of(1L, 91228L),
 Tuple3.of(1L, 91229L),
 Tuple3.of(1L, 91238L),
 Tuple3.of(1L, 91248L),
 Tuple3.of(2L, 91258L),
 Tuple3.of(2L, 91268L),
 Tuple3.of(2L, 91278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
 @Override
 public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
 return element.f2 * 1000;
 }
 });
 input.keyBy(0)
 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
 .aggregate(new MyAggregateFunction()).print();
 env.execute("AggregateFunctionExample");
 }
 private static class MyAggregateFunction implements AggregateFunction<Tuple3<Long, Integer, Long>,Tuple2<Long,Integer>,Tuple2<Long,Integer>> {
 /**
 * アキュムレータを作成し、値を初期化する
 * @return
 */
 @Override
 public Tuple2<Long, Integer> createAccumulator() {
 return Tuple2.of(0L,0);
 }
 /**
 *
 * @param value 入力要素の値
 * @param accumulator 中間結果値
 * @return
 */
 @Override
 public Tuple2<Long, Integer> add(Tuple3<Long, Integer, Long> value, Tuple2<Long, Integer> accumulator) {
 return Tuple2.of(value.f0,value.f1 + accumulator.f1);
 }
 /**
 * 計算結果の値を取得する
 * @param accumulator
 * @return
 */
 @Override
 public Tuple2<Long, Integer> getResult(Tuple2<Long, Integer> accumulator) {
 return Tuple2.of(accumulator.f0,accumulator.f1);
 }
 /**
 * 中間結果の値をマージする
 * @param a 中間結果値a
 * @param b 中間結果値b
 * @return
 */
 @Override
 public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
 return Tuple2.of(a.f0,a.f1 + b.f1);
 }
 }
}

FoldFunction

FoldFunction は、ウィンドウ内の入力要素を外部要素とマージするロジックを定義します。このインターフェイスは廃止されたものとしてマークされており、ユーザーは FoldFunction の代わりに AggregateFunction を使用することをお勧めします。

public class FoldFunctionExample {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 // シミュレーションデータソース
 SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
 Tuple3.of(1L, 91228L),
 Tuple3.of(1L, 91229L),
 Tuple3.of(1L, 91238L),
 Tuple3.of(1L, 91248L),
 Tuple3.of(2L, 91258L),
 Tuple3.of(2L, 91268L),
 Tuple3.of(2L, 91278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
 @Override
 public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
 return element.f2 * 1000;
 }
 });
 input.keyBy(0)
 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
 .fold(" ",new FoldFunction<Tuple3<Long, Integer, Long>,String>() {
 @Override
 public String fold(String accumulator, Tuple3<Long, Integer, Long> value) throws Exception {
 // の最初の要素の値をスプライシングする。" "文字列、出力
 return accumulator + value.f0 ;
 }
 }).print();
 env.execute("FoldFunctionExample");
 }
}

ProcessWindowFunction

先に述べたReduceFunctionとAggregateFunctionは、どちらも中間状態に基づくインクリメンタル計算を実装したウィンドウ関数です。中央値や複数値を求めるなど、計算のためにウィンドウ全体のすべてのデータを使用する必要がある場合があります。また、ProcessWindowFunction の Context オブジェクトは、ウィンドウの終了位置や水位線など、ウィンドウのメタデータ情報の一部にアクセスできます。

システム内部では、ProcessWindowFunctionによって処理されたウィンドウは、すべての割り当てられたデータをListStateに格納し、データを収集し、ウィンドウのメタデータやその他の機能へのアクセスを提供することで、ReduceFunctionやAggregateFunctionよりも幅広いシナリオで使用することができます。AggregateFunctionとReduceFunction。ProcessWindowFunction抽象クラスのソースコードを以下に示します:

/**
 * @param <IN> 入力データタイプ
 * @param <OUT> 出力データ型
 * @param <KEY> keyウィンドウ関数のデータタイプ
 * @param <W> window 
 */
@PublicEvolving
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
	private static final long serialVersionUID = 1L;
	/**
	 * ウィンドウデータを計算し、0個以上の要素を出力する。
	 * @param key  key
	 * @param context ウィンドウのコンテキスト
	 * @param elements ウィンドウ内部のすべての要素
	 * @param out コレクターオブジェクトの出力要素
	 * @throws Exception
	 */
	public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
	/**
	 * ウィンドウが破棄されたときの状態を削除する
	 * @param context
	 * @throws Exception
	 */
	public void clear(Context context) throws Exception {}
	//contextウィンドウのメタデータ情報にアクセスできる.
	public abstract class Context implements java.io.Serializable {
	//現在計算中のウィンドウを返す
		public abstract W window();
 // 現在に戻るprocessing time. 
		public abstract long currentProcessingTime();
	// 現在に戻るevent-time  .
		public abstract long currentWatermark();
 //各キーと各ウィンドウのステートアクセッサ
		public abstract KeyedStateStore windowState();
	// 各キーのグローバル状態のためのステートアクセッサ.
		public abstract KeyedStateStore globalState();
		/**
		 * サイド出力にデータを出力する
		 * @param outputTag the {@code OutputTag} side output ロゴのアウトプット.
		 * @param value 出力データ.
		 */
		public abstract <X> void output(OutputTag<X> outputTag, X value);
	}
}

具体的な使用例を以下に示します:

public class ProcessWindowFunctionExample {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 // シミュレーションデータソース
 SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
 Tuple3.of(1L, 91228L),
 Tuple3.of(1L, 91229L),
 Tuple3.of(1L, 91238L),
 Tuple3.of(1L, 91248L),
 Tuple3.of(2L, 91258L),
 Tuple3.of(2L, 91268L),
 Tuple3.of(2L, 91278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
 @Override
 public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
 return element.f2 * 1000;
 }
 });
 input.keyBy(t -> t.f0)
 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
 .process(new MyProcessWindowFunction())
 .print();
 }
 private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<Long, Integer, Long>,Tuple3<Long,String,Integer>,Long,TimeWindow> {
 @Override
 public void process(
 Long aLong,
 Context context,
 Iterable<Tuple3<Long, Integer, Long>> elements,
 Collector<Tuple3<Long, String, Integer>> out) throws Exception {
 int count = 0;
 for (Tuple3<Long, Integer, Long> in: elements) {
 count++;
 }
 // ウィンドウごとのデータ数とウィンドウの出力数を数える
 out.collect(Tuple3.of(aLong,"" + context.window(),count));
 }
 }
}

ProcessWindowFunction

ProcessWindowFunctionは素晴らしい機能を提供しますが、唯一の欠点は、より大きなステート ストレージ データを必要とすることです。多くの場合、インクリメンタル アグリゲーションは非常に頻繁に使用されるため、インクリメンタル アグリゲーションとウィンドウ メタデータへのアクセスの両方をサポートする操作を実装するにはどうすればよいでしょうか。ReduceFunctionとAggregateFunctionをProcessWindowFunctionと組み合わせて使用することが可能です。この組み合わせでは、ウィンドウに割り当てられた要素が即座に計算され、ウィンドウがトリガーされると、集計の結果が ProcessWindowFunction に渡されるため、ProcessWindowFunction の process メソッドの Iterable パラメータにはインクリメンタルな集計の結果という 1 つの値しかありません。

  • ReduceFunctionとProcessWindowFunctionの組み合わせ
public class ReduceProcessWindowFunction {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 // シミュレーションデータソース
 SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
 Tuple3.of(1L, 91228L),
 Tuple3.of(1L, 91229L),
 Tuple3.of(1L, 91238L),
 Tuple3.of(1L, 91248L),
 Tuple3.of(2L, 91258L),
 Tuple3.of(2L, 91268L),
 Tuple3.of(2L, 91278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
 @Override
 public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
 return element.f2 * 1000;
 }
 });
 input.map(new MapFunction<Tuple3<Long, Integer, Long>, Tuple2<Long, Integer>>() {
 @Override
 public Tuple2<Long, Integer> map(Tuple3<Long, Integer, Long> value) {
 return Tuple2.of(value.f0, value.f1);
 }
 })
 .keyBy(t -> t.f0)
 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
 .reduce(new MyReduceFunction(),new MyProcessWindowFunction())
 .print();
 env.execute("ProcessWindowFunctionExample");
 }
 private static class MyReduceFunction implements ReduceFunction<Tuple2<Long, Integer>> {
 @Override
 public Tuple2<Long, Integer> reduce(Tuple2<Long, Integer> value1, Tuple2<Long, Integer> value2) throws Exception {
 //インクリメンタルサムメーション
 return Tuple2.of(value1.f0,value1.f1 + value2.f1);
 }
 }
 private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<Long,Integer>,Tuple3<Long,Integer,String>,Long,TimeWindow> {
 @Override
 public void process(Long aLong, Context ctx, Iterable<Tuple2<Long, Integer>> elements, Collector<Tuple3<Long, Integer, String>> out) throws Exception {
 // ウィンドウの終了時刻と共に合計した結果を出力する。
 out.collect(Tuple3.of(aLong,elements.iterator().next().f1,"window_end" + ctx.window().getEnd()));
 }
 }
}
  • AggregateFunctionとProcessWindowFunctionの組み合わせ
public class AggregateProcessWindowFunction {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 // シミュレーションデータソース
 SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
 Tuple3.of(1L, 91228L),
 Tuple3.of(1L, 91229L),
 Tuple3.of(1L, 91238L),
 Tuple3.of(1L, 91248L),
 Tuple3.of(2L, 91258L),
 Tuple3.of(2L, 91268L),
 Tuple3.of(2L, 91278L))
 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
 @Override
 public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
 return element.f2 * 1000;
 }
 });
 input.keyBy(t -> t.f0)
 .window(TumblingEventTimeWindows.of(Time.seconds(10)))
 .aggregate(new MyAggregateFunction(),new MyProcessWindowFunction())
 .print();
 env.execute("AggregateFunctionExample");
 }
 private static class MyAggregateFunction implements AggregateFunction<Tuple3<Long, Integer, Long>, Tuple2<Long, Integer>, Tuple2<Long, Integer>> {
 /**
 * アキュムレータを作成し、値を初期化する
 *
 * @return
 */
 @Override
 public Tuple2<Long, Integer> createAccumulator() {
 return Tuple2.of(0L, 0);
 }
 /**
 * @param value 入力要素の値
 * @param accumulator 中間結果値
 * @return
 */
 @Override
 public Tuple2<Long, Integer> add(Tuple3<Long, Integer, Long> value, Tuple2<Long, Integer> accumulator) {
 return Tuple2.of(value.f0, value.f1 + accumulator.f1);
 }
 /**
 * 計算結果の値を取得する
 *
 * @param accumulator
 * @return
 */
 @Override
 public Tuple2<Long, Integer> getResult(Tuple2<Long, Integer> accumulator) {
 return Tuple2.of(accumulator.f0, accumulator.f1);
 }
 /**
 * 中間結果の値をマージする
 *
 * @param a 中間結果値a
 * @param b 中間結果値b
 * @return
 */
 @Override
 public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
 return Tuple2.of(a.f0, a.f1 + b.f1);
 }
 }
 private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<Long,Integer>,Tuple3<Long,Integer,String>,Long,TimeWindow> {
 @Override
 public void process(Long aLong, Context ctx, Iterable<Tuple2<Long, Integer>> elements, Collector<Tuple3<Long, Integer, String>> out) throws Exception {
 // ウィンドウの終了時刻と共に合計した結果を出力する。
 out.collect(Tuple3.of(aLong,elements.iterator().next().f1,"window_end" + ctx.window().getEnd()));
 }
 }
}

ウィンドウ・ライフサイクルの解釈

ライフサイクルイラスト

ウィンドウは、作成されてからウィンドウの計算を実行し、クリアされるまでの一連のプロセスを経ており、このプロセスがウィンドウのライフサイクルです。

まず、要素がウィンドウ演算子に入ると、WindowAssignerによって要素が入るウィンドウが割り当てられ、ウィンドウが存在しない場合はウィンドウが作成されます。

次に、データがウィンドウに入ると、インクリメンタル集計関数が使用されるかどうかに依存します。インクリメンタル集計関数 ReduceFunction または AggregateFunction が使用される場合、ウィンドウに追加された新しい要素は直ちにインクリメンタル計算をトリガし、計算結果がウィンドウのコンテンツとして使用されます。インクリメンタル集計関数が使用されない場合、ウィンドウに入るデータは ListState 状態に格納され、さらにウィンドウがトリガーされるのを待って、集計計算のためにウィンドウ要素をトラバースします。

各要素は、ウィンドウに入ると、そのウィンドウのトリガーに渡されます。トリガーは、ウィンドウがいつ計算を実行し、いつそれ自身と保存された内容をクリアする必要があるかを決定します。トリガは、割り当てられた要素や登録されたタイマに基づいて、ウィンドウの計算を実行したり、ウィンドウの内容をクリアする特定の瞬間を決定することができます。

ReduceFunctionやAggregateFunctionのようなインクリメンタルな集約関数が使用されている場合、集約の結果は直接出力されます。ProcessWindowFunction のようなフル ウィンドウ関数のみが含まれる場合、ウィンドウのすべての要素に作用して計算を実行し、結果を出力します。ReduceFunction と ProcessWindowFunction の組み合わせ、つまり増分集約ウィンドウ関数と完全ウィンドウ関数の組み合わせが使用される場合、完全ウィンドウ関数は増分集約関数の集約値に作用し、最終結果を出力します。

  • ケース1:増分集計ウィンドウ機能のみを使用する場合

  • ケース2:フルウィンドウ機能のみを使用する場合

  • シナリオ3:増分集合ウィンドウ関数とフルウィンドウ関数の併用

ディスペンサー(液体石鹸などの消耗品用)

WindowAssignerの役割は、1つまたは複数のウィンドウに入力要素を割り当てることです、WindowAssignerがウィンドウに割り当てられた最初の要素になるとき、ウィンドウが作成されますので、ウィンドウが作成され、ウィンドウは、少なくとも1つの要素を持っている必要があります。時間ベースのWindowAssigners、これらのassignersはWindowAssigner抽象クラスから継承されます。一般的に使用されるアサインについては上記で詳しく説明しました。継承図を見てみましょう:

次に、WindowAssigner抽象クラスのソースコードを次のように分析します:

/**
 * WindowAssigner要素を0個以上のウィンドウに割り当てる
 * ウィンドウ演算子内部では 要素はキーに従ってグループ化される(ウィンドウ演算子内のKeyedStream),
 * 同じキーとウィンドウを持つ要素の集まりをペインと呼ぶ。
 * @param <T> 割り当てられる要素のデータ型
 * @param <W> window :TimeWindow、GlobalWindow
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
	private static final long serialVersionUID = 1L;
	/**
	 * 要素が割り当てられたウィンドウのコレクションを返す
	 * @param element 割り当てられる要素
	 * @param timestamp 要素のタイムスタンプ
	 * @param context WindowAssignerContext 
	 * @return
	 */
	public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
	/**
	 * このWindowAssignerに関連付けられたデフォルトトリガーを返す
	 * @param env 実行環境
	 * @return
	 */
	public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
	/**
	 * ウィンドウシリアライザーを返す
	 * @param executionConfig
	 * @return
	 */
	public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
	/**
	 * ある要素がイベント時間に基づいてウィンドウに割り当てられた場合、それは以下を返す。true
	 * @return
	 */
	public abstract boolean isEventTime();
	/**
	 * コンテキストは現在の処理時間にアクセスできる。processing time
	 */
	public abstract static class WindowAssignerContext {
		/**
		 * 現在の処理時間を返す
		 */
		public abstract long getCurrentProcessingTime();
	}
}

はじめに

ウィンドウにデータを入力すると、ウィンドウがトリガー条件を満たすかどうかによって、WindowFuncitonの計算をトリガーするかどうかが決まります。トリガーは、ウィンドウが計算をトリガーするタイミングを決定し、条件の結果を出力することです。先に説明した組み込みのWindowAssignersは、すべて独自のデフォルト トリガを持っています。 Processing Timeが使用されている場合、処理時間がウィンドウの終端を超えるとトリガがトリガされます。Event Timeが使用される場合、ウォーターラインがウィンドウの終了時間を超えるとトリガされます。

Flinkは多くの組み込みトリガを提供しており、一般的なものはEventTimeTrigger、ProcessTimeTrigger、CountTriggerです。例えば、WindowsのEvent TimeタイプはEventTimeTriggerに対応し、基本的な原理は、現在のWatermarkがウィンドウのEndTimeを超えるかどうかを判断することです。基本原則は、現在の透かしがウィンドウの EndTime を超えているかどうかを判断し、超えていればウィンドウ内のデータの計算がトリガーされ、超えていなければトリガーされません。上記で分析した組み込みのWindowAssignerのデフォルトのトリガーについては、それぞれのソースコードで確認できます:

TumblingEventTimeWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }.EventTimeTrigger
TumblingProcessingTimeWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); }.ProcessingTimeTrigger
SlidingEventTimeWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }.EventTimeTrigger
TumblingProcessingTimeWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); }.ProcessingTimeTrigger
SlidingEventTimeWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }.EventTimeTrigger
TumblingProcessingTimeWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); }.ProcessingTimeTrigger
SlidingEventTimeWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return new NeverTrigger(); }.NeverTrigger

これらのトリガーはすべてTrigger抽象クラスを継承しており、具体的な継承関係は以下の通りです:

これらの組み込みトリガーに関する具体的な説明は以下の通りです:

EventTimeTrigger現在のWatermarkがウィンドウのEndTimeを超えているかどうか。超えている場合はウィンドウ内のデータの計算がトリガーされ、超えていない場合は計算がトリガーされません;
ProcessTimeTrigger現在の処理時間がウィンドウのEndTimeを超えるかどうか。超える場合はウィンドウ内のデータの計算がトリガーされ、その逆の場合は計算がトリガーされません;
ContinuousEventTimeTrigger周期的なトリガーウィンドウのインターバル時間、またはウィンドウの終了時間が現在の EventTime よりも短い場合、トリガーウィンドウを計算します;
ContinuousProcessingTimeTrigger周期的なトリガーウィンドウのインターバル時間、またはウィンドウの終了時間が現在のProcessTime未満であることに応じてトリガーウィンドウを計算します;
CountTriggerウィンドウ内のデータエントリ数が設定されたしきい値を超えているかどうかに基づいて、ウィンドウ計算をトリガするかどうかを決定します;
DeltaTriggerウィンドウ内のデータから計算されたデルタ指標が指定されたしきい値を超えるかどうかに基づいて、ウィンドウ計算がトリガされるかどうかを決定します。
PurgingTriggerどのようなトリガーも、パラメータとしてパージタイプのトリガーに変換することができ、計算が完了した後、データはクリーンアップされます。

抽象クラスTriggerに関するソースコードの説明は以下の通りです:

/**
 * @param <T> 要素のデータ型
 * @param <W> Window 
 */
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {
	private static final long serialVersionUID = -L;
	/**
	 * このメソッドは、各要素がウィンドウに割り当てられるときに呼び出され、TriggerResultsの列挙を返す。
	 * 列挙は多くのトリガータイプを含んでいる、FIRE_AND_PURGEFIREPURGE
	 *
	 * @param element ウィンドウへの要素
	 * @param timestamp ウィンドウ要素に入るタイムスタンプ
	 * @param window  
	 * @param ctx タイマーコールバック関数を登録できるコンテキストオブジェクト
	 * @return
	 * @throws Exception
	 */
	public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
	/**
	 * TriggerContextを使ってprocessing-timeタイマーがトリガーされるとメソッドが呼び出される
	 *
	 * @param time トリガー・タイマー・タイムスタンプ
	 * @param window タイマートリガーwindow
	 * @param ctx タイマーコールバック関数を登録できるコンテキストオブジェクト
	 * @return
	 * @throws Exception
	 */
	public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
	/**
	 * TriggerContextを使ってevent-timeタイマーがトリガーされるとメソッドが呼び出される
	 *
	 * @param time トリガー・タイマー・タイムスタンプ
	 * @param window タイマートリガーwindow
	 * @param ctx タイマーコールバック関数を登録できるコンテキストオブジェクト
	 * @return
	 * @throws Exception
	 */
	public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
	/**
	 * トリガがトリガステートのマージをサポートする場合、トリガはtrue
	 *
	 * @return
	 */
	public boolean canMerge() {
		return false;
	}
	/**
	 * 複数のウィンドウが1つのウィンドウにマージされるとき、そのメソッドは
	 *
	 * @param window マージ後window
	 * @param ctx タイマーコールバック関数を登録し、ステートにもアクセスできるコンテキストオブジェクト
	 * @throws Exception
	 */
	public void onMerge(W window, OnMergeContext ctx) throws Exception {
		throw new UnsupportedOperationException("This trigger does not support merging.");
	}
	/**
	 * トリガーによって保持されているウィンドウの状態を全てクリアする
	 * ウィンドウが破棄されたら、メソッド
	 *
	 * @param window
	 * @param ctx
	 * @throws Exception
	 */
	public abstract void clear(W window, TriggerContext ctx) throws Exception;
	/**
	 * Contextタイマーのコールバック関数を登録し、ステートを処理するためのメソッドパラメータでTriggerに渡される
	 */
	public interface TriggerContext {
		// 現在の処理時間を返す
		long getCurrentProcessingTime();
		MetricGroup getMetricGroup();
		// 現在のウォーターラインのタイムスタンプを返す
		long getCurrentWatermark();
		// ウィンドウの登録processing-timeウィンドウ関数のタイマー
		void registerProcessingTimeTimer(long time);
		// EventTimeタイマーを登録する
		void registerEventTimeTimer(long time);
		// 削除するprocessing-timeウィンドウ関数のタイマー
		void deleteProcessingTimeTimer(long time);
		// EventTimeタイマーを削除する
		void deleteEventTimeTimer(long time);
		/**
		 * ウィンドウの状態と現在のトリガーのキーを取り出す
		 */
		<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
		// getPartitionedState関数と同様、このメソッドは廃止された。
		@Deprecated
		<S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
		// getPartitionedState関数と同様、このメソッドは廃止された。
		@Deprecated
		<S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
	}
	// TriggerContext 
	public interface OnMergeContext extends TriggerContext {
		// 各ウィンドウの状態をマージする、状態はマージをサポートしなければならない。
		<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
	}
}

上のソースコードからわかるように、トリガーが呼び出されるたびに、TriggerResultオブジェクトが生成されます。CONTINUE、FIRE_AND_PURGE、FIRE、PURGEの4種類の動作がありますが、各タイプの具体的な意味については、まずTriggerResultのソースコードを見てください:

/**
 * トリガーメソッドの結果のタイプは、ウィンドウに対して実行されるアクションを決定する。window function
 * ウィンドウを破棄する必要があるかどうか
 * 注意:トリガーがFIREまたはFIRE_AND_PURGEウィンドウ内に要素がない場合、ウィンドウ関数は呼び出されない。
 */
public enum TriggerResult {
	// 何もしない、現在は計算をトリガーしない。
	CONTINUE(false, false),
	// ウィンドウ関数を実行し、結果を出力し、その後全ての状態をクリアする。
	FIRE_AND_PURGE(true, true),
	// ウィンドウ関数を実行し、結果を出力する。ウィンドウはクリアされず、データは保持され続ける。
	FIRE(true, false),
 
	// 計算をトリガーせずに内部ウィンドウデータをクリアする
	PURGE(false, true);
	
}

分類

Evictorsはオプションのコンポーネントで、主な役割はWindowFuctionに入る前と入った後のデータをクリアすることです。Flinkには3つの組み込みEvictorsがあります:それぞれCountEvictor、DeltaEvictor、TimeEvitorです。デフォルト値はありません。

  • CountEvictor:ウィンドウ内の要素数を一定に保ち、指定されたウィンドウ要素数を超えるデータは、ウィンドウがカウントされる前に除外されます;
  • DeltaEvictor: DeltaFunctionを定義し、閾値を指定することで、Windowsの要素と最新の要素の間のデルタの大きさを計算し、閾値を超える場合は現在のデータ要素を除外します;
  • TimeEvictor:時間間隔を指定することで、現在のウィンドウで最新の要素の時間をIntervalから差し引き、その結果より小さいデータをすべて削除します。

Evictors の継承関係図を以下に示します:

Evictorsインターフェイスのソースコードは以下の通りです:

/**
 * WindowFunctionの計算後にウィンドウ要素をクリアする。
 * @param <T> 要素のデータ型
 * @param <W> ウィンドウの種類
 */
@PublicEvolving
public interface Evictor<T, W extends Window> extends Serializable {
	/**
	 * ウィンドウ関数呼び出しにおける要素の選択的カリング
	 * @param elements ウィンドウの要素
	 * @param size ウィンドウの要素数
	 * @param window  
	 * @param evictorContext
	 */
	void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
	/**
	 * ウィンドウ関数の後に呼び出される要素の選択的カリング
	 * @param elements ウィンドウの要素.
	 * @param size ウィンドウの要素数.
	 * @param window  
	 * @param evictorContext
	 */
	void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
	// Evictorメソッドのパラメータに渡される値
	interface EvictorContext {
		// 現在に戻るprocessing time
		long getCurrentProcessingTime();
		MetricGroup getMetricGroup();
		// ウォーターラインの現在のタイムスタンプを返す
		long getCurrentWatermark();
	}
}

まとめ

この記事では、まずウィンドウの基本的な概念、分類、簡単な使い方を紹介します。次に、Flinkの組み込みウィンドウアサイナを一つずつ説明し、図解とコードスニペットを示します。そして、ウィンドウ関数の分類と詳細な使用例を含むFlinkのウィンドウ関数を紹介します。最後に、ウィンドウのライフサイクルに関わるコンポーネントを分析し、各コンポーネントのソースコードを分析します。

公共ビッグデータ技術とデジタルウェアハウス、ビッグデータ情報パッケージの受信情報を返信します。

Read next

025 Javaツールに関する質問:バージョン管理ツール

はじめに\n\nGitとSVNの違いは?\nGitは分散型ですが、SVNは分散型ではありません。\nGitはコンテンツをメタデータとして保存しますが、SVNはコンテンツをファイルとして保存します。\nGit のコンテンツの完全性は SVN より優れています。\nSVNは、指定された一つの中央リポジトリしか持つことができません。一方 Git は

Dec 29, 2020 · 3 min read