Flinkでのデータストリームプログラムはデータストリーム上の変換を実装する一般的なプログラムです(例えば、フィルタリング、状態の更新、ウィンドウの定義、集約)。データストリームは最初に様々なソースから生成されます (例えば、メッセージキュー、ソケットストリーム、ファイル)。結果はsinkを使って返されます。これは例えばファイルあるいは標準出力(例えばコマンドラインの端末)へデータを書き込むかも知れません。Flink のプログラムは様々なコンテキスト、スタンドアローン、あるいは他のプログラムの組み込みの中で動作します。実行は、ローカルのJVM、あるいは多くのマシーンのクラスタ上で起こり得ます。
Flink APIの基本的な概念の紹介については基本的な概念を見てください。
独自のFlinkデータストリームプログラムを作成するには、Flinkプログラムの内部構造から始め、次第に独自の ストリーム変換を追加することをお勧めします。残りのセクションは、追加のオペレーションと上級の特徴についてのリファレンスとして振る舞います。
以下のプログラムは、webソケットから5秒のウィンドウ内でやってくる単語を数える、ストリーミング ウィンドウ カウントアプリケーションの完全に動作する例です。ローカルでそれを動作するためにコードをコピー&ペーストすることができます。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
counts.print
env.execute("Window Stream WordCount")
}
}
例のプログラムを実行するためには、最初にターミナルからnetcatを使って入力ストリームを開始します:
nc -lk 9999
単にいくつかの単語をタイプし、新しい単語のためにリターンを打ちます。これらはワードカウントプログラムへの入力になるでしょう。1以上のカウントを見たい場合は、同じ単語を5秒以内に繰り返しタイプします(そんなに速く打てない場合は、ウィンドウサイズを5秒から増やします☺)。
ソースはプログラムが入力を取り込むところです。StreamExecutionEnvironment.addSource(sourceFunction)
を使ってソースをプログラムに付け加えることができます。Flinkは多数の事前実装されたソース関数が同梱されますが、非並列ソースについてはSourceFunction
を実装することで独自のソースを常に書くことができ、並行ソースについてはParallelSourceFunction
インタフェースを実装するかRichParallelSourceFunction
を拡張することで独自のソースを常に書くことができます。
StreamExecutionEnvironment
からアクセス可能な幾つかの事前定義されたストリームソースがあります:
ファイルベース:
readTextFile(path)
- テキストファイル、つまりTextInputFormat
の仕様を考慮するファイルを行ごとに読み込み、それらを文字列として返します。
readFile(fileInputFormat, path)
- 指定されたファイル入力フォーマットで指示されたファイルとして(1度だけ)読み込みます。
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
- 上の2つから内部的に呼ばれるメソッドです。指定されたfileInputFormat
に基づいてpath
内のファイルを読み込みます。指定されたwatchType
に依存して、このソースは定期的に新しいデータのパス(FileProcessingMode.PROCESS_CONTINUOUSLY
)を監視(各 interval
ms) するか、現在パス内にあるデータを1度だけ処理して終了します(FileProcessingMode.PROCESS_ONCE
)。pathFilter
を使ってユーザは処理されるファイルを更に除外することができます。
実装:
背後では、Flinkはファイル読み込みプロセスを2つのサブタスク、すなわちdirectory monitoring と data reading に分割します。それぞれのサブタスクは個々のエンティティによって実装されます。監視は1つのnon-parallel (parallelism = 1) タスクによって実装されますが、読み込みは並行して実行する複数のタスクによって実行されます。後者の並行度はジョブの並行度に等しいです。1つの監視タスクの役割はディレクトリを走査(watchType
に応じて定期的あるいは1度だけ)し、処理するファイルを見つけ、それらを 分割し、それらの分割をダウンストリームのリーダーに割り当てます。リーダーは実際のデータを読み込むためのものです。各分割は1つのリーダーによって読み込まれますが、リーダーは1つずつ複数の分割を読むことができます。
重要な注記:
ファイルが修正された時に、もしwatchType
が FileProcessingMode.PROCESS_CONTINUOUSLY
が設定されると、そのないような完全に再処理されます。ファイルの最後に追加されるデータは再処理される全ての内容となるため、これは“確実に1回” セマンティクスを破壊します。
もしwatchType
が FileProcessingMode.PROCESS_ONCE
に設定された場合、リーダーがファイルの内容の読み込みを完了するまで待たずに、ソースはパスを 1回だけ走査し終了します。もちろん、リーダーは全てのファイルの内容が読み込まれるまで読み込みを続けます。ソースのクローズはその位置から後にチェックポイントが無いことに繋がります。ジョブは最後のチェックポイントから読み込みを再開するため、これはノードの障害の後での遅い回復に繋がるかもしれません。
ソケット ベース:
socketTextStream
- ソケットから読み込みます。要素はデリミタで区切ることができます。コレクション ベース:
fromCollection(Collection)
- Java の Java.util.Collection からデータストリームを生成します。コレクション内の全ての要素は同じ型でなければなりません。
fromCollection(Iterator, Class)
- イテレータからデータストリームを生成します。クラスはイテレータによって返される要素のデータ型を指定します。
fromElements(T ...)
- オブジェクトの指定されたシーケンスからデータストリームを生成します。全てのオブジェクトは同じ型でなければなりません。
fromParallelCollection(SplittableIterator, Class)
- イテレータから並行してデータストリームを生成します。クラスはイテレータによって返される要素のデータ型を指定します。
generateSequence(from, to)
- 並行して渡された間隔内の数のシーケンスを生成します。
独自:
addSource
- 新しいソース関数を付け加えます。例えば、Apache Kafkaから読み込むために、addSource(new FlinkKafkaConsumer08<>(...))
を使うことができます。詳細はコネクタ を見てください。ソースはプログラムが入力を取り込むところです。StreamExecutionEnvironment.addSource(sourceFunction)
を使ってソースをプログラムに付け加えることができます。Flinkは多数の事前実装されたソース関数が同梱されますが、非並列ソースについてはSourceFunction
を実装することで独自のソースを常に書くことができ、並行ソースについてはParallelSourceFunction
インタフェースを実装するかRichParallelSourceFunction
を拡張することで独自のソースを常に書くことができます。
StreamExecutionEnvironment
からアクセス可能な幾つかの事前定義されたストリームソースがあります:
ファイルベース:
readTextFile(path)
- テキストファイル、つまりTextInputFormat
の仕様を考慮するファイルを行ごとに読み込み、それらを文字列として返します。
readFile(fileInputFormat, path)
- 指定されたファイル入力フォーマットで指示されたファイルとして(1度だけ)読み込みます。
readFile(fileInputFormat, path, watchType, interval, pathFilter)
- これは上の2つから内部的に呼ばれるメソッドです。指定されたfileInputFormat
に基づいてpath
内のファイルを読み込みます。指定されたwatchType
に依存して、このソースは定期的に新しいデータのパス(FileProcessingMode.PROCESS_CONTINUOUSLY
)を監視(各 interval
ms) するか、現在パス内にあるデータを1度だけ処理して終了します(FileProcessingMode.PROCESS_ONCE
)。pathFilter
を使ってユーザは処理されるファイルを更に除外することができます。
実装:
背後では、Flinkはファイル読み込みプロセスを2つのサブタスク、すなわちdirectory monitoring と data reading に分割します。それぞれのサブタスクは個々のエンティティによって実装されます。監視は1つのnon-parallel (parallelism = 1) タスクによって実装されますが、読み込みは並行して実行する複数のタスクによって実行されます。後者の並行度はジョブの並行度に等しいです。1つの監視タスクの役割はディレクトリを走査(watchType
に応じて定期的あるいは1度だけ)し、処理するファイルを見つけ、それらを 分割し、それらの分割をダウンストリームのリーダーに割り当てます。リーダーは実際のデータを読み込むためのものです。各分割は1つのリーダーによって読み込まれますが、リーダーは1つずつ複数の分割を読むことができます。
重要な注記:
ファイルが修正された時に、もしwatchType
が FileProcessingMode.PROCESS_CONTINUOUSLY
が設定されると、そのないような完全に再処理されます。ファイルの最後に追加されるデータは再処理される全ての内容となるため、これは“確実に1回” セマンティクスを破壊します。
もしwatchType
が FileProcessingMode.PROCESS_ONCE
に設定された場合、リーダーがファイルの内容の読み込みを完了するまで待たずに、ソースはパスを 1回だけ走査し終了します。もちろん、リーダーは全てのファイルの内容が読み込まれるまで読み込みを続けます。ソースのクローズはその位置から後にチェックポイントが無いことに繋がります。ジョブは最後のチェックポイントから読み込みを再開するため、これはノードの障害の後での遅い回復に繋がるかもしれません。
ソケット ベース:
socketTextStream
- ソケットから読み込みます。要素はデリミタで区切ることができます。コレクション ベース:
fromCollection(Seq)
- Java の Java.util.Collection からデータストリームを生成します。コレクション内の全ての要素は同じ型でなければなりません。
fromCollection(Iterator)
- イテレータからデータストリームを生成します。クラスはイテレータによって返される要素のデータ型を指定します。
fromElements(elements: _*)
- 指定されたオブジェクトの系列からデータストリームを生成します。全てのオブジェクトは同じ型でなければなりません。
fromParallelCollection(SplittableIterator)
- イテレータから並行してデータストリームを生成します。クラスはイテレータによって返される要素のデータ型を指定します。
generateSequence(from, to)
- 並行して渡された間隔内の数のシーケンスを生成します。
独自:
addSource
- 新しいソース関数を付け加えます。例えば、Apache Kafkaから読み込むために、addSource(new FlinkKafkaConsumer08<>(...))
を使うことができます。詳細はコネクタ を見てください。利用可能なストリーム変換の概要については operators を見てください。
データシンクはDataStreamsを消費しそれらをファイル、ソケット、外部システム あるいは出力 に転送します。Flinkはデータストリーム上の操作の背後にカプセル化されている様々な組み込みの出力フォーマットが付属しています。
writeAsText()
/ TextOutputFormat
- 行ごとの要素を文字列として書き込む。文字列はそれぞれの要素のtoString()メソッドを呼ぶことで取得されます。
writeAsCsv(...)
/ CsvOutputFormat
- カンマ区切りの値のファイルとしてタプルを書きます。行とフィールドのデリミタが設定可能です。各フィールドの値はオブジェクトの toString()メソッドでもたらされます。
print()
/ printToErr()
- 標準出力/標準エラー ストリーム上の各要素の toString() 値を出力します。任意で、出力に事前出力されるプリフィックス(msg)を与えることができます。これはprintの異なる呼び出し間で区別するのに役立ちます。並行度が1より大きい場合、出力を生成したタスクの識別子が出力に事前出力されるでしょう。
writeUsingOutputFormat()
/ FileOutputFormat
- 独自のファイル出力のメソッドと基本クラス。独自のオブジェクトからバイトへの変換をサポートします。
writeToSocket
- 要素をSerializationSchema
に応じてソケットに書き込みます。
addSink
- 独自のシンク関数を起動します。Flinkはシンク関数として実装された(Apache Kafkaのような)他のシステムへのコネクタと一緒にまとめられています。
データシンクはDataStreamsを消費しそれらをファイル、ソケット、外部システム あるいは出力 に転送します。Flinkはデータストリーム上の操作の背後にカプセル化されている様々な組み込みの出力フォーマットが付属しています。
writeAsText()
/ TextOutputFormat
- 行ごとの要素を文字列として書き込む。文字列はそれぞれの要素のtoString()メソッドを呼ぶことで取得されます。
writeAsCsv(...)
/ CsvOutputFormat
- カンマ区切りの値のファイルとしてタプルを書きます。行とフィールドのデリミタが設定可能です。各フィールドの値はオブジェクトの toString()メソッドでもたらされます。
print()
/ printToErr()
- 標準出力/標準エラー ストリーム上の各要素の toString() 値を出力します。任意で、出力に事前出力されるプリフィックス(msg)を与えることができます。これはprintの異なる呼び出し間で区別するのに役立ちます。並行度が1より大きい場合、出力を生成したタスクの識別子が出力に事前出力されるでしょう。
writeUsingOutputFormat()
/ FileOutputFormat
- 独自のファイル出力のメソッドと基本クラス。独自のオブジェクトからバイトへの変換をサポートします。
writeToSocket
- 要素をSerializationSchema
に応じてソケットに書き込みます。
addSink
- 独自のシンク関数を起動します。Flinkはシンク関数として実装された(Apache Kafkaのような)他のシステムへのコネクタと一緒にまとめられています。
DataStream
上の write*()
メソッドは主にデバッグ目的を意図していることに注意してください。それらはFlinkのチェックポイントに加わらず、そのことはこれらの関数が通常少なくとも1回のセマンティクスを持つことを意味します。目的のシステムへのデータのフラッシュはOutputFormatの実装に依存します。このことはOutputFormatに送信される全ての要素が即座に目的のシステムに現れるわけでは無いことを意味します。また、障害時には、それらのレコードは喪失されるでしょう。
信頼性のためのファイルシステムへのストリームの確実に1回の配送には、flink-connector-filesystem
を使ってください。また、.addSink(...)
メソッドを使った独自の実装は確実に1回のセマンティクスについてFlinkのチェックポイントに参加するかもしれません。
繰り返しのストリーミングプログラムはステップ関数を実装し、それをIterativeStream
に埋め込みます。DataStream プログラムは終了しないかもしれないため、繰り返しの最大数はありません。その代わり、split
変換あるいは filter
を使って、ストリームのどの部分が繰り返しにフィードバックされるか、そしてどの部分がダウンストリームに転送されるかを指定する必要があります。ここで、フィルターを使った例を示します。まず、IterativeStream
を定義します。
IterativeStream<Integer> iteration = input.iterate();
次に、変換(ここでは単純なmap
変換です)の連続を使ってループ内で実行されるロジックを指定します
DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);
繰り返しを閉じ、繰り返しの最後を定義するために、IterativeStream
のcloseWith(feedbackStream)
メソッドを呼びます。closeWith
関数に指定されたDataStreamは繰り返しの頭にフィードバックされるでしょう。一般的なパターンは、フィードバックされるストリームの一部と前方に広められるストリームの一部を分割するために、フィルターを使います。これらのフィルターは、例えば要素がフィードバックされずにダウンストリームに広まることができる “termination” ロジックを定義することができます。
iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);
デフォルトでは、フィードバックストリームのパーティション化はイテレーションの頭の入力と同じように自動的に設定されるでしょう。これを上書きするために、ユーザはcloseWith
内で任意のbooleanフラグを設定することができます。
例えば、これは整数の連続からそれらが0になるまで1を継続的に引くプログラムです:
DataStream<Long> someIntegers = env.generateSequence(0, 1000);
IterativeStream<Long> iteration = someIntegers.iterate();
DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value - 1 ;
}
});
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value > 0);
}
});
iteration.closeWith(stillGreaterThanZero);
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value <= 0);
}
});
繰り返しのストリーミングプログラムはステップ関数を実装し、それをIterativeStream
に埋め込みます。DataStream プログラムは終了しないかもしれないため、繰り返しの最大数はありません。その代わり、split
変換あるいは filter
を使って、ストリームのどの部分が繰り返しにフィードバックされるか、そしてどの部分がダウンストリームに転送されるかを指定する必要があります。ここで、ボディ(繰り返される計算の一部分)が単純なマップ変換の繰り返しの例を示します。フィードバックされる要素はフィルタを使ってダウンロードストリームに転送される要素と識別されます。
val iteratedStream = someDataStream.iterate(
iteration => {
val iterationBody = iteration.map(/* this is executed many times */)
(tail.filter(/* one part of the stream */), tail.filter(/* some other part of the stream */))
})
デフォルトでは、フィードバックストリームのパーティション化はイテレーションの頭の入力と同じように自動的に設定されるでしょう。これを上書きするために、ユーザはcloseWith
内で任意のbooleanフラグを設定することができます。
例えば、これは整数の連続からそれらが0になるまで1を継続的に引くプログラムです:
val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)
val iteratedStream = someIntegers.iterate(
iteration => {
val minusOne = iteration.map( v => v - 1)
val stillGreaterThanZero = minusOne.filter (_ > 0)
val lessThanZero = minusOne.filter(_ <= 0)
(stillGreaterThanZero, lessThanZero)
}
)
StreamExecutionEnvironment
はランタイムのためのジョブに固有の設定値を設定することができるExecutionConfig
を含みます。
ほとんどのパラメータの説明については実行の設定 を参照してください。これらのパラメータはDataSteam APIに明確に関係があります:
enableTimestamps()
/ disableTimestamps()
: タイムスタンプをソースから発行される各イベントに付けます。areTimestampsEnabled()
現在の値を返します。
setAutoWatermarkInterval(long milliseconds)
: 自動的なウォーターマークの発行の間隔を設定します。long getAutoWatermarkInterval()
を使って現在の値を取得することができます。
状態 & チェックポイント はFlinkのチェックポイントの仕組みを有効および設定する方法を説明します。
デフォルトでは要素はネットワーク上で1つ1つでは転送されず(不必要なネットワークトラフィックを起こすでしょう)、バッファされます。バッファのサイズ(これはマシーン間での実質的な転送です)はFlink設定ファイルの中で設定することができます。スループットの最適化にこのコマンドは良いですが、入ってくるストリームが十分に速く無い場合はレイテンシの問題を起こすかもしれません。スループットとレイテンシを制御するために、バッファが埋まるまでの最大時間を設定するために、実行環境(あるいは個々のオペレータ)上でenv.setBufferTimeout(timeoutMillis)
を使うことができます。この時間の後で、たとえバッファが埋まっていなくても自動的に送信されます。このタイムアウトのデフォルトの値は100msです。
使い方:
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)
env.genereateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
スループットを最大化するには、タイムアウトを削除し、バッファが埋まった時にのみフラッシュするsetBufferTimeout(-1)
を設定します。レイテンシを最小化するには、タイムアウトを0に近い値に設定します (例えば 5 あるいは 10ms)。0のバッファタイムアウトは深刻なパフォーマンスの劣化を起こすかもしれないため、避けるべきです。
分散クラスタでストリーミングプログラムを実行する前に、実装されたアルゴリズムが期待したように動作するかを確認することは良い考えです。従って、データ解析プログラムの実装は通常は結果の調査、デバッグ、そして改善です。
Flink はIDE内からのローカルデバッグのサポート、テストデータの挿入、および結果データの収集によって、データ解析プログラムの開発処理を飛躍的に簡単にする機能を提供します。この章ではFlinkプログラムの開発をどうやって簡単にするかのいくつかのヒントを与えます。
LocalStreamEnvironment
は生成されたものと同じJVMプロセス内でFlinkシステムを開始します。IDEから LocalEnvironment を開始する場合、コード内にブレークポイントを設定し、簡単にプログラムをデバッグすることができます。
LocalEnvironment は以下のように生成され利用されます:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String> lines = env.addSource(/* some source */);
// build your program
env.execute();
val env = StreamExecutionEnvironment.createLocalEnvironment()
val lines = env.addSource(/* some source */)
// build your program
env.execute()
Flink はテストを簡単にするJavaコレクションで支援される特別なデータソースを提供します。プログラムがいったんテストされると、ソースとシンクは外部システムからの読み込み/への書き込みによって簡単に置き換えることができます。
データソースのコレクションは以下のように使うことができます:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
val env = StreamExecutionEnvironment.createLocalEnvironment()
// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)
// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)
// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)
注意: 現在のところ、データソースのコレクションはSerializable
を実装するデータタイプとイテレータを必要とします。更に、データソースのコレクションは並行 ( 並行度 = 1)で実行することができません。
Flinkはテストおよびデバッグ目的のためのデータストリームを収集するためのシンクも提供します。以下のように使うことができます:
import org.apache.flink.contrib.streaming.DataStreamUtils
DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
import org.apache.flink.contrib.streaming.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter
val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala