Flink データストリーム API プログラミング ガイド

Flinkでのデータストリームプログラムはデータストリーム上の変換を実装する一般的なプログラムです(例えば、フィルタリング、状態の更新、ウィンドウの定義、集約)。データストリームは最初に様々なソースから生成されます (例えば、メッセージキュー、ソケットストリーム、ファイル)。結果はsinkを使って返されます。これは例えばファイルあるいは標準出力(例えばコマンドラインの端末)へデータを書き込むかも知れません。Flink のプログラムは様々なコンテキスト、スタンドアローン、あるいは他のプログラムの組み込みの中で動作します。実行は、ローカルのJVM、あるいは多くのマシーンのクラスタ上で起こり得ます。

Flink APIの基本的な概念の紹介については基本的な概念を見てください。

独自のFlinkデータストリームプログラムを作成するには、Flinkプログラムの内部構造から始め、次第に独自の ストリーム変換を追加することをお勧めします。残りのセクションは、追加のオペレーションと上級の特徴についてのリファレンスとして振る舞います。

プログラムの例

以下のプログラムは、webソケットから5秒のウィンドウ内でやってくる単語を数える、ストリーミング ウィンドウ カウントアプリケーションの完全に動作する例です。ローカルでそれを動作するためにコードをコピー&ペーストすることができます。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print

    env.execute("Window Stream WordCount")
  }
}

例のプログラムを実行するためには、最初にターミナルからnetcatを使って入力ストリームを開始します:

nc -lk 9999

単にいくつかの単語をタイプし、新しい単語のためにリターンを打ちます。これらはワードカウントプログラムへの入力になるでしょう。1以上のカウントを見たい場合は、同じ単語を5秒以内に繰り返しタイプします(そんなに速く打てない場合は、ウィンドウサイズを5秒から増やします☺)。

上に戻る

データソース


ソースはプログラムが入力を取り込むところです。StreamExecutionEnvironment.addSource(sourceFunction)を使ってソースをプログラムに付け加えることができます。Flinkは多数の事前実装されたソース関数が同梱されますが、非並列ソースについてはSourceFunctionを実装することで独自のソースを常に書くことができ、並行ソースについてはParallelSourceFunctionインタフェースを実装するかRichParallelSourceFunctionを拡張することで独自のソースを常に書くことができます。

StreamExecutionEnvironmentからアクセス可能な幾つかの事前定義されたストリームソースがあります:

ファイルベース:

  • readTextFile(path) - テキストファイル、つまりTextInputFormatの仕様を考慮するファイルを行ごとに読み込み、それらを文字列として返します。

  • readFile(fileInputFormat, path) - 指定されたファイル入力フォーマットで指示されたファイルとして(1度だけ)読み込みます。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 上の2つから内部的に呼ばれるメソッドです。指定されたfileInputFormatに基づいてpath内のファイルを読み込みます。指定されたwatchTypeに依存して、このソースは定期的に新しいデータのパス(FileProcessingMode.PROCESS_CONTINUOUSLY)を監視(各 interval ms) するか、現在パス内にあるデータを1度だけ処理して終了します(FileProcessingMode.PROCESS_ONCE)。pathFilterを使ってユーザは処理されるファイルを更に除外することができます。

    実装:

    背後では、Flinkはファイル読み込みプロセスを2つのサブタスク、すなわちdirectory monitoringdata reading に分割します。それぞれのサブタスクは個々のエンティティによって実装されます。監視は1つのnon-parallel (parallelism = 1) タスクによって実装されますが、読み込みは並行して実行する複数のタスクによって実行されます。後者の並行度はジョブの並行度に等しいです。1つの監視タスクの役割はディレクトリを走査(watchTypeに応じて定期的あるいは1度だけ)し、処理するファイルを見つけ、それらを 分割し、それらの分割をダウンストリームのリーダーに割り当てます。リーダーは実際のデータを読み込むためのものです。各分割は1つのリーダーによって読み込まれますが、リーダーは1つずつ複数の分割を読むことができます。

    重要な注記:

    1. ファイルが修正された時に、もしwatchTypeFileProcessingMode.PROCESS_CONTINUOUSLYが設定されると、そのないような完全に再処理されます。ファイルの最後に追加されるデータは再処理される全ての内容となるため、これは“確実に1回” セマンティクスを破壊します。

    2. もしwatchTypeFileProcessingMode.PROCESS_ONCE に設定された場合、リーダーがファイルの内容の読み込みを完了するまで待たずに、ソースはパスを 1回だけ走査し終了します。もちろん、リーダーは全てのファイルの内容が読み込まれるまで読み込みを続けます。ソースのクローズはその位置から後にチェックポイントが無いことに繋がります。ジョブは最後のチェックポイントから読み込みを再開するため、これはノードの障害の後での遅い回復に繋がるかもしれません。

