Native Kubernetes
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

ネイティブKubernetes #

このページでは、FlinkをKubernetesにネイティブにデプロイする方法について説明します。

はじめに #

はじめにのセクションでは、Kubernetes上で完全に機能するFlinkクラスタをセットアップする方法を説明します。

はじめに #

Kubernetesは、コンピュータアプリケーションデプロイメント、スケーリング、管理を自動化するための人気のあるコンテナオーケストレーションシステムです。 FlinkのネイティブKubernetes統合により、実行中のKubernetesクラスタ上にFlinkを直接デプロイできます。 さらに、FlinkはKubernetesと直接通信できるため、必要なリソースに応じてTaskManagerを動的に割り当てたり割り当てを解除したりできます。

Apache Flinkは、Kubernetes上でFlinkクラスタを管理するためのKubernetesオペレータも提供します。スタンドアローンとネイティブデプロイメントの両方をサポートし、kubernetes上のFlinkリソースのデプロイメント、設定、ライフサイクル管理を大幅に簡素化します。

詳細については、Flink Kubernetesオペレータドキュメントを参照してください。

準備 #

はじめにセクションでは、次の要件を満たすKubernetesクラスタが実行されていることを前提としています。

  • Kubernetes >= 1.9.
  • KubeConfigはポッドとサービスの一覧表示、作成、削除にアクセスでき、~/.kube/config経由で設定できます。kubectl auth can-i <list|create|edit|delete> podsを実行して権限を検証できます。
  • 有効化されたKubernetes DNS。
  • ポッドを作成、削除するためのRBAC権限を持つデフォルトのサービスアカウント。

Kubernetesクラスタのセットアップに問題がある場合は、Kubernetesクラスタをセットアップする方法を参照してください。

Kubernetes上でのFlinkセッションの開始 #

Kubernetesクラスタを実行し、kubectlがそれを指すように設定されると、セッションモードでFlinkクラスタを開始できます

# (1) Start Kubernetes session
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster

# (2) Submit example job
$ ./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    ./examples/streaming/TopSpeedWindowing.jar

# (3) Stop Kubernetes session by deleting cluster deployment
$ kubectl delete deployment/my-first-flink-cluster
In default, Flink’s Web UI and REST endpoint are exposed as ClusterIP service. To access the service, please refer to Accessing Flink’s Web UI for instructions.

おめでとう!KubernetesにFlinkをデプロイしてFlinkアプリケーションを正常に実行できました。

Back to top

デプロイメントモード #

プロダクション環境で使うには、アプリケーションモードでFlinkアプリケーションをデプロイすることをお勧めします。これらのモードはアプリケーションの分離を強化するからです。

アプリケーションモード #

アプリケーションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。

アプリケーションモードはユーザコードがFlinkイメージと一緒にバンドルされている必要があります。クラスタ上のユーザコードのmain()メソッドを実行するからです。 アプリケーションモードは、アプリケーションの終了後に全てのFlinkコンポーネントが適切にクリーンアップされるようにします。

Flinkコミュニティは、ユーザコードをバンドルするために使える基本Dockerイメージを提供します:

FROM flink
RUN mkdir -p $FLINK_HOME/usrlib
COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar

custom-image-nameでDockerイメージを作成して公開した後で、次のコマンドを使ってアプリkーションクラスタを開始できます:

$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image.ref=custom-image-name \
    local:///opt/flink/usrlib/my-flink-job.jar

注意 localはアプリケーションモードでサポートされる唯一のスキームです。

kubernetes.cluster-id オプションはクラスタ名を指定し、一意である必要があります。 個のオプションを指定しない場合、Flinkはランダムな名前を生成します。

kubernetes.container.image.refオプションはポッドを開始するイメージを指定します。

アプリケーションクラスタがデプロイされると、それとやりとりできるようになります:

# List running job on the cluster
$ ./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster
# Cancel running job
$ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-first-application-cluster <jobId>

bin/flinkにkey-valueペア-Dkey=valueを渡すことで、conf/flink-conf.yamlで設定される設定を上書きできます。

セッションモード #

セッションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。

このページの上部にあるはじめにガイドで、セッションクラスタのデプロイメントについて説明しました。

