概要: 推測器、変換器およびパイプライン - spark.ml

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

spark.ml パッケージはユーザが実践的な機械学習パイプラインを構築および調整するのに役立つDataFramesに基づいた高レベルAPIの同型なセットを提供することを目的とします。パイプラインAPI、アンサンブルなどにユニークな特徴変換器を含むspark.mlのサブパッケージのガイドについては、以下の アルゴリズムガイド の章を見てください。

目次

パイプラインの主要な概念

Spark MLは機械学習アルゴリズムのためのAPIを複数のアルゴリズムを一つのパイプラインあるいはワークフローに組み合わせるのを簡単にするために標準化します。この章ではSpark ML API によって導入される主要な概念をカバーします。パイプラインの概念はほとんどscikit-learn プロジェクトによって着想されました。

データフレーム

機械学習はベクトル、テキスト、イメージおよび構造化データのような広範囲の様々なデータタイプに適用することができます。Spark ML は様々なデータタイプをサポートするためにSpark SQLから データフレームを借用します。

データフレームは多くの基本的および構造化タイプをサポートします; サポートされるタイプのリストについてはSpark SQL データタイプリファレンス を見てください。Spark SQL ガイドにリストされるタイプに加えて、データタイプはML ベクトル タイプも使うことができます。

データフレームは通常の RDDから暗黙的あるいは明示的のどちらでも生成することができます。例として、以下のコード例とSpark SQL プログラミングガイド を見てください。

データフレーム内のカラムは名前が付いています。以下の例は"text", "freatures" および "label" のような名前を使用します。

パイプライン コンポーネント

変換器

変換器は変換器と学習モデルを含む抽象概念です。 技術的には、変換器は一つのデータフレームを一般的に筆頭以上のカラムを追加することで他に変換するtarnsform()メソッドを実装します。例えば:

予想器

予測器 は学習アルゴリズムあるいはデータに適合あるいは教育するどのようなアルゴリズムの概念も抽出します。技術的には、予測器fit()メソッドを実装します。これはデータフレームを受け付け、 モデル、これは変換器です、を生成します。例えば、ロジスティック回帰のような学習アルゴリズムは予測器です。そしてfit() の呼び出しはLogisticRegressionModelを教育します。これはモデルで、従って変換器です。

パイプラインコンポーネントのプロパティ

Transformer.transform()Estimator.fit()は両方ともステートレスです。将来的に、ステートフルアルゴリズムは別の概念を使ってサポートされるかも知れません。

変換器あるいは 予測器 の各インスタンスはユニークなIDを持ちます。これはパラメータ(以下で議論します)を指定する時に便利です。

パイプライン

機械学習では、処理をしデータから学習するためにアルゴリズムの系列を実行することは良くあります。例えば、単純なテキストドキュメント処理のワークフローは以下の幾つかのステージを含むかもしれません:

Spark ML はPipelineとしてそのようなワークフローを表します。これは特定の順番で実行される PipelineStage (変換器予測器)からなります。この章ではこの単純なワークフローを実行例として使用するつもりです。

どのように動くか

パイプライン はステージの系列として指定され、各ステージは 変換器 あるいは 予測器のどちらかです。これらのステージは順番に実行され、入力 データフレームは各ステージを通過する時に変換されます。変換器ステージに関して、transform() メソッドがデータフレーム上で呼ばれます。予測器 ステージに関して、(PipelineModel あるいは適合したPipelineの一部となる)変換器 を生成するためにfit()メソッドが呼び出され、変換器transform() メソッドがDataFrame上で呼び出されます。

簡単なテキストドキュメントワークフローについてこれを説明します。以下の図はパイプライン学習時の使い方です。

Spark ML パイプラインの例

上では、一番上の行は3つのステージを持つパイプラインを表します。最初の2つ(TokenizerHashingTF) は 変換器 (blue)で、3つ目 (LogisticRegression) は 予測器 (red)です。一番下の行はパイプラインを通ったデータフロー表し、円筒はデータフレームを示します。Pipeline.fit() メソッドは元のデータフレーム上で呼び出され、それは生のテキストドキュメントとラベルを持ちます。Tokenizer.transform() メソッドは生のテキストドキュメントを単語に分割し、単語を持つ新しいカラムをデータフレームに追加します。HashingTF.transform() メソッドは単語のカラムを特徴ベクトルに変換し、それらのベクトルを持つ新しいカラムをデータフレームに追加します。ここで、LogisticRegression予測器のため、パイプラインLogisticRegressionModel.を生成するために最初にLogisticRegression.fit() を呼びます。パイプラインが更にステージを持っていた場合、データフレームを次のステージに渡す前にDataFrame上にLogisticRegressionModeltransform()メソッドを呼び出すでしょう。

