線形法 - RDDベースのAPI

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

数学的な公式

多くの標準的な機械学習 メソッドが凸状の最適化問題に定式化することができます。つまり、変数ベクトル$\wv$(コード中ではweightsと呼ばれます)に依存する凸関数$f$を最小化するものを見つける作業です。これは $d$個のエントリを持ちます。形式上は、これを最適化問題$\min_{\wv \in\R^d} \; f(\wv)$として書くことができ、目的の関数は\begin{equation} f(\wv) := \lambda\, R(\wv) + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) \label{eq:regPrimal} \ の形式です。\end{equation} ここでベクトル $\x_i\in\R^d$ は訓練データの例で、$1\le i\le n$$y_i\in\R$はそれらに対応するラベルで、それらは予測したいものです。もし $L(\wv; \x, y)$ が $\wv^T x$ と $y$ の関数で表現できる場合、線形 と呼びます。いくつかのspark.mllibの分類と回帰アルゴリズムはこの分類に当てはまり、ここで議論されます。

目的関数 $f$ は2つの部分からなります: モデルの複雑さを制御するregularizerと、訓練データ上のモデルのエラーを計測するloss です。損失関数 $L(\wv;.)$ は一般的に $\wv$ の凸状関数です。固定された正規化パラメータ $\lambda \ge 0$ (コード中のregParam) は、損失(例えば訓練エラー)のマイニングとモデルの複雑さ(つまりオーバーフィッティングを防ぐため)のマイニングを定義します。

損失関数

以下の表は、損失関数とspark.mllibがサポートするメソッドのそれらの勾配あるいは副勾配の概要です:

loss function $L(\wv; \x, y)$勾配あるいは副勾配
損失の条件$\max \{0, 1-y \wv^T \x \}, \quad y \in \{-1, +1\}$ $\begin{cases}-y \cdot \x & \text{if $y \wv^T \x <1$}, \\ 0 & \text{otherwise}.\end{cases}$
ロジスティック損失$\log(1+\exp( -y \wv^T \x)), \quad y \in \{-1, +1\}$ $-y \left(1-\frac1{1+\exp(-y \wv^T \x)} \right) \cdot \x$
二乗損失$\frac{1}{2} (\wv^T \x - y)^2, \quad y \in \R$ $(\wv^T \x - y) \cdot \x$

正規器

regularizer の目的は簡単なメソッドを促進することとオーバーフィッティングを避けることです。spark.mllib内の以下のregularizerをサポートします:

regularizer $R(\wv)$勾配あるいは副勾配
zero (unregularized)0$\0$
L2$\frac{1}{2}\|\wv\|_2^2$$\wv$
L1$\|\wv\|_1$$\mathrm{sign}(\wv)$
エラスティック ネット$\alpha \|\wv\|_1 + (1-\alpha)\frac{1}{2}\|\wv\|_2^2$$\alpha \mathrm{sign}(\wv) + (1-\alpha) \wv$

ここで、$\mathrm{sign}(\wv)$$\wv$の全てのエントリの符号からなるベクトル ($\pm1$) です。

滑らかさの点で、L2-正規化問題は一般的にL1-正規化問題を解くよりも簡単です。しかし、L1正規化は小ささおよび解明し易いモデルに繋がる重み付けでのまばらさを促進することができ、その後者は特徴の選択に役に立つかも知れません。エラスティック ネット はL1およびL2正規化の組み合わせです。訓練例の数が少ない場合は特に、正規化をせずにモデルを訓練することはお勧めしません。

最適化

裏では線形法は目的関数を最適化するための凸状最適化メソッドを使用します。spark.mllib最適化の章で説明される2つのメソッド、SGDとL-BFGSを使用します。正確には、ほとんどのアルゴリズムAPIは確率論的勾配効果(SGD) をサポートし、わずかがL-BFGSをサポートします。最適化メソッドを選択するためのガイドラインについては、この最適化の章を参照してください。

分類

分類 は項目をカテゴリに分割することが目的です。ほとんどの一般的な分類の種類は二分類で、通常は正と負の2つのカテゴリがあります。2つ以上のカテゴリがある場合は、他クラス分類と呼ばれます。spark.mllib は分類のために2つの線形メソッドをサポートします: 線形サポートベクトルマシーン(SVMs) と ロジスティック回帰。線形SVMは二分類のみをサポートしますが、ロジスティック回帰は二値と他クラス分類問題をサポートします。両方のメソッドのために、spark.mllibはL1およびL2正規化分散をサポートします。訓練データセットはMLlibではLabeledPointのRDDによって表現され、ラベルはゼロから始まる分類インデックスです: $0, 1, 2, \ldots$. このガイド内の数学的な公式では、二値ラベル $y$は $+1$ (正) または $-1$ (負)として印を付けられ、公式化するのに便利になっています。しかし、多クラスラベルのためにspark.mllibでは負のラベルは $-1$の代わりに$0$で表現されます。

