FileSystem
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

FileSystem SQLコネクタ #

このコネクタは、Flink FileSystem abstractionによってサポートされるファイルシステム内のパーティション化されたファイルへのアクセスを提供します。

ファイルシステムコネクタ自身はFlinkに含まれ、追加の依存関係を必要としません。 対応するjarは、/libディレクトリ内のFlink配布物にあります。 ファイルシステムへの行の読み書きには、対応するフォーマットを指定する必要があります。

ファイルシステムコネクタにより、ローカルまたは分散ファイルシステムへの読み書きができます。ファイルシステムは次のように定義できます:

CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',           -- required: specify the connector
  'path' = 'file:///path/to/whatever',  -- required: path to a directory
  'format' = '...',                     -- required: file system connector requires to specify a format,
                                        -- Please refer to Table Formats
                                        -- section for more details
  'partition.default-name' = '...',     -- optional: default partition name in case the dynamic partition
                                        -- column value is null/empty string
  'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter files to read under the
                                        -- directory of `path` option. This regex pattern should be
                                        -- matched with the absolute file path. If this option is set,
                                        -- the connector  will recursive all files under the directory
                                        -- of `path` option

  -- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly
  -- reduce the number of file for filesystem sink but may lead data skew, the default value is false.
  'sink.shuffle-by-partition.enable' = '...',
  ...
)
Flink File System specific dependenciesを必ず含めてください。
ファイルシステムコネクタの挙動は、以前のレガシーファイルシステムコネクタとは大きく異なります: pathパラメータはファイルではなくディレクトリに指定されており、宣言したパスで人間が判読できるファイルを取得できません。

パーティションファイル #

Flinkのファイルシステムパーティションのサポートでは、標準のhive形式を使います。ただし、パーティションをテーブルカタログに事前登録する必要はありません。パーティションはディレクトリ構造に基づいて検出および推測されます。例えば、以下のディレクトリに基づいてパーティション化されたテーブルには、datetimehourパーティションが含まれていると推測されます。

path
└── datetime=2019-08-25
    └── hour=11
        ├── part-0.parquet
        ├── part-1.parquet
    └── hour=12
        ├── part-0.parquet
└── datetime=2019-08-26
    └── hour=6
        ├── part-0.parquet

ファイルシステムテーブルは、パーティションの挿入と上書き挿入の両方をサポートします。INSERT Statementを参照してください。パーティション化されたテーブルに上書き挿入すると、テーブル全体ではなく対応するパーティションのみが上書きされます。

ファイル形式 #

ファイルシステムコネクタは複数の形式をサポートします:

  • CSV: RFC-4180。非圧縮。
  • JSON: ファイルシステムコネクタのJSON形式は、一般的なJSONファイルではなく、圧縮されていない改行区切りのJSONであることに注意してください。
  • Avro: Apache Avroavro.codecを指定することで、圧縮をサポートします。
  • Parquet: Apache Parquet. Hiveと互換性があります。
  • Orc: Apache Orc. Hiveと互換性があります。
  • Debezium-JSON: debezium-json
  • Canal-JSON: canal-json
  • Raw: raw

ソース #

ファイルシステムコネクタを使うと、単一のファイルまたはディレクトリ全体を1つのテーブルに読み取ることができます。

ソースパスとしてディレクトリを使う場合、ディレクトリ内のファイルの取り込み順序は定義されていません

ディレクトリ監視 #

デフォルトでは、ファイルシステムコネクタは制限なしです。つまり、設定されたパスを1回スキャンしてから、それ自体を閉じます。

オプションsource.monitor-intervalを設定することで、継続的なディレクトリの監視を有効にできます:

キー デフォルト 種類 説明
source.monitor-interval
(none) 期間 ソースが新しいファイルをチェックする間隔。間隔は0より大きくなければなりません。 各ファイルはパスによって一意に識別sれ、検出されるとすぐに1回処理されます。 既に処理されたファイルのセットは、ソースのライフサイクル全体を通じて状態が維持されます。 そのため、ソース状態とともにチェックポイントとセーブポイントに保持されます。 間隔が短いほど、ファイルがより迅速に検出されます。 ただし、ファイルシステム/オブジェクトストアのリスト表示やディレクトリ横断がより頻繁におこなわれることを意味します。 この設定オプションが設定されない場合、指定されたパスは1回スキャンされるため、ソースは制限付きになります。

