SparkR (SparkでのR)
概要
SparkRは、RからApache Sparkを使うための軽量フロントエンドを提供するR パッケージです。Spark 1.6.0では、SparkRは巨大なデータセット上でのselection, filtering, aggregation など(Rデータフレームに似ています dplyr) のような操作をサポートする分散データフレームの実装を提供します。SparkRはMLlibを使った分散機械学習もサポートします。
SparkR データフレーム
データフレームは、データの分散コレクションが名前付きの列に整理されたものです。概念的にはリレーショナルデータベースのテーブル、あるいはRでのデータフレームに相当しますが、裏ではもっと最適化されています。データフレームは以下のようなソースの大きな配列です: 構造化されたデータファイル、Hiveのテーブル、外部データベース、あるいは既存のローカルのRデータフレーム。
このページの全ての例は、RまたはSparkの配布物に含まれるサンプルデータを使用し、./bin/sparkR
シェルを使って実行することができます。
開始: SparkContext, SQLContext
SparkRの入り口は、RプログラムがSparkクラスタに接続しているSparkContext
です。sparkR.init
を使って SparkContext
を生成することができ、アプリケーション名、依存するどのようなSparkパッケージなどをオプションで渡すことができます。更に、データフレームで作業をするために、SQLContext
が必要でしょう。これはSparkContextから生成することができます。SparkR
シェルから操作をしている場合は、SQLContext
および SparkContext
は既に生成されており、sparkR.init
を呼び出す必要は無いでしょう。
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
RStudioからの開始
RStudioからSparkRを開始することも可能です。RプログラムをRStudio, R shell, RscriptあるいはR IDEからSpark クラスタに接続することができます。始めるには、環境に SPARK_HOME を設定(Sys.getenvを調べることができます)し、SparkRパッケージをロードし、以下のようにl sparkR.init
を呼び出してください。sparkR.init
の呼び出しに加えて、特定のSparkドライバのプロパティを指定することもできます。この場合SparkRが気を使ってドライバーJVMプロセスが開始されているため、通常これらのアプリケーション プロパティ および ランタイム環境 はプログラム的に設定することはできません。それらを設定するには、sparkEnvir
引数の中の他の設定プロパティをsparkR.init()
に渡すように、それらを渡します。
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sc <- sparkR.init(master = "local[*]", sparkEnvir = list(spark.driver.memory="2g"))
以下のオプションはRStudioからsparkR.init
を使ってsparkEnvir
に渡すことができます。
プロパティ名 | プロパティグループ | spark-submit 等価 |
---|---|---|
spark.driver.memory |
アプリケーションのプロパティ | --driver-memory |
spark.driver.extraClassPath |
ランタイム環境 | --driver-class-path |
spark.driver.extraJavaOptions |
ランタイム環境 | --driver-java-options |
spark.driver.extraLibraryPath |
ランタイム環境 | --driver-library-path |
データフレームの生成
SQLContext
を使って、アプリケーションはローカルのRデータフレーム、Hive table、あるいはその他の data sourcesから、DataFrame
を生成することができます。
ローカルデータフレームから
データフレームを生成する最も簡単な方法はローカルのRデータフレームをSparkR データフレームに変換する方法です。具体的には、SparkRデータフレームを生成するために、createDataFrame
を使い、 ローカルのRデータフレームを渡すことができます。例として、以下はRのfaithful
を使ったDataFrame
を生成します。
df <- createDataFrame(sqlContext, faithful)
# Displays the content of the DataFrame to stdout
head(df)
## eruptions waiting
##1 3.600 79
##2 1.800 54
##3 3.333 74
データソースから
SparkRは DataFrame
インタフェースを使った様々なデータソースへの操作をサポートします。この章ではデータソースを使ってデータをロードおよび保存する一般的な方法について説明します。組み込みのデータソースに利用可能な特定のオプションについてはSpark SQL プログラミングガイドを調べる事ができます。
データソースからデータフレームを作成する一般的なメソッドは read.df
です。このメソッドはSQLContext
、ロードするファイルのパス、およびデータソースの種類を取ります。SparkRはそのままでJSONとParquetファイルの読み込みをサポートし、Spark Packagesを通じてCSV およびAvroのような一般的なファイル形式のためのデータソースコネクタを見つけることができます。これらのパッケージは spark-submit
を使って--packages
を指定するか、 sparkR
コマンドで追加することができます。あるいはもしコンテキストをinit
を使って生成する場合は、パッケージを packages
引数を使って指定することができます。
sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)
例のJSON入力ファイルを使ってデータソースを使用する方法を見ることができます。ここで使われているファイルは代表的なJSONファイルではないことに注意してください。ファイル内の各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。
people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")
head(people)
## age name
##1 NA Michael
##2 30 Andy
##3 19 Justin
# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
# |-- age: integer (nullable = true)
# |-- name: string (nullable = true)
データソースAPIはデータフレームを複数のファイルフォーマットに保存するために使用することもできます。例えば、データフレームを以前の例からwrite.df
を使ってParquetファイルへ保存することができます(Spark 1.6まで、書き込みのデフォルトのモードはappend
でした。Spark 1.7ではScala APIに合致するように、error
に変更されました)
write.df(people, path="people.parquet", source="parquet", mode="overwrite")
Hiveテーブルから
SparkRデータフレームをHiveテーブルから生成することもできます。こうするには、Hiveメタストア内のテーブルにアクセス可能なHiveContextを生成する必要があるでしょう。SparkはHiveをサポートするようにビルドされていなければなりません。SQLContextとHiveContextの違いの詳細はSQL プログラミングガイドで見つけることができます。
# sc is an existing SparkContext.
hiveContext <- sparkRHive.init(sc)
sql(hiveContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql(hiveContext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- sql(hiveContext, "FROM src SELECT key, value")
# results is now a DataFrame
head(results)
## key value
## 1 238 val_238
## 2 86 val_86
## 3 311 val_311
データフレームの操作
SparkR データフレームは構造化データ処理を行う多数の関数をサポートします。ここでは幾つかの基本的な例を示し、完全なリストはAPI ドキュメントの中で見つけることができます:
行、列の選択
# Create the DataFrame
df <- createDataFrame(sqlContext, faithful)
# Get basic information about the DataFrame
df
## DataFrame[eruptions:double, waiting:double]
# Select only the "eruptions" column
head(select(df, df$eruptions))
## eruptions
##1 3.600
##2 1.800
##3 3.333
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
## eruptions waiting
##1 1.750 47
##2 1.750 47
##3 1.867 48
グルーピング、集約
SparkRデータフレームはグルーピングの後でデータを集約するために一般的に使われる多くの関数をサポートします。例えば、以下に示すようにfaithful
データセットの中で待ち
時間のヒストグラムを計算することができます。
# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
## waiting count
##1 81 13
##2 60 6
##3 68 1
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
## waiting count
##1 78 15
##2 83 14
##3 81 13
列の操作
SparkRは集約の間にデータ処理のためにカラムに直接適用することができる多くの関数も提供します。以下の例は基本的な数学関数の使用を示します。
# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
SparkRからのSQLクエリの実行
SparkRデータフレームはSpark SQLの中の一時的なテーブルとして登録することもでき、テーブルとしてデータフレームを登録することでデータにSQLクエリを実行することができます。sql
関数によってアプリケーションはSQLクエリをプログラム的に実行することができ、結果はデータフレーム
として返されます。
# Load a JSON file
people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")
# Register this DataFrame as a table.
registerTempTable(people, "people")
# SQL statements can be run by using the sql method
teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
##1 Justin
機械学習
SparkR glm() 関数を使ってDataFrame上に一般化された線形モデルを適合することができます。裏では、SparkRは特定のファミリーのモデルを訓練するためにMLlibを使用します。現在のところ、ガウシアンと二項式のファミリーがサポートされます。モデルの適合のために‘~’, ‘.’, ‘:‘, ‘+’および‘-‘を含む利用可能なRの公式のサブセットをサポートします。
summary() 関数はglm()によって生成されるモデルの概要を渡します。
- gaussian GLM モデルについては、‘devianceResiduals’ および ‘coefficients’ コンポーネントを持つリストを返します。‘devianceResiduals’ は予測の逸脱の剰余の最小/最大を返します; ‘coefficients’ は予測された係数とそれらの予測された標準誤差 t 値とp値を返します。(通常のsolverによって当てはめられたモデルの場合にのみ利用可能です。)
- binomial GLM モデルについては、予測された係数を渡す‘coefficients’ コンポーネントを持つリストを返します。
以下の例は、SparkRを使った gaussian GLM モデルと binomial GLM モデルの使い方を示します。
ガウスGLMモデル
# Create the DataFrame
df <- createDataFrame(sqlContext, iris)
# Fit a gaussian GLM model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")
# Model summary are returned in a similar format to R's native glm().
summary(model)
##$devianceResiduals
## Min Max
## -1.307112 1.412532
##
##$coefficients
## Estimate Std. Error t value Pr(>|t|)
##(Intercept) 2.251393 0.3697543 6.08889 9.568102e-09
##Sepal_Width 0.8035609 0.106339 7.556598 4.187317e-12
##Species_versicolor 1.458743 0.1121079 13.01195 0
##Species_virginica 1.946817 0.100015 19.46525 0
# Make predictions based on the model.
predictions <- predict(model, newData = df)
head(select(predictions, "Sepal_Length", "prediction"))
## Sepal_Length prediction
##1 5.1 5.063856
##2 4.9 4.662076
##3 4.7 4.822788
##4 4.6 4.742432
##5 5.0 5.144212
##6 5.4 5.385281
二値GLMモデル
# Create the DataFrame
df <- createDataFrame(sqlContext, iris)
training <- filter(df, df$Species != "setosa")
# Fit a binomial GLM model over the dataset.
model <- glm(Species ~ Sepal_Length + Sepal_Width, data = training, family = "binomial")
# Model coefficients are returned in a similar format to R's native glm().
summary(model)
##$coefficients
## Estimate
##(Intercept) -13.046005
##Sepal_Length 1.902373
##Sepal_Width 0.404655
R 関数名の衝突
R内の新しいパッケージをロードおよびアタッチする場合、名前のconflictがありえます。この場合、関数が他の関数をマスクします。
以下の関数はSparkRパッケージによってマスクされています。
マスクされた関数 | どうやってアクセスするか |
---|---|
cov in package:stats |
|
filter in package:stats |
|
sample in package:base |
base::sample(x, size, replace = FALSE, prob = NULL) |
table in package:base |
|
SparkRの部分は dplyr
パッケージ上でモデル化されるため、SparkRのあるパッケージはdplyr
内のそれらの同じ名前を共有します。二つのパッケージのロード順序に依存して、最初にロードされたパッケージの関数が後でロードされたパッケージの関数によってマスクされます。そのような場合、例えばSparkR::cume_dist(x)
あるいはdplyr::cume_dist(x)
のようにパッケージ名を使った呼び出しをします。
search()
を使ってR内の検索パスを調べることができます。
移行ガイド
SparkR 1.6から1.7へのアップグレード
- Spark1.6まで、書き込みのデフォルトのモードは
append
でした。Spark 1.7ではScala APIに合致するように、error
に変更されました)