Scala APIを使ったプロジェクトの例

ビルドツール

Flink プロジェクトは異なるビルドツールを使ってビルドすることができます。素早く始めるために、Flinkは以下のビルドツールのためのプロジェクトテンプレートを提供します:

これらのテンプレートはプロジェクトの構造をセットアップし初期ビルドファイルを生成するのに役立ちます。

SBT

プロジェクトの生成

$ g8 tillrohrmann/flink-project
これは特定の プロジェクトディレクトリ内にflink-project テンプレートからFlinkプロジェクトを生成するでしょう。giter8をインストールしていない場合は、このインストレーション ガイドに従ってください。
$ git clone https://github.com/tillrohrmann/flink-project.git
これはflink-projectディレクトリ内にFlinkプロジェクトを生成するでしょう。
$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
これは特定の プロジェクト ディレクトリ内にFlinkプロジェクトを生成するでしょう。

プロジェクトのビルド

プロジェクトをビルドするためには、単純にsbt clean assembly コマンドを発行する必要があります。これは fat-jar your-project-name-assembly-0.1-SNAPSHOT.jartarget/scala_your-major-scala-version/ディレクトリに生成するでしょう。

プロジェクトの実行

プロジェクトを実行するには、sbt runコマンドを実行する必要があります。

デフォルトでは、sbt が実行しているのと同じJVM内でジョブを実行するでしょう。別個のJVM内でジョブを実行するには、以下の行をbuild.sbtに追加してください。

fork in run := true

IntelliJ

Flink ジョブ開発には、IntelliJの使用をお勧めします。開始するには、新しく生成されたプロジェクトをIntelliJにインポートする必要があります。これはFile -> New -> Project from Existing Sources...からプロジェクトのディレクトリを選択することで行うことができます。IntelliJ は自動的にbuild.sbt ファイルを検知し全てをセットアップすることができます。

Flinkジョブを実行するためには、Run/Debug ConfigurationのクラスパスとしてmainRunnerモジュールを選択することをお勧めします。これにより、providedに設定されている全ての依存が実行時に利用可能になることを確実にします。Run -> Edit Configurations...を使ってRun/Debug Configurations を設定し、Use classpath of module ドロップボックスからmainRunner を選択することができます。

Eclipse

新しく作成されたプロジェクトをEclipseにインポートするには、まずそのためのEclipseプロジェクトを生成する必要があります。これらのプロジェクトファイルはsbteclipse プラグインを使って生成することができます。以下の行を PROJECT_DIR/project/plugins.sbt ファイルに追加してください:

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")

Eclipseプロジェクトファイルを生成するために、sbtの中で以下のコマンドを使用します

> eclipse

これで、File -> Import... -> Existing Projects into Workspace を使ってEclipseにプロジェクトをインポートし、プロジェクトディレクトリを選択することができます。

Maven

必要条件

唯一の必要条件は、動作しているMaven 3.0.4 (以上) と Java 7.x (以上)のインストレーションです。

プロジェクトの生成

プロジェクトを作成するために以下のコマンドのうちの一つを使います:

$ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-scala     \
      -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
      -DarchetypeVersion=1.3-SNAPSHOT
これは新しく作成されたプロジェクトに名前を付けることができます。それは対話的に groupId, artifactId およびパッケージ名を尋ねます。
$ curl https://flink.apache.org/q/quickstart-scala-SNAPSHOT.sh | bash

プロジェクトの精査

作業ディレクトリ内に新しいディレクトリがあるでしょう。curl のやり方を使った場合は、ディレクトリはquickstartと呼ばれます。そうでなければ、それはartifactId の名前を持ちます。

$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── resources
        │   └── log4j.properties
        └── scala
            └── org
                └── myorg
                    └── quickstart
                        ├── BatchJob.scala
                        ├── SocketTextStreamWordCount.scala
                        ├── StreamingJob.scala
                        └── WordCount.scala

例のプロジェクトは Maven projectで、これは4つのクラスを含みます。StreamingJobBatchJob は基本的なスケルトンのプログラムです。SocketTextStreamWordCount は動作するストリーミングの例で、WordCountJob は動作するバッチの例です。全てのクラスのmain メソッドは development/testing モードでFlinkを開始することができることに注意してください。

このプロジェクトをIDEにインポートすることをお勧めします。Eclipseについては、Eclipse Update Sitesからインストールすることができる以下のプラグインが必要です:

IntelliJ IDE はそのままでMavenをサポートし、Scala開発のためのプラグインを提供します。

プロジェクトのビルド

プロジェクトをビルドしたい場合は、プロジェクトディレクトリに行き、mvn clean package -Pbuild-jarコマンドを発行します。各Flinkクラスタ上で互換のバージョンがあるjarを見つけるでしょう。target/original-your-artifact-id-your-version.jar。更にtarget/your-artifact-id-your-version.jar にMavenプロジェクトに全ての依存が含まれているfat-jarがあります。

次のステップ

アプリケーションを書きます!

クイックスタートプロジェクトはWordCount実装を含みます。これはビッグデータ処理システムでの"Hello World"です。WordCountの目的はテキスト内の単語の頻度を確定することです。例えば、全てのWikipediaのテキストの中で単語"the"あるいは"house"がどれだけ現れるか。

Sample Input:

ビッグデータはビッグです。

出力の例:

big 2
data 1
is 1

以下のコードは、二つのオペレータ(合計を集約する FlatMap と Reduce)を持ちいくつかのテキストの行を処理するクイックスタートからのWordCountの実装を示します。

object WordCountJob {
  def main(args: Array[String]) {

    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.fromElements("To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // emit result and print result
    counts.print()
  }
}

コードの完全な例についてはGitHubをチェックしてください

APIの完全な概要については、データストリーム APIデータセット APIおよびScala APIの拡張の章を見てください。何か問題があれば、メーリング リストで聞いてください。喜んで手伝います。

TOP
inserted by FC2 system