利用可能なメタデータ #

次のコネクタのメタデータは、テーブル定義のメタデータ列としてアクセスできます。全てのメタデータは読み込み専用です。

キー データ型 説明
file.path STRING NOT NULL 入力ファイルのフルパス。
file.name STRING NOT NULL ファイル名。ファイルパスのルートから最も遠い要素です。
file.size BIGINT NOT NULL ファイルのバイト数。
file.modification-time TIMESTAMP_LTZ(3) NOT NULL ファイルの変更時刻。

以下の拡張されたCREATE TABLEの例は、これらのメタデータフィールドを公開するための構文を示しています:

CREATE TABLE MyUserTableWithFilepath (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/whatever',
  'format' = 'json'
)

ストリーミングシンク #

ファイルシステムコネクタは、FlinkのFileSystemに基づくストリーミング書き込みをサポートし、ファイルにレコードを書き込みます。行エンコード形式は、CSVとJSONです。バルクエンコード形式は、Parquet、ORC、Avroです。

SQLを直接書き込み、ストリームデータを非パーティション化テーブルに挿入できます。 パーティション化されたテーブルの場合、パーティション関連のオペレーションを設定できます。詳細は、Partition Commitを参照してください。

ローリング ポリシー #

パーティションディレクトリ内のデータは、部分ファイルに分割されます。各パーティションは、そのパーティションのデータを受信したシンクの各サブタスクごとに少なくとも1つの部分ファイルを含みます。進行中の部分ファイルは閉じられ、設定可能なローリングポリシーに従って追加の部分ファイルが作成されます。このポリシーは、サイズやファイルを開くことができる最大時間を指定するタイムアウトに基づいて、部分ファイルをロールします。

オプション 必要条件 Forwarded デフォルト 種類 説明
sink.rolling-policy.file-size
オプション はい 128MB MemorySize ローリング前の最大部分ファイルサイズ。
sink.rolling-policy.rollover-interval
オプション はい 30 min 期間 部分ファイルがロールする前に開いたままにできる最大時間(デフォルトでは、小さなファイルが大量になることを避けるため30分)。 これがチェックされる頻度は'sink.rolling-policy.check-interval'オプションによって制御されます。
sink.rolling-policy.check-interval
オプション はい 1 min 期間 時間ベースのローリングポリシーをチェックする間隔これは、'sink.rolling-policy.rollover-interval'に基づいて部分ファイルをロールオーバーするかどうかをチェックする頻度を制御します。

注意: バルクフォーマット(parquet、orc、avro)の場合、ローリングポリシーとチェックポイント間隔(保留中のファイルは次のチェックポイントで完了します)はこれらのパーツのサイズと数を制御します。

NOTE: For row formats (csv, json), you can set the parameter sink.rolling-policy.file-size or sink.rolling-policy.rollover-interval in the connector properties and parameter execution.checkpointing.interval in flink-conf.yaml together if you don’t want to wait a long period before observe the data exists in file system. 他の形式(avro、orc)の場合、flink-conf.yamlでパラメータexecution.checkpointing.intervalを設定するだけです。

ファイルの圧縮 #

ファイルシンクはファイルの圧縮をサポートしているため、アプリケーションは大量のファイルを生成せずにチャックポイントの間隔を短縮できます。

オプション 必要条件 Forwarded デフォルト 種類 説明
自動圧縮
オプション no false 真偽値 ストリーミングシンクで自動圧縮を有効にするかどうか。データは一時ファイルに書き込まれます。チェックポイントが完了すると、チェックポイントで生成された一時ファイルは圧縮されます。一時ファイルは圧縮前は非表示になります。
compaction.file-size
オプション はい (none) MemorySize 圧縮ターゲットファイルサイズ。デフォルト値はローリングファイルサイズです。

有効にすると、ファイルの圧縮は、ターゲットファイルサイズに基づいて、複数の小さなファイルを大きなファイルにマージします。 実稼働環境でファイルの圧縮を実行する場合は、次のことに注意してください:

  • 単一チェックポイント内のファイルのみが圧縮されます。つまり、少なくともチェックポイントと同じ数のファイルが生成されます。
  • マージ前のファイルは不可視であるため、ファイルの可視性は、チェックポイント間隔+圧縮時間となる可能性があります。
  • 圧縮に時間がかかりすぎると、ジョブにバックプレッシャーがかかり、チェックポイントの時間が延長されます。

