重要: Scalaに依存するMaven アーティファクトはScalaのメジャーバージョンが後ろに付きます。例えば、"2.10" あるいは "2.11"。プロジェクトwiki上のマイグレーションガイドに相談してください。

コマンドライン インタフェース

Flink はJARファイルとしてパッケージされているプログラムを実行し、それらの実行を制御するために、コマンドライン インタフェースを提供します。コマンドライン インタフェースはFlinkセットアップの一部であり、ローカルのシングルノードセットアップおよび分散セットアップで利用可能です。<flink-home>/bin/flink の下にあり、デフォルトで同じインストレーションディレクトリから開始された実行中のFlinkマスター(JobManager)に接続します。

コマンドライン インタフェースを使う前提条件は、Flinkマスター (JobManager) が (<flink-home>/bin/start-local.sh あるいは <flink-home>/bin/start-cluster.shを使って) 開始されているか、YARN環境が利用可能であるか、です。

コマンドリアンは以下のために使うことができます

  • 引数無しで例のプログラムを実行する。

    ./bin/flink run ./examples/batch/WordCount.jar
    
  • 入力と結果のファイルのための引数と一緒に例のプログラムを実行する

    ./bin/flink run ./examples/batch/WordCount.jar \
     file:///home/user/hamlet.txt file:///home/user/wordcount_out
    
  • 並行度16および入力と結果のファイルのための引数と一緒に例のプログラムを実行する

    ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
     file:///home/user/hamlet.txt file:///home/user/wordcount_out
    
  • Flinkログの出力を無効んしいて例のプログラムを実行する

     ./bin/flink run -q ./examples/batch/WordCount.jar
    
  • デタッチモードで例のプログラムを実行する

     ./bin/flink run -d ./examples/batch/WordCount.jar
    
  • 特定のJobManager上で例のプログラムを実行する:

    ./bin/flink run -m myJMHost:6123 \
     ./examples/batch/WordCount.jar \
     file:///home/user/hamlet.txt file:///home/user/wordcount_out
    
  • エントリーポイントとして特定のクラスを使って例のプログラムを実行する:

    ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
     ./examples/batch/WordCount.jar \
     file:///home/user/hamlet.txt file:///home/user/wordcount_out
    
  • per-job YARN クラスタを使った例のプログラムを2つのタスクマネージャーで実行する

    ./bin/flink run -m yarn-cluster -yn 2 \
     ./examples/batch/WordCount.jar \
     hdfs:///user/hamlet.txt hdfs:///user/wordcount_out
    
  • JSONとしてWordCountの例のプログラムのための最適化された実行プランを表示する:

    ./bin/flink info ./examples/batch/WordCount.jar \
     file:///home/user/hamlet.txt file:///home/user/wordcount_out
    
  • (JobIDを含む)スケジュールされた、および実行中のジョブをリスト表示する:

    ./bin/flink list
    
  • (JobIDを含む)スケジュールされたジョブをリスト表示する:

    ./bin/flink list -s
    
  • (JobIDを含む)実行中のジョブをリスト表示する:

    ./bin/flink list -r
    
  • Flink YARNセッション内で実行中のFlinkジョブをリスト表示する

    ./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
    
  • ジョブを取り消す:

    ./bin/flink cancel <jobID>
    
  • ジョブを止める (ストリーミングジョブのみ):

    ./bin/flink stop <jobID>
    

(ストリーミング)ジョブの取り消しと停止の違いは以下の通りです:

取り消しの呼び出し時に、ジョブ内のオペレータはできる限りすぐにそれらを取り消すようにすぐにcancel() メソッドを受け取ります。中止の呼び出しの後でオペレータが停止しない場合は、Flinkはそれが停止するまで定期的にスレッドの中断を開始するでしょう。

"停止"呼び出しは実行中のストリーミングジョブのもっと緩やかな停止方法です。StoppableFunction インタフェースを実装するソースを使うジョブは停止だけが利用可能です。ユーザがジョブを停止するように要求する場合、全てのソースはstop()メソッドの呼び出しを受け取るでしょう。ジョブは全てのソースが適切にシャットダウンするまで実行し続けるでしょう。これによりジョブは全ての実行中の処理を完了することができます。

Savepoints

Savepoints はコマンドラインクライアントによって制御されます:

savepointの起動

./bin/flink savepoint <jobID>

生成された savepoint のパスを返す。savepoints を回復および破棄するにはこのパスが必要です。

savepointを回復する

./bin/flink run -s <savepointPath> ...

runコマンドはジョブをサブミットするためのsavepointフラグを持ちます。これはsavepointから状態を回復します。savepoint パスは savepoint trigger コマンドによって返されます。

savepoint を削除します。

./bin/flink savepoint -d <savepointPath>

指定されたパスのsavepointを削除します。savepoint パスは savepoint trigger コマンドによって返されます。

If you use custom state instances (for example custom reducing state or RocksDB state), you have to specify the path to the program JAR with which the savepoint was triggered in order to dispose the savepoint with the user code class loader:

./bin/flink savepoint -d <savepointPath> -j <jarFile>

そうでなければ、ClassNotFoundExceptionになるでしょう。

使い方

コマンドラインの構文は以下の通りです:

./flink <ACTION> [OPTIONS] [ARGUMENTS]

以下のアクションが利用可能です:

アクション "run" はプログラムをコンパイルし実行します。

 構文: run [OPTIONS] <jar-file> <arguments>
 "run"アクションのオプション:
 -c,--class <classname> プログラムのエントリポイントを持つクラス
 jarファイルを実行します。
 jarファイルを実行します。
 specify the class in its manifest.
 -C,--classpath <url> Adds a URL to each user code
 classloader on all nodes in the
 cluster. The paths must specify a
 protocol (e.g. file://) and be
 accessible on all nodes (e.g. by means
 of a NFS share). You can use this
 option multiple times for specifying
 more than one URL. The protocol must
 be supported by the {@link
 java.net.URLClassLoader}.
 -d,--detached ある場合、ジョブをでタッチして実行します
 モード
 -m,--jobmanager <host:port> Address of the JobManager (master) to
 which to connect. Specify
 'yarn-cluster' as the JobManager to
 deploy a YARN cluster for the job. Use
 this flag to connect to a different
 JobManager than the one specified in
 the configuration.
 -p,--parallelism <parallelism> The parallelism with which to run the
 program. Optional flag to override the
 default value specified in the
 configuration.
 -q,--sysoutLogging If present, supress logging output to
 standard out.
 -s,--fromSavepoint <savepointPath> Path to a savepoint to reset the job
 back to (for example
 file:///flink/savepoint-1537).
 -m yarn-cluster が設定される場合の追加の引数:
 -yD <arg> 動的なプロパティ
 -yd,--yarndetached デタッチを開始します
 -yj,--yarnjar <arg> Flinkのjarファイルへのパス
 -yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container [in
 MB]
 -yn,--yarncontainer <arg> 割り当てるYARNコンテナの数
 (=タスクマネージャーの数)
 -ynm,--yarnname <arg> Set a custom name for the application
 on YARN
 -yq,--yarnquery 利用可能なYARNリソースを表示します
 (memory, cores)
 -yqu,--yarnqueue <arg> YARNキューを指定します。
 -ys,--yarnslots <arg> タスクマネージャー辺りのスロットの数
 -yst,--yarnstreaming Flinkをストリーミングモードで開始します。
 -yt,--yarnship <arg> 指定されたディレクトリにファイルを送り出します
 (t は送信を表します)
 -ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container [in
 MB]


アクション "info" はプログラムの最適化された実行プランを表示します(JSON)

 構文: info [OPTIONS] <jar-file> <arguments>
 "info"アクションのオプション:
 -c,--class <classname> Class with the program entry point ("main"
 method or "getPlan()" method. Only needed
 if the JAR file does not specify the class
 in its manifest.
 -m,--jobmanager <host:port> Address of the JobManager (master) to
 which to connect. Specify 'yarn-cluster'
 as the JobManager to deploy a YARN cluster
 for the job. Use this flag to connect to a
 different JobManager than the one
 specified in the configuration.
 -p,--parallelism <parallelism> The parallelism with which to run the
 program. Optional flag to override the
 default value specified in the
 configuration.


アクション"list"は実行中およびスケジュールされたプログラムをリスト表示します。

 構文: list [OPTIONS]
 "list"アクションのオプション:
 -m,--jobmanager <host:port> Address of the JobManager (master) to which
 to connect. Specify 'yarn-cluster' as the
 JobManager to deploy a YARN cluster for the
 job. Use this flag to connect to a different
 JobManager than the one specified in the
 configuration.
 -r,--running Show only running programs and their JobIDs
 -s,--scheduled Show only scheduled programs and their JobIDs
 -m yarn-cluster が設定される場合の追加の引数:
 -yid <yarnApplicationId> YARN application ID of Flink YARN session to
 connect to. Must not be set if JobManager HA
 is used. In this case, JobManager RPC
 location is automatically retrieved from
 Zookeeper.


アクション "cancel"は実行中のプログラムを取り消します。

 構文: cancel [OPTIONS] <Job ID>
 "cancel"アクションのオプション:
 -m,--jobmanager <host:port> Address of the JobManager (master) to which
 to connect. Specify 'yarn-cluster' as the
 JobManager to deploy a YARN cluster for the
 job. Use this flag to connect to a different
 JobManager than the one specified in the
 configuration.
 -m yarn-cluster が設定される場合の追加の引数:
 -yid <yarnApplicationId> YARN application ID of Flink YARN session to
 connect to. Must not be set if JobManager HA
 is used. In this case, JobManager RPC
 location is automatically retrieved from
 Zookeeper.


アクション "stop" は実行中のプログラム(ストリーミングジョブのみ)を停止します。stopのリクエストについて、強い一貫性の保証はありません。

 構文: stop [OPTIONS] <Job ID>
 "stop"アクションのオプション:
 -m,--jobmanager <host:port> Address of the JobManager (master) to which
 to connect. Use this flag to connect to a
 different JobManager than the one specified
 in the configuration.
 -m yarn-cluster が設定される場合の追加の引数:
 -yid <yarnApplicationId> YARN application ID of Flink YARN session to
 connect to. Must not be set if JobManager HA
 is used. In this case, JobManager RPC
 location is automatically retrieved from
 Zookeeper.


アクション "savepoint"は、実行中のジョブのためのsavepointを起動するか、あるいは既存のsavepointを削除します。

 構文: savepoint [OPTIONS] <Job ID>
 "savepoint"アクションのオプション:
 -d,--dispose <arg> 削除するsavepointへのパス。
 -j,--jarfile <jarfile> Flink プログラムのJARファイル。
 -m,--jobmanager <host:port> Address of the JobManager (master) to which
 to connect. Use this flag to connect to a
 different JobManager than the one specified
 in the configuration.
 yarn-clusterモードのためのオプション:
 -yid,--yarnapplicationId <arg> 実行中のYARNセッションにアタッチします
TOP
inserted by FC2 system