クイックスタート

このチュートリアルはSparkの使用の素早い導入を提供します。まずSparkの対話的シェル(PythonあるいはScala)を使ってAPIを紹介し、Java, ScalaおよびPythonでアプリケーションを書く方法を示します。

このガイドに従っていくには、まず Spark webサイトからSparkのパッケージ化されたリリースをダウンロードします。HDFSを使わないため、Hadoopのどのバージョンのパッケージでもダウンロードすることができます。

Spark 2.0 より前では、Sparkの主なプログラミング インタフェースは Resilient Distributed Dataset (RDD) であったことに注意してください。2.0 より後では、RDDはデータセットに置き換えられました。これはRDDのような強い型がありますが、その背後ではもっと豊富な最適化を持ちます。RDDインタフェースはまだサポートされており、もっと詳細なリファレンスを RDD プログラミング ガイドで取得することができます。しかし、データセットに切り替えることをお勧めします。RDDより良いパフォーマンスを持ちます。データセットについてのもっと詳しい情報はSQL プログラミング ガイド を見てください。

Spark シェルを使った対話的な解析

基本

SparkのシェルはAPIを学ぶための簡単な方法を提供し、データの解析を対話的に行う強力なツールです。Scala(Java VM上で動作し、従って既存のJavaライブラリを使うのが良い方法です)あるいはPythonのどちらからで利用することができます。Sparkディレクトリで以下のように実行して始めます:

./bin/spark-shell

Sparkの一番重要な抽象化はデータセットと呼ばれる項目の分散コレクションです。データセットは(HDFSファイルのような)Hadoop InputFormat あるいは他のデータセットからの変換によって生成することができます。SparkソースディレクトリのREADMEファイルのテキストから新しいデータセットを作成してみましょう:

scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

なんらかのアクションを呼ぶことで直接データセットから値を取得、あるいは新しいデータセットを取得するためにデータセットを変換することができます。詳細はAPI ドキュメントを読んでください。

scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark

では、このデータセットを新しいものに変換してみましょう。ファイル内の項目のサブセットを持つ新しいデータセットを返すfilter を呼びます。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

変換とアクションを繋げることができます:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
./bin/pyspark

もしPySparkが現在の環境にpipを使ってインストールされた場合:

pyspark

Sparkの一番重要な抽象化はデータセットと呼ばれる項目の分散コレクションです。データセットは(HDFSファイルのような)Hadoop InputFormat あるいは他のデータセットからの変換によって生成することができます。Pythonの動的な性質のために、データセットをPythonでの強い型にする必要はありません。結果として、Pythonでの全てのデータセットは Dataset[Row] で、PandaおよびRでのデータフレームの概念と一致する DataFrame と呼びます。新しいデータフレームをSparkソースディレクトリ内のREADMEのテキストから作成してみましょう:

>>> textFile = spark.read.text("README.md")

なんらかのアクションを呼ぶことで直接データフレームから値を取得、あるいは新しいデータフレームを取得するためにデータフレームを変換することができます。詳細はAPI ドキュメントを読んでください。

>>> textFile.count()  # Number of rows in this DataFrame
126

>>> textFile.first()  # First row in this DataFrame
Row(value=u'# Apache Spark')

では、このデータフレームを新しいものに変換してみましょう。ファイル内の行のサブセットを持つ新しいデータフレームを返すfilter を呼びます。

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

変換とアクションを繋げることができます:

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15

データセット走査の詳細

データセットのアクションと変換はもっと複雑な計算に使うことができます。最も単語が多い行を見つけてみましょう:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

この一番最初のものは、新しいデータセットを生成して行を整数に写像します。reduce は最も大きな単語のカウントを見つけるためにデータセット上で呼ばれます。map および reduceへの引数はスカラ関数のリテラル(クロージャー)で、Scala/Javaライブラリのどのような言語の機能も使うことができます。例えば、どこで宣言された関数でも簡単に呼ぶことができます。このコードを分かりやすくするために Math.max()関数を使おうと思います:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

Hadoopで普及したように、一般的なデータフローのパターンにMapReduceがあります。SparkはMapReduceのフローを簡単に実装することができます:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

ここで、行のデータセットを単語のデータセットに変換するために flatMap を呼び、ファイル中の単語あたりのカウントを (String, Long) のペアのデータセットとして計算するために groupByKeycount を呼びます。シェル内で単語のカウントを集めるために、collectを呼ぶことができます:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=15)]

これの最初は行を整数値にマップし、新しいデータフレームを作りながら “numWords” として別の名前を付けます。agg は最も大きな単語のカウントを見つけるためにデータフレーム上で呼ばれます。select および agg の引数は共に Columnで、データフレームからカラムを取得するために df.colName を使うことができます。pyspark.sql.functions もインポートすることができます。これは古いカラムから新しいカラムを構築するための多くの便利な関数を提供します。

Hadoopで普及したように、一般的なデータフローのパターンにMapReduceがあります。SparkはMapReduceのフローを簡単に実装することができます:

>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

