パフォーマンス チューニング

ちょっとした次善策として、メモリにデータをキャッシュ、あるいは幾つかの実験的なオプションを調整することでパフォーマンスを改善することができます。

メモリへのデータのキャッシュ

Spark SQL は spark.catalog.cacheTable("tableName") あるいはdataFrame.cache()を呼ぶことでインメモリのコラム形式のフォーマットを使ってテーブルをキャッシュすることができます。そして、Spark SQLは必要なカラムだけをスキャンし、メモリの使用量とGCの圧力を最小化するために圧縮を自動的に調整するでしょう。メモリからテーブルを削除するためにspark.catalog.uncacheTable("tableName")またはdataFrame.unpersist()を呼ぶことができます。

メモリ内キャッシングの設定はSparkSessionsetConfメソッドあるいは SQLを使ってSET key=valueコマンドを実行することで行うことができます。

プロパティ名デフォルト意味これ以降のバージョンから
spark.sql.inMemoryColumnarStorage.compressed true trueに設定した場合はSpark SQLはデータの統計に基づいて各カラムの圧縮コーディックを自動的に選択するでしょう。 1.0.1
spark.sql.inMemoryColumnarStorage.batchSize 10000 カラムキャッシュのためのバッチのサイズを制御します。バッチのサイズを大きくするとメモリの利用率と圧縮が改善できますが、データをキャッシュする時にOOMのリスクがあります。 1.1.1

他の設定オプション

以下のオプションもクエリ実行のパフォーマンスを調整するために使用することができます。これらのオプションはもっと多くの最適化が自動的に行われるため、将来のリリースでは非推奨になるかもしれません。

プロパティ名デフォルト意味これ以降のバージョンから
spark.sql.files.maxPartitionBytes 134217728 (128 MB) ファイルを読む時に1つのパーティションに詰め込む最大のバイト数。この構成は Parquet、JSON、ORC などのファイルベースのソースを使う場合にのみ有効です。 2.0.0
spark.sql.files.openCostInBytes 4194304 (4 MB) ファイルを開くための予測コストは同じ時間で操作することができるバイト数によって計測することができます。これは複数のファイルを1つのパーティションに配置する場合に使われます。過剰に予測するほうが良いです。そうれうば、小さなファイルを持つパーティションは大きなファイルを持つパーティションよりも高速になるでしょう(最初にスケジュールされます)。この構成は Parquet、JSON、ORC などのファイルベースのソースを使う場合にのみ有効です。 2.0.0
spark.sql.files.minPartitionNum デフォルトの並行度 推奨される(保証されていない)分割ファイルパーティションの最小数。設定されていない場合、デフォルト値は`spark.default.parallelism`です。この構成は Parquet、JSON、ORC などのファイルベースのソースを使う場合にのみ有効です。 3.1.0
spark.sql.broadcastTimeout 300

ブロードキャストjoinでのブロードキャスト待ち時間のタイムアウト秒数

1.3.0
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) joinを実行する時に全てのワーカーノードにブロードキャストされるテーブルのための最大サイズをバイトで設定します。この値を-1に設定することでブロードキャストは無効にされます。ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan が実行された場合、現在のところ統計はHiveメタストアテーブルのみがサポートされることに注意してください。 1.1.0
spark.sql.shuffle.partitions 200 joinあるいは集約のためにデータをシャッフルする時に使用するパーティションの数を設定します。 1.1.0
spark.sql.sources.parallelPartitionDiscovery.threshold 32 ジョブ入力パスの並列リストを有効にするための閾値を設定します。入力パスの数がこの閾値よりも大きい場合、SparkはSpark分散ジョブを使ってファイルを一覧表示します。それ以外の場合は、順次リストにフォールバックします。この設定は、Parquet, ORC, JSONのようなファイルベースのデータソースを使う場合にのみ有効です。 1.5.0
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 ジョブ入力パスの最大リスト並行処理を設定します。入力パスの数がこの値よりも多い場合、この値を使うように絞られます。上記と同じように、この設定は、Parquet, ORC, JSONのようなファイルベースのデータソースを使う場合にのみ有効です。 2.1.1

SQL クエリのための Join ストラテジ ヒント

