blog

パート XIV|次元テーブルJOINのFlink SQL

ディメンションテーブルは、ナンバーウェアハウスの概念の1つです。ディメンションテーブルのディメンション属性は、データを観察する視点です。オフラインのナンバーウェアハウスを構築する場合、ディメンションテ...

Jun 12, 2020 · 10 min. read
シェア

ディメンションテーブルは、ナンバーウェアハウスにおける概念で、ディメンションテーブルのディメンション属性は、データを見る視点です。オフラインのナンバーウェアハウスを構築する場合、ディメンションテーブルは通常、スターモデルを構築するためにファクトテーブルと関連付けられます。リアルタイム・ナンバー・ウェアハウスにおいても、ディメンション・テーブルとファクト・テーブルの概念は同じで、ファクト・テーブルは通常kafkaに格納され、ディメンション・テーブルは通常外部デバイスに格納されます。各ストリーミング・データに対して、外部ディメンジョン・テーブル・データ・ソースを関連付け、リアルタイム計算のためのデータ相関クエリを提供できます。ディメンジョン・テーブルは常に変化している可能性があり、ディメンジョン・テーブルを JOIN するときは、このレコードがディメンジョン・テーブルのスナップショットに関連付けられた瞬間を指定する必要があります。Flink SQL JOIN は、ディメンジョン・テーブルの現在のスナップショットの関連付けのみをサポートし、ファクト・テーブルの行時間に対応するディメンジョン・テーブルのスナップショットはサポートしないことに注意してください。これについては、この記事を参照してください:

  • Flink SQLを使ってテーブルを作成する方法
  • Kafkaデータソーステーブルの定義方法
  • MySQLデータソース・テーブルの定義方法
  • Temporal Table Joinとは
  • 次元テーブルJOINのケース

Flink SQL

:この記事のすべての操作は、Flink SQL cli

テーブルの作成構文