セッションモードは次の2つのモードで実行できます:

  • detachedモード (デフォルト): kubernetes-session.shはKubernetes上にFlinkをデプロイし、終了します。

  • attachedモード (-Dexecution.attached=true): kubernetes-session.shは存続したままで、実行中のFlinkクラスタを制御するためのコマンドを入力できます。 例えば、stopは実行中のセッションクラスタを停止します。 全てのサポートされるコマンドを一覧表示するには、helpを入力してください。

クラスタID my-first-flink-clusterを持つ実行中のセッションクラスタにre-attachするには、次のコマンドを使います:

$ ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dexecution.attached=true

bin/kubernetes-session.shにkey-valueペア-Dkey=valueを渡すことで、conf/flink-conf.yamlで設定される設定を上書きできます。

実行中のセッションクラスタの停止 #

クラスタID my-first-flink-clusterを持つ実行中のセッションクラスタを停止するには、delete the Flink deploymentか、次のコマンドを使います:

$ echo 'stop' | ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dexecution.attached=true

Back to top

Kubernetes上のFlinkのリファレンス #

Kubernetes上のFlinkの設定 #

Kubernetes固有の設定オプションは設定ページにリストされています。

FlinkはFabric8 Kubernetes clientを使ってKubernetes APIServerと通信し、Kubernetesリソース(例えば、デプロイメント、ポッド、ConfigMap、サービスなど)を作成/削除するだけでなく、ポッドとConfigMapsを監視します。 上記のFlink設定オプションを除き、Fabric8 Kubernetesクライアントの一部のエキスパートオプションはシステムプロパティまたは環境変数を介して設定できます。

例えば、ユーザは次のFlink設定オプションを使って、同時最大リクエストを設定できます。

containerized.master.env.KUBERNETES_MAX_CONCURRENT_REQUESTS: 200
env.java.opts.jobmanager: "-Dkubernetes.max.concurrent.requests=200"

FlinkのWeb UIへのアクセス #

FlinkのWeb UIとRESTエンドポイントは、kubernetes.rest-service.exposed.type設定オプションを介していくつかの方法で公開できます。

  • ClusterIP: cluster内部IPでサービスを公開します。 サービスはクラスタ内でのみ到達可能です。 JobManager UIにアクセスする場合、またはジョブを既存のセッションに送信する場合、ローカルプロキシを開始する必要があります。 その後、localhost:8081を使って、Flinkジョブをセッションに送信したり、ダッシュボードを表示したりできます。
$ kubectl port-forward service/<ServiceName> 8081
  • NodePort: 静的ポート(NodePort)で各ノードのIPのサービスを公開します。 <NodeIP>:<NodePort>を使って、JobManagerサービスに接続できます。

  • LoadBalancer: クラウドプロバイダのロードバランサを使ってサービスを外部に公開します。 クラウドプロバイダとKubernetesはロードバランサの準備に時間がかかるため、クライアントログにNodePort JobManager Webインタフェースが記録される場合があります。 kubectl get services/<cluster-id>-restを使って、EXTERNAL-IPを取得し、ロードバランサ JobManager Webインタフェースを手動で構築できますhttp://<EXTERNAL-IP>:8081

詳細は、Kubernetesでのサービスの公開の公式ドキュメントを参照してください。

環境によっては、LoadBalancer RESTサービス公開タイプを使ってFlinkクラスタを起動すると、クラスタをパブリックにアクセスできるようになります(通常は任意のコードを実行できます)。

ログ #

Kubernetes統合は、conf/log4j-console.propertiesconf/logback-console.xmlをConfigMapとしてポッドに公開します。 これらのファイルの変更は新しく起動されたクラスタに表示されます。

ログへのアクセス #

デフォルトでは、JobManagerとTaskManagerはログをコンソールと各ポッドの/opt/flink/logに同時に出力します。 STDOUTSTDERR出力はコンソールにのみリダイレクトされます。 以下からアクセスできます

$ kubectl logs <pod-name>

ポッドが実行中の場合は、kubectl exec -it <pod-name> bashを使ってトンネリングしてログを表示したり、プロセスをデバッグしたりすることもできます。

TaskManagersのログへのアクセス #

Flinkは、リソースを無駄にしないように、アイドル状態のTaskManagersの割り当てを自動的に解除します。 この動作により、それぞれのポッドのログへのアクセスが困難になる可能性があります。 ログファイルの調査をする時間を増やすために、resourcemanager.taskmanager-timeoutを設定することで、アイドリング状態のTaskManagersが解放されるまでの時間を長くすることができます。

ログレベルを動的に変更 #

