クイックスタート
このチュートリアルはSparkの使用の素早い導入を提供します。まずSparkの対話的シェル(PythonあるいはScala)を使ってAPIを紹介し、Java, ScalaおよびPythonでアプリケーションを書く方法を示します。完全なリファレンスはプログラミングガイドを見てください。
このガイドに従っていくには、まず Spark webサイトからSparkのパッケージ化されたリリースをダウンロードします。HDFSを使わないため、Hadoopのどのバージョンのパッケージでもダウンロードすることができます。
Spark シェルを使った対話的な解析
基本
SparkのシェルはAPIを学ぶための簡単な方法を提供し、データの解析を対話的に行う強力なツールです。Scala(Java VM上で動作し、従って既存のJavaライブラリを使うのが良い方法です)あるいはPythonのどちらからで利用することができます。Sparkディレクトリで以下のように実行して始めます:
./bin/spark-shell
Sparkの主要な抽象概念は Resilient Distributed Dataset (RDD) と呼ばれる項目の分散コレクションです。RDDはHadoopの(HDFSファイルのような)InputFormatあるいは他のRDDから変換して生成することができます。SparkソースディレクトリのREADMEファイルのテキストから新しいRDDを作成してみましょう:
RDDには値を返すactionsと、新しいRDDへのポインタを返す transformationsがあります。幾つかのアクションを使ってみましょう:
次に変換を使ってみましょう。ファイル内の項目のサブセットの新しいRDDを返す filter
変換を使うつもりです。
変換とアクションを繋げることができます:
./bin/pyspark
Sparkの主要な抽象概念は Resilient Distributed Dataset (RDD) と呼ばれる項目の分散コレクションです。RDDはHadoopの(HDFSファイルのような)InputFormatあるいは他のRDDから変換して生成することができます。SparkソースディレクトリのREADMEファイルのテキストから新しいRDDを作成してみましょう:
RDDには値を返すactionsと、新しいRDDへのポインタを返す transformationsがあります。幾つかのアクションを使ってみましょう:
次に変換を使ってみましょう。ファイル内の項目のサブセットの新しいRDDを返す filter
変換を使うつもりです。
変換とアクションを繋げることができます:
RDD操作の詳細
RDDのアクションと変換はもっと複雑な計算に使うことができます。最も単語が多い行を見つけてみましょう:
この一番最初のものは、新しいRDDを生成して行を整数に写像します。reduce
は最も大きな行のカウントを見つけるためにRDD上で呼ばれます。map
および reduce
への引数はスカラ関数のリテラル(クロージャー)で、Scala/Javaライブラリのどのような言語の機能も使うことができます。例えば、どこで宣言された関数でも簡単に呼ぶことができます。このコードを分かりやすくするために Math.max()
関数を使おうと思います:
Hadoopで普及したように、一般的なデータフローのパターンにMapReduceがあります。SparkはMapReduceのフローを簡単に実装することができます:
ここでは、(String, Int)ペアのRDDとしてファイル内の単語辺りの数を計算するために、flatMap
, map
および reduceByKey
変換を組み合わせました。シェル内で単語の数を集めるために、collect
アクションを使うことができます:
この一番最初のものは、新しいRDDを生成して行を整数に写像します。reduce
は最も大きな行のカウントを見つけるためにRDD上で呼ばれます。map
および reduce
の引数はPhtyonの anonymous functions (lambdas)ですが、使いたいどのようなトップレベルのPython関数にでも渡すことができます。例えば、このコードを分かりやすくするために、 max
関数を定義します:
Hadoopで普及したように、一般的なデータフローのパターンにMapReduceがあります。SparkはMapReduceのフローを簡単に実装することができます:
ここでは、(string, int)ペアのRDDとしてファイル内の単語辺りの数を計算するために、flatMap
, map
および reduceByKey
変換を組み合わせました。シェル内で単語の数を集めるために、collect
アクションを使うことができます:
キャッシング
Sparkはクラスタ単位のメモリキャッシュ内へのデータセットの取り込みもサポートしています。これは、小さな"hot"データセットをクエリする場合やPageRankのような繰り返しのアルゴリズムを実行する場合などのように、データがたびたびアクセスされる場合に有用です。簡単な例として、linesWithSpark
データセットがキャッシュされるように印を付けてみましょう。
100行のテキストファイルを調べたりキャッシュするためにSparkを使うことはばかばかしく思えるかも知れません。興味深い点は、同じ関数がたとえストラインピングされた数十、数百のノードであっても、巨大なデータセットに使うことができるということです。プログラミングガイドで説明されるように、クラスタにbin/spark-shell
を接続することで対話的にこれをすることもできます。
100行のテキストファイルを調べたりキャッシュするためにSparkを使うことはばかばかしく思えるかも知れません。興味深い点は、同じ関数がたとえストラインピングされた数十、数百のノードであっても、巨大なデータセットに使うことができるということです。プログラミングガイドで説明されるように、クラスタにbin/pyspark
を接続することで対話的にこれをすることもできます。
自己内包したアプリケーション
SparkAPIを使用した自己内包アプリケーションを書きたいとします。Scala (with sbt), Java (with Maven), および Python での簡単なアプリケーションを見てみましょう。
Scalaでとても簡単なSparkアプリケーションを作成するつもりです - 実際簡単なため、SimpleApp.scala
という名前です:
アプリケーションはscala.App
を拡張する代わりにmain()
メソッドを定義すべきであることに注意してください。scala.App
のサブクラスはおそらく正しく動かないでしょう。
このプログラムはSparkのREADMEの 'a'を含む行の数と、'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。自身のSparkContextを初期化したSparkシェルの以前の例と違って、プログラムの一部としてSparkContextを初期化します。
SparkContextのコンストラクタにアプリケーションの情報を含むSparkConf オブジェクトを渡します。
アプリケーションがSpark APIに依存しているため、sbtの設定ファイルも含みます。simple.sbt
はSparkの依存性を説明します。このファイルはSparkが依存するリポジトリも追加します:
stbが正しく動作するには、SimpleApp.scala
とsimple.sbt
を代表的なディレクトリ構造に応じて配置する必要があるでしょう。適切な場所に一旦配置すると、アプリケーションのコードを含むJARパッケージを作成することができ、プログラムを実行するspark-submit
スクリプトを使うことができます。
この例はMavenをアプリケーションJARをコンパイルするために使用しますが、どの似たようなビルドシステムでも動作するでしょう。
とても簡単なSparkアプリケーション SimpleApp.java
を作成します:
このプログラムはテキストファイルの'a'を含む行の数と'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。Scalaの例のようにSparkContextを初期化しますが、Javaと付き合いやすいようにJavaSparkContext
クラスを使用します。(JavaRDD
で表現される)RDDも作成し、それらに変換を実行します。最後に、 spark.api.java.function.Function
を継承したクラスを作成することでSparkに関数を渡します。これらの違いについての詳細はSpark プログラミングガイドで説明します。
プログラムをビルドするために、Sparkの依存関係をリスト化したpom.xml
ファイルも書きます。SparkのartifactはScalaのバージョンでタグを付けられていることに注意してください。
Mavenの正統なディレクトリ構造に従ってそれらのファイルを配置します。
これでMavenを使ってアプリケーションをパッケージ化することができ、./bin/spark-submit
を使って実行します。
それでは、Python API (PySpark)を使ってアプリケーションを書く方法を示そうと思います。
例として、簡単なSparkアプリケーション、 SimpleApp.py
を作成します:
このプログラムはテキストファイルの'a'を含む行の数と'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。ScaleおよびJavaの例と同じように、RDDを作成するためにSparkContextを使用します。Pythonの関数をSparkに渡すことができ、それは自動的にそれらが参照している全ての変数とともにシリアライズされます。独自のクラスあるいはサードパーティのライブラリを使うアプリケーションのために、コードの依存物を.zipファイルにまとめてspark-submit
の--py-files
に渡すことで、それらを追加することもできます。SimpleApp
はどのようなコードの依存を指定する必要もないくらい簡単なものです。
このアプリケーションを bin/spark-submit
スクリプトを使って実行することができます:
この後どうすればいいか
初めてのSparkアプリケーションの実行おめでとうございます!
- APIのもっと深い概要は、Spark プログラミングガイドあるいは他のコンポーネントの“プログラミング ガイド" メニューを見てください。
- クラスタ上でアプリケーションを実行するには、配備の概要に行って下さい。
- 最後に、Sparkは
examples
ディレクトリに幾つかの例があります (Scala, Java, Python, R)。以下のようにしてそれらを実行することができます: