Sample Project using the Scala API

ビルドツール

Flink プロジェクトは異なるビルドツールを使ってビルドすることができます。素早く始めるために、Flinkは以下のビルドツールのためのプロジェクトテンプレートを提供します:

これらのテンプレートはプロジェクトの構造をセットアップし初期ビルドファイルを生成するのに役立ちます。

SBT

プロジェクトの生成

$ g8 tillrohrmann/flink-project
これは特定の プロジェクトディレクトリ内にflink-project テンプレートからFlinkプロジェクトを生成するでしょう。giter8をインストールしていない場合は、このインストレーション ガイドに従ってください。
$ git clone https://github.com/tillrohrmann/flink-project.git
これはflink-projectディレクトリ内にFlinkプロジェクトを生成するでしょう。
$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
これは特定の プロジェクト ディレクトリ内にFlinkプロジェクトを生成するでしょう。

プロジェクトのビルド

プロジェクトをビルドするためには、単純にsbt clean assembly コマンドを発行する必要があります。これは fat-jar your-project-name-assembly-0.1-SNAPSHOT.jartarget/scala_your-major-scala-version/ディレクトリに生成するでしょう。

プロジェクトの実行

プロジェクトを実行するには、sbt runコマンドを実行する必要があります。

デフォルトでは、sbt が実行しているのと同じJVM内でジョブを実行するでしょう。別個のJVM内でジョブを実行するには、以下の行をbuild.sbtに追加してください。

fork in run := true

IntelliJ

Flink ジョブ開発には、IntelliJの使用をお勧めします。開始するには、新しく生成されたプロジェクトをIntelliJにインポートする必要があります。これはFile -> New -> Project from Existing Sources...からプロジェクトのディレクトリを選択することで行うことができます。IntelliJ は自動的にbuild.sbt ファイルを検知し全てをセットアップすることができます。

Flinkジョブを実行するためには、Run/Debug ConfigurationのクラスパスとしてmainRunnerモジュールを選択することをお勧めします。これにより、providedに設定されている全ての依存が実行時に利用可能になることを確実にします。Run -> Edit Configurations...を使ってRun/Debug Configurations を設定し、Use classpath of module ドロップボックスからmainRunner を選択することができます。

Eclipse

新しく作成されたプロジェクトをEclipseにインポートするには、まずそのためのEclipseプロジェクトを生成する必要があります。これらのプロジェクトファイルはsbteclipse プラグインを使って生成することができます。以下の行を PROJECT_DIR/project/plugins.sbt ファイルに追加してください:

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")

Eclipseプロジェクトファイルを生成するために、sbtの中で以下のコマンドを使用します

> eclipse

これで、File -> Import... -> Existing Projects into Workspace を使ってEclipseにプロジェクトをインポートし、プロジェクトディレクトリを選択することができます。

Maven

必要条件

唯一の必要条件は、動作しているMaven 3.0.4 (以上) と Java 7.x (以上)のインストレーションです。

プロジェクトの生成

プロジェクトを作成するために以下のコマンドのうちの一つを使います:

$ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-scala     \
      -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
      -DarchetypeVersion=1.3-SNAPSHOT
これは新しく作成されたプロジェクトに名前を付けることができます。それは対話的に groupId, artifactId およびパッケージ名を尋ねます。
$ curl https://flink.apache.org/q/quickstart-scala-SNAPSHOT.sh | bash

プロジェクトの精査

作業ディレクトリ内に新しいディレクトリがあるでしょう。curl のやり方を使った場合は、ディレクトリはquickstartと呼ばれます。Otherwise, it has the name of your artifactId:

$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── resources
        │   └── log4j.properties
        └── scala
            └── org
                └── myorg
                    └── quickstart
                        ├── BatchJob.scala
                        ├── SocketTextStreamWordCount.scala
                        ├── StreamingJob.scala
                        └── WordCount.scala

例のプロジェクトは Maven projectで、これは4つのクラスを含みます。StreamingJobBatchJob は基本的なスケルトンのプログラムです。SocketTextStreamWordCount は動作するストリーミングの例で、WordCountJob は動作するバッチの例です。全てのクラスのmain メソッドは development/testing モードでFlinkを開始することができることに注意してください。

このプロジェクトをIDEにインポートすることをお勧めします。Eclipseについては、Eclipse Update Sitesからインストールすることができる以下のプラグインが必要です:

The IntelliJ IDE supports Maven out of the box and offers a plugin for Scala development.

プロジェクトのビルド

プロジェクトをビルドしたい場合は、プロジェクトディレクトリに行き、mvn clean package -Pbuild-jarコマンドを発行します。You will find a jar that runs on every Flink cluster with a compatible version, target/original-your-artifact-id-your-version.jar. There is also a fat-jar in target/your-artifact-id-your-version.jar which, additionally, contains all dependencies that were added to the Maven project.

次のステップ

アプリケーションを書きます!

The quickstart project contains a WordCount implementation, the “Hello World” of Big Data processing systems. The goal of WordCount is to determine the frequencies of words in a text, e.g., how often do the terms “the” or “house” occur in all Wikipedia texts.

Sample Input:

ビッグデータはビッグです。

出力の例:

big 2
data 1
is 1

The following code shows the WordCount implementation from the Quickstart which processes some text lines with two operators (a FlatMap and a Reduce operation via aggregating a sum), and prints the resulting words and counts to std-out.

object WordCountJob {
  def main(args: Array[String]) {

    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.fromElements("To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // emit result and print result
    counts.print()
  }
}

コードの完全な例についてはGitHubをチェックしてください

For a complete overview over our API, have a look at the DataStream API, DataSet API, and Scala API Extensions sections. If you have any trouble, ask on our Mailing List. 喜んで手伝います。

TOP
inserted by FC2 system