JobManager 高可用性 (HA)

ジョブマネージャーは全てのFlinkのデプロイを調整します。それはスケジューリング およびリソース管理の両方に責任があります。

デフォルトでは、1つのFlinkあたり1つのジョブマネージャーがあります。これは単一障害点 (SPOF)を作ります: もしジョブマネージャーがクラッシュすると、新しいプログラムはサブミットされずプログラムを実行できません。

ジョブマネージャーの高可用性を使って、ジョブマネージャーの障害から回復することができ、それによってSPOFを取り除きます。standalone および YARN clustersの両方のための高可用性を設定することができます。

スタンドアローン クラスタ 高可用性

スタンドアローン クラスタのジョブマネージャーの高可用性の全体的な考えは、常に1つの先導するジョブマネージャーと リーダーが障害の場合にリーダーシップを受け継ぐ複数のスタンドバイ ジョブマネージャーがあるというものです。これは単一障害点が無く、スタンドバイ ジョブマネージャーがリーダーシップを取り次第進めることができることを保証します。スタンドバイとマスター ジョブマネージャー インスタンスの間に明確な違いはありません。各ジョブマネージャーはマスターあるいはスタンドバイの役割をすることができます。

例として、以下の3つのジョブマネージャ インスタンスを持つセットアップを考えます:

設定

ジョブマネージャーの高可用性を有効にするには、high-availabilityモードzookeeperにし、ZooKeeper quorum を設定し、全てのジョブマネージャーのホストとそれらのweb UIポートを使ってmasters fileをセットアップします。

Flinkは分散調整のために全ての実行中のジョブマネージャーのインスタンス間でZooKeeper を利用します。ZooKeeperはFlinkのための別個のサービスで、リーダーの選定と軽量で一貫性のある状態のストレージを使ってとても信頼できる分散調整を提供します。Zookeeperについてのもっと詳しい情報はZooKeeper の開始ガイド を調べてください。Flink includes scripts to bootstrap a simple ZooKeeper installation.

マスターファイル (マスター)

HA-クラスターを開始するために、conf/masters内の mastersファイルを設定します:

  • masters file: masters fileは全てのホストを含みます。この上でジョブマネージャーが開始され、ポート番号はwebユーザインタフェースがバインドされます。

    jobManagerAddress1:webUIPort1
    [...]
    jobManagerAddressX:webUIPortX
    

デフォルトでは、ジョブマネージャーは内部処理通信のためにランダムなポートを取り上げるでしょう。high-availability.jobmanager.port キーを使ってこれを変更することができます。このキーは1つのポート(例えば50010)、範囲(50000-50025)、あるいは両方の組み合わせ (50010,50011,50020-50025,50050-50075)を受け付けます。

HA-クラスターを開始するために、以下の設定キーを conf/flink-conf.yamlに追加します:

  • high-availability mode (必須): high-availability mode は高可用性モードを有効にするためにconf/flink-conf.yaml の中で zookeeperに設定される必要があります。

    high-availability: zookeeper
  • ZooKeeper quorum (必須): ZooKeeper quorum はZooKeeperのサーバのリプリケートされたグループです。これは分散調整サービスを提供します。

    high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181

    addressX:port はZooKeeperサーバを参照します。これはFlinkによって指定されたアドレスとポートで到達可能です。

  • ZooKeeper root (推奨): root ZooKeeper node。これは全てのクラスタの名前空間ノードが配置される場所です。

    high-availability.zookeeper.path.root: /flink
    
    
  • ZooKeeper namespace (推奨): namespace ZooKeeper node。クラスタのために必要とされる全てのデータが配置される場所です。

    high-availability.zookeeper.path.namespace: /default_ns # important: customize per cluster

    重要: 複数のFlink HA クラスタを実行している場合は、各クラスタのために手動で個々の名前空間を設定する必要があります。デフォルトでは、YarnクラスタとYarnセッションが自動的にYarnアプリケーションidに基づいて名前空間を生成します。手動での設定はYarn内でのこの挙動を上書きます。今度は、-z CLI オプションを使って名前空間を指定することで手動の設定を上書きします。

  • 状態バックエンドとストレージディレクトリ (必須): ジョブマネージャーのメタデータはstate backendの中に保持され、この状態へのポインタだけがZooKeeperに格納されます。現在のところ、ファイルシステムの状態バックエンドだけがHAモードでサポートされます。

    high-availability.zookeeper.storageDir: hdfs:///flink/recovery

    state.backend: filesystem state.backend.fs.checkpointdir: hdfs:///flink/checkpoints

    storageDirはジョブマネージャーの障害を回復するために必要な全てのデータを格納します。

