クイックスタート

このチュートリアルはSparkの使用の素早い導入を提供します。まずSparkの対話的シェル(PythonあるいはScala)を使ってAPIを紹介し、Java, ScalaおよびPythonでアプリケーションを書く方法を示します。完全なリファレンスはプログラミングガイドを見てください。

このガイドに従っていくには、まず Spark webサイトからSparkのパッケージ化されたリリースをダウンロードします。HDFSを使わないため、Hadoopのどのバージョンのパッケージでもダウンロードすることができます。

Spark シェルを使った対話的な解析

基本

SparkのシェルはAPIを学ぶための簡単な方法を提供し、データの解析を対話的に行う強力なツールです。Scala(Java VM上で動作し、従って既存のJavaライブラリを使うのが良い方法です)あるいはPythonのどちらからで利用することができます。Sparkディレクトリで以下のように実行して始めます:

./bin/spark-shell

Sparkの主要な抽象概念は Resilient Distributed Dataset (RDD) と呼ばれる項目の分散コレクションです。RDDはHadoopの(HDFSファイルのような)InputFormatあるいは他のRDDから変換して生成することができます。SparkソースディレクトリのREADMEファイルのテキストから新しいRDDを作成してみましょう:

scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

RDDには値を返すactionsと、新しいRDDへのポインタを返す transformationsがあります。幾つかのアクションを使ってみましょう:

scala> textFile.count() // Number of items in this RDD
res0: Long = 126

scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark

次に変換を使ってみましょう。ファイル内の項目のサブセットの新しいRDDを返す filter 変換を使うつもりです。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

変換とアクションを繋げることができます:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
./bin/pyspark

Sparkの主要な抽象概念は Resilient Distributed Dataset (RDD) と呼ばれる項目の分散コレクションです。RDDはHadoopの(HDFSファイルのような)InputFormatあるいは他のRDDから変換して生成することができます。SparkソースディレクトリのREADMEファイルのテキストから新しいRDDを作成してみましょう:

>>> textFile = sc.textFile("README.md")

RDDには値を返すactionsと、新しいRDDへのポインタを返す transformationsがあります。幾つかのアクションを使ってみましょう:

>>> textFile.count() # Number of items in this RDD
126

>>> textFile.first() # First item in this RDD
u'# Apache Spark'

次に変換を使ってみましょう。ファイル内の項目のサブセットの新しいRDDを返す filter 変換を使うつもりです。

>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)

変換とアクションを繋げることができます:

>>> textFile.filter(lambda line: "Spark" in line).count() # How many lines contain "Spark"?
15

RDD操作の詳細

RDDのアクションと変換はもっと複雑な計算に使うことができます。最も単語が多い行を見つけてみましょう:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

この一番最初のものは、新しいRDDを生成して行を整数に写像します。reduce は最も大きな行のカウントを見つけるためにRDD上で呼ばれます。map および reduceへの引数はスカラ関数のリテラル(クロージャー)で、Scala/Javaライブラリのどのような言語の機能も使うことができます。例えば、どこで宣言された関数でも簡単に呼ぶことができます。このコードを分かりやすくするために Math.max()関数を使おうと思います:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

Hadoopで普及したように、一般的なデータフローのパターンにMapReduceがあります。SparkはMapReduceのフローを簡単に実装することができます:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

ここでは、(String, Int)ペアのRDDとしてファイル内の単語辺りの数を計算するために、flatMap, map および reduceByKey 変換を組み合わせました。シェル内で単語の数を集めるために、collect アクションを使うことができます:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)
15

この一番最初のものは、新しいRDDを生成して行を整数に写像します。reduce は最も大きな行のカウントを見つけるためにRDD上で呼ばれます。map および reduce の引数はPhtyonの anonymous functions (lambdas)ですが、使いたいどのようなトップレベルのPython関数にでも渡すことができます。例えば、このコードを分かりやすくするために、 max 関数を定義します:

>>> def max(a, b):
...     if a > b:
...         return a
...     else:
...         return b
...

>>> textFile.map(lambda line: len(line.split())).reduce(max)
15

Hadoopで普及したように、一般的なデータフローのパターンにMapReduceがあります。SparkはMapReduceのフローを簡単に実装することができます:

>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

ここでは、(string, int)ペアのRDDとしてファイル内の単語辺りの数を計算するために、flatMap, map および reduceByKey 変換を組み合わせました。シェル内で単語の数を集めるために、collect アクションを使うことができます:

>>> wordCounts.collect()
[(u'and', 9), (u'A', 1), (u'webpage', 1), (u'README', 1), (u'Note', 1), (u'"local"', 1), (u'variable', 1), ...]

