ML パイプライン
\[
\newcommand{\R}{\mathbb{R}}
\newcommand{\E}{\mathbb{E}}
\newcommand{\x}{\mathbf{x}}
\newcommand{\y}{\mathbf{y}}
\newcommand{\wv}{\mathbf{w}}
\newcommand{\av}{\mathbf{\alpha}}
\newcommand{\bv}{\mathbf{b}}
\newcommand{\N}{\mathbb{N}}
\newcommand{\id}{\mathbf{I}}
\newcommand{\ind}{\mathbf{1}}
\newcommand{\0}{\mathbf{0}}
\newcommand{\unit}{\mathbf{e}}
\newcommand{\one}{\mathbf{1}}
\newcommand{\zero}{\mathbf{0}}
\]
この章では、ML パイプラインの概念を説明します。ML パイプラインはユーザが実践的な機械学習パイプラインを構築および調整するのに役立つDataFramesに基づいた高レベルAPIの同型なセットを提供します。
目次
パイプラインの主要な概念
MLlibは、機械学習アルゴリズムのためのAPIを複数のアルゴリズムを一つのパイプラインあるいはワークフローに組み合わせるのを簡単にするために、標準化します。この章ではPipeline API によって導入される主要な概念をカバーします。パイプラインの概念はほとんどscikit-learn プロジェクトによって着想されました。
-
データフレーム
: このML APIはMLデータセットとしてSpark SQLからデータフレーム
を使用します。これは様々なデータタイプを保持します。例えば、データフレーム
はテキスト、特徴ベクトル、trueのラベル、および予想を格納する異なるカラムを持つことができます。 -
変換器
:Transformer
は一つのデータフレーム
を他のデータフレーム
に変換することができるアルゴリズムです。例えば、MLモデルは 特徴量を持つデータフレーム
を予想を持つデータフレーム
に変換する変換器
です。 -
予測器
:予測器
は変換器
を生成するためにデータフレーム
にうまく当てはめることができるアルゴリズムです。例えば、学習アルゴリズムはデータフレーム
を教育しモデルを生成する予測器
です。 -
パイプライン
:パイプライン
はMLワークフローを指定するために複数の変換器
と予測器
を一緒につなげます。 -
パラメータ
: 全ての変換器
と予測器
はパラメータを指定するための共通のAPIを今では共有します。
データフレーム
機械学習はベクトル、テキスト、イメージおよび構造化データのような広範囲の様々なデータタイプに適用することができます。このAPIは様々なデータタイプをサポートするためにSpark SQLから データフレーム
を借用します。
データフレーム
は多くの基本的および構造化タイプをサポートします; サポートされるタイプのリストについてはSpark SQL データタイプリファレンス を見てください。Spark SQL ガイドにリストされるタイプに加えて、データタイプ
はML ベクトル
タイプも使うことができます。
データフレーム
は通常の RDD
から暗黙的あるいは明示的のどちらでも生成することができます。例として、以下のコード例とSpark SQL プログラミングガイド を見てください。
データフレーム
内のカラムは名前が付いています。以下の例は"text", "freatures" および "label" のような名前を使用します。
パイプライン コンポーネント
変換器
変換器
は変換器と学習モデルを含む抽象概念です。 技術的には、変換器
は一つのデータフレーム
を一般的に1つ以上のカラムを追加することで他に変換するtarnsform()
メソッドを実装します。例えば:
- 特徴変換は
データフレーム
を取り、カラム(例えば テキスト)を読み込み、それを新しいカラム(例えば、特徴ベクトル)にマップし、マップされたカラムを追加した新しいデータフレーム
を出力するかもしれません。 - 学習モデルは
データフレーム
を取り、特徴ベクトルを含むカラムを読み込み、各特長ベクトルのためのラベルを予想し、追加された予想ラベルをカラムとして新しいデータフレーム
を出力するかも知れません。
予想器
予測器
は学習アルゴリズムあるいはデータに適合あるいは教育するどのようなアルゴリズムの概念も抽出します。技術的には、予測器
は fit()
メソッドを実装します。これはデータフレーム
を受け付け、 モデル
、これは変換器
です、を生成します。例えば、ロジスティック回帰
のような学習アルゴリズムは予測器
です。そしてfit()
の呼び出しはLogisticRegressionModel
を教育します。これはモデル
で、従って変換器
です。
パイプラインコンポーネントのプロパティ
Transformer.transform()
と Estimator.fit()
は両方ともステートレスです。将来的に、ステートフルアルゴリズムは別の概念を使ってサポートされるかも知れません。
変換器
あるいは 予測器
の各インスタンスはユニークなIDを持ちます。これはパラメータ(以下で議論します)を指定する時に便利です。
パイプライン
機械学習では、処理をしデータから学習するためにアルゴリズムの系列を実行することは良くあります。例えば、単純なテキストドキュメント処理のワークフローは以下の幾つかのステージを含むかもしれません:
- 各ドキュメントのテキストを単語に分割する。
- 各ドキュメントのワードを数値の特徴ベクトルに変換する。
- 特徴ベクトルとラベルを使って予測モデルを学習する。
MLLib はPipeline
としてそのようなワークフローを表します。これは特定の順番で実行される PipelineStage
(変換器
と 予測器
)からなります。この章ではこの単純なワークフローを実行例として使用するつもりです。
どのように動くか
パイプライン
はステージの系列として指定され、各ステージは 変換器
あるいは 予測器
のどちらかです。これらのステージは順番に実行され、入力 データフレーム
は各ステージを通過する時に変換されます。変換器
ステージに関して、transform()
メソッドがデータフレーム
上で呼ばれます。予測器
ステージに関して、(PipelineModel
あるいは適合したPipeline
の一部となる)変換器
を生成するためにfit()
メソッドが呼び出され、変換器
の transform()
メソッドがDataFrame
上で呼び出されます。
簡単なテキストドキュメントワークフローについてこれを説明します。以下の図はパイプライン
の学習時の使い方です。
上では、一番上の行は3つのステージを持つパイプライン
を表します。最初の2つ(Tokenizer
と HashingTF
) は 変換器
(blue)で、3つ目 (LogisticRegression
) は 予測器
(red)です。一番下の行はパイプラインを通ったデータフロー表し、円筒はデータフレーム
を示します。Pipeline.fit()
メソッドは元のデータフレーム
上で呼び出され、それは生のテキストドキュメントとラベルを持ちます。Tokenizer.transform()
メソッドは生のテキストドキュメントを単語に分割し、単語を持つ新しいカラムをデータフレーム
に追加します。HashingTF.transform()
メソッドは単語のカラムを特徴ベクトルに変換し、それらのベクトルを持つ新しいカラムをデータフレーム
に追加します。ここで、LogisticRegression
は 予測器
のため、パイプライン
は LogisticRegressionModel
.を生成するために最初にLogisticRegression.fit()
を呼びます。パイプライン
が更にステージを持っていた場合、データフレーム
を次のステージに渡す前にDataFrame
上にLogisticRegressionModel
の transform()
メソッドを呼び出すでしょう。
パイプライン
は予測器
です。従って、パイプライン
のfit()
メソッドが実行された後で、そえrはPipelineModel
を生成します。これは変換器
です。この PipelineModel
はテスト時に使用されます; 下の図はこの使い方を説明します。
上の図の中でPipelineModel
は元の パイプライン
のステージと同じ番号を持ちますが、元のパイプライン
内の全ての 予測器
は変換器
になります。テストのデータセット上でPipelineModel
の transform()
メソッドが呼ばれると、データは適合するパイプラインに順番に渡されます。各ステージのtransform()
メソッドはデータセットを更新し、それを次のステージに渡します。
パイプライン
と PipelineModel
は訓練とテストデータが同一の特徴処理ステップに行くことを保証するのに役立ちます。
詳細
DAG パイプライン
: パイプライン
のステージが順番の配列として指定されます。ここで与えられた例は全て線形パイプライン
です。つまり各ステージ内の パイプライン
は以前のステージで生成されたデータを使用します。データフローグラフが有向非循環グラフ(DAG)である限り、非線形パイプライン
を生成することが可能です。このグラフは現在のところ暗黙的に各ステージでの入力および出力カラム名に基づいて指定されます(一般的にパラメータとして指定されます)。パイプライン
がDAGを形成する場合、ステージは幾何学的な順番で指定されなければなりません。
実行時チェック: パイプライン
はタイプが変化するデータフレーム
上で操作するため、それらはコンパイル時の型チェックを使うことができません。パイプライン
と PipelineModel
はパイプライン
を実行する前に実行時のチェックを代わりに行います。この型チェックはデータフレーム
スキーマ、データフレーム
内のカラムのデータ型の説明、を使って行われます。
Unique Pipeline stages: パイプライン
のステージはユニークなインスタンスでなければなりません。例えば、Pipeline
はユニークなIDを持たなければならないため、同じインスタンス myHashingTF
はパイプライン
に二度入れられるべきではありません。しかし、異なるインスタンス myHashingTF1
と myHashingTF2
(両方のタイプはHashingTF
) は、異なるインスタンスは異なるIDを持って生成されるので、同じパイプライン
に配置することができます。
パラメータ
MLlibの予測器
と 変換器
はパラメータを指定するために共通のAPIを使用します。
パラメータ
は自己内包説明を持つ名前付きのパラメータです。ParamMap
は(パラメータ, 値)ペアのセットです。
パラメータをアルゴリズムに渡すには主に2つの方法があります:
- インスタンスのためにパラメータを設定する。例えば、もし
lr
がLogisticRegression
のインスタンスの場合、最大10回のlr.fit()
の繰り返しのためにlr.setMaxIter(10)
を呼び出すことができます。このAPIはspark.mllib
パッケージで使われるAPIに似ています。 ParamMap
をfit()
あるいはtransform()
に渡します。ParamMap
内の全てのパラメータは以前にsetterメソッドを使って指定されたパラメータを上書くでしょう。
パラメータは、特定のEstimator
と Transformer
のインスタンスに所属します。例えば、もし2つのLogisticRegression
インスタンス lr1
と lr2
がある場合、両方のmaxIter
パラメータが指定されたParamMap
を構築することができます: ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)
. これはパイプライン
内でmaxIter
パラメータを持つ2つのアルゴリズムがある場合に便利です。
パイプラインの保存とロード
モデルあるいはパイプラインを後で使うためにディスクに保存することはしばしば役に立ちます。Spark1.6で、モデルの import/exportの機能がパイプラインAPIに追加されました。ほとんどの基本的な変換がもっと基本的なMLモデルの幾つかと同じようにサポートされます。保存およびロードがサポートされているかどうかはアルゴリズムのAPIドキュメントを参照してください。
コード例
この章では上で議論された機能について説明をするコードの例を示します。更に詳しい情報はAPI ドキュメントを参照してください(Scala, Javaおよび Python)。
例: 予測器、変換器、およびパラメータ
この例は予測器
, 変換器
および パラメータ
の概念をカバーします。
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
// Create a LogisticRegression instance. このインスタンスは予測器です。
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
// We may set parameters using setter methods.
lr.setMaxIter(10)
.setRegParam(0.01)
// Learn a LogisticRegression model. これはlrに格納されているパラメータを使用します
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)
// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
.put(lr.maxIter, 30) // Specify 1 Param. これは元のmaxIterを上書きします。
.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.
// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name.
val paramMapCombined = paramMap ++ paramMap2
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)
// Prepare test data.
val test = spark.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Prepare training data.
List<Row> dataTraining = Arrays.asList(
RowFactory.create(1.0, Vectors.dense(0.0, 1.1, 0.1)),
RowFactory.create(0.0, Vectors.dense(2.0, 1.0, -1.0)),
RowFactory.create(0.0, Vectors.dense(2.0, 1.3, 1.0)),
RowFactory.create(1.0, Vectors.dense(0.0, 1.2, -0.5))
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(dataTraining, schema);
// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
// Print out the parameters, documentation, and any default values.
System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");
// We may set parameters using setter methods.
lr.setMaxIter(10).setRegParam(0.01);
// Learn a LogisticRegression model. This uses the parameters stored in lr.
LogisticRegressionModel model1 = lr.fit(training);
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());
// We may alternatively specify parameters using a ParamMap.
ParamMap paramMap = new ParamMap()
.put(lr.maxIter().w(20)) // Specify 1 Param.
.put(lr.maxIter(), 30) // This overwrites the original maxIter.
.put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.
// One can also combine ParamMaps.
ParamMap paramMap2 = new ParamMap()
.put(lr.probabilityCol().w("myProbability")); // Change output column name
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());
// Prepare test documents.
List<Row> dataTest = Arrays.asList(
RowFactory.create(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
RowFactory.create(0.0, Vectors.dense(3.0, 2.0, -0.1)),
RowFactory.create(1.0, Vectors.dense(0.0, 2.2, -1.5))
);
Dataset<Row> test = spark.createDataFrame(dataTest, schema);
// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
Dataset<Row> results = model2.transform(test);
Dataset<Row> rows = results.select("features", "label", "myProbability", "prediction");
for (Row r: rows.collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
(1.0, Vectors.dense([0.0, 1.1, 0.1])),
(0.0, Vectors.dense([2.0, 1.0, -1.0])),
(0.0, Vectors.dense([2.0, 1.3, 1.0])),
(1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])
# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)
# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())
# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30 # Specify 1 Param, overwriting the original maxIter.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # Specify multiple Params.
# You can combine paramMaps, which are python dictionaries.
paramMap2 = {lr.probabilityCol: "myProbability"} # Change output column name
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)
# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())
# Prepare test data
test = spark.createDataFrame([
(1.0, Vectors.dense([-1.0, 1.5, 1.3])),
(0.0, Vectors.dense([3.0, 2.0, -0.1])),
(1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])
# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
.collect()
for row in result:
print("features=%s, label=%s -> prob=%s, prediction=%s"
% (row.features, row.label, row.myProbability, row.prediction))
例: パイプライン
この例は上の図で説明された単純なテキストドキュメントのパイプライン
に続きます。
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.
val model = pipeline.fit(training)
// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// Make predictions on test documents.
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
import java.util.Arrays;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Prepare training documents, which are labeled.
Dataset<Row> training = spark.createDataFrame(Arrays.asList(
new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
new JavaLabeledDocument(1L, "b d", 0.0),
new JavaLabeledDocument(2L, "spark f g h", 1.0),
new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0)
), JavaLabeledDocument.class);
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words");
HashingTF hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol())
.setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001);
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
// Fit the pipeline to training documents.
PipelineModel model = pipeline.fit(training);
// Prepare test documents, which are unlabeled.
Dataset<Row> test = spark.createDataFrame(Arrays.asList(
new JavaDocument(4L, "spark i j k"),
new JavaDocument(5L, "l m n"),
new JavaDocument(6L, "spark hadoop spark"),
new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);
// Make predictions on test documents.
Dataset<Row> predictions = model.transform(test);
for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Fit the pipeline to training documents.
model = pipeline.fit(training)
# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop")
], ["id", "text"])
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
モデルの選択 (ハイパーパラメータの調整)
MLパイプラインを使う大きな利点はハイパーパラメータの最適化です。自動的なモデルの選択についての詳細な情報はML チューニングガイドを見てください。