最適化 - RDDベースのAPI

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

数学的な説明

勾配降下法

$\min_{\wv \in\R^d} \; f(\wv)$の形式の最適化問題を解決する一番簡単な方法は 勾配降下法です。一次の最適化メソッド(勾配降下とその確率論的な変種を含む)は大規模および分散計算にとても適しています。

勾配降下法は最も急勾配の下り坂の方向へ繰り返し向かうことでローカルな最小の関数を見つけることを目的としています。これは現在の場所、つまり現在のパラメータ値での関数の消極的な導関数(勾配)です。目的の関数$f$ が全ての引数で判別不能だがまだ凸状であれば、副勾配が勾配のありのままの一般化であり、ステップの方向の役割と見なされます。どの場合においても、$f$の勾配あるいは副勾配の計算は高くつきます - 全ての損失単語からの寄与を計算するためにデータセット全体の完全な通過を必要とします。

確率的勾配降下法 (SGD)

Optimization problems whose objective function $f$ is written as a sum are particularly suitable to be solved using stochastic gradient descent (SGD). この場合、管理された機械学習で一般的に使われる最適化の公式に関しては、\begin{equation} f(\wv) := \lambda\, R(\wv) + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) \label{eq:regPrimal} \ 。\end{equation} 損失が各データポイントから来る個々の損失の平均として書かれるため、これは特に普通です。

A stochastic subgradient is a randomized choice of a vector, such that in expectation, we obtain a true subgradient of the original objective function. Picking one datapoint $i\in[1..n]$ uniformly at random, we obtain a stochastic subgradient of $\eqref{eq:regPrimal}$, with respect to $\wv$ as follows: \[ f'_{\wv,i} := L'_{\wv,i} + \lambda\, R'_\wv \ , \] where $L'_{\wv,i} \in \R^d$ is a subgradient of the part of the loss function determined by the $i$-th datapoint, that is $L'_{\wv,i} \in \frac{\partial}{\partial \wv} L(\wv;\x_i,y_i)$. 更に、 $R'_\wv$ は正規化木$R(\wv)$の副勾配です。つまり、$R'_\wv \in \frac{\partial}{\partial \wv} R(\wv)$。単語$R'_\wv$ はどのランダムなデータポイントが取り上げられたかに依存しません。Clearly, in expectation over the random choice of $i\in[1..n]$, we have that $f'_{\wv,i}$ is a subgradient of the original objective $f$, meaning that $\E\left[f'_{\wv,i}\right] \in \frac{\partial}{\partial \wv} f(\wv)$.

Running SGD now simply becomes walking in the direction of the negative stochastic subgradient $f'_{\wv,i}$, that is \begin{equation}\label{eq:SGDupdate} \wv^{(t+1)} := \wv^{(t)} - \gamma \; f'_{\wv,i} \ . \end{equation} Step-size. The parameter $\gamma$ is the step-size, which in the default implementation is chosen decreasing with the square root of the iteration counter, i.e. $\gamma := \frac{s}{\sqrt{t}}$ in the $t$-th iteration, with the input parameter $s=$ stepSize. SGDメソッドの最適なステップサイズの選択は実際において繊細なものになりえて、それは能動的な調査のトピックです。

勾配。 spark.mllibで実装される機械学習メソッドの(副)勾配の表。分類および回帰 の章で利用可能です。

Proximal Updates. As an alternative to just use the subgradient $R'(\wv)$ of the regularizer in the step direction, an improved update for some cases can be obtained by using the proximal operator instead. L1-regularizerについては、近傍オペレータはL1Updaterで実装されるようにソフト閾値によって与えられます。

分散されたSGDのためのスキーマの更新

GradientDescentでのSGDの実装は単純な(分散した)データ例のサンプリングを使用します。最適化問題$\eqref{eq:regPrimal}$ の損失部分は $\frac1n \sum_{i=1}^n L(\wv;\x_i,y_i)$であり、従って$\frac1n \sum_{i=1}^n L'_{\wv,i}$ はtrueの(副)勾配になりえるでしょう。これはデータセット全体へのアクセスを必要とするため、パラメータminiBatchFractionは完全なデータのどの部分を代わりに使用するかを指定します。この部分集合の勾配の平均、つまり\[ \frac1{|S|} \sum_{i\in S} L'_{\wv,i} \ , \] は確率的な勾配です。ここで、$S$はサイズ$|S|=$ miniBatchFraction $\cdot n$の標本化された部分集合です。

各繰り返しの中で、分散されたデータセット(RDD)のサンプリングと、各ワーカーマシーンからの結果の部分の合計の計算は標準のsparkの決まりきった作業によって行われます。

If the fraction of points miniBatchFraction is set to 1 (default), then the resulting step in each iteration is exact (sub)gradient descent. この場合、ランダムさは無く、使用されたステップの方向には変数がありません。On the other extreme, if miniBatchFraction is chosen very small, such that only a single point is sampled, i.e. $|S|=$ miniBatchFraction $\cdot n = 1$, then the algorithm is equivalent to standard SGD. In that case, the step direction depends from the uniformly random sampling of the point.

メモリ制限 BFGS (L-BFGS)

L-BFGS$\min_{\wv \in\R^d} \; f(\wv)$の形式の最適化問題を解決するための類似ニュートンメソッドのファミリーの最適化アルゴリズムです。L-BFGS メソッドはHessianマトリックスを構築するために、対象の関数の2次部分導関数を評価すること無しに局所的な2次の関数として対象の関数を近似します。The Hessian matrix is approximated by previous gradient evaluations, so there is no vertical scalability issue (the number of training features) when computing the Hessian matrix explicitly in Newton’s method. As a result, L-BFGS often achieves rapider convergence compared with other first-order optimization.

