Amazon S3
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Amazon S3 #

Amazon Simple Storage Service (Amazon S3)は、様々なユースケースにクラウドストレージを提供します。S3とFlinkを使って、データの読み取り書き込みを行うことも、ストリーミング状態バックエンドを組み合わせて使うこともできます。

以下の形式のパスを指定することでS3オブジェクトを通常のファイルのように使うことができます:

s3://<your-bucket>/<endpoint>

終端は1つのファイルまたはディレクトリのどちらかが成りえます。例えば:

// Read from S3 bucket
env.readTextFile("s3://<bucket>/<endpoint>");

// Write to S3 bucket
stream.writeAsText("s3://<bucket>/<endpoint>");

// Use S3 as checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("s3://<your-bucket>/<endpoint>");

これらの例は、網羅的なものではありませんが、高可用性セットアップEmbeddedRocksDBStateBackendなど、他の場所でもS3を使えることに注意してください; FlinkがファイリシステムURIを期待する全ての場所で使えます(特に明記されていない限り)。

ほとんどのユースケースでは、自己完結型でセットアップが簡単なflink-s3-fs-hadoopflink-s3-fs-presto S3ファイルシステムプラグインのいずれかを使えます。 ただし、場合によっては、例えばS3をYARNリソースストレージディレクトリとして使う場合、特定のHadoop S3ファイルシステム実装をセットアップする必要がある場合があります。

Hadoop/Presto S3 ファイルシステムプラグイン #

EMR上のFlinkを実行している場合は、手動でこれを設定する必要はありません。

FlinkはAmazon S3と通信するための2つのファイルシステム、flink-s3-fs-prestoflink-s3-fs-hadoopを提供します。 どちらの実装も自己完結型で依存関係のフットプリントが無いため、それらを使うためにHadoopをクラスパスに追加する必要はありません。

  • flink-s3-fs-prestoは、スキーム*s3://s3p://*で登録されており、Prestoプロジェクトのコードに基づいています。 flink-conf.yamlに設定を追加することで、Prestoファイルシステムと同じ設定を使って設定できます。Presto S3実装はS3へのチェックポイント作成に推奨されるファイルシステムです。

  • flink-s3-fs-hadoopは、*s3://s3a://*で登録されており、Hadoopプロジェクトのコードに基づいています。 ファイルシステムは、flink-conf.yamlに設定を追加することで、Hadoopのs3a設定キーを使って設定できます。

    例えば、Hadoopにはfs.s3a.connection.maximum設定キーがあります。これを変更したい場合は、s3.connection.maximum: xyzflink-conf.yamlに追加する必要があります。Flinkは内部的にこれをfs.s3a.connection.maximumに変換します。HadoopのXML設定ファイルを使って設定パラメータを渡す必要はありません。

    これは、FileSystemをsポートする唯一のS3ファイルシステムです。

flink-s3-fs-hadoopflink-s3-fs-prestoはどちらも、URIのデフォルトのファイルシステムラッパーを*s3://スキームに登録します。flink-s3-fs-hadoops3a://に登録し、flink-s3-fs-prestos3p://に登録します。これを使って両方を同時に利用できます。 例えば、ジョブは、HadoopのみをサポートするFileSystemを使いますが、チェックポイントにはPrestoを使います。 この場合、シンク(Hadoop)のスキーマとしてs3a://を使い、チェックポイント(Presto))にはs3p://*を使います。

flink-s3-fs-hadoopまたはflink-s3-fs-prestoを使うには、Flinkを開始する前に、各JARファイルをoptディレクトリからFlinkの配布物のpluginsディレクトリへコピーします。例えば、

mkdir ./plugins/s3-fs-presto
cp ./opt/flink-s3-fs-presto-1.19-SNAPSHOT.jar ./plugins/s3-fs-presto/

アクセス証明書の設定 #

S3ファイルシステムラッパーをセットアップした後で、FlinkがS3バケットにアクセスすることができることを確実にする必要があります。

識別とアクセス管理 (IAM) (お勧め) #

AWS上で証明書をセットアップするお勧めの方法は Identity and Access Management (IAM)を使います。S3バケットにアクセスするためにFlinkインスタンスに証明書を安全に渡すために、IAM機能を使うことができます。これを行う方法の詳細はこのドキュメントの範囲を超えます。AWSユーザガイドを参照してください。探すものはIAM Rolesです。

これを正しくセットアップすると、AWS内のS3へのアクセスを管理することができ、Flinkに何もアクセスキーを配布する必要がありません。

アクセスキー(非推奨) #

S3へのアクセスはアクセスおよび秘密キーのペアを使って保証されます。IAM rolesの紹介なので、これは非推奨であることに注意してください。

Flinkのflink-conf.yamls3.access-keys3.secret-keyの両方を設定する必要があります:

s3.access-key: your-access-key
s3.secret-key: your-secret-key

非S3エンドポイントの設定 #

S3ファイルシステムは、IBM’s Cloud Object StorageMinIOのようなS3互換のオブジェクトストアの使用もサポートします。 これを行うには、flink-conf.yamlでエンドポイントを設定します。

s3.endpoint: your-endpoint-hostname

パススタイルアクセスの設定 #

一部のS3準拠オブジェクトストアは、例えばテスト目的でスタンドアローンMinIO使う場合など、仮想ホストスタイルのアドレス指定がデフォルトで有効になっていない場合があります。このような場合、flink-conf.yamlでパススタイルアクセスを有効にするプロパティを指定する必要があります。

s3.path.style.access: true

S3ファイルシステムのエントロピーインジェクション #

バンドルされているS3ファイルシステム(flink-s3-fs-prestoflink-s3-fs-hadoop)は、エントロピーインジェクションをサポートします。エントロピーインジェクションは、キーの先頭付近にランダムな文字を追加することで、AWS S3バケットのスケーラビリティを改善させる方法です。

エントロピーインジェクションが有効の場合、パス内の設定された部分文字列はランダムな文字に置き換えられます。例えば、s3://my-bucket/_entropy_/checkpoints/dashboard-job/は、s3://my-bucket/gf36ikvg/checkpoints/dashboard-job/のようなものに書き換えられます。 **これは、ファイル作成時にエントロピーインジェクションのオプションを渡した場合にのみ発生します!**それ以外の場合、ファイルパスはエントロピーキーの部分文字列を完全に削除します。詳細は、FileSystem.create(Path, WriteOption)を参照してください。

Flinkランタイムは現在チェックポイントデーtらファイルにのみエントロピーを注入するオプションを渡します。チェックポイントメタデータや外部URIを含む他の全てのファイルは、チェックポイントURIを予測可能に保つためにエントロピーを注入しません。

エントロピーインジェクションを有効にするには、entropy keyentropy lengthパラメータを設定します。

s3.entropy.key: _entropy_
s3.entropy.length: 4 (default)

s3.entropy.keyはランダムな文字で置き換えるパス内の文字列を定義します。エントロピーキーを含まないパスは変更されないままになります。 ファイルシステムオペレーションが*“inject entropy”*書き込みオプションを渡さない場合、エントロピーキー部分文字列は単純に削除されます。 s3.entropy.lengthはエントロピーに使われるランダムな英数字の数を定義します。

Back to top

inserted by FC2 system