クイックスタート
このチュートリアルはSparkの使用の素早い導入を提供します。まずSparkの対話的シェル(PythonあるいはScala)を使ってAPIを紹介し、Java, ScalaおよびPythonでアプリケーションを書く方法を示します。完全なリファレンスはプログラミングガイドを見てください。
このガイドに従っていくには、まず Spark webサイトからSparkのパッケージ化されたリリースをダウンロードします。HDFSを使わないため、Hadoopのどのバージョンのパッケージでもダウンロードすることができます。
Spark シェルを使った対話的な解析
基本
SparkのシェルはAPIを学ぶための簡単な方法を提供し、データの解析を対話的に行う強力なツールです。Scala(Java VM上で動作し、従って既存のJavaライブラリを使うのが良い方法です)あるいはPythonのどちらからで利用することができます。Sparkディレクトリで以下のように実行して始めます:
./bin/spark-shell
Sparkの主要な抽象概念は Resilient Distributed Dataset (RDD) と呼ばれる項目の分散コレクションです。RDDはHadoopの(HDFSファイルのような)InputFormatあるいは他のRDDから変換して生成することができます。SparkソースディレクトリのREADMEファイルのテキストから新しいRDDを作成してみましょう:
scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
RDDには値を返すactionsと、新しいRDDへのポインタを返す transformationsがあります。幾つかのアクションを使ってみましょう:
scala> textFile.count() // Number of items in this RDD
res0: Long = 126
scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark
次に変換を使ってみましょう。ファイル内の項目のサブセットの新しいRDDを返す filter
変換を使うつもりです。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
変換とアクションを繋げることができます:
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
./bin/pyspark
Sparkの主要な抽象概念は Resilient Distributed Dataset (RDD) と呼ばれる項目の分散コレクションです。RDDはHadoopの(HDFSファイルのような)InputFormatあるいは他のRDDから変換して生成することができます。SparkソースディレクトリのREADMEファイルのテキストから新しいRDDを作成してみましょう:
>>> textFile = sc.textFile("README.md")
RDDには値を返すactionsと、新しいRDDへのポインタを返す transformationsがあります。幾つかのアクションを使ってみましょう:
>>> textFile.count() # Number of items in this RDD
126
>>> textFile.first() # First item in this RDD
u'# Apache Spark'
次に変換を使ってみましょう。ファイル内の項目のサブセットの新しいRDDを返す filter
変換を使うつもりです。
>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)
変換とアクションを繋げることができます:
>>> textFile.filter(lambda line: "Spark" in line).count() # How many lines contain "Spark"?
15
RDD操作の詳細
RDDのアクションと変換はもっと複雑な計算に使うことができます。最も単語が多い行を見つけてみましょう:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
この一番最初のものは、新しいRDDを生成して行を整数に写像します。reduce
は最も大きな行のカウントを見つけるためにRDD上で呼ばれます。map
および reduce
への引数はスカラ関数のリテラル(クロージャー)で、Scala/Javaライブラリのどのような言語の機能も使うことができます。例えば、どこで宣言された関数でも簡単に呼ぶことができます。このコードを分かりやすくするために Math.max()
関数を使おうと思います:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
Hadoopで普及したように、一般的なデータフローのパターンにMapReduceがあります。SparkはMapReduceのフローを簡単に実装することができます:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
ここでは、(String, Int)ペアのRDDとしてファイル内の単語辺りの数を計算するために、flatMap
, map
および reduceByKey
変換を組み合わせました。シェル内で単語の数を集めるために、collect
アクションを使うことができます:
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)
15
この一番最初のものは、新しいRDDを生成して行を整数に写像します。reduce
は最も大きな行のカウントを見つけるためにRDD上で呼ばれます。map
および reduce
の引数はPhtyonの anonymous functions (lambdas)ですが、使いたいどのようなトップレベルのPython関数にでも渡すことができます。例えば、このコードを分かりやすくするために、 max
関数を定義します:
>>> def max(a, b):
... if a > b:
... return a
... else:
... return b
...
>>> textFile.map(lambda line: len(line.split())).reduce(max)
15
Hadoopで普及したように、一般的なデータフローのパターンにMapReduceがあります。SparkはMapReduceのフローを簡単に実装することができます:
>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
ここでは、(string, int)ペアのRDDとしてファイル内の単語辺りの数を計算するために、flatMap
, map
および reduceByKey
変換を組み合わせました。シェル内で単語の数を集めるために、collect
アクションを使うことができます:
>>> wordCounts.collect()
[(u'and', 9), (u'A', 1), (u'webpage', 1), (u'README', 1), (u'Note', 1), (u'"local"', 1), (u'variable', 1), ...]
キャッシング
Sparkはクラスタ単位のメモリキャッシュ内へのデータセットの取り込みもサポートしています。これは、小さな"hot"データセットをクエリする場合やPageRankのような繰り返しのアルゴリズムを実行する場合などのように、データがたびたびアクセスされる場合に有用です。簡単な例として、linesWithSpark
データセットがキャッシュされるように印を付けてみましょう。
scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082
scala> linesWithSpark.count()
res8: Long = 19
scala> linesWithSpark.count()
res9: Long = 19
100行のテキストファイルを調べたりキャッシュするためにSparkを使うことはばかばかしく思えるかも知れません。興味深い点は、同じ関数がたとえストラインピングされた数十、数百のノードであっても、巨大なデータセットに使うことができるということです。プログラミングガイドで説明されるように、クラスタにbin/spark-shell
を接続することで対話的にこれをすることもできます。
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
19
>>> linesWithSpark.count()
19
100行のテキストファイルを調べたりキャッシュするためにSparkを使うことはばかばかしく思えるかも知れません。興味深い点は、同じ関数がたとえストラインピングされた数十、数百のノードであっても、巨大なデータセットに使うことができるということです。プログラミングガイドで説明されるように、クラスタにbin/pyspark
を接続することで対話的にこれをすることもできます。
自己内包したアプリケーション
SparkAPIを使用した自己内包アプリケーションを書きたいとします。Scala (with sbt), Java (with Maven), および Python での簡単なアプリケーションを見てみましょう。
Scalaでとても簡単なSparkアプリケーションを作成するつもりです - 実際簡単なため、SimpleApp.scala
という名前です:
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}
}
アプリケーションはscala.App
を拡張する代わりにmain()
メソッドを定義すべきであることに注意してください。scala.App
のサブクラスはおそらく正しく動かないでしょう。
このプログラムはSparkのREADMEの 'a'を含む行の数と、'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。自身のSparkContextを初期化したSparkシェルの以前の例と違って、プログラムの一部としてSparkContextを初期化します。
SparkContextのコンストラクタにアプリケーションの情報を含むSparkConf オブジェクトを渡します。
アプリケーションがSpark APIに依存しているため、sbtの設定ファイルも含みます。simple.sbt
はSparkの依存性を説明します。このファイルはSparkが依存するリポジトリも追加します:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
stbが正しく動作するには、SimpleApp.scala
とsimple.sbt
を代表的なディレクトリ構造に応じて配置する必要があるでしょう。適切な場所に一旦配置すると、アプリケーションのコードを含むJARパッケージを作成することができ、プログラムを実行するspark-submit
スクリプトを使うことができます。
# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23
この例はMavenをアプリケーションJARをコンパイルするために使用しますが、どの似たようなビルドシステムでも動作するでしょう。
とても簡単なSparkアプリケーション SimpleApp.java
を作成します:
/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logData = sc.textFile(logFile).cache();
long numAs = logData.filter(new Function<String, Boolean>() {
public Boolean call(String s) { return s.contains("a"); }
}).count();
long numBs = logData.filter(new Function<String, Boolean>() {
public Boolean call(String s) { return s.contains("b"); }
}).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
}
}
このプログラムはテキストファイルの'a'を含む行の数と'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。Scalaの例のようにSparkContextを初期化しますが、Javaと付き合いやすいようにJavaSparkContext
クラスを使用します。(JavaRDD
で表現される)RDDも作成し、それらに変換を実行します。最後に、 spark.api.java.function.Function
を継承したクラスを作成することでSparkに関数を渡します。これらの違いについての詳細はSpark プログラミングガイドで説明します。
プログラムをビルドするために、Sparkの依存関係をリスト化したpom.xml
ファイルも書きます。SparkのartifactはScalaのバージョンでタグを付けられていることに注意してください。
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>
</project>
Mavenの正統なディレクトリ構造に従ってそれらのファイルを配置します。
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
これでMavenを使ってアプリケーションをパッケージ化することができ、./bin/spark-submit
を使って実行します。
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
それでは、Python API (PySpark)を使ってアプリケーションを書く方法を示そうと思います。
例として、簡単なSparkアプリケーション、 SimpleApp.py
を作成します:
"""SimpleApp.py"""
from pyspark import SparkContext
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
このプログラムはテキストファイルの'a'を含む行の数と'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。ScaleおよびJavaの例と同じように、RDDを作成するためにSparkContextを使用します。Pythonの関数をSparkに渡すことができ、それは自動的にそれらが参照している全ての変数とともにシリアライズされます。独自のクラスあるいはサードパーティのライブラリを使うアプリケーションのために、コードの依存物を.zipファイルにまとめてspark-submit
の--py-files
に渡すことで、それらを追加することもできます。SimpleApp
はどのようなコードの依存を指定する必要もないくらい簡単なものです。
このアプリケーションを bin/spark-submit
スクリプトを使って実行することができます:
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23
この後どうすればいいか
初めてのSparkアプリケーションの実行おめでとうございます!
- APIのもっと深い概要は、Spark プログラミングガイドあるいは他のコンポーネントの“プログラミング ガイド” メニューを見てください。
- クラスタ上でアプリケーションを実行するには、配備の概要に行って下さい。
- 最後に、Sparkは
examples
ディレクトリに幾つかの例があります (Scala, Java, Python, R)。以下のようにしてそれらを実行することができます:
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R