クラスタリング - spark.mllib

クラスタリングはなんらかの類似性の注釈に基づいてエンティティのサブセットのお互いにグループ化することを目的とする管理されない学習問題です。クラスタリングは探索的解析 および/あるいは 階層的な管理された学習パイプライン(個々の分類器あるいは回帰モデルはそれぞれのクラスタのために訓練されます)の構成要素としてしばしば使われます。

spark.mllib パッケージは以下のモデルをサポートします:

K-平均法

k-平均法 は最も一般的に使われる、事前に定義したクラスタ数までデータを群にする、クラスタリング アルゴリズムです。spark.mllib の実装はkmeans||と呼ばれるk-means++メソッドの並行化された変形を含んでいます。spark.mlでの実装は以下のパラメータを持ちます:

以下のコード断片は spark-shellの中で実行することができます。

以下の例ではデータをロードおよびパースした後で、データを2つのクラスタにクラスタ化するためにKMeans オブジェクトを使います。要求するクラスタの数はアルゴリズムに渡されます。それから Within Set Sum of Squared Error (WSSSE) を計算します。kを増やすことでこのエラー指標を減らすことができます。実際のところ最適なk は通常1つで、WSSSE グラフの中で"elbow"があります。

APIの詳細はKMeans Scala ドキュメント および KMeansModel Scala ドキュメント を参照してください。

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)

// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)

// Save and load model
clusters.save(sc, "myModelPath")
val sameModel = KMeansModel.load(sc, "myModelPath")

全てのMLlibのメソッドはJava-friendly なタイプを使用するため、Scalaでするのと同じような方法でimportおよびcallをすることができます。注意しなければならないことは、Spark Java APIは別個のJavaRDD クラスを使用するが、そのメソッドはScalaの RDDオブジェクトを取るということです。JavaRDDオブジェクト上で.rdd() を呼ぶことでJava RDDをScala RDDに変換することができます。Scalaで提供された例に等しい自己内包アプリケーションの例が以下です:

APIの詳細はKMeans Java ドキュメント および KMeansModel Java ドキュメント を参照してください。

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.SparkConf;

public class KMeansExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("K-means Example");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Load and parse data
    String path = "data/mllib/kmeans_data.txt";
    JavaRDD<String> data = sc.textFile(path);
    JavaRDD<Vector> parsedData = data.map(
      new Function<String, Vector>() {
        public Vector call(String s) {
          String[] sarray = s.split(" ");
          double[] values = new double[sarray.length];
          for (int i = 0; i < sarray.length; i++)
            values[i] = Double.parseDouble(sarray[i]);
          return Vectors.dense(values);
        }
      }
    );
    parsedData.cache();

    // Cluster the data into two classes using KMeans
    int numClusters = 2;
    int numIterations = 20;
    KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);

    // Evaluate clustering by computing Within Set Sum of Squared Errors
    double WSSSE = clusters.computeCost(parsedData.rdd());
    System.out.println("Within Set Sum of Squared Errors = " + WSSSE);

    // Save and load model
    clusters.save(sc.sc(), "myModelPath");
    KMeansModel sameModel = KMeansModel.load(sc.sc(), "myModelPath");
  }
}

以下の例はPySparkシェルの中でテストすることができます。

以下の例ではデータをロードおよびパースした後で、データを2つのクラスタにクラスタ化するためにKMeansオブジェクトを使います。要求するクラスタの数はアルゴリズムに渡されます。それから Within Set Sum of Squared Error (WSSSE) を計算します。kを増やすことでこのエラー指標を減らすことができます。実際のところ最適なk は通常1つで、WSSSE グラフの中で"elbow"があります。

APIについての詳細はKMeans Python ドキュメント およびKMeansModel Python ドキュメントを参照してください。

from pyspark.mllib.clustering import KMeans, KMeansModel
from numpy import array
from math import sqrt

# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10,
        runs=10, initializationMode="random")

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

# Save and load model
clusters.save(sc, "myModelPath")
sameModel = KMeansModel.load(sc, "myModelPath")

混合ガウス

混合ガウスモデルはそれぞれが独自の確率を持つ kガウシアン副分布の一つから引き出される合成分布を表します。spark.mllibの実装は、指定された標本のセットの最尤推定モデルを引き起こすために期待値最大化法アルゴリズムを使用します。実装は以下のパラメータを持ちます:

以下の例ではデータをロードおよびパースした後で、データを2つのクラスタにクラスタ化するためにGaussianMixtureオブジェクトを使います。要求するクラスタの数はアルゴリズムに渡されます。それから混合モデルのパラメータを出力します。

APIの詳細はGaussianMixture Scala ドキュメント および GaussianMixtureModel Scala ドキュメント を参照してください。

import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.clustering.GaussianMixtureModel
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()

// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)

// Save and load model
gmm.save(sc, "myGMMModel")
val sameModel = GaussianMixtureModel.load(sc, "myGMMModel")

// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
  println("weight=%f\nmu=%s\nsigma=\n%s\n" format
    (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}

全てのMLlibのメソッドはJava-friendly なタイプを使用するため、Scalaでするのと同じような方法でimportおよびcallをすることができます。注意しなければならないことは、Spark Java APIは別個のJavaRDD クラスを使用するが、そのメソッドはScalaの RDDオブジェクトを取るということです。JavaRDDオブジェクト上で.rdd() を呼ぶことでJava RDDをScala RDDに変換することができます。Scalaで提供された例に等しい自己内包アプリケーションの例が以下です:

APIの詳細はGaussianMixture Java ドキュメント および GaussianMixtureModel Java ドキュメント を参照してください。

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.SparkConf;

public class GaussianMixtureExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("GaussianMixture Example");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Load and parse data
    String path = "data/mllib/gmm_data.txt";
    JavaRDD<String> data = sc.textFile(path);
    JavaRDD<Vector> parsedData = data.map(
      new Function<String, Vector>() {
        public Vector call(String s) {
          String[] sarray = s.trim().split(" ");
          double[] values = new double[sarray.length];
          for (int i = 0; i < sarray.length; i++)
            values[i] = Double.parseDouble(sarray[i]);
          return Vectors.dense(values);
        }
      }
    );
    parsedData.cache();

    // Cluster the data into two classes using GaussianMixture
    GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());

    // Save and load GaussianMixtureModel
    gmm.save(sc.sc(), "myGMMModel");
    GaussianMixtureModel sameModel = GaussianMixtureModel.load(sc.sc(), "myGMMModel");
    // Output the parameters of the mixture model
    for(int j=0; j<gmm.k(); j++) {
        System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
            gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
    }
  }
}

以下の例ではデータをロードおよびパースした後で、データを2つのクラスタにクラスタ化するためにGaussianMixtureオブジェクトを使います。要求するクラスタの数はアルゴリズムに渡されます。それから混合モデルのパラメータを出力します。

APIについての詳細はGaussianMixture Python ドキュメント およびGaussianMixtureModel Python ドキュメントを参照してください。

from pyspark.mllib.clustering import GaussianMixture
from numpy import array

# Load and parse the data
data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))

# Build the model (cluster the data)
gmm = GaussianMixture.train(parsedData, 2)

# output parameters of model
for i in range(2):
    print ("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
        "sigma = ", gmm.gaussians[i].sigma.toArray())

べき乗クラスタリング (PIC)

べき乗クラスタリング(PIC) は、Lin and Cohen, Power Iteration Clusteringで説明される辺のプロパティとして指定されたペアのグラフの頂点のクラスタリングのためのスケーラブルで効果的なアルゴリズムです。power iterationを使ってグラフの正規化された類似性のマトリックスの固有ベクトルを計算し、それを使って頂点のクラスタ化を行います。spark.mllib はバックエンドとしてGraphXを使ったPICの実装を含んでいます。(srcId, dstId, similarity)タプルのRDDを取り、割り当てのクラスタリングを持つモデルを出力します。類似性は非負数でなければなりません。PIC は類似性の指標を対照的なものとして仮定します。順番に関係なく (srcId, dstId)のペアは入力データの中で最大一度だけ現れるべきです。ペアが入力に無い場合、それらの類似性はゼロとして扱われます。spark.mllibの PIC 実装は以下の(ハイパー)パラメータを取ります:

以下では、spark.mllibでPICをどうやって使うかを実演するためにコードの断片を示します。

PowerIterationClustering はPICアルゴリズムを実装します。類似性のマトリックスを表す(srcId: Long, dstId: Long, similarity: Double)タプルのRDD を取ります。PowerIterationClustering.runの呼び出しは PowerIterationClusteringModelを返します。これは計算されたクラスタリングの割り当てを含みます。

APIの詳細はPowerIterationClustering Scala ドキュメント および PowerIterationClusteringModel Scala ドキュメント を参照してください。