マスターおよびZooKeeperのquorumを設定した後で、提供されたクラスタのスタートアップ スクリプトを使うことができます。それらはHA-クラスタを開始するでしょう。スクリプトを呼ぶ時にはZooKeeper quorum が実行中でなければならない 事を忘れないで、開始している各HAクラスタのための個々のZooKeeperのルート パスを設定を必ずするようにしてください。

例: 2つのジョブマネージャーを持つスタンドアローンクラスタ

  1. Configure high availability mode and ZooKeeper quorum in conf/flink-conf.yaml:

    high-availability: zookeeper
    high-availability.zookeeper.quorum: localhost:2181
    high-availability.zookeeper.path.root: /flink
    high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster
    high-availability.zookeeper.storageDir: hdfs:///flink/recovery

state.backend: filesystem state.backend.fs.checkpointdir: hdfs:///flink/checkpoints

  1. conf/mastersの中でマスターを設定します:

    localhost:8081
    localhost:8082
  2. conf/zoo.cfgの中でZooKeeper サーバを設定します(現在のところ、マシーンあたり1つだけZooKeeperを実行することができます):

    server.0=localhost:2888:3888
  3. ZooKeeper quorumを開始する:

    $ bin/start-zookeeper-quorum.sh
    Starting zookeeper daemon on host localhost.
  4. HA-clusterを開始する:

    $ bin/start-cluster.sh
    Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
    ローカルホストのホスト上でジョブマネージャー デーモンを開始します。
    ローカルホストのホスト上でジョブマネージャー デーモンを開始します。
    ローカルホストのホスト上でタスクマネージャー デーモンを開始します。
  5. ZooKeeper quorum とクラスタを停止する:

    $ bin/stop-cluster.sh
    Stopping taskmanager daemon (pid: 7647) on localhost.
    ローカルホストのホスト上でジョブマネージャー デーモン (pid: 7495)を停止します。
    ローカルホストのホスト上でジョブマネージャー デーモン (pid: 7349)を停止します。
    $ bin/stop-zookeeper-quorum.sh
    Stopping zookeeper daemon (pid: 7101) on host localhost.

YARN クラスタ 高可用性

高可用性YARNクラスタを実行する場合、複数のジョブマネージャー (ApplicationMaster) インスタンスを実行しませんが、障害時にYARNによって再起動される1つを実行します。正確な挙動は使用しているバージョンに依存します。

設定

アプリケーション マスターの最大試行 (yarn-site.xml)

yarn-site.xmlの中であなたの YARN セットアップについてのアプリケーションマスターの最大試行数を設定する必要があります:

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
    アプリケーションマスターの実行試行の最大数。
  </description>
</property>

現在のYARNバージョンのデフォルトは2です (1つのジョブマネージャーの障害は許容できることを意味します)。

HA設定(上を見てください)に加えて、conf/flink-conf.yamlに最大試行を設定する必要があります:

yarn.application-attempts: 10