ロガーを自動的に設定の変更を検知するに設定した場合、それぞれのConfigMap(クラスタIDがmy-first-flink-clusterと仮定します)を変更してログレベルを動的に適合させることができます:

$ kubectl edit cm flink-config-my-first-flink-cluster

プラグインの使用 #

pluginsを使うには、それらをFlink JobManager/TaskManagerのポッドの正しい位置にコピーする必要があります。 ボリュームをマウントしたり独自のDockerイメージをビルドしたりせずに、組み込みのプラグインを使うことができます。 例えば、次のコマンドを使ってFlinkセッションクラスタ用のS3プラグインを有効にできます。

$ ./bin/kubernetes-session.sh
    -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.19-SNAPSHOT.jar \
    -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.19-SNAPSHOT.jar

独自のDockerイメージ #

独自のDockerイメージを使いたい場合は、設定オプションkubernetes.container.image.refを介して設定できます。 Flinkコミュニティは、優れた出発点となるリッチなFlink Dockerイメージを提供しています。 プラグインを有効にして依存関係を追加し、依存関係やその他のオプションを追加する方法については、FlinkのDockerイメージをカスタマイズする方法を参照してください。

シークレットの使用 #

Kubernetes Secretsは、パスワード、トークン、キーなどの少量の機密データを含むオブジェクトです。 そのような情報は、ポッド仕様またはイメージに入れられる可能性があります。 Flink on Kubernetesは次の2つの方法でシークレットを使えます:

  • シークレットをポッドからのファイルとして使う;

  • シークレットを環境変数として使う;

シークレットをポッドからのファイルとして使う #

次のコマンドは、開始されたポッドのパス/path/to/secretにシークレットmysecretをマウントします。

$ ./bin/kubernetes-session.sh -Dkubernetes.secrets=mysecret:/path/to/secret

シークレットmysecretのユーザ名とパスワードは、ファイル/path/to/secret/username/path/to/secret/passwordに保存されています。 詳細については、official Kubernetes documentationを参照してください。

シークレットを環境変数として使う #

次のコマンドは、開始されたポッドの環境変数としてシークレットmysecretを公開します。

$ ./bin/kubernetes-session.sh -Dkubernetes.env.secretKeyRef=\
    env:SECRET_USERNAME,secret:mysecret,key:username;\
    env:SECRET_PASSWORD,secret:mysecret,key:password

環境変数SECRET_USERNAMEはユーザ名を含み、環境変数SECRET_PASSWORDはシークレットmysecretのパスワードを含みます。 詳細については、official Kubernetes documentationを参照してください。

Kubernetesでの高可用性 #

Kubernetesでの高可用性のために、既存の高可用性サービスを使うことができます。

スタンドバイJobManagerを起動するために、kubernetes.jobmanager.replicasの値を1より大きい値に設定します。 より早い回復を達成するのに役立ちます。 スタンドバイJobManagerを起動する時に高可用性を有効にする必要があることに注意してください。

手動リソースクリーンアップ #

FlinkはKubernetes OwnerReference’sを使って全てのクラスタコンポーネントをクリーンアップします。 ConfigMapServicePodを含む全てのFlinkが作成したリソースは、OwnerReferencedeployment/<cluster-id>に設定されています。 デプロイメントが削除された場合、全ての関係するリソースは自動的に削除されます。

$ kubectl delete deployment/<cluster-id>

サポートされるKubernetesバージョン #

現在、全てのKubernetesバージョン>= 1.9がサポートされます。

名前空間 #

Kubernetesの名前空間は、resource quotasを介して複数のユーザ間でクラスタリソースを分割します。 Kubernetes上のFlinkは名前空間を使ってFlinkクラスタを起動します。 名前空間はkubernetes.namespaceを介して設定できます。

RBAC #

ロールベースのアクセス制御(RBAC)は、企業内のユーザのロールに基づいてコンピュータリソースまたはネットワークリソースへのアクセスを制御する方法です。 ユーザは、JobManagerがKubernetesクラスタ内のKubernetes APIサーバにアクセスするために使うRBACロールとサービスアカウントを設定できます。

全ての名前空間にはデフォルトのサービスアカウントがあります。ただし、defaultサービスアカウントにはKubernetesクラスタ内のポッドを作成または削除する権限が無い場合があります。 ユーザは、defaultのサービスアカウントの権限を更新するか、適切なロールがバインドされている別のサービスアカウントを指定する必要がある場合があります。

$ kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default

defaultサービスアカウントを使いたくない場合は、次のコマンドを使って新しいサービスアカウントflink-service-accountを作成し、ロールバインドを設定します。 次に設定オプション-Dkubernetes.service-account=flink-service-accountを使って、JobManagerポッドがflink-service-accountサービスアカウントを作成してTaskManagerとリーダーのConfigMapを作成/削除するようにします。 また、これにより、TaskManagerがリーダーのConfigMapsを監視して、JobManagerとResourceManagerのアドレスを取り出すことができます。

$ kubectl create serviceaccount flink-service-account
$ kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink-service-account

詳細については、RBAC認証の公式Kubernetesドキュメントを参照してください。

Podテンプレート #

Flinkによりユーザはテンプレートファイルを介してJobManagerとTaskMangerポッドを定義できます。これにより、FlinkのKubernetes設定オプションで直接サポートされていない高度な機能をサポートできるようになります。 kubernetes.pod-template-file.defaultを使って、ポッドの定義を含むローカルファイルを指定します。これは、JobManagerとTaskManagerを初期化するために使われます。 メインコンテナはflink-main-containerという名前で定義する必要があります。 詳細については、podテンプレートの例を参照してください。

Flinkによって上書きされるフィールド #

podテンプレートの一部のフィールドはFlinkによって上書きされます。 有向フィールド値を解決する仕組みは次のように分類できます:

  • Flinkによる定義: ユーザは設定できません。

  • ユーザによる定義: ユーザが自由にこの値を設定できます。Flinkフレームワークは追加の値を設定せず、有効な値は設定オプションとテンプレートから引き出されます。

    優先順位: 最初に明示的な設定オプション値が取得され、次にpodテンプレートないの値が取得され、何も指定されていない場合は最後に設定オプションのデフォルト値が取得されます。

  • Flinkとの結合: Flinkは設定の値とユーザ定義の値と結合します(“ユーザによる定義"の優先順位を参照)。同じ名前のフィールドの場合、Flink値が優先されます。

上書きされるpodフィールドの完全なリストについては、以下の表を参照してください。 表にリストされていないpodテンプレートで定義されている全てのフィールドは、影響を受けません。

Pod Metadata

キー 分類 関連する設定オプション 説明
名前 Flinkによる定義 JobManagerのpod名は、}}#kubernetes-cluster-id">kubernetes.cluster-idで定義されるデプロイメントで上書きされます。 TaskManagerのpod名は、FlinkのResourceManagerで生成されるパターン<clusterID>-<attempt>-<index>で上書きされます。
名前空間 ユーザ定義 }}#kubernetes-namespace">kubernetes.namespace JobManagerデプロイメントと TaskManager podsは両方ともユーザ定義の名前空間で作成されます。
ownerReferences Flinkによる定義 JobManagerとTaskManager podsの所有者参照は常にJobManagerデプロイメントに設定されます。 デプロイメントがいつ削除されるかを制御するには、}}#kubernetes-jobmanager-owner-reference">kubernetes.jobmanager.owner.referenceを使ってください。
注釈 ユーザ定義 }}#kubernetes-jobmanager-annotations">kubernetes.jobmanager.annotations }}#kubernetes-taskmanager-annotations">kubernetes.taskmanager.annotations Flinkは、Flink設定オプションで指定される追加のアノテーションを追加します。
ラベル Flinkによってマージされます }}#kubernetes-jobmanager-labels">kubernetes.jobmanager.labels }}#kubernetes-taskmanager-labels">kubernetes.taskmanager.labels Flinkはユーザ定義値にいくつかの内部ラベルを追加します。

Pod Spec

キー 分類 関連する設定オプション 説明
imagePullSecrets ユーザ定義 }}#kubernetes-container-image-pull-secrets">kubernetes.container.image.pull-secrets Flinkは、Flink設定オプションによって指定される追加のプルシークレットを追加します。
nodeSelector ユーザ定義 }}#kubernetes-jobmanager-node-selector">kubernetes.jobmanager.node-selector }}#kubernetes-taskmanager-node-selector">kubernetes.taskmanager.node-selector Flinkは、Flink設定オプションによって指定される追加のノードセレクタを追加します。
tolerations ユーザ定義 }}#kubernetes-jobmanager-tolerations">kubernetes.jobmanager.tolerations }}#kubernetes-taskmanager-tolerations">kubernetes.taskmanager.tolerations Flinkは、Flink設定オプションで指定される追加の許容範囲を追加します。
restartPolicy Flinkによる定義 JobManagerの場合は"always"、TaskManager podの場合は"never"
JobManager podは常にデプロイメントによって再起動されます。また、TaskManager podは再起動しないでください。
serviceAccount ユーザ定義 }}#kubernetes-service-account">kubernetes.service-account JobManagerとTaskManager podsはユーザ定義のサービスアカウントを使って作成されます。
volumes Flinkによってマージされます Flinkは、Flink設定とHadoop設定を送り出すために必要ないくつかの内部ConfigMap値(例えば、flink-config-volume、hadoop-config-volume)を追加します。

