コマンドライン インタフェース

Flink はJARファイルとしてパッケージされているプログラムを実行し、それらの実行を制御するために、コマンドライン インタフェースを提供します。コマンドライン インタフェースはFlinkセットアップの一部であり、ローカルのシングルノードセットアップおよび分散セットアップで利用可能です。<flink-home>/bin/flink の下にあり、デフォルトで同じインストレーションディレクトリから開始された実行中のFlinkマスター(JobManager)に接続します。

コマンドライン インタフェースを使う前提条件は、Flinkマスター (JobManager) が (<flink-home>/bin/start-local.sh あるいは <flink-home>/bin/start-cluster.shを使って) 開始されているか、YARN環境が利用可能であるか、です。

コマンドリアンは以下のために使うことができます

  • 引数無しで例のプログラムを実行する。

    ./bin/flink run ./examples/batch/WordCount.jar
    
  • 入力と結果のファイルのための引数と一緒に例のプログラムを実行する

    ./bin/flink run ./examples/batch/WordCount.jar \
                         --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • 並行度16および入力と結果のファイルのための引数と一緒に例のプログラムを実行する

    ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
                         --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • Flinkログの出力を無効んしいて例のプログラムを実行する

        ./bin/flink run -q ./examples/batch/WordCount.jar
    
  • デタッチモードで例のプログラムを実行する

        ./bin/flink run -d ./examples/batch/WordCount.jar
    
  • 特定のJobManager上で例のプログラムを実行する:

    ./bin/flink run -m myJMHost:6123 \
                           ./examples/batch/WordCount.jar \
                           --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • エントリーポイントとして特定のクラスを使って例のプログラムを実行する:

    ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
                           ./examples/batch/WordCount.jar \
                           --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • per-job YARN クラスタを使った例のプログラムを2つのタスクマネージャーで実行する

    ./bin/flink run -m yarn-cluster -yn 2 \
                           ./examples/batch/WordCount.jar \
                           --input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
    
  • JSONとしてWordCountの例のプログラムのための最適化された実行プランを表示する:

    ./bin/flink info ./examples/batch/WordCount.jar \
                            --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • (JobIDを含む)スケジュールされた、および実行中のジョブをリスト表示する:

    ./bin/flink list
    
  • (JobIDを含む)スケジュールされたジョブをリスト表示する:

    ./bin/flink list -s
    
  • (JobIDを含む)実行中のジョブをリスト表示する:

    ./bin/flink list -r
    
  • Flink YARNセッション内で実行中のFlinkジョブをリスト表示する

    ./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
    
  • ジョブを取り消す:

    ./bin/flink cancel <jobID>
    
  • Cancel a job with a savepoint:

    ./bin/flink cancel -s [targetDirectory] <jobID>
    
  • ジョブを止める (ストリーミングジョブのみ):

    ./bin/flink stop <jobID>
    

(ストリーミング)ジョブの取り消しと停止の違いは以下の通りです:

取り消しの呼び出し時に、ジョブ内のオペレータはできる限りすぐにそれらを取り消すようにすぐにcancel() メソッドを受け取ります。中止の呼び出しの後でオペレータが停止しない場合は、Flinkはそれが停止するまで定期的にスレッドの中断を開始するでしょう。

"停止"呼び出しは実行中のストリーミングジョブのもっと緩やかな停止方法です。StoppableFunction インタフェースを実装するソースを使うジョブは停止だけが利用可能です。ユーザがジョブを停止するように要求する場合、全てのソースはstop()メソッドの呼び出しを受け取るでしょう。ジョブは全てのソースが適切にシャットダウンするまで実行し続けるでしょう。これによりジョブは全ての実行中の処理を完了することができます。

セーブポイント

Savepoints はコマンドラインクライアントによって制御されます:

savepointの起動

./bin/flink savepoint <jobID> [savepointDirectory]

生成された savepoint のパスを返す。savepoints を回復および破棄するにはこのパスが必要です。

You can optionally specify a savepointDirectory when triggering the savepoint. If you don’t specify one here, you need to configure a default savepoint directory for the Flink installation (see Savepoints).

Cancel with a savepoint

You can atomically trigger a savepoint and cancel a job.

./bin/flink cancel -s  [savepointDirectory] <jobID>

If no savepoint directory is configured, you need to configure a default savepoint directory for the Flink installation (see Savepoints).

The job will only be cancelled if the savepoint succeeds.

savepointを回復する

./bin/flink run -s <savepointPath> ...

runコマンドはジョブをサブミットするためのsavepointフラグを持ちます。これはsavepointから状態を回復します。savepoint パスは savepoint trigger コマンドによって返されます。

By default, we try to match all savepoint state to the job being submitted. If you want to allow to skip savepoint state that cannot be restored with the new job you can set the allowNonRestoredState flag. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered and you still want to use the savepoint.

./bin/flink run -s <savepointPath> -n ...

This is useful if your program dropped an operator that was part of the savepoint.

savepoint を削除します。

./bin/flink savepoint -d <savepointPath>

指定されたパスのsavepointを削除します。savepoint パスは savepoint trigger コマンドによって返されます。