CREATE TABLE [catalog_name.][db_name.]table_name
 (
 { <column_definition> | <computed_column_definition> }[ , ...n]
 [ <watermark_definition> ]
 )
 [COMMENT table_comment]
 [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
 WITH (key1=val1, key2=val2, ...)
-- テーブルフィールドの定義
<column_definition>:
 column_name column_type [COMMENT column_comment]
-- 計算カラムの定義
<computed_column_definition>:
 column_name AS computed_column_expression [COMMENT column_comment]
-- 水位線を定義する
<watermark_definition>:
 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

解説

COMPUTED COLUMN(カラムの計算)

column_name AS computed_column_expression計算列は、データ・ソース・テーブルに物理的に格納されていない、生成された仮想列です。計算列は、元のデータソース・テーブルのフィールド、演算子、および組み込み関数から生成できます。例えば、使用した金額の計算列を定義するには、テーブルの価格 * 数量の計算を使用できます。

計算列は、Define Time Attributes関数でproc AS PROCTIME()という構文で処理時間属性を定義するためによく使用されます。さらに、元のイベント時刻がTIMESTAMP(3)型でなかったり、JSON文字列内に存在する可能性があるため、計算列はイベント時刻抽出列として使用することができます。

叫びのキュー

1.データソースを読み込んだ後に計算されるソーステーブル上の計算列を定義します。計算列は SELECT クエリ文に従う必要があります;

2.計算列は、INSERT文では挿入できません。INSERT文は、計算列ではなく、ターゲット・テーブルの実際のスキーマのみを含むことができます。

水位線

水位線は、以下のシンタックスでテーブルのイベント時間属性を定義します。

WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

どのrowtime_column_nameは、テーブルがすでにイベント時刻フィールドに存在することを示す、それはイベント時刻フィールドがTIMESTAMP(3)型でなければならないことに注意する価値がある、つまり、yyyy - MM - ddのHH:mm:ssの形など、データ型のこの形式でない場合は、計算列の定義によって変換する必要があります。

watermark_strategy_expressionこの式の戻りデータ型はTIMESTAMP(3)型でなければなりません。

Flinkは、ウォーターライン生成のために一般的に使用される多くの戦略を提供します:

  • 厳密に単調増加する透かし:構文は次のとおりです。

    WATERMARK FOR rowtime_column AS rowtime_column
    

つまり、タイムスタンプを水位線として直接使用します。

  • 増分水位線:シンタックスは次のとおりです。

    WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
    
  • 乱れた水位線:シンタックス

    WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit
    -- 例えば、5秒間の無秩序を許容する
    WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
    

パーティショニング

特定のフィールドに基づいてパーティショニングされたテーブルを作成し、各パーティションはファイルパスに対応します。

WITH

叫びのキュー

注:テーブル作成時に指定するテーブル名には3つの形式があります:

カタログ名.データベース名.テーブル名

データベース名.テーブル名

テーブル名

最初の形式では、テーブルは'catalog_name'という名前のカタログと'db_name'dという名前のデータベースのメタデータに登録されます;

2番目の形式では、テーブルは現在の実行環境のカタログと'db_name'という名前のデータベースのメタデータに登録されます;

3番目の形式:現在の実行環境のカタログとデータベースのメタデータにテーブルが登録されます。

Kafkaデータテーブルの定義

kafkaは、リアルタイムのナンバーウェアハウスを構築するために一般的に使用されるデータストレージデバイスです。 Flink SQLを使用してkafkaデータソーステーブルを作成するための構文は次のとおりです:

CREATE TABLE MyKafkaTable (
 ...
) WITH (
 'connector.type' = 'kafka', -- Joinタイプ 
 'connector.version' = '0.11',--  : 利用可能なKafkaのバージョンは以下の通り: 0.8/0.9/0.10/0.11/universal
 'connector.topic' = 'topic_name', --  : トピック名
 'connector.properties.zookeeper.connect' = 'localhost:2181', --  : zkJoinアドレス
 'connector.properties.bootstrap.servers' = 'localhost:9092', --  : KafkaJoinアドレス
 'connector.properties.group.id' = 'testGroup', -- : 消費者グループ
 --  : , earliest-offset/latest-offset/group-offsets/specific-offsets
 'connector.startup-mode' = 'earliest-offset', 
 --  : オフセットが特定のオフセットとして指定される場合、各パーティションの特定の場所が指定される。
 'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
 'connector.sink-partitioner' = '...', --  : sink ,fixed/round-robin/custom
 --  : パーティショナーをカスタマイズする場合、パーティショナーのクラス名を指定する。
 'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
 'format.type' = '...', --  : フォーマットを指定する , csv/json/avroをサポートする
 -- 更新モードを指定し、append/retract/upsertをサポートする。
 'update-mode' = 'append',
)

叫びのキュー

  • 特定のオフセット位置を指定:デフォルトは、現在の消費者グループによって提出されたオフセットで消費を開始します。
  • シンクパーティショニング:デフォルトでは、できるだけ多くのパーティションに書き込みますが、独自のパーティショニング戦略を定義することもできます。ラウンドロビン・パーティショナーを使用すると、不均一なパーティショニングを避けることができますが、Flinkインスタンスとkafkaブローカー間のネットワーク接続が多くなります。
  • 一貫性の保証:デフォルトのシンクのセマンティクスは一度だけ
  • Kafka 0.10+は タイムスタンプ付き:kafka 0.10以降、kafkaメッセージには、メッセージのメタデータとして、レコードがkafkaトピックに書き込まれた時刻を示すタイムスタンプが付き、このタイムスタンプはイベント時刻属性として使用できます。
  • **カフカ0.11+**バージョン:1.7以降のFlinkは、コネクタとしてカフカのユニバーサルバージョンの使用をサポートし、カフカ0.11のバージョン以降と互換性があります。

MySQLデータテーブルの定義

CREATE TABLE MySQLTable (
 ...
) WITH (
 'connector.type' = 'jdbc', --  : jdbc 
 'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', --  : JDBC url
 'connector.table' = 'jdbc_table_name', --  :  
 --  : JDBC driver,設定されていない場合は、url経由で自動的に抽出される。 
 'connector.driver' = 'com.mysql.jdbc.Driver', 
 'connector.username' = 'name', --  : データベースユーザー名
 'connector.password' = 'password',--  : データベースパスワード
 -- オプションで、入力によって分割されるフィールドの名前を指定できる。.
 'connector.read.partition.column' = 'column_name',
 -- オプション、パーティション数.
 'connector.read.partition.num' = '50', 
 -- オプション、最初のパーティションの最小値.
 'connector.read.partition.lower-bound' = '500',
 -- オプション、最後のパーティションの最大値
 'connector.read.partition.upper-bound' = '1000', 
 -- オプションで、一度に抽出するデータの行数を指定する。デフォルトは0であり、この設定は無視される。
 'connector.read.fetch-size' = '100', 
 -- オプションで、キャッシュされるデータの最大行数を調べ、それ以上の場合は設定を変更し、古いデータはクリアされる。
 'connector.lookup.cache.max-rows' = '5000', 
 -- オプションで、キャッシュが生き残るための最大時間を調べることができる。.max-rowsキャッシュ.ttlまた
 'connector.lookup.cache.ttl' = '10s', 
 -- オプション、データクエリの最大再試行回数
 'connector.lookup.max-retries' = '3', 
 -- オプションで、フラッシュ行の最大数にデータを書き込む、デフォルトの5000は、設定を変更するよりも、ブラシデータをトリガする 
 'connector.write.flush.max-rows' = '5000', 
 --オプションで、データをフラッシュする間隔を指定できる。この間隔を超えると、非同期スレッドを通してデータがフラッシュされる。 
 'connector.write.flush.interval' = '2s', 
 -- オプション、データ書き込み失敗時の最大リトライ回数
 'connector.write.max-retries' = '3' 
)

Temporal Table Join

使用構文

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.key-name1

注釈現在のところ、INNER JOIN と LEFT JOIN のみがサポートされています。 接合の際、FOR SYSTEM_TIME AS OF を使用する必要があります。FOR SYSTEM_TIME AS OF table1.proctime左テーブルのレコードを右ディメンジョン・テーブルと結合するときに、現在の処理時間ディメンジョン・テーブルに対応するスナップショット・データのみが一致することを示すために使用します。

サンプル

SELECT
 o.amout, o.currency, r.rate, o.amount * r.rate
FROM
 Orders AS o
 JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
 ON r.currency = o.currency

使用上の注意

  • Blinkプランナーのみがサポートされています。
  • SQLのみ、Table APIは現在サポートされていません。
  • イベント時刻に基づくテンポラル・テーブル・ジョインは現在サポートされていません。
  • ディメンジョン・テーブルは変更され続ける可能性があり、JOIN 動作の後でディメンジョン・テーブルのデータが変更されても、関連するディメンジョン・テーブルのデータは同期的に変更されません。
  • ディメンジョン・テーブルとディメンジョン・テーブルは JOIN できません。
  • ディメンジョン・テーブルは、主キーを指定する必要があります。ディメンジョン・テーブルを JOIN する場合、ON 条件にすべての主キーに相当するものが含まれている必要があります。

テンポラルテーブルJOINのケース

背景

Kafkaにはpv、buy、cart、favなどのユーザー行動データがあり、MySQLには都道府県と地域のディメンショナルテーブルデータがあります。この2つのテーブルをJOINして、各地域の購入行動数をカウントします。

ステップ

ディメンジョン・テーブルは、ディメンジョン・テーブルのデータ・ソースを作成するために、以下のように MySQL に格納されます:

CREATE TABLE dim_province (
 province_id BIGINT, -- 州ID
 province_name VARCHAR, -- 県名
	region_name VARCHAR -- 地域名
) WITH (
 'connector.type' = 'jdbc',
 'connector.url' = 'jdbc:mysql://192.3:3306/mydw',
 'connector.table' = 'dim_province',
 'connector.driver' = 'com.mysql.jdbc.Driver',
 'connector.username' = 'root',
 'connector.password' = '123qwe',
 'connector.lookup.cache.max-rows' = '5000',
 'connector.lookup.cache.ttl' = '10min'
);

ファクトテーブルはkafkaに格納され、データはJSON形式のユーザーのクリック行動です:

{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
{"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}

カフカデータソーステーブルを以下のように作成します:

CREATE TABLE user_behavior (
 user_id BIGINT, -- ユーザーID
 item_id BIGINT, -- 製品ID
 cat_id BIGINT, -- カテゴリーID
 action STRING, -- ユーザーの動作
	province INT, -- ユーザーがいる州
	ts BIGINT, -- ユーザーの行動発生のタイムスタンプ
 proctime as PROCTIME(), -- 計算カラムから処理時間カラムを生成する
	eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- イベント時間
 WATERMARK FOR eventTime as eventTime - INTERVAL '5' SECOND -- eventTimeに透かしを定義する
) WITH (
 'connector.type' = 'kafka', -- Kafkaコネクタを使う
 'connector.version' = 'universal', -- kafka バージョン、ユニバーサルサポート 0.11 上記のバージョン
 'connector.topic' = 'user_behavior', -- kafka 
 'connector.startup-mode' = 'earliest-offset', -- オフセット、開始オフセットから読み込む
	'connector.properties.group.id' = 'group1', -- 消費者グループ
 'connector.properties.zookeeper.connect' = 'kms-2:2181,kms-3:2181,kms-4:2181', -- zookeeper  
 'connector.properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', -- kafka broker  
 'format.type' = 'json' -- データソースのフォーマットはjsonである。
);

地域の売上を表すMySQL結果テーブルの作成

CREATE TABLE region_sales_sink (
 region_name STRING, -- 地域名
 buyt BIGINT --  
) WITH (
 
 'connector.type' = 'jdbc',
 'connector.url' = 'jdbc:mysql://192.3:3306/mydw',
 'connector.table' = 'top_region', -- MySQLに挿入されるデータのテーブル
 'connector.driver' = 'com.mysql.jdbc.Driver',
 'connector.username' = 'root',
 'connector.password' = '123qwe',
 'connector.write.flush.interval' = '1s'
);

ユーザー行動データと県ディメンジョン・テーブル・データの結合

CREATE VIEW user_behavior_detail AS
SELECT
 u.user_id, 
 u.item_id,
 u.cat_id,
 u.action, 
 p.province_name,
 p.region_name
FROM user_behavior AS u LEFT JOIN dim_province FOR SYSTEM_TIME AS OF u.proctime AS p
ON u.province = p.province_id;

地域の売上を計算し、計算結果をMySQLに書き込みます。

INSERT INTO region_sales_sink
SELECT 
 region_name,
 COUNT(*) buyt
FROM user_behavior_detail
WHERE action = 'buy'
GROUP BY region_name;

結果表示:

Flink SQL> select * from region_sales_sink; -- Flink SQL cliで見る

mysql> select * from top_region; -- MySQLデータを見る

まとめ

この記事では、Temporal Table Joinを使ったFlinkSQLの次元テーブル結合に焦点を当てます。まず、Flink SQLのテーブル作成の基本構文を紹介し、その詳細について説明します。次に、MySQLと同様にKafka用のデータソーステーブルを作成する方法を説明します。次に、Temporal Table Joinの基本概念と使用構文を紹介します。最後に、次元テーブルJOINの完全な例を示します。

公共ビッグデータ技術とデジタルウェアハウス、ビッグデータ情報パッケージを取得するための情報に返信します。

Read next

リートコード33 回転ソートされた配列の検索 メモ

もし num[mid] > num[r] ならば、それは正常でないことを意味します。正常なソート配列は左が小さく右が大きいはずですから、ピボットポイントが mid の右にあることを示しています。l = mid + 1 のように、l を右にシフトして絞り込みます;

Jun 12, 2020 · 2 min read