Sparkのチューニング
ほとんどのSparkの計算のメモリ内の性質により、Sparkプログラムはクラスタ内のリソースによってボトルネックになるかも知れません: CPU, ネットワーク帯域、あるいはメモリ。よくあるのは、データがメモリに収まる場合にネットワークの帯域がボトルネックなることですが、時には、メモリの使用を減らすためにRDDをシリアライズ形式で格納など幾つか調整をする必要があるかも知れません。このガイドは2つの主要なトピックをカバーします: データのシリアライズ化、これはネットワークパフォーマンスが良いときに重要で、メモリの使用を減らすこともでき、メモリのチューニングに重要です。幾つかの小さなトピックについても概説します。
データのシリアライズ
シリライズ化は分散されたアプリケーションのパフォーマンスにといて重要な役割を果たします。オブジェクトをシリアライズするのが遅いフォーマット、あるいはバイトを大量に消費するフォーマットは、計算をとても遅くするでしょう。しばしば、これはSparkアプリケーションを最適化するために調整しなければならない最初のことでしょう。Sparkは便利さ(操作の中での任意のJavaタイプを動かすことができる)とパフォーマンスをうまく両立させることを狙っています。2つのシリアライズのライブラリを提供します:
- Java シリアライズ化: デフォルトで、SparkはJavaの
ObjectOutputStream
フレームワークを使ってオブジェクトをシリアライズし、java.io.Serializable
を実装するどのようなクラスを使っても動作することができます。java.io.Externalizable
を拡張することでもっと綿密にシリアライズ化のパフォーマンスを制御することもできます。Java のシリアライズ化は柔軟ですが、しばしばかなり遅く、結果として多くのクラスのための大きなシリアライズ化フォーマットになります。 - Kryo シリアライズ化: Spark はオブジェクトをもっと高速にシリアライズするためにKryo ライブラリ(バージョン2)を使うこともできます。Kryoはとても速く、Javaのシリアライズ化よりもコンパクト(時には10x倍)ですが、全ての
Serializable
タイプをサポートしておらず、ベストパフォーマンスのためには前もって使うプログラム内のクラスに登録しなければなりません。
ジョブを SparkConfを使って初期化し、conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
を呼び出すことで、Kryoを使うように切り替えることができます。この設定は、ワーカー間でデータをシャッフルするためだけでなく、RDDをディスクにシリアライズ化するための使われるシリアライザーを設定します。Kryoが標準ではない唯一の理由は独自の登録が必要なためですが、ネットワーク集約的なアプリケーションの中で使ってみることをお勧めします。Spark 2.0.0 から、単純な型、単純な型の配列、あるいは文字列型を使ってRDDをシャッフルする時に内部的にKryoシリアライザを使います。
Spark は、AllScalaRegistrar内でカバーされる多くの一般的に使用されるコアScalaクラスのために、Twitter chill ライブラリからKryoシリアライザを自動的にインクルードします。
独自のクラスをKryoに登録するには、registerKryoClasses
メソッドを使います。
Kryo ドキュメントは、独自のシリアライズ化を追加するような、もっと進んだ登録のオプションを説明します。
オブジェクトが大きければ、spark.kryoserializer.buffer
設定を大きくする必要もあるかも知れません。この値はシリアライズ化しようと思っているもっとも大きな オブジェクトを十分に保持できる必要があります。
最終的にもし独自のクラスを登録できない場合は、Kryoはそれでも動作しますが、各オブジェクトの完全なクラス名を格納することになるdせほうが、これは無駄です。
メモリー チューニング
メモリの使用を調整するには3つの考慮すべき点があります: オブジェクトによって使用される総メモリ量(データセット全体をメモリに入れたいと思うかも知れません)。それらのオブジェクトにアクセスするためのコスト。ガベージコレクションのオーバーヘッド (オブジェクトという点で回転数が高ければ)。
デフォルトでは、Javaオブジェクトはアクセスが速いですが、それらのフィールド中の"raw"データよりも2-5x倍の容量を簡単に消費しえます。これには幾つかの理由があります:
- 各別個のJavaオブジェクトは"object header"を持ちます。これは約16バイトで、クラスへのポイントなどの情報を含みます。中にほとんどデータを持たないオブジェクトの場合(例えば、一つの
Int
フィールド)、これはデータよりも大きくなりえます。 - Javaの
String
はraw文字列データより約40バイトのオーバーヘッドを持ちます(Char
の配列の中に格納し、長さなどの余分なデータを保持するからです)。そして、各文字をString
の内部のUTF16エンコーディングの使用のために twoバイトとして格納します。したがって10文字の文字列は簡単に60バイトを消費します。 HashMap
やLinkedList
のような一般的なコレクションクラスは、リンクされたデータ構造を使用し、各エントリについて"ラッパー"オブジェクトがあります(例えば、Map.Entry
)。このオブジェクトはヘッダーを持つだけでなく、リストの次のオブジェクトへのポインター(一般的にはそれぞれ8バイト)を持ちます。- 基本的なタイプのコレクションはしばしばそれらを
java.lang.Integer
のような"boxed"オブジェクトとして格納します。
この章はSparkでのメモリ管理の概要から始め、ユーザが彼/彼女のアプリケーション内のメモリの効率的な管理をするのが習慣になるような具体的なストラテジについて議論するつもりです。特に、この章はオブジェクトのメモリの使用をどう決定するかと、それをどう改善するかの方法を説明するつもりです - データ構造を変更するか、シリアライズ化されたフォーマットでデータを格納するかのどちらかです。それから、SparkのキャッシュサイズとJavaのガベージコレクションの調整をカバーするつもりです。
メモリ管理概要
Sparkでのメモリの使用は2つに1つの分類に分類されます: 実行と保存。実行メモリはシャッフル、ジョイン、ソートおよび集約で使われるメモリを参照しますが、保存メモリは内部データをクラスタに渡ってキャッシングおよび増殖するために使われるメモリを参照します。Sparkでは、実行および保存は統一された領域(M)を共有します。実行メモリが使われない場合、保存メモリは全ての利用可能なメモリを手に入れることができ、逆もまたしかりです。必要であれば実行が保存を追い出しますが、総保存メモリの使用率がある閾値(R)未満になるまでです。別の言い方をすると、R
はM
内の副領域で、キャッシュされたブロックが追い出されない領域を説明します。保存は実装の複雑さにより、実行を追い出せないでしょう。
この設計は幾つかの望ましい性質を保証します。まず、キャッシングを使わないアプリケーションは実行のために全空間を使うことができ、不必要なディスクへの零れ落ちを未然に防ぎます。次に、キャッシングを行わないアプリケーションは、追い出しに耐性のあるデータブロックの最小の保存領域(R)を保持することができます。最後に、このやり方は、どのようにメモリを内部的に分割するかの専門知識無しに、様々な作業の納得できるパフォーマンスを追加設定無しに提供します。
2つの関連する設定がありますが、デフォルトの値はほとんどの作業に適用可能なため、一般的なユーザはそれらを調整する必要はありません。
spark.memory.fraction
は(JVM heap space - 300MB)の断片としてM
のサイズを表します (デフォルト 0.6)。残りの領域(40%)はユーザデータの構造、Sparkの内部的なメタデータ、sparseおよび不意の大きなデータの場合のOOMエラーに対する保護のために保持されます。spark.memory.storageFraction
はM
の断片としてR
のサイズを表します (デフォルトは 0.5)。R
はM
内の保存空間で、キャッシュされたブロックは実行による追い出しに耐性があります。
spark.memory.fraction
の値は、JVMの古いあるいは"終身"世代内で総ヒープ空間に気持ちよく納めるために設定されなければなりません。詳細は下記の上級のGCチューニングの議論を見てください。
メモリ消費の決定
The best way to size the amount of memory consumption a dataset will require is to create an RDD, put it into cache, and look at the “Storage” page in the web UI. そのページはRDDがどれだけのメモリを占有しているかを知らせるでしょう。
特定のオブジェクトのメモリの消費量を推測するには、SizeEstimator
の estimate
メソッドを使います。これは実験的に異なるデータのレイアウトを使ってメモリの使用を減らすのに役に立ち、各executor のヒープでブロードキャスト変数が占有するだろう総領域を決定するのに役立ちます。
データ構造のチューニング
メモリの消費を減らす一番最初の方法は、ポインターに基づいたデータ構造とオブジェクトのラッパーのようなオーバーヘッドを追加するJava機能を避けることです。これをするには幾つかの方法があります:
- データ構造を、標準のJavaあるいはScalaコレクションクラス(例えば、
HashMap
)の代わりに、オブジェクトの好ましい配列および基本タイプに設計します。fastutil ライブラリはJava標準ライブラリと互換性のある基本タイプのための便利なコレクションクラスを提供します。 - 可能であれば、大量の小さなオブジェクトとポインターを持つ入れ子の構造を避けます。
- キーとして文字列の代わりに数字のIDあるいはenumerationオブジェクトの使用を考慮します。
- RAMが32GB未満であれば、ポインターを8バイトの代わりに4バイトにするためにJVMフラグ
-XX:+UseCompressedOops
を設定します。これらのオプションをspark-env.sh
の中で追加することができます。
シリアライズ化されたRDDストレージ
この調整にも関わらず、オブジェクトがまだ効果的に格納するには大きすぎる場合、メモリの使用を減らすためのもっと簡単な方法は、それらをMEMORY_ONLY_SER
のようなRDD 永続化 APIの中のシリアライズ化されたStorageLevelを使って serialized 形式で保存することです。そうすればSparkは各RDDパーティションを1つの大きなバイト配列として保存するでしょう。シリアライズ化形式でデータを保存する唯一の良くない点は、その場で各オブジェクトをデシリアライズしなければならないことによる、アクセス時の遅さです。Kryoの使用はJavaのシリアライズ化よりもかなりサイズが小さく(そしてもちろんraw Javaオブジェクトよりも)なるため、もしシリアライズ化形式でデータをキャッシュしたい場合はKryoの使用を強くお勧めします。
ガベージコレクションのチューニング
プログラムによって格納されるRDDの点で大きな"churn"がある場合、JVMのガベージコレクションは問題になるかも知れません。(RDDを一度だけ読み込み、その上で多くの操作を実行するプログラムでは通常問題になりません。) Javaが新しいオブジェクトに場所を空けるために古いオブジェクトを立ち退かせる必要がある場合は、全てのJavaオブジェクトを通じて追跡し使用されていないオブジェクトを見つける必要があるでしょう。ここで覚えておくべき主要なポイントは、ガベージコレクトのコストはJavaオブジェクトの数に比例するということです。つまり、少ないオブジェクトのデータ構造の使用(例えば、LinkedList
の代わりにInt
の配列) は大いにこのコストを小さくします。An even better method is to persist objects in serialized form, as described above: now there will be only one object (a byte array) per RDD partition. もしGCが問題であれば、他の技術を試す前に最初に試すべきものは、シリアライズ化されたキャッシュを使用することです。
GC can also be a problem due to interference between your tasks’ working memory (the amount of space needed to run the task) and the RDDs cached on your nodes. これを緩和するためにRDDのキャッシュに割り当てられた領域を制御する方法について議論するつもりです。
GCの影響の測定
GCチューニングの最初のステップはガベージコレクションが起こる頻度とGCに費やされる総時間についての統計を集めることです。これはJavaのオプションに-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
を追加することで行うことができます。(JavaオプションをSparkジョブへ渡す方法についての情報は設定ガイド を見てください。) 次にSparkジョブが実行された時に、ガベージコレクションが起きる度にワーカーログにメッセージが表示されるのを見るでしょう。これらのログはドライバープログラムではなく、クラスターのワーカーノードにあることに注意してください(ワーカーディレクトリ内のstdout
ファイル内)。
上級のGCチューニング
更にガベージコレクションを調整するには、まずJVMのメモリ管理についていくつかの基本的な情報を理解する必要があります:
-
Javaのヒープ領域は2つの領域、YoungおよびOldに分割されます。Young世代は短期間のオブジェクトを保持するためですが、Old世代は長い生存期間を持つオブジェクトのためのものです。
-
Young世代は更に3つの領域 [Eden, Survivor1, Survivor2] に分割されます。
-
ガベージコレクションの手順の簡易化した説明: Edenが一杯になると、Eden上でminor GCが実行され、EdenおよびSurvivor1から活動しているオブジェクトはSurvivor2にコピーされます。Survivor領域はスワップされます。オブジェクトが十分古いかSurvivor2が一杯になると、それはOldに移動されます。最後に、Oldが一杯に近くなると、full GCが起動されます。
SparkでのGCチューニングの目的は、長く存在しているRDDがOld世代に格納され、Young世代が生存が短いオブジェクトを格納するのに十分なサイズになることを保証することです。これはタスクの実行中に生成された一時的なオブジェクトを集めるためのfull GCを避けるのに役立つでしょう。有用かも知れない幾つかのステップ:
-
GC状態を集めることであまりに多くのガベージコレクションがあるかどうかを調べる。タスクが完了する前に full GCが複数回起動された場合、それはタスクを実行するのに利用可能な十分なメモリがないことを意味します。
-
minorコレクションが多いがmajor GCが少ない場合、もっとメモリをEdenに割り当てることが助けになります。どれだけのメモリを各タスクが必要とするだろうかの過大評価に、Edenのサイズを設定することができます。Edenのサイズが
E
に決定された場合、オプション-Xmn=4/3*E
を使ってYoung世代のサイズを設定することができます。(The scaling up by 4/3 is to account for space used by survivor regions as well.) -
In the GC stats that are printed, if the OldGen is close to being full, reduce the amount of memory used for caching by lowering
spark.memory.fraction
; it is better to cache fewer objects than to slow down task execution. 別のやり方として、若い世代のサイズを減らすことを考慮します。このことは、上のように設定した場合には-Xmn
を低くすることを意味します。そうでなければ、JVMのNewRatio
パラメータの値を変更してみてください。多くのJVMではこのデフォルトは2で、古い世代がヒープの2/3を占めることを意味します。この分数がspark.memory.fraction
を超えるように十分大きくなければなりません。 -
-XX:+UseG1GC
を使ってG1GCガベージコレクションを試してください。ガベージコレクションがボトルネックの幾つかの状況でパフォーマンスを改善することができます。大きなexecutorヒープサイズを使う場合に-XX:G1HeapRegionSize
を使ってG1 region size を増やすことが重要かも知れないことに注意してください。 -
例として、タスクがHDFSからデータを読み込んでいる場合、タスクによって使用される総メモリ量はHDFSから読むデータブロックのサイズを使って予測することができます。解凍されたブロックのサイズはしばしばブロックの2または3倍のサイズであることに注意してください。So if we wish to have 3 or 4 tasks’ worth of working space, and the HDFS block size is 128 MB, we can estimate size of Eden to be
4*3*128MB
. -
Monitor how the frequency and time taken by garbage collection changes with the new settings.
経験では、GCチューニングの効果はアプリケーションと利用可能なメモリ量に依存することが示唆されます。オンラインで説明される 多くのチューニングオプションがありますが、高レベルでは、full GCがどれだけの頻度で発生するかの管理がオーバーヘッドを減らすのに役立ちます。
GC tuning flags for executors can be specified by setting spark.executor.extraJavaOptions
in
a job’s configuration.
他に考慮すべきこと
並行度のレベル
Clusters will not be fully utilized unless you set the level of parallelism for each operation high enough. Spark automatically sets the number of “map” tasks to run on each file according to its size (though you can control it through optional parameters to SparkContext.textFile
, etc), and for distributed “reduce” operations, such as groupByKey
and reduceByKey
, it uses the largest parent RDD’s number of partitions. 2つ目の引数として並行レベルを渡すことができます(spark.PairRDDFunctions
ドキュメントを見てください)。あるいはデフォルトを変更するために設定プロパティspark.default.parallelism
を設定してください。一般的に、クラスタ内のCPUコアあたり2-3のタスクがお勧めです。
reduceタスクのメモリの使用
時には、RDDがメモリ内に収まらないからではなく、groupByKey
内のreduceタスクのうちの一つのような、タスクの一つの作業セットがあまりにも大きいために、OutOfMemoryErrorを受け取るかも知れません。Sparkのシャッフル操作(sortByKey
, groupByKey
, reduceByKey
, join
など)は各タスク内にグルーピングを実行するためにハッシュテーブルを作成します。これはしばしば大きくなりえます。これを修正するもっとも簡単な方法は、各タスクの入力セットが小さくなるように並行レベルの増加をすることです。Spark can efficiently support tasks as short as 200 ms, because it reuses one executor JVM across many tasks and it has a low task launching cost, so you can safely increase the level of parallelism to more than the number of cores in your clusters.
大きな変数のブロードキャスト
SparkContext
で利用可能な ブロードキャスト機能 を使うことは、各シリアライズ化タスクのサイズおよびクラスタを超えるジョブの起動のコストを大きく減らします。If your tasks use any large object from the driver program inside of them (e.g. a static lookup table), consider turning it into a broadcast variable. Spark prints the serialized size of each task on the master, so you can look at that to decide whether your tasks are too large; in general tasks larger than about 20 KB are probably worth optimizing.
データの局所性
データの局所性はSparkジョブのパフォーマンスに大きな影響を持ちえます。もしデータとその上で実行するコードが一緒にあると、計算がより速くなる傾向があります。しかし、もしコードとデータが分かれていると、片方がもう片方に移動しなければなりません。一般的に、シリアライズ化されたコードを場所から場所へ転送することは、コードサイズがデータよりもかなり小さいために、データをぶつ切りするよりも速いです。Sparkはこのデータの局所性の一般的な原則に基づいてスケジューリングを構築します。
データの局所性はデータがどれだけ処理をするコードに近いかです。データの現在の場所に基づいて、局所性の幾つかのレベルがあります。近いものから遠いものの順に:
PROCESS_LOCAL
データは実行中のコードと同じJVMにある。This is the best locality possibleNODE_LOCAL
データが同じノードにある。例としては同じノードにあるHDFS、あるいは同じノード上の他のexecutorかも知れません。データはプロセス間を行き来する必要があるため、これはPROCESS_LOCAL
よりも少し遅いです。NO_PREF
data is accessed equally quickly from anywhere and has no locality preferenceRACK_LOCAL
データがサーバの同じラックにある。データは同じラックの異なるサーバにあり、ネットワーク、一般的にはシングルスイッチを経由して送信される必要があります。ANY
データはネットワーク上のどこか他にあり、同じラックにありません。
Sparkは全てのタスクを一番良い局所性レベルにスケジュールする方が良いですが、これは常に可能なわけではありません。処理されていないデータが仕事をしていないexecutor上にある場合、Sparkは局所性レベルを下げます。2つの選択肢があります: a) 忙しいCPUが同じサーバ上のデータ上のタスクを開始するために自由になるまで待つ。あるいは b) データがそこに移動される必要がある遠く離れた場所で新しいタスクをすぐに開始する。
一般的にSparkは忙しいCPUが自由になることを期待して少しだけ待ちます。タイムアウトが期限切れになると、遠くの場所から自由なCPUにデータの移動を開始します。The wait timeout for fallback between each level can be configured individually or all together in one parameter; see the spark.locality
parameters on the configuration page for details. タスクが長く局所性が乏しい場合はこれらの設定を増加する必要がありますが、通常はデフォルトで良く動作します。
概要
Sparkアプリケーションをチューニングする場合に知るべき主な考慮点、最も重要で、データのシリアライズ化および目厭離のチューニング、を示した短いガイドです 。ほとんどのプログラムについて、Kryoシリアライズ化とシリアライズ化形式のデータの永続化は、もっとも一般的なパフォーマンス問題を解決するでしょう。他のチューニングのベストプラクティスについては、気軽にSpark メーリングリスト で聞いてください。