重要: Scalaに依存するMaven アーティファクトはScalaのメジャーバージョンが後ろに付きます。例えば、"2.10" あるいは "2.11"。プロジェクトwiki上のマイグレーションガイドに相談してください。

クラスタでの実行

Flink プログラムは多くのマシーンのクラスタ上に分散されて実行することができます。プログラムを実行のためにクラスタに送信するには二つの方法があります:

コマンドライン インタフェース

コマンドラインインタフェースはパッケージ化されたプログラム(JARs)をクラスタ(あるいは1つのマシーンセットアップ)にサブミットさせます。

詳細はコマンドラインインタフェース ドキュメントを参照してください。

遠隔環境

リモート環境はクラスタ上で直接Flink Javaプログラムを実行させます。リモート環境はプログラムを実行したいクラスタを指します。

Maven 依存

プログラムをMavenプロジェクトとして開発している場合、以下の依存を使ってflink-clients モジュールを追加する必要があります:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.10</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>

以下はRemoteEnvironmentの使い方を説明します:

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment
        .createRemoteEnvironment("flink-master", 6123, "/home/user/udfs.jar");

    DataSet<String> data = env.readTextFile("hdfs://path/to/file");

    data
        .filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("hdfs://path/to/result");

    env.execute();
}

プログラムは独自のユーザコードを含んでいるため、アタッチされたコードのクラスと一緒にJARファイルを必要とすることに注意してください。リモート環境のコンストラクタはJARファイル(s)へのパス(s)を取ります。

バイナリ配布物に含まれていないモジュールとのリンク

バイナリ配布は、配布プログラムのクラスパスへ自動的に配布されるlibフォルダ内にjarパッケージを含みます。ほとんど全てのFlinkクラスは、例えばストリーミングコネクタといくつかの新しく追加されたモジュールの2,3の例外を除いてそこにあります。これらのモジュールに依存するコードを実行するには、実行時にそれらにアクセス可能にする必要があります。以下の二つのオプションはそれらを示唆します:

  1. jarファイルを全てのタスクマネージャー上のlib フォルダにコピーする。この後でタスクマネージャーを再起動する必要があることに注意してください。
  2. あるいは、それらをコードと一緒にパッケージ化する。

後者のバージョンはFlinkでのクラスローダーの管理を尊重するため、お勧めです。

Mavenを使ったユーザコードとの依存物のパッケージ

Flinkによって含まれないこれらの依存を提供するために、Mavenを使った二つのオプションを提案します。

  1. mavenアセンブリ プラグインは全ての依存を含むuber-jar (実行可能jar)と呼ばれるものをビルドします。アセンブリ設定は簡単ですが、結果のjarは嵩張ったものになるかも知れません。更に詳しい情報はmaven-assembly-pluginを見てください。
  2. maven unpack プラグインは、依存物の関係する部分を解凍し、それをあなたのコードと一緒にパッケージ化します。

Using the latter approach in order to bundle the Kafka connector, flink-connector-kafka you would need to add the classes from both the connector and the Kafka API itself. プラグインのセクションに以下を追加します:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-dependency-plugin</artifactId>
    <version>2.9</version>
    <executions>
        <execution>
            <id>unpack</id>
            <!-- executed just before the package phase -->
            <phase>prepare-package</phase>
            <goals>
                <goal>unpack</goal>
            </goals>
            <configuration>
                <artifactItems>
                    <!-- For Flink connector classes -->
                    <artifactItem>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-kafka</artifactId>
                        <version>1.1-SNAPSHOT</version>
                        <type>jar</type>
                        <overWrite>false</overWrite>
                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                        <includes>org/apache/flink/**</includes>
                    </artifactItem>
                    <!-- For Kafka API classes -->
                    <artifactItem>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka_<YOUR_SCALA_VERSION></artifactId>
                        <version><YOUR_KAFKA_VERSION></version>
                        <type>jar</type>
                        <overWrite>false</overWrite>
                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                        <includes>kafka/**</includes>
                    </artifactItem>
                </artifactItems>
            </configuration>
        </execution>
    </executions>
</plugin>

これで、mvn clean package を実行する時に、生成されたjarは必要とする依存を含みます。

TOP
inserted by FC2 system