joinの戦略のヒント、つまりBROADCAST, MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL は、別のリレーションと結合する時に指定されたリレーションでヒント付きの戦略を使うようにSparkに指示します。For example, when the BROADCAST hint is used on table ‘t1’, broadcast join (either broadcast hash join or broadcast nested loop join depending on whether there is any equi-join key) with ‘t1’ as the build side will be prioritized by Spark even if the size of table ‘t1’ suggested by the statistics is above the configuration spark.sql.autoBroadcastJoinThreshold.

joinの両側で異なるjoin戦略ヒントが指定された場合、Sparkは以下の順で優先します。BROADCASTMERGESHUFFLE_HASHSHUFFLE_REPLICATE_NL。両側がBROADCASTヒントあるいはSHUFFLE_HASHヒントで指定されている場合、Sparkはjoin型とリレーションのサイズに基づいてビルド側を選択します。

特定の戦略が全てのjoin型をサポートするわけではないため、Sparkがヒントで指定されたjoin戦略を選択する保証はないことに注意してください。

spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show();
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
src <- sql("SELECT * FROM src")
records <- sql("SELECT * FROM records")
head(join(src, hint(records, "broadcast"), src$key == records$key))
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

詳細については、Join ヒントのドキュメントを参照してください。

SQL クエリのための Coalesce ヒント

Coalesceヒントにより、Spark SQL ユーザはDataset APIのcoalesce, repartition, repartitionByRangeと同じように、出力ファイルの数を制御できます。パフォーマンスの調整と出力ファイルの削減に使われます。“COALESCE”ヒントはパラメータとしてパーティション番号のみを取ります。“REPARTITION”ヒントには、パラメータとして、パーティション番号、列、またはそれらの両方/どちらかを取ります。“REPARTITION_BY_RANGE” ヒントは列名が必須で、パーティション番号はオプションです。

SELECT /*+ COALESCE(3) */ * FROM t
SELECT /*+ REPARTITION(3) */ * FROM t
SELECT /*+ REPARTITION(c) */ * FROM t
SELECT /*+ REPARTITION(3, c) */ * FROM t
SELECT /*+ REPARTITION */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t
SELECT /*+ REBALANCE */ * FROM t
SELECT /*+ REBALANCE(c) */ * FROM t

詳細については、パーティショニング ヒントのドキュメントを参照してください。

適応クエリ実行

適応クエリ実行(AQE)は、ランタイム統計を利用して最も効率的なクエリ実行プランを選択するSpark SQLの最適化手法で、Apache Spark 3.2.0からデフォルトで有効になっています。Spark SQLはアンブレラ構成としてspark.sql.adaptive.enabledによって、AOEのオンとオフを切り替えることができます。Spark 3.0の時点で、AOEには3つの主要な機能があります: シャッフル後のパーティションの結合、ソートマージjoinからブロードキャストjoinへの返還、スキューjoinの最適化です。

シャッフル後のパーティションの結合

この機能は、spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled設定の両方がtrueの時に、マップ出力統計に基づいてシャッフル後のパーティションを結合します。この機能により、クエリを実行する時のシャッフルパーティション番号の調整が簡単になります。データセットに合わせて適切なシャッフルパーティション番号を設定する必要はありません。spark.sql.adaptive.coalescePartitions.initialPartitionNum設定でシャッフルパーティションの初期数を十分大きく設定すると、Sparkは実行時に適切なシャッフルパーティション番号を選択できます。