ソケット ベース:

  • socketTextStream - ソケットから読み込みます。要素はデリミタで区切ることができます。

コレクション ベース:

  • fromCollection(Collection) - Java の Java.util.Collection からデータストリームを生成します。コレクション内の全ての要素は同じ型でなければなりません。

  • fromCollection(Iterator, Class) - イテレータからデータストリームを生成します。クラスはイテレータによって返される要素のデータ型を指定します。

  • fromElements(T ...) - オブジェクトの指定されたシーケンスからデータストリームを生成します。全てのオブジェクトは同じ型でなければなりません。

  • fromParallelCollection(SplittableIterator, Class) - イテレータから並行してデータストリームを生成します。クラスはイテレータによって返される要素のデータ型を指定します。

  • generateSequence(from, to) - 並行して渡された間隔内の数のシーケンスを生成します。

独自:

  • addSource - 新しいソース関数を付け加えます。例えば、Apache Kafkaから読み込むために、addSource(new FlinkKafkaConsumer08<>(...))を使うことができます。詳細はコネクタ を見てください。


ソースはプログラムが入力を取り込むところです。StreamExecutionEnvironment.addSource(sourceFunction)を使ってソースをプログラムに付け加えることができます。Flinkは多数の事前実装されたソース関数が同梱されますが、非並列ソースについてはSourceFunctionを実装することで独自のソースを常に書くことができ、並行ソースについてはParallelSourceFunctionインタフェースを実装するかRichParallelSourceFunctionを拡張することで独自のソースを常に書くことができます。

StreamExecutionEnvironmentからアクセス可能な幾つかの事前定義されたストリームソースがあります:

ファイルベース:

  • readTextFile(path) - テキストファイル、つまりTextInputFormatの仕様を考慮するファイルを行ごとに読み込み、それらを文字列として返します。

  • readFile(fileInputFormat, path) - 指定されたファイル入力フォーマットで指示されたファイルとして(1度だけ)読み込みます。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter) - これは上の2つから内部的に呼ばれるメソッドです。指定されたfileInputFormatに基づいてpath内のファイルを読み込みます。指定されたwatchTypeに依存して、このソースは定期的に新しいデータのパス(FileProcessingMode.PROCESS_CONTINUOUSLY)を監視(各 interval ms) するか、現在パス内にあるデータを1度だけ処理して終了します(FileProcessingMode.PROCESS_ONCE)。pathFilterを使ってユーザは処理されるファイルを更に除外することができます。

    実装:

    背後では、Flinkはファイル読み込みプロセスを2つのサブタスク、すなわちdirectory monitoringdata reading に分割します。それぞれのサブタスクは個々のエンティティによって実装されます。監視は1つのnon-parallel (parallelism = 1) タスクによって実装されますが、読み込みは並行して実行する複数のタスクによって実行されます。後者の並行度はジョブの並行度に等しいです。1つの監視タスクの役割はディレクトリを走査(watchTypeに応じて定期的あるいは1度だけ)し、処理するファイルを見つけ、それらを 分割し、それらの分割をダウンストリームのリーダーに割り当てます。リーダーは実際のデータを読み込むためのものです。各分割は1つのリーダーによって読み込まれますが、リーダーは1つずつ複数の分割を読むことができます。

    重要な注記:

    1. ファイルが修正された時に、もしwatchTypeFileProcessingMode.PROCESS_CONTINUOUSLYが設定されると、そのないような完全に再処理されます。ファイルの最後に追加されるデータは再処理される全ての内容となるため、これは“確実に1回” セマンティクスを破壊します。

    2. もしwatchTypeFileProcessingMode.PROCESS_ONCE に設定された場合、リーダーがファイルの内容の読み込みを完了するまで待たずに、ソースはパスを 1回だけ走査し終了します。もちろん、リーダーは全てのファイルの内容が読み込まれるまで読み込みを続けます。ソースのクローズはその位置から後にチェックポイントが無いことに繋がります。ジョブは最後のチェックポイントから読み込みを再開するため、これはノードの障害の後での遅い回復に繋がるかもしれません。

