コマンドライン インタフェース

Flink はJARファイルとしてパッケージされているプログラムを実行し、それらの実行を制御するために、コマンドライン インタフェースを提供します。コマンドライン インタフェースはFlinkセットアップの一部であり、ローカルのシングルノードセットアップおよび分散セットアップで利用可能です。<flink-home>/bin/flink の下にあり、デフォルトで同じインストレーションディレクトリから開始された実行中のFlinkマスター(JobManager)に接続します。

コマンドライン インタフェースを使う前提条件は、Flinkマスター (JobManager) が (<flink-home>/bin/start-local.sh あるいは <flink-home>/bin/start-cluster.shを使って) 開始されているか、YARN環境が利用可能であるか、です。

コマンドリアンは以下のために使うことができます

  • 引数無しで例のプログラムを実行する。

    ./bin/flink run ./examples/batch/WordCount.jar
    
  • 入力と結果のファイルのための引数と一緒に例のプログラムを実行する

    ./bin/flink run ./examples/batch/WordCount.jar \
                         --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • 並行度16および入力と結果のファイルのための引数と一緒に例のプログラムを実行する

    ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
                         --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • Flinkログの出力を無効にして例のプログラムを実行する

        ./bin/flink run -q ./examples/batch/WordCount.jar
    
  • デタッチモードで例のプログラムを実行する

        ./bin/flink run -d ./examples/batch/WordCount.jar
    
  • 特定のJobManager上で例のプログラムを実行する:

    ./bin/flink run -m myJMHost:6123 \
                           ./examples/batch/WordCount.jar \
                           --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • エントリーポイントとして特定のクラスを使って例のプログラムを実行する:

    ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
                           ./examples/batch/WordCount.jar \
                           --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • per-job YARN クラスタを使った例のプログラムを2つのタスクマネージャーで実行する

    ./bin/flink run -m yarn-cluster -yn 2 \
                           ./examples/batch/WordCount.jar \
                           --input hdfs:///user/hamlet.txt --output hdfs:///user/wordcount_out
    
  • JSONとしてWordCountの例のプログラムのための最適化された実行プランを表示する:

    ./bin/flink info ./examples/batch/WordCount.jar \
                            --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
    
  • (JobIDを含む)スケジュールされた、および実行中のジョブをリスト表示する:

    ./bin/flink list
    
  • (JobIDを含む)スケジュールされたジョブをリスト表示する:

    ./bin/flink list -s
    
  • (JobIDを含む)実行中のジョブをリスト表示する:

    ./bin/flink list -r
    
  • Flink YARNセッション内で実行中のFlinkジョブをリスト表示する

    ./bin/flink list -m yarn-cluster -yid <yarnApplicationID> -r
    
  • ジョブを取り消す:

    ./bin/flink cancel <jobID>
    
  • セーブポイントを使ってジョブを取り消す:

    ./bin/flink cancel -s [targetDirectory] <jobID>
    
  • ジョブを止める (ストリーミングジョブのみ):

    ./bin/flink stop <jobID>
    

(ストリーミング)ジョブの取り消しと停止の違いは以下の通りです:

取り消しの呼び出し時に、ジョブ内のオペレータはできる限りすぐにそれらを取り消すようにすぐにcancel() メソッドを受け取ります。中止の呼び出しの後でオペレータが停止しない場合は、Flinkはそれが停止するまで定期的にスレッドの中断を開始するでしょう。

"停止"呼び出しは実行中のストリーミングジョブのもっと緩やかな停止方法です。StoppableFunction インタフェースを実装するソースを使うジョブは停止だけが利用可能です。ユーザがジョブを停止するように要求する場合、全てのソースはstop()メソッドの呼び出しを受け取るでしょう。ジョブは全てのソースが適切にシャットダウンするまで実行し続けるでしょう。これによりジョブは全ての実行中の処理を完了することができます。

セーブポイント

Savepoints はコマンドラインクライアントによって制御されます:

セーブポイントを起動

./bin/flink savepoint <jobId> [savepointDirectory]

This will trigger a savepoint for the job with ID jobId, and returns the path of the created savepoint. savepoints を回復および破棄するにはこのパスが必要です。

Furthermore, you can optionally specify a target file system directory to store the savepoint in. ディレクトリはジョブマネージャーによってアクセス可能である必要があります。

If you don’t specify a target directory, you need to have configured a default directory (see Savepoints). そうでなければ、セーブポイントの起動は失敗するでしょう。

Trigger a Savepoint with YARN

./bin/flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId>

This will trigger a savepoint for the job with ID jobId and YARN application ID yarnAppId, and returns the path of the created savepoint.

Everything else is the same as described in the above Trigger a Savepoint section.

セーブポイントを使って取り消す

アトミックにセーブポイントの起動とジョブの取り消しをすることができます。

./bin/flink cancel -s  [savepointDirectory] <jobID>

セーブポイントのディレクトリが設定されない場合、Flinkインストレーションのデフォルトのセーブポイント ディレクトリを設定する必要があります (Savepointsを見てください)。

セーブポイントが成功した場合のみ、ジョブが取り消されるでしょう。

savepointを回復する

./bin/flink run -s <savepointPath> ...

runコマンドはジョブをサブミットするためのsavepointフラグを持ちます。これはsavepointから状態を回復します。savepoint パスは savepoint trigger コマンドによって返されます。

デフォルトで全てのセーブポイントの状態をサブミットされたジョブに一致させようとします。新しいジョブで回復できないセーブポイントの状態をスキップできるようにしたい場合は、allowNonRestoredState フラグを設定することができます。セーブポイントが起動され、そのセーブポイントをまだ使いたい時に、プログラムの一部であったプログラムからオペレータを削除した場合は、これを許容する必要があります。

