This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Text files
テキストファイル形式 #
Flinkは、TextLineInputFormat
を使ってファイルからのテキスト行の読み込みをサポートします。この形式はJavaの組み込みのInputStreamReaderを使って、サポートされる様々な文字セットエンコーディングを使ってバイトストリームをでコードします。
この形式を使うには、プロジェクトにFlink Connector Files依存関係を追加する必要があります:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.19-SNAPSHOT</version>
</dependency>
PyFlinkユーザの場合、ジョブの中で直接使えます。
この形式は、バッチモードとストリーミングモードの両方で使える新しいソースと互換性があります。 しtがって、この形式は次の2つの方法で使えます:
- バッチモードの制限付き読み込み
- ストリーミングモードの連続読み込み: ディレクトリに現れる新しいファイルを監視します
制限付き読み込みの例:
この例では、テキストファイルの行をStringsとして含むDataStreamを作成します。 レコードにはイベントのタイムスタンプが含まれていないため、ウォーターマーク戦略は必要ありません。
final FileSource<String> source =
FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
.build();
final DataStream<String> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
source = FileSource.for_record_stream_format(StreamFormat.text_line_format(), *path).build()
stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")
j連続読み込みの例: この例では、テキストファイルの行をStringsとして含むDataStreamを作成します。このDataStreamは新しいファイルがディレクトリに追加されると無限に増加します。新しいファイルを毎秒監視します。 レコードにはイベントのタイムスタンプが含まれていないため、ウォーターマーク戦略は必要ありません。
final FileSource<String> source =
FileSource.forRecordStreamFormat(new TextLineInputFormat(), /* Flink Path */)
.monitorContinuously(Duration.ofSeconds(1L))
.build();
final DataStream<String> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
source = FileSource \
.for_record_stream_format(StreamFormat.text_line_format(), *path) \
.monitor_continously(Duration.of_seconds(1)) \
.build()
stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")