SparkR (SparkでのR)

概要

SparkRは、RからApache Sparkを使うための軽量フロントエンドを提供するR パッケージです。Spark 2.0.0では、SparkRは巨大なデータセット上でのselection, filtering, aggregation など(Rデータフレームに似ています dplyr) のような操作をサポートする分散データフレームの実装を提供します。SparkRはMLlibを使った分散機械学習もサポートします。

SparkDataFrame

SparkDataFrameは、データの分散コレクションが名前付きの列に整理されたものです。概念的にはリレーショナルデータベースのテーブル、あるいはRでのデータフレームに相当しますが、裏ではもっと最適化されています。SparkDataFrameは以下のようなソースの大きな配列です: 構造化されたデータファイル、Hiveのテーブル、外部データベース、あるいは既存のローカルのRデータフレーム。

このページの全ての例は、RまたはSparkの配布物に含まれるサンプルデータを使用し、./bin/sparkR シェルを使って実行することができます。

開始: SparkSession

SparkRの入り口は、RプログラムがSparkクラスタに接続しているSparkSessionです。sparkR.sessionを使って SparkSessionを生成することができ、アプリケーション名、依存される全てのSparkパッケージなどのようなオプションを渡すことができます。更に、SparkSessionを使ってSparkDataFrameと連携することもできます。SparkRシェルから操作をしている場合は、SparkSessionは既に生成されており、<c7>sparkR.session/c7>を呼び出す必要は無いでしょう。

sparkR.session()

RStudioからの開始

RStudioからSparkRを開始することも可能です。RプログラムをRStudio, R shell, RscriptあるいはR IDEからSpark クラスタに接続することができます。始めるには、環境に SPARK_HOME を設定(Sys.getenvを調べることができます)し、SparkRパッケージをロードし、以下のようにl sparkR.sessionを呼び出してください。sparkR.sessionの呼び出しに加えて、特定のSparkドライバのプロパティを指定することもできます。この場合SparkRが気を使ってドライバーJVMプロセスが開始されているため、通常これらのアプリケーション プロパティ および ランタイム環境 はプログラム的に設定することはできません。それらを設定するには、sparkConfig引数の中の他の設定プロパティをsparkR.session()に渡すように、それらを渡します。

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))

以下のSparkドライバーのプロパティはRStudioからsparkR.sessionを使って sparkConfigに設定することができます。

プロパティ名プロパティグループspark-submit 等価なもの
spark.driver.memory アプリケーションのプロパティ --driver-memory
spark.driver.extraClassPath ランタイム環境 --driver-class-path
spark.driver.extraJavaOptions ランタイム環境 --driver-java-options
spark.driver.extraLibraryPath ランタイム環境 --driver-library-path

SparkDataFramesの作成

SparkSessionを使って、アプリケーションはローカルのRデータフレーム、Hiveテーブル、あるいはその他の data sourcesから、SparkDataFrameを生成することができます。

ローカルデータフレームから

データフレームを生成する最も簡単な方法はローカルのRデータフレームをSparkDataFrameに変換する方法です。具体的には、as.DataFrameあるいはcreateDataFrameを使い、 SparkDataFrameを生成するためにローカルのRデータフレームを渡すことができます。例として、以下はRのfaithfulを使ったSparkDataFrameを生成します。

df <- as.DataFrame(faithful)

# Displays the first part of the SparkDataFrame
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74

データソースから

SparkRは SparkDataFrame インタフェースを使った様々なデータソースへの操作をサポートします。この章ではデータソースを使ってデータをロードおよび保存する一般的な方法について説明します。組み込みのデータソースに利用可能な特定のオプションについてはSpark SQL プログラミングガイドを調べる事ができます。

データソースからSparkDataFrameを作成する一般的なメソッドは read.dfです。このメソッドはロードするファイルのパスとデータソースの種類を取り、現在のところ有効なSparkSessionが自動的に使われるようにするでしょう。SparkRはそのままでJSON, CSVおよびParquetファイルの読み込みをサポートし、Spark Packagesを通じてAvroのような一般的なファイル形式のためのデータソースコネクタを見つけることができます。These packages can either be added by specifying --packages with spark-submit or sparkR commands, or if initializing SparkSession with sparkPackages parameter when in an interactive R shell or from RStudio.

sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")

例のJSON入力ファイルを使ってデータソースを使用する方法を見ることができます。ここで使われているファイルは代表的なJSONファイルではないことに注意してください。ファイル内の各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。

people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))

データソースAPIは本質的にCSV形式の入力ファイルをサポートします。もっと詳しい情報はSparkRのrad.df APIドキュメントを参照してください。

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

データソースAPIはSparkDataFramesを複数のファイルフォーマットに保存するために使用することもできます。例えば、SparkDataFrameを以前の例からwrite.dfを使ってParquetファイルへ保存することができます。

write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")

Hiveテーブルから

SparkDataFrameをHiveテーブルから生成することもできます。こうするには、Hiveメタストア内のテーブルにアクセス可能なHiveサポートのSparkSessionを生成する必要があるでしょう。SparkはHiveをサポートするようにビルドされていなければなりません。詳細はSQL プログラミングガイドで見つけることができます。SparkRでは、デフォルトでHiveサポートが有効なSparkSessionを生成しようとするでしょう(enableHiveSupport = TRUE)。

sparkR.session()

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")

# results is now a SparkDataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

SparkDataFrame 操作

SparkDataFrames は構造化データ処理を行う多数の関数をサポートします。ここでは幾つかの基本的な例を示し、完全なリストはAPI ドキュメントの中で見つけることができます:

行、列の選択

# Create the SparkDataFrame
df <- as.DataFrame(faithful)

# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column name as strings
head(select(df, "eruptions"))

# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48

グルーピング、集約

SparkRデータフレームはグルーピングの後でデータを集約するために一般的に使われる多くの関数をサポートします。例えば、以下に示すようにfaithfulデータセットの中で待ち時間のヒストグラムを計算することができます。

# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      70     4
##2      67     1
##3      69     2

# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##   waiting count
##1      78    15
##2      83    14
##3      81    13

列の操作

SparkRは集約の間にデータ処理のためにカラムに直接適用することができる多くの関数も提供します。以下の例は基本的な数学関数の使用を示します。

# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

ユーザ定義の関数の適用

SparkRでは、いくつかの種類のユーザ定義の関数をサポートします。

dapply あるいは dapplyCollectを使って巨大なデータセット上で指定した関数を実行する

dapply

SparkDataFrameの各パーティションへ関数を適用します。SparkDataFrame の各パーティションに適用される関数は1つだけパラメータを持たなければなりません。各パーティションに対応する data.frame がそれに渡されます。関数の出力はdata.frameであるべきです。スキーマはSparkDataFrameの結果の行のフォーマットを定義します。それは返り値のdata types に一致しなければなりません。

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300
dapplyCollect

dapplyのように、SparkDataFrameの各パーティションに関数を適用し、返ってくる結果をまとめます。関数の出力はdata.frameであるべきです。しかし、Schemaは渡される必要はありません。全てのパーティション上で実行されるUDFの出力がドライバーへ取り出せずドライバーのメモリ内に収まる場合に、dapplyCollectは失敗するかも知れないことに注意してください。

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

入力カラムによってグルーピングし、gapply あるいはgapplyCollectを使うことで、巨大なデータセット上で指定した関数を実行する

gapply

SparkDataFrameの各グループへ関数を適用します。The function is to be applied to each group of the SparkDataFrame and should have only two parameters: grouping key and R data.frame corresponding to that key. グループは SparkDataFrameのカラムから選択されます。関数の出力はdata.frameであるべきです。スキーマはSparkDataFrameの結果の行のフォーマットを定義します。それはSparkのデータタイプを基礎としてRの関数の出力スキーマを表さなければなりません。返り値のdata.frame のカラム名はユーザによって設定されます。以下はRとSparkの間でのデータタイプのマッピングです。

RとSparkの間でデータタイプをマッピング

