blog

flinkのチュートリアル - 新しい展開モードでflink 1.11を説明する - アプリケーションモード

現在、flinkのために、本番環境では、一般的に2つのデプロイモード、セッションモード、ジョブごとのモードを持っています。 このモードは、事前にyarnまたはk8sでflinkクラスタを起動し、タスク...

Oct 23, 2020 · 7 min. read
シェア
  • 背景
  • ジョブごとのモデルの問題点
  • アプリケーション・パターンの紹介
  • プログラムによるタスクの提出
  • アプリケーションパターンのソースコード解析

背景

現在、flinkには本番環境へのデプロイモードとして、セッションモードとジョブごとのモードがあります。

セッションモード

このモードでは、あらかじめyarnまたはk8s上でflinkクラスタを起動しておき、このクラスタにタスクを投入します。このモードでは、クラスタ内のタスクが同じリソースを使用するため、タスクに問題が発生してクラスタ全体がハングアップした場合、クラスタ内のすべてのタスクを再起動する必要があり、クラスタに大きな悪影響を及ぼします。

ジョブモデルごと

クラスタのリソースの分離を考慮すると、タスクの一般的な生産はジョブモードごとに選択します。つまり、各タスクはflinkクラスタを起動し、クラスタは互いに独立して実行され、互いに影響を与えず、各クラスタは構成から独立して設定できます。

ジョブごとのモデルの問題点

現在、ジョブごとのモデルでは、jarパッケージの解析とJobGraphの生成はクライアント上で実行され、生成されたJobGraphはクラスタにサブミットされます。多くの企業は独自のリアルタイム・コンピューティング・プラットフォームを持っており、ユーザはこれらのプラットフォームを使ってflinkタスクを投入することができます。タスクが特に大きい場合、JobGraphを生成してクラスタに投入するこれらの操作は、リアルタイム・プラットフォームがあるマシン上で実行されることになり、サーバに大きな負担をかけることになります。

さらに、このタスク投入モードでは、ローカルのflink all jarパッケージが最初に対応するテンポラリディレクトリのhdfsにアップロードされますが、これも多くのネットワークオーバーヘッドをもたらすため、タスクが特に大きい場合はプラットフォームのスループットが急落します。

アプリケーション・パターンの紹介

そこで、flinkのジョブごとのデプロイにおけるいくつかの問題を解決するために、flinkは新しいデプロイモードであるアプリケーションモードを導入しました。 現在、アプリケーションモードはYarnとK8sのデプロイをサポートしています。 Yarnアプリケーションモードは、ジョブの実行に必要なすべての依存関係をクライアント側のFlinkマスターにアップロードし、マスター側でジョブを投入します。

また、リモートユーザーのjarパッケージもサポートしており、例えば、hdfs上にjarを置くことができ、jarのアップロードに必要な時間をさらに短縮し、ジョブのデプロイ時間を短縮します。

具体的なコマンドは以下の通りです:

/bin/flink run-application -p 1 -t yarn-application \
-yD yarn.provided.lib.dirs="hdfs://localhost/flink/libs" \
hdfs://localhost/user-jars/HelloWold.jar

プログラムによるタスクの提出

あなたは、リアルタイムコンピューティングプラットフォームを行うには、プログラムを介してクラスタにタスクを送信する必要がある場合は、この時間は、クラスタへのflinkタスクの提出を実現するためにAPIのセットをパッケージ化する必要があり、主な生産環境はまだyarnの大部分ですので、今日はyarnクラスタにapiアプリケーションメソッドの方法でタスクを提出する方法について説明します。

  • 関連する設定パラメータの定義

 //flinkためにflinkの設定を取得するには、flinkクラスタのローカル設定ディレクトリを使用する必要がある。
 String configurationDirectory = "/Users/user/work/flink/conf/";
 //flinkクラスタ関連のjarパッケージディレクトリ
 String flinkLibs = "hdfs://hadoopcluster/data/flink/libs";
 //ユーザージャー
 String userJarPath = "hdfs://hadoopcluster/data/flink/user-lib/TopSpeedWindowing.jar";
 String flinkDistJar = "hdfs://hadoopcluster/data/flink/libs/flink-yarn_2..0.jar";
  • flinkの設定を取得

ここでは、実際にそのようなヤーンキュー名など、多くの設定パラメータを設定することができます、あなたは自分のニーズに応じて設定することができます。

// flinkの設定を取得する
 Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
 configurationDirectory);
 
 //アプリケーションモードを設定する
 flinkConfiguration.set(
 DeploymentOptions.TARGET,
 YarnDeploymentTarget.APPLICATION.getName());
 //yarn application name
 flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobName");
 
 .........
  • ユーザー・ジャーのパラメータとメイン・クラスの設定
// ユーザーjarとメインクラスのパラメータを設定する
 ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);
  • クラスタへのタスク投入
 YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
 flinkConfiguration,
 yarnConfiguration,
 yarnClient,
 clusterInformationRetriever,
 true);
 ClusterClientProvider<ApplicationId> clusterClientProvider = null;
 try {
 clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
 clusterSpecification,
 appConfig);
 } catch (ClusterDeploymentException e){
 e.printStackTrace();
 }

アプリケーションパターンのソースコード解析

