blog

Flink SQLにおけるJava SPIメカニズム

JavaのSPIメカニズム、つまりJava Service Provider Interfaceは、Javaが提供する「インターフェースプログラミング+ポリシーモデル+設定ファイル」の組み合わせによる...

Mar 26, 2020 · 11 min. read
シェア

Java SPIメカニズム入門

Java SPIのメカニズムは、Javaのサービスプロバイダのインターフェイスは、Javaに基づいている動的ロード機構の "インターフェイスプログラミング+ポリシーモデル+設定ファイル "の組み合わせを提供します。呼び出し元が有効にする必要性の実際の使用に基づいて、拡張またはポリシーの既存の実装のフレームワークを置き換えることができます。Javaでは、SPIの考えに基づいて、ServiceLoaderの特定の実装を提供し、このクラスの使用は簡単にサービス指向の登録と発見を達成することができます、サービスの提供と使用のデカップリングを完了します。

などのJava SPIのメカニズム一般的な例、:

  • データベースドライバインターフェイスの実装クラスのロード:JDBCは、OracleDriver、SQLServerDriver、ドライバなどのデータベースドライバのさまざまな種類のロードの実際の使用に基づいてすることができます。
  • slf4jログファサードインターフェイス実装クラスのロード:slf4jログファサードは、ロギングフレームワークではありません、あなたは、Log4j、Logbackなどのロギングフレームワークのバインディングを完了するために、適格なロギングフレームワークのインターフェイスの実装クラスをロードするJava SPIメカニズムを使用する必要があります。

FlinkアプリケーションのJava SPIメカニズム

特別な注意:この記事は、flinkソースコードバージョン1.9を扱っています。

tEnv.connect(new Kafka().version('0.11').bar(topic).foo().properties(props)).c(schema).b(format).a('record');

上記の手順でKafkaとの接続を確立し、読み取りデータの構造とフォーマットを指定し、最後にregisterTableSourceを使用してテーブルソースの登録作業を完了します。コードをたどると、内部で TableFactoryService#find() メソッドを呼び出して対象となる TableSourceFactory インスタンスを見つけ、createTableSource() メソッドを呼び出して Kafka011TableSource インスタンスを作成していることがわかります。

# TableFactoryUtil.java
private static <T> TableSource<T> findAndCreateTableSource(Map<String, String> properties) {
 try {
	return TableFactoryService
	 .find(TableSourceFactory.class, properties)
	 .createTableSource(properties);
 } catch (Throwable t) {
	throw new TableException("findAndCreateTableSource failed.", t);
 }
}
# TableFactoryService
public static <T extends TableFactory> T find(Class<T> factoryClass, Map<String, String> propertyMap) {
 return findSingleInternal(factoryClass, propertyMap, Optional.empty());
}
# TableFactoryService.java
private static <T extends TableFactory> T findSingleInternal(
 Class<T> factoryClass,Map<String, String> properties,Optional<ClassLoader> classLoader) {
 
 List<TableFactory> tableFactories = discoverFactories(classLoader);
 List<T> filtered = filter(tableFactories, factoryClass, properties);
 ...
}

TableFactoryService#findSingleInternal()メソッドの内部では、主に2つのメソッドを使用していることがわかります。() メソッドは、条件を満たす TableFactory 実装クラスをフィルタリングするために使用されます。明らかに、Java SPI メカニズムの使用は discoverFactories() メソッドの内部です。

#TableFactoryService.java
private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
 try {
 List<TableFactory> result = new LinkedList<>();
 if (classLoader.isPresent()) {
 ServiceLoader
 .load(TableFactory.class, classLoader.get())
 .iterator()
 .forEachRemaining(result::add);
 } else {
 defaultLoader.iterator().forEachRemaining(result::add);
 }
	return result;
 } catch (ServiceConfigurationError e) {
 LOG.error("Could not load service provider for table factories.", e);
 throw new TableException("Could not load service provider for table factories.", e);
 }
}
private static final ServiceLoader<TableFactory> defaultLoader = ServiceLoader.load(TableFactory.class);

defaultLoaderは静的なクラス変数であり、このためFlink SQL 1.9のコードにはバグがあるかもしれません。

public static <S> ServiceLoader<S> load(Class<S> service) {
 ClassLoader cl = Thread.currentThread().getContextClassLoader();
 return ServiceLoader.load(service, cl);
}
public static <S> ServiceLoader<S> load(Class<S> service,ClassLoader loader) {
 return new ServiceLoader<>(service, loader);
}
# service => TableFactory, loader => AppClassLoader, acc => null
private ServiceLoader(Class<S> svc, ClassLoader cl) { 
 service = Objects.requireNonNull(svc, "Service interface cannot be null");
 loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
 acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
 reload();
}

ServiceLoaderのコンストラクタでは、service、loader、acc変数の代入が完了しています。

// Cached providers, in instantiation order
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();
// The current lazy-lookup iterator
private LazyIterator lookupIterator;
public void reload() {
 providers.clear();
 lookupIterator = new LazyIterator(service, loader);
}

reload()メソッドでは、まずproviders変数に格納されているデータがクリアされ、LazyIteratorのインスタンスが作成されます。providers変数には、servicesフォルダ内のTableFactoryの実装クラスのインスタンスが格納されています。プロバイダの検索を完全に遅らせます。

private class LazyIterator implements Iterator<S> {
 Class<S> service;
 ClassLoader loader;
 Enumeration<URL> configs = null; # プロジェクト内のすべての依存関係の名前を保存するために使用される
 Iterator<String> pending = null; # 各依存のサービスフォルダを保持する TableFactory 実装クラスのフルパス名
 String nextName = null; # 現在の TableFactory 実装クラスのフルパス名。
 
 # service -> TableFactory, loader -> AppClassLoader
 private LazyIterator(Class<S> service, ClassLoader loader) {
 this.service = service;
 this.loader = loader;
 }
 private boolean hasNextService() {
 ...
 }
 private S nextService() {
 ...
 }
 public boolean hasNext() {
 ...
 }
 public S next() {
 ...
 }
 public void remove() {
 	...
 }
}

defaultLoader 変数を見た後、リストの下に進みます。

defaultLoader.iterator().forEachRemaining(result::add);
# ServiceLoader.java
public Iterator<S> iterator() {
 return new Iterator<S>() {
 Iterator<Map.Entry<String,S>> knownProviders 
 = providers.entrySet().iterator();
 public boolean hasNext() {
 if (knownProviders.hasNext())
 return true;
 return lookupIterator.hasNext();
 }
 public S next() {
 if (knownProviders.hasNext())
 return knownProviders.next().getValue();
 return lookupIterator.next();
 }
 public void remove() {
 throw new UnsupportedOperationException();
 }
 };
}

defaultLoader.iterator() メソッドが Iterator インターフェイスの内部クラスを作成し、knownProviders のインスタンスを作成して、hasNext()、next()、remove() などのメソッドを提供していることがわかります。iterator() メソッドを見た後は、forEachRemaining() に進みます。

#Iterator.java
default void forEachRemaining(Consumer<? super E> action) {
 Objects.requireNonNull(action);
 while (hasNext())
 action.accept(next());
}

このメソッドでは、前述の Iterator インターフェイスの内部クラスの hasNext() メソッドや next() メソッドを実際に呼び出しています。まず、hasNext() メソッドの実装を見てみましょう。

