決定木 - RDDベースのAPI
決定木 とその集合は分類と回帰の機械学習タスクのための一般的な方法です。決定木は解釈、カテゴリ機能の処理、多層クラス分類設定が容易で、機能のスケーリングを必要とせず、非線形性および機能の相互作用を捉えることができるため、広く使われています。ランダムフォレストおよびブースティングのような木構成アルゴリズムは分類および回帰タスクのために最高のパフォーマンスを持ちます。
spark.mllib
は、繰り返しおよび分類機能の両方を使って、バイナリと多層クラス分類、および回帰のための決定木をサポートします。実装は行によってデータを分割し、数百万のインスタンスを使って分散トレーニングをすることができます。
アンサンブル木(ランダムフォレストおよび勾配ブースト木)は アンサンブルガイドの中で説明されます。
基本アルゴリズム
決定木は特徴空間の回帰2値分解を実施する貪欲なアルゴリズムです。木は一番底(葉)の各分割のための同じラベルを予想します。各分割は木のノードでの情報利得を最大にするために、可能な分割のセットから 最良の分割を選択することで貪欲に選択されます。別の言い方をすると、各ツリーノードで選択された分割は $\underset{s}{\operatorname{argmax}} IG(D,s)$
のセットから選択され、$IG(D,s)$
は分割$s$
がデータセット $D$
に適用された場合の情報利得です。
ノードの不純度と情報利得
ノードの不純度はノードでのラベルの均一性の指標です。現在の実装は分類のための2つの不純度の指標(ジニ不純度とエントロピー)と、回帰のための1つの不純度(分散)を提供します。
不純度 | タスク | 公式 | 解説 |
---|---|---|---|
ジニ不純度 | 分類 | $\sum_{i=1}^{C} f_i(1-f_i)$ | $f_i$ はノードでのラベル $i$ の頻度で、$C$ はユニークなラベルの数です。 |
エントロピー | 分類 | $\sum_{i=1}^{C} -f_ilog(f_i)$ | $f_i$ はノードでのラベル $i$ の頻度で、$C$ はユニークなラベルの数です。 |
分散 | 回帰 | $\frac{1}{N} \sum_{i=1}^{N} (y_i - \mu)^2$ | $y_i$ はインスタンスのためのラベル、$N$ はインスタンスの数、$\mu$ は$\frac{1}{N} \sum_{i=1}^N y_i$ によって与えられる平均です。 |
情報利得 は親ノードの不純度と2つの子ノードの不純度の重み付け合計との間の違いです。分割 $s$ が サイズ$N$
のデータセット$D$
を、サイズがそれぞれ$N_{left}$
と $N_{right}$
の2つのデータセット $D_{left}$
と $D_{right}$
に分割すると仮定します。情報利得は以下の通りです:
$IG(D,s) = Impurity(D) - \frac{N_{left}}{N} Impurity(D_{left}) - \frac{N_{right}}{N} Impurity(D_{right})$
分割候補
連続特徴量
1つのマシーンでの実行での小さなデータセットについては、各連続特徴量のための分割候補は一般的に特徴のためのユニークな値です。幾つかの実装では特徴値をソートし、もっと高速なツリー計算のために順番に並んだ特長値を分割候補として使用します。
大きな分散型データセットに対しては特徴値のソートは高くつきます。この実装はデータの抽出された部分への定量的な計算を実施することで分割候補の近似セットを計算します。並べられた分割は"bins"を生成し、そのようなbinsの最大数はmaxBins
パラメータを使って指定することができます。
binsの数はインスタンスの数 $N$
より大きくできないことに注意してください (デフォルトのmaxBins
値は32のため稀な場合)。条件が満たされない場合は、ツリーアルゴリズムは自動的にbinsの数を減少します。
分類特徴
For a categorical feature with $M$
possible values (categories), one could come up with $2^{M-1}-1$
split candidates. 2値(0/1)分類と回帰については、平均ラベルによってカテゴリ特徴量を並べることで、分割候補の数を$M-1$
に減らすことができます。(詳細については統計的機械学習の要素の9.2.4章を見てください。) 例えば、ラベル1に対して0.2、0.6 および0.4の3つの分類A,BおよびCを持つ二値分類問題について、分類特徴はA、C、Bとして整列されます。二つの分割分類候補は | が分割を示す場合に、A | C, B と A , C | B になります。
多クラス分類においては、可能であれば全ての$2^{M-1}-1$
の取り得る分割が使われます。$2^{M-1}-1$
が maxBins
パラメータより大きい場合は、二値分類と回帰のために使われる似たような(発見的)メソッドを使います。$M$
カテゴリ特徴値は不純度によって並べられ、$M-1$
分割の結果が考慮されます。
停止ルール
以下の条件が合致した場合に、再帰ツリーの構築はノード上で停止されます。
- ノードの深さは
maxDepth
訓練パラメータと等しいです。 - 分割候補は結局
minInfoGain
より大きい情報利得となることはありません。 - 分割候補は少なくとも
minInstancesPerNode
訓練インスタンスを持つ子ノードを生成することはありません。
使い方のtips
様々なパラメータを議論することで、決定木を使うための2、3のガイドラインを含みます。以下でパラメータは大まかに重要度の降順にリスト化されます。新しいユーザは"問題仕様パラメータ"の章とmaxDepth
パラメータを主に考慮する必要があります。
問題仕様パラメメータ
これらのパラメータは解決したい問題とデータセットを説明します。それらは指定されなければならず、チューニングを必要としません。
-
algo
: 決定木の種類。Classification
あるいはRegression
のどちらか。 -
numClasses
: クラスの数 (Classification
のみ)。 -
categoricalFeaturesInfo
: どの特徴が絶対的か、それらの特徴がどれだけの分類値を取ることをできるかを指定します。これは特徴インデックスから特徴項数(カテゴリの数)へのマップとして与えられます。このマップに含まれない特徴は連続しているものとして扱われます。- 例えば、
Map(0 -> 2, 4 -> 10)
は特徴0
が2値 (値0
または1
を取る)、そして特徴4
は10の分類 (値は{0, 1, ..., 9}
)を持つことを示します。特徴インデックスは0から始まることに注意してください: 特徴0
と4
はインスタンスの特徴ベクトルの1番目と5番目です。 categoricalFeaturesInfo
を指定してはならないことに注意してください。アルゴリズムはまだ実行中で意味のある結果を取得するでしょう。しかし、もしカテゴリ特徴が適切に設計されていれば、パフォーマンスはもっとよくなるはずです。
- 例えば、
停止条件
これらのパラメータはツリーがいつ構築(新しいノードを追加)を停止するかを決定します これらのパラメータを調整する場合、オーバーフィッティングを避けるために提出されたデータでの検証に注意してください。
-
maxDepth
: ツリーの最大の深さ木が深くなるほど表現に富みます(高精度になるかも知れない)が、訓練が高くつき、オーバーフィットしがちにもなります。 -
minInstancesPerNode
: ノードがさらに分割されるためには、各ノードの子は少なくともこの数の訓練インスタンスを受け入れなければなりません。それらはしばしば個々の木よりも深く学習されるため、これはRandomForestで一般的に使われます。 -
minInfoGain
: ノードがさらに分割されるためには、分割は少なくとも(情報利得の点で)これだけは改善されなければなりません。
調整可能なパラメータ
これらのパラメータは調整されるかもしれません。オーバーフィッティングを避けるために調整する場合は、提出されたデータでの検証に注意してください。
maxBins
: 連続する特徴を分離する場合に使われるbinsの数。maxBins
を増やすと、アルゴリズムがもっと多くの分割候補を考慮し、fine-grained分割決定をすることができます。しかし、計算量と通信量も増えます。maxBins
パラメータは全てのカテゴリ特徴量について少なくとも分類$M$
の最大数でなければならないことに注意してください。
maxMemoryInMB
: 十分な統計を集めるために使われるメモリ量。- デフォルト値はほとんどのシナリオで決定木が動作できるように控えめに256MBが選択されています。
maxMemoryInMB
の増加は(もしメモリが利用可能であれば)データ上の通過を少なくし高速な訓練につながります。しかし、各繰り返しでの通信量はmaxMemoryInMB
に比例するかも知れないので、maxMemoryInMB
が大きくなるに従って見返りが少なくなります。 - 実装の詳細: 高速な処理のために、決定木のアルゴリズムは(一度に1ノードではなく)分割するノードのグループについての統計を集めます。一つのグループで扱うことができるノードの数はメモリの要求(特徴ごとに変わります)によって決定されます。
maxMemoryInMB
パラメータはそれらの統計のために各workerが使うことができるメモリの制限をメガバイトで指定します。
- デフォルト値はほとんどのシナリオで決定木が動作できるように控えめに256MBが選択されています。
-
subsamplingRate
: 決定木を学習するために使われる訓練データの断片。このパラメータは(ランダムフォレスト
および勾配ブースト木
を使った)ツリーのアンサンブルの訓練に最も関連があり、元のデータの副標本を採るのに便利かも知れません。訓練インスタンスの数は一般的に主要な制限ではないため、1つの決定木を訓練するためにはこのパラメータは有用ではありません。 impurity
: 分割候補間で選択のために使われる不純度指標 (上で議論されました)。この指標はalgo
パラメータに一致しなければなりません。
キャッシュとチェックポイント
MLlib 1.2 はもっと大きな(深い)ツリーへのスケーリングアップとツリーのアンサンブルのための幾つかの機能が追加されました。maxDepth
が大きく設定される場合、ノードIDのキャッシングとチェックポイントを有効にすると便利かも知れません。これらのパラメータはnumTrees
が大きく設定された場合、ランダムフォレストにとっても便利です。
useNodeIdCache
: もしこれがtrueに設定されると、アルゴリズムは各繰り返しで現在のモデル(ツリーあるいは複数のツリー)がexecutorに渡されることを防ぐでしょう。- これは深い木(workerでの計算のスピードアップ)や大きなランダムフォレスト(各繰り返しでの通信の削減)にとって便利かも知れません。
- 実装の詳細: デフォルトでは、アルゴリズムは現在のモデルをexecutorに通信するため、executorは訓練インスタンスをツリーノードと合致することができます。この設定が有効にされた場合、アルゴリズムはその代わりにこの情報をキャッシュするでしょう。
ノードIDのキャッシュは(1回の繰り返しごとに)RDDの系列を生成します。この長い系統はパフォーマンスの問題を引き起こしえますが、中間RDDのチェックポイントがそれらの問題を軽減します。チェックポイントは useNodeIdCache
がtrueに設定された場合にのみ適用可能なことに注意してください。
-
checkpointDir
: ノードIDのキャッシュRDDのチェックポイントのためのディレクトリ。 -
checkpointInterval
: ノードIDのキャッシュRDDのチェックポイントの頻度。これをあまりに低く設定するとHDFSへの書き込みからくる余分なオーバーヘッドを起こすでしょう: これをあまりに高く設定すると、もしexecutorが失敗しRDDが再計算されなければならない場合に問題を起こすかも知れません。
スケーリング
計算は、訓練インスタンスの数、特徴の数、そしてmaxBins
パラメータの数におよそ線形に比例します。計算は特徴の数とmaxBins
におよそ線形に比例します。
実装されたアルゴリズムはsparseとdenseデータの両方を読みます。しかし、sparse入力については最適化されていません。
例
分類
以下の例はLIBSVM データファイルをどうやってロードするかを説明し、LabeledPoint
のRDDとしてパースし、Gini不純度指標と5の最大の木の深さとしてGini不純度を持つ決定木を使って分類を行います。テストエラーはアルゴリズムの正確さを計測するために計算されます。
APIの詳細はDicisionTree
Scala ドキュメント および DecisionTreeModel
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a DecisionTree model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 5
val maxBins = 32
val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
impurity, maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()
println("Test Error = " + testErr)
println("Learned classification tree model:\n" + model.toDebugString)
// Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
APIの詳細はDecision Tree
Java ドキュメント および DecisionTreeModel
Java ドキュメント を参照してください。
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeClassificationExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
Integer numClasses = 2;
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "gini";
Integer maxDepth = 5;
Integer maxBins = 32;
// Train a DecisionTree model for classification.
final DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
categoricalFeaturesInfo, impurity, maxDepth, maxBins);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
@Override
public Tuple2<Double, Double> call(LabeledPoint p) {
return new Tuple2<>(model.predict(p.features()), p.label());
}
});
Double testErr =
1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
@Override
public Boolean call(Tuple2<Double, Double> pl) {
return !pl._1().equals(pl._2());
}
}).count() / testData.count();
System.out.println("Test Error: " + testErr);
System.out.println("Learned classification tree model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
DecisionTreeModel sameModel = DecisionTreeModel
.load(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
APIについての詳細はDecisionTree
Python ドキュメント およびDecisionTreeModel
Python ドキュメントを参照してください。
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
impurity='gini', maxDepth=5, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
回帰
以下の例はLIBSVM データファイルをどうやってロードするかを説明し、LabeledPoint
のRDDとしてパースし、不純度指標と5の最大の木の深さとして分散を持つ決定木を使って回帰を行います。平均二乗誤差(MSE)は最後に 適合度を評価するために計算されます。
APIの詳細はDicisionTree
Scala ドキュメント および DecisionTreeModel
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a DecisionTree model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "variance"
val maxDepth = 5
val maxBins = 32
val model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo, impurity,
maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelsAndPredictions = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testMSE = labelsAndPredictions.map{ case (v, p) => math.pow(v - p, 2) }.mean()
println("Test Mean Squared Error = " + testMSE)
println("Learned regression tree model:\n" + model.toDebugString)
// Save and load model
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")
APIの詳細はDecision Tree
Java ドキュメント および DecisionTreeModel
Java ドキュメント を参照してください。
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeRegressionExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "variance";
Integer maxDepth = 5;
Integer maxBins = 32;
// Train a DecisionTree model.
final DecisionTreeModel model = DecisionTree.trainRegressor(trainingData,
categoricalFeaturesInfo, impurity, maxDepth, maxBins);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
@Override
public Tuple2<Double, Double> call(LabeledPoint p) {
return new Tuple2<>(model.predict(p.features()), p.label());
}
});
Double testMSE =
predictionAndLabel.map(new Function<Tuple2<Double, Double>, Double>() {
@Override
public Double call(Tuple2<Double, Double> pl) {
Double diff = pl._1() - pl._2();
return diff * diff;
}
}).reduce(new Function2<Double, Double, Double>() {
@Override
public Double call(Double a, Double b) {
return a + b;
}
}) / data.count();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression tree model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myDecisionTreeRegressionModel");
DecisionTreeModel sameModel = DecisionTreeModel
.load(jsc.sc(), "target/tmp/myDecisionTreeRegressionModel");
APIについての詳細はDecisionTree
Python ドキュメント およびDecisionTreeModel
Python ドキュメントを参照してください。
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo={},
impurity='variance', maxDepth=5, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum() /\
float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression tree model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")