This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
コマンドライン インタフェース #
Flinkは、JARファイルとしてパッケージ化されたプログラムを実行し、その実行を制御するためのコマンドラインインタフェース(CLI) bin/flink
を提供します。
CLIはFlinkセットアップの一部であり、ローカルの単一ノードセットアップや分散セットアップで利用できます。
conf/flink-conf.yaml
で指定された実行中のJobManagerに接続します。
Jobのライフサイクル管理 #
このセクションでリストされているコマンドが機能するための前提条件は、KubernetesのようなFlinkデプロイメントの実行、YARN、その他の利用可能なオプションがあることです。 自分のマシンでコマンドを試すために、Flinkクラスタをローカルで起動してください。
ジョブの送信 #
ジョブの送信とは、ジョブのJARと関連する依存関係をFlinkクラスタにアップロードし、ジョブの実行を開始することを意味します。
この例では、examples/streaming/StateMachineExample.jar
のような長時間実行ジョブを選択します。
examples/
フォルダから他のJARファイルを自由に選択するか、独自のジョブをデプロイしてください。
$ ./bin/flink run \
--detached \
./examples/streaming/StateMachineExample.jar
--detached
を使ってジョブを送信すると、送信完了後にコマンドが返されます。
出力には、(その他のものに加えて)新しく送信されたジョブのIDが含まれます。
組み込みのデータジェネレータでの使用: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]
Kafkaでの使用法: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]
上記の両方の瀬戸アップのオプション:
[--backend <file|rocks>]
[--checkpoint-dir <filepath>]
[--async-checkpoints <true|false>]
[--incremental-checkpoints <true|false>]
[--output <filepath> OR null for stdout]
Using standalone source with error rate 0.000000 and sleep delay 1 millis
Job has been submitted with JobID cca7bc1061d61cf15238e92312c2fc20
出力される使用法情報には、必要に応じてジョブ送信コマンドの最後に追加できるジョブ関連のパラメータが一覧表示されます。
読みやすくするために、以下のコマンドでは、返されたJobIDが変数JOB_ID
に格納されていると想定します:
$ export JOB_ID="cca7bc1061d61cf15238e92312c2fc20"
アプリケーションモードでジョブを実行するために使えるrun-application
と呼ばれる別のアクションがあります。
このアクションは、CLIフロントエンドのrun
アクションと同様に動作するため、このドキュメントでは個別に説明しません。
run
とrun-application
コマンドは、-D
引数を介した追加の設定パラメータの受け渡しをサポートします。
例えば、ジョブの最大並行処理を設定するには、-Dpipeline.max-parallelism=120
を設定します。
この引数は、設定ファイルを変更せずにクラスタに任意の設定パラメータを渡すことができるため、アプリケーションモードのクラスタの設定に非常に役立ちます。
既存のセッションクラスタにジョブを送信する場合、実行設定パラメータのみがサポートされます。
ジョブの監視 #
list
アクションを使って、実行中のジョブを監視できます:
$ ./bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
30.11.2020 16:02:29 : cca7bc1061d61cf15238e92312c2fc20 : State machine job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
送信されたがまだ開始されていないジョブは、“Scheduled Jobs"の下に一覧表示されます。
セーブポイントの作成 #
セーブポイントはジョブの現在の状態を保存するために作成されます。 必要なのはJobIDだけです:
$ ./bin/flink savepoint \
$JOB_ID \
/tmp/flink-savepoints
Triggering savepoint for job cca7bc1061d61cf15238e92312c2fc20.
Waiting for response...
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
You can resume your program from this savepoint with the run command.
セーブポイントフォルダはオプションであり、state.savepoints.dirが設定されていない場合は指定する必要があります。
最後に、オプションで、セーブポイントのバイナリ形式を指定できます。
セーブポイントへのパスは、あtおでFlinkジョブを再起動するために使えます。
セーブポイントの破棄 #
セーブポイント
アクションを使ってセーブポイントを削除することもできます。
--dispose
対応するセープポイントのパスを追加する必要があります:
$ ./bin/flink savepoint \
--dispose \
/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
$JOB_ID
Disposing savepoint '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab'.
Waiting for response...
Savepoint '/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab' disposed.
カスタム状態インスタンス(例えば、独自のreduce状態やRocksDB状態)を使う場合は、セーブポイントがトリガーされたプログラムJARへのパスを指定する必要があります。
そうしなければ、ClassNotFoundException
が発生します:
$ ./bin/flink savepoint \
--dispose <savepointPath> \
--jarfile <jarFile>
savepoint
アクションを通じてセーブポイントの破棄をトリガーすると、ストレージからデータが削除されるだけでなく、Flinkによってセーブポイント関連のメタデータもクリーンアップされます。
ジョブの終了 #
ジョブの正常な停止 最終セーブポイントの作成 #
ジョブを停止するためのもう1つのアクションはstop
です。
これは、stop
がソースからシンクに流れる時に、実行中のストリーミングジョブをより適切に停止する方法です。
ユーザがジョブの停止をリクエストすると、全てのソースはセーブポイントをトリガーする最後のチェックポイントバリアの送信をリクエストされ、そのセーブポイントが正常に完了すると、cancel()
メソッドを呼び出して終了します。
$ ./bin/flink stop \
--savepointPath /tmp/flink-savepoints \
$JOB_ID
Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint.
Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab
state.savepoints.dirが設定されていない場合、セーブポイントフォルダを指定するために--savepointPath
を使う必要があります。
isn’t set.
--drain
フラグが指定されている場合、最後のチェックポイントバリアの前にMAX_WATERMARK
が発行されます。
barrier.
これにより、登録されている全てのイベント時間タイマーが起動され、特定のウォーターマーク(例えばウィンドウ)を待っている状態がフラッシュされます。
ジョブは全てのソースが適切にシャットダウンするまで実行し続けるでしょう。
これにより、ジョブは全ての処理中のデータの処理を完了できるようになり、停止中に取得されたセーブポイントの後に処理するレコードが生成される可能性があります。
ジョブを完全に終了するには、--drain
フラグを使います。
ジョブを後で再開したい場合は、パイプラインを空にしないでください。ジョブが再開された時に誤った結果が生じる可能性があります。
最後に、オプションで、セーブポイントのバイナリ形式を指定できます。
ジョブの不当なキャンセル #
ジョブをキャンセルするにはcancel
アクションを使います:
$ ./bin/flink cancel $JOB_ID
Cancelling job cca7bc1061d61cf15238e92312c2fc20.
Cancelled job cca7bc1061d61cf15238e92312c2fc20.
対応するジョブの状態がRunning
からCancelled
に移行します。
Any computations will be stopped.
--withSavepoint
フラグを使うと、ジョブのキャンセルの一部といてセーブポイントを作成できます。
この機能は廃止されました。
代わりにstopアクションを使ってください。
セーブポイントからジョブを開始します。 #
セーブポイントからジョブを開始するには、run
(とrun-application
)アクションを使います。
$ ./bin/flink run \
--detached \
--fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
./examples/streaming/StateMachineExample.jar
組み込みのデータジェネレータでの使用: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]
Kafkaでの使用法: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]
上記の両方の瀬戸アップのオプション:
[--backend <file|rocks>]
[--checkpoint-dir <filepath>]
[--async-checkpoints <true|false>]
[--incremental-checkpoints <true|false>]
[--output <filepath> OR null for stdout]
Using standalone source with error rate 0.000000 and sleep delay 1 millis
Job has been submitted with JobID 97b20a0a8ffd5c1d656328b0cd6436a6
以前に停止したジョブの状態を参照するために使われる--fromSavepoint
パラメータを除き、このコマンドが最初の実行コマンドとどのように同じであるかを確認してください。
ジョブを維持するために使える新しいJobIDが生成されます。
デフォルトでは、セーブポイントの状態全体を送信中のジョブと一致させようとします。
新しいジョブで再現できないセーブポイント状態をスキップできるようにしたい場合は、--allowNonRestoredState
フラグを設定できます。
セーブポイントが起動され、そのセーブポイントをまだ使いたい時に、プログラムの一部であったプログラムからオペレータを削除した場合は、これを許容する必要があります。
$ ./bin/flink run \
--fromSavepoint <savepointPath> \
--allowNonRestoredState ...
プログラムがセーブポイントの一部であったオペレータを落とした場合、これは便利です。
セーブポイントに使うrestore modeを選択することもできます。 which should be used for the savepoint. このモードは、指定されたセーブポイントのファイルの所有者を誰が取得するかを制御します。
CLIアクション #
FlinkのCLIツールでサポートされるアクションの概要は次の通りです:
アクション | 目的 |
---|---|
run |
このアクションはジョブを実行します。少なくともジョブをI含むjarが必要です。必要に応じて、Flink-やjob-related引数を渡すことができます。 |
run-application |
このアクションは、}}#application-mode">アプリケーションモードにある上部を実行します。
それ以外は、run アクションと同じパラメータを必要とします。
|
info |
このアクションを使うと、渡されたジョブの最適化されt時刻グラフを出力できます。 もう一度、ジョブを含むjaraが渡される必要があります。 |
list |
このアクションは、実行中のジョブまたはスケジュールされたジョブを全て一覧表示します。 |
savepoint |
このアクションは、指定されたジョブのセーブポイントを作成または破棄するために使われます。
conf/flink-conf.yaml で}}#state-savepoints-dir">state.savepoints.dirパラメータが指定されていない場合は、JobIDの他にセーブポイントディレクリを指定する必要がある場合があります。
|
cancel |
このアクションを使うと、JobIDに基づいた実行中のジョブをキャンセルできます。 |
stop |
このアクションは、cancel とsavepoint アクションを組み合わせて実行中のジョブを停止しますが、再度開始するためのセーブポイントも作成します。
|
全てのアクションとそれらのパラメータのより詳細な説明には、bin/flink --help
または個々のアクションのbin/flink <action> --help
からアクセスできます。
高度なCLI #
REST API #
Flinkクラスタは、REST APIを使って管理することもできます。
前のセクションで説明したコマンドは、FlinkのRESTエンドポイントによって提供されるもののサブセットです。
従って、curl
のようなツールを使うと、Flinkをより活用することができます。
デプロイメントターゲットの選択 #
Flinkは、Kubernetes、YARNのような複数のクラスタ管理フレームワークーと互換性があります。詳細はリソースプロバイダのセクションで説明されます。ジョブは、様々なデプロイメントモードで送信できます。 ジョブ送信のパラメータ化は、基礎となるフレームワークとデプロイメントモードによって異なります。
bin/flink
は様々なオプションを処理するためのパラメータ--target
を提供します。
それに加えて、ジョブはrun
(Session用とPer-Jobモード (非推奨))、またはrun-application
(アプリケーションモード用)のいずれかを使って送信される必要があります。
パラメータの組み合わせについては、次の概要を参照してください:
- YARN
./bin/flink run --target yarn-session
: すでに実行中のYARNクラスタ上のFlinkへの送信./bin/flink run --target yarn-per-job
: ジョブごとのモード(非推奨)でYARNクラスタ上のFlinkをしピンアップする送信./bin/flink run-application --target yarn-application
: アプリケーションモードでYARNクラスタ上でFlinkをスピンアップする送信
- Kubernetes
./bin/flink run --target kubernetes-session
: すでに実行中のKubernetesクラスタ上のFlinkへの送信./bin/flink run-application --target kubernetes-application
: アプリケーションモードでKubernetes上でFlinkをスピンアップする送信
- Standalone:
./bin/flink run --target local
: セッションモードでMiniClusterを使ったローカル送信./bin/flink run --target remote
: 既に実行中のFlinkクラスタへの送信
--target
は、conf/flink-conf.yaml
で指定されるexecution.targetを上書きします。
コマンドと利用可能なオプションの詳細については、ドキュメントのリソースプロバイダ固有のページを参照してください。
PyFlinkジョブの送信 #
現在、ユーザはCLI経由でPyFlinkジョブを送信できます。Jabaジョブの送信と異なり、JARファイルのパスやエントリのメインクラスを指定する必要はありません。
flink run
経由でPythonジョブを送信する場合、Flinkはコマンド"python"を実行します。
次のコマンドを実行して、現在の環境でのPython実行可能ファイルが、サポートされているPythonバージョン3.8以降を指していることを確認してください。
$ python --version
# the version printed here must be 3.8+
次のコマンドは様々なPyFlinkジョブ送信のユースケースを示しています:
- PyFlinkジョブの事項:
$ ./bin/flink run --python examples/python/table/word_count.py
- 追加のソースファイルとリソースファイルを使ってPyFlinkジョブを実行します。
--pyFiles
で指定されたファイルは、PYTHONPATH
に追加されるため、Pythonコードで利用できるようになります。
$ ./bin/flink run \
--python examples/python/table/word_count.py \
--pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt
- Java UDFまたは外部コネクタを参照するPYFぃんkジョブを実行します。
--jarfile
で指定されたJARファイルは、クラスタに送信されます。
$ ./bin/flink run \
--python examples/python/table/word_count.py \
--jarfile <jarFile>
- pyFilesと
--pyModule
で指定されたメインエントリモジュールを使って、PyFlinkジョブを実行します:
$ ./bin/flink run \
--pyModule word_count \
--pyFiles examples/python/table
- ホスト
<jobmanagerHost>
(合わせてコマンドを適用)で実行されている特定のJobManagerでPyFlinkジョブを送信します。
$ ./bin/flink run \
--jobmanager <jobmanagerHost>:8081 \
--python examples/python/table/word_count.py
- Per-JobモードのYARNクラスタを使ってPyFlinkジョブを実行します:
$ ./bin/flink run \
--target yarn-per-job
--python examples/python/table/word_count.py
- アプリケーションモードのYARNクラスタを使ってPyFlinkを実行します:
$ ./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.application.name=<ApplicationName> \
-Dyarn.ship-files=/path/to/shipfiles \
-pyarch shipfiles/venv.zip \
-pyclientexec venv.zip/venv/bin/python3 \
-pyexec venv.zip/venv/bin/python3 \
-pyfs shipfiles \
-pym word_count
注意 ジョブの実行に必要なPythonの依存関係がディレクトリ/path/to/shipfiles
にすでに配置されていることを前提としています。
例えば、上の例では、venv.zipとword_count.pyが含まれている必要があります。
注意 YARNアプリケーションモードのJobManagerのジョブを実行するため、-pyarch
と-pyfs
で指定されるパスは出荷されたファイルのディレクトリ名であるshipfiles
に対する相対パスになります。
エントリポイントプログラムの絶対パスも相対パスも知ることができないため、プログラムのエントリポイントを指定するには、-py
ではなく-pym
を使うことをお勧めします。
注意 -pyarch
で指定された圧縮ファイルは、ファイルサイズ制限が2GBのblogサーバを通じてTaskManagerに配布されます。
圧縮ファイルのサイズが2GBを超える場合は、それを分散ファイルシステムにアップロードし、コマンドラインオプション-pyarch
のパスを使うことができます。
- クラスタID
<ClusterId>
を持つネイティブKubernetesクラスタ上でPyFlinkアプリケーションを実行します。これには、PyFlinkがインストールされたdockerイメージが必要です。dockerでPyFlinkを有効にするを参照してください:
$ ./bin/flink run-application \
--target kubernetes-application \
--parallelism 8 \
-Dkubernetes.cluster-id=<ClusterId> \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.container.image.ref=<PyFlinkImageName> \
--pyModule word_count \
--pyFiles /opt/flink/examples/python/table/word_count.py
利用可能なオプションの詳細については、ロソースプロバイダセクションで詳しく説明されているKubernetesまたは YARNを参照してください。
上記の--pyFiles
、--pyModule
、--python
以外にも、Pythonに関係するオプションがあります。
ここでは、FlinkのCLIツールにサポートされるアクションrun
とrun-application
のPythonに関係する全てのオプションの概要を示します:
オプション | 説明 |
---|---|
-py,--python |
プログラムエントリを含むPythonスクリプト。
依存リソースは、--pyFiles オプションを使って設定できます。
|
-pym,--pyModule |
プログラムエントリを含むPythonモジュール。
このオプションは--pyFiles と組み合わせて使う必要があります。
|
-pyfs,--pyFiles |
ジョブに独自のファイルを添付します。 .py/.egg/.zip/.whlなどの標準リソースファイルの拡張子やディレクトリがサポートされます。 これらのファイルは、ローカルクライアントとリモートのpython UDFワーカーの両方のPYTHONPATHに追加されます。 拡張子.zipは解凍され、PYTHONPATHに追加されます。 複数ファイルを指定するためにカンマ(',')を区切り文字として使えます(例えば、--pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)。 |
-pyarch,--pyArchives |
ジョブ用のpythonアーカイブファイルを追加します。圧縮ファイルはpython UDFワーカーの作業ディレクトリに解凍されます。 圧縮ファイルごとに、ターゲットディレクトリを指定します。 ターゲットディレクトリ名が指定された場合、指定した名前のディレクトリに圧縮ファイルが解凍されます。 それ以外の場合、圧縮ファイルは圧縮ファイル名と同じディレクトリに解凍されます。 このオプションを介してアップロードされたファイルは、相対パスを介してアクセスできます。 '#'は、圧縮ファイルパスとターゲットディレクトリ名の区切り文字として使えます。 カンマ(',')は複数の圧縮ファイルを指定するための区切り文字として使われます このオプションは、仮想環境、Python UDFで使われるデータファイル(例えば、--pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable py37.zip/py37/bin/python)をアップロードするために使えます。 データファイルはPython UDFでアクセスできます。例えば: f = open('data/data.txt', 'r'). |
-pyclientexec,--pyClientExecutable |
\"flink run\"を介してPythonジョブを送信する時、またはPython UDFジョブをコンパイルする時に、Pythonプロセスを起動するために使われるPythonインタプリタのパス。 (例えば、--pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python) |
-pyexec,--pyExecutable |
python UDFワーカーの実行に使われるpythonインタプリタのパスを指定します(例えば: --pyExecutable /usr/local/bin/python3)。 python UDFワーカーは、Python 3.8+、Apache Beam (version == 2.43.0)、Pip (version >= 20.3)、SetupTools (version >= 37.0.0)に依存します。 指定した環境が上記の要件を満たしていることを確認してください。 |
-pyreq,--pyRequirements |
サードパーティの依存関係を定義するrequirements.txtファイルを指定します。 これらの依存関係はインストールされ、python UDFワーカーのPYTHONPATHに追加されます。 これらの依存関係のインストールパッケージを含むディレクトリは、オプションで指定できます。 オプションのパラメータが存在する場合は、区切り文字として'#'を使います(例えばあ、--pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir)。 |
ジョブ送信時のコマンドラインオプションに加えて、コード内の設定またはPython APIを介して依存関係を指定することもサポートされます。 詳細については、依存関係管理を参照してください。