./bin/flink run -s <savepointPath> -n ...

プログラムがセーブポイントの一部であったオペレータを落とした場合、これは便利です。

savepoint を削除します。

./bin/flink savepoint -d <savepointPath>

指定されたパスのsavepointを削除します。savepoint パスは savepoint trigger コマンドによって返されます。

(例えば独自のreduce状態あるいはRocksDB状態のような)独自の状態インスタンスを使う場合、ユーザコードクラスローダを持つセーブポイントを処分するためにセーブポイントが起動されたプログラム JARへのパスを指定する必要があります。

./bin/flink savepoint -d <savepointPath> -j <jarFile>

そうでなければ、ClassNotFoundExceptionになるでしょう。

使い方

コマンドラインの構文は以下の通りです:

./flink <ACTION> [OPTIONS] [ARGUMENTS]

以下のアクションが利用可能です:

アクション "run" はプログラムをコンパイルし実行します。

  構文: run [OPTIONS] <jar-file> <arguments>
  "run"アクションのオプション:
     -c,--class <classname>                         Class with the program entry
                                                    point ("main" method or
                                                    "getPlan()" method. Only
                                                    needed if the JAR file does
                                                    not specify the class in its
                                                    manifest.
     -C,--classpath <url>                           Adds a URL to each user code
                                                    classloader  on all nodes in
                                                    the cluster. The paths must
                                                    specify a protocol (e.g.
                                                    file://) and be accessible
                                                    on all nodes (e.g. by means
                                                    of a NFS share). You can use
                                                    this option multiple times
                                                    for specifying more than one
                                                    URL. The protocol must be
                                                    supported by the {@link
                                                    java.net.URLClassLoader}.
     -d,--detached                                  If present, runs the job in
                                                    detached mode
     -m,--jobmanager <host:port>                    Address of the JobManager
                                                    (master) to which to
                                                    connect. Use this flag to
                                                    connect to a different
                                                    JobManager than the one
                                                    specified in the
                                                    configuration.
     -n,--allowNonRestoredState                     Allow non restored savepoint
                                                    state in case an operator has
                                                    been removed from the job.
     -p,--parallelism <parallelism>                 The parallelism with which
                                                    to run the program. オプション
                                                    flag to override the default
                                                    value specified in the
                                                    configuration.
     -q,--sysoutLogging                             If present, suppress logging
                                                    output to standard out.
     -s,--fromSavepoint <savepointPath>             Path to a savepoint to
                                                    restore the job from (for
                                                    例
                                                    hdfs:///flink/savepoint-1537
                                                    ).
     -z,--zookeeperNamespace <zookeeperNamespace>   Namespace to create the
                                                    Zookeeper sub-paths for high
                                                    availability mode

  yarn-clusterモードのためのオプション:
     -yD <arg>                            Dynamic properties
     -yd,--yarndetached                   Start detached
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
                                          MB]
     -yn,--yarncontainer <arg>            Number of YARN container to allocate
                                          (=タスクマネージャーの数)
     -ynm,--yarnname <arg>                Set a custom name for the application
                                          on YARN
     -yq,--yarnquery                      Display available YARN resources
                                          (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yst,--yarnstreaming                 Start Flink in streaming mode
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t は送信を表します)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container [in
                                          MB]
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode



Action "info" shows the optimized execution plan of the program (JSON).

  構文: info [OPTIONS] <jar-file> <arguments>
  "info"アクションのオプション:
     -c,--class <classname>           Class with the program entry point ("main"
                                      method or "getPlan()" method. Only needed
                                      if the JAR file does not specify the class
                                      in its manifest.
     -p,--parallelism <parallelism>   The parallelism with which to run the
                                      program. Optional flag to override the
                                      default value specified in the
                                      configuration.
  yarn-clusterモードのためのオプション:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session



Action "list" lists running and scheduled programs.

  構文: list [OPTIONS]
  "list"アクションのオプション:
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Use this flag to connect to a
                                   different JobManager than the one specified
                                   in the configuration.
     -r,--running                  Show only running programs and their JobIDs
     -s,--scheduled                Show only scheduled programs and their JobIDs
  yarn-clusterモードのためのオプション:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session



Action "stop" stops a running program (streaming jobs only).

  構文: stop [OPTIONS] <Job ID>
  "stop"アクションのオプション:
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Use this flag to connect to a
                                   different JobManager than the one specified
                                   in the configuration.
  yarn-clusterモードのためのオプション:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session



Action "cancel" cancels a running program.

  構文: cancel [OPTIONS] <Job ID>
  "cancel"アクションのオプション:
     -m,--jobmanager <host:port>            Address of the JobManager (master)
                                            to which to connect. Use this flag
                                            to connect to a different JobManager
                                            than the one specified in the
                                            configuration.
     -s,--withSavepoint <targetDirectory>   Trigger savepoint and cancel job.
                                            The target directory is optional. If
                                            no directory is specified, the
                                            configured default directory
                                            (state.savepoints.dir) is used.
  yarn-clusterモードのためのオプション:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session



Action "savepoint" triggers savepoints for a running job or disposes existing ones.

  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
  "savepoint"アクションのオプション:
     -d,--dispose <arg>            Path of savepoint to dispose.
     -j,--jarfile <jarfile>        Flink program JAR file.
     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
                                   to connect. Use this flag to connect to a
                                   different JobManager than the one specified
                                   in the configuration.
  yarn-clusterモードのためのオプション:
     -yid,--yarnapplicationId <arg>   Attach to running YARN session

上に戻る

TOP
inserted by FC2 system