パーティションコミット #

パーティションを書き込んだ後、ダウンストリームアプリケーションに通知する必要があります。例えば、パーティションをHiveメタストアに追加するか、ディレクトリに_SUCCESSファイルを書き込みます。ファイルシステムシンクには、独自のポリシーを設定できるパーティションコミット機能が含まれています。コミットアクションは、トリガーポリシーの組み合わせに基づいています。

  • トリガー: パーティションのコミットのタイミングは、パーティションから抽出された時間のウォーターマーク、または処理時間によって決定できます。
  • ポリシー: パーティションのコミット方法、成功ファイルとメタストアのコミットをサポートする組み込みポリシー。Hiveの分析をトリガーして統計を生成したり、小さなファイルをマージしたりなど、独自のポリシーを実装することもできます。

注意: パーティションコミットは動的パーティションの挿入でのみ動作します。

パーティションコミットトリガー #

パーティションをコミットするタイミングを決定するには、パーティションコミットトリガーを指定します:

オプション 必要条件 Forwarded デフォルト 種類 説明
sink.partition-commit.trigger
オプション はい process-time 文字列 パーティションコミットのトリガータイプ: 'process-time': マシンの時間に基づき、パーティション時間の抽出もウォーターマークの生成も必要ありません。'現在のシステム時間'が'パーティション作成のシステム時間'に'遅延'を加えた時間を経過したら、パーティションをコミットします。'partition-time': パーティション値から抽出された時間に基づき、ウォーターマークの生成が必要です。'ウォーターマーク'が'パーティション値から抽出された時間'に'遅延'を加えた時間を経過したら、パーティションをコミットします。
sink.partition-commit.delay
オプション はい 0 s 期間 パーティションは遅延時間までコミットされません。日単位のパーティションの場合は'1 d'、時間単位のパーティションの場合は'1 h'にする必要があります。
sink.partition-commit.watermark-time-zone
オプション はい UTC 文字列 長いウォーターマーク値をTIMESTAMP値に解析するためのタイムゾーン。解析されたウォーターマークタイムスタンプは、パーティション時間と比較して、パーティションをコミットするかどうかを決定するために使われます。このオプションは`sink.partition-commit.trigger`が'partition-time'に設定されている場合にのみ有効です。このオプションが正しく設定されていない場合、例えば、ソース行時間がTIMESTAMP_LTZ列で定義されていますが、この設定が設定されていない場合、数時間後にパーティションがコミットされたことがユーザに表示される可能性があります。デフォルト値は'UTC'で、これはウォーターマークがTIMESTAMP列で定義されているか定義されていないことを意味します。ウォーターマークがTIMESTAMP_LTZ列で定義されている場合、ウォーターマークのタイムゾーンはセッションのタイムゾーンです。オプション値は、'America/Los_Angeles'のような完全名、または'GMT-08:00'のような独自のタイムゾーンです。

トリガーには次の2種類があります:

  • 1つ目はパーティションの処理時間です。パーティション時間の抽出も、ウォーターマークの生成も必要ありません。パーティションの作成時間と現在のシステム時間に応じたパーティションコミットのトリガー。このトリガーはよい汎用的ですが、それほど正確ではありません。 is more universal, but not so precise. 例えば、データの遅延やフェイルオーバーにより、パーティションのコミットが早まってしまう可能性があります。
  • 2つ目は、パーティション値とウォーターマークから抽出された時間に応じたパーティションコミットのトリガーです。 これには、ジョブにウォーターマーク生成があり、パーティションが時間ごとのパーティションや日ごとのパーティションなど、時間にしたがtって分割されている必要があります。

データが完了しているかどうかに関係なく、ダウンストリームにパーティションをできるだけ早く表示したい場合は、次のようにします:

  • ‘sink.partition-commit.trigger’=‘process-time’ (デフォルト値)
  • ‘sink.partition-commit.delay’=‘0s’ (デフォルト値) パーティションにデータが存在すると、すぐにコミットされます。注意: パーティションは複数回コミットされる可能性があります。

