This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Flink データストリーム API プログラミング ガイド #
FlinkのDataStreamプログラムは、データストリームの変換(例えば、フィルタリング、状態の更新、ウィンドウの定義、集約など)を実装する通常のプログラムです。データストリームは、最初に様々なソース(例えば、メッセージキュー、ソケットストリーム、ファイルなど)から作成されます。結果はシンクから返されます。シンクは、データをファイルや標準出力(コマンドラインターミナルなど)に書き込むことができます。Flink のプログラムは様々なコンテキスト、スタンドアローン、あるいは他のプログラムの組み込みの中で動作します。 実行は、ローカルのJVM、あるいは多くのマシーンのクラスタ上で起こり得ます。
独自のFlinkデータストリームプログラムを作成するには、Flinkプラグラムの内部構造から始めて、徐々に独自のストリーム変換を追加することをお勧めします。残りのセクションは、追加のオペレーションと高度な特徴についてのリファレンスとして振る舞います。
データストリームとは何か? #
データストリームAPIは、Flinkプログラムのデータのコレクションを表すために使われる特別なDataStream
クラスから名前を取得します。これらは、重複を含むデータの不変のコレクションと考えることができます。データは有限または無制限のいずれかであり、データを操作するために使うAPIは同じです。
DataStream
は使用方法の点では通常のJavaCollection
に似ていますが、いくつかの重要な点で大きく異なります。これらは不変です。つまり、作成後に要素を追加または削除することはできません。また、単に内部の要素を検査するだけでなく、DataStream
APIオペレーション、変換とも呼ばれます、を使って要素を操作することもできます。
Flinkプログラムにソースを追加することで、初期のDataStream
を作成することもできます。
次に、そこから新しいストリームを派生し、map
、filter
などのAPIメソッドを使ってそれらを結合できます。
Flinkプログラムの構造 #
FlinkプログラムはDataStreams
を変換する通常のプログラムのように見えます。各プログラムは同じ基本部分で構成されます:
実行環境
を取得、- 初期データのロード/生成、
- このデータの変形を指定、
- 計算結果を出力する場所を指定、
- プログラムの実行を引き起こします。
全てのFlink Scala APIは非推奨となり、将来のFlinkバージョンでは削除される予定です。引き続きScalaでアプリケーションをビルドできますが、DataStream APIやTable APIのJavaバージョンへ移行する必要があります。
これらの各ステップの概要を説明します。詳細はそれぞれのセクションを参照してください。Java DataStream APIの全てのコアクラスは org.apache.flink.streaming.api にあることに注意してください。
StreamExecutionEnvironment
は全てのFlinkプログラムの基礎です。StreamExecutionEnvironment
の静的メソッドを使って取得できます:
getExecutionEnvironment();
createLocalEnvironment();
createRemoteEnvironment(String host, int port, String... jarFiles);
通常、getExecutionEnvironment()
を使うだけで済みます。これは、コンテキストに応じて適切な動作を行うためです: プログラムをIDE内で実行している場合や通常のJavaプログラムとして実行している場合は、ローカルマシン上でプログラムを実行するローカル環境が作成されます。プログラムからJARファイルを作成し、コマンドラインを通じて呼び出すと、Flinkクラスタマネージャーはmainメソッドを実行し、getExecutionEnvironment()
はクラスタ上でプログラムを実行するための実行環境を返します。
データソースを指定するために、実行環境には様々な方法でファイルから読み取る方法がいくつかあります: データを1行ずつ読み取ることも、CSVVファイルとして読み取ることも、その他の提供されたソースを使うこともできます。テキストファイルから単に連続する行として読み取るには、次のものが使えます:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
これにより、変換を適用して新しい派生DataStreamを作成できるDataStreamが得られます。
変換関数を使ってDataStream上でメソッドを呼び出すことで、変換を適用できます。例えば、map変換はこのようになります:
DataStream<String> input = ...;
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
これは、元のコレクション内の全ての文字列が整数に変換されて、新しいDataStreamを作成します。
最終結果を含むDataStreamを取得したら、シンクを作成して外部システムにそれを書き込むことができます。以下はシンクを作成するためのメソッドのほんの一部の例です:
writeAsText(String path);
print();
これらの各ステップの概要を説明します。詳細はそれぞれのセクションを参照してください。Scala DataStream APIの全てのコアクラスは、 org.apache.flink.streaming.api.scala にあります。
StreamExecutionEnvironment
は全てのFlinkプログラムの基礎です。StreamExecutionEnvironment
の静的メソッドを使って取得できます:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
通常、getExecutionEnvironment()
を使うだけで済みます。これは、コンテキストに応じて適切な動作を行うためです: プログラムをIDE内で実行している場合や通常のJavaプログラムとして実行している場合は、ローカルマシン上でプログラムを実行するローカル環境が作成されます。プログラムからJARファイルを作成し、コマンドラインを通じて呼び出すと、Flinkクラスタマネージャーはmainメソッドを実行し、getExecutionEnvironment()
はクラスタ上でプログラムを実行するための実行環境を返します。
データソースを指定するために、実行環境には様々な方法でファイルから読み取る方法がいくつかあります: データを1行ずつ読み取ることも、CSVVファイルとして読み取ることも、その他の提供されたソースを使うこともできます。テキストファイルから単に連続する行として読み取るには、次のものが使えます:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val text: DataStream[String] = env.readTextFile("file:///path/to/file")
変換を適用して新しい派生DataStreamを作成できるDataStreamが得られます。
変換関数を使ってDataStream上でメソッドを呼び出すことで、変換を適用できます。例えば、map変換はこのようになります:
val input: DataSet[String] = ...
val mapped = input.map { x => x.toInt }
これは、元のコレクション内の全ての文字列が整数に変換されて、新しいDataStreamを作成します。
最終結果を含むDataStreamを取得したら、シンクを作成して外部システムにそれを書き込むことができます。以下はシンクを作成するためのメソッドのほんの一部の例です:
writeAsText(path: String)
print()
完全なプログラムを指定すると、StreamExecutionEnvironment
でexecute()
を呼び出して、プログラムの実行をトリガーする必要があります。
ExecutionEnvironment
のタイプに応じて、実行はローカルマシーン上でトリガーされるか、クラスタ上で実行するためにプログラムを送信します。
execute()
メソッドはジョブが終了するのを待ってから、JobExecutionResult
を返します。これには実行時間とアキュムレータの結果が含まれます。
ジョブの終了を待ちたくない場合は、StreamExecutionEnvironment
でexecuteAsync()
を呼び出すことで、非同期ジョブ実行をトリガーできます。
送信したジョブと通信できるJobClient
を返します。例えば、これはexecuteAsync()
を使ってexecute()
のセマンティクスを実装する方法です。
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
プログラムの実行に関数する最後の部分は、Flink操作がいつどうやって実行されるかを理解するために重要です。全てのFlinkプログラムは遅延実行されます: プログラムのmainメソッドが実行される時に、データのロードと変換は直接行われません。むしろ、各オペレーションが作成され、データフローグラフに追加されます。オペレーションが実際に実行されるのは、実行環境でのexecute()
呼び出しによって明示的に実行がトリガーされた時です。プログラムがローカルで実行されるか、クラスタ上で実行されるかは、実行環境のタイプによって異なります。
遅延実行により、Flinkが総合的に計画された1つの単位として実行する高度なプログラムを構築できます。
プログラムの例 #
以下のプログラムは、webソケットから5秒のウィンドウ内でやってくる単語を数える、ストリーミング ウィンドウ カウントアプリケーションの完全に動作する例です。ローカルでそれを動作するためにコードをコピー&ペーストすることができます。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)
counts.print()
env.execute("Window Stream WordCount")
}
}
例のプログラムを実行するためには、最初にターミナルからnetcatを使って入力ストリームを開始します:
nc -lk 9999
単にいくつかの単語をタイプし、新しい単語のためにリターンを打ちます。これらはワードカウントプログラムへの入力です。1以上のカウントを見たい場合は、同じ単語を5秒以内に繰り返しタイプします(そんなに速く打てない場合は、ウィンドウサイズを5秒から増やします☺)。
データソース #
ソースはプログラムが入力を取り込むところです。StreamExecutionEnvironment.addSource(sourceFunction)
を使ってソースをプログラムに付け加えることができます。Flinkは多数の事前実装されたソース関数が同梱されますが、非並列ソースについてはSourceFunction
を実装することで独自のソースを常に書くことができ、並列ソースについてはParallelSourceFunction
インタフェースを実装するかRichParallelSourceFunction
を拡張することで独自のソースを常に書くことができます。
StreamExecutionEnvironment
からアクセス可能な幾つかの事前定義されたストリームソースがあります:
ファイルベース:
-
readTextFile(path)
- テキストファイル、つまりTextInputFormat
の仕様を考慮するファイルを行ごとに読み込み、それらを文字列として返します。 -
readFile(fileInputFormat, path)
- 指定されたファイル入力フォーマットで指示されたファイルとして(1度だけ)読み込みます。 -
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
- 上の2つから内部的に呼ばれるメソッドです。指定されたfileInputFormat
に基づいてpath
内のファイルを読み込みます。指定されたwatchType
に依存して、このソースは定期的に新しいデータのパス((FileProcessingMode.PROCESS_CONTINUOUSLY
)を監視(各interval
ms) するか、現在パス内にあるデータを1度だけ処理して終了します(FileProcessingMode.PROCESS_ONCE
)。pathFilter
を使ってユーザは処理されるファイルを更に除外することができます。IMPLEMENTATION:
背後では、Flinkはファイル読み込みプロセスを2つのサブタスク、すなわち directory monitoringと data readingに分割します。それぞれのサブタスクは個々のエンティティによって実装されます。監視は1つのnon-parallel(parallelism = 1)タスクによって実装されますが、読み込みは並行して実行する複数のタスクによって実行されます。後者の並行度はジョブの並行度に等しいです。1つの監視タスクの役割はディレクトリを走査(
watchType
に応じて定期的あるいは1度だけ)し、処理するファイルを見つけ、それらをsplitsし、それらの分割をダウンストリームのリーダーに割り当てます。リーダーは実際のデータを読み込むためのものです。各分割は1つのリーダーによって読み込まれますが、リーダーは1つずつ複数の分割を読むことができます。重要な注記:
-
ファイルが修正された時に、もし
watchType
がFileProcessingMode.PROCESS_CONTINUOUSLY
に設定されると、その内容は完全に再処理されます。ファイルの最後に追加されるデータは再処理される全ての内容となるため、これは“確実に1回” セマンティクスを破壊します。 -
もし
watchType
がFileProcessingMode.PROCESS_ONCE
に設定された場合、リーダーがファイルの内容の読み込みを完了するまで待たずに、ソースはパスを一回だけ走査し終了します。もちろん、リーダーは全てのファイルの内容が読み込まれるまで読み込みを続けます。ソースのクローズはその位置から後にチェックポイントが無いことに繋がります。ジョブは最後のチェックポイントから読み込みを再開するため、これはノードの障害の後での遅い回復に繋がるかもしれません。
-
ソケット ベース:
socketTextStream
- ソケットから読み込みます。要素はデリミタで区切ることができます。
コレクション ベース:
-
fromCollection(Collection)
- JavaのJava.util.Collectionからデータストリームを生成します。コレクション内の全ての要素は同じタイプでなければなりません。 -
fromCollection(Iterator, Class)
- イテレータからデータストリームを生成します。クラスはイテレータによって返される要素のデータタイプを指定します。 -
fromElements(T ...)
- オブジェクトの指定されたシーケンスからデータストリームを生成します。全てのオブジェクトは同じタイプでなければなりません。 -
fromParallelCollection(SplittableIterator, Class)
- イテレータから並列してデータストリームを生成します。クラスはイテレータによって返される要素のデータ型を指定します。 -
generateSequence(from, to)
- 並列して渡された間隔内の数値のシーケンスを生成します。
独自:
addSource
- 新しいソース関数をアタッチします。例えば、Aapache Kafkaから読み込むために、addSource(new FlinkKafkaConsumer<>(...))
を使えます。詳細はconnectorsを参照してください。
ソースはプログラムが入力を取り込むところです。StreamExecutionEnvironment.addSource(sourceFunction)
を使ってソースをプログラムに付け加えることができます。Flinkは多数の事前実装されたソース関数が同梱されますが、非並列ソースについてはSourceFunction
を実装することで独自のソースを常に書くことができ、並列ソースについてはParallelSourceFunction
インタフェースを実装するかRichParallelSourceFunction
を拡張することで独自のソースを常に書くことができます。
StreamExecutionEnvironment
からアクセス可能な幾つかの事前定義されたストリームソースがあります:
ファイルベース:
-
readTextFile(path)
- テキストファイル、つまりTextInputFormat
の仕様を考慮するファイルを行ごとに読み込み、それらを文字列として返します。 -
readFile(fileInputFormat, path)
- 指定されたファイル入力フォーマットで指示されたファイルとして(1度だけ)読み込みます。 -
readFile(fileInputFormat, path, watchType, interval, pathFilter)
- これは上の2つから内部的に呼ばれるメソッドです。指定されたfileInputFormat
に基づいてpath
内のファイルを読み込みます。指定されたwatchType
に依存して、このソースは定期的に新しいデータのパス((FileProcessingMode.PROCESS_CONTINUOUSLY
)を監視(各interval
ms) するか、現在パス内にあるデータを1度だけ処理して終了します(FileProcessingMode.PROCESS_ONCE
)。pathFilter
を使ってユーザは処理されるファイルを更に除外することができます。IMPLEMENTATION:
背後では、Flinkはファイル読み込みプロセスを2つのサブタスク、すなわち directory monitoringと data readingに分割します。それぞれのサブタスクは個々のエンティティによって実装されます。監視は1つのnon-parallel(parallelism = 1)タスクによって実装されますが、読み込みは並行して実行する複数のタスクによって実行されます。後者の並行度はジョブの並行度に等しいです。1つの監視タスクの役割はディレクトリを走査(
watchType
に応じて定期的あるいは1度だけ)し、処理するファイルを見つけ、それらをsplitsし、それらの分割をダウンストリームのリーダーに割り当てます。リーダーは実際のデータを読み込むためのものです。各分割は1つのリーダーによって読み込まれますが、リーダーは1つずつ複数の分割を読むことができます。重要な注記:
-
ファイルが修正された時に、もし
watchType
がFileProcessingMode.PROCESS_CONTINUOUSLY
に設定されると、その内容は完全に再処理されます。ファイルの最後に追加されるデータは再処理される全ての内容となるため、これは“確実に1回” セマンティクスを破壊します。 -
もし
watchType
がFileProcessingMode.PROCESS_ONCE
に設定された場合、リーダーがファイルの内容の読み込みを完了するまで待たずに、ソースはパスを一回だけ走査し終了します。もちろん、リーダーは全てのファイルの内容が読み込まれるまで読み込みを続けます。ソースのクローズはその位置から後にチェックポイントが無いことに繋がります。ジョブは最後のチェックポイントから読み込みを再開するため、これはノードの障害の後での遅い回復に繋がるかもしれません。
-
ソケット ベース:
socketTextStream
- ソケットから読み込みます。要素はデリミタで区切ることができます。
コレクション ベース:
-
fromCollection(Seq)
- JavaのJava.util.Collectionからデータストリームを生成します。コレクション内の全ての要素は同じタイプでなければなりません。 -
fromCollection(Iterator)
- イテレータからデータストリームを生成します。クラスはイテレータによって返される要素のデータタイプを指定します。 -
fromElements(elements: _*)
- 指定されたオブジェクトの系列からデータストリームを生成します。全てのオブジェクトは同じタイプでなければなりません。 -
fromParallelCollection(SplittableIterator)
- イテレータから並行してデータストリームを生成します。クラスはイテレータによって返される要素のデータ型を指定します。 -
generateSequence(from, to)
- 並列して渡された間隔内の数値のシーケンスを生成します。
独自:
addSource
- 新しいソース関数をアタッチします。例えば、Aapache Kafkaから読み込むために、addSource(new FlinkKafkaConsumer<>(...))
を使えます。詳細はconnectorsを参照してください。
データストリームの変換 #
利用可能なストリーム変換の概要については、operatorsを参照してください。
データのsink #
データシンクはDataStreamsを消費しそれらをファイル、ソケット、外部システムあるいは出力に転送します。 Flinkはデータストリーム上の操作の背後にカプセル化されている様々な組み込みの出力フォーマットが付属しています。
-
writeAsText()
/TextOutputFormat
- 行ごとの要素を文字列として書き込みます。文字列はそれぞれの要素の*toString()*メソッドを呼ぶことで取得されます。 -
writeAsCsv(...)
/CsvOutputFormat
- タプルをカンマ区切りの値のファイルとして書き込みます。行とフィールドのデリミタは設定可能です。各フィールドの値はオブジェクトの*toString()*メソッドから取得されます。 -
print()
/printToErr()
- 各要素のtoString()値を標準出力/標準エラーストリームに出力します。オプションで、出力の先頭に付加されるプリフィックス(msg)を指定できます。これはprintの様々な呼び出しを区別するのに役立ちます。並列度が1より大きい場合、出力を生成したタスクの識別子が出力の前に追加されます。 -
writeUsingOutputFormat()
/FileOutputFormat
- 独自のファイル出力のメソッドと基本クラス。独自のオブジェクトからバイトへの変換をサポートします。 -
writeToSocket
-SerializationSchema
に従ってソケットに要素を書き込みます -
addSink
- 独自のシンク関数を呼び出します。Flinkにはシンク関数として実装される(Apache Kafkaのような)他のシステムへのコネクタがバンドルされています。
データシンクはDataStreamsを消費しそれらをファイル、ソケット、外部システムあるいは出力に転送します。 Flinkはデータストリーム上の操作の背後にカプセル化されている様々な組み込みの出力フォーマットが付属しています。
-
writeAsText()
/TextOutputFormat
- 行ごとの要素を文字列として書き込みます。文字列はそれぞれの要素の*toString()*メソッドを呼ぶことで取得されます。 -
writeAsCsv(...)
/CsvOutputFormat
- タプルをカンマ区切りの値のファイルとして書き込みます。行とフィールドのデリミタは設定可能です。各フィールドの値はオブジェクトの*toString()*メソッドから取得されます。 -
print()
/printToErr()
- 各要素のtoString()値を標準出力/標準エラーストリームに出力します。オプションで、出力の先頭に付加されるプリフィックス(msg)を指定できます。これはprintの様々な呼び出しを区別するのに役立ちます。並列度が1より大きい場合、出力を生成したタスクの識別子が出力の前に追加されます。 -
writeUsingOutputFormat()
/FileOutputFormat
- 独自のファイル出力のメソッドと基本クラス。独自のオブジェクトからバイトへの変換をサポートします。 -
writeToSocket
-SerializationSchema
に従ってソケットに要素を書き込みます -
addSink
- 独自のシンク関数を呼び出します。Flinkにはシンク関数として実装される(Apache Kafkaのような)他のシステムへのコネクタがバンドルされています。
DataStream
のwrite*()
メソッドは主にデバッグ目的であることに注意してください。
それらはFlinkのチェックポイントに加わらず、そのことはこれらの関数が通常少なくとも1回のセマンティクスを持つことを意味します。目的のシステムへのデータのフラッシュはOutputFormatの実装に依存します。このことはOutputFormatに送信される全ての要素が即座に目的のシステムに現れるわけでは無いことを意味します。また、障害時には、それらのレコードは喪失されるでしょう。
信頼性のために、ファイルシステムへのストリームの確実に1回の配送には、 FileSink
を使ってください。
また、.addSink(...)
メソッドを使った独自の実装は確実に1回のセマンティクスのためのFlinkのチェックポイントに参加できます。
実行パラメータ #
StreamExecutionEnvironment<x2/はランタイムのためのジョブに固有の設定値を設定することができる
ExecutionConfig`を含みます。
ほとんどのパラメータの説明については実行の設定を参照してください。これらのパラメータはDataSteam APIに明確に関係があります:
setAutoWatermarkInterval(long milliseconds)
: 自動的なウォーターマークの発行の間隔を設定します。現在の値はlong getAutoWatermarkInterval()
で取得できます
耐障害性 #
状態 & チェックポイントでは、Flinkのチェックポイントの仕組みを有効にして設定する方法について説明します。
レンテンシの制御 #
デフォルトでは、要素はネットワーク上で 1 つずつ転送されず (不要なネットワーク トラフィックが発生する可能性があります)、バッファリングされます。バッファのサイズ(これはマシーン間での実質的な転送です)はFlink設定ファイルの中で設定することができます。
スループットの最適化にこのコマンドは良いですが、入ってくるストリームが十分に速く無い場合はレイテンシの問題を起こすかもしれません。
スループットとレイテンシーを制御するには、実行環境 (または個々のオペレーター)でenv.setBufferTimeout(timeoutMillis)
を使用して、バッファーがいっぱいになるまでの最大待機時間を設定できます。この時間が経過すると、バッファはいっぱいでなくても自動的に送信されます。このタイムアウトのデフォルトの値は100msです。
使い方:
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)
env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
スループットを最大化するには、タイムアウトを削除し、バッファが埋まった時にのみフラッシュするsetBufferTimeout(-1)
を設定します。レイテンシを最小化するには、タイムアウトを0に近い値に設定します (例えば 5 あるいは 10ms)。
0のバッファタイムアウトは深刻なパフォーマンスの劣化を起こすかもしれないため、避けるべきです。
デバッギング #
分散クラスタでストリーミングプログラムを実行する前に、実装されたアルゴリズムが期待したように動作するかを確認することは良い考えです。従って、データ解析プログラムの実装は、通常、結果の確認、デバッグ、改善の段階的なプロセスになります。
Flink はIDE内からのローカルデバッグのサポート、テストデータの挿入、および結果データの収集によって、データ解析プログラムの開発処理を飛躍的に簡単にする機能を提供します。この章ではFlinkプログラムの開発をどうやって簡単にするかのいくつかのヒントを与えます。
ローカルの実行環境 #
LocalStreamEnvironment
は生成されたものと同じJVMプロセス内でFlinkシステムを開始します。IDEから LocalEnvironment を開始する場合、コード内にブレークポイントを設定し、簡単にプログラムをデバッグすることができます。
LocalEnvironment は以下のように生成され利用されます:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStream<String> lines = env.addSource(/* some source */);
// build your program
env.execute();
val env = StreamExecutionEnvironment.createLocalEnvironment()
val lines = env.addSource(/* some source */)
// build your program
env.execute()
コレクション データ ソース #
Flink はテストを簡単にするJavaコレクションで支援される特別なデータソースを提供します。プログラムがいったんテストされると、ソースとシンクは外部システムからの読み込み/への書き込みによって簡単に置き換えることができます。
データソースのコレクションは以下のように使うことができます:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
// Create a DataStream from an Iterator
Iterator<Long> longIt = ...;
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
val env = StreamExecutionEnvironment.createLocalEnvironment()
// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)
// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)
// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)
注意: 現在のところ、データソースのコレクションはSerializable
を実装するデータタイプとイテレータを必要とします。更に、データソースのコレクションは並行(並列度=1)で実行することができません。
イテレータ データ シンク #
Flinkはテストおよびデバッグ目的のためのデータストリームを収集するためのシンクも提供します。以下のように使うことができます:
DataStream<Tuple2<String, Integer>> myResult = ...;
Iterator<Tuple2<String, Integer>> myOutput = myResult.collectAsync();
val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = myResult.collectAsync()
次はどこに行きますか? #
- Operators: 利用可能なストリーミングオペレーションの仕様。
- Event Time: Flinkの時間の概念の紹介。
- 状態 & 耐障害性: ステートフルアプリケーションの開発方法の説明。
- Connectors: 利用可能な入力と出力コネクタの説明。