SparkR (SparkでのR)
- 概要
- SparkDataFrame
- 機械学習
- R 関数名の衝突
- 移行ガイド
概要
SparkRは、RからApache Sparkを使うための軽量フロントエンドを提供するR パッケージです。Spark 2.1.1では、SparkRは巨大なデータセット上でのselection, filtering, aggregation など(Rデータフレームに似ています dplyr) のような操作をサポートする分散データフレームの実装を提供します。SparkRはMLlibを使った分散機械学習もサポートします。
SparkDataFrame
SparkDataFrameは、データの分散コレクションが名前付きの列に整理されたものです。概念的にはリレーショナルデータベースのテーブル、あるいはRでのデータフレームに相当しますが、裏ではもっと最適化されています。SparkDataFrameは以下のようなソースの大きな配列です: 構造化されたデータファイル、Hiveのテーブル、外部データベース、あるいは既存のローカルのRデータフレーム。
このページの全ての例は、RまたはSparkの配布物に含まれるサンプルデータを使用し、./bin/sparkR
シェルを使って実行することができます。
開始: SparkSession
SparkRの入り口は、RプログラムがSparkクラスタに接続しているSparkSession
です。sparkR.session
を使って SparkSession
を生成することができ、アプリケーション名、依存される全てのSparkパッケージなどのようなオプションを渡すことができます。更に、SparkSession
を使ってSparkDataFrameと連携することもできます。SparkR
シェルから操作をしている場合は、SparkSession
は既に生成されており、<c7>sparkR.session/c7>を呼び出す必要は無いでしょう。
sparkR.session()
RStudioからの開始
RStudioからSparkRを開始することも可能です。RプログラムをRStudio, R shell, RscriptあるいはR IDEからSpark クラスタに接続することができます。始めるには、環境に SPARK_HOME を設定(Sys.getenvを調べることができます)し、SparkRパッケージをロードし、以下のようにl sparkR.session
を呼び出してください。Sparkのインストレーションの調査を行い、もし見つからなければ自動的にダウンロードしキャッシュするでしょう。他のやり方として、install.spark
を手動で実行することもできます。
sparkR.session
の呼び出しに加えて、特定のSparkドライバのプロパティを指定することもできます。この場合SparkRが気を使ってドライバーJVMプロセスが開始されているため、通常これらのアプリケーション プロパティ および ランタイム環境 はプログラム的に設定することはできません。それらを設定するには、sparkConfig
引数の中の他の設定プロパティをsparkR.session()
に渡すように、それらを渡します。
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
以下のSparkドライバーのプロパティはRStudioからsparkR.session
を使って sparkConfig
に設定することができます。
プロパティ名 | プロパティグループ | spark-submit 等価なもの |
---|---|---|
spark.master |
アプリケーションのプロパティ | --master |
spark.yarn.keytab |
アプリケーションのプロパティ | --keytab |
spark.yarn.principal |
アプリケーションのプロパティ | --principal |
spark.driver.memory |
アプリケーションのプロパティ | --driver-memory |
spark.driver.extraClassPath |
ランタイム環境 | --driver-class-path |
spark.driver.extraJavaOptions |
ランタイム環境 | --driver-java-options |
spark.driver.extraLibraryPath |
ランタイム環境 | --driver-library-path |
SparkDataFramesの作成
SparkSession
を使って、アプリケーションはローカルのRデータフレーム、Hiveテーブル、あるいはその他の data sourcesから、SparkDataFrame
を生成することができます。
ローカルデータフレームから
データフレームを生成する最も簡単な方法はローカルのRデータフレームをSparkDataFrameに変換する方法です。具体的には、as.DataFrame
あるいはcreateDataFrame
を使い、 SparkDataFrameを生成するためにローカルのRデータフレームを渡すことができます。例として、以下はRのfaithful
を使ったSparkDataFrame
を生成します。
df <- as.DataFrame(faithful)
# Displays the first part of the SparkDataFrame
head(df)
## eruptions waiting
##1 3.600 79
##2 1.800 54
##3 3.333 74
データソースから
SparkRは SparkDataFrame
インタフェースを使った様々なデータソースへの操作をサポートします。この章ではデータソースを使ってデータをロードおよび保存する一般的な方法について説明します。組み込みのデータソースに利用可能な特定のオプションについてはSpark SQL プログラミングガイドを調べる事ができます。
データソースからSparkDataFrameを作成する一般的なメソッドは read.df
です。このメソッドはロードするファイルのパスとデータソースの種類を取り、現在のところ有効なSparkSessionが自動的に使われるようにするでしょう。SparkRはそのままでJSON, CSVおよびParquetファイルの読み込みをサポートし、サードパーティ プロジェクトのようなソースから利用可能なパッケージを使って、Avroのような一般的なファイル形式のためのデータソースコネクタを見つけることができます。These packages can either be added by specifying --packages
with spark-submit
or sparkR
commands, or if initializing SparkSession with sparkPackages
parameter when in an interactive R shell or from RStudio.
sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")
例のJSON入力ファイルを使ってデータソースを使用する方法を見ることができます。ここで使われているファイルは代表的なJSONファイルではないことに注意してください。ファイル内の各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。更に詳しい情報は、JSON Lines text format, also called newline-delimited JSONを見てください。結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。
people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
## age name
##1 NA Michael
##2 30 Andy
##3 19 Justin
# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))
データソースAPIは本質的にCSV形式の入力ファイルをサポートします。もっと詳しい情報はSparkRのrad.df APIドキュメントを参照してください。
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
データソースAPIはSparkDataFramesを複数のファイルフォーマットに保存するために使用することもできます。例えば、SparkDataFrameを以前の例からwrite.df
を使ってParquetファイルへ保存することができます。
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
Hiveテーブルから
SparkDataFrameをHiveテーブルから生成することもできます。こうするには、Hiveメタストア内のテーブルにアクセス可能なHiveサポートのSparkSessionを生成する必要があるでしょう。SparkはHiveをサポートするようにビルドされていなければなりません。詳細はSQL プログラミングガイドで見つけることができます。SparkRでは、デフォルトでHiveサポートが有効なSparkSessionを生成しようとするでしょう(enableHiveSupport = TRUE
)。
sparkR.session()
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")
# results is now a SparkDataFrame
head(results)
## key value
## 1 238 val_238
## 2 86 val_86
## 3 311 val_311
SparkDataFrame 操作
SparkDataFrames は構造化データ処理を行う多数の関数をサポートします。ここでは幾つかの基本的な例を示し、完全なリストはAPI ドキュメントの中で見つけることができます:
行、列の選択
# Create the SparkDataFrame
df <- as.DataFrame(faithful)
# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]
# Select only the "eruptions" column
head(select(df, df$eruptions))
## eruptions
##1 3.600
##2 1.800
##3 3.333
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
## eruptions waiting
##1 1.750 47
##2 1.750 47
##3 1.867 48
グルーピング、集約
SparkRデータフレームはグルーピングの後でデータを集約するために一般的に使われる多くの関数をサポートします。例えば、以下に示すようにfaithful
データセットの中で待ち
時間のヒストグラムを計算することができます。
# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
## waiting count
##1 70 4
##2 67 1
##3 69 2
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
## waiting count
##1 78 15
##2 83 14
##3 81 13
列の操作
SparkRは集約の間にデータ処理のためにカラムに直接適用することができる多くの関数も提供します。以下の例は基本的な数学関数の使用を示します。
# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
ユーザ定義の関数の適用
SparkRでは、いくつかの種類のユーザ定義の関数をサポートします。
dapply
あるいは dapplyCollect
を使って巨大なデータセット上で指定した関数を実行する
dapply
SparkDataFrame
の各パーティションへ関数を適用します。SparkDataFrame
の各パーティションに適用される関数は1つだけパラメータを持たなければなりません。各パーティションに対応する data.frame
がそれに渡されます。関数の出力はdata.frame
であるべきです。スキーマはSparkDataFrame
の結果の行のフォーマットを定義します。それは返り値のdata types に一致しなければなりません。
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
##4 2.283 62 3720
##5 4.533 85 5100
##6 2.883 55 3300
dapplyCollect
dapply
のように、SparkDataFrame
の各パーティションに関数を適用し、返ってくる結果をまとめます。関数の出力はdata.frame
であるべきです。しかし、Schemaは渡される必要はありません。全てのパーティション上で実行されるUDFの出力がドライバーへ取り出せずドライバーのメモリ内に収まる場合に、dapplyCollect
は失敗するかも知れないことに注意してください。
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
入力カラムによってグルーピングし、gapply
あるいはgapplyCollect
を使うことで、巨大なデータセット上で指定した関数を実行する
gapply
SparkDataFrame
の各グループへ関数を適用します。関数はSparkDataFrame
の各グループへ適用され、二つのパラメータだけを持つ必要があります: グルーピングのキーとそのキーに対応するRのdata.frame
。グループは SparkDataFrame
のカラムから選択されます。関数の出力はdata.frame
であるべきです。スキーマはSparkDataFrame
の結果の行のフォーマットを定義します。それはSparkのデータタイプを基礎としてRの関数の出力スキーマを表さなければなりません。返り値のdata.frame
のカラム名はユーザによって設定されます。
# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
gapplyCollect
gapply
のように、SparkDataFrame
の各パーティションに関数を適用し、Rのdata.frameに返ってくる結果をまとめます。関数の出力はdata.frame
であるべきです。しかし、Schemaは渡される必要はありません。全てのパーティション上で実行されるUDFの出力がドライバーへ取り出せずドライバーのメモリ内に収まる場合に、gapplyCollect
は失敗するかも知れないことに注意してください。
# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
RとSparkの間でデータタイプをマッピング
R | Spark |
---|---|
byte | byte |
integer | integer |
float | float |
double | double |
numeric | double |
character | string |
string | string |
binary | binary |
raw | binary |
logical | boolean |
POSIXct | timestamp |
POSIXlt | timestamp |
Date | date |
array | array |
list | array |
env | map |
spark.lapply
を使って分散されたローカルのR関数を実行する
spark.lapply
ネイティブのRのlapply
と似て、spark.lapply
は要素のリスト上に関数を実行し、Sparkを使って計算を分配します。doParallel
あるいは lapply
に似た方法でリストの要素に関数を適用します。全ての計算の結果は1つのマシーン内に収まらなければなりません。そうでなければ、df <- createDataFrame(list)
のようなことをすることができ、そしてdapply
を使います。
# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# Print the summary of each model
print(model.summaries)
SparkRからのSQLクエリの実行
SparkDataFrameはSpark SQLの中の一時的なビューとして登録することもでき、そのデータにSQLクエリを実行することができます。sql
関数によってアプリケーションはSQLクエリをプログラム的に実行することができ、結果はSparkDataFrame
として返されます。
# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")
# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")
# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
##1 Justin
機械学習
アルゴリズム
SparkR は現在のところ以下の機械学習アルゴリズムをサポートします:
分類
spark.logit
:Logistic Regression
spark.mlp
:Multilayer Perceptron (MLP)
spark.naiveBayes
:Naive Bayes
回帰
spark.survreg
:Accelerated Failure Time (AFT) Survival Model
spark.glm
orglm
:Generalized Linear Model (GLM)
spark.isoreg
:Isotonic Regression
Tree
spark.gbt
:Gradient Boosted Trees for
Regression
and
Classification
spark.randomForest
:Random Forest for
Regression
and
Classification
クラスタリング
spark.gaussianMixture
:Gaussian Mixture Model (GMM)
spark.kmeans
:K-Means
spark.lda
:Latent Dirichlet Allocation (LDA)
協調フィルタリング
統計
spark.kstest
:コルモゴロフ-スミルノフ テスト
裏では、SparkRはモデルを訓練するためにMLlibを使用します。コードの例については、MLlibユーザガイドの対応する章を参照してください。ユーザは合致したモデルのサマリを出力するために summary
を呼ぶことができ、新しいデータ上で予想をするためにpredictを呼ぶことができ、合致したモデルを保存/ロードするためにwrite.ml/read.mlを呼ぶことができます。SparkRは、モデルの適合のために‘~’, ‘.’, ‘:‘, ‘+’および‘-‘を含む利用可能なRの公式のサブセットをサポートします。
モデルの連続性
以下の例はSparkRによってMLlibのモデルをどうやって保存/ロードするかを示します。
irisDF <- suppressWarnings(createDataFrame(iris))
# Fit a generalized linear model of family "gaussian" with spark.glm
gaussianDF <- irisDF
gaussianTestDF <- irisDF
gaussianGLM <- spark.glm(gaussianDF, Sepal_Length ~ Sepal_Width + Species, family = "gaussian")
# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# Check model summary
summary(gaussianGLM2)
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
showDF(gaussianPredictions)
unlink(modelPath)
R 関数名の衝突
R内の新しいパッケージをロードおよびアタッチする場合、名前のconflictがありえます。この場合、関数が他の関数をマスクします。
以下の関数はSparkRパッケージによってマスクされています。
マスクされた関数 | どうやってアクセスするか |
---|---|
cov in package:stats |
|
filter in package:stats |
|
sample in package:base |
base::sample(x, size, replace = FALSE, prob = NULL) |
SparkRの部分は dplyr
パッケージ上でモデル化されるため、SparkRのあるパッケージはdplyr
内のそれらの同じ名前を共有します。二つのパッケージのロード順序に依存して、最初にロードされたパッケージの関数が後でロードされたパッケージの関数によってマスクされます。そのような場合、例えばSparkR::cume_dist(x)
あるいはdplyr::cume_dist(x)
のようにパッケージ名を使った呼び出しをします。
search()
を使ってR内の検索パスを調べることができます。
移行ガイド
SparkR 1.5.x から1.6.x へのアップグレード
- Spark1.6.0の前は、書き込みのデフォルトのモードは
append
でした。Spark 1.6.0でScala APIに合致するように、error
に変更されました。 - SparkSQL はRの
NA
をnull
に変換します。逆もまた同様です。
SparkR 1.6.x から2.0 へのアップグレード
- メソッド
table
は削除され、tableToDF
に置き換えられました。 - クラス
DataFrame
は名前の衝突を避けるためにSparkDataFrame
に名前を変更されました。 - Sparkの
SQLContext
とHiveContext
はSparkSession
と置き換えられるために非推奨になりました。SparkSessionをインスタンス化するには、sparkR.init()
の代わりに、その場所でsparkR.session()
を呼んでください。一度これが行われると、現在の有効なSparkSessionhaSparkDataFrame操作のために使われるでしょう。 sparkExecutorEnv
はsparkR.session
によってサポートされません。executorのための環境を設定するために、プリフィックス"spark.executorEnv.VAR_NAME"を使ってSparkの設定プロパティを設定します。例えば "spark.executorEnv.PATH"sqlContext
パラメータはもう以下のこれらの関数を必要としません:createDataFrame
,as.DataFrame
,read.json
,jsonFile
,read.parquet
,parquetFile
,read.text
,sql
,tables
,tableNames
,cacheTable
,uncacheTable
,clearCache
,dropTempTable
,read.df
,loadDF
,createExternalTable
.- メソッド
registerTempTable
はcreateOrReplaceTempView
と置き換えられるために非推奨になりました。 - メソッド
dropTempTable
はdropTempView
と置き換えられるために非推奨になりました。 - これらの関数では
sc
SparkContext パラメータはもう必要とされません:setJobGroup
,clearJobGroup
,cancelJobGroup
SparkR 2.1.0へのアップグレード
join
はもうデフォルトではデカルト積を実行しません。代わりにcrossJoin
を使ってください。