blog

CanalとFlinkをベースにしたデータのリアルタイムインクリメンタル同期の実装(2つ)

本記事では、主にBinlogリアルタイム収集とオフライン処理Binlogビジネスデータを復元するための2つの側面に焦点を当て、Hive数ウェアハウスに正確かつ効率的にDBデータを実現する方法を紹介しま...

Dec 8, 2020 · 8 min. read
シェア

本稿では、主にBinlogリアルタイム収集とオフライン処理Binlog復元ビジネスデータの2つの側面に焦点を当て、Hive数ウェアハウスに正確かつ効率的にDBデータを実現する方法を紹介します。

背景

データウェアハウスのモデリングでは、何も処理されていない生のビジネスレイヤーデータをODS(Operational Data Store)データと呼びます。インターネット企業では、ODSデータには業務ログデータと業務DBデータの2種類が一般的です。業務DBデータについては、MySQLなどのリレーショナルデータベースから業務データを収集し、Hiveに取り込むことがデータウェアハウス制作の重要なポイントになります。MySQLデータをHiveに正確かつ効率的に同期するにはどうすればよいでしょうか。MySQLに直接接続してテーブル内のデータを選択し、中間ストレージとしてローカルファイルに保存して、最後にそのファイルをHiveテーブルにロードします。このソリューションの利点は実装が簡単なことですが、ビジネスの発展に伴い、欠点が徐々に露呈しています:

  • パフォーマンスのボトルネック:ビジネス規模の拡大に伴い、MySQLから選択→ローカルファイルに保存→Hiveにロードというデータフローに時間がかかるようになり、下流の倉庫生産の時間要件を満たすことができなくなっています。
  • MySQLから直接大量のデータを選択すると、MySQLへの影響が非常に大きくなり、クエリが遅くなったり、ビジネスラインの通常のサービスに影響が出やすくなります。
  • Hive独自の構文はUpdateやDeleteなどのSQLプリミティブをサポートしていないため、Update/Deleteが発生するMySQLのデータではうまく動作しません。

これらの問題を完全に解決するために、我々は徐々にCDC +マージ技術的なソリューション、すなわち、リアルタイムBinlog収集+オフライン処理Binlog復元ビジネスデータのような一連のソリューションに目を向けてきました。 Binlogは、MySQLで発生したすべてのデータの変更を記録するMySQLのバイナリログであり、MySQLクラスタのマスターとスレーブの同期は、Binlogに基づいています。MySQL クラスタのマスターとスレーブの同期は Binlog に基づいています。

実装のアイデア

まず、KafkaからHDFSへのBinlogデータのプルにはFlinkが使用されます。

その後、各ODSテーブルについて、まずMySQLからHiveにストックデータを読み込むために1回限りのスナップショットを作成する必要があり、その下層プロセスではMySQLへの直接接続を使用してデータをSelectし、Sqoopを使用して一度に完全にインポートすることができます。

最後に、各ODSテーブルについて、ストックデータとその日の増分から生成されたBinlogに基づいて毎日マージが実行され、ビジネスデータが復元されます。

Binlogは、ストリーミング、Binlogのリアルタイム収集を通じて、データ処理のニーズの一部は、1日1回のバッチ処理からリアルタイムストリーミングに広がっています。MySQLのパフォーマンスとアクセス圧の両方から、大幅に改善されます。 Binlog自体がデータの変化の種類を記録し、いくつかの意味処理を通じて、正確なデータの削減を実現することができます。

実装シナリオ

FlinkKafkaのbinlogログの処理

kafkaソースを使用して、読み込まれたデータに対してJSON解析が実行され、解析されたフィールドは、次のコードに示すように、Hiveのスキーマ形式に準拠した文字列にスプライスされます:

package com.etl.kafka2hdfs;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Map;
import java.util.Properties;
/**
 *  @Created with IntelliJ IDEA.
 *  
 *  @Date: 
 *  @Time: 12:52
 *  
 */
