重要: Scalaに依存するMaven アーティファクトはScalaのメジャーバージョンが後ろに付きます。例えば、"2.10" あるいは "2.11"。プロジェクトwiki上のマイグレーションガイドに相談してください。

JobManager 高可用性 (HA)

ジョブマネージャーは全てのFlinkのデプロイを調整します。それはスケジューリング およびリソース管理の両方に責任があります。

デフォルトでは、1つのFlinkあたり1つのジョブマネージャーがあります。これは単一障害点 (SPOF)を作ります: もしジョブマネージャーがクラッシュすると、新しいプログラムはサブミットされずプログラムを実行できません。

ジョブマネージャーの高可用性を使って、ジョブマネージャーの障害から回復することができ、それによってSPOFを取り除きます。standalone および YARN clustersの両方のための高可用性を設定することができます。

スタンドアローン クラスタ 高可用性

スタンドアローン クラスタのジョブマネージャーの高可用性の全体的な考えは、常に1つの先導するジョブマネージャーと リーダーが障害の場合にリーダーシップを受け継ぐ複数のスタンドバイ ジョブマネージャーがあるというものです。これは単一障害点が無く、スタンドバイ ジョブマネージャーがリーダーシップを取り次第進めることができることを保証します。スタンドバイとマスター ジョブマネージャー インスタンスの間に明確な違いはありません。各ジョブマネージャーはマスターあるいはスタンドバイの役割をすることができます。

例として、以下の3つのジョブマネージャ インスタンスを持つセットアップを考えます:

設定

ジョブマネージャーの高可用性を有効にするには、recovery modezookeeperにし、ZooKeeper quorum を設定し、全てのジョブマネージャーのホストとそれらのweb UIポートを使ってmasters fileをセットアップします。

Flinkは分散調整のために全ての実行中のジョブマネージャーのインスタンス間でZooKeeper を利用します。ZooKeeperはFlinkのための別個のサービスで、リーダーの選定と軽量で一貫性のある状態のストレージを使ってとても信頼できる分散調整を提供します。Zookeeperについてのもっと詳しい情報はZooKeeper の開始ガイド を調べてください。Flink includes scripts to bootstrap a simple ZooKeeper installation.

マスターファイル (マスター)

HA-クラスターを開始するために、conf/masters内の mastersファイルを設定します:

  • masters file: masters fileは全てのホストを含みます。この上でジョブマネージャーが開始され、ポート番号はwebユーザインタフェースがバインドされます。

    jobManagerAddress1:webUIPort1
    [...]
    jobManagerAddressX:webUIPortX
    

デフォルトでは、ジョブマネージャーは内部処理通信のためにランダムなポートを取り上げるでしょう。recovery.jobmanager.port キーを使ってこれを変更することができます。このキーは1つのポート(例えば50010)、範囲(50000-50025)、あるいは両方の組み合わせ (50010,50011,50020-50025,50050-50075)を受け付けます。

HA-クラスターを開始するために、以下の設定キーを conf/flink-conf.yamlに追加します:

  • Recovery mode (必須): recovery mode は高可用性モードを有効にするためにconf/flink-conf.yaml の中で zookeeperに設定される必要があります。

    recovery.mode: zookeeper
  • ZooKeeper quorum (必須): ZooKeeper quorum はZooKeeperのサーバのリプリケートされたグループです。これは分散調整サービスを提供します。

    recovery.zookeeper.quorum: address1:2181[,...],addressX:2181

    addressX:port はZooKeeperサーバを参照します。これはFlinkによって指定されたアドレスとポートで到達可能です。

  • ZooKeeper root (推奨): root ZooKeeper node。これは全てのクラスタの名前空間ノードが配置される場所です。

    recovery.zookeeper.path.root: /flink
    
    
  • ZooKeeper namespace (推奨): namespace ZooKeeper node。クラスタのために必要とされる全てのデータが配置される場所です。

    recovery.zookeeper.path.namespace: /default_ns # important: customize per cluster

    重要: 複数のFlink HA クラスタを実行している場合は、各クラスタのために手動で個々の名前空間を設定する必要があります。デフォルトでは、YarnクラスタとYarnセッションが自動的にYarnアプリケーションidに基づいて名前空間を生成します。手動での設定はYarn内でのこの挙動を上書きます。今度は、-z CLI オプションを使って名前空間を指定することで手動の設定を上書きします。

  • 状態バックエンドとストレージディレクトリ (必須): ジョブマネージャーのメタデータはstate backendの中に保持され、この状態へのポインタだけがZooKeeperに格納されます。現在のところ、ファイルシステムの状態バックエンドだけがHAモードでサポートされます。

    state.backend: filesystem
    state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
    recovery.zookeeper.storageDir: hdfs:///flink/recovery

    storageDirはジョブマネージャーの障害を回復するために必要な全てのデータを格納します。

マスターおよびZooKeeperのquorumを設定した後で、提供されたクラスタのスタートアップ スクリプトを使うことができます。それらはHA-クラスタを開始するでしょう。スクリプトを呼ぶ時にはZooKeeper quorum が実行中でなければならない 事を忘れないで、開始している各HAクラスタのための個々のZooKeeperのルート パスを設定を必ずするようにしてください。

