ジョブマネージャーは全てのFlinkのデプロイを調整します。それはスケジューリング およびリソース管理の両方に責任があります。
デフォルトでは、1つのFlinkあたり1つのジョブマネージャーがあります。これは単一障害点 (SPOF)を作ります: もしジョブマネージャーがクラッシュすると、新しいプログラムはサブミットされずプログラムを実行できません。
ジョブマネージャーの高可用性を使って、ジョブマネージャーの障害から回復することができ、それによってSPOFを取り除きます。standalone および YARN clustersの両方のための高可用性を設定することができます。
スタンドアローン クラスタのジョブマネージャーの高可用性の全体的な考えは、常に1つの先導するジョブマネージャーと リーダーが障害の場合にリーダーシップを受け継ぐ複数のスタンドバイ ジョブマネージャーがあるというものです。これは単一障害点が無く、スタンドバイ ジョブマネージャーがリーダーシップを取り次第進めることができることを保証します。スタンドバイとマスター ジョブマネージャー インスタンスの間に明確な違いはありません。各ジョブマネージャーはマスターあるいはスタンドバイの役割をすることができます。
例として、以下の3つのジョブマネージャ インスタンスを持つセットアップを考えます:
ジョブマネージャーの高可用性を有効にするには、high-availabilityモード を zookeeperにし、ZooKeeper quorum を設定し、全てのジョブマネージャーのホストとそれらのweb UIポートを使ってmasters fileをセットアップします。
Flinkは分散調整のために全ての実行中のジョブマネージャーのインスタンス間でZooKeeper を利用します。ZooKeeperはFlinkのための別個のサービスで、リーダーの選定と軽量で一貫性のある状態のストレージを使ってとても信頼できる分散調整を提供します。Zookeeperについてのもっと詳しい情報はZooKeeper の開始ガイド を調べてください。Flink includes scripts to bootstrap a simple ZooKeeper installation.
HA-クラスターを開始するために、conf/masters
内の mastersファイルを設定します:
masters file: masters fileは全てのホストを含みます。この上でジョブマネージャーが開始され、ポート番号はwebユーザインタフェースがバインドされます。
jobManagerAddress1:webUIPort1 [...] jobManagerAddressX:webUIPortX
デフォルトでは、ジョブマネージャーは内部処理通信のためにランダムなポートを取り上げるでしょう。high-availability.jobmanager.port
キーを使ってこれを変更することができます。このキーは1つのポート(例えば50010
)、範囲(50000-50025
)、あるいは両方の組み合わせ (50010,50011,50020-50025,50050-50075
)を受け付けます。
HA-クラスターを開始するために、以下の設定キーを conf/flink-conf.yaml
に追加します:
high-availability mode (必須): high-availability mode は高可用性モードを有効にするためにconf/flink-conf.yaml
の中で zookeeperに設定される必要があります。
high-availability: zookeeper
ZooKeeper quorum (必須): ZooKeeper quorum はZooKeeperのサーバのリプリケートされたグループです。これは分散調整サービスを提供します。
high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181
各addressX:port はZooKeeperサーバを参照します。これはFlinkによって指定されたアドレスとポートで到達可能です。
ZooKeeper root (推奨): root ZooKeeper node。これは全てのクラスタの名前空間ノードが配置される場所です。
high-availability.zookeeper.path.root: /flink
ZooKeeper namespace (推奨): namespace ZooKeeper node。クラスタのために必要とされる全てのデータが配置される場所です。
high-availability.zookeeper.path.namespace: /default_ns # important: customize per cluster
重要: 複数のFlink HA クラスタを実行している場合は、各クラスタのために手動で個々の名前空間を設定する必要があります。デフォルトでは、YarnクラスタとYarnセッションが自動的にYarnアプリケーションidに基づいて名前空間を生成します。手動での設定はYarn内でのこの挙動を上書きます。今度は、-z CLI オプションを使って名前空間を指定することで手動の設定を上書きします。
状態バックエンドとストレージディレクトリ (必須): ジョブマネージャーのメタデータはstate backendの中に保持され、この状態へのポインタだけがZooKeeperに格納されます。現在のところ、ファイルシステムの状態バックエンドだけがHAモードでサポートされます。
high-availability.zookeeper.storageDir: hdfs:///flink/recovery
state.backend: filesystem state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
storageDir
はジョブマネージャーの障害を回復するために必要な全てのデータを格納します。
マスターおよびZooKeeperのquorumを設定した後で、提供されたクラスタのスタートアップ スクリプトを使うことができます。それらはHA-クラスタを開始するでしょう。スクリプトを呼ぶ時にはZooKeeper quorum が実行中でなければならない 事を忘れないで、開始している各HAクラスタのための個々のZooKeeperのルート パスを設定を必ずするようにしてください。
Configure high availability mode and ZooKeeper quorum in conf/flink-conf.yaml
:
high-availability: zookeeper high-availability.zookeeper.quorum: localhost:2181 high-availability.zookeeper.path.root: /flink high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster high-availability.zookeeper.storageDir: hdfs:///flink/recovery
state.backend: filesystem state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
conf/masters
の中でマスターを設定します:
localhost:8081 localhost:8082
conf/zoo.cfg
の中でZooKeeper サーバを設定します(現在のところ、マシーンあたり1つだけZooKeeperを実行することができます):
server.0=localhost:2888:3888
ZooKeeper quorumを開始する:
$ bin/start-zookeeper-quorum.sh Starting zookeeper daemon on host localhost.
HA-clusterを開始する:
$ bin/start-cluster.sh Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum. ローカルホストのホスト上でジョブマネージャー デーモンを開始します。 ローカルホストのホスト上でジョブマネージャー デーモンを開始します。 ローカルホストのホスト上でタスクマネージャー デーモンを開始します。
ZooKeeper quorum とクラスタを停止する:
$ bin/stop-cluster.sh Stopping taskmanager daemon (pid: 7647) on localhost. ローカルホストのホスト上でジョブマネージャー デーモン (pid: 7495)を停止します。 ローカルホストのホスト上でジョブマネージャー デーモン (pid: 7349)を停止します。 $ bin/stop-zookeeper-quorum.sh Stopping zookeeper daemon (pid: 7101) on host localhost.
高可用性YARNクラスタを実行する場合、複数のジョブマネージャー (ApplicationMaster) インスタンスを実行しませんが、障害時にYARNによって再起動される1つを実行します。正確な挙動は使用しているバージョンに依存します。
yarn-site.xml
の中であなたの YARN セットアップについてのアプリケーションマスターの最大試行数を設定する必要があります:
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
アプリケーションマスターの実行試行の最大数。
</description>
</property>
現在のYARNバージョンのデフォルトは2です (1つのジョブマネージャーの障害は許容できることを意味します)。
HA設定(上を見てください)に加えて、conf/flink-conf.yaml
に最大試行を設定する必要があります:
yarn.application-attempts: 10
このことは、YARNがアプリケーションを失敗するまでアプリケーションを10回再起動することができることを意味します。yarn.resourcemanager.am.max-attempts
がアプリケーションの再起動の上限であることを覚えておくことが重要です。したがって、Flink内で設定されたアプリケーションの試行数は、YARNが開始された時のYARNクラスタの設定を超えることができません。
注意: Hadoop YARN 2.4.0 には、再起動されたアプリケーションのマスター/ジョブ マネージャーのコンテナ を再起動することができないという、大きなバグ (2.5.0で修正されました) があります。詳細はFLINK-4142 を見てください。YARN上での高可用性のために少なくともHadoop 2.5.0 を使うことをお勧めします。
conf/flink-conf.yaml
内の Configure HA mode and ZooKeeper quorum:
high-availability: zookeeper high-availability.zookeeper.quorum: localhost:2181 high-availability.zookeeper.storageDir: hdfs:///flink/recovery high-availability.zookeeper.path.root: /flink high-availability.zookeeper.path.namespace: /cluster_one # important: customize per cluster state.backend: filesystem state.backend.fs.checkpointdir: hdfs:///flink/checkpoints yarn.application-attempts: 10
conf/zoo.cfg
の中でZooKeeper サーバを設定します(現在のところ、マシーンあたり1つだけZooKeeperを実行することができます):
server.0=localhost:2888:3888
ZooKeeper quorumを開始する:
$ bin/start-zookeeper-quorum.sh Starting zookeeper daemon on host localhost.
HA-clusterを開始する:
$ bin/yarn-session.sh -n 2
ZooKeeperがKerberosを使ってセキュアモードで動作している場合、必要に応じてflink-conf.yaml
内の以下の設定を上書きすることができます:
zookeeper.sasl.service-name: zookeeper # デフォルトは "zookeeper"。ZooKeeperの定員が設定されていた場合 # 異なるサービス名を使う場合、ここで提供することができます。 zookeeper.sasl.login-context-name: Client # デフォルトは "Client"。その値は以下の値の1つと一致する必要があります # "security.kerberos.login.contexts"の中の設定。
KerberosセキュリティのためのFlinkの設定についての詳しい情報はここを見てください。Flinkがどのように内部的にKerberosベースのセキュリティをセットアップするかについての更なる詳細はここで 見つけることもできます。
実行中のZooKeeperのインストレーションが無い場合、ヘルパースクリプトを使うことができます。これはFlinkに同梱されます。
conf/zoo.cfg
内にZooKeeperの設定テンプレートがあります。server.X
エントリを使ってZooKeeperを実行するサーバを設定することができます。Xは各サーバのユニークなIDです:
server.X=addressX:peerPort:leaderPort [...] server.Y=addressY:peerPort:leaderPort
スクリプト bin/start-zookeeper-quorum.sh
は設定された各ホスト上でZooKeeperを開始するでしょう。開始されたプロセスはFlinkラッパーを経由してZooKeeperを起動します。これはconf/zoo.cfg
から設定を読み込み、利便性のために幾つかの必須の設定値が設定されるようにします。プロダクションのセットアップでは、独自のZooKeeperインストレーションを管理することをお勧めします。