RSpark
byte byte
integer integer
float float
double double
numeric double
character string
string string
binary binary
raw binary
logical boolean
POSIXct timestamp
POSIXlt timestamp
Date date
array array
list array
env map
# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900
gapplyCollect

gapplyのように、SparkDataFrameの各パーティションに関数を適用し、Rのdata.frameに返ってくる結果をまとめます。関数の出力はdata.frameであるべきです。しかし、Schemaは渡される必要はありません。全てのパーティション上で実行されるUDFの出力がドライバーへ取り出せずドライバーのメモリ内に収まる場合に、gapplyCollectは失敗するかも知れないことに注意してください。

# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900

spark.lapplyを使って分散されたローカルのR関数を実行する

spark.lapply

ネイティブのRのlapply と似て、spark.lapply は要素のリスト上に関数を実行し、Sparkを使って計算を分配します。doParallel あるいは lapply に似た方法でリストの要素に関数を適用します。全ての計算の結果は1つのマシーン内に収まらなければなりません。そうでなければ、df <- createDataFrame(list)のようなことをすることができ、そしてdapplyを使います。

# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)

SparkRからのSQLクエリの実行

SparkDataFrameはSpark SQLの中の一時的なビューとして登録することもでき、そのデータにSQLクエリを実行することができます。sql関数によってアプリケーションはSQLクエリをプログラム的に実行することができ、結果はSparkDataFrameとして返されます。

# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")

# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")

# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##    name
##1 Justin

機械学習

SparkR は現在のところ以下の機械学習アルゴリズムをサポートします: 一般化線形モデル, 加速度故障時間(AFT)生存回帰モデル, ナイーブベイズモデル および K平均モデル。裏では、SparkRはモデルを訓練するためにMLlibを使用します。ユーザは合致したモデルのサマリを出力するために summary を呼ぶことができ、新しいデータ上で予想をするためにpredictを呼ぶことができ、合致したモデルを保存/ロードするためにwrite.ml/read.mlを呼ぶことができます。SparkRは、モデルの適合のために‘~’, ‘.’, ‘:‘, ‘+’および‘-‘を含む利用可能なRの公式のサブセットをサポートします。

アルゴリズム

一般化線形モデル

spark.glm() あるいは glm() はSparkのデータフレームに対して一般化線形モデルを適合します。現在のところ、"ガウシアン", "二項式", "ポワソン" および "ガンマ"のファミリーがサポートされます。

irisDF <- suppressWarnings(createDataFrame(iris))
# Fit a generalized linear model of family "gaussian" with spark.glm
gaussianDF <- irisDF
gaussianTestDF <- irisDF
gaussianGLM <- spark.glm(gaussianDF, Sepal_Length ~ Sepal_Width + Species, family = "gaussian")

# Model summary
summary(gaussianGLM)

# Prediction
gaussianPredictions <- predict(gaussianGLM, gaussianTestDF)
showDF(gaussianPredictions)

# Fit a generalized linear model with glm (R-compliant)
gaussianGLM2 <- glm(Sepal_Length ~ Sepal_Width + Species, gaussianDF, family = "gaussian")
summary(gaussianGLM2)

# Fit a generalized linear model of family "binomial" with spark.glm
binomialDF <- filter(irisDF, irisDF$Species != "setosa")
binomialTestDF <- binomialDF
binomialGLM <- spark.glm(binomialDF, Species ~ Sepal_Length + Sepal_Width, family = "binomial")

# Model summary
summary(binomialGLM)

# Prediction
binomialPredictions <- predict(binomialGLM, binomialTestDF)
showDF(binomialPredictions)
例の完全なコードは Spark のリポジトリの "examples/src/main/r/ml.R" で見つかります。

Accelerated Failure Time (AFT) Survival Regression Model

spark.survreg() はSparkDataFrame上に加速度故障時間モデル(AFT)生存回帰モデルを適合します。現在のところspark.survreg() の公式はオペレータ '.' をサポートしないことに注意してください。

# Use the ovarian dataset available in R survival package
library(survival)