ここで、行のデータセットを単語のデータセットに変換するためにselect内でexplode 関数を使い、2つのカラムのデータフレームとしてファイル内で単語ごとのカウントを計算するためにgroupBycount を組み合わせます: “word” と “count”。シェル内で単語のカウントを集めるために、collectを呼ぶことができます:

>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]

キャッシング

Sparkはクラスタ単位のメモリキャッシュ内へのデータセットの取り込みもサポートしています。これは、小さな"hot"データセットをクエリする場合やPageRankのような繰り返しのアルゴリズムを実行する場合などのように、データがたびたびアクセスされる場合に有用です。簡単な例として、linesWithSpark データセットがキャッシュされるように印を付けてみましょう。

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

100行のテキストファイルを調べたりキャッシュするためにSparkを使うことはばかばかしく思えるかも知れません。興味深い点は、同じ関数がたとえストラインピングされた数十、数百のノードであっても、巨大なデータセットに使うことができるということです。RDD プログラミング ガイド で説明されるように、bin/spark-shell をクラスタに接続することで対話的にこれを行うこともできます。

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15

>>> linesWithSpark.count()
15

100行のテキストファイルを調べたりキャッシュするためにSparkを使うことはばかばかしく思えるかも知れません。興味深い点は、同じ関数がたとえストラインピングされた数十、数百のノードであっても、巨大なデータセットに使うことができるということです。RDD プログラミングガイドで説明されるように、クラスタにbin/pyspark を接続することで対話的にこれをすることもできます。

自己内包したアプリケーション

SparkAPIを使用した自己内包アプリケーションを書きたいとします。Scala (with sbt), Java (with Maven), および Python (pip)での簡単なアプリケーションを見てみましょう。

Scalaでとても簡単なSparkアプリケーションを作成するつもりです - 実際簡単なため、SimpleApp.scalaという名前です:

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

アプリケーションはscala.Appを拡張する代わりにmain()メソッドを定義すべきであることに注意してください。scala.Appのサブクラスはおそらく正しく動かないでしょう。

このプログラムはSparkのREADMEの 'a'を含む行の数と、'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。自身のSparkSessionを初期化したSparkシェルの以前の例と違って、プログラムの一部としてSparkSessionを初期化します。

SparkSession を構築するために SparkSession.builder を呼び出し、続いてアプリケーション名を設定し、最後に SparkSession インスタンスを取得するために getOrCreate を呼びます。

アプリケーションがSpark APIに依存しているため、sbtの設定ファイルも含みます。build.sbt はSparkの依存性を説明します。このファイルはSparkが依存するリポジトリも追加します:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.15"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.1"

stbが正しく動作するには、SimpleApp.scalabuild.sbtを代表的なディレクトリ構造に応じて配置する必要があるでしょう。適切な場所に一旦配置すると、アプリケーションのコードを含むJARパッケージを作成することができ、プログラムを実行するspark-submit スクリプトを使うことができます。

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23

この例はMavenをアプリケーションJARをコンパイルするために使用しますが、どの似たようなビルドシステムでも動作するでしょう。

とても簡単なSparkアプリケーション SimpleApp.java を作成します:

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

このプログラムはSparkのREADMEの 'a'を含む行の数と、'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。自身のSparkSessionを初期化したSparkシェルの以前の例と違って、プログラムの一部としてSparkSessionを初期化します。

プログラムをビルドするために、Sparkの依存関係をリスト化したpom.xmlファイルも書きます。SparkのartifactはScalaのバージョンでタグを付けられていることに注意してください。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.2.1</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>

Mavenの正統なディレクトリ構造に従ってそれらのファイルを配置します。

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

これでMavenを使ってアプリケーションをパッケージ化することができ、./bin/spark-submit を使って実行します。

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

それでは、Python API (PySpark)を使ってアプリケーションを書く方法を示そうと思います。

パッケージ化されたアプリケーションあるいはライブラリを構築している場合、以下のようにそれをsetup.pyファイルに追加することができます:

    install_requires=[
        'pyspark==3.2.1'
    ]

例として、簡単なSparkアプリケーション、 SimpleApp.pyを作成します:

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

このプログラムはテキストファイルの'a'を含む行の数と'b'を含む行の数を数えるだけです。YOUR_SPARK_HOMEをSparkがインストールされた場所に置き換える必要があることに注意してください。ScaleおよびJavaの例と同じように、データセットを作成するためにSparkSession を使用します。独自のクラスあるいはサードパーティのライブラリを使うアプリケーションのために、コードの依存物を.zipファイルにまとめてspark-submit--py-filesに渡すことで、それらを追加することもできます。SimpleAppはどのようなコードの依存を指定する必要もないくらい簡単なものです。

このアプリケーションを bin/spark-submit スクリプトを使って実行することができます:

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

環境にpipインストールしたPySparkがある場合(つまり pip install pyspark)、通常のPythonインタプリタあるいを使ってアプリケーションを実行、あるいは好きな提供された ‘spark-submit’ を使うことができます。

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23

Other dependency management tools such as Conda and pip can be also used for custom classes or third-party libraries. See also Python Package Management.

この後どうすればいいか

初めてのSparkアプリケーションの実行おめでとうございます!

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
TOP
inserted by FC2 system