public class HdfsSink {
 public static void main(String[] args) throws Exception {
 String fieldDelimiter = ",";
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 // checkpoint
 env.enableCheckpointing();
 //env.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint"));
 env.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint"));
 CheckpointConfig config = env.getCheckpointConfig();
 config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
 // source
 Properties props = new Properties();
 props.setProperty("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
 // only required for Kafka 0.8
 props.setProperty("zookeeper.connect", "kms-2:2181,kms-3:2181,kms-4:2181");
 props.setProperty("group.id", "test123");
 FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
 "qfbap_ods.code_city", new SimpleStringSchema(), props);
 consumer.setStartFromEarliest();
 DataStream<String> stream = env.addSource(consumer);
 // transform
 SingleOutputStreamOperator<String> cityDS = stream
 .filter(new FilterFunction<String>() {
 // DDLオペレーションをフィルタリングする
 @Override
 public boolean filter(String jsonVal) throws Exception {
 JSONObject record = JSON.parseObject(jsonVal, Feature.OrderedField);
 return record.getString("isDdl").equals("false");
 }
 })
 .map(new MapFunction<String, String>() {
 @Override
 public String map(String value) throws Exception {
 StringBuilder fieldsBuilder = new StringBuilder();
 // JSONデータを解析する
 JSONObject record = JSON.parseObject(value, Feature.OrderedField);
 // 最新のフィールド値を取得する
 JSONArray data = record.getJSONArray("data");
 // イテレーション、1つの要素のみを持つフィールド値のJSON配列
 for (int i = 0; i < data.size(); i++) {
 // JSON配列のi番目の要素にアクセスする
 JSONObject obj = data.getJSONObject(i);
 if (obj != null) {
 fieldsBuilder.append(record.getLong("id")); // シリアルID
 fieldsBuilder.append(fieldDelimiter); // フィールドセパレーター
 fieldsBuilder.append(record.getLong("es")); //ビジネス・タイムスタンプ
 fieldsBuilder.append(fieldDelimiter);
 fieldsBuilder.append(record.getLong("ts")); // ログのタイムスタンプ
 fieldsBuilder.append(fieldDelimiter);
 fieldsBuilder.append(record.getString("type")); // 操作タイプ
 for (Map.Entry<String, Object> entry : obj.entrySet()) {
 fieldsBuilder.append(fieldDelimiter);
 fieldsBuilder.append(entry.getValue()); // テーブルのフィールドデータ
 }
 }
 }
 return fieldsBuilder.toString();
 }
 });
 //cityDS.print();
 //stream.print();
 // sink
 // 以下の条件のいずれかが満たされると、新しいファイルがローリングベースで生成される。
 RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create()
 .withRolloverInterval(60L * 1000L) //新しいファイルへのローリング書き込み時間、デフォルトは60秒。
 .withMaxPartSize( * 128L) //各ファイルの最大サイズを設定する。デフォルトは128Mで、ここでは128Mに設定する。
 .withInactivityInterval(60L * 1000L) //デフォルトは60秒、データがタイムアウトの非アクティブな状態に書き込まれていない新しいファイルをスクロールする
 .build();
 
 StreamingFileSink<String> sink = StreamingFileSink
 //.forRowFormat(new Path("file:///E://binlog_db/city"), new SimpleStringEncoder<String>())
 .forRowFormat(new Path("hdfs://kms-1:8020/binlog_db/code_city_delta"), new SimpleStringEncoder<String>())
 .withBucketAssigner(new EventTimeBucketAssigner())
 .withRollingPolicy(rollingPolicy)
 .withBucketCheckInterval(1000) // バケットのチェック間隔、ここでは1Sとする
 .build();
 cityDS.addSink(sink);
 env.execute();
 }
}