線形サポートベクターマシーン(SVMs)

線形 SVM は大規模分類タスクのための標準的な方法です。等式$\eqref{eq:regPrimal}$で表される線形メソッドで条件損失によって与えられる公式の中での損失関数を持ちます。

\[ L(\wv;\x,y) := \max \{0, 1-y \wv^T \x \}. \] デフォルトでは、線形SVMはL2正規化を使って訓練されます。代替のL1正規化もサポートします。この場合、問題は線形プログラムになります。

線形SVMアルゴリズムはSVMモデルを出力します。$\x$によって新しいデータポイントを与えられると、モデルは $\wv^T \x$ の値に基づいて予想を生成します。デフォルトでは、もし $\wv^T \x \geq 0$ の場合出力は正で、そうでなければ負です。

以下のコードの断片は標本データセットをどうやってロードするかを説明し、アルゴリズムオブジェクトの静的メソッドを使ってこの訓練データに訓練アルゴリズムを実行し、訓練エラーを計算するために結果のモデルを使って予想を行います。

APIの詳細はSVMWithSGD Scala ドキュメント および SVMModel Scala ドキュメント を参照してください。

import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Run training algorithm to build the model
val numIterations = 100
val model = SVMWithSGD.train(training, numIterations)

// Clear the default threshold.
model.clearThreshold()

// Compute raw scores on the test set.
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println("Area under ROC = " + auROC)

// Save and load model
model.save(sc, "target/tmp/scalaSVMWithSGDModel")
val sameModel = SVMModel.load(sc, "target/tmp/scalaSVMWithSGDModel")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/SVMWithSGDExample.scala" で見つかります。

SVMWithSGD.train() メソッドはデフォルトで正規化パラメータを1.0に設定しL2正規化を実施します。このアルゴリズムを設定したい場合は、新しいオブジェクトを直接生成しsetterメソッドを呼び出すことで更にSVMWithSGD をカスタマイズすることができます。全ての他のspark.mllibアルゴリズムは同様にこのやり方でカスタマイズをサポートします。例えば、以下のコードは正規化パラメータを0.1に設定してSVMの正規化分散を生成します。そして訓練アルゴリズムを200回繰り返し実行します。

import org.apache.spark.mllib.optimization.L1Updater

val svmAlg = new SVMWithSGD()
svmAlg.optimizer
  .setNumIterations(200)
  .setRegParam(0.1)
  .setUpdater(new L1Updater)
val modelL1 = svmAlg.run(training)

全てのMLlibのメソッドはJava-friendly なタイプを使用するため、Scalaでするのと同じような方法でimportおよびcallをすることができます。注意しなければならないことは、Spark Java APIは別個のJavaRDD クラスを使用するが、そのメソッドはScalaの RDDオブジェクトを取るということです。JavaRDDオブジェクト上で.rdd() を呼ぶことでJava RDDをScala RDDに変換することができます。Scalaで提供された例に等しい自己内包アプリケーションの例が以下です:

APIの詳細はSVMWithSGD Java ドキュメント および SVMModel Java ドキュメント を参照してください。

import scala.Tuple2;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.SVMModel;
import org.apache.spark.mllib.classification.SVMWithSGD;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;

String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();

// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint> training = data.sample(false, 0.6, 11L);
training.cache();
JavaRDD<LabeledPoint> test = data.subtract(training);

// Run training algorithm to build the model.
int numIterations = 100;
final SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);

// Clear the default threshold.
model.clearThreshold();

// Compute raw scores on the test set.
JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(
  new Function<LabeledPoint, Tuple2<Object, Object>>() {
    public Tuple2<Object, Object> call(LabeledPoint p) {
      Double score = model.predict(p.features());
      return new Tuple2<Object, Object>(score, p.label());
    }
  }
);

// Get evaluation metrics.
BinaryClassificationMetrics metrics =
  new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
double auROC = metrics.areaUnderROC();

System.out.println("Area under ROC = " + auROC);

