Decision Trees - spark.mllib

決定木 とその集合は分類と回帰の機械学習タスクのための一般的な方法です。決定木は解釈、カテゴリ機能の処理、多層クラス分類設定が容易で、機能のスケーリングを必要とせず、非線形性および機能の相互作用を捉えることができるため、広く使われています。ランダムフォレストおよびブースティングのような木構成アルゴリズムは分類および回帰タスクのために最高のパフォーマンスを持ちます。

spark.mllibは、繰り返しおよび分類機能の両方を使って、バイナリと多層クラス分類、および回帰のための決定木をサポートします。実装は行によってデータを分割し、数百万のインスタンスを使って分散トレーニングをすることができます。

アンサンブル木(ランダムフォレストおよびGradient-Boosted 木)は アンサンブルガイドの中で説明されます。

基本アルゴリズム

決定木は特徴空間の回帰2値分解を実施する貪欲なアルゴリズムです。木は一番底(葉)の各分割のための同じラベルを予想します。各分割は木のノードでの情報利得を最大にするために、可能な分割のセットから 最良の分割を選択することで貪欲に選択されます。別の言い方をすると、各ツリーノードで選択された分割は $\underset{s}{\operatorname{argmax}} IG(D,s)$のセットから選択され、$IG(D,s)$は分割$s$ がデータセット $D$に適用された場合の情報利得です。

ノードの不純度と情報利得

ノードの不純度はノードでのラベルの均一性の指標です。現在の実装は分類のための2つの不純度の指標(ジニ不純度とエントロピー)と、回帰のための1つの不純度(分散)を提供します。

不純度タスク公式解説
ジニ不純度 分類 $\sum_{i=1}^{C} f_i(1-f_i)$$f_i$ はノードでのラベル $i$ の頻度で、$C$ はユニークなラベルの数です。
エントロピー 分類 $\sum_{i=1}^{C} -f_ilog(f_i)$$f_i$ はノードでのラベル $i$ の頻度で、$C$ はユニークなラベルの数です。
分散 回帰 $\frac{1}{N} \sum_{i=1}^{N} (y_i - \mu)^2$$y_i$ はインスタンスのためのラベル、$N$ はインスタンスの数、$\mu$ は$\frac{1}{N} \sum_{i=1}^N y_i$ によって与えられる平均です。

情報利得 は親ノードの不純度と2つの子ノードの不純度の重み付け合計との間の違いです。分割 $s$ が サイズ$N$のデータセット$D$を、サイズがそれぞれ$N_{left}$$N_{right}$の2つのデータセット $D_{left}$$D_{right}$に分割すると仮定します。情報利得は以下の通りです:

$IG(D,s) = Impurity(D) - \frac{N_{left}}{N} Impurity(D_{left}) - \frac{N_{right}}{N} Impurity(D_{right})$

分割候補

連続特徴量

1つのマシーンでの実行での小さなデータセットについては、各連続特徴量のための分割候補は一般的に特徴のためのユニークな値です。幾つかの実装では特徴値をソートし、もっと高速なツリー計算のために順番に並んだ特長値を分割候補として使用します。

大きな分散型データセットに対しては特徴値のソートは高くつきます。この実装はデータの抽出された部分への定量的な計算を実施することで分割候補の近似セットを計算します。並べられた分割は"bins"を生成し、そのようなbinsの最大数はmaxBinsパラメータを使って指定することができます。

binsの数はインスタンスの数 $N$より大きくできないことに注意してください (デフォルトのmaxBins 値は32のため稀な場合)。条件が満たされない場合は、ツリーアルゴリズムは自動的にbinsの数を減少します。

分類特徴

For a categorical feature with $M$ possible values (categories), one could come up with $2^{M-1}-1$ split candidates. 2値(0/1)分類と回帰については、平均ラベルによってカテゴリ特徴量を並べることで、分割候補の数を$M-1$に減らすことができます。(詳細については統計的機械学習の要素の9.2.4章を見てください。) For example, for a binary classification problem with one categorical feature with three categories A, B and C whose corresponding proportions of label 1 are 0.2, 0.6 and 0.4, the categorical features are ordered as A, C, B. The two split candidates are A | C, B and A , C | B where | denotes the split.