プロパティ名デフォルト意味これ以降のバージョンから
spark.sql.adaptive.coalescePartitions.enabled true これが true で、spark.sql.adaptive.enabledが true の場合、Spark はターゲットサイズ(spark.sql.adaptive.advisoryPartitionSizeInBytesで指定される)に従って連続するシャッフルパーティションを合体させ、小さなタスクが多すぎないようにします。 3.0.0
spark.sql.adaptive.coalescePartitions.parallelismFirst true これがtrueの場合、Sparkは連続するシャッフルパーティションを結合する時に、spark.sql.adaptive.advisoryPartitionSizeInBytes (デフォルト 64MB) で指定されたターゲットサイズを無視し、並列化を最大化するためにspark.sql.adaptive.coalescePartitions.minPartitionSize (デフォルト 1MB)で指定された最小パーティションサイズのみを尊重します。これは、適応クエリ実行を有効にするときに、パフォーマンスの低下を回避するためです。この設定をfalseに設定し、spark.sql.adaptive.advisoryPartitionSizeInBytesで指定されたターゲットサイズを尊重することをお勧めします。 3.2.0
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB 合体後のシャッフルパーティションの最小サイズ。その値は最大でspark.sql.adaptive.advisoryPartitionSizeInBytesの20%にすることができます。これは、パーティションの結合中にターゲットサイズが無視される場合に役立ちます。これはデフォルトの場合です。 3.2.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum (none) 合体する前のシャッフルパーティションの初期数。設定されない場合は、spark.sql.shuffle.partitionsと同じです。この設定は、spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled の両方が true の場合にのみ効果があります。 3.0.0
spark.sql.adaptive.advisoryPartitionSizeInBytes 64 MB 適応最適化中のシャッフルパーティションの推奨サイズのバイト数 (spark.sql.adaptive.enabledが true の場合)。Sparkが小さなシャッフルパーティションを結合するか、スキューされたパーティションを分割する時に有効になります。 3.0.0

sort-merge join から broadcast join への変換

AQEは、join側の実行時統計が適応型ブロードキャストハッシュjoinの閾値よりも小さい場合に、ソートマージjoinをブロードキャストハッシュjoinに変換します。これは最初にブロードキャストハッシュjoinを計画するほどには効率的ではありませんが、両方のjoin側のソートを保存し、シャッフルファイルをローカルで読み込んでネットワークトラフィックを節約できるため、ソートマージjoinを実行し続けるよりも優れています(spark.sql.adaptive.localShuffleReader.enabled がtrueの場合)

プロパティ名デフォルト意味これ以降のバージョンから
spark.sql.adaptive.autoBroadcastJoinThreshold (none) joinを実行する時に全てのワーカーノードにブロードキャストされるテーブルのための最大サイズをバイトで設定します。この値を-1に設定することでブロードキャストは無効にされます。デフォルトの値は spark.sql.autoBroadcastJoinThreshold と同じです。これは適応フレームワークでのみ使われることに注意してください。 3.2.0

ソートマージjoinからシャッフルハッシュjoinへの変換

AQE converts sort-merge join to shuffled hash join when all post shuffle partitions are smaller than a threshold, the max threshold can see the config spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold.

プロパティ名デフォルト意味これ以降のバージョンから
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0 ローカルハッシュマップをビルドできるパーティションあたりの最大サイズをバイト単位で設定します。この値がspark.sql.adaptive.advisoryPartitionSizeInBytesより小さくなく、全てのパーティションサイズがこの設定よりも大きくない場合、joinの選択では spark.sql.join.preferSortMergeJoinの値に関係なく、ソートマージjoinではなくシャッフルハッシュjoinを使います。 3.2.0

スキューJoinの最適化

データスキューはjoinクエリのパフォーマンスを大幅に低下させる可能性があります。この機能は、スキューされたタスクをほぼ均等なサイズのタスクに分割(および必要に応じて複製)することで、ソートマージjoinのスキューを動的に処理します。spark.sql.adaptive.enabledspark.sql.adaptive.skewJoin.enabled設定の両方が有効になっている場合に効果があります。

プロパティ名デフォルト意味これ以降のバージョンから
spark.sql.adaptive.skewJoin.enabled true これがtrueでspark.sql.adaptive.enabledがtrueの場合、Sparkはスキューされたパーティションを分割(および必要に応じて複製)することで、ソートマージjoinでスキューを動的に処理します。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5 パーティションのサイズがこの係数にパーティションサイズの中央値を掛けたものより大きく、またspark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytesよりも大きい場合、パーティションはスキューされていると見なされます。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB バイト単位のサイズがこの閾値よりも大きく、またspark.sql.adaptive.skewJoin.skewedPartitionFactorにパーティションサイズの中央値を掛けたものよりも大きい場合、パーティションはスキューされていると見なされます。理想的には、この設定はspark.sql.adaptive.advisoryPartitionSizeInBytesより大きく設定する必要があります。 3.0.0
TOP
inserted by FC2 system