This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Dockerのセットアップ #
はじめに #
このはじめにセクションでは、Dockerコンテナを使ってFlinkクラスタのローカルセットアップ(1台のマシン上だが別のコンテナ)を通じて案内します。
はじめに #
Dockerは人気のあるコンテナランタイムです。 Apache Flinkの公式のDockerイメージはDocker Hubで入手できます。 Dockerイメージを使って、Docker上にセッションまたはアプリケーションクラスタをデプロイできます。このページでは、DockerとDocker ComposerでのFlinkのセットアップに焦点を当てます。
スタンドアロンKubernetesやネイティブKubernetesなどの管理されたコンテナ化環境へのデプロイは、別のページで説明されます。
Dockerでのセッションクラスタの開始 #
Flinkセッションクラスタを使って複数のジョブを実行できます。各ジョブは、クラスタのデプロイ後にクラスタに送信する必要があります。 Dockerを使ってFlinkセッションクラスタをデプロイするには、JobManagerコンテナを開始する必要があります。コンテナ間の通信を有効にするために、最初に必要なFlink設定プロパティを設定し、次のネットワークを設定します:
$ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
$ docker network create flink-network
次にJobManagerを起動します:
$ docker run \
--rm \
--name=jobmanager \
--network flink-network \
--publish 8081:8081 \
--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
flink:latest jobmanager
1つ以上のTaskManagerコンテナ:
$ docker run \
--rm \
--name=taskmanager \
--network flink-network \
--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
flink:latest taskmanager
webインタフェースはlocalhost:8081で利用できるようになりました。
ジョブの送信は次のように送信できるようになりました(Flinkのローカルディストリビューションが利用可能だと仮定します):
$ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
クラスタをシャットダウンするには、JobManagerとTaskManagerプロセスを(CTRL-C
などで)終了するか、docker ps
を使ってコンテナを識別し、docker stop
を使ってコンテナを終了します。
デプロイメントモード #
Flinkイメージには、でふぉるとせっていと標準のエントリポイントスクリプトを備えた通常のFlinkディストリビューションが含まれます。 エントリポイントは、次のモードで実行できます:
- JobManager セッションクラス用
- JobManager アプリケーションクラスタ用
- TaskManager 任意のクラスタ用
これにより、任意のコンテナ化環境にスタンドアローンクラスタ(セッションまたはアプリケーションモード)をデプロイできます。例えば:
- ローカルのDockerセットアップ、
- Kubernetesクラスタ、
- Docker Compose、
注意 ネイティブKubernetesもデフォルトで同じイメージを実行し、オンデマンドでTaskManagerをデプロイするため、手動で行う必要はありません。
次の章では、様々な目的で単一のFlink Dockerコンテナを開始する方法を説明します。
Docker上でFlinkを開始すると、localhost:8081でFlink Web UIにアクセスしたり、./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
のようなジョブを送信することができます。
システム設定を簡単にするために、セッションモードでFlinkを配備するためにDocker Composeを使うことをお勧めします。
アプリケーションモード #
アプリケーションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。
Flinkアプリケーションクラスタは単一のジョブを実行する専用のクラスタです。 この場合、ジョブを1つのステップとしてクラスタをデプロイするため、追加のジョブの送信は必要ありません。
job artifactsはコンテナ内のFlinkのJVMプロセスのクラスパスに含まれており、次のもので構成されます:
- jobのjar、通常はセッションクラスタに送信します。
- Flinkには含まれていない、その他の全ての必要な依存関係とリソース。
Dockerを使って単一ジョブのクラスタをデプロイするには、
/opt/flink/usrlib
の下である全てのコンテナでローカルでjob artifactsを利用できるようにし、- アプリケーションクラスタモードのJobManagerコンテナを開始し、
- 必要な数のTaskManagerコンテナを起動します。
ローカルでjob artifactsを利用可能にするには、
-
JobManagerとTaskManagerの起動時に、artifactsを含むボリュームをマウント (または複数のボリューム)
/opt/flink/usrlib
します:$ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
$ docker network create flink-network
$ docker run
–mount type=bind,src=../../../../../host/path/to/job/artifacts1,target=/opt/flink/usrlib/artifacts1
–mount type=bind,src=../../../../../host/path/to/job/artifacts2,target=/opt/flink/usrlib/artifacts2
–rm
–env FLINK_PROPERTIES="${FLINK_PROPERTIES}"
–name=jobmanager
–network flink-network
flink:latest standalone-job
–job-classname com.job.ClassName
[–job-id
[–fromSavepoint /path/to/savepoint [–allowNonRestoredState]]
[job arguments]
$ docker run
–mount type=bind,src=../../../../../host/path/to/job/artifacts1,target=/opt/flink/usrlib/artifacts1
–mount type=bind,src=../../../../../host/path/to/job/artifacts2,target=/opt/flink/usrlib/artifacts2
–env FLINK_PROPERTIES="${FLINK_PROPERTIES}"
flink:latest taskmanager
* 独自の`Dockerfile`を作成して**Flinkイメージを拡張**し、それをビルドして、JobManagerとTaskManagersの起動に使います:
```dockerfile
FROM flink
ADD /host/path/to/job/artifacts/1 /opt/flink/usrlib/artifacts/1
ADD /host/path/to/job/artifacts/2 /opt/flink/usrlib/artifacts/2
$ docker build --tag flink_with_job_artifacts .
$ docker run \
flink_with_job_artifacts standalone-job \
--job-classname com.job.ClassName \
[--job-id <job id>] \
[--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
[job arguments]
$ docker run flink_with_job_artifacts taskmanager
standalone-job
引数は、アプリケーションモードでJobManagerコンテナを起動します。
JobManager additional command line arguments #
次の追加のコマンドライン引数をクラスタのエントリポイントに指定できます:
-
--job-classname <job class name>
: 実行するジョブのクラス名。デフォルトでは、FlinkはMain-Classまたはprogram-class manifest entryを持つJARをクラスパスでスキャンし、それをジョブクラスとして選択します。 このコマンドライン引数を使って、手動でジョブのクラスを設定します。 この引数は、Tそのようなマニフェストエントリを持つJarがクラスパス上に存在しないか、1つ以上ある場合に必要です。
-
--job-id <job id>
(王pション): ジョブのFlinkジョブIDを手動で設定します: 00000000000000000000000000000000) -
--fromSavepoint /path/to/savepoint
(オプション): セーブポイントから回復しますセーブポイントから再開するには、セーブポイントのパスも渡す必要があります。
/path/to/savepoint
は、クラスタの全てのDockerコンテナからアクセスできる必要があることに注意してください(例えば、DFSに保存するか、マウントされたボリュームか、それをイメージに追加します)。 -
--allowNonRestoredState
(オプション): 壊れたセーブポイントの状態をスキップしますさらにこの引数を指定して、復元できないセーブポイントの状態をスキップできるようにすることもできます。
ユーザジョブのメインクラスのmain関数が引数を受け取る場合は、docker run
コマンドの最後にそれらを渡すこともできます。
セッションモード #
セッションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。
セッションモードでのローカルデプロイメントについては、上記の始めにのセクションで既に説明しました。
Flink Dockerイメージ #
イメージのホスティング #
Flink Dockerイメージには2つの配布チャンネルがあります:
- Docker Hub上の公式Flinkイメージ(Dockerによってレビューおよびビルドされます)
- Docker Hubの
apache/flink
上のFlinkイメージ(Flink開発者によって管理されます)
Dockerによってレビューされているため、Docker Hubの公式イメージを使うことをお勧めします。apache/flink
上のイメージは、Dockerによるレビュープロセスで遅延がある場合に備えて提供されます。
flink:latest
という名前のイメージを起動すると、Docker Hubから最新のイメージをpullされます。apache/flink
でホストされているイメージを使う場合は、flink
をapache/flink
で置き換えてください。いずれのイメージタグ(Flink 1.11.3以降)も、apache/flink
で利用可能です。
イメージのタグ #
Flink DockerリポジトリはDocker Hubでホストされ、Flinkバージョン1.2.1以降のイメージを提供します。 これらのイメージのソースは、Apache flink-dockerリポジトリにあります。
FlinkバージョンとScalaバージョンのサポートされている組み合わせごとにイメージが利用可能で、tag aliasesは便宜上提供されています。
例えば、次のaliasesを利用できます:
flink:latest
→flink:<latest-flink>-scala_<latest-scala>
flink:1.11
→flink:1.11.<latest-flink-1.11>-scala_2.12
注意 必要なFlinkとScalaの両方のバージョンを指定するdockerイメージのバージョンタグ(例えば、flink:1.11-scala_2.12
)を常に明示的に使うことをお勧めします。
これにより、アプリケーションで使われるFlinkおよび/またはScalaのバージョンが、dockerイメージで提供されるバージョンと異なる場合に発生する可能性のあるクラスの競合が回避されます。
注意 Flink 1.5バージョンより前は、Hadoopの依存関係が常にFlinkにバンドルされていました。
特定のタグにHadoopのバージョンが含まれていることがわかります(例えば、-hadoop28
)。
Flink 1.5以降、Hadoopのバージョンが省略されたイメージタグは、バンドルされたHadoopディストリビューションが含まないFlinkのHadoopフリーのリリースに対応します。
Docker Composeを使ったFlink #
Docker Composeは、Dockerコンテナのグループをローカルで実行する方法です。 次のセクションでは、Flinkを実行する設定ファイルの例を示します。
概要 #
-
ocker-compose.yamlファイルを作成します。以下のセクションの例を確認してください:
-
フォアグラウンドでクラスタを起動します(バックグランドは
-d
を使います)$ docker-compose up
-
クラスタを
N
個のTaskManagersまで、スケールアップまたはスケールダウンします$ docker-compose scale taskmanager=<N>
-
JobManagerコンテナにアクセスします
$ docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) /bin/sh
-
クラスタをkillします
$ docker-compose down
-
Web UIにアクセスします
クラスタが実行中の場合、http://localhost:8081でweb UIにアクセスできます。
アプリケーションモード #
アプリケーションモードでは、イメージにバンドルされているFlinkジョブだけを実行する専用のFlinkクラスタを起動します。
従って、アプリケーションごとに専用のFlinkイメージをビルドする必要があります。
詳細はhereをご確認ください。
jobmanager
サービスのcommand
でJobManagerの引数を指定する方法も参照してください。
Application Modeのdocker-compose.yml
。
version: "2.2"
services:
jobmanager:
image: flink:latest
ports:
- "8081:8081"
command: standalone-job --job-classname com.job.ClassName [--job-id <job id>] [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] [job arguments]
volumes:
- /host/path/to/job/artifacts:/opt/flink/usrlib
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
parallelism.default: 2
taskmanager:
image: flink:latest
depends_on:
- jobmanager
command: taskmanager
scale: 1
volumes:
- /host/path/to/job/artifacts:/opt/flink/usrlib
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2
セッションモード #
セッションモードでは、docker-composeを使って長時間実行されるFlinkクラスタを起動し、そこにジョブを送信できます。
Session Modeのdocker-compose.yml
:
version: "2.2"
services:
jobmanager:
image: flink:latest
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager:
image: flink:latest
depends_on:
- jobmanager
command: taskmanager
scale: 1
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
セッションクラスタを使ったFlink SQLクライアント #
この例では、長時間実行されるセッションクラスタを起動し、このクラスタを使ってジョブを送信するFlnk SQL CLIを起動します。
Session Clusterを使ったFlink SQLクライアントのdocker-compose.yml
:
version: "2.2"
services:
jobmanager:
image: flink:latest
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager:
image: flink:latest
depends_on:
- jobmanager
command: taskmanager
scale: 1
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
sql-client:
image: flink:latest
command: bin/sql-client.sh
depends_on:
- jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
rest.address: jobmanager
-
SQLクライアント開始するには、次を実行します
docker-compose run sql-client
次に、テーブルの作成を開始し、それらに対してクエリを実行できます。
-
全ての必要な依存関係(例えば、コネクタなど)がクライアントだけでなくクラスタでも利用可能である必要があることに注意してください。 例えば、Kafkaコネクタを使いたい場合は、次のDockerfileで独自のイメージを作成します
FROM flink:latest
RUN wget -P /opt/flink/lib https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.19-SNAPSHOT/flink-sql-connector-kafka_scala_2.12-1.19-SNAPSHOT.jar
Dockerファイルで(例えば`build`コマンドを経由して)それを参照します。
`ADD JAR`のようなSQLコマンドは、ローカルファイルシステム(この場合は、Dockerのオーバーレイファイルシステム)でのみ動作するため、ホストマシン上にあるJARに対しては動作しません。
## Docker上のFlink Pythonの使用
PythonとPyFlinkが用意された独自のイメージを構築するには、次のDockerfileを参照できます:
```Dockerfile
FROM flink:latest
# install python3 and pip3
RUN apt-get update -y && \
apt-get install -y python3 python3-pip python3-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python
# install PyFlink
COPY apache-flink*.tar.gz /
RUN pip3 install /apache-flink-libraries*.tar.gz && pip3 install /apache-flink*.tar.gz
Note PyFlink packages could be built according to development guide
pyflink:latestという名前のイメージをビルドします:
$ docker build --tag pyflink:latest .
DockerでFlinkを設定します #
動的プロパティ経由 #
$ docker run flink:latest \
<jobmanager|standalone-job|taskmanager|historyserver> \
-D jobmanager.rpc.address=host \
-D taskmanager.numberOfTaskSlots=3 \
-D blob.server.port=6124
動的プロパティ経由で設定されたオプションは、flink-conf.yaml
のオプションを上書きします。
環境変数を経由 #
Flinkイメージを実行する時に、環境変数FLINK_PROPERTIES
を設定することで設定オプションを変更することもできます:
$ FLINK_PROPERTIES="jobmanager.rpc.address: host
taskmanager.numberOfTaskSlots: 3
blob.server.port: 6124
"
$ docker run --env FLINK_PROPERTIES=${FLINK_PROPERTIES} flink:latest <jobmanager|standalone-job|taskmanager>
jobmanager.rpc.address
オプションを設定する必要があります。その他のオプションは任意です。
環境変数FLINK_PROPERTIES
は、改行で区切られたFlinkクラスタ設定のリストを含む必要があります。FLINK_PROPERTIES
はflink-conf.yaml
の設定よりも優先されます。
flink-conf.yaml経由 #
設定ファイル(flink-conf.yaml
, logging, hosts etc)は、Flinkイメージの/opt/flink/conf
ディレクトリにあります。
Flink設定ファイルに独自の場所を指定するには、
-
Flinklイメージを実行する時に、独自の設定ファイルがあるボリュームのマウントをこのパス
/opt/flink/conf
にします:$ docker run \ --mount type=bind,src=/host/path/to/custom/conf,target=/opt/flink/conf \ flink:latest <jobmanager|standalone-job|taskmanager>
-
あるいは、それらを独自のFlinkイメージに追加してビルドし実行します:
FROM flink
ADD /host/path/to/flink-conf.yaml /opt/flink/conf/flink-conf.yaml ADD /host/path/to/log4j.properties /opt/flink/conf/log4j.properties
マウントされるボリュームは必要な全ての設定ファイルを含む必要があります。
flink-conf.yaml
ファイルは、 特定の場合にDockerエントリポイントスクリプトがファイルを変更できるように、書き込み権限が必要です。
### ファイルシステムプラグインの使用
[プラグイン](/docs/deployment/filesystems/plugins/)のドキュメントのページで説明されっているように、プラグインを使うにはプラグインをDockerコンテナ内のFlink内の正しい場所にコピーする必要があります。
Flinkで提供されるプラグイン(Flinkディストリビューションの`opt/`ディレクトリの中)を有効にしたい場合は、Flinkイメージを実行する時に環境変数`ENABLE_BUILT_IN_PLUGINS`を渡す必要があります。
`ENABLE_BUILT_IN_PLUGINS`は、`;`で区切られるプラグインのjarファイル名のリストが含まれている必要があります。有効なプラグイン名は、例えば`flink-s3-fs-hadoop-1.19-SNAPSHOT.jar`です
```sh
$ docker run \
--env ENABLE_BUILT_IN_PLUGINS=flink-plugin1.jar;flink-plugin2.jar \
flink:latest <jobmanager|standalone-job|taskmanager>
Flinkイメージをカスタマイズするための高度な方法もあります。
Memory Allocatorの切り替え #
Flinkは、メモリ断片化問題を解決するために、デフォルトのメモリアロケータとしてjemalloc
を導入しました(FLINK-19125を参照してください)。
glibc
をメモリアロケータとして使うように切り替えると、環境変数DISABLE_JEMALLOC
をtrueに設定することで、以前の動作に戻すことができます。また、予期しないメモリの消費や問題が発生した場合は、問題をjiraまたはメーリングリストを通じて報告してください。
$ docker run \
--env DISABLE_JEMALLOC=true \
flink:latest <jobmanager|standalone-job|taskmanager>
glibc
メモリアロケータをまだ使っているユーザの場合、特にRocksDBStateBackendを使ってセーブポイントまたは完全なチェックポイントを作成している場合は、glibcのバグが簡単に再現できます。
環境変数MALLOC_ARENA_MAX
を設定すると、メモリ使用の無制限な増加を回避できます:
$ docker run \
--env MALLOC_ARENA_MAX=1 \
flink:latest <jobmanager|standalone-job|taskmanager>
さらなるカスタマイズ #
Flinkイメージをさらにカスタマイズするには、いくつかの方法があります:
- カスタムソフトウェア(例えば、python)のインストール
/opt/flink/opt
から/opt/flink/lib
または/opt/flink/plugins
へのオプションのライブラリまたはプラグインを有効(シンボリックリンク)にします。- 他のライブラリを
/opt/flink/lib
(例えば、Hadoop)に追加します - 他のプラグインを
/opt/flink/plugins
に追加します
Flinkイメージはいくつかの方法でカスタマイズできます:
-
ブートストラップアクションを実行できる独自のスクリプトを使って、コンテナのエントリポイントを上書きします。 最後に、サポートされるデプロイメントモードで説明されているのと同じ引数を使って、Flinkイメージの標準の
/docker-entrypoint.sh
スクリプトを呼び出すことができます。以下の例は、より多くのライブラリとプラグインを有効にする独自のエントリポイントスクリプトを作成します。 独自のスクリプト、独自のライブラリ、プラグインは、マウントされたボリュームから提供されます。 次に、Flinkイメージの標準エントリポイントスクリプトを実行します:
# create custom_lib.jar
create custom_plugin.jar #
$ echo "
enable an optional library #
ln -fs /opt/flink/opt/flink-sql-gateway-*.jar /opt/flink/lib/
enable a custom library #
ln -fs /mnt/custom_lib.jar /opt/flink/lib/
mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
enable an optional plugin #
ln -fs /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/
mkdir -p /opt/flink/plugins/custom_plugin
enable a custom plugin #
ln -fs /mnt/custom_plugin.jar /opt/flink/plugins/custom_plugin/
/docker-entrypoint.sh <jobmanager|standalone-job|taskmanager> " > custom_entry_point_script.sh
$ chmod 755 custom_entry_point_script.sh
$ docker run
–mount type=bind,src=$(pwd),target=/mnt
flink:latest /mnt/custom_entry_point_script.sh
* 独自の`Dockerfile`を書くことで**Flinkイメージを拡張**し、独自のイメージをビルドします:
```dockerfile
FROM flink
RUN set -ex; apt-get update; apt-get -y install python
ADD /host/path/to/flink-conf.yaml /container/local/path/to/custom/conf/flink-conf.yaml
ADD /host/path/to/log4j.properties /container/local/path/to/custom/conf/log4j.properties
RUN ln -fs /opt/flink/opt/flink-sql-gateway-*.jar /opt/flink/lib/.
RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
RUN ln -fs /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/.
ENV VAR_NAME value
ビルドのためのコマンド:
```sh
$ docker build --tag custom_flink_image .
# optional push to your docker image registry if you have it,
# e.g. to distribute the custom image to your cluster
$ docker push custom_flink_image
```