多クラス分類においては、可能であれば全ての$2^{M-1}-1$の取り得る分割が使われます。$2^{M-1}-1$maxBins パラメータより大きい場合は、二値分類と回帰のために使われる似たような(発見的)メソッドを使います。$M$ カテゴリ特徴値は不純度によって並べられ、$M-1$ 分割の結果が考慮されます。

停止ルール

以下の条件が合致した場合に、再帰ツリーの構築はノード上で停止されます。

  1. ノードの深さはmaxDepth 訓練パラメータと等しいです。
  2. 分割候補は結局minInfoGainより大きい情報利得となることはありません。
  3. 分割候補は少なくとも minInstancesPerNode 訓練インスタンスを持つ子ノードを生成することはありません。

使い方のtips

様々なパラメータを議論することで、決定木を使うための2、3のガイドラインを含みます。以下でパラメータは大まかに重要度の降順にリスト化されます。新しいユーザは"問題仕様パラメータ"の章とmaxDepthパラメータを主に考慮する必要があります。

問題仕様パラメメータ

これらのパラメータは解決したい問題とデータセットを説明します。それらは指定されなければならず、チューニングを必要としません。

停止条件

これらのパラメータはツリーがいつ構築(新しいノードを追加)を停止するかを決定します これらのパラメータを調整する場合、オーバーフィッティングを避けるために提出されたデータでの検証に注意してください。

調整可能なパラメータ

これらのパラメータは調整されるかもしれません。オーバーフィッティングを避けるために調整する場合は、提出されたデータでの検証に注意してください。

キャッシュとチェックポイント

MLlib 1.2 はもっと大きな(深い)ツリーへのスケーリングアップとツリーのアンサンブルのための幾つかの機能が追加されました。maxDepthが大きく設定される場合、ノードIDのキャッシングとチェックポイントを有効にすると便利かも知れません。これらのパラメータはnumTreesが大きく設定された場合、ランダムフォレストにとっても便利です。

ノードIDのキャッシュは(1回の繰り返しごとに)RDDの系列を生成します。この長い系統はパフォーマンスの問題を引き起こしえますが、中間RDDのチェックポイントがそれらの問題を軽減します。チェックポイントは useNodeIdCache がtrueに設定された場合にのみ適用可能なことに注意してください。

スケーリング

計算は、訓練インスタンスの数、特徴の数、そしてmaxBinsパラメータの数におよそ線形に比例します。計算は特徴の数とmaxBinsにおよそ線形に比例します。

実装されたアルゴリズムはsparseとdenseデータの両方を読みます。しかし、sparse入力については最適化されていません。

分類

以下の例はLIBSVM データファイルをどうやってロードするかを説明し、LabeledPointのRDDとしてパースし、Gini不純度指標と5の最大の木の深さとしてGini不純度を持つ決定木を使って分類を行います。テストエラーはアルゴリズムの正確さを計測するために計算されます。

APIの詳細はDicisionTree Scala ドキュメント および DecisionTreeModel Scala ドキュメント を参照してください。

import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

// Train a DecisionTree model.
//  Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 5
val maxBins = 32

val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
  impurity, maxDepth, maxBins)

// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()
println("Test Error = " + testErr)
println("Learned classification tree model:\n" + model.toDebugString)

// Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeClassificationExample.scala" で見つかります。

APIの詳細はDecision Tree Java ドキュメント および DecisionTreeModel Java ドキュメント を参照してください。

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;

SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeClassificationExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];

// Set parameters.
//  Empty categoricalFeaturesInfo indicates all features are continuous.
Integer numClasses = 2;
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<Integer, Integer>();
String impurity = "gini";
Integer maxDepth = 5;
Integer maxBins = 32;

