Docker
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Dockerのセットアップ #

はじめに #

このはじめにセクションでは、Dockerコンテナを使ってFlinkクラスタのローカルセットアップ(1台のマシン上だが別のコンテナ)を通じて案内します。

はじめに #

Dockerは人気のあるコンテナランタイムです。 Apache Flinkの公式のDockerイメージはDocker Hubで入手できます。 Dockerイメージを使って、Docker上にセッションまたはアプリケーションクラスタをデプロイできます。このページでは、DockerとDocker ComposerでのFlinkのセットアップに焦点を当てます。

スタンドアロンKubernetesネイティブKubernetesなどの管理されたコンテナ化環境へのデプロイは、別のページで説明されます。

Dockerでのセッションクラスタの開始 #

Flinkセッションクラスタを使って複数のジョブを実行できます。各ジョブは、クラスタのデプロイ後にクラスタに送信する必要があります。 Dockerを使ってFlinkセッションクラスタをデプロイするには、JobManagerコンテナを開始する必要があります。コンテナ間の通信を有効にするために、最初に必要なFlink設定プロパティを設定し、次のネットワークを設定します:

$ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
$ docker network create flink-network

次にJobManagerを起動します:

$ docker run \
    --rm \
    --name=jobmanager \
    --network flink-network \
    --publish 8081:8081 \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    flink:latest jobmanager

1つ以上のTaskManagerコンテナ:

$ docker run \
    --rm \
    --name=taskmanager \
    --network flink-network \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    flink:latest taskmanager

webインタフェースはlocalhost:8081で利用できるようになりました。

ジョブの送信は次のように送信できるようになりました(Flinkのローカルディストリビューションが利用可能だと仮定します):

$ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

クラスタをシャットダウンするには、JobManagerとTaskManagerプロセスを(CTRL-Cなどで)終了するか、docker psを使ってコンテナを識別し、docker stopを使ってコンテナを終了します。

デプロイメントモード #

Flinkイメージには、でふぉるとせっていと標準のエントリポイントスクリプトを備えた通常のFlinkディストリビューションが含まれます。 エントリポイントは、次のモードで実行できます:

これにより、任意のコンテナ化環境にスタンドアローンクラスタ(セッションまたはアプリケーションモード)をデプロイできます。例えば:

注意 ネイティブKubernetesもデフォルトで同じイメージを実行し、オンデマンドでTaskManagerをデプロイするため、手動で行う必要はありません。

次の章では、様々な目的で単一のFlink Dockerコンテナを開始する方法を説明します。

Docker上でFlinkを開始すると、localhost:8081でFlink Web UIにアクセスしたり、./bin/flink run ./examples/streaming/TopSpeedWindowing.jarのようなジョブを送信することができます。

システム設定を簡単にするために、セッションモードでFlinkを配備するためにDocker Composeを使うことをお勧めします。

アプリケーションモード #

アプリケーションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。

Flinkアプリケーションクラスタは単一のジョブを実行する専用のクラスタです。 この場合、ジョブを1つのステップとしてクラスタをデプロイするため、追加のジョブの送信は必要ありません。

job artifactsはコンテナ内のFlinkのJVMプロセスのクラスパスに含まれており、次のもので構成されます:

  • jobのjar、通常はセッションクラスタに送信します。
  • Flinkには含まれていない、その他の全ての必要な依存関係とリソース。

Dockerを使って単一ジョブのクラスタをデプロイするには、

  • /opt/flink/usrlibの下である全てのコンテナでローカルでjob artifactsを利用できるようにし、
  • アプリケーションクラスタモードのJobManagerコンテナを開始し、
  • 必要な数のTaskManagerコンテナを起動します。

ローカルでjob artifactsを利用可能にするには、

  • JobManagerとTaskManagerの起動時に、artifactsを含むボリュームをマウント (または複数のボリューム)/opt/flink/usrlibします:

    $ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
    

$ docker network create flink-network

$ docker run
–mount type=bind,src=../../../../../host/path/to/job/artifacts1,target=/opt/flink/usrlib/artifacts1
–mount type=bind,src=../../../../../host/path/to/job/artifacts2,target=/opt/flink/usrlib/artifacts2
–rm
–env FLINK_PROPERTIES="${FLINK_PROPERTIES}"
–name=jobmanager
–network flink-network
flink:latest standalone-job
–job-classname com.job.ClassName
[–job-id ]
[–fromSavepoint /path/to/savepoint [–allowNonRestoredState]]
[job arguments]

$ docker run
–mount type=bind,src=../../../../../host/path/to/job/artifacts1,target=/opt/flink/usrlib/artifacts1
–mount type=bind,src=../../../../../host/path/to/job/artifacts2,target=/opt/flink/usrlib/artifacts2
–env FLINK_PROPERTIES="${FLINK_PROPERTIES}"
flink:latest taskmanager


* 独自の`Dockerfile`を作成して**Flinkイメージを拡張**し、それをビルドして、JobManagerとTaskManagersの起動に使います:


```dockerfile
FROM flink
ADD /host/path/to/job/artifacts/1 /opt/flink/usrlib/artifacts/1
ADD /host/path/to/job/artifacts/2 /opt/flink/usrlib/artifacts/2
$ docker build --tag flink_with_job_artifacts .
$ docker run \
 flink_with_job_artifacts standalone-job \
 --job-classname com.job.ClassName \
 [--job-id <job id>] \
 [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
 [job arguments]

$ docker run flink_with_job_artifacts taskmanager

standalone-job引数は、アプリケーションモードでJobManagerコンテナを起動します。

JobManager additional command line arguments #

次の追加のコマンドライン引数をクラスタのエントリポイントに指定できます:

  • --job-classname <job class name>: 実行するジョブのクラス名。

    デフォルトでは、FlinkはMain-Classまたはprogram-class manifest entryを持つJARをクラスパスでスキャンし、それをジョブクラスとして選択します。 このコマンドライン引数を使って、手動でジョブのクラスを設定します。 この引数は、Tそのようなマニフェストエントリを持つJarがクラスパス上に存在しないか、1つ以上ある場合に必要です。

  • --job-id <job id> (王pション): ジョブのFlinkジョブIDを手動で設定します: 00000000000000000000000000000000)

  • --fromSavepoint /path/to/savepoint (オプション): セーブポイントから回復します

    セーブポイントから再開するには、セーブポイントのパスも渡す必要があります。 /path/to/savepointは、クラスタの全てのDockerコンテナからアクセスできる必要があることに注意してください(例えば、DFSに保存するか、マウントされたボリュームか、それをイメージに追加します)。

  • --allowNonRestoredState (オプション): 壊れたセーブポイントの状態をスキップします

    さらにこの引数を指定して、復元できないセーブポイントの状態をスキップできるようにすることもできます。

ユーザジョブのメインクラスのmain関数が引数を受け取る場合は、docker runコマンドの最後にそれらを渡すこともできます。

セッションモード #

セッションモードの背景にある高レベルの直感については、配備モードの概要を参照してください。

セッションモードでのローカルデプロイメントについては、上記の始めにのセクションで既に説明しました。

Back to top

イメージのホスティング #

Flink Dockerイメージには2つの配布チャンネルがあります:

  1. Docker Hub上の公式Flinkイメージ(Dockerによってレビューおよびビルドされます)
  2. Docker Hubの apache/flink上のFlinkイメージ(Flink開発者によって管理されます)

Dockerによってレビューされているため、Docker Hubの公式イメージを使うことをお勧めします。apache/flink上のイメージは、Dockerによるレビュープロセスで遅延がある場合に備えて提供されます。

flink:latestという名前のイメージを起動すると、Docker Hubから最新のイメージをpullされます。apache/flinkでホストされているイメージを使う場合は、flinkapache/flinkで置き換えてください。いずれのイメージタグ(Flink 1.11.3以降)も、apache/flinkで利用可能です。

イメージのタグ #

Flink DockerリポジトリはDocker Hubでホストされ、Flinkバージョン1.2.1以降のイメージを提供します。 これらのイメージのソースは、Apache flink-dockerリポジトリにあります。

FlinkバージョンとScalaバージョンのサポートされている組み合わせごとにイメージが利用可能で、tag aliasesは便宜上提供されています。

例えば、次のaliasesを利用できます:

  • flink:latestflink:<latest-flink>-scala_<latest-scala>
  • flink:1.11flink:1.11.<latest-flink-1.11>-scala_2.12

注意 必要なFlinkとScalaの両方のバージョンを指定するdockerイメージのバージョンタグ(例えば、flink:1.11-scala_2.12)を常に明示的に使うことをお勧めします。 これにより、アプリケーションで使われるFlinkおよび/またはScalaのバージョンが、dockerイメージで提供されるバージョンと異なる場合に発生する可能性のあるクラスの競合が回避されます。

注意 Flink 1.5バージョンより前は、Hadoopの依存関係が常にFlinkにバンドルされていました。 特定のタグにHadoopのバージョンが含まれていることがわかります(例えば、-hadoop28)。 Flink 1.5以降、Hadoopのバージョンが省略されたイメージタグは、バンドルされたHadoopディストリビューションが含まないFlinkのHadoopフリーのリリースに対応します。

Docker Composeは、Dockerコンテナのグループをローカルで実行する方法です。 次のセクションでは、Flinkを実行する設定ファイルの例を示します。

概要 #

  • ocker-compose.yamlファイルを作成します。以下のセクションの例を確認してください:

  • フォアグラウンドでクラスタを起動します(バックグランドは-dを使います)

    $ docker-compose up
    
  • クラスタをN個のTaskManagersまで、スケールアップまたはスケールダウンします

    $ docker-compose scale taskmanager=<N>
    
  • JobManagerコンテナにアクセスします

    $ docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) /bin/sh
    
  • クラスタをkillします

    $ docker-compose down
    
  • Web UIにアクセスします

    クラスタが実行中の場合、http://localhost:8081でweb UIにアクセスできます。

アプリケーションモード #

アプリケーションモードでは、イメージにバンドルされているFlinkジョブだけを実行する専用のFlinkクラスタを起動します。 従って、アプリケーションごとに専用のFlinkイメージをビルドする必要があります。 詳細はhereをご確認ください。 jobmanagerサービスのcommandJobManagerの引数を指定する方法も参照してください。

Application Modedocker-compose.yml

version: "2.2"
services:
  jobmanager:
    image: flink:latest
    ports:
      - "8081:8081"
    command: standalone-job --job-classname com.job.ClassName [--job-id <job id>] [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] [job arguments]
    volumes:
      - /host/path/to/job/artifacts:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        parallelism.default: 2        

  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    volumes:
      - /host/path/to/job/artifacts:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        parallelism.default: 2        

セッションモード #

セッションモードでは、docker-composeを使って長時間実行されるFlinkクラスタを起動し、そこにジョブを送信できます。

Session Modedocker-compose.yml:

version: "2.2"
services:
  jobmanager:
    image: flink:latest
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager        

  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2        

この例では、長時間実行されるセッションクラスタを起動し、このクラスタを使ってジョブを送信するFlnk SQL CLIを起動します。

Session Clusterを使ったFlink SQLクライアントのdocker-compose.yml:

version: "2.2"
services:
  jobmanager:
    image: flink:latest
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager        

  taskmanager:
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2        
  sql-client:
    image: flink:latest
    command: bin/sql-client.sh
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager        
  • SQLクライアント開始するには、次を実行します

    docker-compose run sql-client
    

    次に、テーブルの作成を開始し、それらに対してクエリを実行できます。

  • 全ての必要な依存関係(例えば、コネクタなど)がクライアントだけでなくクラスタでも利用可能である必要があることに注意してください。 例えば、Kafkaコネクタを使いたい場合は、次のDockerfileで独自のイメージを作成します

    FROM flink:latest
    

RUN wget -P /opt/flink/lib https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.19-SNAPSHOT/flink-sql-connector-kafka_scala_2.12-1.19-SNAPSHOT.jar


Dockerファイルで(例えば`build`コマンドを経由して)それを参照します。
`ADD JAR`のようなSQLコマンドは、ローカルファイルシステム(この場合は、Dockerのオーバーレイファイルシステム)でのみ動作するため、ホストマシン上にあるJARに対しては動作しません。

## Docker上のFlink Pythonの使用

PythonとPyFlinkが用意された独自のイメージを構築するには、次のDockerfileを参照できます:
```Dockerfile

FROM flink:latest

# install python3 and pip3
RUN apt-get update -y && \
apt-get install -y python3 python3-pip python3-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install PyFlink

COPY apache-flink*.tar.gz /
RUN pip3 install /apache-flink-libraries*.tar.gz && pip3 install /apache-flink*.tar.gz
Note PyFlink packages could be built according to development guide

pyflink:latestという名前のイメージをビルドします:

$ docker build --tag pyflink:latest .

DockerでFlinkを設定します #

動的プロパティ経由 #

$ docker run flink:latest \
    <jobmanager|standalone-job|taskmanager|historyserver> \
    -D jobmanager.rpc.address=host \
    -D taskmanager.numberOfTaskSlots=3 \
    -D blob.server.port=6124

動的プロパティ経由で設定されたオプションは、flink-conf.yamlのオプションを上書きします。

環境変数を経由 #

Flinkイメージを実行する時に、環境変数FLINK_PROPERTIESを設定することで設定オプションを変更することもできます:

$ FLINK_PROPERTIES="jobmanager.rpc.address: host
taskmanager.numberOfTaskSlots: 3
blob.server.port: 6124
"
$ docker run --env FLINK_PROPERTIES=${FLINK_PROPERTIES} flink:latest <jobmanager|standalone-job|taskmanager>

jobmanager.rpc.address オプションを設定する必要があります。その他のオプションは任意です。

環境変数FLINK_PROPERTIESは、改行で区切られたFlinkクラスタ設定のリストを含む必要があります。FLINK_PROPERTIESflink-conf.yamlの設定よりも優先されます。

設定ファイル(flink-conf.yaml, logging, hosts etc)は、Flinkイメージの/opt/flink/confディレクトリにあります。 Flink設定ファイルに独自の場所を指定するには、

  • Flinklイメージを実行する時に、独自の設定ファイルがあるボリュームのマウントをこのパス/opt/flink/confにします:

    $ docker run \
     --mount type=bind,src=/host/path/to/custom/conf,target=/opt/flink/conf \
     flink:latest <jobmanager|standalone-job|taskmanager>
    
  • あるいは、それらを独自のFlinkイメージに追加してビルドし実行します:

    FROM flink
    

ADD /host/path/to/flink-conf.yaml /opt/flink/conf/flink-conf.yaml ADD /host/path/to/log4j.properties /opt/flink/conf/log4j.properties


マウントされるボリュームは必要な全ての設定ファイルを含む必要があります。 flink-conf.yamlファイルは、 特定の場合にDockerエントリポイントスクリプトがファイルを変更できるように、書き込み権限が必要です。
### ファイルシステムプラグインの使用 [プラグイン](/docs/deployment/filesystems/plugins/)のドキュメントのページで説明されっているように、プラグインを使うにはプラグインをDockerコンテナ内のFlink内の正しい場所にコピーする必要があります。 Flinkで提供されるプラグイン(Flinkディストリビューションの`opt/`ディレクトリの中)を有効にしたい場合は、Flinkイメージを実行する時に環境変数`ENABLE_BUILT_IN_PLUGINS`を渡す必要があります。 `ENABLE_BUILT_IN_PLUGINS`は、`;`で区切られるプラグインのjarファイル名のリストが含まれている必要があります。有効なプラグイン名は、例えば`flink-s3-fs-hadoop-1.19-SNAPSHOT.jar`です ```sh $ docker run \ --env ENABLE_BUILT_IN_PLUGINS=flink-plugin1.jar;flink-plugin2.jar \ flink:latest <jobmanager|standalone-job|taskmanager>