パイプライン iは予測器です。従って、パイプラインfit()メソッドが実行された後で、そえrはPipelineModelを生成します。これは変換器です。この PipelineModelテスト時に使用されます; 下の図はこの使い方を説明します。

Spark ML PipelineModel 例

上の図の中でPipelineModel は元の パイプラインのステージと同じ番号を持ちますが、元のパイプライン内の全ての 予測器変換器になります。テストのデータセット上でPipelineModeltransform() メソッドが呼ばれると、データは適合するパイプラインに順番に渡されます。各ステージのtransform() メソッドはデータセットを更新し、それを次のステージに渡します。

パイプラインPipelineModelは訓練とテストデータが同一の特徴処理ステップに行くことを保証するのに役立ちます。

詳細

DAG パイプライン: パイプラインのステージが順番の配列として指定されます。ここで与えられた例は全て線形パイプラインです。つまり各ステージ内の パイプラインは以前のステージで生成されたデータを使用します。データフローグラフが有向非循環グラフ(DAG)である限り、非線形パイプラインを生成することが可能です。このグラフは現在のところ暗黙的に各ステージでの入力および出力カラム名に基づいて指定されます(一般的にパラメータとして指定されます)。パイプラインがDAGを形成する場合、ステージは幾何学的な順番で指定されなければなりません。

実行時チェック: パイプラインはタイプが変化するデータフレーム上で操作するため、それらはコンパイル時の型チェックを使うことができません。パイプラインPipelineModelパイプラインを実行する前に実行時のチェックを代わりに行います。この型チェックはデータフレーム スキーマデータフレーム内のカラムのデータ型の説明、を使って行われます。

Unique Pipeline stages: A Pipeline’s stages should be unique instances. E.g., the same instance myHashingTF should not be inserted into the Pipeline twice since Pipeline stages must have unique IDs. However, different instances myHashingTF1 and myHashingTF2 (both of type HashingTF) can be put into the same Pipeline since different instances will be created with different IDs.

パラメータ

Spark MLの予測器変換器はパラメータを指定するために共通のAPIを使用します。

パラメータは自己内包説明を持つ名前付きのパラメータです。ParamMapは(パラメータ, 値)ペアのセットです。

パラメータをアルゴリズムに渡すには主に2つの方法があります:

  1. インスタンスのためにパラメータを設定する。例えば、もしlrLogisticRegressionのインスタンスの場合、最大10回のlr.fit()の繰り返しのためにlr.setMaxIter(10)を呼び出すことができます。このAPIは spark.mllibパッケージで使われるAPIに似ています。
  2. ParamMapfit()あるいは transform() に渡します。ParamMap 内の全てのパラメータは以前にsetterメソッドを使って指定されたパラメータを上書くでしょう。

特定のEstimatorTransformerのインスタンスに所属するパラメータ。例えば、もし2つのLogisticRegression インスタンス lr1lr2がある場合、両方のmaxIterパラメータが指定されたParamMapを構築することができます: ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20). これはパイプライン内でmaxIterパラメータを持つ2つのアルゴリズムがある場合に便利です。

パイプラインの保存とロード

モデルあるいはパイプラインを後で使うためにディスクに保存することはしばしば役に立ちます。Spark1.6で、モデルの import/exportの機能がパイプラインAPIに追加されました。ほとんどの基本的な変換がもっと基本的なMLモデルの幾つかと同じようにサポートされます。保存およびロードがサポートされているかどうかはアルゴリズムのAPIドキュメントを参照してください。

コード例

この章では上で議論された機能について説明をするコードの例を示します。更に詳しい情報はAPI ドキュメントを参照してください(Scala, Javaおよび Python)。幾つかのSpark MLアルゴリズムは spark.mllib アルゴリズムのためのラッパーで、特定のアルゴリズムについてはMLlib プログラミングガイドに詳細があります。

例: 予測器、変換器、およびパラメータ

この例は予測器, 変換器および パラメータの概念をカバーします。

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = sqlContext.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance.  このインスタンスは予測器です。
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)

// Learn a LogisticRegression model.  これはlrに格納されているパラメータを使用します
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30) // Specify 1 Param.  これは元のmaxIterを上書きします。
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.

// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name
val paramMapCombined = paramMap ++ paramMap2

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