import org.apache.spark.mllib.clustering.{PowerIterationClustering, PowerIterationClusteringModel}
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/pic_data.txt")
val similarities = data.map { line =>
  val parts = line.split(' ')
  (parts(0).toLong, parts(1).toLong, parts(2).toDouble)
}

// Cluster the data into two classes using PowerIterationClustering
val pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(10)
val model = pic.run(similarities)

model.assignments.foreach { a =>
  println(s"${a.id} -> ${a.cluster}")
}

// Save and load model
model.save(sc, "myModelPath")
val sameModel = PowerIterationClusteringModel.load(sc, "myModelPath")

PICの論文で説明される実験を生成する完全な例はexamples/の下で見つかります。

PowerIterationClustering はPICアルゴリズムを実装します。類似性のマトリックスを表す(srcId: Long, dstId: Long, similarity: Double) タプルのJavaRDDを取ります。PowerIterationClustering.runの呼び出しは PowerIterationClusteringModelを返します。これは計算されたクラスタリングの割り当てを含みます。

APIの詳細はPowerIterationClustring Java ドキュメント および PowerIterationClustringModel Java ドキュメント を参照してください。

import scala.Tuple2;
import scala.Tuple3;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;

// Load and parse the data
JavaRDD<String> data = sc.textFile("data/mllib/pic_data.txt");
JavaRDD<Tuple3<Long, Long, Double>> similarities = data.map(
  new Function<String, Tuple3<Long, Long, Double>>() {
    public Tuple3<Long, Long, Double> call(String line) {
      String[] parts = line.split(" ");
      return new Tuple3<>(new Long(parts[0]), new Long(parts[1]), new Double(parts[2]));
    }
  }
);

// Cluster the data into two classes using PowerIterationClustering
PowerIterationClustering pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);

for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
  System.out.println(a.id() + " -> " + a.cluster());
}

// Save and load model
model.save(sc.sc(), "myModelPath");
PowerIterationClusteringModel sameModel = PowerIterationClusteringModel.load(sc.sc(), "myModelPath");

PowerIterationClustering はPICアルゴリズムを実装します。類似性のマトリックスを表す(srcId: Long, dstId: Long, similarity: Double)タプルのRDD を取ります。PowerIterationClustering.runの呼び出しは PowerIterationClusteringModelを返します。これは計算されたクラスタリングの割り当てを含みます。

APIについての詳細はPowerIterationClustring Python ドキュメント およびPowerIterationClustringModel Python ドキュメントを参照してください。

from __future__ import print_function
from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel

# Load and parse the data
data = sc.textFile("data/mllib/pic_data.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))

# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, 2, 10)

model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))

# Save and load model
model.save(sc, "myModelPath")
sameModel = PowerIterationClusteringModel.load(sc, "myModelPath")

潜在的ディレクレ配分法(LDA)

潜在的ディレクレ配分法 (LDA) はテキストドキュメントのコレクションからトピックを推測するトピックモデルです。LDA は以下のようにクラスタリングアルゴリズムとして考えることができます:

LDA はsetOptimizer 関数を使って異なる推測アルゴリズムをサポートします。EMLDAOptimizer は尤度関数上で期待最大化法を使ってクラスタリングを学習し、包括的な結果をもたらします。一方でOnlineLDAOptimizerオンライン変分推定法のために繰り返しのミニバッチ標本化を使用し、一般的にメモリに優しいです。

LDAは単語の数と以下のパラメータ(ビルダーパターンを使って設定されます)のベクトルとしてドキュメントのコレクションを取り入れます:

全てのspark.mllibの LDA モデルは以下をサポートします:

Note: LDAはまだ活発に開発中の実験的な機能です。結果として、ある特徴はオプティマイザーによって生成された2つのオプティマイザー/モデルのうちの一つの中でのみ利用可能です。現在のところ、分散型モデルはローカルモデルに変換可能ですが、逆はそうではありません。

以下の議論で各最適器/モデルのペアを別々に説明するつもりです。

予測の最大化

EMLDAOptimizer および DistributedLDAModelの中で実装されています。

以下は LDAに提供されているパラメータです:

注意: 十分な繰り返しを行うことが重要です。前回の繰り返しの中で、EMはしばしば意味の無いトピックを持ちますが、それらのトピックは更に繰り返しをした後で動的に改善します。データセットによりますが、少なくとも20、時には50-100の繰り返しが妥当です。

EMLDAOptimizerDistributedLDAModelを生成します。これは推測されたトピックだけではなく、完全な訓練コーパスおよび訓練コーパス内の各ドキュメントのためのトピックの分散を格納します。DistributedLDAModel は以下をサポートします:

オンライン変動ベイズ

OnlineLDAOptimizer および LocalLDAModelの中で実装されています。

以下は LDAに提供されているパラメータです:

更にOnlineLDAOptimizer は以下のパラエメータを受け付けます:

OnlineLDAOptimizerLocalLDAModelを生成します。これは推測されたトピックのみを格納します。LocalLDAModel は以下をサポートします:

以下の例では、ドキュメントのコープスを表すワードカウントベクトルをロードします。そしてドキュメントから3つのトピックスを推測するために LDA を使用します。要求するクラスタの数はアルゴリズムに渡されます。そして、単語の確率分散として表されるトピックスを出力します。

APIの詳細はLDA Scala ドキュメント および DistributedLDAModel Scala ドキュメント を参照してください。

import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()

// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)

// Output topics. Each is a distribution over words (matching word count vectors)
println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
  print("Topic " + topic + ":")
  for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
  println()
}

// Save and load model.
ldaModel.save(sc, "myLDAModel")
val sameModel = DistributedLDAModel.load(sc, "myLDAModel")

APIの詳細はLDA Java ドキュメント および DistributedLDAModel Java ドキュメント を参照してください。

import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.SparkConf;

public class JavaLDAExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("LDA Example");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Load and parse the data
    String path = "data/mllib/sample_lda_data.txt";
    JavaRDD<String> data = sc.textFile(path);
    JavaRDD<Vector> parsedData = data.map(
        new Function<String, Vector>() {
          public Vector call(String s) {
            String[] sarray = s.trim().split(" ");
            double[] values = new double[sarray.length];
            for (int i = 0; i < sarray.length; i++)
              values[i] = Double.parseDouble(sarray[i]);
            return Vectors.dense(values);
          }
        }
    );
    // Index documents with unique IDs
    JavaPairRDD<Long, Vector> corpus = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
        new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
          public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
            return doc_id.swap();
          }
        }
    ));
    corpus.cache();

    // Cluster the documents into three topics using LDA
    DistributedLDAModel ldaModel = new LDA().setK(3).run(corpus);

    // Output topics. それぞれは単語上の分散です(ワードカウントベクトルに一致します)
    System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
        + " words):");
    Matrix topics = ldaModel.topicsMatrix();
    for (int topic = 0; topic < 3; topic++) {
      System.out.print("Topic " + topic + ":");
      for (int word = 0; word < ldaModel.vocabSize(); word++) {
        System.out.print(" " + topics.apply(word, topic));
      }
      System.out.println();
    }

    ldaModel.save(sc.sc(), "myLDAModel");
    DistributedLDAModel sameModel = DistributedLDAModel.load(sc.sc(), "myLDAModel");
  }
}

APIについての詳細はLDA Python ドキュメント およびLDAModel Python ドキュメントを参照してください。

from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()

# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)

# Output topics. それぞれは単語上の分散です(ワードカウントベクトルに一致します)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize()) + " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
    print("Topic " + str(topic) + ":")
    for word in range(0, ldaModel.vocabSize()):
        print(" " + str(topics[word][topic]))
		
# Save and load model
model.save(sc, "myModelPath")
sameModel = LDAModel.load(sc, "myModelPath")

二値k平均法

二値k平均法はしばしば通常のK平均法よりかなり速くなりますが、一般的に異なるクラスタリングを生成します。

二値k平均法は階層クラスタリングの一種です。階層クラスタリングはクラスタ階層を構築しようとするクラスタの解析に最もよく使われる方法です。階層クラスタリングのためのストラテジは一般的に2つの種類に分類されます:

二値k平均法アルゴリズムはdivisiveアルゴリズムです。MLlibでの実装は以下のパラメータを持ちます:

APIの詳細はBisectingKMeans Scala ドキュメント および BisectingKMeansModel Scala ドキュメント を参照してください。

import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Loads and parses data
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()

// Clustering the data into 6 clusters by BisectingKMeans.
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)

// Show the compute cost and the cluster centers
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
  println(s"Cluster Center ${idx}: ${center}")
}
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/BisectingKMeansExample.scala" で見つかります。

APIの詳細はBisectingKMeans Java ドキュメント および BisectingKMeans Java ドキュメント を参照してください。

import com.google.common.collect.Lists;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.BisectingKMeans;
import org.apache.spark.mllib.clustering.BisectingKMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

