分類と回帰 - spark.ml

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

目次

spark.mlでは、$L_1$ あるいは $L_2$ 正規化を使った人気のあるロジスティック回帰や線形最小二乗法を実装します。実装とチューニングについての詳細はmllibでの線形法を参照してください。Elastic netのためのデータフレームAPI、Zou et al, Regularization and variable selection via the elastic netで提案された$L_1$ と $L_2$の混成、も含まれています。数学的には、$L_1$ と the $L_2$ の正規化の凸結合として定義されます:\[ \alpha \left( \lambda \|\wv\|_1 \right) + (1-\alpha) \left( \frac{\lambda}{2}\|\wv\|_2^2 \right) , \alpha \in [0, 1], \lambda \geq 0 \] $\alpha$ を適切に設定することで、elastic net は $L_1$ と $L_2$ の両方の正規化を特別な場合として含みます。例えば、もしl線形回帰 モデルが elastic net パラメータ $\alpha$ を $1$ にして訓練されると、Lasso モデルと同じになります。言い換えると、もし $\alpha$ が $0$ に設定されると、訓練されたモデルは リッジ回帰モデルに変形されます。線形回帰とロジスティック回帰の両方を elastic net 正規化を使ってパイプラインAPIを実装します。

分類

ロジスティック回帰

ロジスティック回帰は二分応答を予測するための人気のある方法です。それは結果の確率を予測する一般化された線形モデル の特別な例です。実装についての背景と詳細はspark.mllibでのロジスティック回帰を参照してください。

spark.mlでの現在のロジスティック回帰の実装は、二分クラスのみをサポートします。多クラス回帰のサポートは将来追加されるでしょう。

以下の例はelastic net 正規化を使ってロジスティック回帰モデルを訓練する方法を示します。elasticNetParam は $\alpha$ に対応し、regParam は $\lambda$ に対応します。

import org.apache.spark.ml.classification.LogisticRegression

// Load training data
val training = sqlCtx.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for logistic regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionWithElasticNetExample.scala" で見つかります。
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Load training data
DataFrame training = sqlContext.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8);

// Fit the model
LogisticRegressionModel lrModel = lr.fit(training);

// Print the coefficients and intercept for logistic regression
System.out.println("Coefficients: "
  + lrModel.coefficients() + " Intercept: " + lrModel.intercept());
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaLogisticRegressionWithElasticNetExample.java" で見つかります。
from pyspark.ml.classification import LogisticRegression

# Load training data
training = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/logistic_regression_with_elastic_net.py" で見つかります。

ロジスティック回帰のspark.ml の実装は、訓練セットのモデルの総計の抽出もサポートします。BinaryLogisticRegressionSummary内にデータフレームとして格納されている予想とマトリックスは、@transient の注釈が付けられ、従ってドライバ上でのみ利用可能なことに注意してください。

LogisticRegressionTrainingSummaryLogisticRegressionModelのための総計を提供します。現在のところ、2値分類のみがサポートされ、総計は明示的に BinaryLogisticRegressionTrainingSummaryにキャストされなければなりません。これは多クラス分類がサポートされた時におそらく変更されるでしょう。

以前の例を続けます:

import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression}

// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
// example
val trainingSummary = lrModel.summary

// Obtain the objective per iteration.
val objectiveHistory = trainingSummary.objectiveHistory
objectiveHistory.foreach(loss => println(loss))

// Obtain the metrics useful to judge performance on test data.
// We cast the summary to a BinaryLogisticRegressionSummary since the problem is a
// binary classification problem.
val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]

// Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
val roc = binarySummary.roc
roc.show()
println(binarySummary.areaUnderROC)

// Set the model threshold to maximize F-Measure
val fMeasure = binarySummary.fMeasureByThreshold
val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
  .select("threshold").head().getDouble(0)
lrModel.setThreshold(bestThreshold)
例の完全なコードは Sparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionSummaryExample.scala" で見つかります。

LogisticRegressionTrainingSummaryLogisticRegressionModelのための総計を提供します。現在のところ、2値分類のみがサポートされ、総計は明示的に BinaryLogisticRegressionTrainingSummaryにキャストされなければなりません。これは多クラス分類がサポートされた時におそらく変更されるでしょう。

以前の例を続けます:

import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.classification.LogisticRegressionTrainingSummary;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.functions;

// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
// example
LogisticRegressionTrainingSummary trainingSummary = lrModel.summary();

// Obtain the loss per iteration.
double[] objectiveHistory = trainingSummary.objectiveHistory();
for (double lossPerIteration : objectiveHistory) {
  System.out.println(lossPerIteration);
}

// Obtain the metrics useful to judge performance on test data.
// We cast the summary to a BinaryLogisticRegressionSummary since the problem is a binary
// classification problem.
BinaryLogisticRegressionSummary binarySummary =
  (BinaryLogisticRegressionSummary) trainingSummary;

// Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
DataFrame roc = binarySummary.roc();
roc.show();
roc.select("FPR").show();
System.out.println(binarySummary.areaUnderROC());

// Get the threshold corresponding to the maximum F-Measure and rerun LogisticRegression with
// this selected threshold.
DataFrame fMeasure = binarySummary.fMeasureByThreshold();
double maxFMeasure = fMeasure.select(functions.max("F-Measure")).head().getDouble(0);
double bestThreshold = fMeasure.where(fMeasure.col("F-Measure").equalTo(maxFMeasure))
  .select("threshold").head().getDouble(0);
lrModel.setThreshold(bestThreshold);
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaLogisticRegressionSummaryExample.java" で見つかります。

ロジスティック回帰モデルの総計はPythonではまだサポートされません。

決定木クラス分類

決定木は分類と再帰の方法の人気のある一群です。spark.mlの実装についての詳細は決定木の章で見つけることができます。

以下の例では、LibSVMフォーマットのデータセットをロードし、それをトレーニングとテストセットに分割し、最初のデータセットで学習し、それからテストセットを評価します。データを準備するために2つの機能の変換器を使用します: これらはラベルおよび分類機能のためのカテゴリに印をつけるのに役立ち、メタデータを決定木が理解できる DataFrame に追加します。

パラメータについての詳細はScala API ドキュメントの中で見つけることができます。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Load the data stored in LIBSVM format as a DataFrame.
val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous
  .fit(data)

// Split the data into training and test sets (30% held out for testing)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a DecisionTree model.
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Chain indexers and tree in a Pipeline
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))

// Train model.  This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("precision")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))

val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
println("Learned classification tree model:\n" + treeModel.toDebugString)
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeClassificationExample.scala" で見つかります。

パラメータについての詳細はJava API ドキュメントの中で見つけることができます。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.DecisionTreeClassifier;
import org.apache.spark.ml.classification.DecisionTreeClassificationModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Load the data stored in LIBSVM format as a DataFrame.
DataFrame data = sqlContext.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);

// Automatically identify categorical features, and index them.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
DataFrame[] splits = data.randomSplit(new double[]{0.7, 0.3});
DataFrame trainingData = splits[0];
DataFrame testData = splits[1];

// Train a DecisionTree model.
DecisionTreeClassifier dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures");

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels());

// Chain indexers and tree in a Pipeline
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[]{labelIndexer, featureIndexer, dt, labelConverter});

// Train model.  This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
DataFrame predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("precision");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

DecisionTreeClassificationModel treeModel =
  (DecisionTreeClassificationModel) (model.stages()[2]);
System.out.println("Learned classification tree model:\n" + treeModel.toDebugString());
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java" で見つかります。

パラメータについての詳細はPython API ドキュメントの中で見つけることができます。

from pyspark import SparkContext, SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="precision")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

treeModel = model.stages[2]
# summary only
print(treeModel)
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/decision_tree_classification_example.py" で見つかります。.

ランダムフォレスト分類

ランダムフォレストは分類と再帰の方法の人気のある一群です。spark.mlの実装についての詳細はランダムフォレストの章で見つけることができます。

以下の例では、LibSVMフォーマットのデータセットをロードし、それをトレーニングとテストセットに分割し、最初のデータセットで学習し、それからテストセットを評価します。データを準備するために2つの機能の変換器を使用します: これらはラベルおよび分類機能のためのカテゴリに印をつけるのに役立ち、メタデータを木に基づいたアルゴリズムが理解できる DataFrame に追加します。

詳細は Scala API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Chain indexers and forest in a Pipeline
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

// Train model.  This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("precision")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))

val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
println("Learned classification forest model:\n" + rfModel.toDebugString)
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/RandomForestClassifierExample.scala" で見つかります。

詳細は Java API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.RandomForestClassificationModel;
import org.apache.spark.ml.classification.RandomForestClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Load and parse the data file, converting it to a DataFrame.
DataFrame data = sqlContext.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
DataFrame[] splits = data.randomSplit(new double[] {0.7, 0.3});
DataFrame trainingData = splits[0];
DataFrame testData = splits[1];

// Train a RandomForest model.
RandomForestClassifier rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures");

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels());

// Chain indexers and forest in a Pipeline
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, rf, labelConverter});

// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
DataFrame predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("precision");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

RandomForestClassificationModel rfModel = (RandomForestClassificationModel)(model.stages()[2]);
System.out.println("Learned classification forest model:\n" + rfModel.toDebugString());
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestClassifierExample.java" で見つかります。

詳細は Python API ドキュメントを参照してください。

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="precision")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/random_forest_classifier_example.py" で見つかります。

