- 背景
- ジョブごとのモデルの問題点
- アプリケーション・パターンの紹介
- プログラムによるタスクの提出
- アプリケーションパターンのソースコード解析
背景
現在、flinkには本番環境へのデプロイモードとして、セッションモードとジョブごとのモードがあります。
セッションモード
このモードでは、あらかじめyarnまたはk8s上でflinkクラスタを起動しておき、このクラスタにタスクを投入します。このモードでは、クラスタ内のタスクが同じリソースを使用するため、タスクに問題が発生してクラスタ全体がハングアップした場合、クラスタ内のすべてのタスクを再起動する必要があり、クラスタに大きな悪影響を及ぼします。
ジョブモデルごと
クラスタのリソースの分離を考慮すると、タスクの一般的な生産はジョブモードごとに選択します。つまり、各タスクはflinkクラスタを起動し、クラスタは互いに独立して実行され、互いに影響を与えず、各クラスタは構成から独立して設定できます。
ジョブごとのモデルの問題点
現在、ジョブごとのモデルでは、jarパッケージの解析とJobGraphの生成はクライアント上で実行され、生成されたJobGraphはクラスタにサブミットされます。多くの企業は独自のリアルタイム・コンピューティング・プラットフォームを持っており、ユーザはこれらのプラットフォームを使ってflinkタスクを投入することができます。タスクが特に大きい場合、JobGraphを生成してクラスタに投入するこれらの操作は、リアルタイム・プラットフォームがあるマシン上で実行されることになり、サーバに大きな負担をかけることになります。
さらに、このタスク投入モードでは、ローカルのflink all jarパッケージが最初に対応するテンポラリディレクトリのhdfsにアップロードされますが、これも多くのネットワークオーバーヘッドをもたらすため、タスクが特に大きい場合はプラットフォームのスループットが急落します。
アプリケーション・パターンの紹介
そこで、flinkのジョブごとのデプロイにおけるいくつかの問題を解決するために、flinkは新しいデプロイモードであるアプリケーションモードを導入しました。 現在、アプリケーションモードはYarnとK8sのデプロイをサポートしています。 Yarnアプリケーションモードは、ジョブの実行に必要なすべての依存関係をクライアント側のFlinkマスターにアップロードし、マスター側でジョブを投入します。
また、リモートユーザーのjarパッケージもサポートしており、例えば、hdfs上にjarを置くことができ、jarのアップロードに必要な時間をさらに短縮し、ジョブのデプロイ時間を短縮します。
具体的なコマンドは以下の通りです:
/bin/flink run-application -p 1 -t yarn-application \
-yD yarn.provided.lib.dirs="hdfs://localhost/flink/libs" \
hdfs://localhost/user-jars/HelloWold.jar
プログラムによるタスクの提出
あなたは、リアルタイムコンピューティングプラットフォームを行うには、プログラムを介してクラスタにタスクを送信する必要がある場合は、この時間は、クラスタへのflinkタスクの提出を実現するためにAPIのセットをパッケージ化する必要があり、主な生産環境はまだyarnの大部分ですので、今日はyarnクラスタにapiアプリケーションメソッドの方法でタスクを提出する方法について説明します。
関連する設定パラメータの定義
//flinkためにflinkの設定を取得するには、flinkクラスタのローカル設定ディレクトリを使用する必要がある。
String configurationDirectory = "/Users/user/work/flink/conf/";
//flinkクラスタ関連のjarパッケージディレクトリ
String flinkLibs = "hdfs://hadoopcluster/data/flink/libs";
//ユーザージャー
String userJarPath = "hdfs://hadoopcluster/data/flink/user-lib/TopSpeedWindowing.jar";
String flinkDistJar = "hdfs://hadoopcluster/data/flink/libs/flink-yarn_2..0.jar";
- flinkの設定を取得
ここでは、実際にそのようなヤーンキュー名など、多くの設定パラメータを設定することができます、あなたは自分のニーズに応じて設定することができます。
// flinkの設定を取得する
Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
configurationDirectory);
//アプリケーションモードを設定する
flinkConfiguration.set(
DeploymentOptions.TARGET,
YarnDeploymentTarget.APPLICATION.getName());
//yarn application name
flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobName");
.........
- ユーザー・ジャーのパラメータとメイン・クラスの設定
// ユーザーjarとメインクラスのパラメータを設定する
ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);
- クラスタへのタスク投入
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
flinkConfiguration,
yarnConfiguration,
yarnClient,
clusterInformationRetriever,
true);
ClusterClientProvider<ApplicationId> clusterClientProvider = null;
try {
clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
clusterSpecification,
appConfig);
} catch (ClusterDeploymentException e){
e.printStackTrace();
}
アプリケーションパターンのソースコード解析
スクリプトを介して上記で提出されたflink binディレクトリから入口を参照してください起動するflinkコマンドの下で、このファイルのコードの最後の行を見て、つまり、入口のクラスのタスクを提出する:org.apache.flink.client.cli.flinkは、ヤーンクラスタにタスクを提出する方法です。flinkはyarnクラスタにタスクを投入する方法です。
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
入り口
CliFrontendのメインメソッドでは、多くのことが行われています。
- flinkの設定ディレクトリを取得
- flink をロードするための設定
- コマンドライン引数の読み込みと解析
- 特定の操作は、CliFrontend.parseParametersメソッドを通して実行されます。
// 1. find the configuration directory
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. load the global configuration
final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines
final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
configuration,
configurationDirectory);
try {
final CliFrontend cli = new CliFrontend(
configuration,
customCommandLines);
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
int retCode = SecurityUtils.getInstalledContext()
.runSecured(() -> cli.parseParameters(args));
System.exit(retCode);
}
特定のアクションを実行
parseParametersメソッドでは、実行される操作が解析され、switchを使用して実行されるメソッド(この場合はrunApplicationメソッド)に移動します。
switch (action) {
case ACTION_RUN:
run(params);
return 0;
case ACTION_RUN_APPLICATION:
runApplication(params);
return 0;
case ACTION_LIST:
list(params);
return 0;
..........
}
runApplication
このメソッドでは、主に、入力されたコマンドラインパラメータを使用して、フリンクのコンフィギュレーションオブジェクトConfigurationと、アプリケーションモードで必要なコンフィギュレーションApplicationConfigurationを構築します。
// flinkの設定オブジェクトは、コマンドラインパラメータから渡された構築される。
final Configuration effectiveConfiguration = getEffectiveConfiguration(
activeCommandLine, commandLine, programOptions, Collections.singletonList(uri.toString()));
//エントリクラスとjarパッケージのパラメータを含む設定を構築する ApplicationConfiguration
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
deployer.run(effectiveConfiguration, applicationConfiguration);
ClusterDescriptorの構築
アプリケーションクラスタの展開
ここでは例として、YarnClusterDescriptor#deployApplicationClusterメソッドに、このメソッドでは、いくつかの簡単なチェックの後を参照してください、プライベートメソッドを呼び出すYarnClusterDescriptor#deployInternal、このdeployInternalは、メソッドを提供するパブリック関数は、他のデプロイメントモードを見ることができます、yarnセッションモードは、ジョブモードごとに、このメソッドは、単にパラメータを呼び出されます。deployInternalは、このdeployInternalは、メソッドを提供するためのパブリック関数は、他のデプロイメントモードを見ることができます、yarnセッションモードは、ジョブモードごとに、このメソッドと呼ばれ、パラメータが異なるだけです。
その方法を簡単に説明します:
/**
* This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
*
* @param clusterSpecification いくつかの設定パラメータ
* @param applicationName yarn job
* @param yarnClusterEntrypoint
* @param jobGraph アプリケーションのjobGraphは空にすることができる。
* @param detached 分離モード
*/
private ClusterClientProvider<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph,
boolean detached) throws Exception {
この方法では、デプロイモードに応じて必要なチェックを行い、yarnコンテナの動作を開始します。例えば、ジョブモードごとに、flink jarパッケージのアップロードなどがこのメソッドで行われます。さらに、このメソッドはApplicationMaster/JobManagerが正常にデプロイされるまでブロックし続け、ユーザプログラムを実行するためのエントリクラスApplicationClusterEntryPointに入ります。
アプリケーションクラスターエントリポイント
ヤーンコンポーネントの起動が完了すると、ユーザーのプログラムの実行を開始します:
- 必要なジャーやリソースのダウンロード
- リーダー選挙を実施し、主要メソッドの運営者を決定します。
- ユーザープログラム終了時のクラスタ終了
- HAとフォールトトレランスの保証
アプリケーションモードは、yarnクラスタにタスクを送信するには、おおよそのプロセスは、まずここで、flinkのタスクの実行プロセスを話すだろう、導入に専用の記事を書くことに続いて。