// Save and load model
model.save(sc, "target/tmp/javaSVMWithSGDModel");
SVMModel sameModel = SVMModel.load(sc, "target/tmp/javaSVMWithSGDModel");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java" で見つかります。

SVMWithSGD.train() メソッドはデフォルトで正規化パラメータを1.0に設定しL2正規化を実施します。このアルゴリズムを設定したい場合は、新しいオブジェクトを直接生成しsetterメソッドを呼び出すことで更にSVMWithSGD をカスタマイズすることができます。全ての他のspark.mllibアルゴリズムは同様にこのやり方でカスタマイズをサポートします。例えば、以下のコードは正規化パラメータを0.1に設定してSVMの正規化分散を生成します。そして訓練アルゴリズムを200回繰り返し実行します。

import org.apache.spark.mllib.optimization.L1Updater;

SVMWithSGD svmAlg = new SVMWithSGD();
svmAlg.optimizer()
  .setNumIterations(200)
  .setRegParam(0.1)
  .setUpdater(new L1Updater());
final SVMModel modelL1 = svmAlg.run(training.rdd());

上のアプリケーションを実行するためには、Sparkクイックガイドの自己内包型アプリケーション の章で提供される説明に従ってください。依存性としてビルドファイルにspark-mllibも含めるようにしてください。

以下の例は標本データセットをどうやってロードするかを示し、SVMモデルを構築し、訓練エラーを計算するために結果のモデルを使って予測を行います。

APIについての詳細はSVMWithSGD Python ドキュメント およびSVMModel Python ドキュメントを参照してください。

from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = SVMWithSGD.train(parsedData, iterations=100)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
model.save(sc, "target/tmp/pythonSVMWithSGDModel")
sameModel = SVMModel.load(sc, "target/tmp/pythonSVMWithSGDModel")
例の完全なコードは Spark のリポジトリの "examples/src/main/python/mllib/svm_with_sgd_example.py" で見つかります。

ロジスティック回帰

ロジスティック回帰 は二つの応答を予想するために広く使われています。等式$\eqref{eq:regPrimal}$で表される線形メソッドでロジスティック損失によって与えられる公式の中での損失関数を持ちます: \[ L(\wv;\x,y) := \log(1+\exp( -y \wv^T \x))\]

二値クラス問題について、アルゴリズムは二値ロジスティック回帰モデルを出力します。$\x$ によって新しいデータポイントが与えられると、モデルは$z = \wv^T \x$ であるロジスティック関数 \[ \mathrm{f}(z) = \frac{1}{1 + e^{-z}} \] を適用することで予想を行います。デフォルトでは、もし $\mathrm{f}(\wv^T x) > 0.5$ であれば出力は正になり、そうでなければ負ですが、線形SVMと異なり、ロジックテック回帰モデルの生の出力、 $\mathrm{f}(z)$ は確率的な解釈を持ちます(つまり、 $\x$ が正である確率)。

二値ロジスティック回帰は、多クラス分類問題を訓練および予想するために、多項ロジスティック回帰 に一般化することができます。例えば、$K$ 個の可能な結果について、結果のうちの一つが"pivot"として選択される可能性があり、他の $K - 1$ 個の結果がpivotの結果に対して別々に逆行するかも知れません。spark.mllibでは、最初のクラス$0$は"pivot"クラスとして選択されます。参照として統計的な学習の要素の章4.4を見てください。詳細な数学的派生があります。

多クラス分類問題について、アルゴリズムは多項ロジスティック回帰モデルを出力するでしょう。これは最初のクラスに対して回帰された $K -1$ 二値ロジスティック回帰モデルを含みます。新しいデータポイントが与えられた場合、$K - 1$モデルは実行され、最も可能性が高いクラスは予想されたクラスとして選択されるでしょう。

ロジスティック回帰を解決するための2つのアルゴリズムを実装しました: ミニバッチ勾配降下と と L-BFGS です。高速な収束のためには、ミニバッチ勾配法よりもL-BFGSをお勧めします。

以下のコードは標本多クラスデータセットをどうやってロードするかを説明し、それを訓練およびテストに分割し、ロジスティック回帰モデルに適合するために LogisticRegressionWithLBFGS を使う方法を説明します。そして、モデルはテストデータセットに対して評価され、ディスクに保存されます。

APIの詳細はLogisticRegressionWithLBFGS Scala ドキュメント および LogisticRegressionModel Scala ドキュメント を参照してください。

