Spark プログラミング ガイド

概要

高レベルでは、各Sparkアプリケーションはクラスタ上でユーザのmain関数を実行し様々な並行操作を実行するドライバープログラムからなります。Sparkが提供する主な抽象概念はresilient distributed dataset (RDD)で、クラスタのノードを横断して分割された並行して操作されることができるエレメントのコレクションです。RDDはHadoopファイルシステム(あるいは他のHadoopをサポートするファイルシステム)内のファイル、あるいはドライバープログラム内の既存のScalaコレクションを使って開始し変換することで生成されます。並行操作間で効果的に再利用できるように、ユーザはSparkにRDDをメモリ内に永続するように依頼するかも知れません。最終的に、RDDはノード障害から自動的に復活します。

Sparkの二つ目の抽象概念は、並行操作で使用することができる共有変数です。デフォルトでは、Sparkが異なるノード上でタスクのセットとして並行して関数を実行する場合、Sparkは関数内で使われる各変数のコピーを各タスクにコピーします。タスクを横断、あるいはタスクおよびドライバプログラム間で、変数が共有される必要があることがあります。Sparkは二つのタイプの共有変数をサポートします: ブロードキャスト変数、これは全てのノード上のメモリ内で値をキャッシュするために使うことができます。 アキュムレイター、これはカウンターや集計のような"追加"のみされる変数です。

このガイドはSparkのサポートする言語でこれらの各機能を案内します。Sparkの対話シェル - Scalaシェルの bin/spark-shell、Pythonシェルの bin/pyspark どちらか - を起動して一緒に進めていくのが一番簡単です。

Sparkとのリンク

Spark 2.0.0 はデフォルトでScala 2.11で動作するようにビルドおよび配布されています。(Spark は他のバージョンのScalaでも動作するようにビルドされています) Scalaでアプリケーションを書くには、互換性のあるScalaバージョンを使う必要があるでしょう(例えば、2.11.X)。

Sparkアプリケーションを書くには、MavenのSparkの依存を追加する必要があるでしょう。Sparkは以下のようにMaven Centralを使って利用可能です:

groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.0.0

更に、HDFSクラスタにアクセスしたい場合は、HDFSのバージョンのためにhadoop-clientに依存性を追加する必要があります。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最後に、プログラムに幾つかのSparkクラスをインポートする必要があります。以下の行を追加してください:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(Spark 1.3.0 より前では、必須の暗黙的な変換を有効にするために、明示的にimport org.apache.spark.SparkContext._する必要があります)。

Spark 2.0.0 はJava 7以上で動作します。Java8を使っている場合は、Sparkは簡潔な書き込み機能のために lambda 表現 をサポートします。それ以外に、org.apache.spark.api.java.function パッケージの中にあるクラスを使うことができます。

SparkアプリケーションをJavaで書くためにSparkに依存を追加する必要があります。Sparkは以下のようにMaven Centralを使って利用可能です:

groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.0.0

更に、HDFSクラスタにアクセスしたい場合は、HDFSのバージョンのためにhadoop-clientに依存性を追加する必要があります。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最後に、プログラムに幾つかのSparkクラスをインポートする必要があります。以下の行を追加してください:

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.SparkConf

Spark 2.0.0 はPython 2.6+ あるいは Python 3.4+ で動作します。それは標準のCPythonインタプリタを使うため、NumPyのようなC ライブラリが使われるかもしれません。それは PyPy 2.3+ でも動作します。

PythonでSparkアプリケーションを実行するには、Sparkディレクトリにある bin/spark-submit スクリプトを使用してください。このスクリプトはSparkのJava/Scalaライブラリをロードし、クラスタにアプリケーションをサブミットすることができます。対話的なPythonシェルを起動するためにbin/pysparkを使うこともできます。

HDFSデータにアクセスしたい場合には、そのバージョンのHDFSにリンクしているPySparkのビルドを使う必要があります。一般的なHDFSバージョンのためのPrebuilt パッケージ がSparkホームページでも利用可能です。

最後に、プログラムに幾つかのSparkクラスをインポートする必要があります。以下の行を追加します:

from pyspark import SparkContext, SparkConf

PySpark はドライバーおよびワーカーの両方で、Pythonの同じマイナーバージョンを必要とします。PATHにあるデフォルトのpythonバージョンを使います。Pythonのどちらのバージョンを使いたいかをPYSPARK_PYTHONを使って指定することができます。例えば:

$ PYSPARK_PYTHON=python3.4 bin/pyspark
$ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py

Sparkの初期化

Sparkプログラムが最初にしなければならないことは、SparkContextオブジェクトを作成する事です。これはSparkにクラスタにアクセスする方法を教えます。SparkContextを作成するために、最初にアプリケーションについての情報を含むSparkConf オブジェクトをビルドする必要があります。

JVMごとに一つのSparkContextだけがアクティブになることができます。新しいSparkContextを生成する前にアクティブなSparkContextをstop()しなければなりません。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

Sparkプログラムが最初にしなければならないことは、JavaSparkContext オブジェクトを作成することです。これはSparkにクラスタにアクセスする方法を教えます。SparkContextを作成するために、最初にアプリケーションについての情報を含むSparkConf オブジェクトをビルドする必要があります。

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

Sparkプログラムが最初にしなければならないことは、SparkContextオブジェクトを作成する事です。これはSparkにクラスタにアクセスする方法を教えます。SparkContextを作成するために、最初にアプリケーションについての情報を含むSparkConf オブジェクトをビルドする必要があります。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

appName パラメータはクラスタUI上で見るアプリケーションの名前です。masterSpark, Mesos あるいは YARN クラスタの URLか、ローカルモードで動作するための特別な"local"文字列です。実際には、クラスタ上で実行する場合にプログラム内で masterをハードコードせずに spark-submitを使ってアプリケーションを起動し、そこで受け取りたいでしょう。しかし、ローカルテストおよびユニットテストのために、処理中にSparkを実行するために "local" を渡すことができます。

シェルの使用

Sparkシェルの中では、特別なインタプリターを認識しているSparkContext、scと呼ばれる変数が既に作成されています。独自のSparkContextを作っても動作しないでしょう。--master 引数を使ってどっちのマスターをコンテキストが接続するかを指定することができます。そして--jars 引数にカンマ区切りのリストで渡すことでクラスパスにJARを追加することができます。--packages 引数にカンマ区切りのmaven coordinateのリストを渡すことで、シェルセッションに依存性(例えばSparkパッケージ)を追加することもできます。依存性が存在するかも知れないどのような追加のリポジトリ(例えば、SonaType)も--repositories 引数に渡すことができます。例えば、確実に4つのコア上でbin/spark-shellを実行するには、以下を使います:

$ ./bin/spark-shell --master local[4]

あるいは、クラスパスにcode.jarも追加するには、以下を使います:

$ ./bin/spark-shell --master local[4] --jars code.jar

maven coordinatesを使って依存性を追加するには:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

オプションの完全なリストについては、spark-shell --helpを実行します。舞台裏で spark-shellはもっと多くの一般的な spark-submit スクリプトを呼び出します。

PySparkシェルの中では、特別なインタプリターを認識しているSparkContext、scと呼ばれる変数が既に作成されています。独自のSparkContextを作っても動作しないでしょう。--master引数を使ってどっちのマスターコンテキストをコンテキストが接続するかを指定することができます。 そして--py-files引数にカンマ区切りのリストを渡すことでランタイムにPythonの.zip, .egg あるいは .py ファイルを追加することができます。--packages 引数にカンマ区切りのmaven coordinateのリストを渡すことで、シェルセッションに依存性(例えばSparkパッケージ)を追加することもできます。依存性が存在するかも知れないどのような追加のリポジトリ(例えば、SonaType)も--repositories 引数に渡すことができます。Sparkパッケージが持つどのようなpython依存性(パッケージのrequirements.txtにリスト化されています)も、必要な時にpipを使って手動でインストールされなければなりません。例えば、確実に4つのコア上でbin/pysparkを実行するには、以下を使います:

$ ./bin/pyspark --master local[4]

あるいは、code.pyも検索パス(後でimport codeできるように)に追加するには、以下を使います:

$ ./bin/pyspark --master local[4] --py-files code.py

オプションの完全なリストについては、pyspark --helpを実行します。舞台裏で pysparkはもっと多くの一般的な spark-submit スクリプトを呼び出します。

IPython内でPySparkシェル、拡張Pythonインタプリタ、を起動することもできます。PySparkはIPython 1.0.0 以降で動作します。IPythonを使用するには、bin/pysparkを実行する時にPYSPARK_DRIVER_PYTHON 変数 ipython に設定します:

$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

Jupyter notebookを使うには (以前はthe IPython notebookとして知られていました)。

$ PYSPARK_DRIVER_PYTHON=jupyter ./bin/pyspark

ipython あるいは jupyter コマンドを PYSPARK_DRIVER_PYTHON_OPTSを設定することでカスタマイズすることができます。

Jupyter Notebookサーバが起動した後で、"Files"タブから新しい"Python 2" を作成することができます。notebookの中で、Jupyter notebookからSparkを開始使用とする前にnotebookの一部として %pylab inlineコマンドを入力することができます。

Resilient Distributed Datasets (RDDs)

Sparkはresilient distributed dataset (RDD)の概念を中心題目とします。これは並行して操作することが可能な要素の耐障害性のあるコレクションです。RDDを生成するには2つの方法があります: ドライバプログラム内の既存のコレクションを並列化。あるいは共有ファイルシステム、HDFS、HBaseあるいはHadoopの入力フォーマットを提供するデータソースのような外部ストレージシステム内のデータセットの参照。

並列化されたコレクション

並列化されたコレクションはドライバプログラム内(ScalaのSeq)の既存のコレクション上でSparkContextparallelizeメソッドを呼ぶことで生成することができます。コレクションの要素は並列して操作可能な分散データセットからコピーされます。例えば、これは数字の1から5を持つ並列化されたコレクションの生成の仕方です:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

一度生成すると、分散データセット(distData) は並列して操作されることが可能です。例えば、配列の要素を追加するためにdistData.reduce((a, b) => a + b) を呼ぶかも知れません。後で分散されたデータセット上の操作を説明します。

並列化されたコレクションはドライバプログラム内の既存のコレクション上でJavaSparkContextparallelize メソッドを呼ぶことで生成されます。コレクションの要素は並列して操作可能な分散データセットからコピーされます。例えば、これは数字の1から5を持つ並列化されたコレクションの生成の仕方です:

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

一度生成すると、分散データセット(distData) は並列して操作されることが可能です。例えば、リストの要素を追加するためにdistData.reduce((a, b) => a + b) を呼ぶかも知れません。後で分散されたデータセット上の操作を説明します。

注意: このガイドでは、Java関数を指定するために簡潔なJava8ラムダ構文をしばしば使用しますが、古いバージョンのJavaでorg.apache.spark.api.java.function パッケージの中のインタフェースを実装することができます。詳細はSparkへ関数を渡す で説明します。

並列化されたコレクションはドライバプログラム内の既存の繰り返しまたはコレクション上でSparkContextparallelize メソッドを呼ぶことで生成されます。コレクションの要素は並列して操作可能な分散データセットからコピーされます。例えば、これは数字の1から5を持つ並列化されたコレクションの生成の仕方です:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

一度生成すると、分散データセット(distData) は並列して操作されることが可能です。例えば、リストの要素を追加するために、distData.reduce(lambda a, b: a + b) を呼ぶことができます。後で分散されたデータセット上の操作を説明します。

並列化コレクションの1つの重要なパラメータがデータセットを分割するパーティション の数です。Sparkはクラスタの各パーティションで1つのタスクを実行するでしょう。一般的はクラスタ内の各CPUについて2-4のパーティションが欲しいです。通常は、Sparkはクラスタに応じて自動的にパーティションの数を設定しようとします。しかし、parallelizeの2つ目のパラメータとして渡すことで手動で設定することもできます(例えばsc.parallelize(data, 10))。注意: コード内の幾つかの箇所でsliceという単語(パーティションの類義語)を後方互換性のために使います。

外部データベース

SparkはHadoopでサポートされているローカルファイルシステム, HDFS, Cassandra, HBase, Amazon S3などを含む、どのようなストレージソースからでも分散データセットを作成することができます。Spark はテキストファイル、SequenceFilesおよびどのような他のHadoop InputFormatをサポートします。

