Flinkはたとえ1つのJavaバーチャルマシーン内でも一つのマシーン上で実行することができます。これによりユーザはFlinkプログラムをローカルでテストおよびデバッグすることができます。この章はローカルの実行機構の概要を説明します。
The local environments and executors allow you to run Flink programs in a local Java Virtual Machine, or with within any JVM as part of existing programs. ほとんどの例は単純にIDEの"Run"ボタンを叩くことでローカルで起動することができます。
Flinkでサポートされるローカルの実行の二つの異なる種類があります。LocalExecutionEnvironment
は、ジョブマネージャーおよびタスクマネージャーを含む完全なFlinkランタイムを開始します。これらはメモリ管理とクラスタモードで実行される全ての内部的なアルゴリズムを含みます。
CollectionEnvironment
はJavaコレクション上のFlinkプログラムを実行します。このモードは完全なFlinkランタイムを開始しないでしょう。つまり実行はとてもオーバーヘッドが低く、軽いです。例えば、DataSet.map()
-transformation はmap()
関数をJavaリストの全ての要素に適用することで実行されるでしょう。
ローカルでFlinkプログラムを実行している場合、プログラムを他のJavaプログラムと同じようにデバッグすることもできます。なんらかの内部変数を書き出すためにSystem.out.println()
を使うか、デバッガーを使うことができます。map()
、reduce()
および他の全てのメソッド内にブレークポイントを設定することができます。テストおよびJava API内のローカルデバッギングユーティリティのガイドとして、Java APIドキュメント内のデバッギングの章も参照してください。
プログラムをMavenプロジェクト内で開発している場合、以下の依存を使ってflink-clients
モジュールを追加する必要があります:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.3-SNAPSHOT</version>
</dependency>
LocalEnvironment
はFlinkプログラムのためのローカル実行の取っ掛かりです。Use it to run a program within a local JVM - standalone or embedded in other programs.
ローカル環境はメソッドExecutionEnvironment.createLocalEnvironment()
によってインスタンス化されます。デフォルトでは、マシーンのCPUコア(ハードウェアコンテキスト)と同数の実行のためのローカルスレッドを使うでしょう。そうではなく、望むだけの並行度を指定することができます。ローカル環境はenableLogging()
/disableLogging()
を使ってコンソールに記録をするように設定することができます。
ほとんどの場合、ExecutionEnvironment.getExecutionEnvironment()
を呼び出すことがよりよいやり方です。プログラムがローカル(コマンドラインインタフェースの外側)で開始した時にメソッドは LocalEnvironment
を返し、プログラムが コマンドラインインタフェースによって起動された場合は、クラスタの実行のためにあらかじめ設定された環境を返すでしょう。
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
DataSet<String> data = env.readTextFile("file:///path/to/file");
data
.filter(new FilterFunction<String>() {
public boolean filter(String value) {
return value.startsWith("http://");
}
})
.writeAsText("file:///path/to/result");
JobExecutionResult res = env.execute();
}
JobExecutionResult
オブジェクト、これは実行が完了した後に返されます、はプログラムランタイムとaccumulatorの結果を含みます。
LocalEnvironment
によって独自の設定値をFlinkに渡すこともできます。
Configuration conf = new Configuration();
conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
注意: ローカル実行環境は実行を監視するためのwebフロントエンドを開始しません。
CollectionEnvironment
を使ったJavaコレクション上の実行は、Flnkプログラムを実行するための低オーバーヘッドなやり方です。このモードの代表的なユースケースは自動化テスト、デバッギング、およびコードの再利用です。
Users can use algorithms implemented for batch processing also for cases that are more interactive. A slightly changed variant of a Flink program could be used in a Java Application Server for processing incoming requests.
コレクション ベースの実行のための骨組み
public static void main(String[] args) throws Exception {
// initialize a new Collection-based execution environment
final ExecutionEnvironment env = new CollectionEnvironment();
DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */);
/* Data Set transformations ... */
// retrieve the resulting Tuple2 elements into a ArrayList.
Collection<...> result = new ArrayList<...>();
resultDataSet.output(new LocalCollectionOutputFormat<...>(result));
// kick off execution.
env.execute();
// Do some work with the resulting ArrayList (=Collection).
for(... t : result) {
System.err.println("Result = "+t);
}
}
flink-examples-batch
モジュールは、CollectionExecutionExample
と呼ばれる完全な例を含みます。
コレクション ベースのFlinkプログラムの実行はJVMヒープ内に収まる小さなデータにのみ可能なことに注意してください。コレクション上での十個うはマルチスレッドではなく、一つのスレッドだけが使われます。