import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Run training algorithm to build the model
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(10)
  .run(training)

// Compute raw scores on the test set.
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
  val prediction = model.predict(features)
  (prediction, label)
}

// Get evaluation metrics.
val metrics = new MulticlassMetrics(predictionAndLabels)
val accuracy = metrics.accuracy
println(s"Accuracy = $accuracy")

// Save and load model
model.save(sc, "target/tmp/scalaLogisticRegressionWithLBFGSModel")
val sameModel = LogisticRegressionModel.load(sc,
  "target/tmp/scalaLogisticRegressionWithLBFGSModel")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/LogisticRegressionWithLBFGSExample.scala" で見つかります。

以下のコードは標本多クラスデータセットをどうやってロードするかを説明し、それを訓練およびテストに分割し、ロジスティック回帰モデルに適合するために LogisticRegressionWithLBFGS を使う方法を説明します。そして、モデルはテストデータセットに対して評価され、ディスクに保存されます。

APIの詳細はLogisticRegressionWithLBFGS Java ドキュメント および LogisticRegressionModel Java ドキュメント を参照してください。

import scala.Tuple2;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;

String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();

// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[] {0.6, 0.4}, 11L);
JavaRDD<LabeledPoint> training = splits[0].cache();
JavaRDD<LabeledPoint> test = splits[1];

// Run training algorithm to build the model.
final LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
  .setNumClasses(10)
  .run(training.rdd());

// Compute raw scores on the test set.
JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
  new Function<LabeledPoint, Tuple2<Object, Object>>() {
    public Tuple2<Object, Object> call(LabeledPoint p) {
      Double prediction = model.predict(p.features());
      return new Tuple2<Object, Object>(prediction, p.label());
    }
  }
);

// Get evaluation metrics.
MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
double accuracy = metrics.accuracy();
System.out.println("Accuracy = " + accuracy);

// Save and load model
model.save(sc, "target/tmp/javaLogisticRegressionWithLBFGSModel");
LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc,
  "target/tmp/javaLogisticRegressionWithLBFGSModel");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java" で見つかります。

以下の例は標本データセットをどうやってロードするかを示し、ロジスティック回帰モデルを構築し、訓練エラーを計算するために結果のモデルを使って予測を行います。

Python API はまだ多クラス分類とモデルの保存/ロードをサポートしていませんが、将来的にサポートされるでしょう。

APIについての詳細はLogisticRegressionWithLBFGS Python ドキュメント および>LogisticRegressionModel Python ドキュメントを参照してください。

from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
model.save(sc, "target/tmp/pythonLogisticRegressionWithLBFGSModel")
sameModel = LogisticRegressionModel.load(sc,
                                         "target/tmp/pythonLogisticRegressionWithLBFGSModel")
例の完全なコードは Spark のリポジトリの "examples/src/main/python/mllib/logistic_regression_with_lbfgs_example.py" で見つかります。

回帰

線形最小2乗、ラッソ、リッジ回帰

線形最小二乗は回帰問題のための最も一般的な公式です。上で説明された等式$\eqref{eq:regPrimal}$で表される線形メソッドで、二乗損失によって与えられる公式の中での損失関数を持ちます: \[ L(\wv;\x,y) := \frac{1}{2} (\wv^T \x - y)^2. \]

異なるタイプの正規化を使って様々な関連する回帰メソッドが派生されます: 最小二乗推定法 あるいは 線形最小二乗 は正規化を使用しません; リッジ回帰 はL2正規化を使用します; ラッソ はL1正規化を使用します。これらのモデルに関して、平均損失あるいは訓練エラー $\frac{1}{n} \sum_{i=1}^n (\wv^T x_i - y_i)^2$ は 平均二乗エラーとして知られています。

以下の例は訓練データをどうやってロードするかを実演し、それをラベル付きのポイントのRDDとしてパースします。そして、レベル値を予想する簡単な線形モデルを構築するために LinearRegressionWithSGD を使用します。適合度を評価するために最後に平均二乗エラーを計算します。

APIの詳細はLinearRegressionWithSGD Scala ドキュメント および LinearRegressionModel Scala ドキュメント を参照してください。

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LinearRegressionModel
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

// Load and parse the data
val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()

// Building the model
val numIterations = 100
val stepSize = 0.00000001
val model = LinearRegressionWithSGD.train(parsedData, numIterations, stepSize)

// Evaluate model on training examples and compute training error
val valuesAndPreds = parsedData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2) }.mean()
println("training Mean Squared Error = " + MSE)