テキストファイルのRDDはSparkContexttextFileメソッドを使って生成することができます。このメソッドはファイルのURI(マシーン上のローカルファイルまたは hdfs://, s3n://, などURIのどちらか)を取り、行のコレクションとして読み込みます。実施の例です:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

一度生成すると、distFileはデータセット操作で動作することができます。例えば、map および reduce 操作を使って以下のように全ての行のサイズを集計することができます: distFile.map(s => s.length).reduce((a, b) => a + b).

Sparkを使ったファイルの読み込みには幾つか注意点があります:

  • ローカルファイルシステム上のパスを使用する場合、ファイルはワーカーノードでも同じパスでアクセス可能でなければなりません。ファイルを全てのワーカーにコピーする、あるいはネットワークでマウントされた共有ファイルシステムを使うかどちらかです。

  • textFileを含むSparkのファイルベースの全ての入力メソッドは、ディレクトリ、圧縮ファイルおよびワイルドカードでの実行もサポートします。例えば、textFile("/my/directory"), textFile("/my/directory/*.txt") および textFile("/my/directory/*.gz") を使うことができます。

  • textFile メソッドはファイルのパーティション数を制御するための任意の2つ目のパラメータも取ります。デフォルトではSparkはファイルの各ブロック(HDFSではブロックはデフォルトでは64MB)に1つのパーティションを生成しますが、大きな値を渡すことでより大きな数のパーティションを要求することもできます。ブロック数より少ないパーティションを持つことができないことに注意してください。

テキストファイル以外に、SparkのScala APIは幾つかの他のデータフォーマットもサポートします:

  • SparkContext.wholeTextFiles は複数の小さなテキストファイルを含むディレクトリを読み込み、それらのペア(ファイル名,内容)を返します。これはtextFileと異なり、各ファイルごとに行あたり1つのレコードを返します。

  • SequenceFilesについては、SparkContextの sequenceFile[K, V] メソッドを使用します。この時、K および V はファイル内のキーの種類と値です。これらはIntWritable および TextのようにHadoopのWritableインタフェースのサブクラスでなければなりません。更に、2,3の共通のWritableのためにSparkによってネイティブなタイプを指定することができます。例えば、sequenceFile[Int, String] は自動的にIntWritableおよびTextを読み込むでしょう。

  • 他のHadoop 入力フォームに関しては、SparkContext.hadoopRDD メソッドを使うことができます。これは任意のJobConf および、入力フォーマットクラス、キークラスと値クラスを取ります。同じようにしてHadoopジョブのためにこれらに入力ソースを設定するかもしれません。"新しい" MapReduce API (org.apache.hadoop.mapreduce)に基づいたInputFormatのためのSparkContext.newAPIHadoopRDDも使うことができます。

  • RDD.saveAsObjectFile およびSparkContext.objectFileはシリアライズ化されたJavaオブジェクトからなる単純な形式へのRDDの保存をサポートします。これはAvroのように特殊化されたフォーマットのように効率的ではありませんが、どのようなRDDを保存する簡単な方法を提供します。

SparkはHadoopでサポートされているローカルファイルシステム, HDFS, Cassandra, HBase, Amazon S3などを含む、どのようなストレージソースからでも分散データセットを作成することができます。Spark はテキストファイル、SequenceFilesおよびどのような他のHadoop InputFormatをサポートします。

テキストファイルのRDDはSparkContexttextFileメソッドを使って生成することができます。このメソッドはファイルのURI(マシーン上のローカルファイルまたは hdfs://, s3n://, などURIのどちらか)を取り、行のコレクションとして読み込みます。実施の例です:

JavaRDD<String> distFile = sc.textFile("data.txt");

一度生成すると、distFileはデータセット操作で動作することができます。例えば、map および reduce 操作を使って以下のように全ての行のサイズを集計することができます: distFile.map(s -> s.length()).reduce((a, b) => a + b).

Sparkを使ったファイルの読み込みには幾つか注意点があります:

  • ローカルファイルシステム上のパスを使用する場合、ファイルはワーカーノードでも同じパスでアクセス可能でなければなりません。ファイルを全てのワーカーにコピーする、あるいはネットワークでマウントされた共有ファイルシステムを使うかどちらかです。

  • textFileを含むSparkのファイルベースの全ての入力メソッドは、ディレクトリ、圧縮ファイルおよびワイルドカードでの実行もサポートします。例えば、textFile("/my/directory"), textFile("/my/directory/*.txt") および textFile("/my/directory/*.gz") を使うことができます。

  • textFile メソッドはファイルのパーティション数を制御するための任意の2つ目のパラメータも取ります。デフォルトではSparkはファイルの各ブロック(HDFSではブロックはデフォルトでは64MB)に1つのパーティションを生成しますが、大きな値を渡すことでより大きな数のパーティションを要求することもできます。ブロック数より少ないパーティションを持つことができないことに注意してください。

テキストファイル以外に、SparkのJava APIは幾つか他のデータフォーマットもサポートします:

  • JavaSparkContext.wholeTextFiles は複数の小さなテキストファイルを含むディレクトリを読み込み、それらのペア(ファイル名,内容)を返します。これはtextFileと異なり、各ファイルごとに行あたり1つのレコードを返します。

  • SequenceFilesについては、SparkContextの sequenceFile[K, V] メソッドを使用します。この時、K および V はファイル内のキーの種類と値です。これらはIntWritable および TextのようにHadoopのWritableインタフェースのサブクラスでなければなりません。

  • 他のHadoop 入力フォームに関しては、JavaSparkContext.hadoopRDD メソッドを使うことができます。これは任意のJobConf および、入力フォーマットクラス、キークラスと値クラスを取ります。同じようにしてHadoopジョブのためにこれらに入力ソースを設定するかもしれません。"新しい" MapReduce API (org.apache.hadoop.mapreduce)に基づいたInputFormatのためのJavaSparkContext.newAPIHadoopRDDも使うことができます。

  • JavaRDD.saveAsObjectFile およびJavaSparkContext.objectFileはシリアライズ化されたJavaオブジェクトからなる単純な形式へのRDDの保存をサポートします。これはAvroのように特殊化されたフォーマットのように効率的ではありませんが、どのようなRDDを保存する簡単な方法を提供します。

PySparkはHadoopでサポートされているローカルファイルシステム, HDFS, Cassandra, HBase, Amazon S3などを含む、どのようなストレージソースからでも分散データセットを作成することができます。Spark はテキストファイル、SequenceFilesおよびどのような他のHadoop InputFormatをサポートします。

テキストファイルのRDDはSparkContexttextFileメソッドを使って生成することができます。このメソッドはファイルのURI(マシーン上のローカルファイルまたは hdfs://, s3n://, などURIのどちらか)を取り、行のコレクションとして読み込みます。実施の例です:

>>> distFile = sc.textFile("data.txt")

一度生成すると、distFileはデータセット操作で動作することができます。例えば、map および reduce 操作を使って以下のように全ての行のサイズを集計することができます: distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b).

Sparkを使ったファイルの読み込みには幾つか注意点があります:

  • ローカルファイルシステム上のパスを使用する場合、ファイルはワーカーノードでも同じパスでアクセス可能でなければなりません。ファイルを全てのワーカーにコピーする、あるいはネットワークでマウントされた共有ファイルシステムを使うかどちらかです。

  • textFileを含むSparkのファイルベースの全ての入力メソッドは、ディレクトリ、圧縮ファイルおよびワイルドカードでの実行もサポートします。例えば、textFile("/my/directory"), textFile("/my/directory/*.txt") および textFile("/my/directory/*.gz") を使うことができます。

  • textFile メソッドはファイルのパーティション数を制御するための任意の2つ目のパラメータも取ります。デフォルトではSparkはファイルの各ブロック(HDFSではブロックはデフォルトでは64MB)に1つのパーティションを生成しますが、大きな値を渡すことでより大きな数のパーティションを要求することもできます。ブロック数より少ないパーティションを持つことができないことに注意してください。

テキストファイル以外に、SparkのPython APIは幾つか他のデータフォーマットもサポートします:

  • SparkContext.wholeTextFiles は複数の小さなテキストファイルを含むディレクトリを読み込み、それらのペア(ファイル名,内容)を返します。これはtextFileと異なり、各ファイルごとに行あたり1つのレコードを返します。

  • RDD.saveAsObjectFile およびSparkContext.objectFileはpickle化されたPythonオブジェクトからなる単純な形式へのRDDの保存をサポートします。バッチはデフォルトのバッチサイズ 10を使って、pickleシリアライズ化に使われます。

  • SequenceFile と Hadoop Input/Output フォーマット

注意 この機能は現在のところ 実験的なもの とマークされていて、上級ユーザのためのものであることに注意してください。将来Spark SQLに基づいた read/writeサポートに置き換えられるかも知れませんが、いずれにしてもSpark SQLは好ましい方法です。

Writableサポート

PySpark SequenceFile はJava内のキー-値のRDDのロードをサポートし、Writableを基本のJavaのタイプに変換し、結果のJavaオブジェクトを Pyroliteを使ってピクル化します。キー-値のペアのRDDをSequenceFileに保存する場合、PySparkは逆のことをします。それはPythonオブジェクトをJavaオブジェクトにアンピクル化し、それらをWritableに変換します。以下のWritableは自動的に変換されます:

Writable タイプPython タイプ
テキストunicode 文字列
IntWritableint
FloatWritablefloat
DoubleWritablefloat
BooleanWritablebool
BytesWritablebytearray
NullWritableNone
MapWritabledict

配列は追加設定無しで処理することができません。読み書きする場合には、ユーザは独自のArrayWritableサブタイプを指定する必要があります。書き込む場合には、ユーザは配列を独自の ArrayWritable サブタイプに変換する独自のコンバータも指定する必要があります。読み込む場合には、デフォルトのコンバータが独自の ArrayWritable サブタイプを Java のObject[]に変換するでしょう。これはPythonのタプルにピクル化されます。基本的なタイプの配列のためのPython array.arrayを取得sるうには、ユーザは独自のコンバータを指定する必要があります。

SequenceFileの保存とロード

テキストファイルと似て、SequenceFiles はパスを指定することによって保存およびロードすることができます。キーと値のクラスは指定することができますが、標準のWritableの場合これは不要です。

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]

他のHadoop Input/Output フォーマットの保存およびロード

PySparkは、'new'および'old' Hadoop MapReduce APIについて、任意のHadoop InputFormatの読み込み、あるいは任意のHadoopのOutputFormatの書き込みができます。必要であれば、Hadoop設定をPython dictとして渡すことができます。これはElasticsearch ESInputFormatを使った例です:

$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
>>> conf = {"es.resource" : "index/type"}   # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
    "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first()         # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

もしInputFormatが単純にHadoop設定 および/あるいは入力パスに依存する場合は、キーと値のクラスは上のテーブルに応じて簡単に変換でき、このやり方はそのような場合にうまく動作するでしょう。

(Cassandra/HBaseからロードしたデータなど)独自のシリアライズ化バイナリデータがある場合は、まずそのデータをPyroliteのピックラーが処理できるものにScala/Java側で変換しなければならないでしょう。このために Converter trait が提供されます。単にこのtraitを拡張し、変換コードをconvertメソッドの中に実装します。InputFormatにアクセスを必要とする全ての依存物と一緒にこのクラスはSparkジョブjarにパッケージされ、PySparkクラスパッスに含まれるようにすることを忘れないでください。

Cassandra / HBase の独自のコンバータを使ったInputFormat および OutputFormatの例は、Python examples および Converter examples を見てください。

RDD 操作

RDDは2つの種類のオペレータをサポートします: 変換、これは既存のものから新しいデータセットを生成します。actions、これはデータセット上で計算を実行した後でドライバープログラムに値を返します。例えば、mapは関数を通じて各データ要素を渡し、 結果を表す新しいRDDを返します。一方で、reduce はなんらかの関数を使ってRDDの全ての要素を集約するアクションで、最後の結果はドライバープログラムに返ります(しかし、分散したデータセットを返す 並行reduceByKey もあります)

Sparkでの全ての変換は、すぐには結果を計算しないという点で lazyです。代わりにそれらは幾つかのベースのデータセット(例えば、ファイル)に適用された変換を単に記憶します。アクションがドライバープログラムに返されるべき結果を必要とした時に、変換は単に計算されます。この設計によりSparkはもっと効率的に動作することができます。例えば、大きくマップされたデータセットではなく、mapを使って生成されたデータセットがreduce の中で使われ、reduceの結果だけがドライバーに返ることを理解できます。

デフォルトでは、それぞれの変換されたRDDはその上でアクションを実行するたびに再計算されます。しかし、persist(あるいはcacheメソッドを使ってメモリ内のRDDを永続化するかも知れません。そのような場合、Sparkは次のクエリの時にもっと早くアクセスするために、クラスタ上の要素を維持するでしょう。ディスク上、あるいは複数のノードをまたがったリプリケートのためのサポートもあります。

基本

RDDの基本を説明するために、以下の単純なプログラムを考えます:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

最初の行は外部ファイルからの基本のRDDを定義します。このデータセットはメモリにロードされないか、あるいは以下のように振る舞います: lines は単にファイルへのポインターです。二つ目の行はmap変換の結果としての lineLengths を定義します。ここでも、lazinessのために lineLengthsはすぐには計算されません。最後に、reduceを実行します。これはアクションです。この時点でSparkは個々のマシーン上で実行するために計算をタスクに分割し、各マシーンはmapとローカルのreductionを実行し、ドライバープログラムに回答のみを返します。

後で再びlineLengthsを使用したい場合は、以下を追加するかも知れません:

lineLengths.persist()

reduceの前に、最初にそれが計算された後でメモリ内に lineLengths が保存されるでしょう。

RDDの基本を説明するために、以下の単純なプログラムを考えます:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

最初の行は外部ファイルからの基本のRDDを定義します。このデータセットはメモリにロードされないか、あるいは以下のように振る舞います: lines は単にファイルへのポインターです。二つ目の行はmap変換の結果としての lineLengths を定義します。ここでも、lazinessのために lineLengthsはすぐには計算されません。最後に、reduceを実行します。これはアクションです。この時点でSparkは個々のマシーン上で実行するために計算をタスクに分割し、各マシーンはmapとローカルのreductionを実行し、ドライバープログラムに回答のみを返します。

後で再びlineLengthsを使用したい場合は、以下を追加するかも知れません:

lineLengths.persist(StorageLevel.MEMORY_ONLY());

reduceの前に、最初にそれが計算された後でメモリ内に lineLengths が保存されるでしょう。

RDDの基本を説明するために、以下の単純なプログラムを考えます:

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

最初の行は外部ファイルからの基本のRDDを定義します。このデータセットはメモリにロードされないか、あるいは以下のように振る舞います: lines は単にファイルへのポインターです。二つ目の行はmap変換の結果としての lineLengths を定義します。ここでも、lazinessのために lineLengthsはすぐには計算されません。最後に、reduceを実行します。これはアクションです。この時点でSparkは個々のマシーン上で実行するために計算をタスクに分割し、各マシーンはmapとローカルのreductionを実行し、ドライバープログラムに回答のみを返します。

後で再びlineLengthsを使用したい場合は、以下を追加するかも知れません:

lineLengths.persist()

reduceの前に、最初にそれが計算された後でメモリ内に lineLengths が保存されるでしょう。

Sparkに関数を渡す

SparkのAPIはクラスター上で実行するためにドライバープログラム内で渡す関数に強く依存します。これをするには以下の2つのお勧めの方法があります:

  • 匿名関数構文、これはコードの短い断片のために使うことができます。
  • 大きなシングルトンプロジェクトの中の静的なメソッド。例えば、object MyFunctionsを定義し、以下のようにMyFunctions.func1を渡すことができます:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

(シングルトンオブジェクトとは対照的に)クラスのインスタンスの中のメソッドにリファレンスを渡すことも可能ですが、これにはメソッドと一緒にクラスを含むオブジェクトを送信する必要があります。例えば、以下を考えてみてください:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

ここで、もし新しいMyClassインスタンスを生成し、その上で doStuffを呼び出すと、その中のmap MyClass インスタンスのfunc1メソッドを参照します。つまり、オブジェクト全体がクラスタに送信される必要があります。それはrdd.map(x => this.func1(x))を書くことに似ています。

同じように、外部のオブジェクトのフィールドへのアクセスはオブジェクト全体を参照するでしょう。

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

これは rdd.map(x => this.field + x)を書くことと等価です。それは this.の全てを参照しますこの問題を避けるために、最も簡単な方法は外部的にアクセスする代わりに field をローカル変数にコピーすることです:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

SparkのAPIはクラスター上で実行するためにドライバープログラム内で渡す関数に強く依存します。Javaでは、関数はorg.apache.spark.api.java.functionパッケージ内のインタフェースを実装するクラスによって表現されます。そのような関数を生成する方法は2つあります:

  • 匿名内部クラス、あるいは名前のあるクラスのどちらかでクラスへのFunctionインタフェースを実装し、Sparkへそのインスタンスを渡します。.
  • Java8では、実装を簡潔に定義するためにlambda 表記を使います。

このガイドのほとんどは簡潔化のためにlambda構文を使用しますが、長い形式の全ての同じAPIを使うと簡単です。例えば、以下のように上のコードを書くことができます:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});