データが完了した場合にのみダウンストリームにパーティションを表示させ、ジョブにウォーターマークの生成があり、パーティション値から時間を抽出できる場合は、次のようにします:

  • ‘sink.partition-commit.trigger’=‘partition-time’
  • ‘sink.partition-commit.delay’=‘1h’ (パーティションが時間単位のパーティションの場合は'1h’。パーティションの種類によって異なります) これはパーティションをコミットする最も正確な方法であり、コミットされたパーティションのデータが可能な限り完全であることを確実にしようとします。

データが完了した時にのみダウンストリームにパーティションを表示したいが、ウォーターマークが無い場合、またはパーティション値から時間が抽出できない場合は、次のようにします:

  • ‘sink.partition-commit.trigger’=‘process-time’ (デフォルト値)
  • ‘sink.partition-commit.delay’=‘1h’ (パーティションが時間単位のパーティションの場合は'1h’。パーティションの種類によって異なります) パーティションを正確にコミットしようとしますが、データの遅延やフェイルオーバーによりパーティションのコミットが早まってしまう可能性があります。

遅延データ処理: すでにコミットされているパーティションにレコードが書き込まれていることになっている場合、レコードがそのパーティションに書き込まれ、このパーティションのコミットが再度トリガーされます。

パーティション時間Extractor #

このextractorsはパーティション値からの時間の抽出を定義します。

オプション 必要条件 Forwarded デフォルト 種類 説明
partition.time-extractor.kind
オプション no デフォルト: 文字列 パーティション値からの時間の抽出をするための時間extractor。デフォルトとカスタムをサポートします。デフォルトでは、タイムスタンプのpattern\formatterを設定できます。カスタムの場合は、extractorクラスを設定する必要があります。
partition.time-extractor.class
オプション no (none) 文字列 PartitionTimeExtractorインターフェースを実装するためのextractorクラス。
partition.time-extractor.timestamp-pattern
オプション no (none) 文字列 'default'の構築方法では、ユーザはパーティションフィールドを使って正式なタイムスタンプパターンを取得できます。デフォルトでは、最初のフィールドから'yyyy-MM-dd hh:mm:ss'をサポートします。タイムスタンプを単一のパーティションフィールド'dt'から抽出する必要がある場合は、'$dt'を設定できます。タイムスタンプを複数のパーティションフィールド、例えば'year'、'month'、'day'、'hour'から抽出する必要がある場合は、'$year-$month-$day $hour:00:00'を設定できます。タイムスタンプを2つのパーティションフィールド'dt'と'hour'から抽出する必要がある場合は、'$dt $hour:00:00'を設定できます。
partition.time-extractor.timestamp-formatter
オプション no yyyy-MM-dd HH:mm:ss 文字列 パーティションのタイムスタンプ文字列値をタイムスタンプにフォーマットするformatter。パーティションのタイムスタンプ文字列値は'partition.time-extractor.timestamp-pattern'で表されます。例えば、パーティションタイムスタンプが複数のパーティションフィールド、例えば、'year'、'month'、'day'から抽出されます。'partition.time-extractor.timestamp-pattern'は'$year$month$day'に、`partition.time-extractor.timestamp-formatter`は'yyyyMMdd'に設定します。デフォルトでは、formatterは'yyyy-MM-dd HH:mm:ss'です。
timestamp-formatterはJavaのDateTimeFormatterと互換性があります

デフォルトのextractorはパーティションフィールドで構成されるタイムスタンプパターンに基づいています。PartitionTimeExtractorインタフェースに基づいて完全にカスタムのパーティション抽出の実装を指定することもできます。

public class HourPartTimeExtractor implements PartitionTimeExtractor {
    @Override
    public LocalDateTime extract(List<String> keys, List<String> values) {
        String dt = values.get(0);
        String hour = values.get(1);
		return Timestamp.valueOf(dt + " " + hour + ":00:00").toLocalDateTime();
	}
}

パーティションコミットポリシー #

パーティションコミットポリシーは、パーティションがコミットされた時に実行されるアクションを定義します。

  • 1つ目はメタストアで、hiveテーブルのみがメタストアポリシーをサポートし、ファイルシステムはディレクトリ構造を通じてパーティションを管理します。
  • 2つ目はソースファイルで、パーティションに対応するディレクトリに空のファイルを書き込みます。
オプション 必要条件 Forwarded デフォルト 種類 説明
sink.partition-commit.policy.kind
オプション はい (none) 文字列 パーティションをコミットするポリシーは、パーティションの書き込みが完了し、パーティションを読み取る準備ができたことをダウンストリームアプリケーションに通知することです。メタストア: パーティションをメタストアに追加します。hiveテーブルのみがメタストアポリシーを差ポー押し、ファイルシステムはディレクトリ構造を通じてパーティションを管理します。success-file: ディレクトリに'_success'ファイルを追加します。両方を同時に設定できます: 'metastore,success-file'。custom: コミットポリシーを作成するためにポリシークラスを使います。複数のポリシーの設定サポート: 'metastore,success-file'。
sink.partition-commit.policy.class
オプション はい (none) 文字列 PartitionCommitPolicyインタフェースを実装するためのパーティションコミットポリシークラス。カスタムコミットポリシーでのみ動作します。
sink.partition-commit.policy.class.parameters
オプション はい (none) 文字列 カスタムコミットポリシーのコンストラクタへ渡されるパラメータ。複数のパラメータは、'param1;param2'のように、セミコロンで区切られます。設定値はリスト(['param1', 'param2'])に分割され、カスタムコミットポリシークラスのコンストラクタに渡されます。このオプションはオプションで、設定されない場合はデフォルトのコンストラクタが使われます。
sink.partition-commit.success-file.name
オプション はい _SUCCESS 文字列 success-fileパーティションコミットポリシーのファイル名で、デフォルトは'_SUCCESS'です。

コミットポリシーの実装を拡張でき、カスタムコミットポリシーの実装は次のようになります:

public class AnalysisCommitPolicy implements PartitionCommitPolicy {
    private HiveShell hiveShell;
	
    @Override
	public void commit(Context context) throws Exception {
	    if (hiveShell == null) {
	        hiveShell = createHiveShell(context.catalogName());
	    }
	    
        hiveShell.execute(String.format(
            "ALTER TABLE %s ADD IF NOT EXISTS PARTITION (%s = '%s') location '%s'",
	        context.tableName(),
	        context.partitionKeys().get(0),
	        context.partitionValues().get(0),
	        context.partitionPath()));
	    hiveShell.execute(String.format(
	        "ANALYZE TABLE %s PARTITION (%s = '%s') COMPUTE STATISTICS FOR COLUMNS",
	        context.tableName(),
	        context.partitionKeys().get(0),
	        context.partitionValues().get(0)));
	}
}

Sinkの並列度 #

外部ファイル(Hiveを含む)へのファイルの書き込みの並列度は、ストリーミングモードとバッチモードの両方でサポートされる、対応するテーブルオプションによって設定できます。デフォルトでは、並列度は最後に上流にチェーンされたオペレータの並列度と同じになるように設定されます。上流の並列度と異なる並列度が設定される場合は、ファイルを書き込むオペレータとファイルを圧縮するオペレータ(使用されている場合)が並列度に適用されます。

オプション 必要条件 Forwarded デフォルト 種類 説明
sink.parallelism
オプション no (none) 数字 亜ギブファイルシステムへのファイルの書き込みの並列度。値は0より大きくなければなりません。それ以外の場合は例外が投げられます。

注意: 現在、シンクの並列度の設定は、アップストリームの変更ログモードがINSERT-ONLYの場合のみサポートされます。それ以外の場合は例外が投げられます。

完全な例 #

以下の例は、ファイルシステムコネクタを使ってストリーミングクエリを作成してKafkaからファイルシステムにデータを書き込み、バッチクエリを使ってデータを読み戻す方法を示しています。

CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);

CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='success-file'
);

-- streaming sql, insert into file system table
INSERT INTO fs_table 
SELECT 
    user_id, 
    order_amount, 
    DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
    DATE_FORMAT(log_ts, 'HH') 
FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';

ウォーターマークがTIMESTAMP_LTZカラムで定義され、コミットにpartition-timeを使う場合、sink.partition-commit.watermark-time-zoneがセッションタイムゾーンに設定される必要があります。それ以外の場合は数時間後にパーティションがコミットされる場合があります。

CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  ts BIGINT, -- time in epoch milliseconds
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP_LTZ column
) WITH (...);

CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume user configured time zone is 'Asia/Shanghai'
  'sink.partition-commit.policy.kind'='success-file'
);

-- streaming sql, insert into file system table
INSERT INTO fs_table 
SELECT 
    user_id, 
    order_amount, 
    DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
    DATE_FORMAT(ts_ltz, 'HH') 
FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';

Back to top

inserted by FC2 system