Kubernetes
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Kubernetesセットアップ #

はじめに #

このはじめにガイドでは、Kubernetesセッションクラスタをデプロイする方法を説明します。

はじめに #

このページは、Flinkのスタンドアローンデプロイメントを使って、Kubernetes上にスタンドアローン Flinkクラスタをデプロイする方法を説明します。 一般的に、新規ユーザには、native Kubernetes deploymentsを使ってKubernetes上にFlinkをデプロイすることをお勧めします。

Apache Flinkは、Kubernetes上でFlinkクラスタを管理するためのKubernetesオペレータも提供します。スタンドアローンとネイティブデプロイメントの両方をサポートし、kubernetes上のFlinkリソースのデプロイメント、設定、ライフサイクル管理を大幅に簡素化します。

詳細については、Flink Kubernetesオペレータドキュメントを参照してください。

準備 #

このガイドでは、Kubernetes環境が存在することを想定しています。次のようなコマンドkubectl get nodesを実行してKubernetesセットアップが動作していることを確認できます。これは接続されている全てのKubeletsを一覧表示します。

Kubernetesをローカルで実行する場合は、MiniKubeをお勧めします。

MiniKubeを使っている場合は、Flinkクラスタをデプロイする前に必ずminikube ssh 'sudo ip link set docker0 promisc on'を実行してください。そうしなければ、FlinkコンポーネントはKubernetesサービスを介して自身を参照できなくなります。

Kubernetesクラスタの開始(セッションモード) #

Flinkセッションクラスタは長時間実行されるKubernetesデプロイメントとして実行されます。セッションクラスタ上で複数のFlinkジョブを実行できます。 各ジョブは、クラスタのデプロイ後にクラスタに送信する必要があります。

KubernetesでのFlinkセッションクラスタデプロイメントには、少なくとも3つのコンポーネントがあります:

  • JobManagerを実行するデプロイメント
  • TaskManagersのプールのデプロイメント
  • JobManagerのREST APIとUIポートを公開するサービス

共通リソース定義で提供されるファイル内容を使って、次のファイルを作成し、kubectlコマンドを使ってそれぞれのコンポーネントを作成します。

    # Configuration and service definition
    $ kubectl create -f flink-configuration-configmap.yaml
    $ kubectl create -f jobmanager-service.yaml
    # Create the deployments for the cluster
    $ kubectl create -f jobmanager-session-deployment-non-ha.yaml
    $ kubectl create -f taskmanager-session-deployment.yaml

次に、Flink UIにアクセスしてジョブを送信するためのポート転送をセットアップします:

  1. kubectl port-forward ${flink-jobmanager-pod} 8081:8081を実行して、jobmanagerのweb uiをlocal 8081に転送します。
  2. ブラウザでhttp://localhost:8081に移動します。
  3. さらに、以下のコマンドを使ってクラスタにジョブを送信できます:
$ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar

次のコマンドを使ってクラスタを破棄できます:

    $ kubectl delete -f jobmanager-service.yaml
    $ kubectl delete -f flink-configuration-configmap.yaml
    $ kubectl delete -f taskmanager-session-deployment.yaml
    $ kubectl delete -f jobmanager-session-deployment-non-ha.yaml

Back to top

デプロイメントモード #

アプリケーションモード #

アプリケーションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。

Flinkアプリケーションクラスタは、単一のアプリケーションを実行する専用のクラスタであり、デプロイメント時に利用可能である必要があります。

Kubernetesでの基本Flinkアプリケーションクラスタには次の3つのコンポーネントがあります:

  • JobManagerを実行するアプリケーション
  • TaskManagersのプールのデプロイメント
  • JobManagerのREST APIとUIポートを公開するサービス

アプリケーションクラスタ固有のリソース定義を確認し、それに応じて調整してください:

jobmanager-job.yamlargs属性は、ユーザジョブのメインクラスを指定する必要があります。 jobmanager-job.yamlで他のargsをFlinkイメージに渡す方法を理解するには、JobManager引数を指定する方法も参照してください。

job artifactsは、リソース定義の例job-artifacts-volumeから入手できます。 定義の例では、minikubeクラスタ内にコンポーネントを作成することを前提として、ボリュームをホストのローカルディレクトリとしてマウントします。 minikubeクラスタを使わない倍は、Kubernetesクラスタで利用可能な他のタイプのボリュームを使ってjob artifactsを提供できます。 あるいは、代わりにartifactsを既に含んでいるカスタムイメージをビルドすることもできます。

