This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.
Amazon S3 #
Amazon Simple Storage Service (Amazon S3)は、様々なユースケースにクラウドストレージを提供します。S3とFlinkを使って、データの読み取りと書き込みを行うことも、ストリーミング状態バックエンドを組み合わせて使うこともできます。
以下の形式のパスを指定することでS3オブジェクトを通常のファイルのように使うことができます:
s3://<your-bucket>/<endpoint>
終端は1つのファイルまたはディレクトリのどちらかが成りえます。例えば:
// Read from S3 bucket
env.readTextFile("s3://<bucket>/<endpoint>");
// Write to S3 bucket
stream.writeAsText("s3://<bucket>/<endpoint>");
// Use S3 as checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("s3://<your-bucket>/<endpoint>");
これらの例は、網羅的なものではありませんが、高可用性セットアップやEmbeddedRocksDBStateBackendなど、他の場所でもS3を使えることに注意してください; FlinkがファイリシステムURIを期待する全ての場所で使えます(特に明記されていない限り)。
ほとんどのユースケースでは、自己完結型でセットアップが簡単なflink-s3-fs-hadoop
とflink-s3-fs-presto
S3ファイルシステムプラグインのいずれかを使えます。
ただし、場合によっては、例えばS3をYARNリソースストレージディレクトリとして使う場合、特定のHadoop S3ファイルシステム実装をセットアップする必要がある場合があります。
Hadoop/Presto S3 ファイルシステムプラグイン #
EMR上のFlinkを実行している場合は、手動でこれを設定する必要はありません。
FlinkはAmazon S3と通信するための2つのファイルシステム、flink-s3-fs-presto
とflink-s3-fs-hadoop
を提供します。
どちらの実装も自己完結型で依存関係のフットプリントが無いため、それらを使うためにHadoopをクラスパスに追加する必要はありません。
-
flink-s3-fs-presto
は、スキーム*s3://とs3p://*で登録されており、Prestoプロジェクトのコードに基づいています。flink-conf.yaml
に設定を追加することで、Prestoファイルシステムと同じ設定を使って設定できます。Presto S3実装はS3へのチェックポイント作成に推奨されるファイルシステムです。 -
flink-s3-fs-hadoop
は、*s3://とs3a://*で登録されており、Hadoopプロジェクトのコードに基づいています。 ファイルシステムは、flink-conf.yaml
に設定を追加することで、Hadoopのs3a設定キーを使って設定できます。例えば、Hadoopには
fs.s3a.connection.maximum
設定キーがあります。これを変更したい場合は、s3.connection.maximum: xyz
をflink-conf.yaml
に追加する必要があります。Flinkは内部的にこれをfs.s3a.connection.maximum
に変換します。HadoopのXML設定ファイルを使って設定パラメータを渡す必要はありません。これは、FileSystemをsポートする唯一のS3ファイルシステムです。
flink-s3-fs-hadoop
とflink-s3-fs-presto
はどちらも、URIのデフォルトのファイルシステムラッパーを*s3://スキームに登録します。flink-s3-fs-hadoop
はs3a://に登録し、flink-s3-fs-presto
はs3p://に登録します。これを使って両方を同時に利用できます。
例えば、ジョブは、HadoopのみをサポートするFileSystemを使いますが、チェックポイントにはPrestoを使います。
この場合、シンク(Hadoop)のスキーマとしてs3a://を使い、チェックポイント(Presto))にはs3p://*を使います。
flink-s3-fs-hadoop
またはflink-s3-fs-presto
を使うには、Flinkを開始する前に、各JARファイルをopt
ディレクトリからFlinkの配布物のplugins
ディレクトリへコピーします。例えば、
mkdir ./plugins/s3-fs-presto
cp ./opt/flink-s3-fs-presto-1.19-SNAPSHOT.jar ./plugins/s3-fs-presto/
アクセス証明書の設定 #
S3ファイルシステムラッパーをセットアップした後で、FlinkがS3バケットにアクセスすることができることを確実にする必要があります。
識別とアクセス管理 (IAM) (お勧め) #
AWS上で証明書をセットアップするお勧めの方法は Identity and Access Management (IAM)を使います。S3バケットにアクセスするためにFlinkインスタンスに証明書を安全に渡すために、IAM機能を使うことができます。これを行う方法の詳細はこのドキュメントの範囲を超えます。AWSユーザガイドを参照してください。探すものはIAM Rolesです。
これを正しくセットアップすると、AWS内のS3へのアクセスを管理することができ、Flinkに何もアクセスキーを配布する必要がありません。
アクセスキー(非推奨) #
S3へのアクセスはアクセスおよび秘密キーのペアを使って保証されます。IAM rolesの紹介なので、これは非推奨であることに注意してください。
Flinkのflink-conf.yaml
のs3.access-key
とs3.secret-key
の両方を設定する必要があります:
s3.access-key: your-access-key
s3.secret-key: your-secret-key
非S3エンドポイントの設定 #
S3ファイルシステムは、IBM’s Cloud Object StorageやMinIOのようなS3互換のオブジェクトストアの使用もサポートします。
これを行うには、flink-conf.yaml
でエンドポイントを設定します。
s3.endpoint: your-endpoint-hostname
パススタイルアクセスの設定 #
一部のS3準拠オブジェクトストアは、例えばテスト目的でスタンドアローンMinIO使う場合など、仮想ホストスタイルのアドレス指定がデフォルトで有効になっていない場合があります。このような場合、flink-conf.yaml
でパススタイルアクセスを有効にするプロパティを指定する必要があります。
s3.path.style.access: true
S3ファイルシステムのエントロピーインジェクション #
バンドルされているS3ファイルシステム(flink-s3-fs-presto
とflink-s3-fs-hadoop
)は、エントロピーインジェクションをサポートします。エントロピーインジェクションは、キーの先頭付近にランダムな文字を追加することで、AWS S3バケットのスケーラビリティを改善させる方法です。
エントロピーインジェクションが有効の場合、パス内の設定された部分文字列はランダムな文字に置き換えられます。例えば、s3://my-bucket/_entropy_/checkpoints/dashboard-job/
は、s3://my-bucket/gf36ikvg/checkpoints/dashboard-job/
のようなものに書き換えられます。
**これは、ファイル作成時にエントロピーインジェクションのオプションを渡した場合にのみ発生します!**それ以外の場合、ファイルパスはエントロピーキーの部分文字列を完全に削除します。詳細は、FileSystem.create(Path, WriteOption)を参照してください。
Flinkランタイムは現在チェックポイントデーtらファイルにのみエントロピーを注入するオプションを渡します。チェックポイントメタデータや外部URIを含む他の全てのファイルは、チェックポイントURIを予測可能に保つためにエントロピーを注入しません。
エントロピーインジェクションを有効にするには、entropy keyとentropy lengthパラメータを設定します。
s3.entropy.key: _entropy_
s3.entropy.length: 4 (default)
s3.entropy.key
はランダムな文字で置き換えるパス内の文字列を定義します。エントロピーキーを含まないパスは変更されないままになります。
ファイルシステムオペレーションが*“inject entropy”*書き込みオプションを渡さない場合、エントロピーキー部分文字列は単純に削除されます。
s3.entropy.length
はエントロピーに使われるランダムな英数字の数を定義します。