クラウド インフラストラクチャとの統合
はじめに
全ての主要なクラウド プロバイダはオブジェクト ストアでのデータのストレージの永続性を提供します。それらは伝統的な “POSIX” ファイルシステムではありません。単一障害点無しに数百ペタバイトのデータを格納するために、オブジェクト ストアはobject-name => data
のより単純なモデルを使って伝統的なファイルシステム ディレクトリ ツリーを置き換えます。リモートアクセスを有効にするために、オブジェクトへの走査は通常は(遅い)HTTP REST 操作として提供されます。
SparkはHadoopで実装あるいはインフラストラクチャの提供者自身が提供するファイルシステム コネクタを使ってオブジェクト内のデータを読み書きすることができます。これらのコネクタはオブジェクトストアを、ディレクトリとファイル、そしてリスト、削除およびリネームのような伝統的な操作を使ってほとんどファイルシステムのように見せます。
重要: クラウド オブジェクト ストアは実際のファイルシステムではありません
ストアはファイルシステムのように見えますが、表面化ではそれらはまだオブジェクトストアであり、その違いは重要です
それらは明言されない限りHDFSのようなクラスタファイルシステムの直接の置き換えとして使うことができません。
キーの違いは以下の通りです:
- 格納されたオブジェクトへの変更は、ディレクトリのリスト化と実際のデータアクセスの両方ですぐには見ることができないかもしれません。
- ディレクトリがエミュレートされる方法によりそれらとの連携が遅くなるかもしれないという意味です。
- リネーム操作はとても遅く、障害時には未知の状態として格納したままにします。
- ファイル内の走査は新しいHTTP呼び出しを必要とするかも知れず、惨めなパフォーマンスになります。
これはSparkにどのように影響するか?
- データの読み書きは通常のファイルシステムと連携するよりかなり遅くなるかもしれません。
- 幾つかのディレクトリ構造はクエリの分割計算の間の走査がとても非効率になるかもしれません。
- 作業の出力は後続のクエリにすぐに見えないかもしれません。
- SparkがRDD、データフレームあるいはデータセットを保存する時に普段作業をコミットするリネームベースのアルゴリズムは潜在的に遅くそして信頼できないものです。
それらの理由から、オブジェクトをストアをクエリの直接の宛先、あるいはクエリのチェーン内の中間ストアとして使うことは、常に安全とは限りません。どちらの使用が安全と見なせるかを決定するために、オブジェクトストアのドキュメントを調査してください。
特に: 一貫性の層のなんらかの形式無しに、Amazon S3は通常のリネームベースのコミッタとの連携の直接の宛先として安全に使うことはできません。
インストール
クラスパス上の関連ライブラリと有効な証明書で設定されたSparkを使って、データのパスとしてオブジェクトのURLを使うことでオブジェクトは読み込みあるいは書き込みすることができます。例えば、sparkContext.textFile("s3a://landsat-pds/scene_list.gz")
はs3aコネクタを使ってS3に格納されているファイルscene_list.gz
のRDDを作成することができます。
関連するライブラリをアプリケーションのクラスパスに追加するには、hadoop-cloud
モジュールとその依存物を含めてください。
spark.version
がSparkの選択されたバージョンに設定されたと仮定した場合、Mavenでは以下をpom.xml
ファイルに追加してください:
Apache Sparkをベースにした商用製品は一般的にクラウド インフラストラクチャに通信するためにクラスパスを直接セットアップします。この場合このモジュールは必要無いかもしれません。
認証
Sparkのジョブはオブジェクトストアの中のデータにアクセスするためにそれと認証する必要があります。
- Sparkがクラウド インフラストラクチャ内で実行中の場合、認証は通常自動的にセットアップされます。
spark-submit
はAWS_ACCESS_KEY
,AWS_SECRET_KEY
とAWS_SESSION_TOKEN
環境変数を読み、Amazon S3へのs3n
とs3a
のコネクタのための関連する認証オプションを設定します。- Hadoop クラスタでは、設定は
core-site.xml
ファイルの中で設定されるかもしれません。 - 認証の詳細は
spark-defaults.conf
内のSpark設定へ手動で追加されるかもしれません - 別のやり方として、それらはアプリケーションの
SparkContext
を設定するために使われるSparkConf
インスタンスの中でプログラム的に設定することができます。
重要: ソースコード レポジトリ、特に公開のものには、認証の秘密鍵を入れないでください
関連する設定およびセキュリティのオプションについては、Hadoopドキュメント を調べてください。
設定
各クラウドのコネクタは独自のパラメータのセットを持ちます。これも関連するドキュメントを調べてください。
オブジェクトストアへの書き込みのためのお勧めの設定
一貫性モデルが名前変更ベースのコミットが安全であることを意味するオブジェクトストアの場合、パフォーマンスのために FileOutputCommitter
v2 アルゴリズムを使います; 安全のためには、v1。
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
これは “version 1” アルゴリズムに比べてジョブの最後でのリネームをほとんど行いません。それはまだファイルをコミットするためにrename()
を使うため、オブジェクトストアが矛盾しないメタデータ/リストを持たない場合は使用するのは安全ではありません。
コミッターは一時ファイルを掃除する時に障害を無視するように設定することもできます; これは一時的なネットワークの問題がジョブの失敗に拡大されるリスクを減らします。
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true
元の v1 コミットアルゴリズムは、成功したタスクの出力の名前をジョブの試行ディレクトリに変更し、ジョブコミットフェース中にそのディレクトリ内の全てのファイルを名前を最終的な宛先に変更します:
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1
The slow performance of mimicked renames on Amazon S3 makes this algorithm very, very slow. これに対する推奨される解決策は、S3 “Zero Rename” コミッタ―に切り替えることです (以下を見てください)。
参考までに、ディレクトリの名前を変更する時の様々なストアとコネクタのパフォーマンスと安全性の特性を、以下に示します:
ストア | コネクタ | ディレクトリの名前変更の安全性 | 名前変更のパフォーマンス |
---|---|---|---|
Amazon S3 | s3a | 安全ではない | O(data) |
Azure Storage | wasb | 安全 | O(files) |
Azure Datalake Gen 2 | abfs | 安全 | O(1) |
Google GCS | gs | 安全 | O(1) |
一時ファイルを格納すると料金が発生するかもしれないため; "_temporary"
というディレクトリを定期的に削除してください。
Parquet I/O 設定
Parquetデータと連携する時の最適パフォーマンスのために、以下の設定を使ってください:
spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.hive.metastorePartitionPruning true
これらはクエリ時のデータの読み込みの量を最小化します。
ORC I/O 設定
ORCデータと連携する時の最適パフォーマンスのために、これらの設定を使ってください:
spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true
これらもクエリ時のデータの読み込みの量を最小化します。
Spark ストリーミングとオブジェクト ストレージ
Spark ストリーミングはStreamingContext.textFileStream()
への呼び出しを使ってストア内のパスを監視するためにFileInputDStream
を作成することで、オブジェクトストアに追加されるファイルを監視することができます。
-
新しいファイルを走査する時間は、新しいファイルの数では無く、パスの下のファイル数に比例します。ですのでそれは遅い操作になるかもしれません。ウィンドウのサイズはこれを扱えるように設定される必要があります。
-
一旦完全に書き込まれると、ファイルがオブジェクト ストア内に現れます; ファイルがまだ書き込まれている間にファイルが取り上げられないようにするために、書き込み-そして-リネームの作業手順は必要ありません。アプリケーションは監視されるディレクトリにまっすぐに書き込むことができます。
-
Streams は高速およびアトミックな
rename()
オペレーションを実装するストアにのみチェックポイントされるべきです。そうでなければチェックポイントは遅くなり一時的に読み込めないかもしれません。
安全かつ迅速にクラウドストレージに作業をコミット
前で説明したように、commit-by-rename は結果整合性を提供するオブジェクトストア(例えば: S3)では危険であり、多くの場合、従来のファイルシステムの名前変更よりも遅くなります。
一部のオブジェクトストアコネクタは、名前の変更を使わずにタスクとジョブをコミットするカスタムコミッタを提供します。Hadoop 3.1 以降でビルドされた Spark のバージョンでは、AWS S3 の S3A コネクタはそのようなコミッタです。
これらのコミッタ―は、名前を変更するためにストアの一時ディレクトリにデータを書き込む代わりに、ファイルを最終的な宛先に書き込みますが、大規模な “multi-part” アップロードを表示するために最終的な POST コマンドを発行しないでください。これらの操作は、ジョブがコミットされるまで延期されます。結果として、タスクとジョブのコミットが遥かに速くなり、タスクの失敗は結果に影響しません。
S3A コミッタ に切り替えるには、Hadoop 3.1 以降でビルドされた Spark のバージョンを使い、以下のオプションを使ってコミッタを切り替えます。
spark.hadoop.fs.s3a.committer.name directory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
Spark でサポートされている最も一般的な形式でテストされています。
mydataframe.write.format("parquet").save("s3a://bucket/destination")
これらのコミッタの詳細については、最新の Hadoop ドキュメントにあります。
更なる読み物
Apacheおよびクラウド プロバイダの両方からの標準的なコネクタのドキュメントです。
- OpenStack Swift.
- Azure Blob Storage and Azure Datalake Gen 2.
- Azure Data Lake Gen 1.
- Hadoop-AWS module (Hadoop 3.x).
- Amazon S3 via S3A and S3N (Hadoop 2.x).
- Amazon EMR ファイルシステム (EMRFS). Amazonから
- SparkおよびHadoopのためのGoogle Cloudストレージ コネクタ。Googleから
- The Azure Blob Filesystem driver (ABFS)
- IBM Cloud Object Storage connector for Apache Spark: Stocator, IBM Object Storage, how-to-use-connector. From IBM