HDFSへのFlink Sinkでは、StreamingFileSinkが従来のBucketingSinkに取って代わり、アップストリームデータをHDFSの異なるディレクトリに格納します。 DateTimeBucketAssignerそのコアロジックはバケッティングで、デフォルトのバケッティングは処理時間によるバケッティングを意味する , です。処理時間とは、メッセージがFlinkプログラムに到着する時間のことで、これは要件を満たしていません。したがって、メッセージボディからイベント時間を解析し、ルールに従ってバケット名を生成するコードを書く必要があります:

package com.etl.kafka2hdfs;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 *  @Created with IntelliJ IDEA.
 *  
 *  @Date: 
 *  @Time: 12:49
 *  
 */
public class EventTimeBucketAssigner implements BucketAssigner<String, String> {
 @Override
 public String getBucketId(String element, Context context) {
 String partitionValue;
 try {
 partitionValue = getPartitionValue(element);
 } catch (Exception e) {
 partitionValue = "";
 }
 return "dt=" + partitionValue;//パーティションディレクトリ名
 }
 @Override
 public SimpleVersionedSerializer<String> getSerializer() {
 return SimpleVersionedStringSerializer.INSTANCE;
 }
 private String getPartitionValue(String element) throws Exception {
 // 最後にスプライスされた文字列のesフィールドの値を取る。
 long eventTime = Long.parseLong(element.split(",")[1]);
 Date eventDate = new Date(eventTime);
 return new SimpleDateFormat("yyyyMMdd").format(eventDate);
 }
}

MySQLデータをオフラインでリストア

上記の手順の後、Binlogログレコードは、HDFSの対応するパーティションに書き込むことができ、その後、増分データとストックデータに応じて最新のデータを復元する必要があります。Hiveテーブルは、ファイルシステムの変更をサポートしていないHDFSに格納されているため、データの変更を書き込むためにいくつかの追加の作業が必要です。一般的に使用される方法には、JOIN、Hiveトランザクション、またはHBase、kuduへの切り替えなどがあります。

もし昨日の株式データcode_city、code_city_deltaの今日の増分データは、FULL OUTER JOINすることができます、株式と増分データは、単一の最新のデータテーブルにマージされ、明日の株式データとして:

INSERT OVERWRITE TABLE code_city
SELECT 
 COALESCE( t2.id, t1.id ) AS id,
 COALESCE ( t2.city, t1.city ) AS city,
 COALESCE ( t2.province, t1.province ) AS province,
 COALESCE ( t2.event_time, t1.event_time ) AS event_time 
FROM
 code_city t1
 FULL OUTER JOIN (
SELECT
 id,
 city,
 province,
 event_time 
FROM
 (-- 最後の状態データを取得する
SELECT
 id,
 city,
 province,
 dml_type,
 event_time,
 row_number ( ) over ( PARTITION BY id ORDER BY event_time DESC ) AS rank 
FROM
 code_city_delta 
WHERE
 dt = '' -- パーティショニングされたデータ
 ) temp 
WHERE
 rank = 1 
 ) t2 ON t1.id = t2.id;

概要

この記事は、主にBinlogのストリーミング収集とBinlogベースのODSのデータ削減の2つの側面から、リアルタイムETLを実現するためにFlinkを介して導入されたBinlogのログに加えて、kudu、HBaseおよびトランザクション操作をサポートする他のNoSQLに書き込むことができますので、データテーブルの削減ステップを保存することができます。この記事は、データのリアルタイムインクリメンタル同期を達成するために運河とFlinkに基づいている"、2番目の記事は、カフカの実装ステップに書き込まBinlogログを解析運河については、"データのリアルタイムインクリメンタル同期を達成するために運河とFlinkに基づいて "を参照してください。

参照

[1CanalとFlinkに基づくリアルタイムインクリメンタルデータ同期

Read next

JVM-ランタイムデータ領域-ローカルメソッドインタフェース

簡単に言えば、ネイティブメソッドとは、Javaの非Javaコードインターフェイスを呼び出すもので、ネイティブメソッドとはそのようなJavaメソッドのことです。エクスター...

Dec 7, 2020 · 3 min read