このコネクタはパーティション化されたファイルをHadoopファイルシステムによってサポートされる任意のファイルシステムに書き込むシンクを提供します。このコネクタを使うには、以下の依存をプロジェクトに追加します:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.10</artifactId>
<version>1.3-SNAPSHOT</version>
</dependency>
ストリーミングコネクタは現在のところバイナリ配布の一部ではないことに注意してください。クラスタ実行のためにプログラムをライブラリと一緒にパッケージする方法についての情報はここを見てください。
バケット化の挙動と書き込みは設定することができますが、あとでそれをしようと思います。時間によって分割されるローリング ファイルへのシンクをするデフォルトのバケット化シンクを生成する方法です:
DataStream<String> input = ...;
input.addSink(new BucketingSink<String>("/base/path"));
val input: DataStream[String] = ...
input.addSink(new BucketingSink[String]("/base/path"))
必要なパラメータはバケットが格納されるベースパスのみです。シンクは独自のバケッター、ライターおよびバッチサイズを指定することで更に設定することができます。
デフォルトでは、バケット化シンクは要素が到着した現在のシステム時間によって分割され、日時のパターン "yyyy-MM-dd--HH"
を使ってバケットに名前を付けるでしょう。このパターンはバケットのパスを形成するために現在のシステム時間を使ってSimpleDateFormat
に渡されます。新しいバケットは新しい日付になった時に生成されるでしょう。例えば、精度として分を含むパターンの場合、新しいバケットを分毎に取得するでしょう。Each bucket is itself a directory that contains several part files: each parallel instance of the sink will create its own part file and when part files get too big the sink will also create a new part file next to the others. バケットが不活性になると、開かれた部分のファイルはフラッシュされ閉じられるでしょう。バケットが最近書き込まれていない場合は、不活性と見なされます。デフォルトでは、シンクは不活性のバケットを毎分チェックし、1分以上書き込まれていない全てのバケットを閉じます。この挙動はBucketingSink
上のsetInactiveBucketCheckInterval()
とsetInactiveBucketThreshold()
を使って設定することができます。
BucketingSink
上のsetBucketer()
を使って独自のバケッターを指定することもできます。希望する場合は、バケッターはバケット ディレクトリを決定するために各要素あるいはタプルのプロパティを使うことができます。
デフォルトのライターは StringWriter
です。これはやってくる改行で区切られた要素上のtoString()
を呼び出し、部分ファイルにそれらを書き込むでしょう。独自のライターを指定するにはBucketingSink
上のsetWriter()
を使います。Hadoop SequenceFiles を書きたい場合は、圧縮を使うように設定することもできる与えられたSequenceFileWriter
を使うことができます。
最後の設定オプションはバッチサイズです。これはパートファイルが閉じられ、新しいものが開始されなければならない時を指定します。(デフォルトの部分ファイルサイズは384 MBです)。
例:
DataStream<Tuple2<IntWritable,Text>> input = ...;
BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
input.addSink(sink);
val input: DataStream[Tuple2[IntWritable, Text]] = ...
val sink = new BucketingSink[String]("/base/path")
sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
input.addSink(sink)
これは以下のスキーマに従うバケットファイルに書くシンクを生成するでしょう:
/base/path/{date-time}/part-{parallel-task}-{count}
date-time
は date/time フォーマットから取得する文字列で、parallel-task
は並行シンク インスタンスのインデックスで、count
はバッチサイズによって生成される部分ファイルの実行中の数です。
更に詳しい情報については、BucketingSinkのJavaDocを参照してください。