クイックスタート
このチュートリアルはSparkの使用の素早い導入を提供します。まずSparkの対話的シェル(PythonあるいはScala)を使ってAPIを紹介し、Java, ScalaおよびPythonでアプリケーションを書く方法を示します。
このガイドに従っていくには、まず Spark webサイトからSparkのパッケージ化されたリリースをダウンロードします。HDFSを使わないため、Hadoopのどのバージョンのパッケージでもダウンロードすることができます。
Spark 2.0 より前では、Sparkの主なプログラミング インタフェースは Resilient Distributed Dataset (RDD) であったことに注意してください。2.0 より後では、RDDはデータセットに置き換えられました。これはRDDのような強い型がありますが、その背後ではもっと豊富な最適化を持ちます。RDDインタフェースはまだサポートされており、もっと完全なリファレンスを RDD プログラミング ガイドで取得することができます。しかし、データセットに切り替えることをお勧めします。RDDより良いパフォーマンスを持ちます。データセットについてのもっと詳しい情報はSQL プログラミング ガイド を見てください。
Spark シェルを使った対話的な解析
基本
SparkのシェルはAPIを学ぶための簡単な方法を提供し、データの解析を対話的に行う強力なツールです。Scala(Java VM上で動作し、従って既存のJavaライブラリを使うのが良い方法です)あるいはPythonのどちらからで利用することができます。Sparkディレクトリで以下のように実行して始めます:
./bin/spark-shell
Sparkの一番重要な抽象化はデータセットと呼ばれる項目の分散コレクションです。データセットは(HDFSファイルのような)Hadoop InputFormat あるいは他のデータセットからの変換によって生成することができます。SparkソースディレクトリのREADMEファイルのテキストから新しいデータセットを作成してみましょう:
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
なんらかのアクションを呼ぶことで直接データセットから値を取得、あるいは新しいデータセットを取得するためにデータセットを変換することができます。詳細はAPI ドキュメントを読んでください。
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark
では、このデータセットを新しいものに変換してみましょう。ファイル内の項目のサブセットを持つ新しいデータセットを返すfilter
を呼びます。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
変換とアクションを繋げることができます:
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
./bin/pyspark
Sparkの一番重要な抽象化はデータセットと呼ばれる項目の分散コレクションです。データセットは(HDFSファイルのような)Hadoop InputFormat あるいは他のデータセットからの変換によって生成することができます。Pythonの動的な性質のために、データセットをPythonでの強い型にする必要はありません。結果として、Pythonでの全てのデータセットは Dataset[Row] で、PandaおよびRでのデータフレームの概念と一致する DataFrame
と呼びます。新しいデータフレームをSparkソースディレクトリ内のREADMEのテキストから作成してみましょう:
>>> textFile = spark.read.text("README.md")
なんらかのアクションを呼ぶことで直接データフレームから値を取得、あるいは新しいデータフレームを取得するためにデータフレームを変換することができます。詳細はAPI ドキュメントを読んでください。
>>> textFile.count() # Number of rows in this DataFrame
126
>>> textFile.first() # First row in this DataFrame
Row(value=u'# Apache Spark')
では、このデータフレームを新しいものに変換してみましょう。ファイル内の行のサブセットを持つ新しいデータフレームを返すfilter
を呼びます。
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
変換とアクションを繋げることができます:
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"?
15
データセット走査の詳細
データセットのアクションと変換はもっと複雑な計算に使うことができます。最も単語が多い行を見つけてみましょう:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
この一番最初のものは、新しいデータセットを生成して行を整数に写像します。reduce
は最も大きな単語のカウントを見つけるためにデータセット上で呼ばれます。map
および reduce
への引数はスカラ関数のリテラル(クロージャー)で、Scala/Javaライブラリのどのような言語の機能も使うことができます。例えば、どこで宣言された関数でも簡単に呼ぶことができます。このコードを分かりやすくするために Math.max()
関数を使おうと思います:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
Hadoopで普及したように、一般的なデータフローのパターンにMapReduceがあります。SparkはMapReduceのフローを簡単に実装することができます:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
ここで、行のデータセットを単語のデータセットに変換するために flatMap
を呼び、ファイル中の単語あたりのカウントを (String, Long) のペアのデータセットとして計算するために groupByKey
と count
を呼びます。シェル内で単語のカウントを集めるために、collect
を呼ぶことができます:
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=15)]
これの最初は行を整数値にマップし、新しいデータフレームを作りながら “numWords” として別の名前を付けます。agg
は最も大きな単語のカウントを見つけるためにデータフレーム上で呼ばれます。select
および agg
の引数は共に Columnで、データフレームからカラムを取得するために df.colName
を使うことができます。pyspark.sql.functions もインポートすることができます。これは古いカラムから新しいカラムを構築するための多くの便利な関数を提供します。
Hadoopで普及したように、一般的なデータフローのパターンにMapReduceがあります。SparkはMapReduceのフローを簡単に実装することができます:
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).as("word")).groupBy("word").count()
Here, we use the explode
function in select
, to transfrom a Dataset of lines to a Dataset of words, and then combine groupBy
and count
to compute the per-word counts in the file as a DataFrame of 2 columns: “word” and “count”. シェル内で単語のカウントを集めるために、collect
を呼ぶことができます:
>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
キャッシング
Sparkはクラスタ単位のメモリキャッシュ内へのデータセットの取り込みもサポートしています。これは、小さな"hot"データセットをクエリする場合やPageRankのような繰り返しのアルゴリズムを実行する場合などのように、データがたびたびアクセスされる場合に有用です。簡単な例として、linesWithSpark
データセットがキャッシュされるように印を付けてみましょう。
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
100行のテキストファイルを調べたりキャッシュするためにSparkを使うことはばかばかしく思えるかも知れません。興味深い点は、同じ関数がたとえストラインピングされた数十、数百のノードであっても、巨大なデータセットに使うことができるということです。RDD プログラミング ガイド で説明されるように、bin/spark-shell
をクラスタに接続することで対話的にこれを行うこともできます。
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
15
>>> linesWithSpark.count()
15
100行のテキストファイルを調べたりキャッシュするためにSparkを使うことはばかばかしく思えるかも知れません。興味深い点は、同じ関数がたとえストラインピングされた数十、数百のノードであっても、巨大なデータセットに使うことができるということです。RDD プログラミングガイドで説明されるように、クラスタにbin/pyspark
を接続することで対話的にこれをすることもできます。
自己内包したアプリケーション
SparkAPIを使用した自己内包アプリケーションを書きたいとします。Scala (with sbt), Java (with Maven), および Python での簡単なアプリケーションを見てみましょう。
Scalaでとても簡単なSparkアプリケーションを作成するつもりです - 実際簡単なため、SimpleApp.scala
という名前です:
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
アプリケーションはscala.App
を拡張する代わりにmain()
メソッドを定義すべきであることに注意してください。scala.App
のサブクラスはおそらく正しく動かないでしょう。
このプログラムはSparkのREADMEの 'a'を含む行の数と、'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。自身のSparkSessionを初期化したSparkシェルの以前の例と違って、プログラムの一部としてSparkSessionを初期化します。
[[SparkSession]] を構築するために SparkSession.builder
を呼び出し、続いてアプリケーション名を設定し、最後に [[SparkSession]] インスタンスを取得するために getOrCreate
を呼びます。
アプリケーションがSpark APIに依存しているため、sbtの設定ファイルも含みます。build.sbt
はSparkの依存性を説明します。このファイルはSparkが依存するリポジトリも追加します:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
stbが正しく動作するには、SimpleApp.scala
とbuild.sbt
を代表的なディレクトリ構造に応じて配置する必要があるでしょう。適切な場所に一旦配置すると、アプリケーションのコードを含むJARパッケージを作成することができ、プログラムを実行するspark-submit
スクリプトを使うことができます。
# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23
この例はMavenをアプリケーションJARをコンパイルするために使用しますが、どの似たようなビルドシステムでも動作するでしょう。
とても簡単なSparkアプリケーション SimpleApp.java
を作成します:
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<String> logData = spark.read.textFile(logFile).cache();
long numAs = logData.filter(s -> s.contains("a")).count();
long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
spark.stop();
}
}
このプログラムはSparkのREADMEの 'a'を含む行の数と、'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。自身のSparkSessionを初期化したSparkシェルの以前の例と違って、プログラムの一部としてSparkSessionを初期化します。
プログラムをビルドするために、Sparkの依存関係をリスト化したpom.xml
ファイルも書きます。SparkのartifactはScalaのバージョンでタグを付けられていることに注意してください。
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
</project>
Mavenの正統なディレクトリ構造に従ってそれらのファイルを配置します。
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
これでMavenを使ってアプリケーションをパッケージ化することができ、./bin/spark-submit
を使って実行します。
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
それでは、Python API (PySpark)を使ってアプリケーションを書く方法を示そうと思います。
例として、簡単なSparkアプリケーション、 SimpleApp.py
を作成します:
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
spark = SparkSession.builder().appName(appName).master(master).getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()
このプログラムはテキストファイルの'a'を含む行の数と'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。ScaleおよびJavaの例と同じように、データセットを作成するためにSparkSession を使用します。独自のクラスあるいはサードパーティのライブラリを使うアプリケーションのために、コードの依存物を.zipファイルにまとめてspark-submit
の--py-files
に渡すことで、それらを追加することもできます。SimpleApp
はどのようなコードの依存を指定する必要もないくらい簡単なものです。
このアプリケーションを bin/spark-submit
スクリプトを使って実行することができます:
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23
この後どうすればいいか
初めてのSparkアプリケーションの実行おめでとうございます!
- APIのもっと深い概要は、RDD プログラミングガイド および SQL プログラミングガイド、あるいは他のコンポーネントの“プログラミング ガイド" メニューを見てください。
- クラスタ上でアプリケーションを実行するには、配備の概要に行って下さい。
- 最後に、Sparkは
examples
ディレクトリに幾つかの例があります (Scala, Java, Python, R)。以下のようにしてそれらを実行することができます:
# For Scala and Java, use run-example:
./bin/run-example SparkPi
# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py
# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R