最適化メソッドの選択

線形メソッド は内部的に最適化を使い、spark.mllibのいくつかの線形メソッドはSGDとL-BFGSの両方をサポートします。Different optimization methods can have different convergence guarantees depending on the properties of the objective function, and we cannot cover the literature here. 一般的に、L-BFGSは速く(少ない繰り替えしで)収束する傾向があるため、L-BFGSが利用可能な時にはSGDの代わりに利用することをお勧めします。

MLlibでの実装

勾配降下法と確率的勾配降下法

Gradient descent methods including stochastic subgradient descent (SGD) as included as a low-level primitive in MLlib, upon which various ML algorithms are developed, see the linear methods section for example.

SGD クラス GradientDescentは以下のパラメータを設定します:

L-BFGS

L-BFGS is currently only a low-level optimization primitive in MLlib. If you want to use L-BFGS in various ML algorithms such as Linear Regression, and Logistic Regression, you have to pass the gradient of objective function, and updater into optimizer yourself instead of using the training APIs like LogisticRegressionWithSGD. 以下の例を見てください。それは次のリリースで言及されるでしょう。

L1Updaterを使ったL1正規化は、L1Updaterでのsoft-thresholdingロジックが勾配降下法のために設計されていないために、動作しないでしょう。開発者のメモを見てください。

L-BFGS メソッド LBFGS.runLBFGS は以下のパラメータを持ちます:

返り値は2つの要素を含むタプルです。最初の要素は各特長の重み付けを含むカラムのマトリックスで、2つ目の要素は各繰り返しのために計算される損失を含む配列です。

L-BFGS最適化を使ったL2正規化を持つ2分ロジスティック回帰を訓練するための例です。

APIの詳細はLBFGS Scala ドキュメント および SquaredL2Updater Scala ドキュメント を参照してください。

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val numFeatures = data.take(1)(0).features.size

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)

// Append 1 into the training data as intercept.
val training = splits(0).map(x => (x.label, MLUtils.appendBias(x.features))).cache()

val test = splits(1)

// Run training algorithm to build the model
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))

val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept)

val model = new LogisticRegressionModel(
  Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
  weightsWithIntercept(weightsWithIntercept.size - 1))

// Clear the default threshold.
model.clearThreshold()

// Compute raw scores on the test set.
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println("Loss of each step in training process")
loss.foreach(println)
println("Area under ROC = " + auROC)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/LBFGSExample.scala" で見つかります。

APIの詳細はLBFGS Java ドキュメント および SquaredL2Updater Java ドキュメント を参照してください。

import java.util.Arrays;

import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.optimization.*;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
int numFeatures = data.take(1).get(0).features().size();

// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint> trainingInit = data.sample(false, 0.6, 11L);
JavaRDD<LabeledPoint> test = data.subtract(trainingInit);

// Append 1 into the training data as intercept.
JavaRDD<Tuple2<Object, Vector>> training = data.map(
  new Function<LabeledPoint, Tuple2<Object, Vector>>() {
    public Tuple2<Object, Vector> call(LabeledPoint p) {
      return new Tuple2<Object, Vector>(p.label(), MLUtils.appendBias(p.features()));
    }
  });
training.cache();

// Run training algorithm to build the model.
int numCorrections = 10;
double convergenceTol = 1e-4;
int maxNumIterations = 20;
double regParam = 0.1;
Vector initialWeightsWithIntercept = Vectors.dense(new double[numFeatures + 1]);

Tuple2<Vector, double[]> result = LBFGS.runLBFGS(
  training.rdd(),
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept);
Vector weightsWithIntercept = result._1();
double[] loss = result._2();

final LogisticRegressionModel model = new LogisticRegressionModel(
  Vectors.dense(Arrays.copyOf(weightsWithIntercept.toArray(), weightsWithIntercept.size() - 1)),
  (weightsWithIntercept.toArray())[weightsWithIntercept.size() - 1]);

// Clear the default threshold.
model.clearThreshold();

// Compute raw scores on the test set.
JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(
  new Function<LabeledPoint, Tuple2<Object, Object>>() {
    public Tuple2<Object, Object> call(LabeledPoint p) {
      Double score = model.predict(p.features());
      return new Tuple2<Object, Object>(score, p.label());
    }
  });

// Get evaluation metrics.
BinaryClassificationMetrics metrics =
  new BinaryClassificationMetrics(scoreAndLabels.rdd());
double auROC = metrics.areaUnderROC();

System.out.println("Loss of each step in training process");
for (double l : loss)
  System.out.println(l);
System.out.println("Area under ROC = " + auROC);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaLBFGSExample.java" で見つかります。

開発者の記録

Since the Hessian is constructed approximately from previous gradient evaluations, the objective function can not be changed during the optimization process. As a result, Stochastic L-BFGS will not work naively by just using miniBatch; therefore, we don’t provide this until we have better understanding.

Updater はもともと実際の勾配降下ステップを計算する勾配降下法のために設計されたクラスです。しかし、However, we’re able to take the gradient and loss of objective function of regularization for L-BFGS by ignoring the part of logic only for gradient decent such as adaptive step size stuff. We will refactorize this into regularizer to replace updater to separate the logic between regularization and step update later.

TOP
inserted by FC2 system