This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Kubernetesセットアップ #
はじめに #
このはじめにガイドでは、Kubernetesにセッションクラスタをデプロイする方法を説明します。
はじめに #
このページは、Flinkのスタンドアローンデプロイメントを使って、Kubernetes上にスタンドアローン Flinkクラスタをデプロイする方法を説明します。 一般的に、新規ユーザには、native Kubernetes deploymentsを使ってKubernetes上にFlinkをデプロイすることをお勧めします。
Apache Flinkは、Kubernetes上でFlinkクラスタを管理するためのKubernetesオペレータも提供します。スタンドアローンとネイティブデプロイメントの両方をサポートし、kubernetes上のFlinkリソースのデプロイメント、設定、ライフサイクル管理を大幅に簡素化します。
詳細については、Flink Kubernetesオペレータドキュメントを参照してください。
準備 #
このガイドでは、Kubernetes環境が存在することを想定しています。次のようなコマンドkubectl get nodes
を実行してKubernetesセットアップが動作していることを確認できます。これは接続されている全てのKubeletsを一覧表示します。
Kubernetesをローカルで実行する場合は、MiniKubeをお勧めします。
MiniKubeを使っている場合は、Flinkクラスタをデプロイする前に必ずminikube ssh 'sudo ip link set docker0 promisc on'
を実行してください。そうしなければ、FlinkコンポーネントはKubernetesサービスを介して自身を参照できなくなります。
Kubernetesクラスタの開始(セッションモード) #
Flinkセッションクラスタは長時間実行されるKubernetesデプロイメントとして実行されます。セッションクラスタ上で複数のFlinkジョブを実行できます。 各ジョブは、クラスタのデプロイ後にクラスタに送信する必要があります。
KubernetesでのFlinkセッションクラスタデプロイメントには、少なくとも3つのコンポーネントがあります:
- JobManagerを実行するデプロイメント
- TaskManagersのプールのデプロイメント
- JobManagerのREST APIとUIポートを公開するサービス
共通リソース定義で提供されるファイル内容を使って、次のファイルを作成し、kubectl
コマンドを使ってそれぞれのコンポーネントを作成します。
# Configuration and service definition
$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
# Create the deployments for the cluster
$ kubectl create -f jobmanager-session-deployment-non-ha.yaml
$ kubectl create -f taskmanager-session-deployment.yaml
次に、Flink UIにアクセスしてジョブを送信するためのポート転送をセットアップします:
kubectl port-forward ${flink-jobmanager-pod} 8081:8081
を実行して、jobmanagerのweb uiをlocal 8081に転送します。- ブラウザでhttp://localhost:8081に移動します。
- さらに、以下のコマンドを使ってクラスタにジョブを送信できます:
$ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
次のコマンドを使ってクラスタを破棄できます:
$ kubectl delete -f jobmanager-service.yaml
$ kubectl delete -f flink-configuration-configmap.yaml
$ kubectl delete -f taskmanager-session-deployment.yaml
$ kubectl delete -f jobmanager-session-deployment-non-ha.yaml
デプロイメントモード #
アプリケーションモード #
アプリケーションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。
Flinkアプリケーションクラスタは、単一のアプリケーションを実行する専用のクラスタであり、デプロイメント時に利用可能である必要があります。
Kubernetesでの基本Flinkアプリケーションクラスタには次の3つのコンポーネントがあります:
- JobManagerを実行するアプリケーション
- TaskManagersのプールのデプロイメント
- JobManagerのREST APIとUIポートを公開するサービス
アプリケーションクラスタ固有のリソース定義を確認し、それに応じて調整してください:
jobmanager-job.yaml
のargs
属性は、ユーザジョブのメインクラスを指定する必要があります。
jobmanager-job.yaml
で他のargs
をFlinkイメージに渡す方法を理解するには、JobManager引数を指定する方法も参照してください。
job artifactsは、リソース定義の例のjob-artifacts-volume
から入手できます。
定義の例では、minikubeクラスタ内にコンポーネントを作成することを前提として、ボリュームをホストのローカルディレクトリとしてマウントします。
minikubeクラスタを使わない倍は、Kubernetesクラスタで利用可能な他のタイプのボリュームを使ってjob artifactsを提供できます。
あるいは、代わりにartifactsを既に含んでいるカスタムイメージをビルドすることもできます。
共通のクラスタコンポーネントを作成した後で、アプリケーションクラスタ固有のリソース定義を使って、kubectl
コマンドでクラスタを起動します:
$ kubectl create -f jobmanager-job.yaml
$ kubectl create -f taskmanager-job-deployment.yaml
単一のアプリケーションクラスタを終了するには、kubectl
コマンドを使ってこれらのコンポーネントを共通コンポーネントとともに
削除できます:
$ kubectl delete -f taskmanager-job-deployment.yalm
$ kubectl delete -f jobmanager-job.yaml
セッションモード #
セッションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。
セッションクラスタのデプロイメントについては、このページの上部にあるはじめにガイドで説明されています。
スタンドアローンKubernetes上のFlinkのリファレンス #
設定 #
全ての設定オプションは、設定ページにリストされています。設定オプションはflink-configuration-configmap.yaml
設定マップのflink-conf.yaml
セクションに追加できます。
KubernetesでのFlinkへのアクセス #
その後、Flink UIにアクセスし、様々な方法でジョブを送信できます:
-
kubectl proxy
:- ターミナルで
kubectl proxy
を実行します。 - ブラウザでhttp://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:webui/proxyに移動します。
- ターミナルで
-
kubectl port-forward
:kubectl port-forward ${flink-jobmanager-pod} 8081:8081
を実行して、jobmanagerのweb uiをlocal 8081に転送します。- ブラウザでhttp://localhost:8081に移動します。
- さらに、以下のコマンドを使ってクラスタにジョブを送信できます:
$ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
-
jobmanagerのrestサービス上に
NodePort
サービスを作成します:kubectl create -f jobmanager-rest-service.yaml
を実行し、jobmanagerにNodePort
サービスを作成します。jobmanager-rest-service.yaml
の例は、付録にあります。kubectl get svc flink-jobmanager-rest
を実行して、このサービスnode-port
を確認し、ブラウザでhttp://<public-node-ip>:<node-port>に移動します。- minikubeを使う場合は、
minikube ip
を実行してそのpublic ipを取得できます。 port-forward
ソリューションと同様に、以下のコマンドを使ってジョブをクラスタに送信できます:
$ ./bin/flink run -m <public-node-ip>:<node-port> ./examples/streaming/TopSpeedWindowing.jar
デバッグとログアクセス #
一般的なエラーの多くは、Flinkのログファイルを調べることで簡単に検出できます。Flinkのwebユーザインタフェースに悪っすできる場合は、そこからJobManagerとTaskManagerログにアクセスできます。
Flinkの起動に問題がある場合は、Kubernetesユーティリティを使ってログにアクセスすることもできます。kubectl get pods
を使って全ての実行中のpodsを確認します。
上記のクイックスタートの例では、3つのpodsが表示されます:
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
flink-jobmanager-589967dcfc-m49xv 1/1 Running 3 3m32s
flink-taskmanager-64847444ff-7rdl4 1/1 Running 3 3m28s
flink-taskmanager-64847444ff-nnd6m 1/1 Running 3 3m28s
これで、kubectl logs flink-jobmanager-589967dcfc-m49xv
を実行してログにアクセスできるようになりました
スタンドアローンKubernetesによる高可用性 #
Kubernetesでの高可用性のために、既存の高可用性サービスを使うことができます。
Kubernetes高可用性サービス #
セッションモードとアプリケーションモードクラスタは、Kubernetes高可用性サービスの使用をサポートします。 以下のFlink設定オプションをflink-configuration-configmap.yamlに追加する必要があります。
注意 設定されたHAストレージディレクトリのスキームに対応するファイルシステムがランタイムで利用できる必要があります。詳細については、独自のFlinkイメージとプラグインを有効にするを参照してください。
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
...
kubernetes.cluster-id: <cluster-id>
high-availability.type: kubernetes
high-availability.storageDir: hdfs:///flink/recovery
restart-strategy.type: fixed-delay
restart-strategy.fixed-delay.attempts: 10
...
さらに、ConfigMapsを作成、編集、削除する権限を持つサービスアカウントを使って、JobManagerとTaskManager podsを起動する必要があります。 詳細については、podのサービスアカウントを設定する方法を参照してください。
高可用性が有効な場合、Flinkはサービス検出に独自のHA-servicesを使います。
従って、JobManager podsは、KubernetesサービスではなくIPアドレスをjobmanager.rpc.address
として起動する必要があります。
完全な設定については付録を参照してください。
スタンドバイJobManagers #
podがクラッシュするとKubernetesが再起動するため、通常は1つのJobManager podを起動するだけで十分です。
より高速な回復を実現したい場合は、jobmanager-session-deployment-ha.yaml
にreplicas
を設定するか、スタンドバイJobManagerを起動するためにjobmanager-application-ha.yaml
のparallelism
を1
より大きな値に設定します。
リアクティブモードのスタンドアローンKubernetesの使用 #
リアクティブモードでは、アプリケーションクラスタが利用可能なリソースに合わせてjobの並列度を常に調整するモードでFlinkを実行できます。Kubernetesと組み合わせると、TaskManagerデプロイメントのレプリカ数によって利用可能なリソースが決まります。レプリカ数を増やすとジョブをスケールアップし、減らすとスケールダウンがトリガーされます。Horizontal Pod Autoscalerを使って自動的に行うこともできます。
Kubernetesでリアクティブモードを使うには、アプリケーションクラスタを使ってジョブをデプロイすると同じ手順に従います。ただし、設定マップflink-configuration-configmap.yaml
の代わりに、flink-reactive-mode-configuration-configmap.yaml
を使います。Flink用のscheduler-mode: reactive
設定が含まれます。
アプリケーションクラスタをデプロイすると、flink-taskmanager
デプロイメントのレプリカ数を変更することで、ジョブのスケールアップやスケールダウンができます。
ポッド再起動後のローカルリカバリの有効化 #
ポッドの障害が発生した場合のリカバリを高速化するために、Flinkの作業ディレクトリ昨日をローカルリカバリと組み合わせて利用できます。 作業ディレクトリが、再起動されたTaskManagerポッドに再マウントされる永続ボリュームに常駐するように設定されている場合、Flinkはローカルで状態を回復できます。 StatefulSetを使って、Kubernetesはポッドを永続ボリュームにマップするために必要な正確なツールを提供します。
TaskManagersをStatefulSetとしてデプロイすると、永続ボリュームをTaskManagersにマウントするために使われるボリューム要求テンプレートを設定できます。
さらに、決定的なtaskmanager.resource-id
を設定する必要があります。
適切な値は、環境変数を使って公開するpod nameです。
StatefulSet設定の例については、付録をご覧ください。
索引 #
共通のクラスタリソース定義 #
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
parallelism.default: 2
log4j-console.properties: |+
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.pekko.name = org.apache.pekko
logger.pekko.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
flink-reactive-mode-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
parallelism.default: 2
scheduler-mode: reactive
execution.checkpointing.interval: 10s
log4j-console.properties: |+
# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.pekko.name = org.apache.pekko
logger.pekko.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
jobmanager-service.yaml
非HAモードでのみ必要な、オプションのサービス。
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager
spec:
type: ClusterIP
ports:
- name: rpc
port: 6123
- name: blob-server
port: 6124
- name: webui
port: 8081
selector:
app: flink
component: jobmanager
jobmanager-rest-service.yaml
. jobmanagerのrest
ポートをパブリックKubernetesノードのポートとして公開するオプションのサービス。
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager-rest
spec:
type: NodePort
ports:
- name: rest
port: 8081
targetPort: 8081
nodePort: 30081
selector:
app: flink
component: jobmanager
セッションクラスタリソース定義 #
jobmanager-session-deployment-non-ha.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 1
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: apache/flink:latest
args: ["jobmanager"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
jobmanager-session-deployment-ha.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 1 # Set the value to greater than 1 to start standby JobManagers
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: apache/flink:latest
env:
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
# The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
args: ["jobmanager", "$(POD_IP)"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: apache/flink:latest
args: ["taskmanager"]
ports:
- containerPort: 6122
name: rpc
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
Application cluster resource definitions #
jobmanager-application-non-ha.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: flink-jobmanager
spec:
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
restartPolicy: OnFailure
containers:
- name: jobmanager
image: apache/flink:latest
env:
args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- name: job-artifacts-volume
mountPath: /opt/flink/usrlib
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: job-artifacts-volume
hostPath:
path: /host/path/to/job/artifacts
jobmanager-application-ha.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: flink-jobmanager
spec:
parallelism: 1 # Set the value to greater than 1 to start standby JobManagers
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
restartPolicy: OnFailure
containers:
- name: jobmanager
image: apache/flink:latest
env:
- name: POD_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
# The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- name: job-artifacts-volume
mountPath: /opt/flink/usrlib
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: job-artifacts-volume
hostPath:
path: /host/path/to/job/artifacts
taskmanager-job-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: apache/flink:latest
env:
args: ["taskmanager"]
ports:
- containerPort: 6122
name: rpc
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
- name: job-artifacts-volume
mountPath: /opt/flink/usrlib
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
- name: job-artifacts-volume
hostPath:
path: /host/path/to/job/artifacts
Local Recovery Enabled TaskManager StatefulSet #
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
state.backend.local-recovery: true
process.taskmanager.working-dir: /pv
---
apiVersion: v1
kind: Service
metadata:
name: taskmanager-hl
spec:
clusterIP: None
selector:
app: flink
component: taskmanager
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: flink-taskmanager
spec:
serviceName: taskmanager-hl
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
securityContext:
runAsUser: 9999
fsGroup: 9999
containers:
- name: taskmanager
image: apache/flink:latest
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
args: ["taskmanager", "-Dtaskmanager.resource-id=$(POD_NAME)"]
ports:
- containerPort: 6122
name: rpc
- containerPort: 6121
name: metrics
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
- name: pv
mountPath: /pv
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j-console.properties
path: log4j-console.properties
volumeClaimTemplates:
- metadata:
name: pv
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 50Gi