Sparkのチューニング

ほとんどのSparkの計算のメモリ内の性質により、Sparkプログラムはクラスタ内のリソースによってボトルネックになるかも知れません: CPU, ネットワーク帯域、あるいはメモリ。よくあるのは、データがメモリに収まる場合にネットワークの帯域がボトルネックなることですが、時には、メモリの使用を減らすためにRDDをシリアライズ形式で格納など幾つか調整をする必要があるかも知れません。このガイドは2つの主要なトピックをカバーします: データのシリアライズ化、これはネットワークパフォーマンスが良いときに重要で、メモリの使用を減らすこともでき、メモリのチューニングに重要です。幾つかの小さなトピックについても概説します。

データのシリアライズ

シリライズ化は分散されたアプリケーションのパフォーマンスにといて重要な役割を果たします。オブジェクトをシリアライズするのが遅いフォーマット、あるいはバイトを大量に消費するフォーマットは、計算をとても遅くするでしょう。しばしば、これはSparkアプリケーションを最適化するために調整しなければならない最初のことでしょう。Sparkは便利さ(操作の中での任意のJavaタイプを動かすことができる)とパフォーマンスをうまく両立させることを狙っています。2つのシリアライズのライブラリを提供します:

ジョブを SparkConfを使って初期化し、conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")を呼び出すことで、Kryoを使うように切り替えることができます。この設定は、ワーカー間でデータをシャッフルするためだけでなく、RDDをディスクにシリアライズ化するための使われるシリアライザーを設定します。Kryoが標準ではない唯一の理由は独自の登録が必要なためですが、ネットワーク集約的なアプリケーションの中で使ってみることをお勧めします。Spark 2.0.0 から、単純な型、単純な型の配列、あるいは文字列型を使ってRDDをシャッフルする時に内部的にKryoシリアライザを使います。

Spark は、AllScalaRegistrar内でカバーされる多くの一般的に使用されるコアScalaクラスのために、Twitter chill ライブラリからKryoシリアライザを自動的にインクルードします。

独自のクラスをKryoに登録するには、registerKryoClasses メソッドを使います。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

Kryo ドキュメントは、独自のシリアライズ化を追加するような、もっと進んだ登録のオプションを説明します。

オブジェクトが大きければ、spark.kryoserializer.buffer 設定を大きくする必要もあるかも知れません。この値はシリアライズ化しようと思っているもっとも大きな オブジェクトを十分に保持できる必要があります。

最終的にもし独自のクラスを登録できない場合は、Kryoはそれでも動作しますが、各オブジェクトの完全なクラス名を格納することになるでしょうが、これは無駄です。

メモリー チューニング

メモリの使用を調整するには3つの考慮すべき点があります: オブジェクトによって使用されるメモリ量(データセット全体をメモリに入れたいと思うかも知れません)。それらのオブジェクトにアクセスするためのコストガベージコレクションのオーバーヘッド (オブジェクトという点で回転数が高ければ)。

デフォルトでは、Javaオブジェクトはアクセスが速いですが、それらのフィールド中の"raw"データよりも2-5x倍の容量を簡単に消費しえます。これには幾つかの理由があります:

この章はSparkでのメモリ管理の概要から始め、ユーザが彼/彼女のアプリケーション内のメモリの効率的な管理をするのが習慣になるような具体的なストラテジについて議論するつもりです。特に、この章はオブジェクトのメモリの使用をどう決定するかと、それをどう改善するかの方法を説明するつもりです - データ構造を変更するか、シリアライズ化されたフォーマットでデータを格納するかのどちらかです。それから、SparkのキャッシュサイズとJavaのガベージコレクションの調整をカバーするつもりです。

メモリ管理概要

Sparkでのメモリの使用は2つに1つの分類に分類されます: 実行と保存。実行メモリはシャッフル、ジョイン、ソートおよび集約で使われるメモリを参照しますが、保存メモリは内部データをクラスタに渡ってキャッシングおよび増殖するために使われるメモリを参照します。Sparkでは、実行および保存は統一された領域(M)を共有します。実行メモリが使われない場合、保存メモリは全ての利用可能なメモリを手に入れることができ、逆もまたしかりです。必要であれば実行が保存を追い出しますが、総保存メモリの使用率がある閾値(R)未満になるまでです。別の言い方をすると、RM内の副領域で、キャッシュされたブロックが追い出されない領域を説明します。保存は実装の複雑さにより、実行を追い出せないでしょう。

この設計は幾つかの望ましい性質を保証します。まず、キャッシングを使わないアプリケーションは実行のために全空間を使うことができ、不必要なディスクへの零れ落ちを未然に防ぎます。次に、キャッシングを行わないアプリケーションは、追い出しに耐性のあるデータブロックの最小の保存領域(R)を保持することができます。最後に、このやり方は、どのようにメモリを内部的に分割するかの専門知識無しに、様々な作業の納得できるパフォーマンスを追加設定無しに提供します。