// Save and load model
model.save(sc, "target/tmp/scalaLinearRegressionWithSGDModel")
val sameModel = LinearRegressionModel.load(sc, "target/tmp/scalaLinearRegressionWithSGDModel")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegressionWithSGDExample.scala" で見つかります。

RidgeRegressionWithSGDLassoWithSGDLinearRegressionWithSGDと同じやり方で使用することができます。

全てのMLlibのメソッドはJava-friendly なタイプを使用するため、Scalaでするのと同じような方法でimportおよびcallをすることができます。注意しなければならないことは、Spark Java APIは別個のJavaRDD クラスを使用するが、そのメソッドはScalaの RDDオブジェクトを取るということです。JavaRDDオブジェクト上で.rdd() を呼ぶことでJava RDDをScala RDDに変換することができます。与えれられたScalaの断片に対応するJavaの例は以下のようになります:

APIの詳細はLinearRegressionWithSGD Java ドキュメント および LinearRegressionModel Java ドキュメント を参照してください。

import scala.Tuple2;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.LinearRegressionWithSGD;

// Load and parse the data
String path = "data/mllib/ridge-data/lpsa.data";
JavaRDD<String> data = sc.textFile(path);
JavaRDD<LabeledPoint> parsedData = data.map(
  new Function<String, LabeledPoint>() {
    public LabeledPoint call(String line) {
      String[] parts = line.split(",");
      String[] features = parts[1].split(" ");
      double[] v = new double[features.length];
      for (int i = 0; i < features.length - 1; i++) {
        v[i] = Double.parseDouble(features[i]);
      }
      return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
    }
  }
);
parsedData.cache();

// Building the model
int numIterations = 100;
double stepSize = 0.00000001;
final LinearRegressionModel model =
  LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations, stepSize);

// Evaluate model on training examples and compute training error
JavaRDD<Tuple2<Double, Double>> valuesAndPreds = parsedData.map(
  new Function<LabeledPoint, Tuple2<Double, Double>>() {
    public Tuple2<Double, Double> call(LabeledPoint point) {
      double prediction = model.predict(point.features());
      return new Tuple2<>(prediction, point.label());
    }
  }
);
double MSE = new JavaDoubleRDD(valuesAndPreds.map(
  new Function<Tuple2<Double, Double>, Object>() {
    public Object call(Tuple2<Double, Double> pair) {
      return Math.pow(pair._1() - pair._2(), 2.0);
    }
  }
).rdd()).mean();
System.out.println("training Mean Squared Error = " + MSE);

// Save and load model
model.save(sc.sc(), "target/tmp/javaLinearRegressionWithSGDModel");
LinearRegressionModel sameModel = LinearRegressionModel.load(sc.sc(),
  "target/tmp/javaLinearRegressionWithSGDModel");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaLinearRegressionWithSGDExample.java" で見つかります。

以下の例は訓練データをどうやってロードするかを実演し、それをラベル付きのポイントのRDDとしてパースします。そして、レベル値を予想する簡単な線形モデルを構築するために LinearRegressionWithSGD を使用します。適合度を評価するために最後に平均二乗エラーを計算します。

Python API はまだモデルの保存/ロードをサポートしていませんが、将来的にサポートされるでしょう。

APIについての詳細はLinearRegressionWithSGD Python ドキュメント およびLinearRegressionModel Python ドキュメントを参照してください。

from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parsePoint)

# Build the model
model = LinearRegressionWithSGD.train(parsedData, iterations=100, step=0.00000001)

# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds \
    .map(lambda (v, p): (v - p)**2) \
    .reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

# Save and load model
model.save(sc, "target/tmp/pythonLinearRegressionWithSGDModel")
sameModel = LinearRegressionModel.load(sc, "target/tmp/pythonLinearRegressionWithSGDModel")
例の完全なコードは Spark のリポジトリの "examples/src/main/python/mllib/linear_regression_with_sgd_example.py" で見つかります。

上のアプリケーションを実行するためには、Sparkクイックガイドの自己内包型アプリケーション の章で提供される説明に従ってください。依存性としてビルドファイルにspark-mllibも含めるようにしてください。

ストリーミング線形回帰

データがストリーム形式で到着すると、それはオンラインで回帰モデルに適合するために利用され、新しいデータの到着と共にパラメータが更新されます。spark.mllib は現在のところ最小二乗推定を使ったストリーミング線形回帰をサポートします。適合は、データの各バッチごとに適合が行われることを除いて、オフラインで実行されるものに似ていて、モデルはストリームからのデータを反映するために頻繁に更新されます。

