2,3の簡単なステップでFlink Javaプログラムを動かす
唯一の必要条件は、動作しているMaven 3.0.4 (以上) と Java 7.x (以上)のインストレーションです。
プロジェクトを作成するために以下のコマンドのうちの一つを使います:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
-DarchetypeVersion=1.3-SNAPSHOT
$ curl https://flink.apache.org/q/quickstart-SNAPSHOT.sh | bash
作業ディレクトリ内に新しいディレクトリがあるでしょう。curl のやり方を使った場合は、ディレクトリはquickstart
と呼ばれます。Otherwise, it has the name of your artifactId
:
$ tree quickstart/
quickstart/
├── pom.xml
└── src
└── main
├── java
│ └── org
│ └── myorg
│ └── quickstart
│ ├── BatchJob.java
│ ├── SocketTextStreamWordCount.java
│ ├── StreamingJob.java
│ └── WordCount.java
└── resources
└── log4j.properties
例のプロジェクトは Maven projectで、これは4つのクラスを含みます。StreamingJob とBatchJob は基本的なスケルトンのプログラムです。SocketTextStreamWordCount は動作するストリーミングの例で、WordCountJob は動作するバッチの例です。全てのクラスのmain メソッドは development/testing モードでFlinkを開始することができることに注意してください。
それを開発およびテストするためにこのプロジェクトをIDEにインポートすることをお勧めします。Eclipseを使う場合は、m2e プラグインがMaven プロジェクトをインポートすることができます。いくつかのEclipseのバンドルはデフォルトでそのプラグインを含みますが、その他は手動でインストールする必要があります。IntelliJ IDE もそのままでMavenプロジェクトをサポートします。
Mac OS X ユーザへの注意: デフォルトのJavaのためのJVMヒープサイズはFlinkにとって小さすぎます。手動でそれを増やす必要があります。Eclipseでは、Run Configurations -> Arguments
を選択し、VM Arguments
ボックスに以下を書き込みます: -Xmx800m
。
プロジェクトをビルドしたい場合は、プロジェクトディレクトリに行き、mvn clean install -Pbuild-jar
コマンドを発行します。各Flinkクラスタ上で互換のバージョンがあるjarを見つけるでしょう。target/original-your-artifact-id-your-version.jar。更にtarget/your-artifact-id-your-version.jar にMavenプロジェクトに全ての依存が含まれているfat-jarがあります。
アプリケーションを書きます!
クイックスタートプロジェクトはWordCount
実装を含みます。これはビッグデータ処理システムでの"Hello World"です。WordCount
の目的はテキスト内の単語の頻度を確定することです。例えば、全てのWikipediaのテキストの中で単語"the"あるいは"house"がどれだけ現れるか。
Sample Input:
ビッグデータはビッグです。
出力の例:
big 2
data 1
is 1
以下のコードは、二つのオペレータ(合計を集約する FlatMap と Reduce)を持ちいくつかのテキストの行を処理するクイックスタートからのWordCount
の実装を示します。
public class WordCount {
public static void main(String[] args) throws Exception {
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// get input data
DataSet<String> text = env.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,"
);
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new LineSplitter())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
// execute and print result
counts.print();
}
}
オペレータは特別なクラス、ここではLineSplitterクラス、によって定義されます。
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
コードの完全な例についてはGitHubをチェックしてください。
APIの完全な概要については、データストリーム API および データセット APIの章を見てください。何か問題があれば、メーリング リストで聞いてください。喜んで手伝います。