public boolean hasNext() {
 # プログラムは初めてTableFactory実装クラスを探すので、プロバイダは最初にclear()で処理される。
 # 一方、knownProvidersは = providers.entrySet().iterator();
 # つまり、既知のプロバイダーということだ。.hasNext()現在のTableFactory実装クラスのセットのクエリ中は偽である。
 # lookupIteratorを入力する.hasNext() 
 if (knownProviders.hasNext())
 return true;
 return lookupIterator.hasNext();
}
# lookupIterator
public boolean hasNext() {
 # Flink SQL が TableFactory インターフェース実装クラスに照会るとき、acc は常に null になる。
 if (acc == null) {
 return hasNextService();
 } 
 ...
}
# lookupIterator
private boolean hasNextService() {
 # nextName 次にクエリされる TableFactory 実装クラスのフルパス名を示す。
 if (nextName != null) {
 return true;
 }
 # プログラムが最初にTableFactory実装クラスを探したときは、NULLだった。
 if (configs == null) {
 try {
 # PREFIX = META-INF/services/
 # service.getName()はTableFactoryのフルパス名である。
 # また、JavaのSPIメカニズムがインターフェースの実装クラスを読み取ることもここで説明されている。
 String fullName = PREFIX + service.getName();
 # クラスローダーを使って、パスに基づいてリソース情報をロードする。
 # を含むすべてのプロジェクトにロードされる。-INF/services/org.apache.flink.table.factories.TableFactoryのdependency jarアドレスである。
 # classLoaderといった情報が変数configsに保存される。
 # システムが jar 内の TableFactory 実装クラスをインスタンス化するとき、configs を渡す。.next()メソッドは、次のjarにあるサービス・ファイルの内容を読み込む。
 # configsデータ構造を以下に示す。
 if (loader == null)
 configs = ClassLoader.getSystemResources(fullName);
 else
 configs = loader.getResources(fullName);
 } catch (IOException x) {
 fail(service, "Error locating configuration files", x);
 }
 }
 
 # pending変数は、依存 jar から読み込んだ TableFactory 実装クラスのフルパス名を格納するために使用される。
 # このイテレーター・データ・インターフェイスは、ペンディング中の.next() 
 # そして、nextNameに結果のTableFactory実装クラスのフルパス名を代入する。
 # ここで pending が NULL の場合、TableFactory インターフェース実装クラスの読み取りが初めて実行されることを示す。
 # !pending.hasNext() = trueということは、依存関係のあるjarのservicesフォルダの内容が読み込まれるときに
 # 次の依存の壷から読み続けたい。
 while ((pending == null) 
 !pending.hasNext()) {
 # すべての依存関係が完了したら、コンフィグを実行する。.hasMoreElements()は偽を返す。
 # 今回で、この一連のTableFactory実装クラス・クエリーは終了ということでもある。
 if (!configs.hasMoreElements()) {
 return false;
 }
 # parse()メソッドは、依存jarのservicesフォルダにあるTableFactoryインターフェイス実装クラスのフルパス名を読み込み、pending変数に保存するために使用される。
 # このメソッドで注意すべき点は、TableFactoryインターフェイス実装クラスのフルパス名Aがすでにプロバイダに格納されている場合、たとえ現在の依存関係jarがまだそのフルパス名Aを含んでいても
 # この場合、フルパス名Aは保留変数に追加されない。これにより、プロバイダに格納されているTableFactory実装クラスの一意なインスタンスが確保される。
 # 複数の依存関係のservicesフォルダに、同じ実装クラスのフルパス名が含まれていても、である。
 pending = parse(service, configs.nextElement());
 }
 # pendingに保存されているTableFactoryインターフェイス実装クラスのフルパス名を読み込み、nextName変数に保存する。
 nextName = pending.next();
 return true;
}
この hasNext() メソッドでは、以下の処理を行います:
  • リソースのパス名に基づいてクラスローダを使用してリソース情報をロードし、それを configs 変数に代入します。
  • configs 変数から依存 jar を取得し、この依存 jar から TableFactory インターフェイスのフルパス名を読み取り、これらのフルパス名を pending 変数に保存します。
  • pending 変数から TableFactory インターフェイスのフル パス名を取得し、next() メソッドで使用するために nextName 変数に保存します。

次に、next() メソッドを見てみましょう。

public S next() {
 if (knownProviders.hasNext())
 return knownProviders.next().getValue();
 return lookupIterator.next();
}
# lookupIterator
public S next() {
 # Flink SQL が TableFactory インターフェース実装クラスに照会るとき、acc は常に null になる。
 if (acc == null) {
 return nextService();
 }
 ...
}
# lookupIterator
private S nextService() {
 # hasNextService()メソッドは上記のもので、今回はnextNameである。 != null,であれば、trueを返す。
 if (!hasNextService())
 throw new NoSuchElementException();
 String = nextName;
 # nextNameTableFactory インターフェース実装クラスのフルパス名の次の代入に null を代入する。
 nextName = null;
 Class<?> c = null;
 ...
 # クラスを利用する.forName()リフレクションは、TableFactory インターフェース実装クラスのフルパス名に基づいている。
 # そして、フルパス名に基づいてクラスをインスタンス化する。
 c = Class.forName, false, loader);
 ...
 S p = service.cast(c.newInstance());
 # インスタンス化されたTableFactoryインターフェイス実装クラスをproviders変数に保存する。
 providers.put, p);
 return p;
 ... 
}

