Flink には統合されたインタラクティブなScalaシェルが付属しています。それはローカルセットアップおよびクラスタセットアップで使うことができます。
統合されたFlinkクラスタと一緒にシェルを使うには、単に以下のように実行します:
bin/start-scala-shell.sh local
バイナリのFlinkディレクトリのルートディレクトリで。クラスタ上でシェルを実行するには、以下のセットアップの章を見てください。
シェルはバッチとストリーミングをサポートします。スタートアップの後で二つの異なるExecutionEnvironments が自動的に事前紐付けされます。バッチおよびストリーミング環境それぞれにアクセスするには、"benv"と"senv"を使います。
以下の例はScalaシェル内でwordcountプログラムを実行するでしょう。
Scala-Flink> val text = benv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()
print()コマンドは実行のために自動的に指定されたタスクをジョブマネージャーに送信し、計算の結果をターミナル内に表示するでしょう。
結果をファイルに書き込むことが可能です。しかし、この場合、プログラムを実行するためにexecute
を呼び出す必要があります:
Scala-Flink> benv.execute("MyProgram")
上のバッチプログラムと似て、データストリームAPIを通じてストリーミングプログラムを実行することができます:
Scala-Flink> val textStreaming = senv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala-Flink> val countsStreaming = textStreaming
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.keyBy(0).sum(1)
Scala-Flink> countsStreaming.print()
Scala-Flink> senv.execute("Streaming Wordcount")
注意。ストリーミングの場合、print操作は直接実行を引き起こしません。
Flink シェルにはコマンド履歴と自動補完が付属します。
外部クラスパスをScalaシェルに追加することができます。呼び出しが実行された場合に、これらはシェルプログラムと並行して自動的にジョブマネージャーに送信されるでしょう。
追加のクラスをロードするには、-a <path/to/jar.jar>
あるいは --addclasspath <path/to/jar.jar>
パラメータを使います。
bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>
Scalaシェルがどのようなオプションを提供するかの外観を知るためには、以下を使ってください
bin/start-scala-shell.sh --help
統合されたFlinkクラスタと一緒にシェルを使うには、単に以下のように実行します:
bin/start-scala-shell.sh local
実行中のクラスタと一緒に使う場合は、キーワードremote
と一緒にscalaシェルを開始し、以下のようにジョブマネージャーのホストとポートを与えます:
bin/start-scala-shell.sh remote <hostname> <portnumber>
シェルはFlinkクラスタをYARNにデプロイします。これはシェルによって排他的に使用されます。YARNコンテナの数はパラメータ -n <arg>
によって制御することができます。シェルは新しいFlinkクラスタをYARN上に配備し、クラスタに接続します。ジョブマネージャーのメモリ、YARNアプリケーションの名前などのようなYARNクラスタのためのオプションを指定することもできます。
例えば、二つのタスクマネージャを持つScalaシェルのためのYARNクラスタを開始するには、以下を使います:
bin/start-scala-shell.sh yarn -n 2
他の全てのオプションに関しては、一番下の完全なリファレンスを見てください。
以前にFlink Yarnセッションを使ってFlinkクラスタをデプロイしたのであれば、Scalaシェルは以下のコマンドを使って接続することができます:
bin/start-scala-shell.sh yarn
Flink Scala シェル
Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...
コマンド: local [options]
local Flink クラスタを使ってFlink scalaシェルを開始する
-a <path/to/jar> | --addclasspath <path/to/jar>
Flink内で使われる追加のjarを指定する
コマンド: remote [options] <host> <port>
リモートのクラスタへ接続するFlink scalaシェルを開始する
<host>
文字としてのリモートホスト名
<port>
数値としてのリモートポート
-a <path/to/jar> | --addclasspath <path/to/jar>
Flinkで使われる追加のjarを指定する
コマンド: yarn [options]
yarnクラスタに接続するFlink scalaシェルを開始する
-n arg | --container arg
割り当てるYARNコンテナの数 (= タスクマネージャの数)
-jm arg | --jobManagerMemory arg
ジョブマネージャのための メモリ [in MB]
-nm <value> | --name <value>
YARN上のアプリケーションのための 独自の名前を設定する
-qu <arg> | --queue <arg>
YARNキューを指定する
-s <arg> | --slots <arg>
タスクマネージャあたりのスロットの数
-tm <arg> | --taskManagerMemory <arg>
タスクマネージャのコンテナあたりのメモリ [in MB]
-a <path/to/jar> | --addclasspath <path/to/jar>
Flink内で使われる追加のjarを指定する
--configDir <value>
設定ディレクトリ
-h | --help
この使い方のテキストを表示する