SparkR (SparkでのR)
- 概要
- SparkDataFrame
- 機械学習
- RとSparkの間でデータタイプをマッピング
- 構造化ストリーミング
- R 関数名の衝突
- 移行ガイド
概要
SparkRは、RからApache Sparkを使うための軽量フロントエンドを提供するR パッケージです。Spark 2.4.3では、SparkRは巨大なデータセット上でのselection, filtering, aggregation など(Rデータフレームに似ています dplyr) のような操作をサポートする分散データフレームの実装を提供します。SparkRはMLlibを使った分散機械学習もサポートします。
SparkDataFrame
SparkDataFrameは、データの分散コレクションが名前付きの列に整理されたものです。概念的にはリレーショナルデータベースのテーブル、あるいはRでのデータフレームに相当しますが、裏ではもっと最適化されています。SparkDataFrameは以下のようなソースの大きな配列です: 構造化されたデータファイル、Hiveのテーブル、外部データベース、あるいは既存のローカルのRデータフレーム。
このページの全ての例は、RまたはSparkの配布物に含まれるサンプルデータを使用し、./bin/sparkR
シェルを使って実行することができます。
開始: SparkSession
SparkRの入り口は、RプログラムがSparkクラスタに接続しているSparkSession
です。sparkR.session
を使って SparkSession
を生成することができ、アプリケーション名、依存される全てのSparkパッケージなどのようなオプションを渡すことができます。更に、SparkSession
を使ってSparkDataFrameと連携することもできます。SparkR
シェルから操作をしている場合は、SparkSession
は既に生成されており、<c7>sparkR.session/c7>を呼び出す必要は無いでしょう。
RStudioからの開始
RStudioからSparkRを開始することも可能です。RプログラムをRStudio, R shell, RscriptあるいはR IDEからSpark クラスタに接続することができます。始めるには、環境に SPARK_HOME を設定(Sys.getenvを調べることができます)し、SparkRパッケージをロードし、以下のようにl sparkR.session
を呼び出してください。Sparkのインストレーションの調査を行い、もし見つからなければ自動的にダウンロードしキャッシュするでしょう。他のやり方として、install.spark
を手動で実行することもできます。
sparkR.session
の呼び出しに加えて、特定のSparkドライバのプロパティを指定することもできます。この場合SparkRが気を使ってドライバーJVMプロセスが開始されているため、通常これらのアプリケーション プロパティ および ランタイム環境 はプログラム的に設定することはできません。それらを設定するには、sparkConfig
引数の中の他の設定プロパティをsparkR.session()
に渡すように、それらを渡します。
以下のSparkドライバーのプロパティはRStudioからsparkR.session
を使って sparkConfig
に設定することができます。
プロパティ名 | プロパティグループ | spark-submit 等価なもの |
---|---|---|
spark.master |
アプリケーションのプロパティ | --master |
spark.yarn.keytab |
アプリケーションのプロパティ | --keytab |
spark.yarn.principal |
アプリケーションのプロパティ | --principal |
spark.driver.memory |
アプリケーションのプロパティ | --driver-memory |
spark.driver.extraClassPath |
ランタイム環境 | --driver-class-path |
spark.driver.extraJavaOptions |
ランタイム環境 | --driver-java-options |
spark.driver.extraLibraryPath |
ランタイム環境 | --driver-library-path |
SparkDataFramesの作成
SparkSession
を使って、アプリケーションはローカルのRデータフレーム、Hiveテーブル、あるいはその他の data sourcesから、SparkDataFrame
を生成することができます。
ローカルデータフレームから
データフレームを生成する最も簡単な方法はローカルのRデータフレームをSparkDataFrameに変換する方法です。具体的には、as.DataFrame
あるいはcreateDataFrame
を使い、 SparkDataFrameを生成するためにローカルのRデータフレームを渡すことができます。例として、以下はRのfaithful
を使ったSparkDataFrame
を生成します。
データソースから
SparkRは SparkDataFrame
インタフェースを使った様々なデータソースへの操作をサポートします。この章ではデータソースを使ってデータをロードおよび保存する一般的な方法について説明します。組み込みのデータソースに利用可能な特定のオプションについてはSpark SQL プログラミングガイドを調べる事ができます。
データソースからSparkDataFrameを作成する一般的なメソッドは read.df
です。このメソッドはロードするファイルのパスとデータソースの種類を取り、現在のところ有効なSparkSessionが自動的に使われるようにするでしょう。SparkRはそのままでJSON, CSVおよびParquetファイルの読み込みをサポートし、サードパーティ プロジェクトのようなソースから利用可能なパッケージを使って、Avroのような一般的なファイル形式のためのデータソースコネクタを見つけることができます。これらのパッケージはspark-submit
あるいは sparkR
と一緒に--packages
を指定するか、対話的なRシェルあるいはRStudioにいる時は sparkPackages
パラメータを使ってSparkSessionを初期化するかによって追加することができます。
例のJSON入力ファイルを使ってデータソースを使用する方法を見ることができます。ここで使われているファイルは代表的なJSONファイルではないことに注意してください。ファイル内の各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。更に詳しい情報は、JSON Lines text format, also called newline-delimited JSONを見てください。結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。
データソースAPIは本質的にCSV形式の入力ファイルをサポートします。もっと詳しい情報はSparkRのrad.df APIドキュメントを参照してください。
データソースAPIはSparkDataFramesを複数のファイルフォーマットに保存するために使用することもできます。例えば、SparkDataFrameを以前の例からwrite.df
を使ってParquetファイルへ保存することができます。
Hiveテーブルから
SparkDataFrameをHiveテーブルから生成することもできます。こうするには、Hiveメタストア内のテーブルにアクセス可能なHiveサポートのSparkSessionを生成する必要があるでしょう。SparkはHiveをサポートするようにビルドされていなければなりません。詳細はSQL プログラミングガイドで見つけることができます。SparkRでは、デフォルトでHiveサポートが有効なSparkSessionを生成しようとするでしょう(enableHiveSupport = TRUE
)。
SparkDataFrame 操作
SparkDataFrames は構造化データ処理を行う多数の関数をサポートします。ここでは幾つかの基本的な例を示し、完全なリストはAPI ドキュメントの中で見つけることができます:
行、列の選択
グルーピング、集約
SparkRデータフレームはグルーピングの後でデータを集約するために一般的に使われる多くの関数をサポートします。例えば、以下に示すようにfaithful
データセットの中で待ち
時間のヒストグラムを計算することができます。
標準の集約に加えて、SparkRは OLAP cube オペレータ cube
をサポートします:
そして rollup
:
列の操作
SparkRは集約の間にデータ処理のためにカラムに直接適用することができる多くの関数も提供します。以下の例は基本的な数学関数の使用を示します。
ユーザ定義の関数の適用
SparkRでは、いくつかの種類のユーザ定義の関数をサポートします。
dapply
あるいは dapplyCollect
を使って巨大なデータセット上で指定した関数を実行する
dapply
SparkDataFrame
の各パーティションへ関数を適用します。SparkDataFrame
の各パーティションに適用される関数は1つだけパラメータを持たなければなりません。各パーティションに対応する data.frame
がそれに渡されます。関数の出力はdata.frame
であるべきです。スキーマはSparkDataFrame
の結果の行のフォーマットを定義します。それは返り値のdata types に一致しなければなりません。
dapplyCollect
dapply
のように、SparkDataFrame
の各パーティションに関数を適用し、返ってくる結果をまとめます。関数の出力はdata.frame
であるべきです。しかし、Schemaは渡される必要はありません。全てのパーティション上で実行されるUDFの出力がドライバーへ取り出せずドライバーのメモリ内に収まる場合に、dapplyCollect
は失敗するかも知れないことに注意してください。
入力カラムによってグルーピングし、gapply
あるいはgapplyCollect
を使うことで、巨大なデータセット上で指定した関数を実行する
gapply
SparkDataFrame
の各グループへ関数を適用します。関数はSparkDataFrame
の各グループへ適用され、二つのパラメータだけを持つ必要があります: グルーピングのキーとそのキーに対応するRのdata.frame
。グループは SparkDataFrame
のカラムから選択されます。関数の出力はdata.frame
であるべきです。スキーマはSparkDataFrame
の結果の行のフォーマットを定義します。それはSparkのデータタイプを基礎としてRの関数の出力スキーマを表さなければなりません。返り値のdata.frame
のカラム名はユーザによって設定されます。
gapplyCollect
gapply
のように、SparkDataFrame
の各パーティションに関数を適用し、Rのdata.frameに返ってくる結果をまとめます。関数の出力はdata.frame
であるべきです。しかし、Schemaは渡される必要はありません。全てのパーティション上で実行されるUDFの出力がドライバーへ取り出せずドライバーのメモリ内に収まる場合に、gapplyCollect
は失敗するかも知れないことに注意してください。
spark.lapply
を使って分散されたローカルのR関数を実行する
spark.lapply
ネイティブのRのlapply
と似て、spark.lapply
は要素のリスト上に関数を実行し、Sparkを使って計算を分配します。doParallel
あるいは lapply
に似た方法でリストの要素に関数を適用します。全ての計算の結果は1つのマシーン内に収まらなければなりません。そうでなければ、df <- createDataFrame(list)
のようなことをすることができ、そしてdapply
を使います。
SparkRからのSQLクエリの実行
SparkDataFrameはSpark SQLの中の一時的なビューとして登録することもでき、そのデータにSQLクエリを実行することができます。sql
関数によってアプリケーションはSQLクエリをプログラム的に実行することができ、結果はSparkDataFrame
として返されます。
機械学習
アルゴリズム
SparkR は現在のところ以下の機械学習アルゴリズムをサポートします:
分類
spark.logit
:ロジスティック回帰
spark.mlp
:多層パーセプトロン (MLP)
spark.naiveBayes
:ナイーブ ベイズ
spark.svmLinear
:線形サポートベクターマシーン
回帰
Tree
spark.decisionTree
:Decision Tree for
Regression
および
Classification
spark.gbt
:回帰
と
分類
のための勾配ブースト木
spark.randomForest
:回帰
と
分類
のためのランダムフォレスト
クラスタリング
spark.bisectingKmeans
:二値k平均
spark.gaussianMixture
:混合ガウスモデル (GMM)
spark.kmeans
:K-Means
spark.lda
:潜在的ディレクレ配分 (LDA)
協調フィルタリング
頻度パターン マイニング
統計
spark.kstest
:コルモゴロフ-スミルノフ テスト
裏では、SparkRはモデルを訓練するためにMLlibを使用します。コードの例については、MLlibユーザガイドの対応する章を参照してください。ユーザは合致したモデルのサマリを出力するために summary
を呼ぶことができ、新しいデータ上で予想をするためにpredictを呼ぶことができ、合致したモデルを保存/ロードするためにwrite.ml/read.mlを呼ぶことができます。SparkRは、モデルの適合のために‘~’, ‘.’, ‘:‘, ‘+’および‘-‘を含む利用可能なRの公式のサブセットをサポートします。
モデルの連続性
以下の例はSparkRによってMLlibのモデルをどうやって保存/ロードするかを示します。
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# Check model summary
summary(gaussianGLM2)
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)
unlink(modelPath)
RとSparkの間でデータタイプをマッピング
R | Spark |
---|---|
byte | byte |
integer | integer |
float | float |
double | double |
numeric | double |
character | 文字列 |
文字列 | 文字列 |
binary | binary |
raw | binary |
logical | boolean |
POSIXct | timestamp |
POSIXlt | timestamp |
Date | date |
array | array |
list | array |
env | map |
構造化ストリーミング
SparkRは構造化ストリーミングAPIをサポートします。構造化ストリーミングはSpark SQLエンジン上に構築されたスケーラブルで耐障害性のあるストリーミング処理エンジンです。詳細な情報は構造化ストリーミング プログラミング ガイドのR APIを見てください。
R 関数名の衝突
R内の新しいパッケージをロードおよびアタッチする場合、名前のconflictがありえます。この場合、関数が他の関数をマスクします。
以下の関数はSparkRパッケージによってマスクされています。
マスクされた関数 | どうやってアクセスするか |
---|---|
cov in package:stats |
|
filter in package:stats |
|
sample in package:base |
base::sample(x, size, replace = FALSE, prob = NULL) |
SparkRの部分は dplyr
パッケージ上でモデル化されるため、SparkRのあるパッケージはdplyr
内のそれらの同じ名前を共有します。二つのパッケージのロード順序に依存して、最初にロードされたパッケージの関数が後でロードされたパッケージの関数によってマスクされます。そのような場合、例えばSparkR::cume_dist(x)
あるいはdplyr::cume_dist(x)
のようにパッケージ名を使った呼び出しをします。
search()
を使ってR内の検索パスを調べることができます。
移行ガイド
SparkR 1.5.x から1.6.x へのアップグレード
- Spark1.6.0の前は、書き込みのデフォルトのモードは
append
でした。Spark 1.6.0でScala APIに合致するように、error
に変更されました。 - SparkSQL はRの
NA
をnull
に変換します。逆もまた同様です。
SparkR 1.6.x から2.0 へのアップグレード
- メソッド
table
は削除され、tableToDF
に置き換えられました。 - クラス
DataFrame
は名前の衝突を避けるためにSparkDataFrame
に名前を変更されました。 - Sparkの
SQLContext
とHiveContext
はSparkSession
と置き換えられるために非推奨になりました。SparkSessionをインスタンス化するには、sparkR.init()
の代わりに、その場所でsparkR.session()
を呼んでください。一度これが行われると、現在の有効なSparkSessionhaSparkDataFrame操作のために使われるでしょう。 sparkExecutorEnv
はsparkR.session
によってサポートされません。executorのための環境を設定するために、プリフィックス"spark.executorEnv.VAR_NAME"を使ってSparkの設定プロパティを設定します。例えば "spark.executorEnv.PATH"sqlContext
パラメータはもう以下のこれらの関数を必要としません:createDataFrame
,as.DataFrame
,read.json
,jsonFile
,read.parquet
,parquetFile
,read.text
,sql
,tables
,tableNames
,cacheTable
,uncacheTable
,clearCache
,dropTempTable
,read.df
,loadDF
,createExternalTable
.- メソッド
registerTempTable
はcreateOrReplaceTempView
と置き換えられるために非推奨になりました。 - メソッド
dropTempTable
はdropTempView
と置き換えられるために非推奨になりました。 - これらの関数では
sc
SparkContext パラメータはもう必要とされません:setJobGroup
,clearJobGroup
,cancelJobGroup
SparkR 2.1.0へのアップグレード
join
はもうデフォルトではデカルト積を実行しません。代わりにcrossJoin
を使ってください。
SparkR 2.2.0 へのアップグレード
numPartitions
パラメータがcreateDataFrame
とas.DataFrame
に追加されました。データを分割する時にパーティションの場所の計算がScalaでのそれと一致するように行われます。createExternalTable
メソッドがcreateTable
と置き換えられるために非推奨になりました。どちらのメソッドも外部あるいは管理されたテーブルを作成するために呼ぶことができます。追加のカタログメソッドが追加されました。- デフォルトでは、derby.log が
tempdir()
に保存されます。これはenableHiveSupport
をTRUE
に設定してSparkSessionが初期化される場合に生成されるでしょう。 spark.lda
はオプティマイザを正しく設定していませんでした。修正されました。- 幾つかのモデルのサマリの出力が
係数
を行列
として持つように更新されました。これにはspark.logit
,spark.kmeans
,spark.glm
が含まれます。spark.gaussianMixture
のためのモデルのサマリの出力はloglik
としてlog-likelihoodを追加しました。
SparkR 2.3.0 へのアップグレード
stringsAsFactors
パラメータは前もってcollect
を使って無視されました。例えば、collect(createDataFrame(iris), stringsAsFactors = TRUE))
。修正されました。summary
については、計算するための統計のためのオプションが追加されました。その出力は説明
のそれとは変更されました。- もしSparkRのパッケージとSpark JVMのバージョンが一致しない場合は警告があがるかもしれません。
SparkR 2.3.1以上へのアップグレード
- SparkR 2.3.0 以前では、
substr
メソッドのstart
パラメータは間違って1から引かれ、0ベースと見なされていました。これは矛盾した部分文字列の結果につながるかもしれず、Rでのsubstr
の挙動とも一致しません。バージョン 2.3.1以降、substr
メソッドのstart
パラメータが今では1ベースであるように修正されました。例として、substr(lit('abcdef'), 2, 4))
はSparkR 2.3.0ではabc
という結果になり、SparkR 2.3.1では結果はbcd
になるでしょう。
SparkR 2.4.0へのアップグレード
- 以前は、
spark.mlp
の最後の層のサイズの有効性をチェックしませんでした。例えば、もし訓練データが2つのラベルだけを持つ場合、c(1, 3)
のようなlayers
パラメータは前はエラーを起こしませんでした。今ではエラーを起こします。