これで、TableFactory インターフェイス実装クラスの 1 つの検索が終了しました。次にTableFactoryインターフェイス実装クラスのクエリ作業の数で依存jarであり、それは現在のプロジェクトは、すべてのMETA-INF/services/org.apache.flink.table.factoryが含まれています。一般的な作業は同じであり、ここでは繰り返されません。

Flink SQL 1.9のJavaのSPI機構の問題点

Flinkには対応するコネクタジョブflink-connector-A、flink-connector-Bがあり、どちらもMETA-INF/services/org.apache.flink.table.Factories.TableFactoryファイルを含んでいます。Factories.TableFactoryファイルがあります。このとき、同じクラスタ内で、flink-connector-Aから消費するFlink SQLプログラム1を起動し、flink-connector-2から消費するFlink SQLプログラム2を起動すると、次のような例外が発生します:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: 
 Could not find a suitable table factory for
 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
...
50395:The following factories have been considered:
...
51110:org.apache.flink.streaming.connectors.kafka.A
...

明らかに、Flink SQL プログラム 2 が照会した TableFactory インターフェイスの実装クラス情報は、実際には Flink SQL プログラム 1 が照会した情報です。この問題の原因は defaultLoader 変数にあります。

# TableFactoryService
private static final ServiceLoader<TableFactory> defaultLoader = ServiceLoader.load(TableFactory.class);

TableFactoryService クラスには、static で変更される内部変数とメソッドがあります。Flink SQL プログラム 1 で TableFactory インタフェース実装クラスを調べた後、defaultLoader 変数は変数 providers を参照し、その変数 providers にはその時点で読み込まれた TableFactory インタフェースのインスタンスクラスが格納されています。Flink SQL プログラム 2 が TableFactory インタフェース実装クラスを検索する準備ができたら、Flink SQL プログラム 1 の defaultLoader を直接使用します。

defaultLoader.iterator().forEachRemaining(result::add);
default void forEachRemaining(Consumer<? super E> action) {
 Objects.requireNonNull(action);
 while (hasNext())
 action.accept(next());
}
public Iterator<S> iterator() {
 return new Iterator<S>() {
 # この時点でknownProvidersによって保存されているデータは、Flink SQLプログラム1によって読み込まれたTableFactoryインターフェース実装クラスのインスタンスである。
 Iterator<Map.Entry<String,S>> knownProviders
 = providers.entrySet().iterator();
 public boolean hasNext() {
 # この時点で、kownProviders.hasNext() 为true
 if (knownProviders.hasNext())
 return true;
 return lookupIterator.hasNext();
 }
 public S next() {
 # この時点で、knownProviders.hasNext() 为true
 if (knownProviders.hasNext())
 # knownProvidersからTableFactoryインターフェイス実装クラスのインスタンスを読み込む
 return knownProviders.next().getValue();
 return lookupIterator.next();
 }
 public void remove() {
 throw new UnsupportedOperationException();
 }
 };
}

この時点で、Flink SQL プログラム 2 が、Flink SQL プログラム 1 から照会された TableFactory インタフェース実装クラスデータを読み込む理由は明らかです。ありがたいことに、このバグはFlink 1.10で修正されました。

private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
 try {
	 List<TableFactory> result = new LinkedList<>();
	 ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
	 ServiceLoader
	 .load(TableFactory.class, cl)
	 .iterator()
	 .forEachRemaining(result::add);
	 return result;
	} catch (ServiceConfigurationError e) {
	 LOG.error("Could not load service provider for table factories.", e);
	 throw new TableException("Could not load service provider for table factories.", e);
	}
}

記事の終わり

Read next

大きな整数の加算、減算、乗算、除算

大きな数の格納\n配列の添え字は、大きな数の下位桁から上位桁までを格納するために使用されます。\n\n大きな整数の加算\n\n加算処理\nA[i] + B[i] + t は各位の和。\nsum % 10は結果の各位の数、sum / 10は次の位の数。

Mar 26, 2020 · 3 min read