スクリプトを介して上記で提出されたflink binディレクトリから入口を参照してください起動するflinkコマンドの下で、このファイルのコードの最後の行を見て、つまり、入口のクラスのタスクを提出する:org.apache.flink.client.cli.flinkは、ヤーンクラスタにタスクを提出する方法です。flinkはyarnクラスタにタスクを投入する方法です。

exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

入り口

CliFrontendのメインメソッドでは、多くのことが行われています。

  1. flinkの設定ディレクトリを取得
  2. flink をロードするための設定
  3. コマンドライン引数の読み込みと解析
  4. 特定の操作は、CliFrontend.parseParametersメソッドを通して実行されます。
 // 1. find the configuration directory
 final String configurationDirectory = getConfigurationDirectoryFromEnv();
 // 2. load the global configuration
 final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
 // 3. load the custom command lines
 final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
 configuration,
 configurationDirectory);
 try {
 final CliFrontend cli = new CliFrontend(
 configuration,
 customCommandLines);
 SecurityUtils.install(new SecurityConfiguration(cli.configuration));
 int retCode = SecurityUtils.getInstalledContext()
 .runSecured(() -> cli.parseParameters(args));
 System.exit(retCode);
 }

特定のアクションを実行

parseParametersメソッドでは、実行される操作が解析され、switchを使用して実行されるメソッド(この場合はrunApplicationメソッド)に移動します。

 switch (action) {
 case ACTION_RUN:
 run(params);
 return 0;
 case ACTION_RUN_APPLICATION:
 runApplication(params);
 return 0;
 case ACTION_LIST:
 list(params);
 return 0;
 ..........
 } 

runApplication

このメソッドでは、主に、入力されたコマンドラインパラメータを使用して、フリンクのコンフィギュレーションオブジェクトConfigurationと、アプリケーションモードで必要なコンフィギュレーションApplicationConfigurationを構築します。

 // flinkの設定オブジェクトは、コマンドラインパラメータから渡された構築される。
 final Configuration effectiveConfiguration = getEffectiveConfiguration(
 activeCommandLine, commandLine, programOptions, Collections.singletonList(uri.toString()));
 
 //エントリクラスとjarパッケージのパラメータを含む設定を構築する ApplicationConfiguration
 final ApplicationConfiguration applicationConfiguration =
 new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
 
 deployer.run(effectiveConfiguration, applicationConfiguration);

ClusterDescriptorの構築

アプリケーションクラスタの展開

ここでは例として、YarnClusterDescriptor#deployApplicationClusterメソッドに、このメソッドでは、いくつかの簡単なチェックの後を参照してください、プライベートメソッドを呼び出すYarnClusterDescriptor#deployInternal、このdeployInternalは、メソッドを提供するパブリック関数は、他のデプロイメントモードを見ることができます、yarnセッションモードは、ジョブモードごとに、このメソッドは、単にパラメータを呼び出されます。deployInternalは、このdeployInternalは、メソッドを提供するためのパブリック関数は、他のデプロイメントモードを見ることができます、yarnセッションモードは、ジョブモードごとに、このメソッドと呼ばれ、パラメータが異なるだけです。

その方法を簡単に説明します:

 /**
 * This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
 *
 * @param clusterSpecification いくつかの設定パラメータ
 * @param applicationName yarn job 
 * @param yarnClusterEntrypoint  
 * @param jobGraph アプリケーションのjobGraphは空にすることができる。
 * @param detached 分離モード
 */
 private ClusterClientProvider<ApplicationId> deployInternal(
 ClusterSpecification clusterSpecification,
 String applicationName,
 String yarnClusterEntrypoint,
 @Nullable JobGraph jobGraph,
 boolean detached) throws Exception {

この方法では、デプロイモードに応じて必要なチェックを行い、yarnコンテナの動作を開始します。例えば、ジョブモードごとに、flink jarパッケージのアップロードなどがこのメソッドで行われます。さらに、このメソッドはApplicationMaster/JobManagerが正常にデプロイされるまでブロックし続け、ユーザプログラムを実行するためのエントリクラスApplicationClusterEntryPointに入ります。

アプリケーションクラスターエントリポイント

ヤーンコンポーネントの起動が完了すると、ユーザーのプログラムの実行を開始します:

  • 必要なジャーやリソースのダウンロード
  • リーダー選挙を実施し、主要メソッドの運営者を決定します。
  • ユーザープログラム終了時のクラスタ終了
  • HAとフォールトトレランスの保証

アプリケーションモードは、yarnクラスタにタスクを送信するには、おおよそのプロセスは、まずここで、flinkのタスクの実行プロセスを話すだろう、導入に専用の記事を書くことに続いて。

Read next

ポップアップウィンドウのコンポーネントを議論する

ユーザーのインタラクション行動から判断すると、「モーダルポップアップ」と「非モーダルポップアップ」の2つに分けることができます:モーダルポップアップは、ユーザーが現在のポップアップページから飛び出す前に、少なくとも1ステップの操作を実行する必要があり、非モーダルポップアップは、ユーザーがタッチなどのインタラクティブな行動を出す必要がありません。挙動。インターネット製品の開発と反復を通じて、ポップアップウィンドウは

Oct 23, 2020 · 6 min read

バブルソートについて話す

Oct 23, 2020 · 3 min read

vue-cli3の最適化

Oct 23, 2020 · 3 min read

J41コンストラクタ

Oct 23, 2020 · 4 min read

JS関数

Oct 23, 2020 · 4 min read