このことは、YARNがアプリケーションを失敗するまでアプリケーションを10回再起動することができることを意味します。yarn.resourcemanager.am.max-attemptsがアプリケーションの再起動の上限であることを覚えておくことが重要です。したがって、Flink内で設定されたアプリケーションの試行数は、YARNが開始された時のYARNクラスタの設定を超えることができません。

コンテナのシャットダウンの挙動

  • YARN 2.3.0 < version < 2.4.0. アプリケーションマスタが失敗した場合に全てのコンテナが再起動されます。
  • YARN 2.4.0 < version < 2.6.0. タスクマネージャーはアプリケーションマスタの失敗を超えて維持され続けます。これはスタートアップの時間が早く、コンテナリソースを再び取得するまでユーザが待つ必要が無いという利点があります。
  • YARN 2.6.0 <= version: FlinkのAkkaがタイムアウト値に試行障害検証間隔を設定します。The attempt failure validity interval says that an application is only killed after the system has seen the maximum number of application attempts during one interval. これは長く続くジョブがそのアプリケーションの試行を消耗させることを防ぐでしょう。

注意: Hadoop YARN 2.4.0 には、再起動されたアプリケーションのマスター/ジョブ マネージャーのコンテナ を再起動することができないという、大きなバグ (2.5.0で修正されました) があります。詳細はFLINK-4142 を見てください。YARN上での高可用性のために少なくともHadoop 2.5.0 を使うことをお勧めします。

例: 高可用性 YARN セッション

  1. conf/flink-conf.yaml 内の Configure HA mode and ZooKeeper quorum:

    high-availability: zookeeper
    high-availability.zookeeper.quorum: localhost:2181
    high-availability.zookeeper.storageDir: hdfs:///flink/recovery
    high-availability.zookeeper.path.root: /flink
    high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster
    state.backend: filesystem
    state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
    yarn.application-attempts: 10
  2. conf/zoo.cfgの中でZooKeeper サーバを設定します(現在のところ、マシーンあたり1つだけZooKeeperを実行することができます):

    server.0=localhost:2888:3888
  3. ZooKeeper quorumを開始する:

    $ bin/start-zookeeper-quorum.sh
    Starting zookeeper daemon on host localhost.
  4. HA-clusterを開始する:

    $ bin/yarn-session.sh -n 2

Zookeeperのセキュリティのための設定

ZooKeeperがKerberosを使ってセキュアモードで動作している場合、必要に応じてflink-conf.yaml内の以下の設定を上書きすることができます:

zookeeper.sasl.service-name: zookeeper     # デフォルトは "zookeeper"。ZooKeeperの定員が設定されていた場合
                                           # 異なるサービス名を使う場合、ここで提供することができます。
zookeeper.sasl.login-context-name: Client  # デフォルトは "Client"。その値は以下の値の1つと一致する必要があります
                                           # "security.kerberos.login.contexts"の中の設定。

KerberosセキュリティのためのFlinkの設定についての詳しい情報はここを見てください。Flinkがどのように内部的にKerberosベースのセキュリティをセットアップするかについての更なる詳細はここで 見つけることもできます。

Bootstrap ZooKeeper

実行中のZooKeeperのインストレーションが無い場合、ヘルパースクリプトを使うことができます。これはFlinkに同梱されます。

conf/zoo.cfg内にZooKeeperの設定テンプレートがあります。server.X エントリを使ってZooKeeperを実行するサーバを設定することができます。Xは各サーバのユニークなIDです:

server.X=addressX:peerPort:leaderPort
[...]
server.Y=addressY:peerPort:leaderPort

スクリプト bin/start-zookeeper-quorum.sh は設定された各ホスト上でZooKeeperを開始するでしょう。開始されたプロセスはFlinkラッパーを経由してZooKeeperを起動します。これはconf/zoo.cfg から設定を読み込み、利便性のために幾つかの必須の設定値が設定されるようにします。プロダクションのセットアップでは、独自のZooKeeperインストレーションを管理することをお勧めします。

TOP
inserted by FC2 system