SparkR (SparkでのR)

概要

SparkRは、RからApache Sparkを使うための軽量フロントエンドを提供するR パッケージです。Spark 1.6.0では、SparkRは巨大なデータセット上でのselection, filtering, aggregation など(Rデータフレームに似ています dplyr) のような操作をサポートする分散データフレームの実装を提供します。SparkRはMLlibを使った分散機械学習もサポートします。

SparkR データフレーム

データフレームは、データの分散コレクションが名前付きの列に整理されたものです。概念的にはリレーショナルデータベースのテーブル、あるいはRでのデータフレームに相当しますが、裏ではもっと最適化されています。データフレームは以下のようなソースの大きな配列です: 構造化されたデータファイル、Hiveのテーブル、外部データベース、あるいは既存のローカルのRデータフレーム。

このページの全ての例は、RまたはSparkの配布物に含まれるサンプルデータを使用し、./bin/sparkR シェルを使って実行することができます。

開始: SparkContext, SQLContext

SparkRの入り口は、RプログラムがSparkクラスタに接続しているSparkContextです。sparkR.initを使って SparkContextを生成することができ、アプリケーション名、依存するどのようなSparkパッケージなどをオプションで渡すことができます。更に、データフレームで作業をするために、SQLContextが必要でしょう。これはSparkContextから生成することができます。SparkRシェルから操作をしている場合は、SQLContext および SparkContext は既に生成されており、sparkR.initを呼び出す必要は無いでしょう。

sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

RStudioからの開始

RStudioからSparkRを開始することも可能です。RプログラムをRStudio, R shell, RscriptあるいはR IDEからSpark クラスタに接続することができます。始めるには、環境に SPARK_HOME を設定(Sys.getenvを調べることができます)し、SparkRパッケージをロードし、以下のようにl sparkR.initを呼び出してください。sparkR.initの呼び出しに加えて、特定のSparkドライバのプロパティを指定することもできます。この場合SparkRが気を使ってドライバーJVMプロセスが開始されているため、通常これらのアプリケーション プロパティ および ランタイム環境 はプログラム的に設定することはできません。それらを設定するには、sparkEnvir引数の中の他の設定プロパティをsparkR.init()に渡すように、それらを渡します。

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sc <- sparkR.init(master = "local[*]", sparkEnvir = list(spark.driver.memory="2g"))

以下のオプションはRStudioからsparkR.initを使ってsparkEnvirに渡すことができます。

プロパティ名プロパティグループspark-submit 等価
spark.driver.memory アプリケーションのプロパティ --driver-memory
spark.driver.extraClassPath ランタイム環境 --driver-class-path
spark.driver.extraJavaOptions ランタイム環境 --driver-java-options
spark.driver.extraLibraryPath ランタイム環境 --driver-library-path

データフレームの生成

SQLContextを使って、アプリケーションはローカルのRデータフレーム、Hive table、あるいはその他の data sourcesから、DataFrameを生成することができます。

ローカルデータフレームから

データフレームを生成する最も簡単な方法はローカルのRデータフレームをSparkR データフレームに変換する方法です。具体的には、SparkRデータフレームを生成するために、createDataFrameを使い、 ローカルのRデータフレームを渡すことができます。例として、以下はRのfaithfulを使ったDataFrameを生成します。

df <- createDataFrame(sqlContext, faithful)

# Displays the content of the DataFrame to stdout
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74

データソースから

SparkRは DataFrame インタフェースを使った様々なデータソースへの操作をサポートします。この章ではデータソースを使ってデータをロードおよび保存する一般的な方法について説明します。組み込みのデータソースに利用可能な特定のオプションについてはSpark SQL プログラミングガイドを調べる事ができます。

データソースからデータフレームを作成する一般的なメソッドは read.dfです。このメソッドはSQLContext、ロードするファイルのパス、およびデータソースの種類を取ります。SparkRはそのままでJSONとParquetファイルの読み込みをサポートし、Spark Packagesを通じてCSV およびAvroのような一般的なファイル形式のためのデータソースコネクタを見つけることができます。これらのパッケージは spark-submitを使って--packages を指定するか、 sparkR コマンドで追加することができます。あるいはもしコンテキストをinit を使って生成する場合は、パッケージを packages引数を使って指定することができます。

sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

例のJSON入力ファイルを使ってデータソースを使用する方法を見ることができます。ここで使われているファイルは代表的なJSONファイルではないことに注意してください。ファイル内の各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。

people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: integer (nullable = true)
#  |-- name: string (nullable = true)

データソースAPIはデータフレームを複数のファイルフォーマットに保存するために使用することもできます。例えば、データフレームを以前の例からwrite.dfを使ってParquetファイルへ保存することができます(Spark 1.6まで、書き込みのデフォルトのモードはappendでした。Spark 1.7ではScala APIに合致するように、errorに変更されました)

write.df(people, path="people.parquet", source="parquet", mode="overwrite")

Hiveテーブルから

SparkRデータフレームをHiveテーブルから生成することもできます。こうするには、Hiveメタストア内のテーブルにアクセス可能なHiveContextを生成する必要があるでしょう。SparkはHiveをサポートするようにビルドされていなければなりません。SQLContextとHiveContextの違いの詳細はSQL プログラミングガイドで見つけることができます。