あるいは、インラインの関数を書くことが不恰好であれば:

class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

Javaでの匿名内部クラスはfinalの印をつけている限りスコープ内の変数にアクセスすることもできます。Sparkは他の言語のために各ワーカーノードへこれらの変数のコピーを転送するでしょう。

SparkのAPIはクラスター上で実行するためにドライバープログラム内で渡す関数に強く依存します。これをするには3つのお勧めの方法があります:

  • Lambda 表記、簡単な関数のために、表現として書くことができます。(Lambdaは、値を返す複数行の関数あるいは命令文をサポートしません)
  • 長いコードのための、Sparkに呼ばれる関数内のdef
  • モジュール内のトップレベルの関数。

例えば、lambdaを使ってサポートされる長い関数を渡すために、以下のコードを考えてみましょう:

"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

(シングルトンオブジェクトとは対照的に)クラスのインスタンスの中のメソッドにリファレンスを渡すことも可能ですが、これにはメソッドと一緒にクラスを含むオブジェクトを送信する必要があります。例えば、以下を考えてみてください:

class MyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        return rdd.map(self.func)

ここで、もしnew MyClass を生成し、その上で doStuffを呼び出すと、その中のmap MyClass インスタンスのfuncメソッドを参照します。つまり、オブジェクト全体がクラスタに送信される必要があります。

同じように、外部のオブジェクトのフィールドへのアクセスはオブジェクト全体を参照するでしょう。

class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s)

この問題を避けるために、最も簡単な方法は外部的にアクセスする代わりに field をローカル変数にコピーすることです:

def doStuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)

クロージャーの理解

Sparkについてもっと難しいことの一つは、クラスタをまたいだコードを実行する時の変数とメソッドのスコープとライフサイクルの理解です。それらのスコープの外側で変数を修正するRDDオペレーションはしばしば混乱の原因となります。以下の例の中で、カウンターを増加するために foreach()を使うコードを調べますが、同じような問題が他のオペレーションについても起こります。

下記の単純なRDD要素の合計を考えます。これは実行が同じJVM内で発生するかどうかに依存して異なる挙動をするかもしれません。これの一般的な例はSparkアプリケーションをクラスタに配備する(例えば、spark-submit を使って YARNへ)のに対してlocal モードでSparkを実行する(--master = local[n])場合です:

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);
counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