ソケット ベース:

  • socketTextStream - ソケットから読み込みます。要素はデリミタで区切ることができます。

コレクション ベース:

  • fromCollection(Seq) - Java の Java.util.Collection からデータストリームを生成します。コレクション内の全ての要素は同じ型でなければなりません。

  • fromCollection(Iterator) - イテレータからデータストリームを生成します。クラスはイテレータによって返される要素のデータ型を指定します。

  • fromElements(elements: _*) - 指定されたオブジェクトの系列からデータストリームを生成します。全てのオブジェクトは同じ型でなければなりません。

  • fromParallelCollection(SplittableIterator) - イテレータから並行してデータストリームを生成します。クラスはイテレータによって返される要素のデータ型を指定します。

  • generateSequence(from, to) - 並行して渡された間隔内の数のシーケンスを生成します。

独自:

  • addSource - 新しいソース関数を付け加えます。例えば、Apache Kafkaから読み込むために、addSource(new FlinkKafkaConsumer08<>(...))を使うことができます。詳細はコネクタ を見てください。

上に戻る

データストリームの変換

利用可能なストリーム変換の概要については operators を見てください。

上に戻る

データのsink


データシンクはDataStreamsを消費しそれらをファイル、ソケット、外部システム あるいは出力 に転送します。Flinkはデータストリーム上の操作の背後にカプセル化されている様々な組み込みの出力フォーマットが付属しています。

  • writeAsText() / TextOutputFormat - 行ごとの要素を文字列として書き込む。文字列はそれぞれの要素のtoString()メソッドを呼ぶことで取得されます。

  • writeAsCsv(...) / CsvOutputFormat - カンマ区切りの値のファイルとしてタプルを書きます。行とフィールドのデリミタが設定可能です。各フィールドの値はオブジェクトの toString()メソッドでもたらされます。

  • print() / printToErr() - 標準出力/標準エラー ストリーム上の各要素の toString() 値を出力します。任意で、出力に事前出力されるプリフィックス(msg)を与えることができます。これはprintの異なる呼び出し間で区別するのに役立ちます。並行度が1より大きい場合、出力を生成したタスクの識別子が出力に事前出力されるでしょう。

  • writeUsingOutputFormat() / FileOutputFormat - 独自のファイル出力のメソッドと基本クラス。独自のオブジェクトからバイトへの変換をサポートします。

  • writeToSocket - 要素をSerializationSchemaに応じてソケットに書き込みます。

  • addSink - 独自のシンク関数を起動します。Flinkはシンク関数として実装された(Apache Kafkaのような)他のシステムへのコネクタと一緒にまとめられています。


データシンクはDataStreamsを消費しそれらをファイル、ソケット、外部システム あるいは出力 に転送します。Flinkはデータストリーム上の操作の背後にカプセル化されている様々な組み込みの出力フォーマットが付属しています。

  • writeAsText() / TextOutputFormat - 行ごとの要素を文字列として書き込む。文字列はそれぞれの要素のtoString()メソッドを呼ぶことで取得されます。

  • writeAsCsv(...) / CsvOutputFormat - カンマ区切りの値のファイルとしてタプルを書きます。行とフィールドのデリミタが設定可能です。各フィールドの値はオブジェクトの toString()メソッドでもたらされます。

  • print() / printToErr() - 標準出力/標準エラー ストリーム上の各要素の toString() 値を出力します。任意で、出力に事前出力されるプリフィックス(msg)を与えることができます。これはprintの異なる呼び出し間で区別するのに役立ちます。並行度が1より大きい場合、出力を生成したタスクの識別子が出力に事前出力されるでしょう。

  • writeUsingOutputFormat() / FileOutputFormat - 独自のファイル出力のメソッドと基本クラス。独自のオブジェクトからバイトへの変換をサポートします。

  • writeToSocket - 要素をSerializationSchemaに応じてソケットに書き込みます。

  • addSink - 独自のシンク関数を起動します。Flinkはシンク関数として実装された(Apache Kafkaのような)他のシステムへのコネクタと一緒にまとめられています。