2つの関連する設定がありますが、デフォルトの値はほとんどの作業に適用可能なため、一般的なユーザはそれらを調整する必要はありません。

spark.memory.fractionの値は、JVMの古いあるいは"終身"世代内で総ヒープ空間に気持ちよく納めるために設定されなければなりません。詳細は下記の上級のGCチューニングの議論を見てください。

メモリ消費の決定

The best way to size the amount of memory consumption a dataset will require is to create an RDD, put it into cache, and look at the “Storage” page in the web UI. そのページはRDDがどれだけのメモリを占有しているかを知らせるでしょう。

特定のオブジェクトのメモリの消費量を推測するには、SizeEstimatorestimate メソッドを使います。これは実験的に異なるデータのレイアウトを使ってメモリの使用を減らすのに役に立ち、各executor のヒープでブロードキャスト変数が占有するだろう総領域を決定するのに役立ちます。

データ構造のチューニング

メモリの消費を減らす一番最初の方法は、ポインターに基づいたデータ構造とオブジェクトのラッパーのようなオーバーヘッドを追加するJava機能を避けることです。これをするには幾つかの方法があります:

  1. データ構造を、標準のJavaあるいはScalaコレクションクラス(例えば、HashMap)の代わりに、オブジェクトの好ましい配列および基本タイプに設計します。fastutil ライブラリはJava標準ライブラリと互換性のある基本タイプのための便利なコレクションクラスを提供します。
  2. 可能であれば、大量の小さなオブジェクトとポインターを持つ入れ子の構造を避けます。
  3. キーとして文字列の代わりに数字のIDあるいはenumerationオブジェクトの使用を考慮します。
  4. RAMが32GB未満であれば、ポインターを8バイトの代わりに4バイトにするためにJVMフラグ-XX:+UseCompressedOops を設定します。これらのオプションをspark-env.shの中で追加することができます。

シリアライズ化されたRDDストレージ

この調整にも関わらず、オブジェクトがまだ効果的に格納するには大きすぎる場合、メモリの使用を減らすためのもっと簡単な方法は、それらをMEMORY_ONLY_SERのようなRDD 永続化 APIの中のシリアライズ化されたStorageLevelを使って serialized 形式で保存することです。そうすればSparkは各RDDパーティションを1つの大きなバイト配列として保存するでしょう。シリアライズ化形式でデータを保存する唯一の良くない点は、その場で各オブジェクトをデシリアライズしなければならないことによる、アクセス時の遅さです。Kryoの使用はJavaのシリアライズ化よりもかなりサイズが小さく(そしてもちろんraw Javaオブジェクトよりも)なるため、もしシリアライズ化形式でデータをキャッシュしたい場合はKryoの使用を強くお勧めします。

ガベージコレクションのチューニング

プログラムによって格納されるRDDの点で大きな"churn"がある場合、JVMのガベージコレクションは問題になるかも知れません。(RDDを一度だけ読み込み、その上で多くの操作を実行するプログラムでは通常問題になりません。) Javaが新しいオブジェクトに場所を空けるために古いオブジェクトを立ち退かせる必要がある場合は、全てのJavaオブジェクトを通じて追跡し使用されていないオブジェクトを見つける必要があるでしょう。ここで覚えておくべき主要なポイントは、ガベージコレクトのコストはJavaオブジェクトの数に比例するということです。つまり、少ないオブジェクトのデータ構造の使用(例えば、LinkedListの代わりにIntの配列) は大いにこのコストを小さくします。An even better method is to persist objects in serialized form, as described above: now there will be only one object (a byte array) per RDD partition. もしGCが問題であれば、他の技術を試す前に最初に試すべきものは、シリアライズ化されたキャッシュを使用することです。

GC can also be a problem due to interference between your tasks’ working memory (the amount of space needed to run the task) and the RDDs cached on your nodes. これを緩和するためにRDDのキャッシュに割り当てられた領域を制御する方法について議論するつもりです。

GCの影響の測定

GCチューニングの最初のステップはガベージコレクションが起こる頻度とGCに費やされる総時間についての統計を集めることです。これはJavaのオプションに-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps を追加することで行うことができます。(JavaオプションをSparkジョブへ渡す方法についての情報は設定ガイド を見てください。) 次にSparkジョブが実行された時に、ガベージコレクションが起きる度にワーカーログにメッセージが表示されるのを見るでしょう。これらのログはドライバープログラムではなく、クラスターのワーカーノードにあることに注意してください(ワーカーディレクトリ内のstdout ファイル内)。

上級のGCチューニング

更にガベージコレクションを調整するには、まずJVMのメモリ管理についていくつかの基本的な情報を理解する必要があります:

SparkでのGCチューニングの目的は、長く存在しているRDDがOld世代に格納され、Young世代が生存が短いオブジェクトを格納するのに十分なサイズになることを保証することです。これはタスクの実行中に生成された一時的なオブジェクトを集めるためのfull GCを避けるのに役立つでしょう。有用かも知れない幾つかのステップ:

経験では、GCチューニングの効果はアプリケーションと利用可能なメモリ量に依存することが示唆されます。オンラインで説明される 多くのチューニングオプションがありますが、高レベルでは、full GCがどれだけの頻度で発生するかの管理がオーバーヘッドを減らすのに役立ちます。

executorのためのGC チューニングフラグはジョブの設定内のspark.executor.extraJavaOptionsの設定によって指定することができます。

他に考慮すべきこと

並行度のレベル

各操作についての並行度のレベルを十分に高くしない限りはクラスタは完全には利用されないでしょう。Spark はサイズに応じて各ファイル上で実行するために “map” タスクの数を自動的に設定し(しかし、任意のパラメータ SparkContext.textFile などを設定することで制御することができます)、groupByKeyreduceByKey のような分散された “reduce” 操作については、親のRDDのパーティションの最大の数を使用します。2つ目の引数として並行レベルを渡すことができます(spark.PairRDDFunctions ドキュメントを見てください)。あるいはデフォルトを変更するために設定プロパティspark.default.parallelism を設定してください。一般的に、クラスタ内のCPUコアあたり2-3のタスクがお勧めです。

reduceタスクのメモリの使用

時には、RDDがメモリ内に収まらないからではなく、groupByKey内のreduceタスクのうちの一つのような、タスクの一つの作業セットがあまりにも大きいために、OutOfMemoryErrorを受け取るかも知れません。Sparkのシャッフル操作(sortByKey, groupByKey, reduceByKey, joinなど)は各タスク内にグルーピングを実行するためにハッシュテーブルを作成します。これはしばしば大きくなりえます。これを修正するもっとも簡単な方法は、各タスクの入力セットが小さくなるように並行レベルの増加をすることです。Sparkは1つのexecutor JVMを多くのタスクに渡って再利用し、タスクの起動コストが低いため、Sparkは200msぐらいのタスクを効果的にサポートします。つまり、並行度レベルをクラスタ内のコアの数より大きな数に安全に増やすことができます。

大きな変数のブロードキャスト

SparkContextで利用可能な ブロードキャスト機能 を使うことは、各シリアライズ化タスクのサイズおよびクラスタを超えるジョブの起動のコストを大きく減らします。タスクがそれらの中のドライバプログラムから大きなオブジェクトを使う場合(例えば、静的な調査テーブル)、ブロードキャスト変数への調整を考えてください。Spark はマスター上の各タスクのシリアライズ化されたサイズを出力します。つまり、それを見てタスクが大きすぎないかを決めることができます; 一般的に20KBより大きなタスクは最適化する価値があります。

データの局所性

データの局所性はSparkジョブのパフォーマンスに大きな影響を持ちえます。もしデータとその上で実行するコードが一緒にあると、計算がより速くなる傾向があります。しかし、もしコードとデータが分かれていると、片方がもう片方に移動しなければなりません。一般的に、シリアライズ化されたコードを場所から場所へ転送することは、コードサイズがデータよりもかなり小さいために、データをぶつ切りするよりも速いです。Sparkはこのデータの局所性の一般的な原則に基づいてスケジューリングを構築します。

データの局所性はデータがどれだけ処理をするコードに近いかです。データの現在の場所に基づいて、局所性の幾つかのレベルがあります。近いものから遠いものの順に:

Sparkは全てのタスクを一番良い局所性レベルにスケジュールする方が良いですが、これは常に可能なわけではありません。処理されていないデータが仕事をしていないexecutor上にある場合、Sparkは局所性レベルを下げます。2つの選択肢があります: a) 忙しいCPUが同じサーバ上のデータ上のタスクを開始するために自由になるまで待つ。あるいは b) データがそこに移動される必要がある遠く離れた場所で新しいタスクをすぐに開始する。

一般的にSparkは忙しいCPUが自由になることを期待して少しだけ待ちます。タイムアウトが期限切れになると、遠くの場所から自由なCPUにデータの移動を開始します。各レベル間のフォールバックのタイムアウトは、個々にあるいは1つのパラメータで一斉に設定することができます; 詳細は設定ページspark.localityを見てください。タスクが長く局所性が乏しい場合はこれらの設定を増加する必要がありますが、通常はデフォルトで良く動作します。

概要

Sparkアプリケーションをチューニングする場合に知るべき主な考慮点、最も重要で、データのシリアライズ化およびメモリのチューニング、を示した短いガイドです 。ほとんどのプログラムについて、Kryoシリアライズ化とシリアライズ化形式のデータの永続化は、もっとも一般的なパフォーマンス問題を解決するでしょう。他のチューニングのベストプラクティスについては、気軽にSpark メーリングリスト で聞いてください。

TOP
inserted by FC2 system