# sc is an existing SparkContext.
hiveContext <- sparkRHive.init(sc)

sql(hiveContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql(hiveContext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql(hiveContext, "FROM src SELECT key, value")

# results is now a DataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

データフレームの操作

SparkR データフレームは構造化データ処理を行う多数の関数をサポートします。ここでは幾つかの基本的な例を示し、完全なリストはAPI ドキュメントの中で見つけることができます:

行、列の選択

# Create the DataFrame
df <- createDataFrame(sqlContext, faithful)

# Get basic information about the DataFrame
df
## DataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column name as strings
head(select(df, "eruptions"))

# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48

グルーピング、集約

SparkRデータフレームはグルーピングの後でデータを集約するために一般的に使われる多くの関数をサポートします。例えば、以下に示すようにfaithfulデータセットの中で待ち時間のヒストグラムを計算することができます。

# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      81    13
##2      60     6
##3      68     1

# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

##   waiting count
##1      78    15
##2      83    14
##3      81    13

列の操作

SparkRは集約の間にデータ処理のためにカラムに直接適用することができる多くの関数も提供します。以下の例は基本的な数学関数の使用を示します。

# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

SparkRからのSQLクエリの実行

SparkRデータフレームはSpark SQLの中の一時的なテーブルとして登録することもでき、テーブルとしてデータフレームを登録することでデータにSQLクエリを実行することができます。sql関数によってアプリケーションはSQLクエリをプログラム的に実行することができ、結果はデータフレームとして返されます。

# Load a JSON file
people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")

# Register this DataFrame as a table.
registerTempTable(people, "people")

# SQL statements can be run by using the sql method
teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##    name
##1 Justin

機械学習

SparkR glm() 関数を使ってDataFrame上に一般化された線形モデルを適合することができます。裏では、SparkRは特定のファミリーのモデルを訓練するためにMLlibを使用します。現在のところ、ガウシアンと二項式のファミリーがサポートされます。モデルの適合のために‘~’, ‘.’, ‘:‘, ‘+’および‘-‘を含む利用可能なRの公式のサブセットをサポートします。

summary() 関数はglm()によって生成されるモデルの概要を渡します。

以下の例は、SparkRを使った gaussian GLM モデルと binomial GLM モデルの使い方を示します。

ガウスGLMモデル

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)

# Fit a gaussian GLM model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model summary are returned in a similar format to R's native glm().
summary(model)
##$devianceResiduals
## Min       Max     
## -1.307112 1.412532
##
##$coefficients
##                   Estimate  Std. Error t value  Pr(>|t|)    
##(Intercept)        2.251393  0.3697543  6.08889  9.568102e-09
##Sepal_Width        0.8035609 0.106339   7.556598 4.187317e-12
##Species_versicolor 1.458743  0.1121079  13.01195 0           
##Species_virginica  1.946817  0.100015   19.46525 0           

# Make predictions based on the model.
predictions <- predict(model, newData = df)
head(select(predictions, "Sepal_Length", "prediction"))
##  Sepal_Length prediction
##1          5.1   5.063856
##2          4.9   4.662076
##3          4.7   4.822788
##4          4.6   4.742432
##5          5.0   5.144212
##6          5.4   5.385281

二値GLMモデル

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)
training <- filter(df, df$Species != "setosa")

# Fit a binomial GLM model over the dataset.
model <- glm(Species ~ Sepal_Length + Sepal_Width, data = training, family = "binomial")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)
##$coefficients
##               Estimate
##(Intercept)  -13.046005
##Sepal_Length   1.902373
##Sepal_Width    0.404655

R 関数名の衝突

R内の新しいパッケージをロードおよびアタッチする場合、名前のconflictがありえます。この場合、関数が他の関数をマスクします。

以下の関数はSparkRパッケージによってマスクされています。

マスクされた関数どうやってアクセスするか
cov in package:stats
stats::cov(x, y = NULL, use = "everything",
           method = c("pearson", "kendall", "spearman"))
filter in package:stats
stats::filter(x, filter, method = c("convolution", "recursive"),
              sides = 2, circular = FALSE, init)
sample in package:base base::sample(x, size, replace = FALSE, prob = NULL)
table in package:base
base::table(...,
            exclude = if (useNA == "no") c(NA, NaN),
            useNA = c("no", "ifany", "always"),
            dnn = list.names(...), deparse.level = 1)

SparkRの部分は dplyr パッケージ上でモデル化されるため、SparkRのあるパッケージはdplyr内のそれらの同じ名前を共有します。二つのパッケージのロード順序に依存して、最初にロードされたパッケージの関数が後でロードされたパッケージの関数によってマスクされます。そのような場合、例えばSparkR::cume_dist(x)あるいはdplyr::cume_dist(x)のようにパッケージ名を使った呼び出しをします。

search()を使ってR内の検索パスを調べることができます。

移行ガイド

SparkR 1.6から1.7へのアップグレード

TOP
inserted by FC2 system