DataStream上の write*() メソッドは主にデバッグ目的を意図していることに注意してください。それらはFlinkのチェックポイントに加わらず、そのことはこれらの関数が通常少なくとも1回のセマンティクスを持つことを意味します。目的のシステムへのデータのフラッシュはOutputFormatの実装に依存します。このことはOutputFormatに送信される全ての要素が即座に目的のシステムに現れるわけでは無いことを意味します。また、障害時には、それらのレコードは喪失されるでしょう。

信頼性のためのファイルシステムへのストリームの確実に1回の配送には、flink-connector-filesystemを使ってください。また、.addSink(...) メソッドを使った独自の実装は確実に1回のセマンティクスについてFlinkのチェックポイントに参加するかもしれません。

上に戻る

イテレーション


繰り返しのストリーミングプログラムはステップ関数を実装し、それをIterativeStreamに埋め込みます。DataStream プログラムは終了しないかもしれないため、繰り返しの最大数はありません。その代わり、split 変換あるいは filter を使って、ストリームのどの部分が繰り返しにフィードバックされるか、そしてどの部分がダウンストリームに転送されるかを指定する必要があります。ここで、フィルターを使った例を示します。まず、IterativeStreamを定義します。

IterativeStream<Integer> iteration = input.iterate();

次に、変換(ここでは単純なmap 変換です)の連続を使ってループ内で実行されるロジックを指定します

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

繰り返しを閉じ、繰り返しの最後を定義するために、IterativeStreamcloseWith(feedbackStream)メソッドを呼びます。closeWith 関数に指定されたDataStreamは繰り返しの頭にフィードバックされるでしょう。一般的なパターンは、フィードバックされるストリームの一部と前方に広められるストリームの一部を分割するために、フィルターを使います。これらのフィルターは、例えば要素がフィードバックされずにダウンストリームに広まることができる “termination” ロジックを定義することができます。

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

デフォルトでは、フィードバックストリームのパーティション化はイテレーションの頭の入力と同じように自動的に設定されるでしょう。これを上書きするために、ユーザはcloseWith内で任意のbooleanフラグを設定することができます。

例えば、これは整数の連続からそれらが0になるまで1を継続的に引くプログラムです:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});


繰り返しのストリーミングプログラムはステップ関数を実装し、それをIterativeStreamに埋め込みます。DataStream プログラムは終了しないかもしれないため、繰り返しの最大数はありません。その代わり、split 変換あるいは filter を使って、ストリームのどの部分が繰り返しにフィードバックされるか、そしてどの部分がダウンストリームに転送されるかを指定する必要があります。ここで、ボディ(繰り返される計算の一部分)が単純なマップ変換の繰り返しの例を示します。フィードバックされる要素はフィルタを使ってダウンロードストリームに転送される要素と識別されます。

val iteratedStream = someDataStream.iterate(
  iteration => {
    val iterationBody = iteration.map(/* this is executed many times */)
    (tail.filter(/* one part of the stream */), tail.filter(/* some other part of the stream */))
})

デフォルトでは、フィードバックストリームのパーティション化はイテレーションの頭の入力と同じように自動的に設定されるでしょう。これを上書きするために、ユーザはcloseWith内で任意のbooleanフラグを設定することができます。

例えば、これは整数の連続からそれらが0になるまで1を継続的に引くプログラムです:

val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)

