Sample Project using the Java API

2,3の簡単なステップでFlink Javaプログラムを動かす

必要条件

唯一の必要条件は、動作しているMaven 3.0.4 (以上) と Java 7.x (以上)のインストレーションです。

プロジェクトの生成

プロジェクトを作成するために以下のコマンドのうちの一つを使います:

$ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
      -DarchetypeVersion=1.3-SNAPSHOT
これは新しく作成されたプロジェクトに名前を付けることができます。それは対話的に groupId, artifactId およびパッケージ名を尋ねます。
$ curl https://flink.apache.org/q/quickstart-SNAPSHOT.sh | bash

プロジェクトの精査

作業ディレクトリ内に新しいディレクトリがあるでしょう。curl のやり方を使った場合は、ディレクトリはquickstartと呼ばれます。Otherwise, it has the name of your artifactId:

$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── BatchJob.java
        │               ├── SocketTextStreamWordCount.java
        │               ├── StreamingJob.java
        │               └── WordCount.java
        └── resources
            └── log4j.properties

例のプロジェクトは Maven projectで、これは4つのクラスを含みます。StreamingJobBatchJob は基本的なスケルトンのプログラムです。SocketTextStreamWordCount は動作するストリーミングの例で、WordCountJob は動作するバッチの例です。全てのクラスのmain メソッドは development/testing モードでFlinkを開始することができることに注意してください。

それを開発およびテストするためにこのプロジェクトをIDEにインポートすることをお勧めします。Eclipseを使うバイアは、m2e プラグインMaven プロジェクトをインポートすることができます。いくつかのEclipseのバンドルはデフォルトでそのプラグインを含みますが、その他は手動でインストールする必要があります。The IntelliJ IDE supports Maven projects out of the box.

A note to Mac OS X users: The default JVM heapsize for Java is too small for Flink. 手動でそれを増やす必要があります。In Eclipse, choose Run Configurations -> Arguments and write into the VM Arguments box: -Xmx800m.

プロジェクトのビルド

プロジェクトをビルドしたい場合は、プロジェクトディレクトリに行き、mvn clean install -Pbuild-jar コマンドを発行します。You will find a jar that runs on every Flink cluster with a compatible version, target/original-your-artifact-id-your-version.jar. There is also a fat-jar in target/your-artifact-id-your-version.jar which, additionally, contains all dependencies that were added to the Maven project.

次のステップ

アプリケーションを書きます!

The quickstart project contains a WordCount implementation, the “Hello World” of Big Data processing systems. The goal of WordCount is to determine the frequencies of words in a text, e.g., how often do the terms “the” or “house” occur in all Wikipedia texts.

Sample Input:

ビッグデータはビッグです。

出力の例:

big 2
data 1
is 1

The following code shows the WordCount implementation from the Quickstart which processes some text lines with two operators (a FlatMap and a Reduce operation via aggregating a sum), and prints the resulting words and counts to std-out.

public class WordCount {

  public static void main(String[] args) throws Exception {

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataSet<String> text = env.fromElements(
        "To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,"
        );

    DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new LineSplitter())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);

    // execute and print result
    counts.print();
  }
}

オペレータは特別なクラス、ここではLineSplitterクラス、によって定義されます。

public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

  @Override
  public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    // normalize and split the line
    String[] tokens = value.toLowerCase().split("\\W+");

    // emit the pairs
    for (String token : tokens) {
      if (token.length() > 0) {
        out.collect(new Tuple2<String, Integer>(token, 1));
      }
    }
  }
}

コードの完全な例についてはGitHubをチェックしてください

For a complete overview over our API, have a look at the DataStream API and DataSet API sections. 何か門ぢアがあれば、メーリング リストで聞いてください。喜んで手伝います。

TOP
inserted by FC2 system