Flinkイメージをカスタマイズするための高度な方法もあります。

Memory Allocatorの切り替え #

Flinkは、メモリ断片化問題を解決するために、デフォルトのメモリアロケータとしてjemallocを導入しました(FLINK-19125を参照してください)。

glibcをメモリアロケータとして使うように切り替えると、環境変数DISABLE_JEMALLOCをtrueに設定することで、以前の動作に戻すことができます。また、予期しないメモリの消費や問題が発生した場合は、問題をjiraまたはメーリングリストを通じて報告してください。

    $ docker run \
      --env DISABLE_JEMALLOC=true \
      flink:latest <jobmanager|standalone-job|taskmanager>

glibcメモリアロケータをまだ使っているユーザの場合、特にRocksDBStateBackendを使ってセーブポイントまたは完全なチェックポイントを作成している場合は、glibcのバグが簡単に再現できます。 環境変数MALLOC_ARENA_MAXを設定すると、メモリ使用の無制限な増加を回避できます:

    $ docker run \
      --env MALLOC_ARENA_MAX=1 \
      flink:latest <jobmanager|standalone-job|taskmanager>

さらなるカスタマイズ #

Flinkイメージをさらにカスタマイズするには、いくつかの方法があります:

  • カスタムソフトウェア(例えば、python)のインストール
  • /opt/flink/optから/opt/flink/libまたは/opt/flink/pluginsへのオプションのライブラリまたはプラグインを有効(シンボリックリンク)にします。
  • 他のライブラリを/opt/flink/lib (例えば、Hadoop)に追加します
  • 他のプラグインを/opt/flink/pluginsに追加します

Flinkイメージはいくつかの方法でカスタマイズできます:

  • ブートストラップアクションを実行できる独自のスクリプトを使って、コンテナのエントリポイントを上書きします。 最後に、サポートされるデプロイメントモードで説明されているのと同じ引数を使って、Flinkイメージの標準の/docker-entrypoint.shスクリプトを呼び出すことができます。

    以下の例は、より多くのライブラリとプラグインを有効にする独自のエントリポイントスクリプトを作成します。 独自のスクリプト、独自のライブラリ、プラグインは、マウントされたボリュームから提供されます。 次に、Flinkイメージの標準エントリポイントスクリプトを実行します:

    # create custom_lib.jar
    

create custom_plugin.jar #

$ echo "

enable an optional library #

ln -fs /opt/flink/opt/flink-sql-gateway-*.jar /opt/flink/lib/

enable a custom library #

ln -fs /mnt/custom_lib.jar /opt/flink/lib/

mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop

enable an optional plugin #

ln -fs /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/

mkdir -p /opt/flink/plugins/custom_plugin

enable a custom plugin #

ln -fs /mnt/custom_plugin.jar /opt/flink/plugins/custom_plugin/

/docker-entrypoint.sh <jobmanager|standalone-job|taskmanager> " > custom_entry_point_script.sh

$ chmod 755 custom_entry_point_script.sh

$ docker run
–mount type=bind,src=$(pwd),target=/mnt flink:latest /mnt/custom_entry_point_script.sh


* 独自の`Dockerfile`を書くことで**Flinkイメージを拡張**し、独自のイメージをビルドします:


```dockerfile
FROM flink

RUN set -ex; apt-get update; apt-get -y install python

ADD /host/path/to/flink-conf.yaml /container/local/path/to/custom/conf/flink-conf.yaml
ADD /host/path/to/log4j.properties /container/local/path/to/custom/conf/log4j.properties

RUN ln -fs /opt/flink/opt/flink-sql-gateway-*.jar /opt/flink/lib/.

RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
RUN ln -fs /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/.

ENV VAR_NAME value

ビルドのためのコマンド:

```sh
$ docker build --tag custom_flink_image .
# optional push to your docker image registry if you have it,
# e.g. to distribute the custom image to your cluster
$ docker push custom_flink_image
```

Back to top

inserted by FC2 system