Flink プロジェクトは異なるビルドツールを使ってビルドすることができます。素早く始めるために、Flinkは以下のビルドツールのためのプロジェクトテンプレートを提供します:
これらのテンプレートはプロジェクトの構造をセットアップし初期ビルドファイルを生成するのに役立ちます。
$ g8 tillrohrmann/flink-project
$ git clone https://github.com/tillrohrmann/flink-project.git
$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
プロジェクトをビルドするためには、単純にsbt clean assembly
コマンドを発行する必要があります。これは fat-jar your-project-name-assembly-0.1-SNAPSHOT.jarをtarget/scala_your-major-scala-version/ディレクトリに生成するでしょう。
プロジェクトを実行するには、sbt run
コマンドを実行する必要があります。
デフォルトでは、sbt
が実行しているのと同じJVM内でジョブを実行するでしょう。別個のJVM内でジョブを実行するには、以下の行をbuild.sbt
に追加してください。
fork in run := true
Flink ジョブ開発には、IntelliJの使用をお勧めします。開始するには、新しく生成されたプロジェクトをIntelliJにインポートする必要があります。これはFile -> New -> Project from Existing Sources...
からプロジェクトのディレクトリを選択することで行うことができます。IntelliJ は自動的にbuild.sbt
ファイルを検知し全てをセットアップすることができます。
Flinkジョブを実行するためには、Run/Debug ConfigurationのクラスパスとしてmainRunner
モジュールを選択することをお勧めします。これにより、providedに設定されている全ての依存が実行時に利用可能になることを確実にします。Run -> Edit Configurations...
を使ってRun/Debug Configurations を設定し、Use classpath of module ドロップボックスからmainRunner
を選択することができます。
新しく作成されたプロジェクトをEclipseにインポートするには、まずそのためのEclipseプロジェクトを生成する必要があります。これらのプロジェクトファイルはsbteclipse プラグインを使って生成することができます。以下の行を PROJECT_DIR/project/plugins.sbt
ファイルに追加してください:
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")
Eclipseプロジェクトファイルを生成するために、sbt
の中で以下のコマンドを使用します
> eclipse
これで、File -> Import... -> Existing Projects into Workspace
を使ってEclipseにプロジェクトをインポートし、プロジェクトディレクトリを選択することができます。
唯一の必要条件は、動作しているMaven 3.0.4 (以上) と Java 7.x (以上)のインストレーションです。
プロジェクトを作成するために以下のコマンドのうちの一つを使います:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
-DarchetypeVersion=1.3-SNAPSHOT
$ curl https://flink.apache.org/q/quickstart-scala-SNAPSHOT.sh | bash
作業ディレクトリ内に新しいディレクトリがあるでしょう。curl のやり方を使った場合は、ディレクトリはquickstart
と呼ばれます。そうでなければ、それはartifactId
の名前を持ちます。
$ tree quickstart/
quickstart/
├── pom.xml
└── src
└── main
├── resources
│ └── log4j.properties
└── scala
└── org
└── myorg
└── quickstart
├── BatchJob.scala
├── SocketTextStreamWordCount.scala
├── StreamingJob.scala
└── WordCount.scala
例のプロジェクトは Maven projectで、これは4つのクラスを含みます。StreamingJob とBatchJob は基本的なスケルトンのプログラムです。SocketTextStreamWordCount は動作するストリーミングの例で、WordCountJob は動作するバッチの例です。全てのクラスのmain メソッドは development/testing モードでFlinkを開始することができることに注意してください。
このプロジェクトをIDEにインポートすることをお勧めします。Eclipseについては、Eclipse Update Sitesからインストールすることができる以下のプラグインが必要です:
IntelliJ IDE はそのままでMavenをサポートし、Scala開発のためのプラグインを提供します。
プロジェクトをビルドしたい場合は、プロジェクトディレクトリに行き、mvn clean package -Pbuild-jar
コマンドを発行します。各Flinkクラスタ上で互換のバージョンがあるjarを見つけるでしょう。target/original-your-artifact-id-your-version.jar。更にtarget/your-artifact-id-your-version.jar にMavenプロジェクトに全ての依存が含まれているfat-jarがあります。
アプリケーションを書きます!
クイックスタートプロジェクトはWordCount
実装を含みます。これはビッグデータ処理システムでの"Hello World"です。WordCount
の目的はテキスト内の単語の頻度を確定することです。例えば、全てのWikipediaのテキストの中で単語"the"あるいは"house"がどれだけ現れるか。
Sample Input:
ビッグデータはビッグです。
出力の例:
big 2
data 1
is 1
以下のコードは、二つのオペレータ(合計を集約する FlatMap と Reduce)を持ちいくつかのテキストの行を処理するクイックスタートからのWordCount
の実装を示します。
object WordCountJob {
def main(args: Array[String]) {
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// get input data
val text = env.fromElements("To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
val counts = text.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }
.groupBy(0)
.sum(1)
// emit result and print result
counts.print()
}
}
コードの完全な例についてはGitHubをチェックしてください。
APIの完全な概要については、データストリーム API、データセット APIおよびScala APIの拡張の章を見てください。何か問題があれば、メーリング リストで聞いてください。喜んで手伝います。