勾配ブースト木分類

勾配ブースト木(GBTs) は決定木のアンサンブルを使った人気のある分類および回帰の方法です。spark.mlの実装についての詳細はGBTの章で見つけることができます。

以下の例では、LibSVMフォーマットのデータセットをロードし、それをトレーニングとテストセットに分割し、最初のデータセットで学習し、それからテストセットを評価します。データを準備するために2つの機能の変換器を使用します: これらはラベルおよび分類機能のためのカテゴリに印をつけるのに役立ち、メタデータを木に基づいたアルゴリズムが理解できる DataFrame に追加します。

詳細は Scala API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a GBT model.
val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Chain indexers and GBT in a Pipeline
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))

// Train model.  This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("precision")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))

val gbtModel = model.stages(2).asInstanceOf[GBTClassificationModel]
println("Learned classification GBT model:\n" + gbtModel.toDebugString)
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/GradientBoostedTreeClassifierExample.scala" で見つかります。

詳細は Java API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.GBTClassificationModel;
import org.apache.spark.ml.classification.GBTClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Load and parse the data file, converting it to a DataFrame.
DataFrame data = sqlContext.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
DataFrame[] splits = data.randomSplit(new double[] {0.7, 0.3});
DataFrame trainingData = splits[0];
DataFrame testData = splits[1];

// Train a GBT model.
GBTClassifier gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels());

// Chain indexers and GBT in a Pipeline
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, gbt, labelConverter});

// Train model.  This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
DataFrame predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("precision");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

GBTClassificationModel gbtModel = (GBTClassificationModel)(model.stages()[2]);
System.out.println("Learned classification GBT model:\n" + gbtModel.toDebugString());
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaGradientBoostedTreeClassifierExample.java" で見つかります。

詳細は Python API ドキュメントを参照してください。

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="precision")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py" で見つかります。

多層パーセプトロン分類器

多層パーセプトロン分類器(MLPC)は 順伝播型ニューラルネットワーク.に基づいた分類器です。MLPC はノードの複数の層から成ります。各層はネットワーク内の次の層へ完全に接続します。入力層内のノードは入力データを表します。他の全てのノードはノードの重み $\wv$ とバイアス $\bv$ を使って入力の線形結合を実施し活性化機能を適用することで、入力を出力にマップします。以下のように $K+1$ 層を使ったMLPCのためのマトリックス形式で書くことができます: \[ \mathrm{y}(\x) = \mathrm{f_K}(...\mathrm{f_2}(\wv_2^T\mathrm{f_1}(\wv_1^T \x+b_1)+b_2)...+b_K) \] 中間層のノードはS字(ロジスティック)巻数を使用します: \[ \mathrm{f}(z_i) = \frac{1}{1 + e^{-z_i}} \] 出力層のノードはソフトマックス関数を使用します: \[ \mathrm{f}(z_i) = \frac{e^{z_i}}{\sum_{k=1}^N e^{z_k}} \] 出力層のノード数$N$ はクラスの数に対応します。

MLPC はモデルの学習に誤差逆伝播法を採用します。最適化のためにロジスティックロス、最適化のルーチンとしてL-BFGSを使います。

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Load the data stored in LIBSVM format as a DataFrame.
val data = sqlContext.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")
// Split the data into train and test
val splits = data.randomSplit(Array(0.6, 0.4), seed = 1234L)
val train = splits(0)
val test = splits(1)
// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
val layers = Array[Int](4, 5, 4, 3)
// create the trainer and set its parameters
val trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100)
// train the model
val model = trainer.fit(train)
// compute precision on the test set
val result = model.transform(test)
val predictionAndLabels = result.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("precision")
println("Precision:" + evaluator.evaluate(predictionAndLabels))
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/MultilayerPerceptronClassifierExample.scala" で見つかります。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel;
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.sql.DataFrame;

// Load training data
String path = "data/mllib/sample_multiclass_classification_data.txt";
DataFrame dataFrame = jsql.read().format("libsvm").load(path);
// Split the data into train and test
DataFrame[] splits = dataFrame.randomSplit(new double[]{0.6, 0.4}, 1234L);
DataFrame train = splits[0];
DataFrame test = splits[1];
// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
int[] layers = new int[] {4, 5, 4, 3};
// create the trainer and set its parameters
MultilayerPerceptronClassifier trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100);
// train the model
MultilayerPerceptronClassificationModel model = trainer.fit(train);
// compute precision on the test set
DataFrame result = model.transform(test);
DataFrame predictionAndLabels = result.select("prediction", "label");
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("precision");
System.out.println("Precision = " + evaluator.evaluate(predictionAndLabels));
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaMultilayerPerceptronClassifierExample.java" で見つかります。
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load training data
data = sqlContext.read.format("libsvm")\
    .load("data/mllib/sample_multiclass_classification_data.txt")
# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [4, 5, 4, 3]
# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
# train the model
model = trainer.fit(train)
# compute precision on the test set
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="precision")
print("Precision:" + str(evaluator.evaluate(predictionAndLabels)))
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/multilayer_perceptron_classification.py" で見つかります。

One-vs-Rest 分類 (a.k.a. One-vs-All)

OneVsRest is an example of a machine learning reduction for performing multiclass classification given a base classifier that can perform binary classification efficiently. "One-vs-All"としても知られています。

OneVsRestEstimatorとして実装されます。基本分類器として、Classifierのインスタンスを取り、それぞれのkクラスのためのバイナリ分類問題を生成します。クラス i のための分類器は、他の全てのクラスからクラス i を識別するために、ラベルが i あるいはそうでないかを予想するために訓練されます。

各バイナリ分類器を評価することで予想が行われ、最も確信している分類器のインデックスがラベルとして出力されます。

以下の例はIris datasetをどうやってロードするかを説明し、それをデータフレームとしてパースし、OneVsRestを使って多クラス分類を行います。テストエラーはアルゴリズムの正確さを計測するために計算されます。

詳細は Scala API ドキュメントを参照してください。

import org.apache.spark.examples.mllib.AbstractParams
import org.apache.spark.ml.classification.{OneVsRest, LogisticRegression}
import org.apache.spark.ml.util.MetadataUtils
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.DataFrame

val inputData = sqlContext.read.format("libsvm").load(params.input)
// compute the train/test split: if testInput is not provided use part of input.
val data = params.testInput match {
  case Some(t) => {
    // compute the number of features in the training set.
    val numFeatures = inputData.first().getAs[Vector](1).size
    val testData = sqlContext.read.option("numFeatures", numFeatures.toString)
      .format("libsvm").load(t)
    Array[DataFrame](inputData, testData)
  }
  case None => {
    val f = params.fracTest
    inputData.randomSplit(Array(1 - f, f), seed = 12345)
  }
}
val Array(train, test) = data.map(_.cache())

// instantiate the base classifier
val classifier = new LogisticRegression()
  .setMaxIter(params.maxIter)
  .setTol(params.tol)
  .setFitIntercept(params.fitIntercept)

// Set regParam, elasticNetParam if specified in params
params.regParam.foreach(classifier.setRegParam)
params.elasticNetParam.foreach(classifier.setElasticNetParam)

// instantiate the One Vs Rest Classifier.

val ovr = new OneVsRest()
ovr.setClassifier(classifier)

// train the multiclass model.
val (trainingDuration, ovrModel) = time(ovr.fit(train))

// score the model on test data.
val (predictionDuration, predictions) = time(ovrModel.transform(test))

// evaluate the model
val predictionsAndLabels = predictions.select("prediction", "label")
  .map(row => (row.getDouble(0), row.getDouble(1)))

val metrics = new MulticlassMetrics(predictionsAndLabels)

val confusionMatrix = metrics.confusionMatrix

// compute the false positive rate per label
val predictionColSchema = predictions.schema("prediction")
val numClasses = MetadataUtils.getNumClasses(predictionColSchema).get
val fprs = Range(0, numClasses).map(p => (p, metrics.falsePositiveRate(p.toDouble)))

println(s" Training Time ${trainingDuration} sec\n")

println(s" Prediction Time ${predictionDuration} sec\n")

println(s" Confusion Matrix\n ${confusionMatrix.toString}\n")

println("label\tfpr")

println(fprs.map {case (label, fpr) => label + "\t" + fpr}.mkString("\n"))
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala" で見つかります。

詳細は Java API ドキュメントを参照してください。

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.OneVsRest;
import org.apache.spark.ml.classification.OneVsRestModel;
import org.apache.spark.ml.util.MetadataUtils;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.StructField;

// configure the base classifier
LogisticRegression classifier = new LogisticRegression()
  .setMaxIter(params.maxIter)
  .setTol(params.tol)
  .setFitIntercept(params.fitIntercept);

if (params.regParam != null) {
  classifier.setRegParam(params.regParam);
}
if (params.elasticNetParam != null) {
  classifier.setElasticNetParam(params.elasticNetParam);
}

// instantiate the One Vs Rest Classifier
OneVsRest ovr = new OneVsRest().setClassifier(classifier);

String input = params.input;
DataFrame inputData = jsql.read().format("libsvm").load(input);
DataFrame train;
DataFrame test;

