Flink はJARファイルとしてパッケージされているプログラムを実行し、それらの実行を制御するために、コマンドライン インタフェースを提供します。コマンドライン インタフェースはFlinkセットアップの一部であり、ローカルのシングルノードセットアップおよび分散セットアップで利用可能です。<flink-home>/bin/flink
の下にあり、デフォルトで同じインストレーションディレクトリから開始された実行中のFlinkマスター(JobManager)に接続します。
コマンドライン インタフェースを使う前提条件は、Flinkマスター (JobManager) が (<flink-home>/bin/start-local.sh
あるいは <flink-home>/bin/start-cluster.sh
を使って) 開始されているか、YARN環境が利用可能であるか、です。
コマンドリアンは以下のために使うことができます
引数無しで例のプログラムを実行する。
./bin/flink run ./examples/batch/WordCount.jar
入力と結果のファイルのための引数と一緒に例のプログラムを実行する
./bin/flink run ./examples/batch/WordCount.jar \
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
並行度16および入力と結果のファイルのための引数と一緒に例のプログラムを実行する
./bin/flink run -p 16 ./examples/batch/WordCount.jar \
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
Flinkログの出力を無効にして例のプログラムを実行する
./bin/flink run -q ./examples/batch/WordCount.jar
デタッチモードで例のプログラムを実行する
./bin/flink run -d ./examples/batch/WordCount.jar
特定のJobManager上で例のプログラムを実行する:
./bin/flink run -m myJMHost:6123 \
./examples/batch/WordCount.jar \
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
エントリーポイントとして特定のクラスを使って例のプログラムを実行する:
./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
./examples/batch/WordCount.jar \
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
per-job YARN クラスタを使った例のプログラムを2つのタスクマネージャーで実行する
./bin/flink run -m yarn-cluster -yn 2 \
./examples/batch/WordCount.jar \
--input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
JSONとしてWordCountの例のプログラムのための最適化された実行プランを表示する:
./bin/flink info ./examples/batch/WordCount.jar \
--input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
(JobIDを含む)スケジュールされた、および実行中のジョブをリスト表示する:
./bin/flink list
(JobIDを含む)スケジュールされたジョブをリスト表示する:
./bin/flink list -s
(JobIDを含む)実行中のジョブをリスト表示する:
./bin/flink list -r
Flink YARNセッション内で実行中のFlinkジョブをリスト表示する
./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
ジョブを取り消す:
./bin/flink cancel <jobID>
セーブポイントを使ってジョブを取り消す:
./bin/flink cancel -s [targetDirectory] <jobID>
ジョブを止める (ストリーミングジョブのみ):
./bin/flink stop <jobID>
(ストリーミング)ジョブの取り消しと停止の違いは以下の通りです:
取り消しの呼び出し時に、ジョブ内のオペレータはできる限りすぐにそれらを取り消すようにすぐにcancel()
メソッドを受け取ります。中止の呼び出しの後でオペレータが停止しない場合は、Flinkはそれが停止するまで定期的にスレッドの中断を開始するでしょう。
"停止"呼び出しは実行中のストリーミングジョブのもっと緩やかな停止方法です。StoppableFunction
インタフェースを実装するソースを使うジョブは停止だけが利用可能です。ユーザがジョブを停止するように要求する場合、全てのソースはstop()
メソッドの呼び出しを受け取るでしょう。ジョブは全てのソースが適切にシャットダウンするまで実行し続けるでしょう。これによりジョブは全ての実行中の処理を完了することができます。
Savepoints はコマンドラインクライアントによって制御されます:
./bin/flink savepoint <jobID> [savepointDirectory]
生成された savepoint のパスを返す。savepoints を回復および破棄するにはこのパスが必要です。
セーブポイントが引き起こされた時に任意でsavepointDirectory
を指定することができます。ここで指定しない場合は、Flinkインストレーションのデフォルトのセーブポイント ディレクトリを設定する必要があります (Savepointsを見てください)。
アトミックにセーブポイントの起動とジョブの取り消しをすることができます。
./bin/flink cancel -s [savepointDirectory] <jobID>
セーブポイントのディレクトリが設定されない場合、Flinkインストレーションのデフォルトのセーブポイント ディレクトリを設定する必要があります (Savepointsを見てください)。
セーブポイントが成功した場合のみ、ジョブが取り消されるでしょう。
./bin/flink run -s <savepointPath> ...
runコマンドはジョブをサブミットするためのsavepointフラグを持ちます。これはsavepointから状態を回復します。savepoint パスは savepoint trigger コマンドによって返されます。
デフォルトで全てのセーブポイントの状態をサブミットされたジョブに一致させようとします。新しいジョブで回復できないセーブポイントの状態をスキップできるようにしたい場合は、allowNonRestoredState
フラグを設定することができます。セーブポイントが起動され、そのセーブポイントをまだ使いたい時に、プログラムの一部であったプログラムからオペレータを削除した場合は、これを許容する必要があります。
./bin/flink run -s <savepointPath> -n ...
プログラムがセーブポイントの一部であったオペレータを落とした場合、これは便利です。
./bin/flink savepoint -d <savepointPath>
指定されたパスのsavepointを削除します。savepoint パスは savepoint trigger コマンドによって返されます。
(例えば独自のreduce状態あるいはRocksDB状態のような)独自の状態インスタンスを使う場合、ユーザコードクラスローダを持つセーブポイントを処分するためにセーブポイントが起動されたプログラム JARへのパスを指定する必要があります。
./bin/flink savepoint -d <savepointPath> -j <jarFile>
そうでなければ、ClassNotFoundException
になるでしょう。
コマンドラインの構文は以下の通りです:
./flink <ACTION> [OPTIONS] [ARGUMENTS]
以下のアクションが利用可能です:
アクション "run" はプログラムをコンパイルし実行します。
構文: run [OPTIONS] <jar-file> <arguments>
"run"アクションのオプション:
-c,--class <classname> Class with the program entry
point ("main" method or
"getPlan()" method. Only
needed if the JAR file does
not specify the class in its
manifest.
-C,--classpath <url> Adds a URL to each user code
classloader on all nodes in
the cluster. The paths must
specify a protocol (e.g.
file://) and be accessible
on all nodes (e.g. by means
of a NFS share). You can use
this option multiple times
for specifying more than one
URL. The protocol must be
supported by the {@link
java.net.URLClassLoader}.
-d,--detached If present, runs the job in
detached mode
-m,--jobmanager <host:port> Address of the JobManager
(master) to which to
connect. Use this flag to
connect to a different
JobManager than the one
specified in the
configuration.
-n,--allowNonRestoredState Allow non restored savepoint
state in case an operator has
been removed from the job.
-p,--parallelism <parallelism> The parallelism with which
to run the program. オプション
flag to override the default
value specified in the
configuration.
-q,--sysoutLogging If present, suppress logging
output to standard out.
-s,--fromSavepoint <savepointPath> Path to a savepoint to
restore the job from (for
例
hdfs:///flink/savepoint-1537
).
-z,--zookeeperNamespace <zookeeperNamespace> Namespace to create the
Zookeeper sub-paths for high
availability mode
yarn-clusterモードのためのオプション:
-yD <arg> Dynamic properties
-yd,--yarndetached Start detached
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container [in
MB]
-yn,--yarncontainer <arg> Number of YARN container to allocate
(=タスクマネージャーの数)
-ynm,--yarnname <arg> Set a custom name for the application
on YARN
-yq,--yarnquery Display available YARN resources
(memory, cores)
-yqu,--yarnqueue <arg> Specify YARN queue.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yst,--yarnstreaming Start Flink in streaming mode
-yt,--yarnship <arg> Ship files in the specified directory
(t は送信を表します)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container [in
MB]
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
Action "info" shows the optimized execution plan of the program (JSON).
構文: info [OPTIONS] <jar-file> <arguments>
"info"アクションのオプション:
-c,--class <classname> Class with the program entry point ("main"
method or "getPlan()" method. Only needed
if the JAR file does not specify the class
in its manifest.
-p,--parallelism <parallelism> The parallelism with which to run the
program. Optional flag to override the
default value specified in the
configuration.
yarn-clusterモードのためのオプション:
-yid,--yarnapplicationId <arg> Attach to running YARN session
Action "list" lists running and scheduled programs.
構文: list [OPTIONS]
"list"アクションのオプション:
-m,--jobmanager <host:port> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
-r,--running Show only running programs and their JobIDs
-s,--scheduled Show only scheduled programs and their JobIDs
yarn-clusterモードのためのオプション:
-yid,--yarnapplicationId <arg> Attach to running YARN session
Action "stop" stops a running program (streaming jobs only).
構文: stop [OPTIONS] <Job ID>
"stop"アクションのオプション:
-m,--jobmanager <host:port> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
yarn-clusterモードのためのオプション:
-yid,--yarnapplicationId <arg> Attach to running YARN session
Action "cancel" cancels a running program.
構文: cancel [OPTIONS] <Job ID>
"cancel"アクションのオプション:
-m,--jobmanager <host:port> Address of the JobManager (master)
to which to connect. Use this flag
to connect to a different JobManager
than the one specified in the
configuration.
-s,--withSavepoint <targetDirectory> Trigger savepoint and cancel job.
The target directory is optional. If
no directory is specified, the
configured default directory
(state.savepoints.dir) is used.
yarn-clusterモードのためのオプション:
-yid,--yarnapplicationId <arg> Attach to running YARN session
Action "savepoint" triggers savepoints for a running job or disposes existing ones.
Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
"savepoint"アクションのオプション:
-d,--dispose <arg> Path of savepoint to dispose.
-j,--jarfile <jarfile> Flink program JAR file.
-m,--jobmanager <host:port> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
yarn-clusterモードのためのオプション:
-yid,--yarnapplicationId <arg> Attach to running YARN session