概要: 推測器、変換器およびパイプライン - spark.ml
\[
\newcommand{\R}{\mathbb{R}}
\newcommand{\E}{\mathbb{E}}
\newcommand{\x}{\mathbf{x}}
\newcommand{\y}{\mathbf{y}}
\newcommand{\wv}{\mathbf{w}}
\newcommand{\av}{\mathbf{\alpha}}
\newcommand{\bv}{\mathbf{b}}
\newcommand{\N}{\mathbb{N}}
\newcommand{\id}{\mathbf{I}}
\newcommand{\ind}{\mathbf{1}}
\newcommand{\0}{\mathbf{0}}
\newcommand{\unit}{\mathbf{e}}
\newcommand{\one}{\mathbf{1}}
\newcommand{\zero}{\mathbf{0}}
\]
spark.ml
パッケージはユーザが実践的な機械学習パイプラインを構築および調整するのに役立つDataFramesに基づいた高レベルAPIの同型なセットを提供することを目的とします。パイプラインAPI、アンサンブルなどにユニークな特徴変換器を含むspark.ml
のサブパッケージのガイドについては、以下の アルゴリズムガイド の章を見てください。
目次
パイプラインの主要な概念
Spark MLは機械学習アルゴリズムのためのAPIを複数のアルゴリズムを一つのパイプラインあるいはワークフローに組み合わせるのを簡単にするために標準化します。この章ではSpark ML API によって導入される主要な概念をカバーします。パイプラインの概念はほとんどscikit-learn プロジェクトによって着想されました。
-
データフレーム
: Spark ML はMLデータセットとしてSpark SQLからデータフレーム
を使用します。これは様々なデータタイプを保持します。例えば、データフレーム
はテキスト、特徴ベクトル、trueのラベル、および予想を格納する異なるカラムを持つことができます。 -
変換器
:Transformer
は一つのデータフレーム
を他のデータフレーム
に変換することができるアルゴリズムです。例えば、MLモデルは 特徴量を持つデータフレーム
を予想を持つデータフレーム
に変換する変換器
です。 -
予測器
:予測器
は変換器
を生成するためにデータフレーム
にうまく当てはめることができるアルゴリズムです。例えば、学習アルゴリズムはデータフレーム
を教育しモデルを生成する予測器
です。 -
パイプライン
:パイプライン
はMLワークフローを指定するために複数の変換器
と予測器
を一緒につなげます。 -
パラメータ
: 全ての変換器
と予測器
はパラメータを指定するための共通のAPIを今は共有します。
データフレーム
機械学習はベクトル、テキスト、イメージおよび構造化データのような広範囲の様々なデータタイプに適用することができます。Spark ML は様々なデータタイプをサポートするためにSpark SQLから データフレーム
を借用します。
データフレーム
は多くの基本的および構造化タイプをサポートします; サポートされるタイプのリストについてはSpark SQL データタイプリファレンス を見てください。Spark SQL ガイドにリストされるタイプに加えて、データタイプ
はML ベクトル
タイプも使うことができます。
データフレーム
は通常の RDD
から暗黙的あるいは明示的のどちらでも生成することができます。例として、以下のコード例とSpark SQL プログラミングガイド を見てください。
データフレーム
内のカラムは名前が付いています。以下の例は"text", "freatures" および "label" のような名前を使用します。
パイプライン コンポーネント
変換器
変換器
は変換器と学習モデルを含む抽象概念です。 技術的には、変換器
は一つのデータフレーム
を一般的に筆頭以上のカラムを追加することで他に変換するtarnsform()
メソッドを実装します。例えば:
- 特徴変換は
データフレーム
を取り、カラム(例えば テキスト)を読み込み、それを新しいカラム(例えば、特徴ベクトル)にマップし、マップされたカラムを追加した新しいデータフレーム
を出力するかもしれません。 - 学習モデルは
データフレーム
を取り、特徴ベクトルを含むカラムを読み込み、各特長ベクトルのためのラベルを予想し、追加された予想ラベルをカラムとして新しいデータフレーム
を出力するかも知れません。
予想器
予測器
は学習アルゴリズムあるいはデータに適合あるいは教育するどのようなアルゴリズムの概念も抽出します。技術的には、予測器
は fit()
メソッドを実装します。これはデータフレーム
を受け付け、 モデル
、これは変換器
です、を生成します。例えば、ロジスティック回帰
のような学習アルゴリズムは予測器
です。そしてfit()
の呼び出しはLogisticRegressionModel
を教育します。これはモデル
で、従って変換器
です。
パイプラインコンポーネントのプロパティ
Transformer.transform()
と Estimator.fit()
は両方ともステートレスです。将来的に、ステートフルアルゴリズムは別の概念を使ってサポートされるかも知れません。
変換器
あるいは 予測器
の各インスタンスはユニークなIDを持ちます。これはパラメータ(以下で議論します)を指定する時に便利です。
パイプライン
機械学習では、処理をしデータから学習するためにアルゴリズムの系列を実行することは良くあります。例えば、単純なテキストドキュメント処理のワークフローは以下の幾つかのステージを含むかもしれません:
- 各ドキュメントのテキストを単語に分割する。
- 各ドキュメントのワードを数値の特徴ベクトルに変換する。
- 特徴ベクトルとラベルを使って予測モデルを学習する。
Spark ML はPipeline
としてそのようなワークフローを表します。これは特定の順番で実行される PipelineStage
(変換器
と 予測器
)からなります。この章ではこの単純なワークフローを実行例として使用するつもりです。
どのように動くか
パイプライン
はステージの系列として指定され、各ステージは 変換器
あるいは 予測器
のどちらかです。これらのステージは順番に実行され、入力 データフレーム
は各ステージを通過する時に変換されます。変換器
ステージに関して、transform()
メソッドがデータフレーム
上で呼ばれます。予測器
ステージに関して、(PipelineModel
あるいは適合したPipeline
の一部となる)変換器
を生成するためにfit()
メソッドが呼び出され、変換器
の transform()
メソッドがDataFrame
上で呼び出されます。
簡単なテキストドキュメントワークフローについてこれを説明します。以下の図はパイプライン
の学習時の使い方です。
上では、一番上の行は3つのステージを持つパイプライン
を表します。最初の2つ(Tokenizer
と HashingTF
) は 変換器
(blue)で、3つ目 (LogisticRegression
) は 予測器
(red)です。一番下の行はパイプラインを通ったデータフロー表し、円筒はデータフレーム
を示します。Pipeline.fit()
メソッドは元のデータフレーム
上で呼び出され、それは生のテキストドキュメントとラベルを持ちます。Tokenizer.transform()
メソッドは生のテキストドキュメントを単語に分割し、単語を持つ新しいカラムをデータフレーム
に追加します。HashingTF.transform()
メソッドは単語のカラムを特徴ベクトルに変換し、それらのベクトルを持つ新しいカラムをデータフレーム
に追加します。ここで、LogisticRegression
は 予測器
のため、パイプライン
は LogisticRegressionModel
.を生成するために最初にLogisticRegression.fit()
を呼びます。パイプライン
が更にステージを持っていた場合、データフレーム
を次のステージに渡す前にDataFrame
上にLogisticRegressionModel
の transform()
メソッドを呼び出すでしょう。
パイプライン
iは予測器
です。従って、パイプライン
のfit()
メソッドが実行された後で、そえrはPipelineModel
を生成します。これは変換器
です。この PipelineModel
はテスト時に使用されます; 下の図はこの使い方を説明します。
上の図の中でPipelineModel
は元の パイプライン
のステージと同じ番号を持ちますが、元のパイプライン
内の全ての 予測器
は変換器
になります。テストのデータセット上でPipelineModel
の transform()
メソッドが呼ばれると、データは適合するパイプラインに順番に渡されます。各ステージのtransform()
メソッドはデータセットを更新し、それを次のステージに渡します。
パイプライン
と PipelineModel
は訓練とテストデータが同一の特徴処理ステップに行くことを保証するのに役立ちます。
詳細
DAG パイプライン
: パイプライン
のステージが順番の配列として指定されます。ここで与えられた例は全て線形パイプライン
です。つまり各ステージ内の パイプライン
は以前のステージで生成されたデータを使用します。データフローグラフが有向非循環グラフ(DAG)である限り、非線形パイプライン
を生成することが可能です。このグラフは現在のところ暗黙的に各ステージでの入力および出力カラム名に基づいて指定されます(一般的にパラメータとして指定されます)。パイプライン
がDAGを形成する場合、ステージは幾何学的な順番で指定されなければなりません。
実行時チェック: パイプライン
はタイプが変化するデータフレーム
上で操作するため、それらはコンパイル時の型チェックを使うことができません。パイプライン
と PipelineModel
はパイプライン
を実行する前に実行時のチェックを代わりに行います。この型チェックはデータフレーム
スキーマ、データフレーム
内のカラムのデータ型の説明、を使って行われます。
Unique Pipeline stages: A Pipeline
’s stages should be unique instances. E.g., the same instance
myHashingTF
should not be inserted into the Pipeline
twice since Pipeline
stages must have
unique IDs. However, different instances myHashingTF1
and myHashingTF2
(both of type HashingTF
)
can be put into the same Pipeline
since different instances will be created with different IDs.
パラメータ
Spark MLの予測器
と 変換器
はパラメータを指定するために共通のAPIを使用します。
パラメータ
は自己内包説明を持つ名前付きのパラメータです。ParamMap
は(パラメータ, 値)ペアのセットです。
パラメータをアルゴリズムに渡すには主に2つの方法があります:
- インスタンスのためにパラメータを設定する。例えば、もし
lr
がLogisticRegression
のインスタンスの場合、最大10回のlr.fit()
の繰り返しのためにlr.setMaxIter(10)
を呼び出すことができます。このAPIはspark.mllib
パッケージで使われるAPIに似ています。 ParamMap
をfit()
あるいはtransform()
に渡します。ParamMap
内の全てのパラメータは以前にsetterメソッドを使って指定されたパラメータを上書くでしょう。
特定のEstimator
と Transformer
のインスタンスに所属するパラメータ。例えば、もし2つのLogisticRegression
インスタンス lr1
と lr2
がある場合、両方のmaxIter
パラメータが指定されたParamMap
を構築することができます: ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)
. これはパイプライン
内でmaxIter
パラメータを持つ2つのアルゴリズムがある場合に便利です。
パイプラインの保存とロード
モデルあるいはパイプラインを後で使うためにディスクに保存することはしばしば役に立ちます。Spark1.6で、モデルの import/exportの機能がパイプラインAPIに追加されました。ほとんどの基本的な変換がもっと基本的なMLモデルの幾つかと同じようにサポートされます。保存およびロードがサポートされているかどうかはアルゴリズムのAPIドキュメントを参照してください。
コード例
この章では上で議論された機能について説明をするコードの例を示します。更に詳しい情報はAPI ドキュメントを参照してください(Scala, Javaおよび Python)。幾つかのSpark MLアルゴリズムは spark.mllib
アルゴリズムのためのラッパーで、特定のアルゴリズムについてはMLlib プログラミングガイドに詳細があります。
例: 予測器、変換器、およびパラメータ
この例は予測器
, 変換器
および パラメータ
の概念をカバーします。
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row
// Prepare training data from a list of (label, features) tuples.
val training = sqlContext.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
// Create a LogisticRegression instance. このインスタンスは予測器です。
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
// We may set parameters using setter methods.
lr.setMaxIter(10)
.setRegParam(0.01)
// Learn a LogisticRegression model. これはlrに格納されているパラメータを使用します
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)
// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
.put(lr.maxIter, 30) // Specify 1 Param. これは元のmaxIterを上書きします。
.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.
// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name
val paramMapCombined = paramMap ++ paramMap2
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)
// Prepare test data.
val test = sqlContext.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
// Prepare training data.
// We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans
// into DataFrames, where it uses the bean metadata to infer the schema.
DataFrame training = sqlContext.createDataFrame(Arrays.asList(
new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))
), LabeledPoint.class);
// Create a LogisticRegression instance. このインスタンスは予測器です。
LogisticRegression lr = new LogisticRegression();
// Print out the parameters, documentation, and any default values.
System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");
// We may set parameters using setter methods.
lr.setMaxIter(10)
.setRegParam(0.01);
// Learn a LogisticRegression model. これはlrに格納されているパラメータを使用します。
LogisticRegressionModel model1 = lr.fit(training);
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());
// We may alternatively specify parameters using a ParamMap.
ParamMap paramMap = new ParamMap()
.put(lr.maxIter().w(20)) // Specify 1 Param.
.put(lr.maxIter(), 30) // This overwrites the original maxIter.
.put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.
// One can also combine ParamMaps.
ParamMap paramMap2 = new ParamMap()
.put(lr.probabilityCol().w("myProbability")); // Change output column name
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());
// Prepare test documents.
DataFrame test = sqlContext.createDataFrame(Arrays.asList(
new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))
), LabeledPoint.class);
// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
DataFrame results = model2.transform(test);
for (Row r: results.select("features", "label", "myProbability", "prediction").collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.param import Param, Params
# Prepare training data from a list of (label, features) tuples.
training = sqlContext.createDataFrame([
(1.0, Vectors.dense([0.0, 1.1, 0.1])),
(0.0, Vectors.dense([2.0, 1.0, -1.0])),
(0.0, Vectors.dense([2.0, 1.3, 1.0])),
(1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])
# Create a LogisticRegression instance. このインスタンスは予測器です。
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print "LogisticRegression parameters:\n" + lr.explainParams() + "\n"
# Learn a LogisticRegression model. これはlrに格納されているパラメータを使用します。
model1 = lr.fit(training)
# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print "Model 1 was fit using parameters: "
print model1.extractParamMap()
# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30 # Specify 1 Param, overwriting the original maxIter.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # Specify multiple Params.
# You can combine paramMaps, which are python dictionaries.
paramMap2 = {lr.probabilityCol: "myProbability"} # Change output column name
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)
# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print "Model 2 was fit using parameters: "
print model2.extractParamMap()
# Prepare test data
test = sqlContext.createDataFrame([
(1.0, Vectors.dense([-1.0, 1.5, 1.3])),
(0.0, Vectors.dense([3.0, 2.0, -0.1])),
(1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])
# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
selected = prediction.select("features", "label", "myProbability", "prediction")
for row in selected.collect():
print row
例: パイプライン
この例は上の図で説明された単純なテキストドキュメントのパイプライン
に続きます。
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row
// Prepare training documents from a list of (id, text, label) tuples.
val training = sqlContext.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.01)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.
val model = pipeline.fit(training)
// now we can optionally save the fitted pipeline to disk
model.save("/tmp/spark-logistic-regression-model")
// we can also save this unfit pipeline to disk
pipeline.save("/tmp/unfit-lr-model")
// and load it back in during production
val sameModel = Pipeline.load("/tmp/spark-logistic-regression-model")
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = sqlContext.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "mapreduce spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// Make predictions on test documents.
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
// Labeled and unlabeled instance types.
// Spark SQL can infer schema from Java Beans.
public class Document implements Serializable {
private long id;
private String text;
public Document(long id, String text) {
this.id = id;
this.text = text;
}
public long getId() { return this.id; }
public void setId(long id) { this.id = id; }
public String getText() { return this.text; }
public void setText(String text) { this.text = text; }
}
public class LabeledDocument extends Document implements Serializable {
private double label;
public LabeledDocument(long id, String text, double label) {
super(id, text);
this.label = label;
}
public double getLabel() { return this.label; }
public void setLabel(double label) { this.label = label; }
}
// Prepare training documents, which are labeled.
DataFrame training = sqlContext.createDataFrame(Arrays.asList(
new LabeledDocument(0L, "a b c d e spark", 1.0),
new LabeledDocument(1L, "b d", 0.0),
new LabeledDocument(2L, "spark f g h", 1.0),
new LabeledDocument(3L, "hadoop mapreduce", 0.0)
), LabeledDocument.class);
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words");
HashingTF hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol())
.setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.01);
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
// Fit the pipeline to training documents.
PipelineModel model = pipeline.fit(training);
// Prepare test documents, which are unlabeled.
DataFrame test = sqlContext.createDataFrame(Arrays.asList(
new Document(4L, "spark i j k"),
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop")
), Document.class);
// Make predictions on test documents.
DataFrame predictions = model.transform(test);
for (Row r: predictions.select("id", "text", "probability", "prediction").collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
# Prepare training documents from a list of (id, text, label) tuples.
LabeledDocument = Row("id", "text", "label")
training = sqlContext.createDataFrame([
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Fit the pipeline to training documents.
model = pipeline.fit(training)
# Prepare test documents, which are unlabeled (id, text) tuples.
test = sqlContext.createDataFrame([
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "mapreduce spark"),
(7L, "apache hadoop")], ["id", "text"])
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
print(row)
例: クロス検証を使ったモデルの選択
MLでの重要なタスクはモデルの選択、あるいはデータを使って指定されたタスクについての最適なモデルあるいはパラメータを検索することです。これはチューニングとも呼ばれます。パイプライン
は、個々のパイプライン
内の各要素をチューニングするのではなく、一度に全体のパイプライン
を簡単にチューニングできるようにすることでモデルの選択を容易にします。
現在のところ、 spark.ml
は CrossValidator
クラスを使ったモデルの選択をサポートします。これは 予測器
、ParamMap
のセット、および評価器
を取ります。CrossValidator
は個々の訓練およびテストデータセットとして使われるfoldsのセットにデータセットを分割することで開始します; つまり、$k=3$
fold を使って、CrossValidator
は3つの(訓練, テスト)データセットペア、それぞれ訓練のためにデータの2/3を、テストのために 1/3 を生成するでしょう。CrossValidator
はParamMap
のセットを繰り返します。各 ParamMap
のために、それは指定された 予測器
を訓練し、指定された評価器
を使って評価します。
評価器
は回帰問題について RegressionEvaluator
に、BinaryClassificationEvaluator
はバイナリデータについて、あるいは MultiClassClassificationEvaluator
は多クラス問題について、なりえます。最適なParamMap
を選択するために使われるデフォルトのマトリックスはそれらの評価器の中の setMetric
メソッドによって上書きすることができます。
最も良い評価マトリックス(平均が$k$
以上を持つ)を生成するParamMap
が最も良いモデルとして選択されます。CrossValidator
は最終的に最も良いParamMap
とデータセット全体を使って 評価器
に適合します。
以下の例はパラメータの格子から選択するためにCrossValidator
を使った実演です。パラメータ格子を構築するのを手伝うためにParamGridBuilder
ユーティリティを使います。
パラメータ格子上の cross-validation は高くつくことに注意してください。例えば、上の例では、パラメータ格子はhashingTF.numFeatures
について3つの値を、lr.regParam
については2つの値を、そしてCrossValidator
は2つの層を使用します。This multiplies out to $(3 \times 2) \times 2 = 12$
different models being trained. 現実的な設定では、もっと多くのパラメータともっと多くの層を使うことが一般的です($k=3$
と $k=10$
が一般的です)。別の言い方をすると、CrossValidator
はとても高くつきやすいです。しかし、経験的な手動の調整より統計学的に健全なパラメータの選択のための良く確立された方法でもあります。
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row
// Prepare training data from a list of (id, text, label) tuples.
val training = sqlContext.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0),
(4L, "b spark who", 1.0),
(5L, "g d a y", 0.0),
(6L, "spark fly", 1.0),
(7L, "was mapreduce", 0.0),
(8L, "e spark program", 1.0),
(9L, "a e c l", 0.0),
(10L, "spark compile", 1.0),
(11L, "hadoop software", 0.0)
)).toDF("id", "text", "label")
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
val paramGrid = new ParamGridBuilder()
.addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
.addGrid(lr.regParam, Array(0.1, 0.01))
.build()
// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new BinaryClassificationEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2) // Use 3+ in practice
// Run cross-validation, and choose the best set of parameters.
val cvModel = cv.fit(training)
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = sqlContext.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "mapreduce spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
// Labeled and unlabeled instance types.
// Spark SQL can infer schema from Java Beans.
public class Document implements Serializable {
private long id;
private String text;
public Document(long id, String text) {
this.id = id;
this.text = text;
}
public long getId() { return this.id; }
public void setId(long id) { this.id = id; }
public String getText() { return this.text; }
public void setText(String text) { this.text = text; }
}
public class LabeledDocument extends Document implements Serializable {
private double label;
public LabeledDocument(long id, String text, double label) {
super(id, text);
this.label = label;
}
public double getLabel() { return this.label; }
public void setLabel(double label) { this.label = label; }
}
// Prepare training documents, which are labeled.
DataFrame training = sqlContext.createDataFrame(Arrays.asList(
new LabeledDocument(0L, "a b c d e spark", 1.0),
new LabeledDocument(1L, "b d", 0.0),
new LabeledDocument(2L, "spark f g h", 1.0),
new LabeledDocument(3L, "hadoop mapreduce", 0.0),
new LabeledDocument(4L, "b spark who", 1.0),
new LabeledDocument(5L, "g d a y", 0.0),
new LabeledDocument(6L, "spark fly", 1.0),
new LabeledDocument(7L, "was mapreduce", 0.0),
new LabeledDocument(8L, "e spark program", 1.0),
new LabeledDocument(9L, "a e c l", 0.0),
new LabeledDocument(10L, "spark compile", 1.0),
new LabeledDocument(11L, "hadoop software", 0.0)
), LabeledDocument.class);
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words");
HashingTF hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol())
.setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.01);
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
ParamMap[] paramGrid = new ParamGridBuilder()
.addGrid(hashingTF.numFeatures(), new int[]{10, 100, 1000})
.addGrid(lr.regParam(), new double[]{0.1, 0.01})
.build();
// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
CrossValidator cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new BinaryClassificationEvaluator())
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2); // Use 3+ in practice
// Run cross-validation, and choose the best set of parameters.
CrossValidatorModel cvModel = cv.fit(training);
// Prepare test documents, which are unlabeled.
DataFrame test = sqlContext.createDataFrame(Arrays.asList(
new Document(4L, "spark i j k"),
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop")
), Document.class);
// Make predictions on test documents. cvModel uses the best model found (lrModel).
DataFrame predictions = cvModel.transform(test);
for (Row r: predictions.select("id", "text", "probability", "prediction").collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
例: model selection via train validation split
CrossValidator
に加えて、Sparkはハイパーパラメータ チューニングのための TrainValidationSplit
も提供します。TrainValidationSplit
は CrossValidator
の場合のk回に対して一度だけのみパラメータの各組み合わせを評価します。従って高くつくことないですが、訓練データセットが十分に大きくない場合でも信頼できる結果を生成しないでしょう。
TrainValidationSplit
は予測器
、estimatorParamMaps
パラメータで提供されるParamMap
のセット、および評価器
を取ります。別個の訓練とテストデータセットとして使用される trainRatio
パラメータを使って、データセットを2つの部分に分割することで開始します。例えば$trainRatio=0.75$
(デフォルト)の場合、TrainValidationSplit
は訓練に75%のデータと検証に25%のデータを使用する、訓練の生成とデータセットのペアのテストを行うでしょう。CrossValidator
と似て、TrainValidationSplit
もParamMap
のセットを使って繰り返します。パラメータの各組み合わせについて、指定された評価器
を使って指定された 予測器
を訓練し、それを評価します。最も良い評価マトリックスを生成するParamMap
が最も良いモデルとして選択されます。 TrainValidationSplit
は最終的に最も良いParamMap
とデータセット全体を使って評価器
に適合します。
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
// Prepare training and test data.
val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)
val lr = new LinearRegression()
// We use a ParamGridBuilder to construct a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and determine best model using
// the evaluator.
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.addGrid(lr.fitIntercept)
.addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
.build()
// In this case the estimator is simply the linear regression.
// A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
val trainValidationSplit = new TrainValidationSplit()
.setEstimator(lr)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
// 80% of the data will be used for training and the remaining 20% for validation.
.setTrainRatio(0.8)
// Run train validation split, and choose the best set of parameters.
val model = trainValidationSplit.fit(training)
// Make predictions on test data. model is the model with combination of parameters
// that performed best.
model.transform(test)
.select("features", "label", "prediction")
.show()
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.tuning.*;
import org.apache.spark.sql.DataFrame;
DataFrame data = jsql.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Prepare training and test data.
DataFrame[] splits = data.randomSplit(new double[] {0.9, 0.1}, 12345);
DataFrame training = splits[0];
DataFrame test = splits[1];
LinearRegression lr = new LinearRegression();
// We use a ParamGridBuilder to construct a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and determine best model using
// the evaluator.
ParamMap[] paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam(), new double[] {0.1, 0.01})
.addGrid(lr.fitIntercept())
.addGrid(lr.elasticNetParam(), new double[] {0.0, 0.5, 1.0})
.build();
// In this case the estimator is simply the linear regression.
// A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
.setEstimator(lr)
.setEvaluator(new RegressionEvaluator())
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.8); // 80% for training and the remaining 20% for validation
// Run train validation split, and choose the best set of parameters.
TrainValidationSplitModel model = trainValidationSplit.fit(training);
// Make predictions on test data. model is the model with combination of parameters
// that performed best.
model.transform(test)
.select("features", "label", "prediction")
.show();