If you use custom state instances (for example custom reducing state or RocksDB state), you have to specify the path to the program JAR with which the savepoint was triggered in order to dispose the savepoint with the user code class loader:

./bin/flink savepoint -d <savepointPath> -j <jarFile>

そうでなければ、ClassNotFoundExceptionになるでしょう。

使い方

コマンドラインの構文は以下の通りです:

./flink <ACTION> [OPTIONS] [ARGUMENTS]

以下のアクションが利用可能です:

アクション "run" はプログラムをコンパイルし実行します。

  構文: run [OPTIONS] <jar-file> <arguments>
  "run"アクションのオプション:
     -c,--class <classname>                         Class with the program entry
                                                    point ("main" method or
                                                    "getPlan()" method. Only
                                                    needed if the JAR file does
                                                    not specify the class in its
                                                    manifest.
     -C,--classpath <url>                           Adds a URL to each user code
                                                    classloader  on all nodes in
                                                    the cluster. The paths must
                                                    specify a protocol (e.g.
                                                    file://) and be accessible
                                                    on all nodes (e.g. by means
                                                    of a NFS share). You can use
                                                    this option multiple times
                                                    for specifying more than one
                                                    URL. The protocol must be
                                                    supported by the {@link
                                                    java.net.URLClassLoader}.
     -d,--detached                                  If present, runs the job in
                                                    detached mode
     -m,--jobmanager <host:port>                    Address of the JobManager
                                                    (master) to which to
                                                    connect. Use this flag to
                                                    connect to a different
                                                    JobManager than the one
                                                    specified in the
                                                    configuration.
     -n,--allowNonRestoredState                     Allow non restored savepoint
                                                    state in case an operator has
                                                    been removed from the job.
     -p,--parallelism <parallelism>                 The parallelism with which
                                                    to run the program. オプション
                                                    flag to override the default
                                                    value specified in the
                                                    configuration.
     -q,--sysoutLogging                             If present, suppress logging
                                                    output to standard out.
     -s,--fromSavepoint <savepointPath>             Path to a savepoint to
                                                    restore the job from (for
                                                    例
                                                    hdfs:///flink/savepoint-1537
                                                    ).
     -z,--zookeeperNamespace <zookeeperNamespace>   Namespace to create the
                                                    Zookeeper sub-paths for high
                                                    availability mode

  yarn-clusterモードのためのオプション:
     -yD <arg>                            Dynamic properties
     -yd,--yarndetached                   Start detached
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
                                          MB]
     -yn,--yarncontainer <arg>            Number of YARN container to allocate
                                          (=タスクマネージャーの数)
     -ynm,--yarnname <arg>                Set a custom name for the application
                                          on YARN
     -yq,--yarnquery                      Display available YARN resources
                                          (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yst,--yarnstreaming                 Start Flink in streaming mode
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t は送信を表します)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container [in
                                          MB]
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode



Action "info" shows the optimized execution plan of the program (JSON).

  構文: info [OPTIONS] <jar-file> <arguments>
  "info"アクションのオプション:
     -c,--class <classname>           Class with the program entry point ("main"
                                      method or "getPlan()" method. Only needed
                                      if the JAR file does not specify the class
                                      in its manifest.
     -p,--parallelism <parallelism>   The parallelism with which to run the
                                      program. Optional flag to override the
                                      default value specified in the
                                      configuration.
  yarn-clusterモードのためのオプション:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session



Action "list" lists running and scheduled programs.

  構文: list [OPTIONS]
  "list"アクションのオプション:
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Use this flag to connect to a
                                   different JobManager than the one specified
                                   in the configuration.
     -r,--running                  Show only running programs and their JobIDs
     -s,--scheduled                Show only scheduled programs and their JobIDs
  yarn-clusterモードのためのオプション:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session



Action "stop" stops a running program (streaming jobs only).

  構文: stop [OPTIONS] <Job ID>
  "stop"アクションのオプション:
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Use this flag to connect to a
                                   different JobManager than the one specified
                                   in the configuration.
  yarn-clusterモードのためのオプション:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session



Action "cancel" cancels a running program.

  構文: cancel [OPTIONS] <Job ID>
  "cancel"アクションのオプション:
     -m,--jobmanager <host:port>            Address of the JobManager (master)
                                            to which to connect. Use this flag
                                            to connect to a different JobManager
                                            than the one specified in the
                                            configuration.
     -s,--withSavepoint <targetDirectory>   Trigger savepoint and cancel job.
                                            The target directory is optional. If
                                            no directory is specified, the
                                            configured default directory
                                            (state.savepoints.dir) is used.
  yarn-clusterモードのためのオプション:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session



Action "savepoint" triggers savepoints for a running job or disposes existing ones.

  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
  "savepoint"アクションのオプション:
     -d,--dispose <arg>            Path of savepoint to dispose.
     -j,--jarfile <jarfile>        Flink program JAR file.
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Use this flag to connect to a
                                   different JobManager than the one specified
                                   in the configuration.
  yarn-clusterモードのためのオプション:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
TOP
inserted by FC2 system