SparkR (SparkでのR)
- 概要
- SparkDataFrame
- 機械学習
- RとSparkの間でデータタイプをマッピング
- 構造化ストリーミング
- SparkR でのApache Arrow
- R 関数名の衝突
- 移行ガイド
概要
SparkRは、RからApache Sparkを使うための軽量フロントエンドを提供するR パッケージです。Spark 3.2.1では、SparkRは巨大なデータセット上でのselection, filtering, aggregation など(Rデータフレームに似ています dplyr) のような操作をサポートする分散データフレームの実装を提供します。SparkRはMLlibを使った分散機械学習もサポートします。
SparkDataFrame
SparkDataFrameは、データの分散コレクションが名前付きの列に整理されたものです。概念的にはリレーショナルデータベースのテーブル、あるいはRでのデータフレームに相当しますが、裏ではもっと最適化されています。SparkDataFrameは以下のようなソースの大きな配列です: 構造化されたデータファイル、Hiveのテーブル、外部データベース、あるいは既存のローカルのRデータフレーム。
このページの全ての例は、RまたはSparkの配布物に含まれるサンプルデータを使用し、./bin/sparkR
シェルを使って実行することができます。
開始: SparkSession
SparkRの入り口は、RプログラムがSparkクラスタに接続しているSparkSession
です。sparkR.session
を使って SparkSession
を生成することができ、アプリケーション名、依存される全てのSparkパッケージなどのようなオプションを渡すことができます。更に、SparkSession
を使ってSparkDataFrameと連携することもできます。SparkR
シェルから操作をしている場合は、SparkSession
は既に生成されており、<c7>sparkR.session/c7>を呼び出す必要は無いでしょう。
RStudioからの開始
RStudioからSparkRを開始することも可能です。RプログラムをRStudio, R shell, RscriptあるいはR IDEからSpark クラスタに接続することができます。始めるには、環境に SPARK_HOME を設定(Sys.getenvを調べることができます)し、SparkRパッケージをロードし、以下のようにl sparkR.session
を呼び出してください。Sparkのインストレーションの調査を行い、もし見つからなければ自動的にダウンロードしキャッシュするでしょう。他のやり方として、install.spark
を手動で実行することもできます。
sparkR.session
の呼び出しに加えて、特定のSparkドライバのプロパティを指定することもできます。この場合SparkRが気を使ってドライバーJVMプロセスが開始されているため、通常これらのアプリケーション プロパティ および ランタイム環境 はプログラム的に設定することはできません。それらを設定するには、sparkConfig
引数の中の他の設定プロパティをsparkR.session()
に渡すように、それらを渡します。
以下のSparkドライバーのプロパティはRStudioからsparkR.session
を使って sparkConfig
に設定することができます。
プロパティ名 | プロパティグループ | spark-submit 等価なもの |
---|---|---|
spark.master |
アプリケーションのプロパティ | --master |
spark.kerberos.keytab |
アプリケーションのプロパティ | --keytab |
spark.kerberos.principal |
アプリケーションのプロパティ | --principal |
spark.driver.memory |
アプリケーションのプロパティ | --driver-memory |
spark.driver.extraClassPath |
ランタイム環境 | --driver-class-path |
spark.driver.extraJavaOptions |
ランタイム環境 | --driver-java-options |
spark.driver.extraLibraryPath |
ランタイム環境 | --driver-library-path |
SparkDataFramesの作成
SparkSession
を使って、アプリケーションはローカルのRデータフレーム、Hiveテーブル、あるいはその他の data sourcesから、SparkDataFrame
を生成することができます。
ローカルデータフレームから
データフレームを生成する最も簡単な方法はローカルのRデータフレームをSparkDataFrameに変換する方法です。具体的には、as.DataFrame
あるいはcreateDataFrame
を使い、 SparkDataFrameを生成するためにローカルのRデータフレームを渡すことができます。例として、以下はRのfaithful
を使ったSparkDataFrame
を生成します。
データソースから
SparkRは SparkDataFrame
インタフェースを使った様々なデータソースへの操作をサポートします。この章ではデータソースを使ってデータをロードおよび保存する一般的な方法について説明します。組み込みのデータソースに利用可能な特定のオプションについてはSpark SQL プログラミングガイドを調べる事ができます。
データソースからSparkDataFrameを作成する一般的なメソッドは read.df
です。このメソッドはロードするファイルのパスとデータソースの種類を取り、現在のところ有効なSparkSessionが自動的に使われるようにするでしょう。SparkRはそのままでJSON, CSVおよびParquetファイルの読み込みをサポートし、サードパーティ プロジェクトのようなソースから利用可能なパッケージを使って、Avroのような一般的なファイル形式のためのデータソースコネクタを見つけることができます。これらのパッケージはspark-submit
あるいは sparkR
と一緒に--packages
を指定するか、対話的なRシェルあるいはRStudioにいる時は sparkPackages
パラメータを使ってSparkSessionを初期化するかによって追加することができます。
例のJSON入力ファイルを使ってデータソースを使用する方法を見ることができます。ここで使われているファイルは代表的なJSONファイルではないことに注意してください。ファイル内の各行は別個の自己内包の有効なJSONオブジェクトでなければなりません。更に詳しい情報は、JSON Lines text format, also called newline-delimited JSONを見てください。結果として、通常の複数行のJSONファイルはほとんどの場合失敗するでしょう。
データソースAPIは本質的にCSV形式の入力ファイルをサポートします。もっと詳しい情報はSparkRのrad.df APIドキュメントを参照してください。
データソースAPIはSparkDataFramesを複数のファイルフォーマットに保存するために使用することもできます。例えば、SparkDataFrameを以前の例からwrite.df
を使ってParquetファイルへ保存することができます。
Hiveテーブルから
SparkDataFrameをHiveテーブルから生成することもできます。こうするには、Hiveメタストア内のテーブルにアクセス可能なHiveサポートのSparkSessionを生成する必要があるでしょう。SparkはHiveをサポートするようにビルドされていなければなりません。詳細はSQL プログラミングガイドで見つけることができます。SparkRでは、デフォルトでHiveサポートが有効なSparkSessionを生成しようとするでしょう(enableHiveSupport = TRUE
)。
SparkDataFrame 操作
SparkDataFrames は構造化データ処理を行う多数の関数をサポートします。ここでは幾つかの基本的な例を示し、完全なリストはAPI ドキュメントの中で見つけることができます:
行、列の選択
グルーピング、集約
SparkRデータフレームはグルーピングの後でデータを集約するために一般的に使われる多くの関数をサポートします。例えば、以下に示すようにfaithful
データセットの中で待ち
時間のヒストグラムを計算することができます。
標準の集約に加えて、SparkRは OLAP cube オペレータ cube
をサポートします:
そして rollup
:
列の操作
SparkRは集約の間にデータ処理のためにカラムに直接適用することができる多くの関数も提供します。以下の例は基本的な数学関数の使用を示します。
ユーザ定義の関数の適用
SparkRでは、いくつかの種類のユーザ定義の関数をサポートします。
dapply
あるいは dapplyCollect
を使って巨大なデータセット上で指定した関数を実行する
dapply
SparkDataFrame
の各パーティションへ関数を適用します。SparkDataFrame
の各パーティションに適用される関数は1つだけパラメータを持たなければなりません。各パーティションに対応する data.frame
がそれに渡されます。関数の出力はdata.frame
であるべきです。スキーマはSparkDataFrame
の結果の行のフォーマットを定義します。それは返り値のdata types に一致しなければなりません。
dapplyCollect
dapply
のように、SparkDataFrame
の各パーティションに関数を適用し、返ってくる結果をまとめます。関数の出力はdata.frame
であるべきです。しかし、Schemaは渡される必要はありません。全てのパーティション上で実行されるUDFの出力がドライバーへ取り出せずドライバーのメモリ内に収まる場合に、dapplyCollect
は失敗するかも知れないことに注意してください。
入力カラムによってグルーピングし、gapply
あるいはgapplyCollect
を使うことで、巨大なデータセット上で指定した関数を実行する
gapply
SparkDataFrame
の各グループへ関数を適用します。関数はSparkDataFrame
の各グループへ適用され、二つのパラメータだけを持つ必要があります: グルーピングのキーとそのキーに対応するRのdata.frame
。グループは SparkDataFrame
のカラムから選択されます。関数の出力はdata.frame
であるべきです。スキーマはSparkDataFrame
の結果の行のフォーマットを定義します。それはSparkのデータタイプを基礎としてRの関数の出力スキーマを表さなければなりません。返り値のdata.frame
のカラム名はユーザによって設定されます。
gapplyCollect
gapply
のように、SparkDataFrame
の各パーティションに関数を適用し、Rのdata.frameに返ってくる結果をまとめます。関数の出力はdata.frame
であるべきです。しかし、Schemaは渡される必要はありません。全てのパーティション上で実行されるUDFの出力がドライバーへ取り出せずドライバーのメモリ内に収まる場合に、gapplyCollect
は失敗するかも知れないことに注意してください。
spark.lapply
を使って分散されたローカルのR関数を実行する
spark.lapply
ネイティブのRのlapply
と似て、spark.lapply
は要素のリスト上に関数を実行し、Sparkを使って計算を分配します。doParallel
あるいは lapply
に似た方法でリストの要素に関数を適用します。全ての計算の結果は1つのマシーン内に収まらなければなりません。そうでなければ、df <- createDataFrame(list)
のようなことをすることができ、そしてdapply
を使います。
Eager execution
eager execution が有効になっている場合、SparkDataFrame
が作成されるとすぐにデータが R クライアントに返されます。デフォルトでは、eager 実行は無効で、SparkSession
が起動する時に設定プロパティspark.sql.repl.eagerEval.enabled
をtrue
に設定することで、有効にすることができます。
表示するデータの行の最大数と、列ごとの文字の最大数は、それぞれspark.sql.repl.eagerEval.maxNumRows
とspark.sql.repl.eagerEval.truncate
設定パラメータで制御することができます。これらのプロパティは、eager 実行が有効な場合にのみ効果があります。これらのプロパティは明示的に設定されない場合、デフォルトでは、20行と、列ごとに20文字までのデータが表示されます。
sparkR
シェルの中で eager 実行を有効にするには、spark.sql.repl.eagerEval.enabled=true
設定プロパティを --conf
オプションに追加します。
SparkRからのSQLクエリの実行
SparkDataFrameはSpark SQLの中の一時的なビューとして登録することもでき、そのデータにSQLクエリを実行することができます。sql
関数によってアプリケーションはSQLクエリをプログラム的に実行することができ、結果はSparkDataFrame
として返されます。
機械学習
アルゴリズム
SparkR は現在のところ以下の機械学習アルゴリズムをサポートします:
分類
spark.logit
:ロジスティック回帰
spark.mlp
:多層パーセプトロン (MLP)
spark.naiveBayes
:ナイーブ ベイズ
spark.svmLinear
:線形サポートベクターマシーン
spark.fmClassifier
:Factorization Machines classifier
回帰
spark.survreg
:加速度故障時間 (AFT) 生存モデル
spark.glm
orglm
:一般化線形モデル (GLM)
spark.isoreg
:等調回帰
spark.lm
:Linear Regression
spark.fmRegressor
:Factorization Machines regressor
Tree
spark.decisionTree
:Decision Tree for
Regression
および
Classification
spark.gbt
:回帰
と
分類
のための勾配ブースト木
spark.randomForest
:回帰
と
分類
のためのランダムフォレスト
クラスタリング
spark.bisectingKmeans
:二値k平均
spark.gaussianMixture
:混合ガウスモデル (GMM)
spark.kmeans
:K-Means
spark.lda
:潜在的ディレクレ配分 (LDA)
spark.powerIterationClustering (PIC)
:Power Iteration Clustering (PIC)
協調フィルタリング
頻度パターン マイニング
統計
spark.kstest
:コルモゴロフ-スミルノフ テスト
裏では、SparkRはモデルを訓練するためにMLlibを使用します。コードの例については、MLlibユーザガイドの対応する章を参照してください。ユーザは合致したモデルのサマリを出力するために summary
を呼ぶことができ、新しいデータ上で予想をするためにpredictを呼ぶことができ、合致したモデルを保存/ロードするためにwrite.ml/read.mlを呼ぶことができます。SparkRは、モデルの適合のために‘~’, ‘.’, ‘:‘, ‘+’および‘-‘を含む利用可能なRの公式のサブセットをサポートします。
モデルの連続性
以下の例はSparkRによってMLlibのモデルをどうやって保存/ロードするかを示します。
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# Check model summary
summary(gaussianGLM2)
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)
unlink(modelPath)
RとSparkの間でデータタイプをマッピング
R | Spark |
---|---|
byte | byte |
integer | integer |
float | float |
double | double |
numeric | double |
character | 文字列 |
文字列 | 文字列 |
binary | binary |
raw | binary |
logical | boolean |
POSIXct | timestamp |
POSIXlt | timestamp |
Date | date |
array | array |
list | array |
env | map |
構造化ストリーミング
SparkRは構造化ストリーミングAPIをサポートします。構造化ストリーミングはSpark SQLエンジン上に構築されたスケーラブルで耐障害性のあるストリーミング処理エンジンです。詳細な情報は構造化ストリーミング プログラミング ガイドのR APIを見てください。
SparkR でのApache Arrow
Apache ArrowはJVMとRプロセスの間で効率的にデータを転送するためにSparkで使われる カラム状のデータ形式です。See also PySpark optimization done, PySpark Usage Guide for Pandas with Apache Arrow. このガイドは、SparkR で Arrow の最適化を利用する方法を、幾つかの重要なポイントとともに説明することを目的としています。
Arrow がインストールされたことを確認してください
Arrow R ライブラリは CPAN で利用可能で、以下のようにインストールできます。
Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'
詳細については、Apache Arrow の公式ドキュメントを参照してください。
Arrow R パッケージがインストールされており、全てのクラスタノードで利用可能であることを確認するようにしてください。現在サポートされる最小バージョンは 1.0.0 です; ただし、Spark R での Arrow 最適化は実験的であるため、マイナーリリース間で変わる可能性があります。
R DataFrame、dapply
および gapply
へ/から の変換を有効化
Arrow の最適化は、collect(spark_df)
を使って Spark DataFrame を R DataFrame に変換する時、createDataFrame(r_df)
を使って Spark DataFrame を R DataFrame から作成する時、dapply(...)
を使って R ネイティブ関数を各パーティションに適用する時、およびgapply(...)
を使って R ネイティブ関数をグループ化されたデータに適用する時に、利用可能です。これらの呼び出しを実行する時に Arrow を使うには、ユーザはまず Spark 設定 ‘spark.sql.execution.arrow.sparkr.enabled’ を ‘true’ に設定する必要があります。これはデフォルトでは無効です。
最適化が有効または無効かに関係なく、SparkRは同じ結果を生成します。さらに、Spark DataFrame と R DataFrame の間の変換は、実際の計算の前になんらかの理由で最適化が失敗した場合、自動的に非Arrow最適化の実装にフォールバックします。
Arrowを使ったとしても、collect(spark_df)
はドライバプログラムへのデータフレーム内の全てのレコードのコレクションという結果になり、データの小さな部分セット上で行われるべきです。さらに、gapply(...)
とdapply(...)
で指定された出力スキーマは、指定された関数で返される R DataFrameのものと一致するはずです。
サポートされるSQL型
現在のところ、FloatType
, BinaryType
, ArrayType
, StructType
, MapType
を除いて、全てのSpark SQLデータ型がArrowベースの変換でサポートされます。
R 関数名の衝突
R内の新しいパッケージをロードおよびアタッチする場合、名前のconflictがありえます。この場合、関数が他の関数をマスクします。
以下の関数はSparkRパッケージによってマスクされています。
マスクされた関数 | どうやってアクセスするか |
---|---|
cov in package:stats |
|
filter in package:stats |
|
sample in package:base |
base::sample(x, size, replace = FALSE, prob = NULL) |
SparkRの部分は dplyr
パッケージ上でモデル化されるため、SparkRのあるパッケージはdplyr
内のそれらの同じ名前を共有します。二つのパッケージのロード順序に依存して、最初にロードされたパッケージの関数が後でロードされたパッケージの関数によってマスクされます。そのような場合、例えばSparkR::cume_dist(x)
あるいはdplyr::cume_dist(x)
のようにパッケージ名を使った呼び出しをします。
search()
を使ってR内の検索パスを調べることができます。
移行ガイド
移行ガイドは今はこのページに保管されています。