ローカル vs. クラスタモード

上のコードの挙動は未定義で、意図したようには動作しないかも知れません。ジョブを実行するために、SparkはRDDのオペレーションの処理をタスクに分割します。各タスクはexecutorによって実行されます。実行に先立って、Sparkはタスクのclosureを計算します。closureはexecutorがRDD上で計算(この場合foreach())を実施するために、それらの変数とメソッドが見えなければなりません。このclosureはシリアライズ化され各executorに送信されます。

各executorに送信されたクロージャー内の変数はコピーを持ち、したがって、counterforeach 関数内で参照された場合それはドライバーノード内の counter ではないということです。ドライバーノードのメモリ内にcounterはありますが、これはexecutorにはもう見えません。executorはシリアライズ化されたclosureからのコピーだけを見ることができます。従って、counter上の全てのオペレーションはシリアライズ化されたclosure内の値を参照していたため、counterの最後の値は0のままでしょう。

ローカルモードでは、ある条件下では foreach 関数がドライバーとして同じJVM内で実際に実行し、同じ元の counterを参照するでしょう。そしてそれを実際に更新するかも知れません。

これらの種類のシナリオでよく定義された挙動を保証するには、Accumulatorを使うべきです。Sparkでのaccumulatorは、クラスタ内のワーカーノードをまたがって実行を分割する場合に、特に変数を安全に更新するための仕組みを提供するために使われます。このガイドのaccumulatorの章でこれらは詳細に議論されます。

一般的に、closure - ループあるいはローカルで定義されるメソッド、はいくつかのグローバルの状態を変化するために使われるべきではありません。Sparkはclosureの外部から参照されるオブジェクトへの突然変異の挙動を定義あるいは保証しません。これを行う幾つかのコードはローカルモードで動作するかもしれませんが、それは単なる偶然で、そのようなコードは分散モードでは期待する動作をしないでしょう。グローバルな集約が必要な場合は代わりにaccumulatorを使ってください。

RDDの要素の出力

その他の一般的な慣用は、rdd.foreach(println) あるいは rdd.map(println) を使ってRDDの要素を出力しようとすることです。単一のマシーン上で、これは期待する出力を生成し、全てのRDDの要素を出力するでしょう。しかし、クラスター モードでは、executorによって呼びだされたstdoutへの出力は代わりにexecutorのstdoutに書き込まれ、ドライバーの出力ではありません。つまりドライバー上のstdoutはそれらを表示しないでしょう。ドライバー上の全ての要素を出力するために、最初にRDDをドライバーノードに持ってくる collect() メソッドを使うことができます。したがって: rdd.collect().foreach(println)collect()はRDD全体を一つのマシーンに取り出すために、これはドライバーのメモリ不足を起こすことがありえます; もしRDDの2,3の要素を出力するとするだけであれば、より安全な方法はtake()を使うことです: rdd.take(100).foreach(println)

キーバリューペアとの連動

ほとんどのSpark操作はあらゆる種類のオブジェクトを含むRDD上で動作しますが、2,3の特別な操作はキー-値ペアのRDD上でのみ利用可能です。もっとも一般的なものは、キーを使って要素をグループあるいは集約するような分散型"shuffle"操作です。

Scalaでは、これらの操作は、(言語内の組み込みの、単純に(a, b)と書くことで生成される)、Tuple2 オブジェクトを含むRDD上で自動的に利用可能です。キー-値ペアの操作はPairRDDFunctionsクラスの中で利用可能です。これはtupleのRDDの周りを自動的に取り囲みます。

例えば、以下のコードはファイル内のテキストの各行が何回現れるかを数えるためにキー-値ペア上のreduceByKey操作を使用します。

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

例えば、ペアをアルファベット順にソートし、最後にドライバープログラムにオブジェクトの配列としてそれらを返すためにcounts.collect()するために、counts.sortByKey()を使うこともできます。

注意: 独自のオブジェクトをキー-値ペアの操作として使う場合、独自の equals() メソッドがhashCode() メソッドを使って実施されるようにしなければなりません。完全な詳細は、Object.hashCode() ドキュメントの中で説明される規約を見てください。

ほとんどのSpark操作はあらゆる種類のオブジェクトを含むRDD上で動作しますが、2,3の特別な操作はキー-値ペアのRDD上でのみ利用可能です。もっとも一般的なものは、キーを使って要素をグループあるいは集約するような分散型"shuffle"操作です。

Javaでは、キー-値ペアはScalaの標準ライブラリのscala.Tuple2クラスを使って表現されます。tupleを生成するためには単純にnew Tuple2(a, b)を呼び出し、tuple._1() および tuple._2()を使ってそのフィールドにアクセスすることができます。

キー-値ペアのRDDはJavaPairRDD クラスによって表現されます。mapToPair および flatMapToPairのようなmap操作の特別なバージョンを使って、JavaRDDからJavaPairRDDを構築することができます。JavaPairRDDは標準的なRDD関数と特別なキー-値関数の両方を持つでしょう。

例えば、以下のコードはファイル内のテキストの各行が何回現れるかを数えるためにキー-値ペア上のreduceByKey操作を使用します。

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

例えば、ペアをアルファベット順にソートし、最後にドライバープログラムにオブジェクトの配列としてそれらを返すためにcounts.collect()するために、counts.sortByKey()を使うこともできます。

注意: 独自のオブジェクトをキー-値ペアの操作として使う場合、独自の equals() メソッドがhashCode() メソッドを使って実施されるようにしなければなりません。完全な詳細は、Object.hashCode() ドキュメントの中で説明される規約を見てください。

ほとんどのSpark操作はあらゆる種類のオブジェクトを含むRDD上で動作しますが、2,3の特別な操作はキー-値ペアのRDD上でのみ利用可能です。もっとも一般的なものは、キーを使って要素をグループあるいは集約するような分散型"shuffle"操作です。

Pythonでは、これらの操作は(1, 2)のような組み込みのPythonのtupleを含むRDD上で動作します。単純にそのようなtupleを生成し、期待する操作を呼び出します。

例えば、以下のコードはファイル内のテキストの各行が何回現れるかを数えるためにキー-値ペア上のreduceByKey操作を使用します。

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

例えば、ペアをアルファベット順にソートし、最後にドライバープログラムにオブジェクトのリストとしてそれらを返すためにcounts.collect()するために、counts.sortByKey()を使うこともできます。

変換

以下の表はSparkによってサポートされる一般的な変換の幾つかをリスト表示します。詳細は、RDD API ドキュメント (Scala, Java, Python, R) および、ペアRDD関数のドキュメント(Scala, Java) を参照してください。