// Prepare test data.
val test = sqlContext.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
  .select("features", "label", "myProbability", "prediction")
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

// Prepare training data.
// We use LabeledPoint, which is a JavaBean.  Spark SQL can convert RDDs of JavaBeans
// into DataFrames, where it uses the bean metadata to infer the schema.
DataFrame training = sqlContext.createDataFrame(Arrays.asList(
  new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))
), LabeledPoint.class);

// Create a LogisticRegression instance.  このインスタンスは予測器です。
LogisticRegression lr = new LogisticRegression();
// Print out the parameters, documentation, and any default values.
System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01);

// Learn a LogisticRegression model.  これはlrに格納されているパラメータを使用します。
LogisticRegressionModel model1 = lr.fit(training);
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());

// We may alternatively specify parameters using a ParamMap.
ParamMap paramMap = new ParamMap()
  .put(lr.maxIter().w(20)) // Specify 1 Param.
  .put(lr.maxIter(), 30) // This overwrites the original maxIter.
  .put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.

// One can also combine ParamMaps.
ParamMap paramMap2 = new ParamMap()
  .put(lr.probabilityCol().w("myProbability")); // Change output column name
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());

// Prepare test documents.
DataFrame test = sqlContext.createDataFrame(Arrays.asList(
  new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
  new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))
), LabeledPoint.class);

// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
DataFrame results = model2.transform(test);
for (Row r: results.select("features", "label", "myProbability", "prediction").collect()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
      + ", prediction=" + r.get(3));
}
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.param import Param, Params

# Prepare training data from a list of (label, features) tuples.
training = sqlContext.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

# Create a LogisticRegression instance. このインスタンスは予測器です。
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print "LogisticRegression parameters:\n" + lr.explainParams() + "\n"

# Learn a LogisticRegression model. これはlrに格納されているパラメータを使用します。
model1 = lr.fit(training)

# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print "Model 1 was fit using parameters: "
print model1.extractParamMap()

# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30 # Specify 1 Param, overwriting the original maxIter.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # Specify multiple Params.

# You can combine paramMaps, which are python dictionaries.
paramMap2 = {lr.probabilityCol: "myProbability"} # Change output column name
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print "Model 2 was fit using parameters: "
print model2.extractParamMap()

# Prepare test data
test = sqlContext.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
selected = prediction.select("features", "label", "myProbability", "prediction")
for row in selected.collect():
    print row

例: パイプライン

この例は上の図で説明された単純なテキストドキュメントのパイプラインに続きます。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = sqlContext.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// now we can optionally save the fitted pipeline to disk
model.save("/tmp/spark-logistic-regression-model")

// we can also save this unfit pipeline to disk
pipeline.save("/tmp/unfit-lr-model")

// and load it back in during production
val sameModel = Pipeline.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = sqlContext.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

// Labeled and unlabeled instance types.
// Spark SQL can infer schema from Java Beans.
public class Document implements Serializable {
  private long id;
  private String text;

  public Document(long id, String text) {
    this.id = id;
    this.text = text;
  }

  public long getId() { return this.id; }
  public void setId(long id) { this.id = id; }

  public String getText() { return this.text; }
  public void setText(String text) { this.text = text; }
}

public class LabeledDocument extends Document implements Serializable {
  private double label;

  public LabeledDocument(long id, String text, double label) {
    super(id, text);
    this.label = label;
  }

  public double getLabel() { return this.label; }
  public void setLabel(double label) { this.label = label; }
}

// Prepare training documents, which are labeled.
DataFrame training = sqlContext.createDataFrame(Arrays.asList(
  new LabeledDocument(0L, "a b c d e spark", 1.0),
  new LabeledDocument(1L, "b d", 0.0),
  new LabeledDocument(2L, "spark f g h", 1.0),
  new LabeledDocument(3L, "hadoop mapreduce", 0.0)
), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words");
HashingTF hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol())
  .setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01);
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});

// Fit the pipeline to training documents.
PipelineModel model = pipeline.fit(training);

// Prepare test documents, which are unlabeled.
DataFrame test = sqlContext.createDataFrame(Arrays.asList(
  new Document(4L, "spark i j k"),
  new Document(5L, "l m n"),
  new Document(6L, "mapreduce spark"),
  new Document(7L, "apache hadoop")
), Document.class);

// Make predictions on test documents.
DataFrame predictions = model.transform(test);
for (Row r: predictions.select("id", "text", "probability", "prediction").collect()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
      + ", prediction=" + r.get(3));
}
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

