RDD プログラミング ガイド
- 概要
- Sparkとのリンク
- Sparkの初期化
- Resilient Distributed Datasets (RDDs)
- 共有変数
- クラスタの配備
- Java / Scala からのSparkジョブの起動
- ユニットテスト
- この後どうすればいいか
概要
高レベルでは、各Sparkアプリケーションはクラスタ上でユーザのmain
関数を実行し様々な並行操作を実行するドライバープログラムからなります。Sparkが提供する主な抽象概念はresilient distributed dataset (RDD)で、クラスタのノードを横断して分割された並行して操作されることができるエレメントのコレクションです。RDDはHadoopファイルシステム(あるいは他のHadoopをサポートするファイルシステム)内のファイル、あるいはドライバープログラム内の既存のScalaコレクションを使って開始し変換することで生成されます。並行操作間で効果的に再利用できるように、ユーザはSparkにRDDをメモリ内に永続するように依頼するかも知れません。最終的に、RDDはノード障害から自動的に復活します。
Sparkの二つ目の抽象概念は、並行操作で使用することができる共有変数です。デフォルトでは、Sparkが異なるノード上でタスクのセットとして並行して関数を実行する場合、Sparkは関数内で使われる各変数のコピーを各タスクにコピーします。タスクを横断、あるいはタスクおよびドライバプログラム間で、変数が共有される必要があることがあります。Sparkは二つのタイプの共有変数をサポートします: ブロードキャスト変数、これは全てのノード上のメモリ内で値をキャッシュするために使うことができます。 アキュムレイター、これはカウンターや集計のような"追加"のみされる変数です。
このガイドはSparkのサポートする言語でこれらの各機能を案内します。Sparkの対話シェル - Scalaシェルの bin/spark-shell
、Pythonシェルの bin/pyspark
どちらか - を起動して一緒に進めていくのが一番簡単です。
Sparkとのリンク
Spark 3.2.1 はデフォルトでScala 2.12で動作するようにビルドおよび配布されています。(Spark は他のバージョンのScalaでも動作するようにビルドされています) Scalaでアプリケーションを書くには、互換性のあるScalaバージョンを使う必要があるでしょう(例えば、2.12.X)。
Sparkアプリケーションを書くには、MavenのSparkの依存を追加する必要があるでしょう。Sparkは以下のようにMaven Centralを使って利用可能です:
groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.2.1
更に、HDFSクラスタにアクセスしたい場合は、HDFSのバージョンのためにhadoop-client
に依存性を追加する必要があります。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最後に、プログラムに幾つかのSparkクラスをインポートする必要があります。以下の行を追加してください:
(Spark 1.3.0 より前では、必須の暗黙的な変換を有効にするために、明示的にimport org.apache.spark.SparkContext._
する必要があります)。
Spark 3.2.1 は簡潔な書き込み機能のために lambda 表現 をサポートします。それ以外に、org.apache.spark.api.java.function パッケージの中にあるクラスを使うことができます。
Spark 2.2.0 の時点でJava 7のサポートが削除されたことに注意してください。
SparkアプリケーションをJavaで書くためにSparkに依存を追加する必要があります。Sparkは以下のようにMaven Centralを使って利用可能です:
groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.2.1
更に、HDFSクラスタにアクセスしたい場合は、HDFSのバージョンのためにhadoop-client
に依存性を追加する必要があります。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最後に、プログラムに幾つかのSparkクラスをインポートする必要があります。以下の行を追加してください:
Spark 3.2.1 は Python 3.6+ で動作します。それは標準のCPythonインタプリタを使うため、NumPyのようなC ライブラリが使われるかもしれません。それは PyPy 2.3+ でも動作します。
Python 2、3.4および3.5のサポートはSpark 3.1.0で削除されました。Python 3.6のサポートはSpark 3.2.1 で非推奨です。
PythonでのSparkアプリケーションは、実行時にSparkを含むbin/spark-submit
スクリプトを使って実行するか、それを以下のように setup.py スクリプトに含むことのどちらかで、実行することができます:
pipでインストールするPySpark無しにPythonでSparkアプリケーションを実行するには、Sparkディレクトリにある bin/spark-submit
スクリプトを使用してください。このスクリプトはSparkのJava/Scalaライブラリをロードし、クラスタにアプリケーションをサブミットすることができます。対話的なPythonシェルを起動するためにbin/pyspark
を使うこともできます。
HDFSデータにアクセスしたい場合には、そのバージョンのHDFSにリンクしているPySparkのビルドを使う必要があります。一般的なHDFSバージョンのためのPrebuilt パッケージ がSparkホームページでも利用可能です。
最後に、プログラムに幾つかのSparkクラスをインポートする必要があります。以下の行を追加します:
PySpark はドライバーおよびワーカーの両方で、Pythonの同じマイナーバージョンを必要とします。PATHにあるデフォルトのpythonバージョンを使います。Pythonのどちらのバージョンを使いたいかをPYSPARK_PYTHON
を使って指定することができます。例えば:
Sparkの初期化
Sparkプログラムが最初にしなければならないことは、SparkContextオブジェクトを作成する事です。これはSparkにクラスタにアクセスする方法を教えます。SparkContext
を作成するために、最初にアプリケーションについての情報を含むSparkConf オブジェクトをビルドする必要があります。
JVMごとに一つのSparkContextだけがアクティブになることができます。新しいSparkContextを生成する前にアクティブなSparkContextをstop()
しなければなりません。
Sparkプログラムが最初にしなければならないことは、JavaSparkContext オブジェクトを作成することです。これはSparkにクラスタにアクセスする方法を教えます。SparkContext
を作成するために、最初にアプリケーションについての情報を含むSparkConf オブジェクトをビルドする必要があります。
Sparkプログラムが最初にしなければならないことは、SparkContextオブジェクトを作成する事です。これはSparkにクラスタにアクセスする方法を教えます。SparkContext
を作成するために、最初にアプリケーションについての情報を含むSparkConf オブジェクトをビルドする必要があります。
appName
パラメータはクラスタUI上で見るアプリケーションの名前です。master
は Spark, Mesos あるいは YARN クラスタの URLか、ローカルモードで動作するための特別な"local"文字列です。実際には、クラスタ上で実行する場合にプログラム内で master
をハードコードせずに spark-submit
を使ってアプリケーションを起動し、そこで受け取りたいでしょう。しかし、ローカルテストおよびユニットテストのために、処理中にSparkを実行するために "local" を渡すことができます。
シェルの使用
Sparkシェルの中では、特別なインタプリターを認識しているSparkContext、sc
と呼ばれる変数が既に作成されています。独自のSparkContextを作っても動作しないでしょう。--master
引数を使ってどっちのマスターをコンテキストが接続するかを指定することができます。そして--jars
引数にカンマ区切りのリストで渡すことでクラスパスにJARを追加することができます。--packages
引数にカンマ区切りのMaven coordinateのリストを渡すことで、シェルセッションに依存性(例えばSparkパッケージ)を追加することもできます。依存性が存在するかも知れないどのような追加のリポジトリ(例えば、Sonatype)も--repositories
引数に渡すことができます。例えば、確実に4つのコア上でbin/spark-shell
を実行するには、以下を使います:
あるいは、クラスパスにcode.jar
も追加するには、以下を使います:
Maven coordinatesを使って依存性を追加するには:
オプションの完全なリストについては、spark-shell --help
を実行します。舞台裏で spark-shell
はもっと多くの一般的な spark-submit
スクリプトを呼び出します。
PySparkシェルの中では、特別なインタプリターを認識しているSparkContext、sc
と呼ばれる変数が既に作成されています。独自のSparkContextを作っても動作しないでしょう。--master
引数を使ってどっちのマスターコンテキストをコンテキストが接続するかを指定することができます。 そして--py-files
引数にカンマ区切りのリストを渡すことでランタイムにPythonの.zip, .egg あるいは .py ファイルを追加することができます。サードパーティのPythonの依存については、Python Package Managementを見てください。--packages
引数にカンマ区切りのMaven coordinateのリストを渡すことで、シェルセッションに依存性(例えばSparkパッケージ)を追加することもできます。依存性が存在するかも知れないどのような追加のリポジトリ(例えば、Sonatype)も--repositories
引数に渡すことができます。例えば、確実に4つのコア上でbin/pyspark
を実行するには、以下を使います:
あるいは、code.py
も検索パス(後でimport code
できるように)に追加するには、以下を使います:
オプションの完全なリストについては、pyspark --help
を実行します。舞台裏で pyspark
はもっと多くの一般的な spark-submit
スクリプトを呼び出します。
IPython内でPySparkシェル、拡張Pythonインタプリタ、を起動することもできます。PySparkはIPython 1.0.0 以降で動作します。IPythonを使用するには、bin/pyspark
を実行する時にPYSPARK_DRIVER_PYTHON
変数 ipython
に設定します:
Jupyter notebookを使うには (以前はthe IPython notebookとして知られていました)。
ipython
あるいは jupyter
コマンドを PYSPARK_DRIVER_PYTHON_OPTS
を設定することでカスタマイズすることができます。
Jupyter Notebookサーバが起動した後で、"Files"タブから新しいノートブックを作成することができます。notebookの中で、Jupyter notebookからSparkを開始使用とする前にnotebookの一部として %pylab inline
コマンドを入力することができます。
Resilient Distributed Datasets (RDDs)
Sparkはresilient distributed dataset (RDD)の概念を中心題目とします。これは並行して操作することが可能な要素の耐障害性のあるコレクションです。RDDを生成するには2つの方法があります: ドライバプログラム内の既存のコレクションを並列化。あるいは共有ファイルシステム、HDFS、HBaseあるいはHadoopの入力フォーマットを提供するデータソースのような外部ストレージシステム内のデータセットの参照。
並列化されたコレクション
並列化されたコレクションはドライバプログラム内(ScalaのSeq
)の既存のコレクション上でSparkContext
の parallelize
メソッドを呼ぶことで生成することができます。コレクションの要素は並列して操作可能な分散データセットからコピーされます。例えば、これは数字の1から5を持つ並列化されたコレクションの生成の仕方です:
一度生成すると、分散データセット(distData
) は並列して操作されることが可能です。例えば、配列の要素を追加するためにdistData.reduce((a, b) => a + b)
を呼ぶかも知れません。後で分散されたデータセット上の操作を説明します。
並列化されたコレクションはドライバプログラム内の既存のコレクション
上でJavaSparkContext
の parallelize
メソッドを呼ぶことで生成されます。コレクションの要素は並列して操作可能な分散データセットからコピーされます。例えば、これは数字の1から5を持つ並列化されたコレクションの生成の仕方です:
一度生成すると、分散データセット(distData
) は並列して操作されることが可能です。例えば、リストの要素を追加するためにdistData.reduce((a, b) => a + b)
を呼ぶかも知れません。後で分散されたデータセット上の操作を説明します。
並列化されたコレクションはドライバプログラム内の既存の繰り返しまたはコレクション上でSparkContext
の parallelize
メソッドを呼ぶことで生成されます。コレクションの要素は並列して操作可能な分散データセットからコピーされます。例えば、これは数字の1から5を持つ並列化されたコレクションの生成の仕方です:
一度生成すると、分散データセット(distData
) は並列して操作されることが可能です。例えば、リストの要素を追加するために、distData.reduce(lambda a, b: a + b)
を呼ぶことができます。後で分散されたデータセット上の操作を説明します。
並列化コレクションの1つの重要なパラメータがデータセットを分割するパーティション の数です。Sparkはクラスタの各パーティションで1つのタスクを実行するでしょう。一般的はクラスタ内の各CPUについて2-4のパーティションが欲しいです。通常は、Sparkはクラスタに応じて自動的にパーティションの数を設定しようとします。しかし、parallelize
の2つ目のパラメータとして渡すことで手動で設定することもできます(例えばsc.parallelize(data, 10)
)。注意: コード内の幾つかの箇所でsliceという単語(パーティションの類義語)を後方互換性のために使います。
外部データベース
SparkはHadoopでサポートされているローカルファイルシステム, HDFS, Cassandra, HBase, Amazon S3などを含む、どのようなストレージソースからでも分散データセットを作成することができます。Spark はテキストファイル、SequenceFilesおよびどのような他のHadoop InputFormatをサポートします。
テキストファイルのRDDはSparkContext
のtextFile
メソッドを使って生成することができます。このメソッドはファイルのURI(マシーン上のローカルファイルまたは hdfs://
, s3a://
, などURIのどちらか)を取り、行のコレクションとして読み込みます。実施の例です:
一度生成すると、distFile
はデータセット操作で動作することができます。例えば、map
および reduce
操作を使って以下のように全ての行のサイズを集計することができます: distFile.map(s => s.length).reduce((a, b) => a + b)
.
Sparkを使ったファイルの読み込みには幾つか注意点があります:
-
ローカルファイルシステム上のパスを使用する場合、ファイルはワーカーノードでも同じパスでアクセス可能でなければなりません。ファイルを全てのワーカーにコピーする、あるいはネットワークでマウントされた共有ファイルシステムを使うかどちらかです。
-
textFile
を含むSparkのファイルベースの全ての入力メソッドは、ディレクトリ、圧縮ファイルおよびワイルドカードでの実行もサポートします。例えば、textFile("/my/directory")
,textFile("/my/directory/*.txt")
およびtextFile("/my/directory/*.gz")
を使うことができます。複数のファイルを読み込む場合、パーティションの順番はファイルシステムから返されるファイルの順番に依存します。例えば、パスによるファイルの辞書式順番に従う場合とそうでない場合があります。パーティション内で、要素は元になるファイルの順番に従って順序付けられます。 -
textFile
メソッドはファイルのパーティション数を制御するための任意の2つ目のパラメータも取ります。デフォルトではSparkはファイルの各ブロック(HDFSではブロックはデフォルトでは128MB)に1つのパーティションを生成しますが、大きな値を渡すことでより大きな数のパーティションを要求することもできます。ブロック数より少ないパーティションを持つことができないことに注意してください。
テキストファイル以外に、SparkのScala APIは幾つかの他のデータフォーマットもサポートします:
-
SparkContext.wholeTextFiles
は複数の小さなテキストファイルを含むディレクトリを読み込み、それらのペア(ファイル名,内容)を返します。これはtextFile
と異なり、各ファイルごとに行あたり1つのレコードを返します。パーティショニングは場合によってはほんの少しのパーティションになるデータのローカリティによって決定されます。そのような場合については、wholeTextFiles
がパーティションの最小数を制御するための任意の二つ目の引数を提供します。 -
SequenceFilesについては、SparkContextの
sequenceFile[K, V]
メソッドを使用します。この時、K
およびV
はファイル内のキーの種類と値です。これらはIntWritable および TextのようにHadoopのWritableインタフェースのサブクラスでなければなりません。更に、2,3の共通のWritableのためにSparkによってネイティブなタイプを指定することができます。例えば、sequenceFile[Int, String]
は自動的にIntWritableおよびTextを読み込むでしょう。 -
他のHadoop 入力フォームに関しては、
SparkContext.hadoopRDD
メソッドを使うことができます。これは任意のJobConf
および、入力フォーマットクラス、キークラスと値クラスを取ります。同じようにしてHadoopジョブのためにこれらに入力ソースを設定するかもしれません。"新しい" MapReduce API (org.apache.hadoop.mapreduce
)に基づいたInputFormatのためのSparkContext.newAPIHadoopRDD
も使うことができます。 -
RDD.saveAsObjectFile
およびSparkContext.objectFile
はシリアライズ化されたJavaオブジェクトからなる単純な形式へのRDDの保存をサポートします。これはAvroのように特殊化されたフォーマットのように効率的ではありませんが、どのようなRDDを保存する簡単な方法を提供します。
SparkはHadoopでサポートされているローカルファイルシステム, HDFS, Cassandra, HBase, Amazon S3などを含む、どのようなストレージソースからでも分散データセットを作成することができます。Spark はテキストファイル、SequenceFilesおよびどのような他のHadoop InputFormatをサポートします。
テキストファイルのRDDはSparkContext
のtextFile
メソッドを使って生成することができます。このメソッドはファイルのURI(マシーン上のローカルファイルまたは hdfs://
, s3a://
, などURIのどちらか)を取り、行のコレクションとして読み込みます。実施の例です:
一度生成すると、distFile
はデータセット操作で動作することができます。例えば、map
および reduce
操作を使って以下のように全ての行のサイズを集計することができます: distFile.map(s -> s.length()).reduce((a, b) => a + b)
.
Sparkを使ったファイルの読み込みには幾つか注意点があります:
-
ローカルファイルシステム上のパスを使用する場合、ファイルはワーカーノードでも同じパスでアクセス可能でなければなりません。ファイルを全てのワーカーにコピーする、あるいはネットワークでマウントされた共有ファイルシステムを使うかどちらかです。
-
textFile
を含むSparkのファイルベースの全ての入力メソッドは、ディレクトリ、圧縮ファイルおよびワイルドカードでの実行もサポートします。例えば、textFile("/my/directory")
,textFile("/my/directory/*.txt")
およびtextFile("/my/directory/*.gz")
を使うことができます。 -
textFile
メソッドはファイルのパーティション数を制御するための任意の2つ目のパラメータも取ります。デフォルトではSparkはファイルの各ブロック(HDFSではブロックはデフォルトでは128MB)に1つのパーティションを生成しますが、大きな値を渡すことでより大きな数のパーティションを要求することもできます。ブロック数より少ないパーティションを持つことができないことに注意してください。
テキストファイル以外に、SparkのJava APIは幾つか他のデータフォーマットもサポートします:
-
JavaSparkContext.wholeTextFiles
は複数の小さなテキストファイルを含むディレクトリを読み込み、それらのペア(ファイル名,内容)を返します。これはtextFile
と異なり、各ファイルごとに行あたり1つのレコードを返します。 -
SequenceFilesについては、SparkContextの
sequenceFile[K, V]
メソッドを使用します。この時、K
およびV
はファイル内のキーの種類と値です。これらはIntWritable および TextのようにHadoopのWritableインタフェースのサブクラスでなければなりません。 -
他のHadoop 入力フォームに関しては、
JavaSparkContext.hadoopRDD
メソッドを使うことができます。これは任意のJobConf
および、入力フォーマットクラス、キークラスと値クラスを取ります。同じようにしてHadoopジョブのためにこれらに入力ソースを設定するかもしれません。"新しい" MapReduce API (org.apache.hadoop.mapreduce
)に基づいたInputFormatのためのJavaSparkContext.newAPIHadoopRDD
も使うことができます。 -
JavaRDD.saveAsObjectFile
およびJavaSparkContext.objectFile
はシリアライズ化されたJavaオブジェクトからなる単純な形式へのRDDの保存をサポートします。これはAvroのように特殊化されたフォーマットのように効率的ではありませんが、どのようなRDDを保存する簡単な方法を提供します。
PySparkはHadoopでサポートされているローカルファイルシステム, HDFS, Cassandra, HBase, Amazon S3などを含む、どのようなストレージソースからでも分散データセットを作成することができます。Spark はテキストファイル、SequenceFilesおよびどのような他のHadoop InputFormatをサポートします。
テキストファイルのRDDはSparkContext
のtextFile
メソッドを使って生成することができます。このメソッドはファイルのURI(マシーン上のローカルファイルまたは hdfs://
, s3a://
, などURIのどちらか)を取り、行のコレクションとして読み込みます。実施の例です:
一度生成すると、distFile
はデータセット操作で動作することができます。例えば、map
および reduce
操作を使って以下のように全ての行のサイズを集計することができます: distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
.
Sparkを使ったファイルの読み込みには幾つか注意点があります:
-
ローカルファイルシステム上のパスを使用する場合、ファイルはワーカーノードでも同じパスでアクセス可能でなければなりません。ファイルを全てのワーカーにコピーする、あるいはネットワークでマウントされた共有ファイルシステムを使うかどちらかです。
-
textFile
を含むSparkのファイルベースの全ての入力メソッドは、ディレクトリ、圧縮ファイルおよびワイルドカードでの実行もサポートします。例えば、textFile("/my/directory")
,textFile("/my/directory/*.txt")
およびtextFile("/my/directory/*.gz")
を使うことができます。 -
textFile
メソッドはファイルのパーティション数を制御するための任意の2つ目のパラメータも取ります。デフォルトではSparkはファイルの各ブロック(HDFSではブロックはデフォルトでは128MB)に1つのパーティションを生成しますが、大きな値を渡すことでより大きな数のパーティションを要求することもできます。ブロック数より少ないパーティションを持つことができないことに注意してください。
テキストファイル以外に、SparkのPython APIは幾つか他のデータフォーマットもサポートします:
-
SparkContext.wholeTextFiles
は複数の小さなテキストファイルを含むディレクトリを読み込み、それらのペア(ファイル名,内容)を返します。これはtextFile
と異なり、各ファイルごとに行あたり1つのレコードを返します。 -
RDD.saveAsObjectFile
およびSparkContext.objectFile
はpickle化されたPythonオブジェクトからなる単純な形式へのRDDの保存をサポートします。バッチはデフォルトのバッチサイズ 10を使って、pickleシリアライズ化に使われます。 -
SequenceFile と Hadoop Input/Output フォーマット
注意 この機能は現在のところ 実験的なもの
とマークされていて、上級ユーザのためのものであることに注意してください。将来Spark SQLに基づいた read/writeサポートに置き換えられるかも知れませんが、いずれにしてもSpark SQLは好ましい方法です。
Writableサポート
PySpark SequenceFile はJava内のキー-値のRDDのロードをサポートし、Writableを基本のJavaのタイプに変換し、結果のJavaオブジェクトを Pyroliteを使ってピクル化します。キー-値のペアのRDDをSequenceFileに保存する場合、PySparkは逆のことをします。それはPythonオブジェクトをJavaオブジェクトにアンピクル化し、それらをWritableに変換します。以下のWritableは自動的に変換されます:
Writable タイプ | Python タイプ |
---|---|
テキスト | str |
IntWritable | dint |
FloatWritable | float |
DoubleWritable | float |
BooleanWritable | bool |
BytesWritable | bytearray |
NullWritable | None |
MapWritable | dict |
配列は追加設定無しで処理することができません。読み書きする場合には、ユーザは独自のArrayWritable
サブタイプを指定する必要があります。書き込む場合には、ユーザは配列を独自の ArrayWritable
サブタイプに変換する独自のコンバータも指定する必要があります。読み込む場合には、デフォルトのコンバータが独自の ArrayWritable
サブタイプを Java のObject[]
に変換するでしょう。これはPythonのタプルにピクル化されます。基本的なタイプの配列のためのPython array.array
を取得するには、ユーザは独自のコンバータを指定する必要があります。
SequenceFileの保存とロード
テキストファイルと似て、SequenceFiles はパスを指定することによって保存およびロードすることができます。キーと値のクラスは指定することができますが、標準のWritableの場合これは不要です。
他のHadoop Input/Output フォーマットの保存およびロード
PySparkは、'new'および'old' Hadoop MapReduce APIについて、任意のHadoop InputFormatの読み込み、あるいは任意のHadoopのOutputFormatの書き込みができます。必要であれば、Hadoop設定をPython dictとして渡すことができます。これはElasticsearch ESInputFormatを使った例です:
もしInputFormatが単純にHadoop設定 および/あるいは入力パスに依存する場合は、キーと値のクラスは上のテーブルに応じて簡単に変換でき、このやり方はそのような場合にうまく動作するでしょう。
(Cassandra/HBaseからロードしたデータなど)独自のシリアライズ化バイナリデータがある場合は、まずそのデータをPyroliteのピックラーが処理できるものにScala/Java側で変換しなければならないでしょう。このために Converter trait が提供されます。単にこのtraitを拡張し、変換コードをconvert
メソッドの中に実装します。InputFormat
にアクセスを必要とする全ての依存物と一緒にこのクラスはSparkジョブjarにパッケージされ、PySparkクラスパッスに含まれるようにすることを忘れないでください。
Cassandra / HBase の独自のコンバータを使ったInputFormat
および OutputFormat
の例は、Python examples および Converter examples を見てください。
RDD 操作
RDDは2つの種類のオペレータをサポートします: 変換、これは既存のものから新しいデータセットを生成します。actions、これはデータセット上で計算を実行した後でドライバープログラムに値を返します。例えば、map
は関数を通じて各データ要素を渡し、 結果を表す新しいRDDを返します。一方で、reduce
はなんらかの関数を使ってRDDの全ての要素を集約するアクションで、最後の結果はドライバープログラムに返ります(しかし、分散したデータセットを返す 並行reduceByKey
もあります)
Sparkでの全ての変換は、すぐには結果を計算しないという点で lazyです。代わりにそれらは幾つかのベースのデータセット(例えば、ファイル)に適用された変換を単に記憶します。アクションがドライバープログラムに返されるべき結果を必要とした時に、変換は単に計算されます。この設計によりSparkはもっと効率的に動作することができます。例えば、大きくマップされたデータセットではなく、map
を使って生成されたデータセットがreduce
の中で使われ、reduce
の結果だけがドライバーに返ることを理解できます。
デフォルトでは、それぞれの変換されたRDDはその上でアクションを実行するたびに再計算されます。しかし、persist
(あるいはcache
メソッドを使ってメモリ内のRDDを永続化するかも知れません。そのような場合、Sparkは次のクエリの時にもっと早くアクセスするために、クラスタ上の要素を維持するでしょう。ディスク上、あるいは複数のノードをまたがったリプリケートのためのサポートもあります。
基本
RDDの基本を説明するために、以下の単純なプログラムを考えます:
最初の行は外部ファイルからの基本のRDDを定義します。このデータセットはメモリにロードされないか、あるいは以下のように振る舞います: lines
は単にファイルへのポインターです。二つ目の行はmap
変換の結果としての lineLengths
を定義します。ここでも、lazinessのために lineLengths
はすぐには計算されません。最後に、reduce
を実行します。これはアクションです。この時点でSparkは個々のマシーン上で実行するために計算をタスクに分割し、各マシーンはmapとローカルのreductionを実行し、ドライバープログラムに回答のみを返します。
後で再びlineLengths
を使用したい場合は、以下を追加するかも知れません:
reduce
の前に、最初にそれが計算された後でメモリ内に lineLengths
が保存されるでしょう。
RDDの基本を説明するために、以下の単純なプログラムを考えます:
最初の行は外部ファイルからの基本のRDDを定義します。このデータセットはメモリにロードされないか、あるいは以下のように振る舞います: lines
は単にファイルへのポインターです。二つ目の行はmap
変換の結果としての lineLengths
を定義します。ここでも、lazinessのために lineLengths
はすぐには計算されません。最後に、reduce
を実行します。これはアクションです。この時点でSparkは個々のマシーン上で実行するために計算をタスクに分割し、各マシーンはmapとローカルのreductionを実行し、ドライバープログラムに回答のみを返します。
後で再びlineLengths
を使用したい場合は、以下を追加するかも知れません:
reduce
の前に、最初にそれが計算された後でメモリ内に lineLengths
が保存されるでしょう。
RDDの基本を説明するために、以下の単純なプログラムを考えます:
最初の行は外部ファイルからの基本のRDDを定義します。このデータセットはメモリにロードされないか、あるいは以下のように振る舞います: lines
は単にファイルへのポインターです。二つ目の行はmap
変換の結果としての lineLengths
を定義します。ここでも、lazinessのために lineLengths
はすぐには計算されません。最後に、reduce
を実行します。これはアクションです。この時点でSparkは個々のマシーン上で実行するために計算をタスクに分割し、各マシーンはmapとローカルのreductionを実行し、ドライバープログラムに回答のみを返します。
後で再びlineLengths
を使用したい場合は、以下を追加するかも知れません:
reduce
の前に、最初にそれが計算された後でメモリ内に lineLengths
が保存されるでしょう。
Sparkに関数を渡す
SparkのAPIはクラスター上で実行するためにドライバープログラム内で渡す関数に強く依存します。これをするには以下の2つのお勧めの方法があります:
- 匿名関数構文、これはコードの短い断片のために使うことができます。
- 大きなシングルトンプロジェクトの中の静的なメソッド。例えば、
object MyFunctions
を定義し、以下のようにMyFunctions.func1
を渡すことができます:
(シングルトンオブジェクトとは対照的に)クラスのインスタンスの中のメソッドにリファレンスを渡すことも可能ですが、これにはメソッドと一緒にクラスを含むオブジェクトを送信する必要があります。例えば、以下を考えてみてください:
ここで、もし新しいMyClass
インスタンスを生成し、その上で doStuff
を呼び出すと、その中のmap
は MyClass
インスタンスのfunc1
メソッドを参照します。つまり、オブジェクト全体がクラスタに送信される必要があります。それはrdd.map(x => this.func1(x))
を書くことに似ています。
同じように、外部のオブジェクトのフィールドへのアクセスはオブジェクト全体を参照するでしょう。
これは rdd.map(x => this.field + x)
を書くことと等価です。それは this
.の全てを参照しますこの問題を避けるために、最も簡単な方法は外部的にアクセスする代わりに field
をローカル変数にコピーすることです:
SparkのAPIはクラスター上で実行するためにドライバープログラム内で渡す関数に強く依存します。Javaでは、関数はorg.apache.spark.api.java.functionパッケージ内のインタフェースを実装するクラスによって表現されます。そのような関数を生成する方法は2つあります:
- 匿名内部クラス、あるいは名前のあるクラスのどちらかでクラスへのFunctionインタフェースを実装し、Sparkへそのインスタンスを渡します。.
- 実装を簡潔に定義するためにlambda 表記を使ってください。
このガイドのほとんどは簡潔化のためにlambda構文を使用しますが、長い形式の全ての同じAPIを使うと簡単です。例えば、以下のように上のコードを書くことができます:
あるいは、インラインの関数を書くことが不恰好であれば:
Javaでの匿名内部クラスはfinal
の印をつけている限りスコープ内の変数にアクセスすることもできます。Sparkは他の言語のために各ワーカーノードへこれらの変数のコピーを転送するでしょう。
SparkのAPIはクラスター上で実行するためにドライバープログラム内で渡す関数に強く依存します。これをするには3つのお勧めの方法があります:
- Lambda 表記、簡単な関数のために、表現として書くことができます。(Lambdaは、値を返す複数行の関数あるいは命令文をサポートしません)
- 長いコードのための、Sparkに呼ばれる関数内の
def
。 - モジュール内のトップレベルの関数。
例えば、lambda
を使ってサポートされる長い関数を渡すために、以下のコードを考えてみましょう:
(シングルトンオブジェクトとは対照的に)クラスのインスタンスの中のメソッドにリファレンスを渡すことも可能ですが、これにはメソッドと一緒にクラスを含むオブジェクトを送信する必要があります。例えば、以下を考えてみてください:
ここで、もしnew MyClass
を生成し、その上で doStuff
を呼び出すと、その中のmap
は MyClass
インスタンスのfunc
メソッドを参照します。つまり、オブジェクト全体がクラスタに送信される必要があります。
同じように、外部のオブジェクトのフィールドへのアクセスはオブジェクト全体を参照するでしょう。
この問題を避けるために、最も簡単な方法は外部的にアクセスする代わりに field
をローカル変数にコピーすることです:
クロージャーの理解
Sparkについてもっと難しいことの一つは、クラスタをまたいだコードを実行する時の変数とメソッドのスコープとライフサイクルの理解です。それらのスコープの外側で変数を修正するRDDオペレーションはしばしば混乱の原因となります。以下の例の中で、カウンターを増加するために foreach()
を使うコードを調べますが、同じような問題が他のオペレーションについても起こります。
例
下記の単純なRDD要素の合計を考えます。これは実行が同じJVM内で発生するかどうかに依存して異なる挙動をするかもしれません。これの一般的な例はSparkアプリケーションをクラスタに配備する(例えば、spark-submit を使って YARNへ)のに対してlocal
モードでSparkを実行する(--master = local[n]
)場合です:
ローカル vs. クラスタモード
上のコードの挙動は未定義で、意図したようには動作しないかも知れません。ジョブを実行するために、SparkはRDDのオペレーションの処理をタスクに分割します。各タスクはexecutorによって実行されます。実行に先立って、Sparkはタスクのclosureを計算します。closureはexecutorがRDD上で計算(この場合foreach()
)を実施するために、それらの変数とメソッドが見えなければなりません。このclosureはシリアライズ化され各executorに送信されます。
各executorに送信されたクロージャー内の変数はコピーを持ち、したがって、counter がforeach
関数内で参照された場合それはドライバーノード内の counter ではないということです。ドライバーノードのメモリ内にcounterはありますが、これはexecutorにはもう見えません。executorはシリアライズ化されたclosureからのコピーだけを見ることができます。従って、counter上の全てのオペレーションはシリアライズ化されたclosure内の値を参照していたため、counterの最後の値は0のままでしょう。
ローカルモードでは、ある条件下では foreach
関数がドライバーとして同じJVM内で実際に実行し、同じ元の counterを参照するでしょう。そしてそれを実際に更新するかも知れません。
これらの種類のシナリオでよく定義された挙動を保証するには、Accumulator
を使うべきです。Sparkでのaccumulatorは、クラスタ内のワーカーノードをまたがって実行を分割する場合に、特に変数を安全に更新するための仕組みを提供するために使われます。このガイドのaccumulatorの章でこれらは詳細に議論されます。
一般的に、closure - ループあるいはローカルで定義されるメソッド、はいくつかのグローバルの状態を変化するために使われるべきではありません。Sparkはclosureの外部から参照されるオブジェクトへの突然変異の挙動を定義あるいは保証しません。これを行う幾つかのコードはローカルモードで動作するかもしれませんが、それは単なる偶然で、そのようなコードは分散モードでは期待する動作をしないでしょう。グローバルな集約が必要な場合は代わりにaccumulatorを使ってください。
RDDの要素の出力
その他の一般的な慣用は、rdd.foreach(println)
あるいは rdd.map(println)
を使ってRDDの要素を出力しようとすることです。単一のマシーン上で、これは期待する出力を生成し、全てのRDDの要素を出力するでしょう。しかし、クラスター
モードでは、executorによって呼びだされたstdout
への出力は代わりにexecutorのstdout
に書き込まれ、ドライバーの出力ではありません。つまりドライバー上のstdout
はそれらを表示しないでしょう。ドライバー上の全ての要素を出力するために、最初にRDDをドライバーノードに持ってくる collect()
メソッドを使うことができます。したがって: rdd.collect().foreach(println)
。collect()
はRDD全体を一つのマシーンに取り出すために、これはドライバーのメモリ不足を起こすことがありえます; もしRDDの2,3の要素を出力するとするだけであれば、より安全な方法はtake()
を使うことです: rdd.take(100).foreach(println)
。
キーバリューペアとの連動
ほとんどのSpark操作はあらゆる種類のオブジェクトを含むRDD上で動作しますが、2,3の特別な操作はキー-値ペアのRDD上でのみ利用可能です。もっとも一般的なものは、キーを使って要素をグループあるいは集約するような分散型"shuffle"操作です。
Scalaでは、これらの操作は、(言語内の組み込みの、単純に(a, b)
と書くことで生成される)、Tuple2 オブジェクトを含むRDD上で自動的に利用可能です。キー-値ペアの操作はPairRDDFunctionsクラスの中で利用可能です。これはtupleのRDDの周りを自動的に取り囲みます。
例えば、以下のコードはファイル内のテキストの各行が何回現れるかを数えるためにキー-値ペア上のreduceByKey
操作を使用します。
例えば、ペアをアルファベット順にソートし、最後にドライバープログラムにオブジェクトの配列としてそれらを返すためにcounts.collect()
するために、counts.sortByKey()
を使うこともできます。
注意: 独自のオブジェクトをキー-値ペアの操作として使う場合、独自の equals()
メソッドがhashCode()
メソッドを使って実施されるようにしなければなりません。完全な詳細は、Object.hashCode() ドキュメントの中で説明される規約を見てください。
ほとんどのSpark操作はあらゆる種類のオブジェクトを含むRDD上で動作しますが、2,3の特別な操作はキー-値ペアのRDD上でのみ利用可能です。もっとも一般的なものは、キーを使って要素をグループあるいは集約するような分散型"shuffle"操作です。
Javaでは、キー-値ペアはScalaの標準ライブラリのscala.Tuple2クラスを使って表現されます。tupleを生成するためには単純にnew Tuple2(a, b)
を呼び出し、tuple._1()
および tuple._2()
を使ってそのフィールドにアクセスすることができます。
キー-値ペアのRDDはJavaPairRDD クラスによって表現されます。mapToPair
および flatMapToPair
のようなmap
操作の特別なバージョンを使って、JavaRDDからJavaPairRDDを構築することができます。JavaPairRDDは標準的なRDD関数と特別なキー-値関数の両方を持つでしょう。
例えば、以下のコードはファイル内のテキストの各行が何回現れるかを数えるためにキー-値ペア上のreduceByKey
操作を使用します。
例えば、ペアをアルファベット順にソートし、最後にドライバープログラムにオブジェクトの配列としてそれらを返すためにcounts.collect()
するために、counts.sortByKey()
を使うこともできます。
注意: 独自のオブジェクトをキー-値ペアの操作として使う場合、独自の equals()
メソッドがhashCode()
メソッドを使って実施されるようにしなければなりません。完全な詳細は、Object.hashCode() ドキュメントの中で説明される規約を見てください。
ほとんどのSpark操作はあらゆる種類のオブジェクトを含むRDD上で動作しますが、2,3の特別な操作はキー-値ペアのRDD上でのみ利用可能です。もっとも一般的なものは、キーを使って要素をグループあるいは集約するような分散型"shuffle"操作です。
Pythonでは、これらの操作は(1, 2)
のような組み込みのPythonのtupleを含むRDD上で動作します。単純にそのようなtupleを生成し、期待する操作を呼び出します。
例えば、以下のコードはファイル内のテキストの各行が何回現れるかを数えるためにキー-値ペア上のreduceByKey
操作を使用します。
例えば、ペアをアルファベット順にソートし、最後にドライバープログラムにオブジェクトのリストとしてそれらを返すためにcounts.collect()
するために、counts.sortByKey()
を使うこともできます。
変換
以下の表はSparkによってサポートされる一般的な変換の幾つかをリスト表示します。詳細は、RDD API ドキュメント (Scala, Java, Python, R) および、ペアRDD関数のドキュメント(Scala, Java) を参照してください。
アクション
以下のテーブルはSparkによってサポートされる一般的なアクションの幾つかをリスト表示します。詳細は、RDD API ドキュメントを参照してください(Scala, Java, Python, R)
詳細はペアRDD関数のドキュメントを参照してください(Scala, Java)。
アクション | 意味 |
---|---|
reduce(func) | 関数func を使ってデータセットの要素を集約します(これは2つの引数を取り、1つを返します)。関数は並列して正確に計算されるように交換可能で結合されていなければなりません。 |
collect() | ドライバープログラムにおいて配列としてデータセットの全ての要素を返します。これは一般的にデータの十分に小さなサブセットを返すフィルタあるいは操作の後で効果的です。 |
count() | データセット中の要素の数を返します。 |
first() | データセットの最初の要素を返します(take(1)に似ています)。 |
take(n) | データセットの最初のn個の要素の配列を返します。 |
takeSample(withReplacement, num, [seed]) | 任意でランダムな数値生成種を事前指定して、置換有りあるいは無しで、データセットのnum個の要素のランダムな標本から成る配列を返します。 |
takeOrdered(n, [ordering]) | 自然順あるいは独自の比較のどちらかを使ったRDDの最初の n個の要素を返します。 |
saveAsTextFile(path) | データセットの要素をテキストファイル(あるいはテキストファイルのセット)として、ローカルファイルシステム、HDFS、あるいはその他のHadoopがサポートするファイルシステムの指定されたディレクトリに書き出します。Sparkは各要素ごとにそれをファイル内のテキストの行に変換するためにtoStringを呼び出すでしょう。 |
saveAsSequenceFile(path) (Java および Scala) |
データセットの要素をHadoop SequenceFileとして、ローカルファイルシステム、HDFS、あるいはその他のHadoopがサポートするファイルシステムの指定されたディレクトリに書き出します。これはHadoopのWritableインタフェースを実装するキー-値ペアのRDDで利用可能です。Scalaでは、明示的にWritableに変換可能なタイプでも利用可能です(Sparkは Int, Dobule, Stringなどような基本タイプのための変換を含みます)。 |
saveAsObjectFile(path) (Java および Scala) |
Javaのシリアライズ化を使ってデータセットの要素を単純なフォーマットで書き込みます。これはその後SparkContext.objectFile() を使ってロードすることができます。 |
countByKey() | タイプ (K, V)のRDDにのみ利用可能です。各キーのカウントを持つ (K, Int)ペアのハッシュマップを返します。 |
foreach(func) | データセットの各エレメントに関数func を実行します。これは通常、 Accumulatorの更新、あるいは外部ストレージシステムとの対話のような副作用のために行われます。 注意: foreach の外側でAccumulator以外の変数を修正することは未定義の挙動に繋がるかも知れません。詳細はclosureの理解 を見てください。 |
Spark RDD API は foreach
についてのforeachAsync
のような、いくつかのアクションの非同期バージョンを公開します。これらはアクションの完了を遮ることなくただちにcallerにFutureAction
を返します。これはアクションの非同期実行を管理あるいは待つために使うことができます。
シャッフル操作
Sparkトリガー内の特定の操作はシャッフルとして知られるイベントを引き起こします。シャッフルはパーティションを横断して異なるグループ化をするようにデータを再配布するためのSparkの機構です。これは一般的にexecutorとマシーンをまたがったデータコピーを引き起こし、シャッフルを複雑かつコストの高いオペレーションにします。
背景
シャッフルの間に何が起きるかを理解するために、reduceByKey
操作の例を考えてことができます。reduceByKey
操作は、一つのキーについて全ての値が組として結合される新しいRDDを生成します - キーおよびそのキーに関係する全ての値に対してreduce関数を実行した結果。この試みは、一つのキーのために全ての値が同じパーティション、あるいは同じマシーン上にさえある必要はありませんが、結果を計算するために同じ場所にある必要があります。
Sparkでは、データは特定の操作のために必要な場所にあるように、パーティションを横断して分散されません。計算の間、一つのタスクは一つのパーティション上で操作されるでしょう - 従って、一つの reduceByKey
reduceタスクを実行するために全てのデータを整理するには、Sparkは全対全の操作を実施する必要があります。それは全てのキーのための全ての値を見つけるために全てのパーティションから読み込む必要があり、各キーのための最終結果を計算するためにパーティションを横断して値を集めます - これはshuffleと呼ばれます。
新しくシャッフルされたデータの各パーティション内の要素のセットは決定論的なもので、それはパーティション自身の順番付けですが、それらの要素の順場付けはそうではありません。もしシャッフルに従って予想どおりに並べられたデータを希望する場合は、以下を使って行うことができます:
mapPartitions
例えば、.sorted
を使って各パーティションを並べるためrepartitionAndSortWithinPartitions
同時に再パーティションする間にパーティションを効率的にソートするためsortBy
グローバルに順番付けされたRDDを作るため
シャッフルを起こしえるオペレーションはrepartition
および coalesce
のようなrepartition オペレーション、groupByKey
および reduceByKey
のような'ByKey オペレーション(カウントを除く)、そしてcogroup
および join
のようなjoin オペレーションを含みます。
パフォーマンスの影響
ShuffleはディスクI/O、データのシリアライズ、ネットワークI/Oを必要とするため、高価な操作です。シャッフルのためにデータを整理するために、Sparkはタスクのセットを生成します - データを整理するためにmap タスク、そしてそれを集約するためにreduceタスクのセット。この学名はMapReduceからきていて、Sparkの map
と reduce
操作には直接関係しません。
内部的には、各mapタスクからの結果はそれらが適さなくなるまでメモリ内に保持されます。そして、それらは目的のパーティションに基づいてソートされ、単一のファイルに書き込まれます。reduceの側では、タスクは関連するソートされたブロックを読みます。
特定のシャッフル操作は転送の前あるいは後にレコードを整理するためにメモリ内のデータ構造を採用しているため、大量の総ヒープメモリを消費するかも知れません。特に、reduceByKey
および aggregateByKey
はそれらの構造をmap側に生成し、'ByKey
操作はreduce側にそれらを生成します。データがメモリ内に収まらなくなった場合、Sparkはそれらのテーブルをディスクに流し込み、追加のディスクI/Oのオーバーヘッドとガベージコレクションの増加を起こします。
シャッフルはディスク上に大量の中間ファイルも生成します。Spack 1.3 の時点では、これらのファイルは対応するRDDがもう使われなくなりガベージコレクトされるまで保持されます。これは、系図が再計算される場合にシャッフルファイルが再生成する必要がないように行われます。もしアプリケーションがこれらのRDDへのリファレンスを維持するか、GCがしばしば起動されない場合には、ガベージコレクションはずっと後でのみ起こるかも知れません。この事は長く実行中のSparkジョブは大量のディスク領域を消費するかも知れないことを意味します。Sparkコンテキストを設定する場合に、一時ストレージディレクトリは spark.local.dir
設定パラメータによって指定されます。
シャッフルの挙動は様々な設定パラメータを調節することで調整することができます。Spark Configuration Guideの中の'シャッフルの挙動'の章を見てください。
RDD の永続性
Sparkでの最も重要な能力の一つは、操作に渡ってメモリ内のデータセットを永続化 (あるいは キャッシュ)することです。RDDを維持する場合、各ノードはメモリ内で計算する全てのパーティションを格納し、そのデータセット上(あるいはそれから派生したデータセット)での他のアクションの中でそれらを再使用します。これにより将来のアクションはもっと速くできます(時には10x以上)。キャッシングは繰り返しアルゴリズムおよび高速な対話的な使用のためにキーとなるツールです。
persist()
あるいは cache()
メソッドをRDDに使ってRDDを永続化されるように印をつけることができます。最初はアクション内で計算され、ノード上のメモリ内に保持されるでしょう。Sparkのキャッシュは耐障害性があります - RDDのいずれかのパーティションが喪失すると、最初にそれを生成した変換を使って自動的に再計算されます。
更に、各永続化されたRDDは異なるストレージレベルを使って格納することができ、例えば ディスク上にデータセットを永続化し、シリアライズ化されたJavaオブジェクトとして(保存領域に)メモリ内に永続化し、ノードをまたがってリプリケートすることができます。これらのレベルはStorageLevel
オブジェクト (Scala, Java, Python) を persist()
に渡すことで設定されます。cache()
メソッドはデフォルトのストレージレベルを使うための略記で、StorageLevel.MEMORY_ONLY
(メモリ内にデシリアライズさらたオブジェクトを格納する)です。ストレージレベルの完全なセットは以下の通りです:
ストレージ レベル | 意味 |
---|---|
MEMORY_ONLY | RDDをJVM内でデシリアライズされたJavaオブジェクトとして格納します。もしRDDがメモリ内に収まらない場合は、幾つかのパーティションがキャッシュされず、必要になるたびに毎回その場で再計算されるでしょう。これはデフォルトのレベルです。 |
MEMORY_AND_DISK | RDDをJVM内でデシリアライズされたJavaオブジェクトとして格納します。もし、RDDがメモリ内に収まらない場合は、収まらないパーティションをディスク上に保持し、必要な場合にそこから読み込みます。 |
MEMORY_ONLY_SER (Java および Scala) |
RDDを シリアライズ化された Java オブジェクトとして格納します(パーティションごとに1バイトの配列)。これは、特にfast serializerを使う場合は、読むためのCPU集約が多いですが、一般的にデシリアライズされたオブジェクトよりも容量効率が良いです。 |
MEMORY_AND_DISK_SER (Java および Scala) |
MEMORY_ONLY_SERと似てますが、メモリに収まらないパーティションは必要になる度にその場で再計算するのではなく、ディスクに流し入れます。 |
DISK_ONLY | RDDパーティションをディスク上にのみ格納します。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2 など | 上のレベルと同じですが、2つのクラスタノードに各パーティションをレプリケートします。 |
OFF_HEAP (experimental) | MEMORY_ONLY_SER に似ていますが、データを off-heap メモリに格納します。これには off-heap メモリを有効にする必要があります。 |
注意: Pythonでは、格納されたオブジェクトは常に Pickleライブラリを使ってシリアライズされます。そのためシリアライズ化レベルを選択するかどうかは問題になりません。Pythonで利用可能なストレージレベルには、MEMORY_ONLY
, MEMORY_ONLY_2
, MEMORY_AND_DISK
, MEMORY_AND_DISK_2
, DISK_ONLY
, DISK_ONLY_2
およびDISK_ONLY_3
が含まれます。
Sparkはシャッフル操作の中で自動的に幾つかの中間データも永続化します(例えば、reduceByKey
); ユーザがpersist
を呼ばない場合でも。ノードがシャッフル中に故障した場合に全ての入力を再計算することを避けるために行われます。それでも、結果のRDDを再利用する場合は、ユーザが結果のRDDで persist
を呼び出すことをお勧めします。
どっちのストレージレベルを選ぶか?
Sparkのストレージレベルはメモリ利用率とCPU効率の間で異なるトレードオフを提供するためのものです。選択するには以下の過程を経ることをお勧めします:
-
デフォルトのストレージレベル(
MEMORY_ONLY
)を使ってRDDが気持ちよく収まる場合は、RDDをそのままにします。これは最もCPU効率が良い選択で、RDD上での操作をできるだけ速く実行することができます。 -
そうでなければ、オブジェクトがもっと空間効率が良くなるように、
MEMORY_ONLY_SER
および 高速なシリアライズライブラリ を使うことを試してください。しかしそれでもアクセスはまだかなり速いです。(Java および Scala) -
データセットを計算する関数が高価、あるいは関数が大量のデータをフィルターしない場合は、ディスクに流し込まないでください。そうしないと、パーティションの再計算はディスクからそれを読み込むのと同じくらいの速さかも知れません。
-
高速障害復旧をしたい場合(例えば、webアプリケーションからのリクエストに応えるためにSparkを使う場合)、リプリケートされたストレージレベルを使ってください。全てのストレージレベルは喪失データの再計算により完全な耐障害性を提供しますが、リプリケートされたデータにより喪失したパーティションの再計算を待つこと無しにRDD上でタスクを実行し続けることができます。
データの削除
Sparkは自動的に各ノード上のキャッシュの使用率を監視し、least-recently-used (LRU) 形式で古いデータのパーティションを削除します。キャッシュからRDDが抜け落ちるのを待つ代わりに手動でRDDを削除したい場合は、RDD.unpersist()
メソッドを使います。このメソッドはデフォルトではブロックされないことに注意してください。リソースが解放されるまでブロックするには、このメソッドを呼ぶ時にblocking=true
を指定します。
共有変数
通常、(map
あるいは reduce
のような)Spark操作に渡される関数は遠隔のクラスタノード上で実行される場合、関数内で使われる全ての変数のそれぞれのコピー上で動作します。これらの変数は各マシーンにコピーされ、リモートのマシーン上の変数への更新はドライバープログラムへ伝播されません。タスクを横断する一般的なread-write共有変数のサポートは効果的では無いでしょう。しかし、Sparkは2つの共通の使用パターンのための2つの制限された共有変数の種類を提供します: ブロードキャスト変数とaccumulatorです。
ブロードキャスト変数
ブロードキャスト変数によってプログラマはタスクによってコピーを発送するのではなく、各マシーン上にキャッシュされたread-only変数を維持することができます。それらは、例えば各ノードに多くの入力データセットのコピーを効率的な方法で渡すために使うことができます。Sparkは通信のコストを下げるために効果的なブロードキャストアルゴリズムを使ってブロードキャスト変数を分散しようともします。
Sparkのアクションは分散した"シャッフル"操作によって分割されたステージのセットによって実行されます。Sparkは各ステージ内でタスクによって必要とされる共通データを自動的にブロードキャストします。この方法でブロードキャストされたデータはシリアライズされた形式でキャッシュされ、各タスクを実行する前にデシリアライズされます。これは明示的なブロードキャスト変数の生成は複数のステージを横断したタスクが同じデータを必要とするか、デシリアライズ形式のデータのキャッシュが重要な場合にのみ有用なことを意味します。
ブロードキャスト変数はSparkContext.broadcast(v)
を呼ぶことで変数v
から生成されます。ブロードキャスト変数は v
のラッパーで、その値はvalue
メソッドを呼ぶことでアクセスすることができます。以下のコードはこのことを示します:
ブロードキャスト変数が生成された後で、v
が1度以上ノードに送信されないように、クラスタ上で実行される全ての関数内で値v
の代わりに使われる必要があります。その上、オブジェクト v
は全てのノードがブロードキャスト変数と同じ値を取得できるようにブロードキャストされた後で修正してはいけません(例えば、変数は新しいノードへ後で送信される場合)。
ブロードキャスト変数がexecutorにコピーしたリソースを解放するには、.unpersist()
を呼びます。ブロードキャストが後で使われる場合、再度ブロードキャストされます。ブロードキャスト変数で使われる全てのリソースを永続的に解放するには、.destroy()
を呼びます。その後、ブロードキャスト変数は使えなくなります。これらのメソッドはデフォルトではブロックされないことに注意してください。リソースが解放されるまでブロックするには、それらを呼び出す時にblocking=true
を指定します。
集約器
accumulatorは結合および交換操作を使ってのみ"追加"される変数で、したがってaccumulatorは効率的に並行をサポートします。それらは(MapReduceのような)カウンターあるいは集計を実装するために使うことができます。Sparkは本来数字タイプのaccumulatorをサポートし、プログラマーは新しいタイプのサポートを追加することができます。
ユーザとして、名前付きあるいは名前無しのaccumulatorを作成することができます。下の図で分かるように、名前付きのaccumurator (この中ではインスタンスcounter
) はaccumulatorを修飾するステージのためのweb UI内で表示されるでしょう。Sparkは “Tasks” テーブル内のタスクによって修飾される各accumulatorのための値を表示します。
UI内でのaccumulatorの追跡は実行中のステージの進捗を理解するために有用かもしれません(注意: これはPythonではまだサポートされていません)。
LongあるいはDouble型のそれぞれの値を集約するために、SparkContext.longAccumulator()
または SparkContext.doubleAccumulator()
を呼び出すことで数字の集約器を生成することができます。クラスタ上で実行するタスクは、add
メソッドを使って追加することができます。しかし、それらはその値を読むことができません。ドライバープログラムのみが value
メソッドを使って集約器の値を読むことができます。
以下のコードは配列の要素を追加するために使われるaccumulatorを表します:
このコードはタイプLongのaccumulatorのための組み込みのサポートに使われますが、プログラマは AccumulatorV2のサブクラス化をすることで独自のタイプを生成することもできます。AccumulatorV2 abstract クラスは上書きされるべきいくつかのメソッドを持ちます: accumulatorを0にリセットするためのreset
、accumulatorに他の値を追加するためのadd
、他の同じタイプのaccumulatorを1つにマージするためのmerge
です。上書きされるべき他のメソッドはAPI ドキュメント内に含まれます。例えば、数学的なベクターを表す MyVector
クラスを持つとした場合、以下のように書けます:
プログラマが独自のタイプのAccumulatorV2を定義する場合、結果のタイプは要素の追加で違うものになるかも知れないことに注意してください。
LongあるいはDouble型のそれぞれの値を集約するために、SparkContext.longAccumulator()
または SparkContext.doubleAccumulator()
を呼び出すことで数字の集約器を生成することができます。クラスタ上で実行するタスクは、add
メソッドを使って追加することができます。しかし、それらはその値を読むことができません。ドライバープログラムのみが value
メソッドを使って集約器の値を読むことができます。
以下のコードは配列の要素を追加するために使われるaccumulatorを表します:
このコードはタイプLongのaccumulatorのための組み込みのサポートに使われますが、プログラマは AccumulatorV2のサブクラス化をすることで独自のタイプを生成することもできます。AccumulatorV2 abstract クラスは上書きされるべきいくつかのメソッドを持ちます: accumulatorを0にリセットするためのreset
、accumulatorに他の値を追加するためのadd
、他の同じタイプのaccumulatorを1つにマージするためのmerge
です。上書きされるべき他のメソッドはAPI ドキュメント内に含まれます。例えば、数学的なベクターを表す MyVector
クラスを持つとした場合、以下のように書けます:
プログラマが独自のタイプのAccumulatorV2を定義する場合、結果のタイプは要素の追加で違うものになるかも知れないことに注意してください。
Warning: Sparkのタスクが終了した時に、Sparkはこのタスク内の集約された更新をアキュムレータにマージしようとするでしょう。もしそれが失敗すると、Sparkは失敗を無視し、タスクをまだ成功とマークし、他のタスクを実行し続けるでしょう。従って、バグが多いアキュムレータはSparknジョブに影響しないでしょうが、たとえSparkジョブが成功しても正しく更新されないかもしれません。
accumulatorはSparkContext.accumulator(v)
を呼ぶことで初期値v
から生成されます。クラスタ上で実行するタスクは、add
メソッドあるいは +=
オペレータを使って追加することができます。しかし、それらはその値を読むことができません。ドライバープログラムだけがvalue
メソッドを使ってaccumulatorの値を読むことができます。
以下のコードは配列の要素を追加するために使われるaccumulatorを表します:
このコードはタイプIntのaccumulatorのための組み込みのサポートに使われますが、プログラマは AccumulatorParamのサブクラス化をすることで独自のタイプを生成することもできます。AccumulatorParam インタフェースは2つのメソッドを持ちます: データタイプのための"zero値"の提供のためのzero
、および2つの値を一緒に追加するためのaddInPlace
。例えば、数学的なベクターを表すVector
クラスを持つとした場合、以下のように書けます:
actions onlyの中で実施されるaccumulatorの更新のために、Sparkはaccumulatorへの各タスクの更新を1度だけ適用されるようにするでしょう。つまり、再起動されたタスクはその値を更新しないでしょう。変換の中で、もしタスクあるいはジョブのステージが再実行される場合は、ユーザは各タスクの更新が一度以上適用されるかも知れないことに気づいてください。
accumulator はSparkのlazy評価モデルを変更しません。もしそれらがRDD上の操作内で更新された場合は、一旦RDDがアクションの一部として計算されるとそれらの値が更新されます。結果として、accumulatorの更新は map()
のようなlazy変換の中で行われる場合に実行されることが保証されません。以下のコードの断片はこの性質を実証します:
クラスタの配備
アプリケーション サブミット ガイド はクラスタにアプリケーションをサブミットする方法をせつめいします。簡単にいうと、一旦アプリケーションをJAR(Java/Scalaの場合)、あるいは .py
または .zip
ファイル(Pythonの場合)にパッケージすると、bin/spark-submit
スクリプトは任意のサポートされるクラスタマネージャーへそれをサブミットすることができます。
Java / Scala からのSparkジョブの起動
org.apache.spark.launcher パッケージはSparkジョブを単純なJava APIを使用する子プロセスとして起動するためのクラスを提供します。
ユニットテスト
Sparkは一般的なユニットテストフレームワークを使ってユニットテストをし易いです。マスターURLをlocal
に設定してテスト内でSparkContext
を単純に作成し、操作を実行し、取り壊すためにSparkContext.stop()
を呼びます。Sparkは同じプログラム内で並行した2つのコンテキストの実行をサポートしないため、finally
ブロックあるいは、テストフレームワークのtearDown
メソッド内でコンテキストを必ず停止するようにしてください。
この後どうすればいいか
Spark webサイト上で幾つかの Spark プログラムの例を見つけることができます。更に、Sparkはexamples
ディレクトリ (Scala, Java, Python, R) の中に、幾つかの例を含んでいます。Sparkのbin/run-example
スクリプトにクラス名を渡すことで、JavaおよびScalaの例を実行することができます; 例えば:
./bin/run-example SparkPi
Pythonの例のためには、代わりに spark-submit
を使ってください:
./bin/spark-submit examples/src/main/python/pi.py
Rの例のためには、代わりにspark-submit
を使ってください:
./bin/spark-submit examples/src/main/r/dataframe.R
プログラムの最適化の手助けのために、設定 およびチューニングガイドはベストプラクティスの情報を提供します。データが効果的な形式でメモリ内に格納されるには、それらは特に重要です。デプロイの手助けのために、クラスタモードの概要 が分散化操作およびクラスタマネージャーのサポートを起こすコンポーネントを説明します。