This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
ネイティブKubernetes #
このページでは、FlinkをKubernetesにネイティブにデプロイする方法について説明します。
はじめに #
はじめにのセクションでは、Kubernetes上で完全に機能するFlinkクラスタをセットアップする方法を説明します。
はじめに #
Kubernetesは、コンピュータアプリケーションデプロイメント、スケーリング、管理を自動化するための人気のあるコンテナオーケストレーションシステムです。 FlinkのネイティブKubernetes統合により、実行中のKubernetesクラスタ上にFlinkを直接デプロイできます。 さらに、FlinkはKubernetesと直接通信できるため、必要なリソースに応じてTaskManagerを動的に割り当てたり割り当てを解除したりできます。
Apache Flinkは、Kubernetes上でFlinkクラスタを管理するためのKubernetesオペレータも提供します。スタンドアローンとネイティブデプロイメントの両方をサポートし、kubernetes上のFlinkリソースのデプロイメント、設定、ライフサイクル管理を大幅に簡素化します。
詳細については、Flink Kubernetesオペレータドキュメントを参照してください。
準備 #
はじめにセクションでは、次の要件を満たすKubernetesクラスタが実行されていることを前提としています。
- Kubernetes >= 1.9.
- KubeConfigはポッドとサービスの一覧表示、作成、削除にアクセスでき、
~/.kube/config
経由で設定できます。kubectl auth can-i <list|create|edit|delete> pods
を実行して権限を検証できます。 - 有効化されたKubernetes DNS。
- ポッドを作成、削除するためのRBAC権限を持つ
デフォルト
のサービスアカウント。
Kubernetesクラスタのセットアップに問題がある場合は、Kubernetesクラスタをセットアップする方法を参照してください。
Kubernetes上でのFlinkセッションの開始 #
Kubernetesクラスタを実行し、kubectl
がそれを指すように設定されると、セッションモードでFlinkクラスタを開始できます
# (1) Start Kubernetes session
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
# (2) Submit example job
$ ./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
./examples/streaming/TopSpeedWindowing.jar
# (3) Stop Kubernetes session by deleting cluster deployment
$ kubectl delete deployment/my-first-flink-cluster
In default, Flink’s Web UI and REST endpoint are exposed as ClusterIP
service. To access the service, please refer to Accessing Flink’s Web UI for instructions.
おめでとう!KubernetesにFlinkをデプロイしてFlinkアプリケーションを正常に実行できました。
デプロイメントモード #
プロダクション環境で使うには、アプリケーションモードでFlinkアプリケーションをデプロイすることをお勧めします。これらのモードはアプリケーションの分離を強化するからです。
アプリケーションモード #
アプリケーションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。
アプリケーションモードはユーザコードがFlinkイメージと一緒にバンドルされている必要があります。クラスタ上のユーザコードのmain()
メソッドを実行するからです。
アプリケーションモードは、アプリケーションの終了後に全てのFlinkコンポーネントが適切にクリーンアップされるようにします。
Flinkコミュニティは、ユーザコードをバンドルするために使える基本Dockerイメージを提供します:
FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
custom-image-name
でDockerイメージを作成して公開した後で、次のコマンドを使ってアプリkーションクラスタを開始できます:
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image.ref=custom-image-name \
local:///opt/flink/usrlib/my-flink-job.jar
注意 local
はアプリケーションモードでサポートされる唯一のスキームです。
kubernetes.cluster-id
オプションはクラスタ名を指定し、一意である必要があります。
個のオプションを指定しない場合、Flinkはランダムな名前を生成します。
kubernetes.container.image.ref
オプションはポッドを開始するイメージを指定します。
アプリケーションクラスタがデプロイされると、それとやりとりできるようになります:
# List running job on the cluster
$ ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
# Cancel running job
$ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster <jobId>
bin/flink
にkey-valueペア-Dkey=value
を渡すことで、conf/flink-conf.yaml
で設定される設定を上書きできます。
セッションモード #
セッションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。
このページの上部にあるはじめにガイドで、セッションクラスタのデプロイメントについて説明しました。
セッションモードは次の2つのモードで実行できます:
-
detachedモード (デフォルト):
kubernetes-session.sh
はKubernetes上にFlinkをデプロイし、終了します。 -
attachedモード (
-Dexecution.attached=true
):kubernetes-session.sh
は存続したままで、実行中のFlinkクラスタを制御するためのコマンドを入力できます。 例えば、stop
は実行中のセッションクラスタを停止します。 全てのサポートされるコマンドを一覧表示するには、help
を入力してください。
クラスタID my-first-flink-cluster
を持つ実行中のセッションクラスタにre-attachするには、次のコマンドを使います:
$ ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dexecution.attached=true
bin/kubernetes-session.sh
にkey-valueペア-Dkey=value
を渡すことで、conf/flink-conf.yaml
で設定される設定を上書きできます。
実行中のセッションクラスタの停止 #
クラスタID my-first-flink-cluster
を持つ実行中のセッションクラスタを停止するには、delete the Flink deploymentか、次のコマンドを使います:
$ echo 'stop' | ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dexecution.attached=true
Kubernetes上のFlinkのリファレンス #
Kubernetes上のFlinkの設定 #
Kubernetes固有の設定オプションは設定ページにリストされています。
FlinkはFabric8 Kubernetes clientを使ってKubernetes APIServerと通信し、Kubernetesリソース(例えば、デプロイメント、ポッド、ConfigMap、サービスなど)を作成/削除するだけでなく、ポッドとConfigMapsを監視します。 上記のFlink設定オプションを除き、Fabric8 Kubernetesクライアントの一部のエキスパートオプションはシステムプロパティまたは環境変数を介して設定できます。
例えば、ユーザは次のFlink設定オプションを使って、同時最大リクエストを設定できます。
containerized.master.env.KUBERNETES_MAX_CONCURRENT_REQUESTS: 200
env.java.opts.jobmanager: "-Dkubernetes.max.concurrent.requests=200"
FlinkのWeb UIへのアクセス #
FlinkのWeb UIとRESTエンドポイントは、kubernetes.rest-service.exposed.type設定オプションを介していくつかの方法で公開できます。
- ClusterIP: cluster内部IPでサービスを公開します。
サービスはクラスタ内でのみ到達可能です。
JobManager UIにアクセスする場合、またはジョブを既存のセッションに送信する場合、ローカルプロキシを開始する必要があります。
その後、
localhost:8081
を使って、Flinkジョブをセッションに送信したり、ダッシュボードを表示したりできます。
$ kubectl port-forward service/<ServiceName> 8081
-
NodePort: 静的ポート(
NodePort
)で各ノードのIPのサービスを公開します。<NodeIP>:<NodePort>
を使って、JobManagerサービスに接続できます。 -
LoadBalancer: クラウドプロバイダのロードバランサを使ってサービスを外部に公開します。 クラウドプロバイダとKubernetesはロードバランサの準備に時間がかかるため、クライアントログに
NodePort
JobManager Webインタフェースが記録される場合があります。kubectl get services/<cluster-id>-rest
を使って、EXTERNAL-IPを取得し、ロードバランサ JobManager Webインタフェースを手動で構築できますhttp://<EXTERNAL-IP>:8081
。
詳細は、Kubernetesでのサービスの公開の公式ドキュメントを参照してください。
環境によっては、LoadBalancer
RESTサービス公開タイプを使ってFlinkクラスタを起動すると、クラスタをパブリックにアクセスできるようになります(通常は任意のコードを実行できます)。
ログ #
Kubernetes統合は、conf/log4j-console.properties
とconf/logback-console.xml
をConfigMapとしてポッドに公開します。
これらのファイルの変更は新しく起動されたクラスタに表示されます。
ログへのアクセス #
デフォルトでは、JobManagerとTaskManagerはログをコンソールと各ポッドの/opt/flink/log
に同時に出力します。
STDOUT
とSTDERR
出力はコンソールにのみリダイレクトされます。
以下からアクセスできます
$ kubectl logs <pod-name>
ポッドが実行中の場合は、kubectl exec -it <pod-name> bash
を使ってトンネリングしてログを表示したり、プロセスをデバッグしたりすることもできます。
TaskManagersのログへのアクセス #
Flinkは、リソースを無駄にしないように、アイドル状態のTaskManagersの割り当てを自動的に解除します。 この動作により、それぞれのポッドのログへのアクセスが困難になる可能性があります。 ログファイルの調査をする時間を増やすために、resourcemanager.taskmanager-timeoutを設定することで、アイドリング状態のTaskManagersが解放されるまでの時間を長くすることができます。
ログレベルを動的に変更 #
ロガーを自動的に設定の変更を検知するに設定した場合、それぞれのConfigMap(クラスタIDがmy-first-flink-cluster
と仮定します)を変更してログレベルを動的に適合させることができます:
$ kubectl edit cm flink-config-my-first-flink-cluster
プラグインの使用 #
pluginsを使うには、それらをFlink JobManager/TaskManagerのポッドの正しい位置にコピーする必要があります。 ボリュームをマウントしたり独自のDockerイメージをビルドしたりせずに、組み込みのプラグインを使うことができます。 例えば、次のコマンドを使ってFlinkセッションクラスタ用のS3プラグインを有効にできます。
$ ./bin/kubernetes-session.sh
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.19-SNAPSHOT.jar \
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.19-SNAPSHOT.jar
独自のDockerイメージ #
独自のDockerイメージを使いたい場合は、設定オプションkubernetes.container.image.ref
を介して設定できます。
Flinkコミュニティは、優れた出発点となるリッチなFlink Dockerイメージを提供しています。
プラグインを有効にして依存関係を追加し、依存関係やその他のオプションを追加する方法については、FlinkのDockerイメージをカスタマイズする方法を参照してください。
シークレットの使用 #
Kubernetes Secretsは、パスワード、トークン、キーなどの少量の機密データを含むオブジェクトです。 そのような情報は、ポッド仕様またはイメージに入れられる可能性があります。 Flink on Kubernetesは次の2つの方法でシークレットを使えます:
-
シークレットをポッドからのファイルとして使う;
-
シークレットを環境変数として使う;
シークレットをポッドからのファイルとして使う #
次のコマンドは、開始されたポッドのパス/path/to/secret
にシークレットmysecret
をマウントします。
$ ./bin/kubernetes-session.sh -Dkubernetes.secrets=mysecret:/path/to/secret
シークレットmysecret
のユーザ名とパスワードは、ファイル/path/to/secret/username
と/path/to/secret/password
に保存されています。
詳細については、official Kubernetes documentationを参照してください。
シークレットを環境変数として使う #
次のコマンドは、開始されたポッドの環境変数としてシークレットmysecret
を公開します。
$ ./bin/kubernetes-session.sh -Dkubernetes.env.secretKeyRef=\
env:SECRET_USERNAME,secret:mysecret,key:username;\
env:SECRET_PASSWORD,secret:mysecret,key:password
環境変数SECRET_USERNAME
はユーザ名を含み、環境変数SECRET_PASSWORD
はシークレットmysecret
のパスワードを含みます。
詳細については、official Kubernetes documentationを参照してください。
Kubernetesでの高可用性 #
Kubernetesでの高可用性のために、既存の高可用性サービスを使うことができます。
スタンドバイJobManagerを起動するために、kubernetes.jobmanager.replicasの値を1より大きい値に設定します。 より早い回復を達成するのに役立ちます。 スタンドバイJobManagerを起動する時に高可用性を有効にする必要があることに注意してください。
手動リソースクリーンアップ #
FlinkはKubernetes OwnerReference’sを使って全てのクラスタコンポーネントをクリーンアップします。
ConfigMap
、Service
、Pod
を含む全てのFlinkが作成したリソースは、OwnerReference
がdeployment/<cluster-id>
に設定されています。
デプロイメントが削除された場合、全ての関係するリソースは自動的に削除されます。
$ kubectl delete deployment/<cluster-id>
サポートされるKubernetesバージョン #
現在、全てのKubernetesバージョン>= 1.9
がサポートされます。
名前空間 #
Kubernetesの名前空間は、resource quotasを介して複数のユーザ間でクラスタリソースを分割します。 Kubernetes上のFlinkは名前空間を使ってFlinkクラスタを起動します。 名前空間はkubernetes.namespaceを介して設定できます。
RBAC #
ロールベースのアクセス制御(RBAC)は、企業内のユーザのロールに基づいてコンピュータリソースまたはネットワークリソースへのアクセスを制御する方法です。 ユーザは、JobManagerがKubernetesクラスタ内のKubernetes APIサーバにアクセスするために使うRBACロールとサービスアカウントを設定できます。
全ての名前空間にはデフォルトのサービスアカウントがあります。ただし、default
サービスアカウントにはKubernetesクラスタ内のポッドを作成または削除する権限が無い場合があります。
ユーザは、default
のサービスアカウントの権限を更新するか、適切なロールがバインドされている別のサービスアカウントを指定する必要がある場合があります。
$ kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default
default
サービスアカウントを使いたくない場合は、次のコマンドを使って新しいサービスアカウントflink-service-account
を作成し、ロールバインドを設定します。
次に設定オプション-Dkubernetes.service-account=flink-service-account
を使って、JobManagerポッドがflink-service-account
サービスアカウントを作成してTaskManagerとリーダーのConfigMapを作成/削除するようにします。
また、これにより、TaskManagerがリーダーのConfigMapsを監視して、JobManagerとResourceManagerのアドレスを取り出すことができます。
$ kubectl create serviceaccount flink-service-account
$ kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account
詳細については、RBAC認証の公式Kubernetesドキュメントを参照してください。
Podテンプレート #
Flinkによりユーザはテンプレートファイルを介してJobManagerとTaskMangerポッドを定義できます。これにより、FlinkのKubernetes設定オプションで直接サポートされていない高度な機能をサポートできるようになります。
kubernetes.pod-template-file.default
を使って、ポッドの定義を含むローカルファイルを指定します。これは、JobManagerとTaskManagerを初期化するために使われます。
メインコンテナはflink-main-container
という名前で定義する必要があります。
詳細については、podテンプレートの例を参照してください。
Flinkによって上書きされるフィールド #
podテンプレートの一部のフィールドはFlinkによって上書きされます。 有向フィールド値を解決する仕組みは次のように分類できます:
-
Flinkによる定義: ユーザは設定できません。
-
ユーザによる定義: ユーザが自由にこの値を設定できます。Flinkフレームワークは追加の値を設定せず、有効な値は設定オプションとテンプレートから引き出されます。
優先順位: 最初に明示的な設定オプション値が取得され、次にpodテンプレートないの値が取得され、何も指定されていない場合は最後に設定オプションのデフォルト値が取得されます。
-
Flinkとの結合: Flinkは設定の値とユーザ定義の値と結合します(“ユーザによる定義"の優先順位を参照)。同じ名前のフィールドの場合、Flink値が優先されます。
上書きされるpodフィールドの完全なリストについては、以下の表を参照してください。 表にリストされていないpodテンプレートで定義されている全てのフィールドは、影響を受けません。
Pod Metadata
キー | 分類 | 関連する設定オプション | 説明 |
---|---|---|---|
名前 | Flinkによる定義 | JobManagerのpod名は、}}#kubernetes-cluster-id">kubernetes.cluster-idで定義されるデプロイメントで上書きされます。
TaskManagerのpod名は、FlinkのResourceManagerで生成されるパターン<clusterID>-<attempt>-<index> で上書きされます。 |
|
名前空間 | ユーザ定義 | }}#kubernetes-namespace">kubernetes.namespace | JobManagerデプロイメントと TaskManager podsは両方ともユーザ定義の名前空間で作成されます。 |
ownerReferences | Flinkによる定義 | JobManagerとTaskManager podsの所有者参照は常にJobManagerデプロイメントに設定されます。 デプロイメントがいつ削除されるかを制御するには、}}#kubernetes-jobmanager-owner-reference">kubernetes.jobmanager.owner.referenceを使ってください。 | |
注釈 | ユーザ定義 | }}#kubernetes-jobmanager-annotations">kubernetes.jobmanager.annotations }}#kubernetes-taskmanager-annotations">kubernetes.taskmanager.annotations | Flinkは、Flink設定オプションで指定される追加のアノテーションを追加します。 |
ラベル | Flinkによってマージされます | }}#kubernetes-jobmanager-labels">kubernetes.jobmanager.labels }}#kubernetes-taskmanager-labels">kubernetes.taskmanager.labels | Flinkはユーザ定義値にいくつかの内部ラベルを追加します。 |
Pod Spec
キー | 分類 | 関連する設定オプション | 説明 |
---|---|---|---|
imagePullSecrets | ユーザ定義 | }}#kubernetes-container-image-pull-secrets">kubernetes.container.image.pull-secrets | Flinkは、Flink設定オプションによって指定される追加のプルシークレットを追加します。 |
nodeSelector | ユーザ定義 | }}#kubernetes-jobmanager-node-selector">kubernetes.jobmanager.node-selector }}#kubernetes-taskmanager-node-selector">kubernetes.taskmanager.node-selector | Flinkは、Flink設定オプションによって指定される追加のノードセレクタを追加します。 |
tolerations | ユーザ定義 | }}#kubernetes-jobmanager-tolerations">kubernetes.jobmanager.tolerations }}#kubernetes-taskmanager-tolerations">kubernetes.taskmanager.tolerations | Flinkは、Flink設定オプションで指定される追加の許容範囲を追加します。 |
restartPolicy | Flinkによる定義 | JobManagerの場合は"always"、TaskManager podの場合は"never"
JobManager podは常にデプロイメントによって再起動されます。また、TaskManager podは再起動しないでください。 |
|
serviceAccount | ユーザ定義 | }}#kubernetes-service-account">kubernetes.service-account | JobManagerとTaskManager podsはユーザ定義のサービスアカウントを使って作成されます。 |
volumes | Flinkによってマージされます | Flinkは、Flink設定とHadoop設定を送り出すために必要ないくつかの内部ConfigMap値(例えば、flink-config-volume、hadoop-config-volume)を追加します。 |
Main Container Spec
キー | 分類 | 関連する設定オプション | 説明 |
---|---|---|---|
env | Flinkによってマージされます | }}#forwarding-environment-variables">containerized.master.env.{ENV_NAME} }}#forwarding-environment-variables">containerized.taskmanager.env.{ENV_NAME} | Flinkはユーザ定義値にいくつかの内部環境変数を追加します。 |
image | ユーザ定義 | }}#kubernetes-container-image-ref">kubernetes.container.image.ref | コンテナイメージは、ユーザ定義値の定義された優先順位に従って解決されます。 |
imagePullPolicy | ユーザ定義 | }}#kubernetes-container-image-pull-policy">kubernetes.container.image.pull-policy | コンテナイメージのプルポリシーはユーザ定義値の定義された優先順位に従って解決されます。 |
名前 | Flinkによる定義 | コンテナ名はFlinkによって"flink-main-container"で上書きされます。 | |
resources | ユーザ定義 | Memory: }}#jobmanager-memory-process-size">jobmanager.memory.process.size }}#taskmanager-memory-process-size">taskmanager.memory.process.size CPU: }}#kubernetes-jobmanager-cpu">kubernetes.jobmanager.cpu }}#kubernetes-taskmanager-cpu">kubernetes.taskmanager.cpu |
メモリとcpuリソース(リクエストと制限を含む)は、Flink設定オプションによって上書きされます。他の全てのリソース(例えば一時ストレージ)は保持されます。 |
containerPorts | Flinkによってマージされます | Flinkはいくつかの内部コンテナポート(例えば、rest、jobmanager-rpc、blob、taskmanager-rpc)を追加します。 | |
volumeMounts | Flinkによってマージされます | Flinkは、Flink設定とHadoop設定を送り出すために必要ないくつかのボリュームマウント(例えば、flink-config-volume、hadoop-config-volume)を追加します。 |
Podテンプレートの例 #
pod-template.yaml
apiVersion: v1
kind: Pod
metadata:
name: jobmanager-pod-template
spec:
initContainers:
- name: artifacts-fetcher
image: busybox:latest
# Use wget or other tools to get user jars from remote storage
command: [ 'wget', 'https://path/of/StateMachineExample.jar', '-O', '/flink-artifact/myjob.jar' ]
volumeMounts:
- mountPath: /flink-artifact
name: flink-artifact
containers:
# Do not change the main container name
- name: flink-main-container
resources:
requests:
ephemeral-storage: 2048Mi
limits:
ephemeral-storage: 2048Mi
volumeMounts:
- mountPath: /opt/flink/volumes/hostpath
name: flink-volume-hostpath
- mountPath: /opt/flink/artifacts
name: flink-artifact
- mountPath: /opt/flink/log
name: flink-logs
# Use sidecar container to push logs to remote storage or do some other debugging things
- name: sidecar-log-collector
image: sidecar-log-collector:latest
command: [ 'command-to-upload', '/remote/path/of/flink-logs/' ]
volumeMounts:
- mountPath: /flink-logs
name: flink-logs
volumes:
- name: flink-volume-hostpath
hostPath:
path: /tmp
type: Directory
- name: flink-artifact
emptyDir: { }
- name: flink-logs
emptyDir: { }
ユーザのjarsとクラスパス #
FlinkをKubernetesにネイティブにデプロイすると、次のjarがユーザjarとして認識され、ユーザクラスパスに追加されます:
- セッションモード: 起動コマンドで指定されたJARファイル。
- アプリケーションモード: 起動コマンドで指定されたJARファイルと、Flinkの
usrlib
フォルダの全てのJARファイル。
詳細については、クラスローディングのデバッグを参照してください。