# Prepare training documents from a list of (id, text, label) tuples.
LabeledDocument = Row("id", "text", "label")
training = sqlContext.createDataFrame([
    (0L, "a b c d e spark", 1.0),
    (1L, "b d", 0.0),
    (2L, "spark f g h", 1.0),
    (3L, "hadoop mapreduce", 0.0)], ["id", "text", "label"])

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = sqlContext.createDataFrame([
    (4L, "spark i j k"),
    (5L, "l m n"),
    (6L, "mapreduce spark"),
    (7L, "apache hadoop")], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
    print(row)

例: クロス検証を使ったモデルの選択

MLでの重要なタスクはモデルの選択、あるいはデータを使って指定されたタスクについての最適なモデルあるいはパラメータを検索することです。これはチューニングとも呼ばれます。パイプラインは、個々のパイプライン内の各要素をチューニングするのではなく、一度に全体のパイプラインを簡単にチューニングできるようにすることでモデルの選択を容易にします。

現在のところ、 spark.mlCrossValidator クラスを使ったモデルの選択をサポートします。これは 予測器ParamMapのセット、および評価器を取ります。CrossValidator は個々の訓練およびテストデータセットとして使われるfoldsのセットにデータセットを分割することで開始します; つまり、$k=3$ fold を使って、CrossValidator は3つの(訓練, テスト)データセットペア、それぞれ訓練のためにデータの2/3を、テストのために 1/3 を生成するでしょう。CrossValidatorParamMapのセットを繰り返します。各 ParamMapのために、それは指定された 予測器 を訓練し、指定された評価器を使って評価します。

評価器は回帰問題について RegressionEvaluator に、BinaryClassificationEvaluator はバイナリデータについて、あるいは MultiClassClassificationEvaluator は多クラス問題について、なりえます。最適なParamMap を選択するために使われるデフォルトのマトリックスはそれらの評価器の中の setMetricメソッドによって上書きすることができます。

最も良い評価マトリックス(平均が$k$以上を持つ)を生成するParamMapが最も良いモデルとして選択されます。CrossValidator は最終的に最も良いParamMapとデータセット全体を使って 評価器 に適合します。

以下の例はパラメータの格子から選択するためにCrossValidatorを使った実演です。パラメータ格子を構築するのを手伝うためにParamGridBuilder ユーティリティを使います。

パラメータ格子上の cross-validation は高くつくことに注意してください。例えば、上の例では、パラメータ格子はhashingTF.numFeaturesについて3つの値を、lr.regParamについては2つの値を、そしてCrossValidator は2つの層を使用します。This multiplies out to $(3 \times 2) \times 2 = 12$ different models being trained. 現実的な設定では、もっと多くのパラメータともっと多くの層を使うことが一般的です($k=3$$k=10$ が一般的です)。別の言い方をすると、CrossValidator はとても高くつきやすいです。しかし、経験的な手動の調整より統計学的に健全なパラメータの選択のための良く確立された方法でもあります。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training data from a list of (id, text, label) tuples.
val training = sqlContext.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0),
  (4L, "b spark who", 1.0),
  (5L, "g d a y", 0.0),
  (6L, "spark fly", 1.0),
  (7L, "was mapreduce", 0.0),
  (8L, "e spark program", 1.0),
  (9L, "a e c l", 0.0),
  (10L, "spark compile", 1.0),
  (11L, "hadoop software", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2) // Use 3+ in practice

// Run cross-validation, and choose the best set of parameters.
val cvModel = cv.fit(training)

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = sqlContext.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

// Labeled and unlabeled instance types.
// Spark SQL can infer schema from Java Beans.
public class Document implements Serializable {
  private long id;
  private String text;

  public Document(long id, String text) {
    this.id = id;
    this.text = text;
  }

  public long getId() { return this.id; }
  public void setId(long id) { this.id = id; }

  public String getText() { return this.text; }
  public void setText(String text) { this.text = text; }
}

public class LabeledDocument extends Document implements Serializable {
  private double label;

  public LabeledDocument(long id, String text, double label) {
    super(id, text);
    this.label = label;
  }

  public double getLabel() { return this.label; }
  public void setLabel(double label) { this.label = label; }
}


