blog

パートVIII|Flink 1.10 Hive統合クイックスタート

Hiveはビッグデータ分野で最も早く登場したSQLエンジンで、豊富な機能と幅広いユーザベースを備えています。Spark SQLやImpalaなどの後発のSQLエンジンは、ある程度Hiveと統合する機能...

Jun 30, 2020 · 5 min. read
シェア

Hiveはビッグデータ分野で最も早くから使われているSQLエンジンで、豊富な機能と幅広いユーザーベースを備えています。Spark SQLやImpalaなど、後に登場したSQLエンジンはいずれもHiveとの統合をある程度実現しており、ユーザーは既存のデータウェアハウスやジョブマイグレーションなどを簡単に利用できます。

Flinkは1.9以来、Hiveの統合をサポートしていますが、バージョン1.9はベータ版であり、本番環境での使用は推奨されていません。Flinkのバージョン1.10の最新バージョンでは、Blinkの統合をマークHiveとの本番レベルの統合の完了を発表し、Hiveは、データウェアハウスシステムの絶対的なコアとして、オフラインデータのETL計算とデータ管理の大部分を負担し、将来的にはHiveのFlinkの完璧なサポートを楽しみにしています。

HiveCatalogはHiveメタストアのインスタンスに接続され、メタデータの永続性を提供します。Flinkを使用してHiveと対話するには、ユーザーはHiveCatalogを構成し、HiveCatalogを介してHiveのメタデータにアクセスする必要があります。

依存関係の追加

Hive と統合するには、Table API プログラムまたは SQL Client で SQL と統合するために、追加の依存 jar パッケージを Flink の lib ディレクトリに追加する必要があります。あるいは、これらの依存関係をフォルダに配置し、それぞれTable APIプログラムまたはSQL Clientの-Cまたは-lオプションを使用してクラスパスに追加することもできます。この記事では、最初の方法、つまり jar パッケージを $FLINK_HOME/lib ディレクトリに直接コピーする方法を使用します。この記事では、Hiveのバージョン2.3.4(異なるバージョンのHiveについては、公式Webサイトを参照して、異なるjarパッケージの依存関係を選択できます)を使用しており、以下のように合計3つのjarパッケージが必要です:

  • flink-connector-hive_2.11-1.10.0.jar
  • flink-shaded-hadoop-2-uber-2.7.5-8.0.jar
  • hive-exec-2.3.4.jar

剣を抜いたり、困惑して辺りを見回したりしないでください。

ビルダー

Mavenの依存関係の追加

<!-- Flink Dependency -->
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-hive_2.11</artifactId>
 <version>1.10.0</version>
 <scope>provided</scope>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-api-java-bridge_2.11</artifactId>
 <version>1.10.0</version>
 <scope>provided</scope>
</dependency>
<!-- Hive Dependency -->
<dependency>
 <groupId>org.apache.hive</groupId>
 <artifactId>hive-exec</artifactId>
 <version>${hive.version}</version>
 <scope>provided</scope>
</dependency> 

コード例

package com.flink.sql.hiveintegration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
/**
 *  @Created with IntelliJ IDEA.
 *  
 *  
 *  @Time: 13:22
 *  
 */
public class FlinkHiveIntegration {
 public static void main(String[] args) throws Exception {
 EnvironmentSettings settings = EnvironmentSettings
 .newInstance()
 .useBlinkPlanner() // BlinkPlannerを使う
 .inBatchMode() // Batchモード、デフォルトはStreamingMode
 .build();
 //StreamingModeを使う
 /* EnvironmentSettings settings = EnvironmentSettings
 .newInstance()
 .useBlinkPlanner() // BlinkPlannerを使う
 .inStreamingMode() // StreamingMode
 .build();*/
 TableEnvironment tableEnv = TableEnvironment.create(settings);
 String name = "myhive"; // CatalogName、一意な名前を定義する
 String defaultDatabase = "qfbap_ods"; // デフォルトのデータベース名
 String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf"; // hive-site.xml 
 String version = "2.3.4"; // Hive 
 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
 tableEnv.registerCatalog("myhive", hive);
 tableEnv.useCatalog("myhive");
 // データベースの作成、現在のところハイブテーブルの作成はサポートしていない
 String createDbSql = "CREATE DATABASE IF NOT EXISTS myhive.test123";
 tableEnv.sqlUpdate(createDbSql); 
 }
}