val iteratedStream = someIntegers.iterate(
  iteration => {
    val minusOne = iteration.map( v => v - 1)
    val stillGreaterThanZero = minusOne.filter (_ > 0)
    val lessThanZero = minusOne.filter(_ <= 0)
    (stillGreaterThanZero, lessThanZero)
  }
)

上に戻る

実行パラメータ

StreamExecutionEnvironment はランタイムのためのジョブに固有の設定値を設定することができるExecutionConfigを含みます。

ほとんどのパラメータの説明については実行の設定 を参照してください。これらのパラメータはDataSteam APIに明確に関係があります:

  • enableTimestamps() / disableTimestamps(): タイムスタンプをソースから発行される各イベントに付けます。areTimestampsEnabled() 現在の値を返します。

  • setAutoWatermarkInterval(long milliseconds): 自動的なウォーターマークの発行の間隔を設定します。long getAutoWatermarkInterval()を使って現在の値を取得することができます。

上に戻る

耐障害性

状態 & チェックポイント はFlinkのチェックポイントの仕組みを有効および設定する方法を説明します。

レンテンシの制御

デフォルトでは要素はネットワーク上で1つ1つでは転送されず(不必要なネットワークトラフィックを起こすでしょう)、バッファされます。バッファのサイズ(これはマシーン間での実質的な転送です)はFlink設定ファイルの中で設定することができます。スループットの最適化にこのコマンドは良いですが、入ってくるストリームが十分に速く無い場合はレイテンシの問題を起こすかもしれません。スループットとレイテンシを制御するために、バッファが埋まるまでの最大時間を設定するために、実行環境(あるいは個々のオペレータ)上でenv.setBufferTimeout(timeoutMillis)を使うことができます。この時間の後で、たとえバッファが埋まっていなくても自動的に送信されます。このタイムアウトのデフォルトの値は100msです。

使い方:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)

env.genereateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)

スループットを最大化するには、タイムアウトを削除し、バッファが埋まった時にのみフラッシュするsetBufferTimeout(-1)を設定します。レイテンシを最小化するには、タイムアウトを0に近い値に設定します (例えば 5 あるいは 10ms)。0のバッファタイムアウトは深刻なパフォーマンスの劣化を起こすかもしれないため、避けるべきです。

上に戻る

デバッギング

分散クラスタでストリーミングプログラムを実行する前に、実装されたアルゴリズムが期待したように動作するかを確認することは良い考えです。従って、データ解析プログラムの実装は通常は結果の調査、デバッグ、そして改善です。

Flink はIDE内からのローカルデバッグのサポート、テストデータの挿入、および結果データの収集によって、データ解析プログラムの開発処理を飛躍的に簡単にする機能を提供します。この章ではFlinkプログラムの開発をどうやって簡単にするかのいくつかのヒントを与えます。

ローカルの実行環境

LocalStreamEnvironment は生成されたものと同じJVMプロセス内でFlinkシステムを開始します。IDEから LocalEnvironment を開始する場合、コード内にブレークポイントを設定し、簡単にプログラムをデバッグすることができます。

LocalEnvironment は以下のように生成され利用されます:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();
val env = StreamExecutionEnvironment.createLocalEnvironment()

val lines = env.addSource(/* some source */)
// build your program

env.execute()

コレクション データ ソース

Flink はテストを簡単にするJavaコレクションで支援される特別なデータソースを提供します。プログラムがいったんテストされると、ソースとシンクは外部システムからの読み込み/への書き込みによって簡単に置き換えることができます。

データソースのコレクションは以下のように使うことができます:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
val env = StreamExecutionEnvironment.createLocalEnvironment()

// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

注意: 現在のところ、データソースのコレクションはSerializableを実装するデータタイプとイテレータを必要とします。更に、データソースのコレクションは並行 ( 並行度 = 1)で実行することができません。

イテレータ データ シンク

Flinkはテストおよびデバッグ目的のためのデータストリームを収集するためのシンクも提供します。以下のように使うことができます:

import org.apache.flink.contrib.streaming.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
import org.apache.flink.contrib.streaming.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter

val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala

上に戻る

次はどこに行きますか?

上に戻る

TOP
inserted by FC2 system