// Train a DecisionTree model for classification.
final DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
  categoricalFeaturesInfo, impurity, maxDepth, maxBins);

// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
  testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
    @Override
    public Tuple2<Double, Double> call(LabeledPoint p) {
      return new Tuple2<Double, Double>(model.predict(p.features()), p.label());
    }
  });
Double testErr =
  1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
    @Override
    public Boolean call(Tuple2<Double, Double> pl) {
      return !pl._1().equals(pl._2());
    }
  }).count() / testData.count();

System.out.println("Test Error: " + testErr);
System.out.println("Learned classification tree model:\n" + model.toDebugString());

// Save and load model
model.save(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
DecisionTreeModel sameModel = DecisionTreeModel
  .load(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java" で見つかります。

APIについての詳細はDecisionTree Python ドキュメント およびDecisionTreeModel Python ドキュメントを参照してください。

from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     impurity='gini', maxDepth=5, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())

# Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
例の完全なコードは Spark のリポジトリの "examples/src/main/python/mllib/decision_tree_classification_example.py" で見つかります。

回帰

以下の例はLIBSVM データファイルをどうやってロードするかを説明し、LabeledPointのRDDとしてパースし、不純度指標と5の最大の木の深さとして分散を持つ決定木を使って回帰を行います。平均二乗誤差(MSE)は最後に 適合度を評価するために計算されます。

APIの詳細はDicisionTree Scala ドキュメント および DecisionTreeModel Scala ドキュメント を参照してください。

import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

// Train a DecisionTree model.
//  Empty categoricalFeaturesInfo indicates all features are continuous.
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "variance"
val maxDepth = 5
val maxBins = 32

val model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo, impurity,
  maxDepth, maxBins)

// Evaluate model on test instances and compute test error
val labelsAndPredictions = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val testMSE = labelsAndPredictions.map{ case (v, p) => math.pow(v - p, 2) }.mean()
println("Test Mean Squared Error = " + testMSE)
println("Learned regression tree model:\n" + model.toDebugString)

// Save and load model
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRegressionExample.scala" で見つかります。

APIの詳細はDecision Tree Java ドキュメント および DecisionTreeModel Java ドキュメント を参照してください。

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;

SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeRegressionExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];

// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<Integer, Integer>();
String impurity = "variance";
Integer maxDepth = 5;
Integer maxBins = 32;

// Train a DecisionTree model.
final DecisionTreeModel model = DecisionTree.trainRegressor(trainingData,
  categoricalFeaturesInfo, impurity, maxDepth, maxBins);

// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
  testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
  @Override
  public Tuple2<Double, Double> call(LabeledPoint p) {
    return new Tuple2<Double, Double>(model.predict(p.features()), p.label());
  }
});
Double testMSE =
  predictionAndLabel.map(new Function<Tuple2<Double, Double>, Double>() {
    @Override
    public Double call(Tuple2<Double, Double> pl) {
      Double diff = pl._1() - pl._2();
      return diff * diff;
    }
  }).reduce(new Function2<Double, Double, Double>() {
    @Override
    public Double call(Double a, Double b) {
      return a + b;
    }
  }) / data.count();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression tree model:\n" + model.toDebugString());

// Save and load model
model.save(jsc.sc(), "target/tmp/myDecisionTreeRegressionModel");
DecisionTreeModel sameModel = DecisionTreeModel
  .load(jsc.sc(), "target/tmp/myDecisionTreeRegressionModel");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java" で見つかります。

APIについての詳細はDecisionTree Python ドキュメント およびDecisionTreeModel Python ドキュメントを参照してください。

from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo={},
                                    impurity='variance', maxDepth=5, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum() /\
    float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression tree model:')
print(model.toDebugString())

# Save and load model
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")
例の完全なコードは Spark のリポジトリの "examples/src/main/python/mllib/decision_tree_regression_example.py" i で見つかります。
TOP
inserted by FC2 system