キャッシング

Sparkはクラスタ単位のメモリキャッシュ内へのデータセットの取り込みもサポートしています。これは、小さな"hot"データセットをクエリする場合やPageRankのような繰り返しのアルゴリズムを実行する場合などのように、データがたびたびアクセスされる場合に有用です。簡単な例として、linesWithSpark データセットがキャッシュされるように印を付けてみましょう。

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 19

scala> linesWithSpark.count()
res9: Long = 19

100行のテキストファイルを調べたりキャッシュするためにSparkを使うことはばかばかしく思えるかも知れません。興味深い点は、同じ関数がたとえストラインピングされた数十、数百のノードであっても、巨大なデータセットに使うことができるということです。プログラミングガイドで説明されるように、クラスタにbin/spark-shell を接続することで対話的にこれをすることもできます。

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
19

>>> linesWithSpark.count()
19

100行のテキストファイルを調べたりキャッシュするためにSparkを使うことはばかばかしく思えるかも知れません。興味深い点は、同じ関数がたとえストラインピングされた数十、数百のノードであっても、巨大なデータセットに使うことができるということです。プログラミングガイドで説明されるように、クラスタにbin/pyspark を接続することで対話的にこれをすることもできます。

自己内包したアプリケーション

SparkAPIを使用した自己内包アプリケーションを書きたいとします。Scala (with sbt), Java (with Maven), および Python での簡単なアプリケーションを見てみましょう。

Scalaでとても簡単なSparkアプリケーションを作成するつもりです - 実際簡単なため、SimpleApp.scalaという名前です:

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

アプリケーションはscala.Appを拡張する代わりにmain()メソッドを定義すべきであることに注意してください。scala.Appのサブクラスはおそらく正しく動かないでしょう。

このプログラムはSparkのREADMEの 'a'を含む行の数と、'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。自身のSparkContextを初期化したSparkシェルの以前の例と違って、プログラムの一部としてSparkContextを初期化します。

SparkContextのコンストラクタにアプリケーションの情報を含むSparkConf オブジェクトを渡します。

アプリケーションがSpark APIに依存しているため、sbtの設定ファイルも含みます。simple.sbt はSparkの依存性を説明します。このファイルはSparkが依存するリポジトリも追加します:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"

stbが正しく動作するには、SimpleApp.scalasimple.sbtを代表的なディレクトリ構造に応じて配置する必要があるでしょう。適切な場所に一旦配置すると、アプリケーションのコードを含むJARパッケージを作成することができ、プログラムを実行するspark-submit スクリプトを使うことができます。

# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23

この例はMavenをアプリケーションJARをコンパイルするために使用しますが、どの似たようなビルドシステムでも動作するでしょう。

とても簡単なSparkアプリケーション SimpleApp.java を作成します:

/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
  }
}

このプログラムはテキストファイルの'a'を含む行の数と'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。Scalaの例のようにSparkContextを初期化しますが、Javaと付き合いやすいようにJavaSparkContext クラスを使用します。(JavaRDDで表現される)RDDも作成し、それらに変換を実行します。最後に、 spark.api.java.function.Function を継承したクラスを作成することでSparkに関数を渡します。これらの違いについての詳細はSpark プログラミングガイドで説明します。

プログラムをビルドするために、Sparkの依存関係をリスト化したpom.xmlファイルも書きます。SparkのartifactはScalaのバージョンでタグを付けられていることに注意してください。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
  </dependencies>
</project>

Mavenの正統なディレクトリ構造に従ってそれらのファイルを配置します。

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

これでMavenを使ってアプリケーションをパッケージ化することができ、./bin/spark-submit を使って実行します。

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

それでは、Python API (PySpark)を使ってアプリケーションを書く方法を示そうと思います。

例として、簡単なSparkアプリケーション、 SimpleApp.pyを作成します:

"""SimpleApp.py"""
from pyspark import SparkContext

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

このプログラムはテキストファイルの'a'を含む行の数と'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。ScaleおよびJavaの例と同じように、RDDを作成するためにSparkContextを使用します。Pythonの関数をSparkに渡すことができ、それは自動的にそれらが参照している全ての変数とともにシリアライズされます。独自のクラスあるいはサードパーティのライブラリを使うアプリケーションのために、コードの依存物を.zipファイルにまとめてspark-submit--py-filesに渡すことで、それらを追加することもできます。SimpleAppはどのようなコードの依存を指定する必要もないくらい簡単なものです。

このアプリケーションを bin/spark-submit スクリプトを使って実行することができます:

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

この後どうすればいいか

初めてのSparkアプリケーションの実行おめでとうございます!

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
TOP
inserted by FC2 system