以下の例はテキストファイルの2つの異なるテキストストリームからの訓練およびテストデータをどうやってロードするかを説明し、ラベルが付けられたポイントとしてストリームをパースし、最初のストリームにオンラインで線形回帰モデルを適合し、次のストリームの予想を行います。

まず、入力データのパースとモデルの作成のために必要なクラスをインポートします。

そして、訓練およびテストデータのために入力ストリームを作成します。StreamingContext ssc がすでに生成されていると仮定します。詳細な情報はSpark ストリーミング プログラミングガイド を見てください。この例については、訓練およびテストストリーム内のラベル付きのポイントを使いますが、実際にはテストデータとしてラベルが付いていないベクトルを使いたいと思うかも知れないでしょう。

重み付けを0に初期化することでモデルを生成し、クンrねとテストのためにストリームを登録し、ジョブを開始します。trueラベルのそばに予想を出力すると、結果が見やすくなるでしょう。

最後に、データのテキストファイルを訓練あるいはテストのフォルダに保存することができます。各行は、y がラベルで x1,x2,x3が特徴とした時に、(y,[x1,x2,x3])として整形されたデータポイントであるべきです。args(0)にテキストファイルが置かれるといつでもモデルが更新されるでしょう。args(1)にテキストファイルが置かれるといつでも予測を見ることができるでしょう。訓練ディレクトリにもっと多くのデータを与えると、予想はもっと良くなるでしょう。

以下は完全な例です:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse).cache()
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegressionExample.scala" で見つかります。

まず、入力データのパースとモデルの作成のために必要なクラスをインポートします。

そして、訓練およびテストデータのために入力ストリームを作成します。StreamingContext ssc がすでに生成されていると仮定します。詳細な情報はSpark ストリーミング プログラミングガイド を見てください。この例については、訓練およびテストストリーム内のラベル付きのポイントを使いますが、実際にはテストデータとしてラベルが付いていないベクトルを使いたいと思うかも知れないでしょう。

重み付けを0で初期化することでモデルを生成します。

ここで、訓練およびテストのためのストリームを登録し、ジョブを開始します。

これで訓練あるいはテストフォルダーへデータのテキストファイルを保存することができます。各行は、y がラベルで x1,x2,x3が特徴とした時に、(y,[x1,x2,x3])として整形されたデータポイントであるべきです。sys.argv[1]にテキストファイルが置かれるといつでもモデルが更新されるでしょう。sys.argv[2]にテキストファイルが置かれるといつでも予測を見ることができるでしょう。訓練ディレクトリにもっと多くのデータを与えると、予想はもっと良くなるでしょう。

以下は完全な例です:

import sys

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD

def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

trainingData = ssc.textFileStream(sys.argv[1]).map(parse).cache()
testData = ssc.textFileStream(sys.argv[2]).map(parse)

numFeatures = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0, 0.0, 0.0])

model.trainOn(trainingData)
print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))

ssc.start()
ssc.awaitTermination()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/mllib/streaming_linear_regression_example.py" で見つかります。

実装 (開発者)

背景では、spark.mllib は基礎となる勾配降下プリミティブ(最適化 の章で説明されます)を構築して、単純な確率論的な勾配降下(SDG)の分散バージョンを実装します。与えられた全てのアルゴリズムは入力として正規化パラメータ (regParam) と確率論的な勾配降下(stepSize, numIterations, miniBatchFraction)に関連する様々なパラメータを取ります。それらのそれぞれについて、3つの正規化(none、L1 あるいは L2)をサポートします。

SGDバージョンは二値ロジスティック回帰のみをサポートするため、ロジスティック回帰について、L-BFGS バージョンは LogisticRegressionWithLBFGSのもとで実装され、このバージョンは二値および多項ロジスティック回帰の両方をサポートします。しかし、L-BFGS バージョンはL1正規化をサポートしませんが、SGDバージョンはL1正規化をサポートします。L1正規化が必要ではない場合、L-BFGSバージョンはquasi-Newtonメソッドを使った逆Hessianマトリックの見積もりによって、SGBに比べて高速およびより正確に集約するため、とてもお勧めです。

アルゴリズムは全てScalaで実装されています:

Pythonは PythonMLLibAPIを使ってScalaの実装を呼び出します。

TOP
inserted by FC2 system