Google Cloud Storage
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Google Cloud Storage #

Google Cloud Storage (GCS)は、様々なユースケースにクラウドストレージを提供します。これは、データの読み込み書き込みストリーミング状態バックエンドによるFileSystemCheckpointStorage)を使う場合のチェックポイントストレージにも使えます。

以下の形式のパスを指定することでGCSオブジェクトを通常のファイルのように使うことができます:

gs://<your-bucket>/<endpoint>

終端は1つのファイルまたはディレクトリのどちらかが成りえます。例えば:

// Read from GCS bucket
env.readTextFile("gs://<bucket>/<endpoint>");

// Write to GCS bucket
stream.writeAsText("gs://<bucket>/<endpoint>");

// Use GCS as checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("gs://<bucket>/<endpoint>");

これらの例は、網羅的なものではありませんが、高可用性セットアップEmbeddedRocksDBStateBackendなど、他の場所でもGCSを使えることに注意してください; FlinkがファイルシステムURIを期待する全ての場所で使えます。

GCS ファイルシステムプラグイン #

Flinkは、GCSに書き込むためのflink-gs-fs-hadoopファイルシステムを提供します。 この実装は自己完結型で依存関係のフットプリントがないため、使うためにHadoopをクラスパスに追加する必要はありません。

flink-gs-fs-hadoopは、*gs://*スキームを使ってURIのFileSystemラッパーを登録します。Googleのgcs-connector Hadoopライブラリを使ってGCSへアクセスします。また、Googleのgoogle-cloud-storageライブラリを使って、RecoverableWriterサポートを提供します。

このファイルシステムは、FileSystem connectorと一緒に使えます。

flink-gs-fs-hadoopを使うには、Flinkを開始する前に、JARファイルをoptディレクトリからFlinkの配布物のpluginsディレクトリにコピーします。例えば、

mkdir ./plugins/gs-fs-hadoop
cp ./opt/flink-gs-fs-hadoop-1.19-SNAPSHOT.jar ./plugins/gs-fs-hadoop/

設定 #

基盤となるHadoopファイルシステムは、gcs-connectorHadoop設定キーを使って、設定をflink-conf.yamlに追加することで設定できます。

例えば、gcs-connectorには、fs.gs.http.connect-timeout設定キーがあります。これを変更したい場合は、flink-conf.yamlgs.http.connect-timeout: xyzを設定する必要があります。Flinkはこれを内部的にfs.gs.http.connect-timeoutに変換します。

You can also set gcs-connector options directly in the Hadoop core-site.xml configuration file, so long as the Hadoop configuration directory is made known to Flink via the env.hadoop.conf.dir Flink option or via the HADOOP_CONF_DIR environment variable.

flink-gs-fs-hadoopは、flink-conf.yamlに以下のオプションを設定することで設定することもできます:

キー 説明
gs.writer.temporary.bucket.name RecoverableWriterによる進行中の書き込み用の一時blogsを保持するバケットを選択するために、このプロパティを設定します。このプロパティが設定されていない場合、一時blogsは最終ファイルが書き込まれるのと同じバケットに書き込まれます。どちらの場合でも、一時blobsはプリフィックス.inprogress/を付けて書き込まれます。

It is recommended to choose a separate bucket in order to assign it a TTL, to provide a mechanism to clean up orphaned blobs that can occur when restoring from check/savepoints.

If you do use a separate bucket with a TTL for temporary blobs, attempts to restart jobs from check/savepoints after the TTL interval expires may fail.
gs.writer.chunk.size このプロパティを設定して、RecoverableWriter経由の書き込みのチャンクサイズを設定します。

設定されていない場合、Googleが決定したデフォルトのチャンクサイズが使われます。

GCSにアクセスするための認証 #

GCS上のほとんどのオペレーションは認証を必要とします。認証資格情報を提供するために、以下のいずれかを行います:

  • ここで説明されているように、GOOGLE_APPLICATION_CREDENTIALS環境変数を、JobManagersとTaskManagersが実行されるJSON認証情報ファイルのパスに設定します。これは推奨される方法です。

  • core-site.xmlgoogle.cloud.auth.service.account.json.keyfileプロパティをJSON認証情報ファイルのパスに設定します(そして、上記で説明したように、Hadoop設定ディレクトリがFlinkに指定されていることを確認します):

<configuration>
  <property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>PATH TO GOOGLE AUTHENTICATION JSON FILE</value>
  </property>
</configuration>

flink-gs-fs-hadoopがこれら2つのいずれかの方法で認証情報を使うには、認証のためのサービスアカウントの使用を有効にする必要があります。これはデフォルトで有効になっています; ただし、core-site.xmlで次のように設定することで無効にすることができます:

<configuration>
  <property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>false</value>
  </property>
</configuration>

gcs-connectorは、上記のgoogle.cloud.auth.service.account.json.keyfileオプション以外に、認証視覚情報を提供する追加のオプションをサポートします。

ただし、これらの他のオプションを使う場合、提供された認証情報はRecoverableWriterサポートを提供するgoogle-cloud-storageライブラリでは使われないため、Flinkの回復可能な書き込みオペレーションは失敗することが予想されます。

このため、google.cloud.auth.service.account.json.keyfile以外のgcs-connector認証資格情報オプションの使用は推奨されません

Back to top

inserted by FC2 system