- 背景
- 代表例
- ソースコードの解析
背景
1.11.0では、リレーショナルデータベースの読み書きや変更ログの読み込みをFlinkのソース/シンクに依存している場合、対応するスキーマを手動で作成する必要がありますが、問題があります。データベースのスキーマが変更されると、型の一致を保つために対応するFlinkタスクを手動で更新する必要があり、不一致があると実行時エラーでジョブが失敗します。不一致があると、ジョブは実行時エラーで失敗します。この操作は冗長で面倒です。
実際、Flinkに接続する外部システムはどれも上記のような問題を抱えている可能性があり、1.11.0ではリレーショナルデータベースとのインターフェースの問題を解決することに重点が置かれています。JDBCカタログの基本的なインターフェイスとPostgresカタログの実装が提供されており、将来的に他の種類のリレーショナルデータベースとのインターフェイスが容易になります。
バージョン1.11.0以降では、Flink SQLを使用する際、DDLを入力する代わりにテーブルスキーマを自動的に取得することができます。また、スキーマのミスマッチエラーは事前にチェックされ、コンパイル段階で報告されるため、実行時エラーによるジョブの失敗を回避することができます。
代表例
- pomの紹介
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version></version>
</dependency>
String catalogName = "mycatalog";
String defaultDatabase = "postgres";
String username = "postgres";
String pwd = "postgres";
String baseUrl = "jdbc:postgresql://localhost:5432/";
PostgresCatalog postgresCatalog = (PostgresCatalog) JdbcCatalogUtils.createCatalog(
catalogName,
defaultDatabase,
username,
pwd,
baseUrl);
postgresデータベースにアクセスしてテーブル名を指定する際のフルパス名は、以下の形式でなければなりません:
<catalog>.<db>.`<schema.table>`
schema がデフォルトで public である場合、デフォルト値を使用する場合は public を省略できます。例えば、以下のクエリ文:
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
デフォルト以外のスキーマの場合は省略できません:
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
- 一般的な操作
tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
tEnv.useCatalog(postgresCatalog.getName());
- すべてのデータベースをリストアップします:
System.out.println("list databases :");
String[] databases = tEnv.listDatabases();
Stream.of(databases).forEach(System.out::println);
- 全てのテーブルをリストします。
tEnv.useDatabase(defaultDatabase);
System.out.println("list tables :");
String[] tables = tEnv.listTables(); // postgresCatalogを使用することもできる。.listTables(defaultDatabase);
Stream.of(tables).forEach(System.out::println);
- 全ての関数のリスト
System.out.println("list functions :");
String[] functions = tEnv.listFunctions();
Stream.of(functions).forEach(System.out::println);
- テーブルのスキーマを取得します。
CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(
defaultDatabase,
"table1"));
TableSchema tableSchema = catalogBaseTable.getSchema();
System.out.println("tableSchema --------------------- :");
System.out.println(tableSchema);
- テーブルデータの照会
List<Row> results = Lists.newArrayList(tEnv.sqlQuery("select * from table1")
.execute()
.collect());
results.stream().forEach(System.out::println);
- データの挿入
tEnv.executeSql("insert into table1 values (3,'c')");
ソースコードの解析
AbstractJdbcCatalog
このクラスは、主にjdbcカタログいくつかのパブリック操作で抽象化を行うことです。現在1つだけのメソッドの実際の関数:getPrimaryKeyは、他の方法は、主にクラスの特別な処理のいくつかの他の実装のカタログのため、そのような同様のテーブルを作成したり、テーブルを変更するようにサポートされていません、jdbcカタログの使用は、主にいくつかのDML操作を行うことですので、listViewは、単に空のリストを返します。いくつかのDML操作を行うことです。
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
return Collections.emptyList();
}
PostgresCatalog
この中で、主にgetTable、listTables、listDatabasesなど、オペレーティングデータベースのいくつかの一般的に使用されるメソッドの実装では、実際には、簡単に言えば、それはpostgresのメタデータベースから対応する情報をクエリすることであり、その後、呼び出し元に返されるflink関連のオブジェクトに組み立てます。単純なメソッド listDatabases を例にとってみましょう:
メタデータテーブルpg_databaseから全てのテーブル名を照会、組み込みデータベース、つまりtemplate0とtemplate1を削除し、リストオブジェクトにカプセル化して返します。
@Override
public List<String> listDatabases() throws CatalogException {
List<String> pgDatabases = new ArrayList<>();
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String dbName = rs.getString(1);
if (!builtinDatabases.contains(dbName)) {
pgDatabases.add(rs.getString(1));
}
}
return pgDatabases;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", getName()), e);
}
}
private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
String pgType = metadata.getColumnTypeName(colIndex);
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
switch (pgType) {
case PG_BOOLEAN:
return DataTypes.BOOLEAN();
case PG_BOOLEAN_ARRAY:
return DataTypes.ARRAY(DataTypes.BOOLEAN());
case PG_BYTEA:
return DataTypes.BYTES();
.........................





