blog

パートVII|FlinkテーブルAPIとSQLプログラミングガイド time属性の(3)

時間プロパティの紹介\n処理時間\nイベント時間\n時間プロパティの紹介\nFlink TableAPI と SQL における時間ベースの操作には、時間セマンティクスの指定が必要です。\n時間属性は、...

Aug 27, 2020 · 6 min. read
シェア
  • 時間プロパティの紹介
  • 処理時間
  • イベント時間

時間プロパティの紹介

Flink TableAPI & SQL の時間ベースの操作では、時間セマンティクスを指定する必要があり、テーブルは指定されたタイムスタンプに基づいて論理的な時間属性を提供できます。

時間属性はテーブル・スキャマの一部であり、DDLを使用してテーブルを作成するとき、DataStreamをテーブルに変換するとき、またはTableSourceを使用するときに定義されます。いったん時間属性が定義されると、その時間属性はフィールドへの参照とみなすことができ、フィールドを時間ベースの操作で使用することができます。

時間属性はタイムスタンプのようなもので、アクセスして計算に参加することができます。時間属性が計算に参加する場合、その時間属性は通常のタイムスタンプに原子化されます。通常のタイムスタンプはFlinkの時間と水のラインと互換性がなく、時間ベースの操作では使用できません。

Flink TableAPI & SQL が必要とする時間属性は、Datastream プログラムで以下のように指定できます:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //  
// 以下を選択できる。:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

処理時間

最も単純な時間セマンティクスの1つですが、一貫性のある結果を保証しないローカル・マシン・タイムに基づいて、この時間セマンティクスを使用すると、タイムスタンプを抽出し、透かしを生成する必要はありません。処理時間属性を定義するには、以下の3つの方法があります。

DDL ステートメントがテーブルを作成するときの処理時間を定義します。

時刻を扱う属性は、DDL 文の計算カラムとして定義することができ、以下のように PROCTIME() 関数を使用する必要があります:

CREATE TABLE user_actions (
 user_name STRING,
 data STRING,
 user_action_time AS PROCTIME() -- 追加フィールドを処理時間属性として宣言する
) WITH (
 ...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10分単位のスクロール・ウィンドウ

DataStreamからTableへの変換時の処理時間の定義

DataStreamをテーブルに変換する場合、スキーマ定義で.proctime属性を使って時間属性を指定し、以下のように他のスキーマ・フィールドの末尾に配置することができます:

DataStream<Tuple2<String, String>> stream = ...;
// 追加論理フィールドを処理時間属性として宣言する
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

TableSource の使用

DefinedProctimeAttribute TableSourceをカスタマイズし、以下のようにインターフェイスを実装します:

// 処理時間属性を持つテーブル・ソースを定義する。
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
	@Override
	public TypeInformation<Row> getReturnType() {
		String[] names = new String[] {"user_name" , "data"};
		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
		return Types.ROW(names, types);
	}
	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
		// ストリームの作成
		DataStream<Row> stream = ...;
		return stream;
	}
	@Override
	public String getProctimeAttribute() {
 // このフィールドは3番目のフィールドとしてスキーマに追加される。
		return "user_action_time";
	}
}
// テーブルソースを登録する
tEnv.registerTableSource("user_actions", new UserActionSource());
WindowedTable windowedTable = tEnv
	.from("user_actions")
	.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

イベント時間

レコードの特定のタイムスタンプに基づき、データが文字化けしたり遅れたりしても、結果の一貫性が保証されます。処理時間属性を定義するには、以下の3つの方法があります。

イベント時刻を固定したテーブルを作成するDDL文

イベント時間属性は、WATERMARK文を使用して以下のように定義できます:

CREATE TABLE user_actions (
 user_name STRING,
 data STRING,
 user_action_time TIMESTAMP(3),
 -- ユーザーを宣言する_action_timeイベント時間属性として、5S遅延を許可する
 WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
 ...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

データストリームからテーブルへの変換時のイベント時間の定義

スキーマを定義する際、イベント時間属性は.rowtime属性で指定され、タイムスタンプは水位線とともにDataStreamで指定する必要があります。例えば、データセットでは、イベント時刻属性はevent_timeであり、その時点でテーブルのイベント時刻フィールドに'event_time. rowtime'で指定することができます。

現在、FlinkはEventTimeフィールドを定義するために以下の2つの方法をサポートしています:

//  :
// タイムスタンプの抽出と透かしの割り当て
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 追加の論理フィールドをイベント時間属性として宣言する
// テーブル・スキーマの最後に、ユーザー_action_time.rowtimeイベント時間属性を定義する
// システムはTableEnvironmentのイベント時間プロパティを取得する。
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");
//  :
// 最初のフィールドからタイムスタンプを抽出し、透かしを割り当てる。
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 最初のフィールドはタイムスタンプを抽出するために されているが、対応するフィールドをイベント時間プロパティとして直接使用することができる。
Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");
// 使用:
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

TableSource の使用

// 行時間属性を持つテーブル・ソースを定義する
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
	@Override
	public TypeInformation<Row> getReturnType() {
		String[] names = new String[] {"user_name", "data", "user_action_time"};
		TypeInformation[] types =
		 new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
		return Types.ROW(names, types);
	}
	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
 // ユーザーに基づいてストリームを作成する_action_time属性割り当ての透かし
		DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
		return stream;
	}
	@Override
	public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
 // タグ ユーザー_action_timeイベント時間属性としてのフィールド
 // ユーザーを作成する_action_time時間属性フィールドを特定する記述子
		RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
			"user_action_time",
			new ExistingField("user_action_time"),
			new AscendingTimestamps());
		List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
		return listRowtimeAttrDescr;
	}
}
// register 
tEnv.registerTableSource("user_actions", new UserActionSource());
WindowedTable windowedTable = tEnv
	.from("user_actions")
	.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

概要

この記事では、Flink Table API と SQL で時間セマンティクスを使用する方法について説明します。 使用できる時間セマンティクスには、処理時間とイベント時間の 2 種類があります。それぞれの時間セマンティクスの使い方を詳しく説明します。

公共ビッグデータ技術とデジタルウェアハウス、ビッグデータ情報パッケージの受信情報を返信します。

Read next

5つのスキルのPrestoパフォーマンスチューニング

Prestoは、分散型クエリエンジンであり、それ自体はデータを格納しませんが、様々なデータソースにアクセスすることができ、データソース間でカスケードクエリをサポートしています。 ディスカバリーサーバー: SQLステートメントを解析し、実行プランを生成し、実行のためにワーカーノードに実行タスクを配布します。 ディスカバリー・サーバー: ワーカー・ノードがディスカバリー・サーバーに...

Aug 27, 2020 · 3 min read