# Fit an accelerated failure time (AFT) survival regression model with spark.survreg
ovarianDF <- suppressWarnings(createDataFrame(ovarian))
aftDF <- ovarianDF
aftTestDF <- ovarianDF
aftModel <- spark.survreg(aftDF, Surv(futime, fustat) ~ ecog_ps + rx)

# Model summary
summary(aftModel)

# Prediction
aftPredictions <- predict(aftModel, aftTestDF)
showDF(aftPredictions)
例の完全なコードは Spark のリポジトリの "examples/src/main/r/ml.R" で見つかります。

ナイーブベイズ モデル

spark.naiveBayes() はSparkDataFrameにベルヌーイ ナイーブ ベイズ モデルを適合します。分類データのみがサポートされます。

# Fit a Bernoulli naive Bayes model with spark.naiveBayes
titanic <- as.data.frame(Titanic)
titanicDF <- createDataFrame(titanic[titanic$Freq > 0, -5])
nbDF <- titanicDF
nbTestDF <- titanicDF
nbModel <- spark.naiveBayes(nbDF, Survived ~ Class + Sex + Age)

# Model summary
summary(nbModel)

# Prediction
nbPredictions <- predict(nbModel, nbTestDF)
showDF(nbPredictions)
例の完全なコードは Spark のリポジトリの "examples/src/main/r/ml.R" で見つかります。

KMeans モデル

spark.kmeans() SparkのDataFrameに対し、Rのkmeans()に似たk-meansクラスタリングモデルを適合します。

# Fit a k-means model with spark.kmeans
irisDF <- suppressWarnings(createDataFrame(iris))
kmeansDF <- irisDF
kmeansTestDF <- irisDF
kmeansModel <- spark.kmeans(kmeansDF, ~ Sepal_Length + Sepal_Width + Petal_Length + Petal_Width,
                            k = 3)

# Model summary
summary(kmeansModel)

# Get fitted result from the k-means model
showDF(fitted(kmeansModel))

# Prediction
kmeansPredictions <- predict(kmeansModel, kmeansTestDF)
showDF(kmeansPredictions)
例の完全なコードは Spark のリポジトリの "examples/src/main/r/ml.R" で見つかります。

モデルの連続性

以下の例はSparkRによってMLlibのモデルをどうやって保存/ロードするかを示します。

irisDF <- suppressWarnings(createDataFrame(iris))
# Fit a generalized linear model of family "gaussian" with spark.glm
gaussianDF <- irisDF
gaussianTestDF <- irisDF
gaussianGLM <- spark.glm(gaussianDF, Sepal_Length ~ Sepal_Width + Species, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)

# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
showDF(gaussianPredictions)

unlink(modelPath)
例の完全なコードは Spark のリポジトリの "examples/src/main/r/ml.R" で見つかります。

R 関数名の衝突

R内の新しいパッケージをロードおよびアタッチする場合、名前のconflictがありえます。この場合、関数が他の関数をマスクします。

以下の関数はSparkRパッケージによってマスクされています。

マスクされた関数どうやってアクセスするか
cov in package:stats
stats::cov(x, y = NULL, use = "everything",
           method = c("pearson", "kendall", "spearman"))
filter in package:stats
stats::filter(x, filter, method = c("convolution", "recursive"),
              sides = 2, circular = FALSE, init)
sample in package:base base::sample(x, size, replace = FALSE, prob = NULL)

SparkRの部分は dplyr パッケージ上でモデル化されるため、SparkRのあるパッケージはdplyr内のそれらの同じ名前を共有します。二つのパッケージのロード順序に依存して、最初にロードされたパッケージの関数が後でロードされたパッケージの関数によってマスクされます。そのような場合、例えばSparkR::cume_dist(x)あるいはdplyr::cume_dist(x)のようにパッケージ名を使った呼び出しをします。

search()を使ってR内の検索パスを調べることができます。

移行ガイド

SparkR 1.5.x から1.6.x へのアップグレード

SparkR 1.6.x から2.0 へのアップグレード

TOP
inserted by FC2 system