クラウド インフラストラクチャとの統合

はじめに

全ての主要なクラウド プロバイダはオブジェクト ストアでのデータのストレージの永続性を提供します。それらは伝統的な “POSIX” ファイルシステムではありません。単一障害点無しに数百ペタバイトのデータを格納するために、オブジェクト ストアはobject-name => dataのより単純なモデルを使って伝統的なファイルシステム ディレクトリ ツリーを置き換えます。リモートアクセスを有効にするために、オブジェクトへの走査は通常は(遅い)HTTP REST 操作として提供されます。

SparkはHadoopで実装あるいはインフラストラクチャの提供者自身が提供するファイルシステム コネクタを使ってオブジェクト内のデータを読み書きすることができます。これらのコネクタはオブジェクトストアを、ディレクトリとファイル、そしてリスト、削除およびリネームのような伝統的な操作を使ってほとんどファイルシステムのように見せます。

重要: クラウド オブジェクト ストアは実際のファイルシステムではありません

ストアはファイルシステムのように見えますが、表面化ではそれらはまだオブジェクトストアであり、その違いは重要です

それらは明言されない限りHDFSのようなクラスタファイルシステムの直接の置き換えとして使うことができません。

キーの違いは以下の通りです:

これはSparkにどのように影響するか?

  1. データの読み書きは通常のファイルシステムと連携するよりかなり遅くなるかもしれません。
  2. 幾つかのディレクトリ構造はクエリの分割計算の間の走査がとても非効率になるかもしれません。
  3. 作業の出力は後続のクエリにすぐに見えないかもしれません。
  4. SparkがRDD、データフレームあるいはデータセットを保存する時に普段作業をコミットするリネームベースのアルゴリズムは潜在的に遅くそして信頼できないものです。

それらの理由から、オブジェクトをストアをクエリの直接の宛先、あるいはクエリのチェーン内の中間ストアとして使うことは、常に安全とは限りません。どちらの使用が安全と見なせるかを決定するために、オブジェクトストアのドキュメントを調査してください。

特に: 一貫性の層のなんらかの形式無しに、Amazon S3は通常のリネームベースのコミッタとの連携の直接の宛先として安全に使うことはできません。

インストール

クラスパス上の関連ライブラリと有効な証明書で設定されたSparkを使って、データのパスとしてオブジェクトのURLを使うことでオブジェクトは読み込みあるいは書き込みすることができます。例えば、sparkContext.textFile("s3a://landsat-pds/scene_list.gz") はs3aコネクタを使ってS3に格納されているファイルscene_list.gzのRDDを作成することができます。

関連するライブラリをアプリケーションのクラスパスに追加するには、hadoop-cloud モジュールとその依存物を含めてください。

spark.version がSparkの選択されたバージョンに設定されたと仮定した場合、Mavenでは以下をpom.xml ファイルに追加してください:

<dependencyManagement>
  ...
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>hadoop-cloud_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  ...
</dependencyManagement>

Apache Sparkをベースにした商用製品は一般的にクラウド インフラストラクチャに通信するためにクラスパスを直接セットアップします。この場合このモジュールは必要無いかもしれません。

認証

Sparkのジョブはオブジェクトストアの中のデータにアクセスするためにそれと認証する必要があります。

  1. Sparkがクラウド インフラストラクチャ内で実行中の場合、認証は通常自動的にセットアップされます。
  2. spark-submitAWS_ACCESS_KEY, AWS_SECRET_KEYAWS_SESSION_TOKEN 環境変数を読み、Amazon S3へのs3ns3a のコネクタのための関連する認証オプションを設定します。
  3. Hadoop クラスタでは、設定はcore-site.xml ファイルの中で設定されるかもしれません。
  4. 認証の詳細はspark-defaults.conf内のSpark設定へ手動で追加されるかもしれません
  5. 別のやり方として、それらはアプリケーションのSparkContextを設定するために使われるSparkConfインスタンスの中でプログラム的に設定することができます。

重要: ソースコード レポジトリ、特に公開のものには、認証の秘密鍵を入れないでください

関連する設定およびセキュリティのオプションについては、Hadoopドキュメント を調べてください。

設定

各クラウドのコネクタは独自のパラメータのセットを持ちます。これも関連するドキュメントを調べてください。

一貫性モデルが名前変更ベースのコミットが安全であることを意味するオブジェクトストアの場合、パフォーマンスのために FileOutputCommitter v2 アルゴリズムを使います。

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

これは “version 1” アルゴリズムに比べてジョブの最後でのリネームをほとんど行いません。それはまだファイルをコミットするためにrename()を使うため、オブジェクトストアが矛盾しないメタデータ/リストを持たない場合は使用するのは安全ではありません。

コミッターは一時ファイルを掃除する時に障害を無視するように設定することもできます; これは一時的なネットワークの問題がジョブの失敗に拡大されるリスクを減らします。

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true

一時ファイルを格納すると料金が発生するかもしれないため; これを避けるために "_temporary" というディレクトリを定期的に削除してください。

Parquet I/O 設定

Parquetデータと連携する時の最適パフォーマンスのために、以下の設定を使ってください:

spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.hive.metastorePartitionPruning true

これらはクエリ時のデータの読み込みの量を最小化します。

ORC I/O 設定

ORCデータと連携する時の最適パフォーマンスのために、これらの設定を使ってください:

spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true

これらもクエリ時のデータの読み込みの量を最小化します。

Spark ストリーミングとオブジェクト ストレージ

Spark ストリーミングはStreamingContext.textFileStream()への呼び出しを使ってストア内のパスを監視するためにFileInputDStreamを作成することで、オブジェクトストアに追加されるファイルを監視することができます。

  1. 新しいファイルを走査する時間は、新しいファイルの数では無く、パスの下のファイル数に比例します。ですのでそれは遅い操作になるかもしれません。ウィンドウのサイズはこれを扱えるように設定される必要があります。

  2. 一旦完全に書き込まれると、ファイルがオブジェクト ストア内に現れます; ファイルがまだ書き込まれている間にファイルが取り上げられないようにするために、書き込み-そして-リネームの作業手順は必要ありません。アプリケーションは監視されるディレクトリにまっすぐに書き込むことができます。

  3. Streams は高速およびアトミックなrename() オペレーションを実装するストアにのみチェックポイントされるべきです。そうでなければチェックポイントは遅くなり一時的に読み込めないかもしれません。

更なる読み物

Apacheおよびクラウド プロバイダの両方からの標準的なコネクタのドキュメントです。

TOP
inserted by FC2 system