// Prepare training documents, which are labeled.
DataFrame training = sqlContext.createDataFrame(Arrays.asList(
  new LabeledDocument(0L, "a b c d e spark", 1.0),
  new LabeledDocument(1L, "b d", 0.0),
  new LabeledDocument(2L, "spark f g h", 1.0),
  new LabeledDocument(3L, "hadoop mapreduce", 0.0),
  new LabeledDocument(4L, "b spark who", 1.0),
  new LabeledDocument(5L, "g d a y", 0.0),
  new LabeledDocument(6L, "spark fly", 1.0),
  new LabeledDocument(7L, "was mapreduce", 0.0),
  new LabeledDocument(8L, "e spark program", 1.0),
  new LabeledDocument(9L, "a e c l", 0.0),
  new LabeledDocument(10L, "spark compile", 1.0),
  new LabeledDocument(11L, "hadoop software", 0.0)
), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words");
HashingTF hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol())
  .setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01);
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
// this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
ParamMap[] paramGrid = new ParamGridBuilder()
    .addGrid(hashingTF.numFeatures(), new int[]{10, 100, 1000})
    .addGrid(lr.regParam(), new double[]{0.1, 0.01})
    .build();

// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
// This will allow us to jointly choose parameters for all Pipeline stages.
// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
// is areaUnderROC.
CrossValidator cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2); // Use 3+ in practice

// Run cross-validation, and choose the best set of parameters.
CrossValidatorModel cvModel = cv.fit(training);

// Prepare test documents, which are unlabeled.
DataFrame test = sqlContext.createDataFrame(Arrays.asList(
  new Document(4L, "spark i j k"),
  new Document(5L, "l m n"),
  new Document(6L, "mapreduce spark"),
  new Document(7L, "apache hadoop")
), Document.class);

// Make predictions on test documents. cvModel uses the best model found (lrModel).
DataFrame predictions = cvModel.transform(test);
for (Row r: predictions.select("id", "text", "probability", "prediction").collect()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
      + ", prediction=" + r.get(3));
}

例: model selection via train validation split

CrossValidatorに加えて、Sparkはハイパーパラメータ チューニングのための TrainValidationSplit も提供します。TrainValidationSplitCrossValidatorの場合のk回に対して一度だけのみパラメータの各組み合わせを評価します。従って高くつくことないですが、訓練データセットが十分に大きくない場合でも信頼できる結果を生成しないでしょう。

TrainValidationSplit予測器estimatorParamMapsパラメータで提供されるParamMapのセット、および評価器を取ります。別個の訓練とテストデータセットとして使用される trainRatioパラメータを使って、データセットを2つの部分に分割することで開始します。例えば$trainRatio=0.75$ (デフォルト)の場合、TrainValidationSplit は訓練に75%のデータと検証に25%のデータを使用する、訓練の生成とデータセットのペアのテストを行うでしょう。CrossValidatorと似て、TrainValidationSplitParamMapのセットを使って繰り返します。パラメータの各組み合わせについて、指定された評価器を使って指定された 予測器を訓練し、それを評価します。最も良い評価マトリックスを生成するParamMap が最も良いモデルとして選択されます。 TrainValidationSplit は最終的に最も良いParamMapとデータセット全体を使って評価器に適合します。

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

// Prepare training and test data.
val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)

val lr = new LinearRegression()

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and determine best model using
// the evaluator.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.fitIntercept)
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

// In this case the estimator is simply the linear regression.
// A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)
  // 80% of the data will be used for training and the remaining 20% for validation.
  .setTrainRatio(0.8)

// Run train validation split, and choose the best set of parameters.
val model = trainValidationSplit.fit(training)

// Make predictions on test data. model is the model with combination of parameters
// that performed best.
model.transform(test)
  .select("features", "label", "prediction")
  .show()
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.tuning.*;
import org.apache.spark.sql.DataFrame;

DataFrame data = jsql.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Prepare training and test data.
DataFrame[] splits = data.randomSplit(new double[] {0.9, 0.1}, 12345);
DataFrame training = splits[0];
DataFrame test = splits[1];

LinearRegression lr = new LinearRegression();

// We use a ParamGridBuilder to construct a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and determine best model using
// the evaluator.
ParamMap[] paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam(), new double[] {0.1, 0.01})
  .addGrid(lr.fitIntercept())
  .addGrid(lr.elasticNetParam(), new double[] {0.0, 0.5, 1.0})
  .build();

// In this case the estimator is simply the linear regression.
// A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8); // 80% for training and the remaining 20% for validation

// Run train validation split, and choose the best set of parameters.
TrainValidationSplitModel model = trainValidationSplit.fit(training);

// Make predictions on test data. model is the model with combination of parameters
// that performed best.
model.transform(test)
  .select("features", "label", "prediction")
  .show();
TOP
inserted by FC2 system