変換意味
map(func) func関数を使ってソース上の各要素を渡すことで構成される新しい分散型データセットを返します。
filter(func) funcがtrueを返すソース上のそれらの要素を選択することで構成される新しいデータセットを返します。
flatMap(func) mapに似ていますが、各入力項目は0あるいはもっと多くの種欝力項目にマップされるかも知れません(つまりfuncは単一の項目ではなくSeqを返すはずです)。
mapPartitions(func) mapに似ていますが、RDDの各パーティション(block)上で個々に実行します。つまりタイプTのRDDで実行する場合は、funcはIterator<T> => Iterator<U> の種類でなければなりません。
mapPartitionsWithIndex(func) mapPartitions に似ていますが、パーティションのインデックスを表す整数値を使ったfunc も提供します。つまりタイプTのRDDで実行する場合は、funcは (Int, Iterator<T>) => Iterator<U> の種類でなければなりません。
sample(withReplacement, fraction, seed) 指定されたランダム数の生成種を使って、置換有りあるいは無しで、データの断片 fractionを標本化します。
union(otherDataset) ソースデータセットと引数の中の要素の結合を含む新しいデータセットを返します。
intersection(otherDataset) ソースデータセットと引数の中の要素の積集合を含む新しいRDDを返します。
distinct([numTasks])) ソースデータセットのdistinct要素を含む新しいデータセットを返します。
groupByKey([numTasks]) (K, V) ペアのデータセット上で呼ばれた場合、(K, Iterable<V>) ペアのデータセットを返します。
注意: 各キーに対して(sumあるいはaverageのような)集約を実施するためにグルーピングをする場合は、reduceByKey あるいは aggregateByKey を使うことでより良いパフォーマンスをもたらすでしょう。
注意: デフォルトでは、出力の並行レベルは親のRDDのパーティション数に依存します。タスクの異なる数を設定するために任意の numTasks 引数を渡すことができます。
reduceByKey(func, [numTasks]) (K, V)ペアのデータセット上で呼び出されると、指定されたreduce関数funcを使って各キーの値が集約された(K, V)ペアのデータセットが返ります。これはタイプ(V, V) => V でなければなりません。groupByKeyのように、reduceタスクの数は任意の2つ目の引数を使って設定されます。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) (K, V)ペアのデータセット上で呼び出されると、指定された結合関数と中立の"zero"値を使って各キーの値が集約された(K, U)ペアのデータセットが返ります。Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. groupByKeyのように、任意の2つ目の引数を使ってreduceタスクの数が設定可能です。
sortByKey([ascending], [numTasks]) KがOrderedを実装した(K, V)ペアのデータセット上で呼びだされると、ascending引数のbooleanで指定されるように昇順あるいは降順にキーでソートされた(K, V)ペアのデータセットを返します。
join(otherDataset, [numTasks]) (K, V) と (K, W)のペアの2つのデータセットで呼ばれると、各キーの要素の全てのペアからなる (K, (V, W)) のペアのデータセットを返します。leftOuterJoin, rightOuterJoin および fullOuterJoin を使って外部結合がサポートされます。
cogroup(otherDataset, [numTasks]) タイプ(K, V)および(K, W)のデータセットで呼ばれると、 (K, (Iterable<V>, Iterable<W>)) タプルのデータセットを返します。この操作はgroupWithとも呼ばれます。
cartesian(otherDataset) タイプ T と U のデータセットで呼ばれると、(T, U) ペア(要素の全ペア)のデータセットを返します。
pipe(command, [envVars]) 例えば、Perlあるいはbashスクリプトのシェルコマンドを使ってRDDの各パーティションをパイプします。RDDの要素はプロセスのstdinおよびstdoutの行出力に書き込まれ、文字列のRDDとして返されます。
coalesce(numPartitions) RDD内のパーティションの数をnumPartitionにまで減らします。大きなデータセットをフィルターした後でより効率的に操作を実行するのに役立ちます。
repartition(numPartitions) より少なくあるいは多くのパーティションを生成するために、RDD中のデータをランダムにreshuffleし、それらの間でバランスを取ります。これは常にネットワーク越しに全てのデータをシャッフルします。
repartitionAndSortWithinPartitions(partitioner) 指定されたpartitionerに従ってRDDを再パーティションし、各結果のパーティション内で、それらのキーによってレコードをソートします。ソートをシャッフル機構の中に押し込めることができるため、これはrepartition を呼び出してそれから各パーティション内でソートするよりも効果的です。

アクション

以下のテーブルはSparkによってサポートされる一般的なアクションの幾つかをリスト表示します。詳細は、RDD API ドキュメントを参照してください(Scala, Java, Python, R)

詳細はペアRDD関数のドキュメントを参照してください(Scala, Java)。

アクション意味
reduce(func) 関数func を使ってデータセットの要素を集約します(これは2つの引数を取り、1つを返します)。関数は並列して正確に計算されるように交換可能で結合されていなければなりません。
collect() ドライバープログラムにおいて配列としてデータセットの全ての要素を返します。これは一般的にデータの十分に小さなサブセットを返すフィルタあるいは操作の後で効果的です。
count() データセット中の要素の数を返します。
first() データセットの最初の要素を返します(take(1)に似ています)。
take(n) データセットの最初のn個の要素の配列を返します。
takeSample(withReplacement, num, [seed]) 任意でランダムな数値生成種を事前指定して、置換有りあるいは無しで、データセットのnum個の要素のランダムな標本から成る配列を返します。
takeOrdered(n, [ordering]) 自然順あるいは独自の比較のどちらかを使ったRDDの最初の n個の要素を返します。
saveAsTextFile(path) データセットの要素をテキストファイル(あるいはテキストファイルのセット)として、ローカルファイルシステム、HDFS、あるいはその他のHadoopがサポートするファイルシステムの指定されたディレクトリに書き出します。Sparkは各要素ごとにそれをファイル内のテキストの行に変換するためにtoStringを呼び出すでしょう。
saveAsSequenceFile(path)
(Java および Scala)
データセットの要素をHadoop SequenceFileとして、ローカルファイルシステム、HDFS、あるいはその他のHadoopがサポートするファイルシステムの指定されたディレクトリに書き出します。これはHadoopのWritableインタフェースを実装するキー-値ペアのRDDで利用可能です。Scalaでは、明示的にWritableに変換可能なタイプでも利用可能です(Sparkは Int, Dobule, Stringなどような基本タイプのための変換を含みます)。
saveAsObjectFile(path)
(Java および Scala)
Javaのシリアライズ化を使ってデータセットの要素を単純なフォーマットで書き込みます。これはその後SparkContext.objectFile()を使ってロードすることができます。
countByKey() タイプ (K, V)のRDDにのみ利用可能です。各キーのカウントを持つ (K, Int)ペアのハッシュマップを返します。
foreach(func) データセットの各エレメントに関数func を実行します。これは通常、 Accumulatorの更新、あるいは外部ストレージシステムとの対話のような副作用のために行われます。
注意: foreachの外側でAccumulator以外の変数を修正することは未定義の挙動に繋がるかも知れません。詳細はclosureの理解 を見てください。

Spark RDD API は foreachについてのforeachAsync のような、いくつかのアクションの非同期バージョンを公開します。これらはアクションの完了を遮ることなくただちにcallerにFutureAction を返します。これはアクションの非同期実行を管理あるいは待つために使うことができます。

シャッフル操作

Sparkトリガー内の特定の操作はシャッフルとして知られるイベントを引き起こします。シャッフルはパーティションを横断して異なるグループ化をするようにデータを再配布するためのSparkの機構です。これは一般的にexecutorとマシーンをまたがったデータコピーを引き起こし、シャッフルを複雑かつコストの高いオペレーションにします。

背景

シャッフルの間に何が起きるかを理解するために、reduceByKey 操作の例を考えてことができます。reduceByKey 操作は、一つのキーについて全ての値が組として結合される新しいRDDを生成します - キーおよびそのキーに関係する全ての値に対してreduce関数を実行した結果。この試みは、一つのキーのために全ての値が同じパーティション、あるいは同じマシーン上にさえある必要はありませんが、結果を計算するために同じ場所にある必要があります。

Sparkでは、データは特定の操作のために必要な場所にあるように、パーティションを横断して分散されません。計算の間、一つのタスクは一つのパーティション上で操作されるでしょう - 従って、一つの reduceByKey reduceタスクを実行するために全てのデータを整理するには、Sparkは全対全の操作を実施する必要があります。それは全てのキーのための全ての値を見つけるために全てのパーティションから読み込む必要があり、各キーのための最終結果を計算するためにパーティションを横断して値を集めます - これはshuffleと呼ばれます。

新しくシャッフルされたデータの各パーティション内の要素のセットは決定論的なもので、それはパーティション自身の順番付けですが、それらの要素の順場付けはそうではありません。もしシャッフルに従って予想どおりに並べられたデータを希望する場合は、以下を使って行うことができます:

シャッフルを起こしえるオペレーションはrepartition および coalesceのようなrepartition オペレーション、groupByKey および reduceByKeyのような'ByKey オペレーション(カウントを除く)、そしてcogroup および joinのようなjoin オペレーションを含みます。

パフォーマンスの影響

ShuffleはディスクI/O、データのシリアライズ、ネットワークI/Oを必要とするため、高価な操作です。シャッフルのためにデータを整理するために、Sparkはタスクのセットを生成します - データを整理するためにmap タスク、そしてそれを集約するためにreduceタスクのセット。この学名はMapReduceからきていて、Sparkの mapreduce 操作には直接関係しません。