// compute the train/ test split: if testInput is not provided use part of input
String testInput = params.testInput;
if (testInput != null) {
  train = inputData;
  // compute the number of features in the training set.
  int numFeatures = inputData.first().<Vector>getAs(1).size();
  test = jsql.read().format("libsvm").option("numFeatures",
    String.valueOf(numFeatures)).load(testInput);
} else {
  double f = params.fracTest;
  DataFrame[] tmp = inputData.randomSplit(new double[]{1 - f, f}, 12345);
  train = tmp[0];
  test = tmp[1];
}

// train the multiclass model
OneVsRestModel ovrModel = ovr.fit(train.cache());

// score the model on test data
DataFrame predictions = ovrModel.transform(test.cache())
  .select("prediction", "label");

// obtain metrics
MulticlassMetrics metrics = new MulticlassMetrics(predictions);
StructField predictionColSchema = predictions.schema().apply("prediction");
Integer numClasses = (Integer) MetadataUtils.getNumClasses(predictionColSchema).get();

// compute the false positive rate per label
StringBuilder results = new StringBuilder();
results.append("label\tfpr\n");
for (int label = 0; label < numClasses; label++) {
  results.append(label);
  results.append("\t");
  results.append(metrics.falsePositiveRate((double) label));
  results.append("\n");
}

Matrix confusionMatrix = metrics.confusionMatrix();
// output the Confusion Matrix
System.out.println("Confusion Matrix");
System.out.println(confusionMatrix);
System.out.println();
System.out.println(results);
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java" で見つかります。

回帰

線形回帰

線形回帰モデルおよびモデルの総計と連携するインタフェースはロジスティック回帰の場合と似ています。

以下の例は線形回帰モデルに正規化されたelastic netの訓練と、モデルの総計の統計を抽出を実演します。

import org.apache.spark.ml.regression.LinearRegression

// Load training data
val training = sqlCtx.read.format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt")

val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for linear regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

// Summarize the model over the training set and print out some metrics
val trainingSummary = lrModel.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: ${trainingSummary.objectiveHistory.toList}")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/LinearRegressionWithElasticNetExample.scala" で見つかります。
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Load training data
DataFrame training = sqlContext.read().format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt");

LinearRegression lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8);

// Fit the model
LinearRegressionModel lrModel = lr.fit(training);

// Print the coefficients and intercept for linear regression
System.out.println("Coefficients: "
  + lrModel.coefficients() + " Intercept: " + lrModel.intercept());

// Summarize the model over the training set and print out some metrics
LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
System.out.println("numIterations: " + trainingSummary.totalIterations());
System.out.println("objectiveHistory: " + Vectors.dense(trainingSummary.objectiveHistory()));
trainingSummary.residuals().show();
System.out.println("RMSE: " + trainingSummary.rootMeanSquaredError());
System.out.println("r2: " + trainingSummary.r2());
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaLinearRegressionWithElasticNetExample.java" で見つかります。
from pyspark.ml.regression import LinearRegression

# Load training data
training = sqlContext.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for linear regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/linear_regression_with_elastic_net.py" で見つかります。

決定木回帰

決定木は分類と再帰の方法の人気のある一群です。spark.mlの実装についての詳細は決定木の章で見つけることができます。

以下の例では、LibSVMフォーマットのデータセットをロードし、それをトレーニングとテストセットに分割し、最初のデータセットで学習し、それからテストセットを評価します。分類機能に印をつけるために機能変換を使用します。メタデータを決定木アルゴリズムが理解できる DataFrameに追加します。

パラメータについての詳細はScala API ドキュメントの中で見つけることができます。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.evaluation.RegressionEvaluator

// Load the data stored in LIBSVM format as a DataFrame.
val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Here, we treat features with > 4 distinct values as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a DecisionTree model.
val dt = new DecisionTreeRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")

// Chain indexer and tree in a Pipeline
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, dt))

// Train model.  This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("Root Mean Squared Error (RMSE) on test data = " + rmse)

val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
println("Learned regression tree model:\n" + treeModel.toDebugString)
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeRegressionExample.scala" で見つかります。

パラメータについての詳細はJava API ドキュメントの中で見つけることができます。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
import org.apache.spark.ml.regression.DecisionTreeRegressor;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Load the data stored in LIBSVM format as a DataFrame.
DataFrame data = sqlContext.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
DataFrame[] splits = data.randomSplit(new double[]{0.7, 0.3});
DataFrame trainingData = splits[0];
DataFrame testData = splits[1];

// Train a DecisionTree model.
DecisionTreeRegressor dt = new DecisionTreeRegressor()
  .setFeaturesCol("indexedFeatures");

// Chain indexer and tree in a Pipeline
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[]{featureIndexer, dt});

// Train model.  This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
DataFrame predictions = model.transform(testData);

// Select example rows to display.
predictions.select("label", "features").show(5);

// Select (prediction, true label) and compute test error
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

DecisionTreeRegressionModel treeModel =
  (DecisionTreeRegressionModel) (model.stages()[1]);
System.out.println("Learned regression tree model:\n" + treeModel.toDebugString());
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeRegressionExample.java" で見つかります。

パラメータについての詳細はPython API ドキュメントの中で見つけることができます。

from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

treeModel = model.stages[1]
# summary only
print(treeModel)
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/decision_tree_regression_example.py" で見つかります。

ランダムフォレスト回帰

ランダムフォレストは分類と再帰の方法の人気のある一群です。spark.mlの実装についての詳細はランダムフォレストの章で見つけることができます。

以下の例では、LibSVMフォーマットのデータセットをロードし、それをトレーニングとテストセットに分割し、最初のデータセットで学習し、それからテストセットを評価します。分類機能に印をつけるために機能変換を使用します。メタデータを木ベースアルゴリズムが理解できる DataFrameに追加します。

詳細は Scala API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")

// Chain indexer and forest in a Pipeline
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, rf))

// Train model.  This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("Root Mean Squared Error (RMSE) on test data = " + rmse)

val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
println("Learned regression forest model:\n" + rfModel.toDebugString)
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/RandomForestRegressorExample.scala" で見つかります。

詳細は Java API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.RandomForestRegressionModel;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Load and parse the data file, converting it to a DataFrame.
DataFrame data = sqlContext.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
DataFrame[] splits = data.randomSplit(new double[] {0.7, 0.3});
DataFrame trainingData = splits[0];
DataFrame testData = splits[1];

// Train a RandomForest model.
RandomForestRegressor rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures");

// Chain indexer and forest in a Pipeline
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {featureIndexer, rf});

// Train model.  This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
DataFrame predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

RandomForestRegressionModel rfModel = (RandomForestRegressionModel)(model.stages()[1]);
System.out.println("Learned regression forest model:\n" + rfModel.toDebugString());
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestRegressorExample.java" で見つかります。

詳細は Python API ドキュメントを参照してください。

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = model.stages[1]
print(rfModel)  # summary only
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/random_forest_regressor_example.py" で見つかります。

勾配ブースト木回帰

勾配ブースト木(GBTs) は決定木のアンサンブルを使った人気のある回帰の方法です。spark.mlの実装についての詳細はGBTの章で見つけることができます。

注意: この例のデータセットについて、GBTRegressor は実際のところは1回の繰り返しのみを必要としますが、これは一般的には正しくありません。

詳細は Scala API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a GBT model.
val gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)

// Chain indexer and GBT in a Pipeline
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, gbt))

// Train model.  This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("Root Mean Squared Error (RMSE) on test data = " + rmse)

val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
println("Learned regression GBT model:\n" + gbtModel.toDebugString)
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/GradientBoostedTreeRegressorExample.scala" で見つかります。

詳細は Java API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Load and parse the data file, converting it to a DataFrame.
DataFrame data = sqlContext.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
DataFrame[] splits = data.randomSplit(new double[] {0.7, 0.3});
DataFrame trainingData = splits[0];
DataFrame testData = splits[1];

// Train a GBT model.
GBTRegressor gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);

// Chain indexer and GBT in a Pipeline
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureIndexer, gbt});

// Train model.  This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
DataFrame predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

GBTRegressionModel gbtModel = (GBTRegressionModel)(model.stages()[1]);
System.out.println("Learned regression GBT model:\n" + gbtModel.toDebugString());
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaGradientBoostedTreeRegressorExample.java" で見つかります。

詳細は Python API ドキュメントを参照してください。

from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

gbtModel = model.stages[1]
print(gbtModel)  # summary only
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py" で見つかります。

生存回帰

spark.mlの中で、検閲されたデータのためのパラメータの生存回帰の加速度故障時間 (AFT)モデルを実装します。それは生存時間の記録のためのモデルを説明します。つまり生存解析のためにログ線形モデルをしばしば呼びます。同じ目的の比例ハザードモデルとの違いは、AFTモデルは各インスタンスが独立して目的関数に貢献するため簡単に並行化しやすいことです。

Given the values of the covariates $x^{‘}$, for random lifetime $t_{i}$ of subjects i = 1, …, n, with possible right-censoring, the likelihood function under the AFT model is given as: \[ L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}} \] Where $\delta_{i}$ is the indicator of the event has occurred i.e. uncensored or not. Using $\epsilon_{i}=\frac{\log{t_{i}}-x^{‘}\beta}{\sigma}$, the log-likelihood function assumes the form: \[ \iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+\delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}] \] Where $S_{0}(\epsilon_{i})$ is the baseline survivor function, and $f_{0}(\epsilon_{i})$ is corresponding density function.