ArrayList<Vector> localData = Lists.newArrayList(
  Vectors.dense(0.1, 0.1),   Vectors.dense(0.3, 0.3),
  Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
  Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),
  Vectors.dense(30.1, 30.1), Vectors.dense(30.3, 30.3)
);
JavaRDD<Vector> data = sc.parallelize(localData, 2);

BisectingKMeans bkm = new BisectingKMeans()
  .setK(4);
BisectingKMeansModel model = bkm.run(data);

System.out.println("Compute Cost: " + model.computeCost(data));
for (Vector center: model.clusterCenters()) {
  System.out.println("");
}
Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
  Vector clusterCenter = clusterCenters[i];
  System.out.println("Cluster Center " + i + ": " + clusterCenter);
}
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java" で見つかります。

ストリーミング k-平均法

データがストリーミングで到着する場合、動的にクラスタを推測し、新しいデータの到着とともにそれらを更新したいと思うかも知れません。spark.mllib は推測の衰退(あるいは"忘却")を制御するパラメータを持つストリーミングk-平均クラスタリングのサポートを提供します。アルゴリズムはmin-batch k-平均法 更新ルールの一般化を使用します。それぞれのデータの束について、それらの最も近いクラスタに全ての点を割り当て、新しいクラスタの中心を計算し、各クラスタを更新します:

\begin{equation} c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} \end{equation} \begin{equation} n_{t+1} = n_t + m_t \end{equation}

ここで$c_t$ はクラスタの以前の中心で、$n_t$ は今のところ割り当てられている点の数、$x_t$ は現在の束による新しいクラスタの中心、そして $m_t$ は現在の束でクラスタに追加される点の数です。減衰要素$\alpha$ は過去を無視するために使うことができます: $\alpha$=1 は最初からの全てのデータが使われるでしょう; $\alpha$=0 は最も最近のデータだけが使われるでしょう。これは対数移動平均に似ています。

The decay can be specified using a halfLife parameter, which determines the correct decay factor a such that, for data acquired at time t, its contribution by time t + halfLife will have dropped to 0.5. batches あるいはpoints のどちからで時間の単位を指定することができ、更新ルールはそれに応じて調整されるでしょう。

この例はストリーミングデータ上でクラスタを推測する方法を示します。

APIの詳細はStreamingKMeans Scala ドキュメント を参照してください。

まず必要なクラスをimportします。

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.clustering.StreamingKMeans

そして、訓練のためにベクトルの入力ストリームとテスト用のラベル付きのデータの点のストリームを作成します。StreamingContext ssc が生成されていると仮定します。詳細な情報はSpark ストリーミング プログラミングガイド を見てください。

val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)

ランダムなクラスタを使ってモデルを生成し、見つけるクラスタの数を指定します

val numDimensions = 3
val numClusters = 2
val model = new StreamingKMeans()
  .setK(numClusters)
  .setDecayFactor(1.0)
  .setRandomCenters(numDimensions, 0.0)

これで、訓練とテストのためのストリームを登録し、ジョブを開始し、新しいデータの点が到達した時の予想クラスタの割り当てを出力します。

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()

APIの詳細はStreamingKMeans Python ドキュメント を参照してください。

まず必要なクラスをimportします。

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans

そして、訓練のためにベクトルの入力ストリームとテスト用のラベル付きのデータの点のストリームを作成します。StreamingContext ssc が生成されていると仮定します。詳細な情報はSpark ストリーミング プログラミングガイド を見てください。

def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
testData = ssc.textFileStream("/testing/data/dir").map(parse)

ランダムなクラスタを使ってモデルを生成し、見つけるクラスタの数を指定します

model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)

これで、訓練とテストのためのストリームを登録し、ジョブを開始し、新しいデータの点が到達した時の予想クラスタの割り当てを出力します。

model.trainOn(trainingData)
print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))

ssc.start()
ssc.awaitTermination()

データを使って新しいテキストファイルを追加すると、クラスタの中心が更新されるでしょう。各訓練の点は[x1, x2, x3]で、各テストデータの点は(y, [x1, x2, x3])で整形されていなければなりません。ここでyはなんらかの有用なラベルまたは識別子です(例えば、trueの分類割り当て)。/training/data/dirにテキストファイルが置かれるといつでもモデルが更新されるでしょう。/testing/data/dirにテキストファイルが置かれるといつでも予想を見ることができるでしょう。新しいデータを使ってクラスタの中心は変更されるでしょう!

TOP
inserted by FC2 system