クイックスタート
このチュートリアルはSparkの使用の素早い導入を提供します。まずSparkの対話的シェル(PythonあるいはScala)を使ってAPIを紹介し、Java, ScalaおよびPythonでアプリケーションを書く方法を示します。
このガイドに従っていくには、まず Spark webサイトからSparkのパッケージ化されたリリースをダウンロードします。HDFSを使わないため、Hadoopのどのバージョンのパッケージでもダウンロードすることができます。
Spark 2.0 より前では、Sparkの主なプログラミング インタフェースは Resilient Distributed Dataset (RDD) であったことに注意してください。2.0 より後では、RDDはデータセットに置き換えられました。これはRDDのような強い型がありますが、その背後ではもっと豊富な最適化を持ちます。RDDインタフェースはまだサポートされており、もっと詳細なリファレンスを RDD プログラミング ガイドで取得することができます。しかし、データセットに切り替えることをお勧めします。RDDより良いパフォーマンスを持ちます。データセットについてのもっと詳しい情報はSQL プログラミング ガイド を見てください。
Spark シェルを使った対話的な解析
基本
SparkのシェルはAPIを学ぶための簡単な方法を提供し、データの解析を対話的に行う強力なツールです。Scala(Java VM上で動作し、従って既存のJavaライブラリを使うのが良い方法です)あるいはPythonのどちらからで利用することができます。Sparkディレクトリで以下のように実行して始めます:
./bin/spark-shell
Sparkの一番重要な抽象化はデータセットと呼ばれる項目の分散コレクションです。データセットは(HDFSファイルのような)Hadoop InputFormat あるいは他のデータセットからの変換によって生成することができます。SparkソースディレクトリのREADMEファイルのテキストから新しいデータセットを作成してみましょう:
なんらかのアクションを呼ぶことで直接データセットから値を取得、あるいは新しいデータセットを取得するためにデータセットを変換することができます。詳細はAPI ドキュメントを読んでください。
では、このデータセットを新しいものに変換してみましょう。ファイル内の項目のサブセットを持つ新しいデータセットを返すfilter
を呼びます。
変換とアクションを繋げることができます:
./bin/pyspark
もしPySparkが現在の環境にpipを使ってインストールされた場合:
pyspark
Sparkの一番重要な抽象化はデータセットと呼ばれる項目の分散コレクションです。データセットは(HDFSファイルのような)Hadoop InputFormat あるいは他のデータセットからの変換によって生成することができます。Pythonの動的な性質のために、データセットをPythonでの強い型にする必要はありません。結果として、Pythonでの全てのデータセットは Dataset[Row] で、PandaおよびRでのデータフレームの概念と一致する DataFrame
と呼びます。新しいデータフレームをSparkソースディレクトリ内のREADMEのテキストから作成してみましょう:
なんらかのアクションを呼ぶことで直接データフレームから値を取得、あるいは新しいデータフレームを取得するためにデータフレームを変換することができます。詳細はAPI ドキュメントを読んでください。
では、このデータフレームを新しいものに変換してみましょう。ファイル内の行のサブセットを持つ新しいデータフレームを返すfilter
を呼びます。
変換とアクションを繋げることができます:
データセット走査の詳細
データセットのアクションと変換はもっと複雑な計算に使うことができます。最も単語が多い行を見つけてみましょう:
この一番最初のものは、新しいデータセットを生成して行を整数に写像します。reduce
は最も大きな単語のカウントを見つけるためにデータセット上で呼ばれます。map
および reduce
への引数はスカラ関数のリテラル(クロージャー)で、Scala/Javaライブラリのどのような言語の機能も使うことができます。例えば、どこで宣言された関数でも簡単に呼ぶことができます。このコードを分かりやすくするために Math.max()
関数を使おうと思います:
Hadoopで普及したように、一般的なデータフローのパターンにMapReduceがあります。SparkはMapReduceのフローを簡単に実装することができます:
ここで、行のデータセットを単語のデータセットに変換するために flatMap
を呼び、ファイル中の単語あたりのカウントを (String, Long) のペアのデータセットとして計算するために groupByKey
と count
を呼びます。シェル内で単語のカウントを集めるために、collect
を呼ぶことができます:
これの最初は行を整数値にマップし、新しいデータフレームを作りながら “numWords” として別の名前を付けます。agg
は最も大きな単語のカウントを見つけるためにデータフレーム上で呼ばれます。select
および agg
の引数は共に Columnで、データフレームからカラムを取得するために df.colName
を使うことができます。pyspark.sql.functions もインポートすることができます。これは古いカラムから新しいカラムを構築するための多くの便利な関数を提供します。
Hadoopで普及したように、一般的なデータフローのパターンにMapReduceがあります。SparkはMapReduceのフローを簡単に実装することができます:
ここで、行のデータセットを単語のデータセットに変換するためにselect
内でexplode
関数を使い、2つのカラムのデータフレームとしてファイル内で単語ごとのカウントを計算するためにgroupBy
と count
を組み合わせます: “word” と “count”。シェル内で単語のカウントを集めるために、collect
を呼ぶことができます:
キャッシング
Sparkはクラスタ単位のメモリキャッシュ内へのデータセットの取り込みもサポートしています。これは、小さな"hot"データセットをクエリする場合やPageRankのような繰り返しのアルゴリズムを実行する場合などのように、データがたびたびアクセスされる場合に有用です。簡単な例として、linesWithSpark
データセットがキャッシュされるように印を付けてみましょう。
100行のテキストファイルを調べたりキャッシュするためにSparkを使うことはばかばかしく思えるかも知れません。興味深い点は、同じ関数がたとえストラインピングされた数十、数百のノードであっても、巨大なデータセットに使うことができるということです。RDD プログラミング ガイド で説明されるように、bin/spark-shell
をクラスタに接続することで対話的にこれを行うこともできます。
100行のテキストファイルを調べたりキャッシュするためにSparkを使うことはばかばかしく思えるかも知れません。興味深い点は、同じ関数がたとえストラインピングされた数十、数百のノードであっても、巨大なデータセットに使うことができるということです。RDD プログラミングガイドで説明されるように、クラスタにbin/pyspark
を接続することで対話的にこれをすることもできます。
自己内包したアプリケーション
SparkAPIを使用した自己内包アプリケーションを書きたいとします。Scala (with sbt), Java (with Maven), および Python (pip)での簡単なアプリケーションを見てみましょう。
Scalaでとても簡単なSparkアプリケーションを作成するつもりです - 実際簡単なため、SimpleApp.scala
という名前です:
アプリケーションはscala.App
を拡張する代わりにmain()
メソッドを定義すべきであることに注意してください。scala.App
のサブクラスはおそらく正しく動かないでしょう。
このプログラムはSparkのREADMEの 'a'を含む行の数と、'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。自身のSparkSessionを初期化したSparkシェルの以前の例と違って、プログラムの一部としてSparkSessionを初期化します。
SparkSession
を構築するために SparkSession.builder
を呼び出し、続いてアプリケーション名を設定し、最後に SparkSession
インスタンスを取得するために getOrCreate
を呼びます。
アプリケーションがSpark APIに依存しているため、sbtの設定ファイルも含みます。build.sbt
はSparkの依存性を説明します。このファイルはSparkが依存するリポジトリも追加します:
stbが正しく動作するには、SimpleApp.scala
とbuild.sbt
を代表的なディレクトリ構造に応じて配置する必要があるでしょう。適切な場所に一旦配置すると、アプリケーションのコードを含むJARパッケージを作成することができ、プログラムを実行するspark-submit
スクリプトを使うことができます。
この例はMavenをアプリケーションJARをコンパイルするために使用しますが、どの似たようなビルドシステムでも動作するでしょう。
とても簡単なSparkアプリケーション SimpleApp.java
を作成します:
このプログラムはSparkのREADMEの 'a'を含む行の数と、'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。自身のSparkSessionを初期化したSparkシェルの以前の例と違って、プログラムの一部としてSparkSessionを初期化します。
プログラムをビルドするために、Sparkの依存関係をリスト化したpom.xml
ファイルも書きます。SparkのartifactはScalaのバージョンでタグを付けられていることに注意してください。
Mavenの正統なディレクトリ構造に従ってそれらのファイルを配置します。
これでMavenを使ってアプリケーションをパッケージ化することができ、./bin/spark-submit
を使って実行します。
それでは、Python API (PySpark)を使ってアプリケーションを書く方法を示そうと思います。
パッケージ化されたアプリケーションあるいはライブラリを構築している場合、以下のようにそれをsetup.pyファイルに追加することができます:
例として、簡単なSparkアプリケーション、 SimpleApp.py
を作成します:
このプログラムはテキストファイルの'a'を含む行の数と'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。ScaleおよびJavaの例と同じように、データセットを作成するためにSparkSession を使用します。独自のクラスあるいはサードパーティのライブラリを使うアプリケーションのために、コードの依存物を.zipファイルにまとめてspark-submit
の--py-files
に渡すことで、それらを追加することもできます。SimpleApp
はどのようなコードの依存を指定する必要もないくらい簡単なものです。
このアプリケーションを bin/spark-submit
スクリプトを使って実行することができます:
環境にpipインストールしたPySparkがある場合(つまり pip install pyspark
)、通常のPythonインタプリタあるいを使ってアプリケーションを実行、あるいは好きな提供された ‘spark-submit’ を使うことができます。
Other dependency management tools such as Conda and pip can be also used for custom classes or third-party libraries. See also Python Package Management.
この後どうすればいいか
初めてのSparkアプリケーションの実行おめでとうございます!
- APIのもっと深い概要は、RDD プログラミングガイド および SQL プログラミングガイド、あるいは他のコンポーネントの“プログラミング ガイド" メニューを見てください。
- クラスタ上でアプリケーションを実行するには、配備の概要に行って下さい。
- 最後に、Sparkは
examples
ディレクトリに幾つかの例があります (Scala, Java, Python, R)。以下のようにしてそれらを実行することができます: