Sample Project using the Java API

2,3の簡単なステップでFlink Javaプログラムを動かす

必要条件

唯一の必要条件は、動作しているMaven 3.0.4 (以上) と Java 7.x (以上)のインストレーションです。

プロジェクトの生成

プロジェクトを作成するために以下のコマンドのうちの一つを使います:

$ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
      -DarchetypeVersion=1.3-SNAPSHOT
これは新しく作成されたプロジェクトに名前を付けることができます。それは対話的に groupId, artifactId およびパッケージ名を尋ねます。
$ curl https://flink.apache.org/q/quickstart-SNAPSHOT.sh | bash

プロジェクトの精査

作業ディレクトリ内に新しいディレクトリがあるでしょう。curl のやり方を使った場合は、ディレクトリはquickstartと呼ばれます。Otherwise, it has the name of your artifactId:

$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── BatchJob.java
        │               ├── SocketTextStreamWordCount.java
        │               ├── StreamingJob.java
        │               └── WordCount.java
        └── resources
            └── log4j.properties

例のプロジェクトは Maven projectで、これは4つのクラスを含みます。StreamingJobBatchJob は基本的なスケルトンのプログラムです。SocketTextStreamWordCount は動作するストリーミングの例で、WordCountJob は動作するバッチの例です。全てのクラスのmain メソッドは development/testing モードでFlinkを開始することができることに注意してください。

それを開発およびテストするためにこのプロジェクトをIDEにインポートすることをお勧めします。Eclipseを使う場合は、m2e プラグインMaven プロジェクトをインポートすることができます。いくつかのEclipseのバンドルはデフォルトでそのプラグインを含みますが、その他は手動でインストールする必要があります。IntelliJ IDE もそのままでMavenプロジェクトをサポートします。

Mac OS X ユーザへの注意: デフォルトのJavaのためのJVMヒープサイズはFlinkにとって小さすぎます。手動でそれを増やす必要があります。Eclipseでは、Run Configurations -> Arguments を選択し、VM Arguments ボックスに以下を書き込みます: -Xmx800m

プロジェクトのビルド

プロジェクトをビルドしたい場合は、プロジェクトディレクトリに行き、mvn clean install -Pbuild-jar コマンドを発行します。各Flinkクラスタ上で互換のバージョンがあるjarを見つけるでしょう。target/original-your-artifact-id-your-version.jar。更にtarget/your-artifact-id-your-version.jar にMavenプロジェクトに全ての依存が含まれているfat-jarがあります。

次のステップ

アプリケーションを書きます!

クイックスタートプロジェクトはWordCount実装を含みます。これはビッグデータ処理システムでの"Hello World"です。WordCountの目的はテキスト内の単語の頻度を確定することです。例えば、全てのWikipediaのテキストの中で単語"the"あるいは"house"がどれだけ現れるか。

Sample Input:

ビッグデータはビッグです。

出力の例:

big 2
data 1
is 1

以下のコードは、二つのオペレータ(合計を集約する FlatMap と Reduce)を持ちいくつかのテキストの行を処理するクイックスタートからのWordCountの実装を示します。

public class WordCount {

  public static void main(String[] args) throws Exception {

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataSet<String> text = env.fromElements(
        "To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,"
        );

    DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new LineSplitter())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);

    // execute and print result
    counts.print();
  }
}

オペレータは特別なクラス、ここではLineSplitterクラス、によって定義されます。

public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

  @Override
  public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    // normalize and split the line
    String[] tokens = value.toLowerCase().split("\\W+");

    // emit the pairs
    for (String token : tokens) {
      if (token.length() > 0) {
        out.collect(new Tuple2<String, Integer>(token, 1));
      }
    }
  }
}

コードの完全な例についてはGitHubをチェックしてください

APIの完全な概要については、データストリーム API および データセット APIの章を見てください。何か問題があれば、メーリング リストで聞いてください。喜んで手伝います。

TOP
inserted by FC2 system