優先順位
- Jdbc: Jdbcメソッドでは、大量のデータを書き込むと非常に時間がかかります。
- Greenplum-Spark Connector:Spark並列処理をベースとし、Greenplumへの並列書き込み、並列読み込みインターフェースを提供。この記事の次の部分の焦点でもあります。
Greenplum-Spark Connector データアーキテクチャを読む
読み方は以下の通り:
- Spark Driverはjdbc経由でGreenplumマスターに接続し、指定されたテーブルのメタデータを読み込みます。そして、指定されたパーティションフィールドとパーティション数に基づいて、セグメントをどのように割り当てるかを決定します。
Greenplum-Spark Connector 書き込みデータフロー
CREATE READABLE EXTERNAL TABLE"public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42" (LIKE "public"."rank_a1")LOCATION ('gpfdist://.145:44772/spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42')FORMAT 'CSV'(DELIMITER AS '|' NULL AS '')ENCODING 'UTF-8'
3.GSCはSpark Executor側でjdbc経由でGreenplumマスターに接続し、データの元となる実テーブルへのinsert文を実行します:
INSERT INTO "public"."rank_a1"SELECT *FROM "public"."spark_9dc823a6fa48df60_3d9d854163f8f07a_1_42"
この外部テーブルからのデータが現在のエクゼキュータ・サーバに着地するかどうかは不明です。おそらくエクゼキュータ・サーバ上にはなく、Http経由で直接Greenplumセグメントに渡されるのだと思います。
Greenplum-Spark Connector
greenplum-spark_2.11-1.6.2.jar
2.Mavenで導入:
<dependency> <groupId>io.pivotal.greenplum.spark</groupId> <artifactId>greenplum-spark_2.11</artifactId> <version>1.6.2</version> </dependency>
3.スパークコミットの紹介
Greenplum-Spark Connector
Greenplumからのデータ読み込み
1.DataFrameReader.load()メソッド:
val gscReadOptionMap = Map( "url" -> "jdbc:postgresql://gpdb-master:5432/testdb", "user" -> "bill", "password" -> "changeme", "dbschema" -> "myschema", "dbtable" -> "table1", "partitionColumn" -> "id")val gpdf = spark.read.format("greenplum") .options(gscReadOptionMap) .load()
2. spark.read.greenplum()メソッド:
val url = "jdbc:postgresql://gpmaster.domain:15432/tutorial"val tblname = "avgdelay"val jprops = new Properties()jprops.put("user", "user2")jprops.put("password", "changeme")jprops.put("partitionColumn", "airlineid")val gpdf = spark.read.greenplum(url, tblname, jprops)
しかし、この方法では暗黙の変換を導入する必要があり、これも公式サイトには記載されていません。
Greenplumへのデータ書き込み
1.データの例を書きます:
val gscWriteOptionMap = Map( "url" -> "jdbc:postgresql://gpdb-master:5432/testdb", "user" -> "bill", "password" -> "changeme", "dbschema" -> "myschema", "dbtable" -> "table2",)dfToWrite.write.format("greenplum") .options(gscWriteOptionMap) .save()
GSC経由でGreenplumテーブルに書き込む際、テーブルが既に存在する場合、あるいはテーブルに既にデータが存在する場合、その出力モードはDataFrameWriter.mode(SaveMode savemode)で指定できます。関連するモードの動作は以下の通りです:
2.GSC自動テーブル構築
2.1 作成されたGreenplumテーブルは、GSCによって生成された以下のテーブル作成ステートメントに示されるように、分配列を持ちません:
CREATE TABLE "public"."rank_a1" ("id" INTEGER NOT NULL, "rank" TEXT, "year" INTEGER NOT NULL, "gender" INTEGER NOT NULL, "count" INTEGER NOT NULL);
2.2 作成されたGreenplumテーブルのフィールド名は、Spark DataFrameのフィールド名を使用します。
2.3 GSC が自動的にテーブルを作成する際、フィールド名に二重引用符が追加され、Greenplum は大文字と小文字を区別します。
3.事前に手動でテーブルを作成
3.1 Spark DataFrameのフィールド名からGreenplumテーブルの対応するフィールドにデータを書き込みます。GSCは大文字と小文字を区別してマッピングを行います。
3.2 Greenplumに書き込まれるフィールドのデータ型は、対応するSpark DataFrameと一致しています。
3.3 スパークデータの列がNULLデータを含む場合、対応するGreenplumテーブルの列がNOT NULLとして指定されていないことを確認してください。
3.4 Greenplumテーブルは、Spark DataFrameとは異なる順序のフィールドで構築することができます。しかし、GreenplumテーブルはSpark DataFrameに存在しないフィールドを含むことはできません。以下に例を示します:
// Greenplum CREATE TABLE publicのフィールド.rank_a1 ( id int4 NOT NULL, "rank" text NULL, "year" int4 NOT NULL, gender int4 NOT NULL, count int4 NOT NULL)DISTRIBUTED BY (id);// Spark DataFrameフィールド var df = Seq((2, "a|b", 2, 2, 2),(3, "a|b", 3, 3, 3)).toDF("id", "rank", "year", "gender")// データをパブリックに書き込む場合.rank_a1テーブルは以下のエラーを報告する Thread "main" javaの例外.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.Old column names (5): _1, _2, _3, _4, _5New column names (4): id, rank, year, gender at scala.Predef$.require(Predef.scala:224) at org.apache.spark.sql.Dataset.toDF(Dataset.scala:435) at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:44) at com.lt.spark.greenplum.GreenplumWrite$.main(GreenplumWrite.scala:14) at com.lt.spark.greenplum.GreenplumWrite.main(GreenplumWrite.scala)
3.5 指定されたユーザがテーブルに対して読み取り権限と書き込み権限を持っていること、およびテーブルが自動的に構築され、テーブルを構築するための権限が必要であることを確認します。
Troubleshooting
1.港湾関連問題
2.グリーンプラムの接続問題
疲労困憊のプロセス:
2.1 Greenplumデータの最大接続数を照会ます:
postgres=# show max_connections; max_connections----------------- 250(1 row)
2.2 Greenplumデータベースへの現在の接続数を照会ます:
postgres=# SELECT count(*) FROM pg_stat_activity;
2.3 指定されたユーザーのGreenplumデータへの接続数を照会します:
postgres=# SELECT count(*) FROM pg_stat_activity WHERE datname='tutorial';postgres=# SELECT count(*) FROM pg_stat_activity WHERE usename='user1';
2.4 Greenplum データベースへの空き接続数とアクティブ接続数を照会ます:
postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query='<IDLE>';postgres=# SELECT count(*) FROM pg_stat_activity WHERE current_query!='<IDLE>';
2.5 クエリ接続 Greenplumデータベース名、ユーザ名、クライアントアドレス、クライアントIP、現在のクエリ文:
postgres=# SELECT datname, usename, client_addr, client_port, current_query FROM pg_stat_activity;
3.Greenplumデータベースのデータ長エラー
Greenplum 4.x または 5.x を使用している場合、"data line too long" エラーが発生することがあります。これは、Greenplum データベースのパラメータ "gp_max_csv_line_length" のデフォルト値が 1M であるためで、このパラメータ値を変更するには、Greenplum マスターにログインする必要があります。このパラメータの値を変更するには、Greenplumマスターにログインする必要があります。 例は次のとおりです。gpconfigでこのパラメータの値を5Mに変更します:
gpadmin@gpmaster$ gpconfig -c gp_max_csv_line_length -v 5242880gpadmin@gpmaster$ gpstop -u
タイプマッピング表
1.グリーンプラム→スパーク
2.スパーク→グリーンプラム




