重要: Scalaに依存するMaven アーティファクトはScalaのメジャーバージョンが後ろに付きます。例えば、"2.10" あるいは "2.11"。プロジェクトwiki上のマイグレーションガイドに相談してください。

クイックスタート: Java API

2,3の簡単なステップでFlink Javaプログラムを動かす

必要条件

唯一の必要条件は、動作しているMaven 3.0.4 (以上) と Java 7.x (以上)のインストレーションです。

プロジェクトの生成

プロジェクトを作成するために以下のコマンドのうちの一つを使います:

$ mvn archetype:generate \
 -DarchetypeGroupId=org.apache.flink \
 -DarchetypeArtifactId=flink-quickstart-java \
 -DarchetypeVersion=1.1-SNAPSHOT
これは新しく作成されたプロジェクトに名前を付けることができます。それは対話的に groupId, artifactId およびパッケージ名を尋ねます。
$ curl https://flink.apache.org/q/quickstart.sh | bash

プロジェクトの精査

作業ディレクトリ内に新しいディレクトリがあるでしょう。curl のやり方を使った場合は、ディレクトリはquickstartと呼ばれます。そうでなければ、それはartifactId の名前を持ちます。

例のプロジェクトは Maven projectで、これは4つのクラスを含みます。StreamingJobBatchJob は基本的なスケルトンのプログラムです。SocketTextStreamWordCount は動作するストリーミングの例で、WordCountJob は動作するバッチの例です。全てのクラスのmain メソッドは development/testing モードでFlinkを開始することができることに注意してください。

それを開発およびテストするためにこのプロジェクトをIDEにインポートすることをお勧めします。Eclipseを使うバイアは、m2e プラグインMaven プロジェクトをインポートすることができます。いくつかのEclipseのバンドルはデフォルトでそのプラグインを含みますが、その他は手動でインストールする必要があります。IntelliJ IDE もそのままでMavenプロジェクトをサポートします。

Mac OS X ユーザへの注意: デフォルトのJavaのためのJVMヒープサイズはFlinkにとって小さすぎます。手動でそれを増やす必要があります。"Run Configurations" -> Arguments を選択し、"VM Arguments" ボックスに以下を書き込みます : Eclipseでの"-Xmx800m"。

プロジェクトのビルド

プロジェクトをビルドしたい場合は、プロジェクトディレクトリに行き、mvn clean install -Pbuild-jar コマンドを発行します。target/your-artifact-id-1.0-SNAPSHOT.jarの中で各Flinkクラスタ上で実行するjarを見つけるでしょう。fat-jar、target/your-artifact-id-1.0-SNAPSHOT-flink-fat-jar.jarもあります。これもmavenプロジェクトへ追加される全ての依存を含みます。T

次のステップ

アプリケーションを書きます!

クイックスタートプロジェクトはWordCount実装を含みます。これはビッグデータ処理システムでの"Hello World"です。WordCountの目的はテキスト内の単語の頻度を確定することです。例えば、全てのWikipediaのテキストの中で単語"the"あるいは"house"がどれだけ現れるか。

Sample Input:

ビッグデータはビッグです。

出力の例:

big 2
data 1
is 1

以下のコードは、二つのオペレータ(FlatMap と Reduce)を持ちいくつかのテキストの行を処理するクイックスタートからのWordCountの実装を示します。

public class WordCount {
  
  public static void main(String[] args) throws Exception {
    
    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // get input data
    DataSet<String> text = env.fromElements(
        "To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,"
        );
    
    DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new LineSplitter())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .aggregate(Aggregations.SUM, 1);

    // emit result
    counts.print();
  }
}

オペレータは特別なクラス、ここではLineSplitterクラス、によって定義されます。

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

  @Override
  public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    // normalize and split the line into words
    String[] tokens = value.toLowerCase().split("\\W+");
    
    // emit the pairs
    for (String token : tokens) {
      if (token.length() > 0) {
        out.collect(new Tuple2<String, Integer>(token, 1));
      }
    }
  }
}

コードの完全な例についてはGitHubをチェックしてください

APIの完全な概要については、プログラミング ガイド および もっと多くのプログラムの例を見てください。何か門ぢアがあれば、メーリング リストで聞いてください。喜んで手伝います。

TOP
inserted by FC2 system