重要: Scalaに依存するMaven アーティファクトはScalaのメジャーバージョンが後ろに付きます。例えば、"2.10" あるいは "2.11"。プロジェクトwiki上のマイグレーションガイドに相談してください。

Scala シェル

Flink には統合されたインタラクティブなScalaシェルが付属しています。それはローカルセットアップおよびクラスタセットアップで使うことができます。

統合されたFlinkクラスタと一緒にシェルを使うには、単に以下のように実行します:

bin/start-scala-shell.sh local

バイナリのFlinkディレクトリのルートディレクトリで。クラスタ上でシェルを実行するには、以下のセットアップの章を見てください。

使い方

シェルはバッチとストリーミングをサポートします。スタートアップの後で二つの異なるExecutionEnvironments が自動的に事前紐付けされます。バッチおよびストリーミング環境それぞれにアクセスするには、"benv"と"senv"を使います。

データセット API

以下の例はScalaシェル内でwordcountプログラムを実行するでしょう。

Scala-Flink> val text = benv.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
Scala-Flink> val counts = text
    .flatMap { _.toLowerCase.split("\\W+") }
    .map { (_, 1) }.groupBy(0).sum(1)
Scala-Flink> counts.print()

print()コマンドは実行のために自動的に指定されたタスクをジョブマネージャーに送信し、計算の結果をターミナル内に表示するでしょう。

結果をファイルに書き込むことが可能です。しかし、この場合、プログラムを実行するためにexecuteを呼び出す必要があります:

Scala-Flink> benv.execute("MyProgram")

データストリーム API

上のバッチプログラムと似て、データストリームAPIを通じてストリーミングプログラムを実行することができます:

Scala-Flink> val textStreaming = senv.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
Scala-Flink> val countsStreaming = textStreaming
    .flatMap { _.toLowerCase.split("\\W+") }
    .map { (_, 1) }.keyBy(0).sum(1)
Scala-Flink> countsStreaming.print()
Scala-Flink> senv.execute("Streaming Wordcount")

注意。ストリーミングの場合、print操作は直接実行を引き起こしません。

Flink シェルにはコマンド履歴と自動補完が付属します。

外部依存の追加

外部クラスパスをScalaシェルに追加することができます。呼び出しが実行された場合に、これらはシェルプログラムと並行して自動的にジョブマネージャーに送信されるでしょう。

追加のクラスをロードするには、-a <path/to/jar.jar> あるいは --addclasspath <path/to/jar.jar> パラメータを使います。

bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>

セットアップ

Scalaシェルがどのようなオプションを提供するかの外観を知るためには、以下を使ってください

bin/start-scala-shell.sh --help

ローカル

統合されたFlinkクラスタと一緒にシェルを使うには、単に以下のように実行します:

bin/start-scala-shell.sh local

リモート

実行中のクラスタと一緒に使う場合は、キーワードremote と一緒にscalaシェルを開始し、以下のようにジョブマネージャーのホストとポートを与えます:

bin/start-scala-shell.sh remote <hostname> <portnumber>

Yarn Scala シェル クラスタ

シェルはFlinkクラスタをYARNにデプロイします。これはシェルによって排他的に使用されます。YARNコンテナの数はパラメータ -n <arg>によって制御することができます。シェルは新しいFlinkクラスタをYARN上に配備し、クラスタに接続します。ジョブマネージャーのメモリ、YARNアプリケーションの名前などのようなYARNクラスタのためのオプションを指定することもできます。

例えば、二つのタスクマネージャを持つScalaシェルのためのYARNクラスタを開始するには、以下を使います:

 bin/start-scala-shell.sh yarn -n 2

他の全てのオプションに関しては、一番下の完全なリファレンスを見てください。

Yarn セッション

以前にFlink Yarnセッションを使ってFlinkクラスタをデプロイしたのであれば、Scalaシェルは以下のコマンドを使って接続することができます:

 bin/start-scala-shell.sh yarn

完全なリファレンス

Flink Scala シェル
Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...

コマンド: local [options]
local Flink クラスタを使ってFlink scalaシェルを開始する
 -a <path/to/jar> | --addclasspath <path/to/jar>
 Flink内で使われる追加のjarを指定する
コマンド: remote [options] <host> <port>
リモートのクラスタへ接続するFlink scalaシェルを開始する
 <host>
 文字としてのリモートホスト名
 <port>
 数値としてのリモートポート

 -a <path/to/jar> | --addclasspath <path/to/jar>
 Flinkで使われる追加のjarを指定する
コマンド: yarn [options]
yarnクラスタに接続するFlink scalaシェルを開始する
 -n arg | --container arg
 割り当てるYARNコンテナの数 (= タスクマネージャの数)
 -jm arg | --jobManagerMemory arg
 ジョブマネージャのための メモリ [in MB]
 -nm <value> | --name <value>
 YARN上のアプリケーションのための 独自の名前を設定する
 -qu <arg> | --queue <arg>
 YARNキューを指定する
 -s <arg> | --slots <arg>
 タスクマネージャあたりのスロットの数
 -tm <arg> | --taskManagerMemory <arg>
 タスクマネージャのコンテナあたりのメモリ [in MB]
 -a <path/to/jar> | --addclasspath <path/to/jar>
 Flink内で使われる追加のjarを指定する
 --configDir <value>
 設定ディレクトリ
 -h | --help
 この使い方のテキストを表示する
TOP
inserted by FC2 system