最も一般的に使われるAFTモデルは生存時間のベイブル分布に基づいています。The Weibull distribution for lifetime corresponding to extreme value distribution for log of the lifetime, and the $S_{0}(\epsilon)$ function is: \[ S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}}) \] the $f_{0}(\epsilon_{i})$ function is: \[ f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}}) \] The log-likelihood function for AFT model with Weibull distribution of lifetime is: \[ \iota(\beta,\sigma)= -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}] \] Due to minimizing the negative log-likelihood equivalent to maximum a posteriori probability, the loss function we use to optimize is $-\iota(\beta,\sigma)$. The gradient functions for $\beta$ and $\log\sigma$ respectively are: \[ \frac{\partial (-\iota)}{\partial \beta}=\sum_{1=1}^{n}[\delta_{i}-e^{\epsilon_{i}}]\frac{x_{i}}{\sigma} \] \[ \frac{\partial (-\iota)}{\partial (\log\sigma)}=\sum_{i=1}^{n}[\delta_{i}+(\delta_{i}-e^{\epsilon_{i}})\epsilon_{i}] \]

The AFT model can be formulated as a convex optimization problem, i.e. the task of finding a minimizer of a convex function $-\iota(\beta,\sigma)$ that depends coefficients vector $\beta$ and the log of scale parameter $\log\sigma$. 実装のための最適化アルゴリズムはL-BFGSです。実装はRの生存関数survregの結果に一致します。

import org.apache.spark.ml.regression.AFTSurvivalRegression
import org.apache.spark.mllib.linalg.Vectors

val training = sqlContext.createDataFrame(Seq(
  (1.218, 1.0, Vectors.dense(1.560, -0.605)),
  (2.949, 0.0, Vectors.dense(0.346, 2.158)),
  (3.627, 0.0, Vectors.dense(1.380, 0.231)),
  (0.273, 1.0, Vectors.dense(0.520, 1.151)),
  (4.199, 0.0, Vectors.dense(0.795, -0.226))
)).toDF("label", "censor", "features")
val quantileProbabilities = Array(0.3, 0.6)
val aft = new AFTSurvivalRegression()
  .setQuantileProbabilities(quantileProbabilities)
  .setQuantilesCol("quantiles")

val model = aft.fit(training)

// Print the coefficients, intercept and scale parameter for AFT survival regression
println(s"Coefficients: ${model.coefficients} Intercept: " +
  s"${model.intercept} Scale: ${model.scale}")