Flink SQL ClientHiveの統合

FlinkのテーブルAPIとSQL APIはSQLで書かれたクエリを扱うことができますが、これらのクエリはJavaまたはScalaで書かれたプログラムに組み込む必要があります。さらに、これらのプログラムはクラスタにコミットする際にビルドツールでパッケージ化する必要があります。このため、Java/ScalaプログラマによるFlinkの使用は多かれ少なかれ制限されます。

SQLクライアントは、JavaやScalaのコードを一行も記述することなく、テーブルプログラムの記述、デバッグ、およびFlinkクラスタへのコミットを簡単に行うことができるように設計されています。Flink SQLクライアントを使用してHiveにアクセスするには、sql-client-defaults.yamlファイルを設定する必要があります。

sql-client-defaults.yaml

現在、HiveTableSink はストリーミング書き込みをサポートしていません。実行モードをバッチ モードに変更する必要があります:

org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.

修正が必要な設定は以下の通りです:

#...省略された設定項目...
#==============================================================================
# Catalogs
#==============================================================================
# カタログを設定する。.
catalogs: # empty list
 - name: myhive
 type: hive
 hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf
 hive-version: 2.3.4
 default-database: qfbap_ods
#...省略された設定項目...
#==============================================================================
# Execution properties
#==============================================================================
# Properties that change the fundamental execution behavior of a table program.
execution:
 # select the implementation responsible for planning table programs
 # possible values are 'blink' (used by default) or 'old'
 planner: blink
 # 'batch' or 'streaming' execution
 type: batch

Flink SQL Cliの起動

bin/sql-client.sh embedded

起動後、このcliの下でSQLコマンドを実行することで、Hiveのテーブルにアクセスできます:

-- コマンドラインヘルプ
Flink SQL> help
-- カタログの現在のセッションを見る。,default_catalogデフォルトの
Flink SQL> show catalogs;
default_catalog
myhive
-- カタログを使う
Flink SQL> use catalog myhive;
-- カタログの現在のデータベースを見る
Flink SQL> show databases;
-- データベースを作る
Flink SQL> create database testdb;
-- データベースを削除する
Flink SQL> drop database testdb;
--  
Flink SQL> create table tbl(id int,name string);
--  
Flink SQL> drop table tbl;
--  
Flink SQL> select * from code_city;
-- データを挿入する
Flink SQL> insert overwrite code_city select id,city,province,event_time from code_city_delta ;
Flink SQL> INSERT into code_city values(1,'南京','江蘇','');

まとめ

この記事では、Flinkの最新バージョンを例に、FlinkとHiveのハンズオン統合について説明します。まず、コードによるHiveとの統合を紹介し、次にHiveにアクセスするためのFlink SQLクライアントの使い方を紹介し、そこで遭遇する落とし穴を説明し、最後にFlink SQL Cliの詳細な使い方を説明します。Flink SQLは今後のバージョンアップでますます完成度を高めていくと思いますので、今後のFlinkのHiveへの完璧な対応を楽しみにしています。

公共ビッグデータ技術とデジタルウェアハウス、ビッグデータ情報パッケージの受信情報を返信します。

Read next

CSSで、子要素が親要素のopacity属性を継承する問題を解決する方法。

解決策は、ここで2つのオプションは、rgba()の使用は、間接的に不透明度の値を設定し、この属性がダウンして継承されません、または不透明度が子要素によって継承されるので、その後、達成するために同じレベルの要素にopacity属性は、これらの2つの特定の次の例は言う

Jun 30, 2020 · 2 min read