共通のクラスタコンポーネントを作成した後で、アプリケーションクラスタ固有のリソース定義を使って、kubectlコマンドでクラスタを起動します:

    $ kubectl create -f jobmanager-job.yaml
    $ kubectl create -f taskmanager-job-deployment.yaml

単一のアプリケーションクラスタを終了するには、kubectlコマンドを使ってこれらのコンポーネントを共通コンポーネントとともに 削除できます:

    $ kubectl delete -f taskmanager-job-deployment.yalm
    $ kubectl delete -f jobmanager-job.yaml

セッションモード #

セッションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。

セッションクラスタのデプロイメントについては、このページの上部にあるはじめにガイドで説明されています。

Back to top

スタンドアローンKubernetes上のFlinkのリファレンス #

設定 #

全ての設定オプションは、設定ページにリストされています。設定オプションはflink-configuration-configmap.yaml設定マップのflink-conf.yamlセクションに追加できます。

KubernetesでのFlinkへのアクセス #

その後、Flink UIにアクセスし、様々な方法でジョブを送信できます:

  • kubectl proxy:

    1. ターミナルでkubectl proxyを実行します。
    2. ブラウザでhttp://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxyに移動します。
  • kubectl port-forward:

    1. kubectl port-forward ${flink-jobmanager-pod} 8081:8081を実行して、jobmanagerのweb uiをlocal 8081に転送します。
    2. ブラウザでhttp://localhost:8081に移動します。
    3. さらに、以下のコマンドを使ってクラスタにジョブを送信できます:
    $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
    
  • jobmanagerのrestサービス上にNodePortサービスを作成します:

    1. kubectl create -f jobmanager-rest-service.yamlを実行し、jobmanagerにNodePortサービスを作成します。jobmanager-rest-service.yamlの例は、付録にあります。
    2. kubectl get svc flink-jobmanager-restを実行して、このサービスnode-portを確認し、ブラウザでhttp://<public-node-ip>:<node-port>に移動します。
    3. minikubeを使う場合は、minikube ipを実行してそのpublic ipを取得できます。
    4. port-forwardソリューションと同様に、以下のコマンドを使ってジョブをクラスタに送信できます:
    $ ./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/TopSpeedWindowing.jar
    

デバッグとログアクセス #

一般的なエラーの多くは、Flinkのログファイルを調べることで簡単に検出できます。Flinkのwebユーザインタフェースに悪っすできる場合は、そこからJobManagerとTaskManagerログにアクセスできます。

Flinkの起動に問題がある場合は、Kubernetesユーティリティを使ってログにアクセスすることもできます。kubectl get podsを使って全ての実行中のpodsを確認します。 上記のクイックスタートの例では、3つのpodsが表示されます:

$ kubectl get pods
NAME                                 READY   STATUS             RESTARTS   AGE
flink-jobmanager-589967dcfc-m49xv    1/1     Running            3          3m32s
flink-taskmanager-64847444ff-7rdl4   1/1     Running            3          3m28s
flink-taskmanager-64847444ff-nnd6m   1/1     Running            3          3m28s

これで、kubectl logs flink-jobmanager-589967dcfc-m49xvを実行してログにアクセスできるようになりました

スタンドアローンKubernetesによる高可用性 #

Kubernetesでの高可用性のために、既存の高可用性サービスを使うことができます。

Kubernetes高可用性サービス #

セッションモードとアプリケーションモードクラスタは、Kubernetes高可用性サービスの使用をサポートします。 以下のFlink設定オプションをflink-configuration-configmap.yamlに追加する必要があります。

注意 設定されたHAストレージディレクトリのスキームに対応するファイルシステムがランタイムで利用できる必要があります。詳細については、独自のFlinkイメージプラグインを有効にするを参照してください。

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
  ...
    kubernetes.cluster-id: <cluster-id>
    high-availability.type: kubernetes
    high-availability.storageDir: hdfs:///flink/recovery
    restart-strategy.type: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
  ...  

さらに、ConfigMapsを作成、編集、削除する権限を持つサービスアカウントを使って、JobManagerとTaskManager podsを起動する必要があります。 詳細については、podのサービスアカウントを設定する方法を参照してください。

高可用性が有効な場合、Flinkはサービス検出に独自のHA-servicesを使います。 従って、JobManager podsは、KubernetesサービスではなくIPアドレスをjobmanager.rpc.addressとして起動する必要があります。 完全な設定については付録を参照してください。

スタンドバイJobManagers #

podがクラッシュするとKubernetesが再起動するため、通常は1つのJobManager podを起動するだけで十分です。 より高速な回復を実現したい場合は、jobmanager-session-deployment-ha.yamlreplicasを設定するか、スタンドバイJobManagerを起動するためにjobmanager-application-ha.yamlparallelism1より大きな値に設定します。

リアクティブモードのスタンドアローンKubernetesの使用 #

リアクティブモードでは、アプリケーションクラスタが利用可能なリソースに合わせてjobの並列度を常に調整するモードでFlinkを実行できます。Kubernetesと組み合わせると、TaskManagerデプロイメントのレプリカ数によって利用可能なリソースが決まります。レプリカ数を増やすとジョブをスケールアップし、減らすとスケールダウンがトリガーされます。Horizontal Pod Autoscalerを使って自動的に行うこともできます。

Kubernetesでリアクティブモードを使うには、アプリケーションクラスタを使ってジョブをデプロイすると同じ手順に従います。ただし、設定マップflink-configuration-configmap.yamlの代わりに、flink-reactive-mode-configuration-configmap.yamlを使います。Flink用のscheduler-mode: reactive設定が含まれます。

アプリケーションクラスタをデプロイすると、flink-taskmanagerデプロイメントのレプリカ数を変更することで、ジョブのスケールアップやスケールダウンができます。

ポッド再起動後のローカルリカバリの有効化 #

ポッドの障害が発生した場合のリカバリを高速化するために、Flinkの作業ディレクトリ昨日をローカルリカバリと組み合わせて利用できます。 作業ディレクトリが、再起動されたTaskManagerポッドに再マウントされる永続ボリュームに常駐するように設定されている場合、Flinkはローカルで状態を回復できます。 StatefulSetを使って、Kubernetesはポッドを永続ボリュームにマップするために必要な正確なツールを提供します。

TaskManagersをStatefulSetとしてデプロイすると、永続ボリュームをTaskManagersにマウントするために使われるボリューム要求テンプレートを設定できます。 さらに、決定的なtaskmanager.resource-idを設定する必要があります。 適切な値は、環境変数を使って公開するpod nameです。 StatefulSet設定の例については、付録をご覧ください。

Back to top

索引 #

共通のクラスタリソース定義 #

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2    
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.pekko.name = org.apache.pekko
    logger.pekko.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF    

flink-reactive-mode-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
    scheduler-mode: reactive
    execution.checkpointing.interval: 10s    
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.pekko.name = org.apache.pekko
    logger.pekko.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF    

jobmanager-service.yaml 非HAモードでのみ必要な、オプションのサービス。

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-rest-service.yaml. jobmanagerのrestポートをパブリックKubernetesノードのポートとして公開するオプションのサービス。

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager

セッションクラスタリソース定義 #

jobmanager-session-deployment-non-ha.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:latest
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

jobmanager-session-deployment-ha.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1 # Set the value to greater than 1 to start standby JobManagers
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:latest
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
        args: ["jobmanager", "$(POD_IP)"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

taskmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: apache/flink:latest
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

Application cluster resource definitions #

jobmanager-application-non-ha.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: apache/flink:latest
          env:
          args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: job-artifacts-volume
              mountPath: /opt/flink/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            path: /host/path/to/job/artifacts

jobmanager-application-ha.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  parallelism: 1 # Set the value to greater than 1 to start standby JobManagers
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: apache/flink:latest
          env:
          - name: POD_IP
            valueFrom:
              fieldRef:
                apiVersion: v1
                fieldPath: status.podIP
          # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
          args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: job-artifacts-volume
              mountPath: /opt/flink/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            path: /host/path/to/job/artifacts

taskmanager-job-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: apache/flink:latest
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: job-artifacts-volume
          mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /host/path/to/job/artifacts

Local Recovery Enabled TaskManager StatefulSet #

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    state.backend.local-recovery: true
    process.taskmanager.working-dir: /pv    
---
apiVersion: v1
kind: Service
metadata:
  name: taskmanager-hl
spec:
  clusterIP: None
  selector:
    app: flink
    component: taskmanager
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-taskmanager
spec:
  serviceName: taskmanager-hl
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      securityContext:
        runAsUser: 9999
        fsGroup: 9999
      containers:
      - name: taskmanager
        image: apache/flink:latest
        env:
          - name: POD_NAME
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
        args: ["taskmanager", "-Dtaskmanager.resource-id=$(POD_NAME)"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6121
          name: metrics
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: pv
          mountPath: /pv
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
  volumeClaimTemplates:
  - metadata:
      name: pv
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 50Gi

Back to top

inserted by FC2 system