内部的には、各mapタスクからの結果はそれらが適さなくなるまでメモリ内に保持されます。そして、それらは目的のパーティションに基づいてソートされ、単一のファイルに書き込まれます。reduceの側では、タスクは関連するソートされたブロックを読みます。

特定のシャッフル操作は転送の前あるいは後にレコードを整理するためにメモリ内のデータ構造を採用しているため、大量の総ヒープメモリを消費するかも知れません。特に、reduceByKey および aggregateByKey はそれらの構造をmap側に生成し、'ByKey 操作はreduce側にそれらを生成します。データがメモリ内に収まらなくなった場合、Sparkはそれらのテーブルをディスクに流し込み、追加のディスクI/Oのオーバーヘッドとガベージコレクションの増加を起こします。

シャッフルはディスク上に大量の中間ファイルも生成します。Spack 1.3 の時点では、これらのファイルは対応するRDDがもう使われなくなりガベージコレクトされるまで保持されます。これは、系図が再計算される場合にシャッフルファイルが再生成する必要がないように行われます。もしアプリケーションがこれらのRDDへのリファレンスを維持するか、GCがしばしば起動されない場合には、ガベージコレクションはずっと後でのみ起こるかも知れません。この事は長く実行中のSparkジョブは大量のディスク領域を消費するかも知れないことを意味します。Sparkコンテキストを設定する場合に、一時ストレージディレクトリは spark.local.dir 設定パラメータによって指定されます。

シャッフルの挙動は様々な設定パラメータを調節することで調整することができます。Spark Configuration Guideの中の'シャッフルの挙動'の章を見てください。

RDD の永続性

Sparkでの最も重要な能力の一つは、操作に渡ってメモリ内のデータセットを永続化 (あるいは キャッシュ)することです。RDDを維持する場合、各ノードはメモリ内で計算する全てのパーティションを格納し、そのデータセット上(あるいはそれから派生したデータセット)での他のアクションの中でそれらを再使用します。これにより将来のアクションはもっと速くできます(時には10x以上)。キャッシングは繰り返しアルゴリズムおよび高速な対話的な使用のためにキーとなるツールです。

persist() あるいは cache() メソッドをRDDに使ってRDDを永続化されるように印をつけることができます。最初はアクション内で計算され、ノード上のメモリ内に保持されるでしょう。Sparkのキャッシュは耐障害性があります - RDDのいずれかのパーティションが喪失すると、最初にそれを生成した変換を使って自動的に再計算されます。

更に、各永続化されたRDDは異なるストレージレベルを使って格納することができ、例えば ディスク上にデータセットを永続化し、シリアライズ化されたJavaオブジェクトとして(保存領域に)メモリ内に永続化し、ノードをまたがってリプリケートすることができます。これらのレベルはStorageLevel オブジェクト (Scala, Java, Python) を persist()に渡すことで設定されます。cache() メソッドはデフォルトのストレージレベルを使うための略記で、StorageLevel.MEMORY_ONLY (メモリ内にデシリアライズさらたオブジェクトを格納する)です。ストレージレベルの完全なセットは以下の通りです:

ストレージ レベル意味
MEMORY_ONLY RDDをJVM内でデシリアライズされたJavaオブジェクトとして格納します。もしRDDがメモリ内に収まらない場合は、幾つかのパーティションがキャッシュされず、必要になるたびに毎回その場で再計算されるでしょう。これはデフォルトのレベルです。
MEMORY_AND_DISK RDDをJVM内でデシリアライズされたJavaオブジェクトとして格納します。もし、RDDがメモリ内に収まらない場合は、収まらないパーティションをディスク上に保持し、必要な場合にそこから読み込みます。
MEMORY_ONLY_SER
(Java および Scala)
RDDを シリアライズ化された Java オブジェクトとして格納します(パーティションごとに1バイトの配列)。これは、特にfast serializerを使う場合は、読むためのCPU集約が多いですが、一般的にデシリアライズされたオブジェクトよりも容量効率が良いです。
MEMORY_AND_DISK_SER
(Java および Scala)
MEMORY_ONLY_SERと似てますが、メモリに収まらないパーティションは必要になる度にその場で再計算するのではなく、ディスクに流し入れます。
DISK_ONLY RDDパーティションをディスク上にのみ格納します。
MEMORY_ONLY_2, MEMORY_AND_DISK_2 など 上のレベルと同じですが、2つのクラスタノードに各パーティションをレプリケートします。
OFF_HEAP (experimental) MEMORY_ONLY_SER に似ていますが、データを off-heap メモリに格納します。これには off-heap メモリを有効にする必要があります。

注意: Pythonでは、格納されたオブジェクトは常に Pickleライブラリを使ってシリアライズされます。そのためシリアライズ化レベルを選択するかどうかは問題になりません。Pythonで利用可能なストレージレベルには、MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY および DISK_ONLY_2が含まれます。

Sparkはシャッフル操作の中で自動的に幾つかの中間データも永続化します(例えば、reduceByKey); ユーザがpersistを呼ばない場合でも。ノードがシャッフル中に故障した場合に全ての入力を再計算することを避けるために行われます。それでも、結果のRDDを再利用する場合は、ユーザが結果のRDDで persist を呼び出すことをお勧めします。

どっちのストレージレベルを選ぶか?

Sparkのストレージレベルはメモリ利用率とCPU効率の間で異なるトレードオフを提供するためのものです。選択するには以下の過程を経ることをお勧めします:

データの削除

Sparkは自動的に各ノード上のキャッシュの使用率を監視し、least-recently-used (LRU) 形式で古いデータのパーティションを削除します。キャッシュからRDDが抜け落ちるのを待つ代わりに手動でRDDを削除したい場合は、RDD.unpersist() メソッドを使います。

共有変数

通常、(map あるいは reduceのような)Spark操作に渡される関数は遠隔のクラスタノード上で実行される場合、関数内で使われる全ての変数のそれぞれのコピー上で動作します。これらの変数は各マシーンにコピーされ、リモートのマシーン上の変数への更新はドライバープログラムへ伝播されません。タスクを横断する一般的なread-write共有変数のサポートは効果的では無いでしょう。しかし、Sparkは2つの共通の使用パターンのための2つの制限された共有変数の種類を提供します: ブロードキャスト変数とaccumulatorです。

ブロードキャスト変数

ブロードキャスト変数によってプログラマはタスクによってコピーを発送するのではなく、各マシーン上にキャッシュされたread-only変数を維持することができます。それらは、例えば各ノードに多くの入力データセットのコピーを効率的な方法で渡すために使うことができます。Sparkは通信のコストを下げるために効果的なブロードキャストアルゴリズムを使ってブロードキャスト変数を分散しようともします。

Sparkのアクションは分散した"シャッフル"操作によって分割されたステージのセットによって実行されます。Sparkは各ステージ内でタスクによって必要とされる共通データを自動的にブロードキャストします。この方法でブロードキャストされたデータはシリアライズされた形式でキャッシュされ、各タスクを実行する前にデシリアライズされます。これは明示的なブロードキャスト変数の生成は複数のステージを横断したタスクが同じデータを必要とするか、デシリアライズ形式のデータのキャッシュが重要な場合にのみ有用なことを意味します。

ブロードキャスト変数はSparkContext.broadcast(v)を呼ぶことで変数vから生成されます。ブロードキャスト変数は vのラッパーで、その値はvalueメソッドを呼ぶことでアクセスすることができます。以下のコードはこのことを示します:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]
>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]

ブロードキャスト変数が生成された後で、vが1度以上ノードに送信されないように、クラスタ上で実行される全ての関数内で値vの代わりに使われる必要があります。その上、オブジェクト v は全てのノードがブロードキャスト変数と同じ値を取得できるようにブロードキャストされた後で修正してはいけません(例えば、変数は新しいノードへ後で送信される場合)。

集約器

accumulatorは結合および交換操作を使ってのみ"追加"される変数で、したがってaccumulatorは効率的に並行をサポートします。それらは(MapReduceのような)カウンターあるいは集計を実装するために使うことができます。Sparkは本来数字タイプのaccumulatorをサポートし、プログラマーは新しいタイプのサポートを追加することができます。

accumulatorが名前付きで生成されると、それらはSparkのUIの中で表示することができます。これは実行中のステージの進捗を理解するために有用かもしれません(注意: これはPythonではまだサポートされていません)。

Spark UIでの集約器

accumulatorはSparkContext.accumulator(v)を呼ぶことで初期値vから生成されます。クラスタ上で実行中のタスクはaddメソッドあるいは += オペレータ(ScalaおよびPython)を使うことでクラスタに追加することができます。しかし、それらはその値を読むことができません。ドライバープログラムだけがvalueメソッドを使ってaccumulatorの値を読むことができます。

以下のコードは配列の要素を追加するために使われるaccumulatorを表します:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

このコードはタイプLongのaccumulatorのための組み込みのサポートに使われますが、プログラマは AccumulatorV2のサブクラス化をすることで独自のタイプを生成することもできます。AccumulatorV2 abstract クラスは上書きされるべきいくつかのメソッドを持ちます: accumulatorを0にリセットするためのreset、accumulatorに他の値を追加するためのadd、他の同じタイプのaccumulatorを1つにマージするためのmergeです。上書きする必要がある他のメソッドはscalaのAPIドキュメントで参照することができます。例えば、数学的なベクターを表す MyVectorクラスを持つとした場合、以下のように書けます:

object VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
  val vec_ : MyVector = MyVector.createZeroVector
  def reset(): MyVector = {
    vec_.reset()
  }
  def add(v1: MyVector, v2: MyVector): MyVector = {
    vec_.add(v2)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

プログラマが独自のタイプのAccumulatorV2を定義する場合、結果のタイプは同じになるか、あるいは要素の追加で違うものになるかも知れないことに注意してください。

LongAccumulator accum = sc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

このコードはタイプIntegerのaccumulatorのための組み込みのサポートに使われますが、プログラマは AccumulatorParamのサブクラス化をすることで独自のタイプを生成することもできます。AccumulatorParam インタフェースは2つのメソッドを持ちます: データタイプのための"zero値"の提供のためのzero、および2つの値を一緒に追加するためのaddInPlace。例えば、数学的なベクターを表すVectorクラスを持つとした場合、以下のように書けます:

class VectorAccumulatorParam implements AccumulatorParam<Vector> {
  public Vector zero(Vector initialValue) {
    return Vector.zeros(initialValue.size());
  }
  public Vector addInPlace(Vector v1, Vector v2) {
    v1.addInPlace(v2); return v1;
  }
}

// Then, create an Accumulator of this type:
Accumulator<Vector> vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());

Javaにおいて、Sparkは、結果のタイプが追加された要素と同じでは無い場所でデータを集めるためのもっと一般的なAccumulable インタフェース(つまり、要素を集めることでリストを構築する)も提供します。

>>> accum = sc.accumulator(0)
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
10

このコードはタイプIntのaccumulatorのための組み込みのサポートに使われますが、プログラマは AccumulatorParamのサブクラス化をすることで独自のタイプを生成することもできます。AccumulatorParam インタフェースは2つのメソッドを持ちます: データタイプのための"zero値"の提供のためのzero、および2つの値を一緒に追加するためのaddInPlace。例えば、数学的なベクターを表すVectorクラスを持つとした場合、以下のように書けます:

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

actions onlyの中で実施されるaccumulatorの更新のために、Sparkはaccumulatorへの各タスクの更新を1度だけ適用されるようにするでしょう。つまり、再起動されたタスクはその値を更新しないでしょう。変換の中で、もしタスクあるいはジョブのステージが再実行される場合は、ユーザは各タスクの更新が一度以上適用されるかも知れないことに気づいてください。

accumulator はSparkのlazy評価モデルを変更しません。もしそれらがRDD上の操作内で更新された場合は、一旦RDDがアクションの一部として計算されるとそれらの値が更新されます。結果として、accumulatorの更新は map()のようなlazy変換の中で行われる場合に実行されることが保証されません。以下のコードの断片はこの性質を実証します:

val accum = sc.accumulator(0)
data.map { x => accum += x; x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
LongAccumulator accum = sc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.
accum = sc.accumulator(0)
def g(x):
  accum.add(x)
  return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.

クラスタの配備

アプリケーション サブミット ガイド はクラスタにアプリケーションをサブミットする方法をせつめいします。簡単にいうと、一旦アプリケーションをJAR(Java/Scalaの場合)、あるいは .py または .zipファイル(Pythonの場合)にパッケージすると、bin/spark-submit スクリプトは任意のサポートされるクラスタマネージャーへそれをサブミットすることができます。

Java / Scala からのSparkジョブの起動

org.apache.spark.launcher パッケージはSparkジョブを単純なJava APIを使用する子プロセスとして起動するためのクラスを提供します。

ユニットテスト

Sparkは一般的なユニットテストフレームワークを使ってユニットテストをし易いです。マスターURLをlocalに設定してテスト内でSparkContextを単純に作成し、操作を実行し、取り壊すためにSparkContext.stop()を呼びます。Sparkは同じプログラム内で並行した2つのコンテキストの実行をサポートしないため、finally ブロックあるいは、テストフレームワークのtearDownメソッド内でコンテキストを必ず停止するようにしてください。

Spark のpre-1.0バージョンからの移設

Spark 1.0 はSparkコアのAPIを 1.X系列のためにフリーズします。ですので、"実験的"あるいは"開発者API"と印が付けられていない今利用可能な全てのAPIは将来のバージョンでサポートされるでしょう。Scalaユーザのための唯一の変更はグループ化の操作です。例えば groupByKey, cogroup および joinは (Key, Seq[Value]) ペアを返すことから (Key, Iterable[Value])を返すように変わりました。

Spark 1.0 はSparkコアのAPIを 1.X系列のためにフリーズします。ですので、"実験的"あるいは"開発者API"と印が付けられていない今利用可能な全てのAPIは将来のバージョンでサポートされるでしょう。幾つかの変換がJava APIに行われました:

  • org.apache.spark.api.java.function内のFunctionクラスは1.0でインタフェースになりました。これはFunctionを継承する古いコードは代わりに Functionを継承 しなければなりません。
  • mapToPairmapToDoubleのように、map変換の新しい変数が特別なデータタイプのRDDを生成するために追加されました。
  • groupByKey, cogroup および join のようなグルーピング操作は、(Key, List<Value>)ペアを返すことから (Key, Iterable<Value>)を返すように変わりました。

Spark 1.0 はSparkコアのAPIを 1.X系列のためにフリーズします。ですので、"実験的"あるいは"開発者API"と印が付けられていない今利用可能な全てのAPIは将来のバージョンでサポートされるでしょう。Pythonユーザのための唯一の変更はグループ化の操作です。例えば groupByKey, cogroup および joinは、(key, list of values) ペアを返すことから (key, iterable of values) を返すように変わりました。

Spark Streaming, MLlib および GraphX のための移行ガイドも利用可能です。

この後どうすればいいか

Spark webサイト上で幾つかの Spark プログラムの例を見つけることができます。更に、Sparkはexamples ディレクトリ (Scala, Java, Python, R) の中に、幾つかの例を含んでいます。Sparkのbin/run-exampleスクリプトにクラス名を渡すことで、JavaおよびScalaの例を実行することができます; 例えば:

./bin/run-example SparkPi

Pythonの例のためには、代わりに spark-submit を使ってください:

./bin/spark-submit examples/src/main/python/pi.py

Rの例のためには、代わりにspark-submit を使ってください:

./bin/spark-submit examples/src/main/r/dataframe.R

プログラムの最適化の手助けのために、設定 およびチューニングガイドはベストプラクティスの情報を提供します。データが効果的な形式でメモリ内に格納されるには、それらは特に重要です。デプロイの手助けのために、クラスタモードの概要 が分散化操作およびクラスタマネージャーのサポートを起こすコンポーネントを説明します。

最後に、完全なAPIドキュメントはScala, Java, Python および Rで利用可能です。

TOP
inserted by FC2 system