model.transform(training).show(false)
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/AFTSurvivalRegressionExample.scala" で見つかります。
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.regression.AFTSurvivalRegression;
import org.apache.spark.ml.regression.AFTSurvivalRegressionModel;
import org.apache.spark.mllib.linalg.*;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1.218, 1.0, Vectors.dense(1.560, -0.605)),
  RowFactory.create(2.949, 0.0, Vectors.dense(0.346, 2.158)),
  RowFactory.create(3.627, 0.0, Vectors.dense(1.380, 0.231)),
  RowFactory.create(0.273, 1.0, Vectors.dense(0.520, 1.151)),
  RowFactory.create(4.199, 0.0, Vectors.dense(0.795, -0.226))
);
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("censor", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
DataFrame training = jsql.createDataFrame(data, schema);
double[] quantileProbabilities = new double[]{0.3, 0.6};
AFTSurvivalRegression aft = new AFTSurvivalRegression()
  .setQuantileProbabilities(quantileProbabilities)
  .setQuantilesCol("quantiles");

AFTSurvivalRegressionModel model = aft.fit(training);

// Print the coefficients, intercept and scale parameter for AFT survival regression
System.out.println("Coefficients: " + model.coefficients() + " Intercept: "
  + model.intercept() + " Scale: " + model.scale());
model.transform(training).show(false);
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java" で見つかります。
from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.mllib.linalg import Vectors

training = sqlContext.createDataFrame([
    (1.218, 1.0, Vectors.dense(1.560, -0.605)),
    (2.949, 0.0, Vectors.dense(0.346, 2.158)),
    (3.627, 0.0, Vectors.dense(1.380, 0.231)),
    (0.273, 1.0, Vectors.dense(0.520, 1.151)),
    (4.199, 0.0, Vectors.dense(0.795, -0.226))], ["label", "censor", "features"])
quantileProbabilities = [0.3, 0.6]
aft = AFTSurvivalRegression(quantileProbabilities=quantileProbabilities,
                            quantilesCol="quantiles")

model = aft.fit(training)

# Print the coefficients, intercept and scale parameter for AFT survival regression
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
print("Scale: " + str(model.scale))
model.transform(training).show(truncate=False)
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/aft_survival_regression.py" で見つかります。

決定木

決定木 とその集合は分類と回帰の機械学習タスクのための一般的な方法です。決定木は解釈、カテゴリ機能の処理、多層クラス分類設定が容易で、機能のスケーリングを必要とせず、非線形性および機能の相互作用を捉えることができるため、広く使われています。ランダムフォレストおよびブースティングのような木構成アルゴリズムは分類および回帰タスクのために最高のパフォーマンスを持ちます。

spark.ml実装は、繰り返しおよび分類機能の両方を使って、バイナリと多層クラス分類、および回帰のための決定木をサポートします。実装は行によってデータを分割し、数百万あるいは数十億のインスタンスを使って分散トレーニングをすることができます。

決定木アルゴリズムについてのもっと多くの情報をMLlib 決定木ガイドの中で見つけることができます。このAPIとオリジナル MLlib 決定木 API の主な違いは以下の通りです:

決定木のためのパイプラインAPIは元のAPIよりももう少し多くの機能を提供します。分類のために、特にユーザは各クラスの予想確率を取得することができます(別名、条件付分類確率)。

アンサンブル木(ランダムフォレストおよびGradient-Boosted 木)は 木のアンサンブルの章の中で説明されます。

入力と出力

入力と出力(予想)カラムタイプをリスト化しました。全ての出力カラムは任意です; 出力カラムを除外するには対応するパラメータを空の文字列に設定します。

入力のカラム

パラメータ名 タイプ デフォルト 解説
labelCol Double "label" 予想のラベル
featuresCol Vector "features" 特徴ベクトル

出力のカラム

パラメータ名 タイプ デフォルト 解説 備考
predictionCol Double "prediction" Predicted label
rawPredictionCol Vector "rawPrediction" 長さのベクトル # クラス。予想を構成する木のノードのトレーニングインスタンスのラベルの数を持ちます。 分類化のみ
probabilityCol Vector "probability" Vector of length # classes equal to rawPrediction normalized to a multinomial distribution 分類化のみ

ツリーアンサンブル

データフレームは2つの主要なツリーアンサンブルアルゴリズムをサポートします: ランダムフォレストGradient-Boosted Trees (GBTs)。両方とも基本モデルとして spark.ml 決定木を使用します。

アンサンブルアルゴリズムについてのもっと多くの情報をMLlib アンサンブルガイドで見つけることができます。
この章では、アンサンブルのためのデータフレームを実演します。

このAPIとオリジナル MLlib アンサンブルAPI の主な違いは以下の通りです:

ランダムフォレスト

ランダムフォレスト決定木のアンサンブルです。ランダムフォレストはオーバーフィルタリングの危険を減らすために多くの決定木を組み合わせます。 risk of overfitting. spark.ml実装は、繰り返しおよび分類機能の両方を使って、バイナリと多層クラス分類、および回帰のためのランダムフォレストをサポートします。

アルゴリズムそのものについてのもっと詳しい情報は、ランダムフォレストのspark.mllibドキュメントを見てください。

入力と出力

入力と出力(予想)カラムタイプをリスト化しました。全ての出力カラムは任意です; 出力カラムを除外するには対応するパラメータを空の文字列に設定します。

入力のカラム

パラメータ名 タイプ デフォルト 解説
labelCol Double "label" 予想のラベル
featuresCol Vector "features" 特徴ベクトル

出力カラム (予想)

パラメータ名 タイプ デフォルト 解説 備考
predictionCol Double "prediction" Predicted label
rawPredictionCol Vector "rawPrediction" 長さのベクトル # クラス。予想を構成する木のノードのトレーニングインスタンスのラベルの数を持ちます。 分類化のみ
probabilityCol Vector "probability" Vector of length # classes equal to rawPrediction normalized to a multinomial distribution 分類化のみ

勾配ブースト木 (GBT)

Gradient-Boosted Trees (GBTs)決定木のアンサンブルです。GBT は損失関数を最小化するために決定木を繰り返し学習します。spark.ml実装は、繰り返しおよび分類機能の両方を使って、バイナリと多層クラス分類、および回帰のためのGBTをサポートします。

アルゴリズムそのものについてのもっと詳しい情報は、GBTのspark.mllibドキュメントを見てください。

入力と出力

入力と出力(予想)カラムタイプをリスト化しました。全ての出力カラムは任意です; 出力カラムを除外するには対応するパラメータを空の文字列に設定します。

入力のカラム

パラメータ名 タイプ デフォルト 解説
labelCol Double "label" 予想のラベル
featuresCol Vector "features" 特徴ベクトル

GBTClassifierは現在のところバイナリラベルのみサポートすることに注意してください。

出力カラム (予想)

パラメータ名 タイプ デフォルト 解説 備考
predictionCol Double "prediction" Predicted label

RandomForestClassifierがそうであったように、GBTClassifier は将来rawPrediction および probabilityのための出力カラムも出力するでしょう。

TOP
inserted by FC2 system