SparkR (SparkでのR)
- 概要
- SparkDataFrame
- 機械学習
- R 関数名の衝突
- 移行ガイド
概要
SparkRは、RからApache Sparkを使うための軽量フロントエンドを提供するR パッケージです。Spark 2.1.2では、SparkRは巨大なデータセット上でのselection, filtering, aggregation など(Rデータフレームに似ています dplyr) のような操作をサポートする分散データフレームの実装を提供します。SparkRはMLlibを使った分散機械学習もサポートします。
SparkDataFrame
SparkDataFrameは、データの分散コレクションが名前付きの列に整理されたものです。概念的にはリレーショナルデータベースのテーブル、あるいはRでのデータフレームに相当しますが、裏ではもっと最適化されています。SparkDataFrameは以下のようなソースの大きな配列です: 構造化されたデータファイル、Hiveのテーブル、外部データベース、あるいは既存のローカルのRデータフレーム。
このページの全ての例は、RまたはSparkの配布物に含まれるサンプルデータを使用し、./bin/sparkR
シェルを使って実行することができます。
開始: SparkSession
SparkRの入り口は、RプログラムがSparkクラスタに接続しているSparkSession
です。sparkR.session
を使って SparkSession
を生成することができ、アプリケーション名、依存される全てのSparkパッケージなどのようなオプションを渡すことができます。更に、SparkSession
を使ってSparkDataFrameと連携することもできます。SparkR
シェルから操作をしている場合は、SparkSession
は既に生成されており、<c7>sparkR.session/c7>を呼び出す必要は無いでしょう。
RStudioからの開始
RStudioからSparkRを開始することも可能です。RプログラムをRStudio, R shell, RscriptあるいはR IDEからSpark クラスタに接続することができます。始めるには、環境に SPARK_HOME を設定(Sys.getenvを調べることができます)し、SparkRパッケージをロードし、以下のようにl sparkR.session
を呼び出してください。Sparkのインストレーションの調査を行い、もし見つからなければ自動的にダウンロードしキャッシュするでしょう。他のやり方として、install.spark
を手動で実行することもできます。
sparkR.session
の呼び出しに加えて、特定のSparkドライバのプロパティを指定することもできます。この場合SparkRが気を使ってドライバーJVMプロセスが開始されているため、通常これらのアプリケーション プロパティ および ランタイム環境 はプログラム的に設定することはできません。それらを設定するには、sparkConfig
引数の中の他の設定プロパティをsparkR.session()
に渡すように、それらを渡します。
以下のSparkドライバーのプロパティはRStudioからsparkR.session
を使って sparkConfig
に設定することができます。
プロパティ名 | プロパティグループ | spark-submit 等価なもの |
---|---|---|
spark.master |
アプリケーションのプロパティ | --master |
spark.yarn.keytab |
アプリケーションのプロパティ | --keytab |
spark.yarn.principal |
アプリケーションのプロパティ | --principal |
spark.driver.memory |
アプリケーションのプロパティ | --driver-memory |
spark.driver.extraClassPath |
ランタイム環境 | --driver-class-path |
spark.driver.extraJavaOptions |
ランタイム環境 | --driver-java-options |
spark.driver.extraLibraryPath |
ランタイム環境 | --driver-library-path |
SparkDataFramesの作成
SparkSession
を使って、アプリケーションはローカルのRデータフレーム、Hiveテーブル、あるいはその他の data sourcesから、SparkDataFrame
を生成することができます。
ローカルデータフレームから
データフレームを生成する最も簡単な方法はローカルのRデータフレームをSparkDataFrameに変換する方法です。具体的には、as.DataFrame
あるいはcreateDataFrame
を使い、 SparkDataFrameを生成するためにローカルのRデータフレームを渡すことができます。例として、以下はRのfaithful
を使ったSparkDataFrame
を生成します。
データソースから
SparkRは SparkDataFrame
インタフェースを使った様々なデータソースへの操作をサポートします。この章ではデータソースを使ってデータをロードおよび保存する一般的な方法について説明します。組み込みのデータソースに利用可能な特定のオプションについてはSpark SQL プログラミングガイドを調べる事ができます。
データソースからSparkDataFrameを作成する一般的なメソッドは read.df
です。このメソッドはロードするファイルのパスとデータソースの種類を取り、現在のところ有効なSparkSessionが自動的に使われるようにするでしょう。SparkRはそのままでJSON, CSVおよびParquetファイルの読み込みをサポートし、サードパーティ プロジェクトのようなソースから利用可能なパッケージを使って、Avroのような一般的なファイル形式のためのデータソースコネクタを見つけることができます。These packages can either be added by specifying --packages
with spark-submit
or sparkR
commands, or if initializing SparkSession with sparkPackages
parameter when in an interactive R shell or from RStudio.
例のJSON入力ファイルを使ってデータソースを使用する方法を見ることができます。ここで使われているファイルは代表的なJSONファイルではないことに注意してください。ファイル内の各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。更に詳しい情報は、JSON Lines text format, also called newline-delimited JSONを見てください。結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。
データソースAPIは本質的にCSV形式の入力ファイルをサポートします。もっと詳しい情報はSparkRのrad.df APIドキュメントを参照してください。
データソースAPIはSparkDataFramesを複数のファイルフォーマットに保存するために使用することもできます。例えば、SparkDataFrameを以前の例からwrite.df
を使ってParquetファイルへ保存することができます。
Hiveテーブルから
SparkDataFrameをHiveテーブルから生成することもできます。こうするには、Hiveメタストア内のテーブルにアクセス可能なHiveサポートのSparkSessionを生成する必要があるでしょう。SparkはHiveをサポートするようにビルドされていなければなりません。詳細はSQL プログラミングガイドで見つけることができます。SparkRでは、デフォルトでHiveサポートが有効なSparkSessionを生成しようとするでしょう(enableHiveSupport = TRUE
)。
SparkDataFrame 操作
SparkDataFrames は構造化データ処理を行う多数の関数をサポートします。ここでは幾つかの基本的な例を示し、完全なリストはAPI ドキュメントの中で見つけることができます:
行、列の選択
グルーピング、集約
SparkRデータフレームはグルーピングの後でデータを集約するために一般的に使われる多くの関数をサポートします。例えば、以下に示すようにfaithful
データセットの中で待ち
時間のヒストグラムを計算することができます。
列の操作
SparkRは集約の間にデータ処理のためにカラムに直接適用することができる多くの関数も提供します。以下の例は基本的な数学関数の使用を示します。
ユーザ定義の関数の適用
SparkRでは、いくつかの種類のユーザ定義の関数をサポートします。
dapply
あるいは dapplyCollect
を使って巨大なデータセット上で指定した関数を実行する
dapply
SparkDataFrame
の各パーティションへ関数を適用します。SparkDataFrame
の各パーティションに適用される関数は1つだけパラメータを持たなければなりません。各パーティションに対応する data.frame
がそれに渡されます。関数の出力はdata.frame
であるべきです。スキーマはSparkDataFrame
の結果の行のフォーマットを定義します。それは返り値のdata types に一致しなければなりません。
dapplyCollect
dapply
のように、SparkDataFrame
の各パーティションに関数を適用し、返ってくる結果をまとめます。関数の出力はdata.frame
であるべきです。しかし、Schemaは渡される必要はありません。全てのパーティション上で実行されるUDFの出力がドライバーへ取り出せずドライバーのメモリ内に収まる場合に、dapplyCollect
は失敗するかも知れないことに注意してください。
入力カラムによってグルーピングし、gapply
あるいはgapplyCollect
を使うことで、巨大なデータセット上で指定した関数を実行する
gapply
SparkDataFrame
の各グループへ関数を適用します。関数はSparkDataFrame
の各グループへ適用され、二つのパラメータだけを持つ必要があります: グルーピングのキーとそのキーに対応するRのdata.frame
。グループは SparkDataFrame
のカラムから選択されます。関数の出力はdata.frame
であるべきです。スキーマはSparkDataFrame
の結果の行のフォーマットを定義します。それはSparkのデータタイプを基礎としてRの関数の出力スキーマを表さなければなりません。返り値のdata.frame
のカラム名はユーザによって設定されます。
gapplyCollect
gapply
のように、SparkDataFrame
の各パーティションに関数を適用し、Rのdata.frameに返ってくる結果をまとめます。関数の出力はdata.frame
であるべきです。しかし、Schemaは渡される必要はありません。全てのパーティション上で実行されるUDFの出力がドライバーへ取り出せずドライバーのメモリ内に収まる場合に、gapplyCollect
は失敗するかも知れないことに注意してください。
RとSparkの間でデータタイプをマッピング
R | Spark |
---|---|
byte | byte |
integer | integer |
float | float |
double | double |
numeric | double |
character | string |
string | string |
binary | binary |
raw | binary |
logical | boolean |
POSIXct | timestamp |
POSIXlt | timestamp |
Date | date |
array | array |
list | array |
env | map |
spark.lapply
を使って分散されたローカルのR関数を実行する
spark.lapply
ネイティブのRのlapply
と似て、spark.lapply
は要素のリスト上に関数を実行し、Sparkを使って計算を分配します。doParallel
あるいは lapply
に似た方法でリストの要素に関数を適用します。全ての計算の結果は1つのマシーン内に収まらなければなりません。そうでなければ、df <- createDataFrame(list)
のようなことをすることができ、そしてdapply
を使います。
SparkRからのSQLクエリの実行
SparkDataFrameはSpark SQLの中の一時的なビューとして登録することもでき、そのデータにSQLクエリを実行することができます。sql
関数によってアプリケーションはSQLクエリをプログラム的に実行することができ、結果はSparkDataFrame
として返されます。
機械学習
アルゴリズム
SparkR は現在のところ以下の機械学習アルゴリズムをサポートします:
spark.glm
あるいはglm
:一般化線形モデル
spark.survreg
:加速度故障時間 (AFT) 生存回帰モデル
spark.naiveBayes
:ナイーブベイズ モデル
spark.kmeans
:K-平均モデル
spark.logit
:ロジスティック回帰モデル
spark.isoreg
:等調回帰モデル
spark.gaussianMixture
:混合ガウス モデル
spark.lda
:潜在的ディレクレ配分法 (LDA) モデル
spark.mlp
:多層パーセプトロン分類モデル
spark.gbt
:回帰
および
分類
のための勾配ブースト木モデル
spark.randomForest
:回帰
および
分類
のためのランダムフォレストモデル
spark.als
:交互最小自乗法 (ALS) マトリックス因数分解モデル
spark.kstest
:コルモゴロフ-スミルノフ テスト
裏では、SparkRはモデルを訓練するためにMLlibを使用します。コードの例については、MLlibユーザガイドの対応する章を参照してください。ユーザは合致したモデルのサマリを出力するために summary
を呼ぶことができ、新しいデータ上で予想をするためにpredictを呼ぶことができ、合致したモデルを保存/ロードするためにwrite.ml/read.mlを呼ぶことができます。SparkRは、モデルの適合のために‘~’, ‘.’, ‘:‘, ‘+’および‘-‘を含む利用可能なRの公式のサブセットをサポートします。
モデルの連続性
以下の例はSparkRによってMLlibのモデルをどうやって保存/ロードするかを示します。<div class="highlight"><pre>irisDF <- suppressWarnings(createDataFrame(iris)) # Fit a generalized linear model of family "gaussian" with spark.glm gaussianDF <- irisDF gaussianTestDF <- irisDF gaussianGLM <- spark.glm(gaussianDF, Sepal_Length ~ Sepal_Width + Species, family = "gaussian")
# Save and then load a fitted MLlib model modelPath <- tempfile(pattern = "ml", fileext = ".tmp") write.ml(gaussianGLM, modelPath) gaussianGLM2 <- read.ml(modelPath)
# Check model summary summary(gaussianGLM2)
# Check model prediction gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF) showDF(gaussianPredictions)
unlink(modelPath) </pre></div><div>Find full example code at “examples/src/main/r/ml/ml.R” in the Spark repo.</div>
R 関数名の衝突
R内の新しいパッケージをロードおよびアタッチする場合、名前のconflictがありえます。この場合、関数が他の関数をマスクします。
以下の関数はSparkRパッケージによってマスクされています。
マスクされた関数 | どうやってアクセスするか |
---|---|
cov in package:stats |
|
filter in package:stats |
|
sample in package:base |
base::sample(x, size, replace = FALSE, prob = NULL) |
SparkRの部分は dplyr
パッケージ上でモデル化されるため、SparkRのあるパッケージはdplyr
内のそれらの同じ名前を共有します。二つのパッケージのロード順序に依存して、最初にロードされたパッケージの関数が後でロードされたパッケージの関数によってマスクされます。そのような場合、例えばSparkR::cume_dist(x)
あるいはdplyr::cume_dist(x)
のようにパッケージ名を使った呼び出しをします。
search()
を使ってR内の検索パスを調べることができます。
移行ガイド
SparkR 1.5.x から1.6.x へのアップグレード
- Spark1.6.0の前は、書き込みのデフォルトのモードは
append
でした。Spark 1.6.0でScala APIに合致するように、error
に変更されました。 - SparkSQL はRの
NA
をnull
に変換します。逆もまた同様です。
SparkR 1.6.x から2.0 へのアップグレード
- メソッド
table
は削除され、tableToDF
に置き換えられました。 - クラス
DataFrame
は名前の衝突を避けるためにSparkDataFrame
に名前を変更されました。 - Sparkの
SQLContext
とHiveContext
はSparkSession
と置き換えられるために非推奨になりました。SparkSessionをインスタンス化するには、sparkR.init()
の代わりに、その場所でsparkR.session()
を呼んでください。一度これが行われると、現在の有効なSparkSessionhaSparkDataFrame操作のために使われるでしょう。 sparkExecutorEnv
はsparkR.session
によってサポートされません。executorのための環境を設定するために、プリフィックス"spark.executorEnv.VAR_NAME"を使ってSparkの設定プロパティを設定します。例えば "spark.executorEnv.PATH"sqlContext
パラメータはもう以下のこれらの関数を必要としません:createDataFrame
,as.DataFrame
,read.json
,jsonFile
,read.parquet
,parquetFile
,read.text
,sql
,tables
,tableNames
,cacheTable
,uncacheTable
,clearCache
,dropTempTable
,read.df
,loadDF
,createExternalTable
.- メソッド
registerTempTable
はcreateOrReplaceTempView
と置き換えられるために非推奨になりました。 - メソッド
dropTempTable
はdropTempView
と置き換えられるために非推奨になりました。 - これらの関数では
sc
SparkContext パラメータはもう必要とされません:setJobGroup
,clearJobGroup
,cancelJobGroup
SparkR 2.1.0へのアップグレード
join
はもうデフォルトではデカルト積を実行しません。代わりにcrossJoin
を使ってください。