例: 2つのジョブマネージャーを持つスタンドアローンクラスタ

  1. リカバリーモードとZooKeeper quorumをconf/flink-conf.yamlの中に設定します:

    recovery.mode: zookeeper
    recovery.zookeeper.quorum: localhost:2181
    recovery.zookeeper.path.root: /flink
    recovery.zookeeper.path.namespace: /cluster_one # important: customize per cluster
    state.backend: filesystem
    state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
    recovery.zookeeper.storageDir: hdfs:///flink/recovery
  2. conf/mastersの中でマスターを設定します:

    localhost:8081
    localhost:8082
  3. conf/zoo.cfgの中でZooKeeper サーバを設定します(現在のところ、マシーンあたり1つだけZooKeeperを実行することができます):

    server.0=localhost:2888:3888
  4. ZooKeeper quorumを開始する:

    $ bin/start-zookeeper-quorum.sh
    Starting zookeeper daemon on host localhost.
  5. HA-clusterを開始する:

    $ bin/start-cluster.sh
    Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
    ローカルホストのホスト上でジョブマネージャー デーモンを開始します。
    ローカルホストのホスト上でジョブマネージャー デーモンを開始します。
    ローカルホストのホスト上でタスクマネージャー デーモンを開始します。
  6. ZooKeeper quorum とクラスタを停止する:

    $ bin/stop-cluster.sh
    Stopping taskmanager daemon (pid: 7647) on localhost.
    ローカルホストのホスト上でジョブマネージャー デーモン (pid: 7495)を停止します。
    ローカルホストのホスト上でジョブマネージャー デーモン (pid: 7349)を停止します。
    $ bin/stop-zookeeper-quorum.sh
    Stopping zookeeper daemon (pid: 7101) on host localhost.

YARN クラスタ 高可用性

高可用性YARNクラスタを実行する場合、複数のジョブマネージャー (ApplicationMaster) インスタンスを実行しませんが、障害時にYARNによって再起動される1つを実行します。正確な挙動は使用しているバージョンに依存します。

設定

アプリケーション マスターの最大試行 (yarn-site.xml)

yarn-site.xmlの中であなたの YARN セットアップについてのアプリケーションマスターの最大試行数を設定する必要があります:

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
 アプリケーションマスターの実行試行の最大数。
 </description>
</property>

現在のYARNバージョンのデフォルトは2です (1つのジョブマネージャーの障害は許容できることを意味します)。

HA設定(上を見てください)に加えて、conf/flink-conf.yamlに最大試行を設定する必要があります:

yarn.application-attempts: 10

このことは、YARNがアプリケーションを失敗するまでアプリケーションを10回再起動することができることを意味します。yarn.resourcemanager.am.max-attemptsがアプリケーションの再起動の上限であることを覚えておくことが重要です。したがって、Flink内で設定されたアプリケーションの試行数は、YARNが開始された時のYARNクラスタの設定を超えることができません。

コンテナのシャットダウンの挙動

  • YARN 2.3.0 < version < 2.4.0. アプリケーションマスタが失敗した場合に全てのコンテナが再起動されます。
  • YARN 2.4.0 < version < 2.6.0. タスクマネージャーはアプリケーションマスタの失敗を超えて維持され続けます。これはスタートアップの時間が早く、コンテナリソースを再び取得するまでユーザが待つ必要が無いという利点があります。
  • YARN 2.6.0 <= version: FlinkのAkkaがタイムアウト値に試行障害検証間隔を設定します。The attempt failure validity interval says that an application is only killed after the system has seen the maximum number of application attempts during one interval. This avoids that a long lasting job will deplete it’s application attempts.

Note: Hadoop YARN 2.4.0 has a major bug (fixed in 2.5.0) preventing container restarts from a restarted Application Master/Job Manager container. See FLINK-4142 for details. We recommend using at least Hadoop 2.5.0 for high availability setups on YARN.

Example: Highly Available YARN Session

  1. リカバリーモードとZooKeeper quorumをconf/flink-conf.yamlの中に設定します:

    recovery.mode: zookeeper
    recovery.zookeeper.quorum: localhost:2181
    recovery.zookeeper.path.root: /flink
    recovery.zookeeper.path.namespace: /cluster_one # important: customize per cluster
    state.backend: filesystem
    state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
    recovery.zookeeper.storageDir: hdfs:///flink/recovery
    yarn.application-attempts: 10
  2. conf/zoo.cfgの中でZooKeeper サーバを設定します(現在のところ、マシーンあたり1つだけZooKeeperを実行することができます):

    server.0=localhost:2888:3888
  3. ZooKeeper quorumを開始する:

    $ bin/start-zookeeper-quorum.sh
    Starting zookeeper daemon on host localhost.
  4. HA-clusterを開始する:

    $ bin/yarn-session.sh -n 2

Bootstrap ZooKeeper

If you don’t have a running ZooKeeper installation, you can use the helper scripts, which ship with Flink.

There is a ZooKeeper configuration template in conf/zoo.cfg. You can configure the hosts to run ZooKeeper on with the server.X entries, where X is a unique ID of each server:

server.X=addressX:peerPort:leaderPort
[...]
server.Y=addressY:peerPort:leaderPort

The script bin/start-zookeeper-quorum.sh will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from conf/zoo.cfg and makes sure to set some required configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation.

TOP
inserted by FC2 system