Main Container Spec

キー 分類 関連する設定オプション 説明
env Flinkによってマージされます }}#forwarding-environment-variables">containerized.master.env.{ENV_NAME} }}#forwarding-environment-variables">containerized.taskmanager.env.{ENV_NAME} Flinkはユーザ定義値にいくつかの内部環境変数を追加します。
image ユーザ定義 }}#kubernetes-container-image-ref">kubernetes.container.image.ref コンテナイメージは、ユーザ定義値の定義された優先順位に従って解決されます。
imagePullPolicy ユーザ定義 }}#kubernetes-container-image-pull-policy">kubernetes.container.image.pull-policy コンテナイメージのプルポリシーはユーザ定義値の定義された優先順位に従って解決されます。
名前 Flinkによる定義 コンテナ名はFlinkによって"flink-main-container"で上書きされます。
resources ユーザ定義 Memory:
}}#jobmanager-memory-process-size">jobmanager.memory.process.size }}#taskmanager-memory-process-size">taskmanager.memory.process.size
CPU:
}}#kubernetes-jobmanager-cpu">kubernetes.jobmanager.cpu }}#kubernetes-taskmanager-cpu">kubernetes.taskmanager.cpu
メモリとcpuリソース(リクエストと制限を含む)は、Flink設定オプションによって上書きされます。他の全てのリソース(例えば一時ストレージ)は保持されます。
containerPorts Flinkによってマージされます Flinkはいくつかの内部コンテナポート(例えば、rest、jobmanager-rpc、blob、taskmanager-rpc)を追加します。
volumeMounts Flinkによってマージされます Flinkは、Flink設定とHadoop設定を送り出すために必要ないくつかのボリュームマウント(例えば、flink-config-volume、hadoop-config-volume)を追加します。

Podテンプレートの例 #

pod-template.yaml

apiVersion: v1
kind: Pod
metadata:
  name: jobmanager-pod-template
spec:
  initContainers:
    - name: artifacts-fetcher
      image: busybox:latest
      # Use wget or other tools to get user jars from remote storage
      command: [ 'wget', 'https://path/of/StateMachineExample.jar', '-O', '/flink-artifact/myjob.jar' ]
      volumeMounts:
        - mountPath: /flink-artifact
          name: flink-artifact
  containers:
    # Do not change the main container name
    - name: flink-main-container
      resources:
        requests:
          ephemeral-storage: 2048Mi
        limits:
          ephemeral-storage: 2048Mi
      volumeMounts:
        - mountPath: /opt/flink/volumes/hostpath
          name: flink-volume-hostpath
        - mountPath: /opt/flink/artifacts
          name: flink-artifact
        - mountPath: /opt/flink/log
          name: flink-logs
      # Use sidecar container to push logs to remote storage or do some other debugging things
    - name: sidecar-log-collector
      image: sidecar-log-collector:latest
      command: [ 'command-to-upload', '/remote/path/of/flink-logs/' ]
      volumeMounts:
        - mountPath: /flink-logs
          name: flink-logs
  volumes:
    - name: flink-volume-hostpath
      hostPath:
        path: /tmp
        type: Directory
    - name: flink-artifact
      emptyDir: { }
    - name: flink-logs
      emptyDir: { }

ユーザのjarsとクラスパス #

FlinkをKubernetesにネイティブにデプロイすると、次のjarがユーザjarとして認識され、ユーザクラスパスに追加されます:

  • セッションモード: 起動コマンドで指定されたJARファイル。
  • アプリケーションモード: 起動コマンドで指定されたJARファイルと、Flinkのusrlibフォルダの全てのJARファイル。

詳細については、クラスローディングのデバッグを参照してください。

Back to top

inserted by FC2 system