クラスタリング - RDDベースのAPI
クラスタリングはなんらかの類似性の注釈に基づいてエンティティのサブセットのお互いにグループ化することを目的とする管理されない学習問題です。クラスタリングは探索的解析 および/あるいは 階層的な管理された学習パイプライン(個々の分類器あるいは回帰モデルはそれぞれのクラスタのために訓練されます)の構成要素としてしばしば使われます。
spark.mllib
パッケージは以下のモデルをサポートします:
K-平均法
k-平均法 は最も一般的に使われる、事前に定義したクラスタ数までデータを群にする、クラスタリング アルゴリズムです。spark.mllib
の実装はkmeans||と呼ばれるk-means++メソッドの並行化された変形を含んでいます。spark.ml
での実装は以下のパラメータを持ちます:
- k は要求するクラスタの数です。
- maxIterations は実行の繰り返しの最大数です。
- initializationMode はランダム初期化あるいはk-means||を使って初期化のどちらかを指定します。
- runs はk-meansアルゴリズムを実行する回数です(k-meansは大局的に最適な解決策を見つけることが保証されません。指定されたデータセット上で複数回実行した場合、アルゴリズムは最も良くクラスタ化された結果を返します。)
- initializationSteps は k-means|| アルゴリズム内でのステップ数を決定します。
- epsilon はk-meansが収束したと見なす距離の閾値を決定します。
- initialModel は初期化に使用されるクラスタの中心点の任意のセットです。このパラメータが渡された場合、実行は一度だけ行われます。
例
以下のコード断片は spark-shell
の中で実行することができます。
以下の例ではデータをロードおよびパースした後で、データを2つのクラスタにクラスタ化するためにKMeans
オブジェクトを使います。要求するクラスタの数はアルゴリズムに渡されます。それから Within Set Sum of Squared Error (WSSSE) を計算します。kを増やすことでこのエラー指標を減らすことができます。実際のところ最適なk は通常1つで、WSSSE グラフの中で"elbow"があります。
APIの詳細はKMeans
Scala ドキュメント および KMeansModel
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)
// Save and load model
clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
全てのMLlibのメソッドはJava-friendly なタイプを使用するため、Scalaでするのと同じような方法でimportおよびcallをすることができます。注意しなければならないことは、Spark Java APIは別個のJavaRDD
クラスを使用するが、そのメソッドはScalaの RDDオブジェクトを取るということです。JavaRDD
オブジェクト上で.rdd()
を呼ぶことでJava RDDをScala RDDに変換することができます。Scalaで提供された例に等しい自己内包アプリケーションの例が以下です:
APIの詳細はKMeans
Java ドキュメント および KMeansModel
Java ドキュメント を参照してください。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse data
String path = "data/mllib/kmeans_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(
new Function<String, Vector>() {
public Vector call(String s) {
String[] sarray = s.split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
}
}
);
parsedData.cache();
// Cluster the data into two classes using KMeans
int numClusters = 2;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
System.out.println("Cluster centers:");
for (Vector center: clusters.clusterCenters()) {
System.out.println(" " + center);
}
double cost = clusters.computeCost(parsedData.rdd());
System.out.println("Cost: " + cost);
// Evaluate clustering by computing Within Set Sum of Squared Errors
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
// Save and load model
clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
KMeansModel sameModel = KMeansModel.load(jsc.sc(),
"target/org/apache/spark/JavaKMeansExample/KMeansModel");
以下の例はPySparkシェルの中でテストすることができます。
以下の例ではデータをロードおよびパースした後で、データを2つのクラスタにクラスタ化するためにKMeansオブジェクトを使います。要求するクラスタの数はアルゴリズムに渡されます。それから Within Set Sum of Squared Error (WSSSE) を計算します。kを増やすことでこのエラー指標を減らすことができます。実際のところ最適なk は通常1つで、WSSSE グラフの中で"elbow"があります。
APIについての詳細はKMeans
Python ドキュメント およびKMeansModel
Python ドキュメントを参照してください。
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10,
runs=10, initializationMode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# Save and load model
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
混合ガウス
混合ガウスモデルはそれぞれが独自の確率を持つ kガウシアン副分布の一つから引き出される合成分布を表します。spark.mllib
の実装は、指定された標本のセットの最尤推定モデルを引き起こすために期待値最大化法アルゴリズムを使用します。実装は以下のパラメータを持ちます:
- k は要求するクラスタの数です。
- convergenceTol は収束が達成されたと見なす対数尤度の最大の変更です。
- maxIterations は収束に至らないで達成するための最大の繰り返しの数です。
- initialModel はEMアルゴリズムを開始するための最適な開始場所です。パラメータが省略された場合、ランダムな開始点がデータからから構築されるでしょう。
例
以下の例ではデータをロードおよびパースした後で、データを2つのクラスタにクラスタ化するためにGaussianMixtureオブジェクトを使います。要求するクラスタの数はアルゴリズムに渡されます。それから混合モデルのパラメータを出力します。
APIの詳細はGaussianMixture
Scala ドキュメント および GaussianMixtureModel
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)
// Save and load model
gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
val sameModel = GaussianMixtureModel.load(sc,
"target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}
全てのMLlibのメソッドはJava-friendly なタイプを使用するため、Scalaでするのと同じような方法でimportおよびcallをすることができます。注意しなければならないことは、Spark Java APIは別個のJavaRDD
クラスを使用するが、そのメソッドはScalaの RDDオブジェクトを取るということです。JavaRDD
オブジェクト上で.rdd()
を呼ぶことでJava RDDをScala RDDに変換することができます。Scalaで提供された例に等しい自己内包アプリケーションの例が以下です:
APIの詳細はGaussianMixture
Java ドキュメント および GaussianMixtureModel
Java ドキュメント を参照してください。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse data
String path = "data/mllib/gmm_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(
new Function<String, Vector>() {
public Vector call(String s) {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
}
}
);
parsedData.cache();
// Cluster the data into two classes using GaussianMixture
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
// Save and load GaussianMixtureModel
gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
"target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");
// Output the parameters of the mixture model
for (int j = 0; j < gmm.k(); j++) {
System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}
以下の例ではデータをロードおよびパースした後で、データを2つのクラスタにクラスタ化するためにGaussianMixtureオブジェクトを使います。要求するクラスタの数はアルゴリズムに渡されます。それから混合モデルのパラメータを出力します。
APIについての詳細はGaussianMixture
Python ドキュメント およびGaussianMixtureModel
Python ドキュメントを参照してください。
from numpy import array
from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel
# Load and parse the data
data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))
# Build the model (cluster the data)
gmm = GaussianMixture.train(parsedData, 2)
# Save and load model
gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
sameModel = GaussianMixtureModel\
.load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
# output parameters of model
for i in range(2):
print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
"sigma = ", gmm.gaussians[i].sigma.toArray())
べき乗クラスタリング (PIC)
べき乗クラスタリング(PIC) は、Lin and Cohen, Power Iteration Clusteringで説明される辺のプロパティとして指定されたペアのグラフの頂点のクラスタリングのためのスケーラブルで効果的なアルゴリズムです。power iterationを使ってグラフの正規化された類似性のマトリックスの固有ベクトルを計算し、それを使って頂点のクラスタ化を行います。spark.mllib
はバックエンドとしてGraphXを使ったPICの実装を含んでいます。(srcId, dstId, similarity)
タプルのRDD
を取り、割り当てのクラスタリングを持つモデルを出力します。類似性は非負数でなければなりません。PIC は類似性の指標を対照的なものとして仮定します。順番に関係なく (srcId, dstId)
のペアは入力データの中で最大一度だけ現れるべきです。ペアが入力に無い場合、それらの類似性はゼロとして扱われます。spark.mllib
の PIC 実装は以下の(ハイパー)パラメータを取ります:
k
: クラスタの数maxIterations
: べき乗の繰り返しの最大数initializationMode
: 初期化モデル。これは、頂点プロパティとしてランダムベクトルを使用するためにデフォルトの"random"、あるいは正規化された合計の類似性を使うために"degree"のどちらかです。
例
以下では、spark.mllib
でPICをどうやって使うかを実演するためにコードの断片を示します。
PowerIterationClustering
はPICアルゴリズムを実装します。類似性のマトリックスを表す(srcId: Long, dstId: Long, similarity: Double)
タプルのRDD
を取ります。PowerIterationClustering.run
の呼び出しは PowerIterationClusteringModel
を返します。これは計算されたクラスタリングの割り当てを含みます。
APIの詳細はPowerIterationClustering
Scala ドキュメント および PowerIterationClusteringModel
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.clustering.PowerIterationClustering
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setInitializationMode("degree")
.run(circlesRdd)
val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
.map { case (k, v) =>
s"$k -> ${v.sorted.mkString("[", ",", "]")}"
}.mkString(", ")
val sizesStr = assignments.map {
_._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
PowerIterationClustering
はPICアルゴリズムを実装します。類似性のマトリックスを表す(srcId: Long, dstId: Long, similarity: Double)
タプルのJavaRDD
を取ります。PowerIterationClustering.run
の呼び出しは PowerIterationClusteringModel
を返します。これは計算されたクラスタリングの割り当てを含みます。
APIの詳細はPowerIterationClustring
Java ドキュメント および PowerIterationClustringModel
Java ドキュメント を参照してください。
import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;
JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Lists.newArrayList(
new Tuple3<>(0L, 1L, 0.9),
new Tuple3<>(1L, 2L, 0.9),
new Tuple3<>(2L, 3L, 0.9),
new Tuple3<>(3L, 4L, 0.1),
new Tuple3<>(4L, 5L, 0.9)));
PowerIterationClustering pic = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);
for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
System.out.println(a.id() + " -> " + a.cluster());
}
PowerIterationClustering
はPICアルゴリズムを実装します。類似性のマトリックスを表す(srcId: Long, dstId: Long, similarity: Double)
タプルのRDD
を取ります。PowerIterationClustering.run
の呼び出しは PowerIterationClusteringModel
を返します。これは計算されたクラスタリングの割り当てを含みます。
APIについての詳細はPowerIterationClustring
Python ドキュメント およびPowerIterationClustringModel
Python ドキュメントを参照してください。
from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel
# Load and parse the data
data = sc.textFile("data/mllib/pic_data.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))
# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, 2, 10)
model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))
# Save and load model
model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
sameModel = PowerIterationClusteringModel\
.load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
潜在的ディレクレ配分法 (LDA)
潜在的ディレクレ配分法 (LDA) はテキストドキュメントのコレクションからトピックを推測するトピックモデルです。LDA は以下のようにクラスタリングアルゴリズムとして考えることができます:
- クラスタの中心に対応するトピックスおよびドキュメントはデータセット中の例(行)に対応します。
- トピックスとドキュメントの両方は特徴空間に存在します。この時特徴ベクトルは単語のカウント数(bag of words)のベクトルです。
- 伝統的な距離を使ってクラスタリングの評価をするのではなく、LDAはテキストドキュメントが生成される方法の統計的モデルに基づいた関数を使用します。
LDA はsetOptimizer
関数を使って異なる推測アルゴリズムをサポートします。EMLDAOptimizer
は尤度関数上で期待最大化法を使ってクラスタリングを学習し、包括的な結果をもたらします。一方でOnlineLDAOptimizer
はオンライン変分推定法のために繰り返しのミニバッチ標本化を使用し、一般的にメモリに優しいです。
LDAは単語の数と以下のパラメータ(ビルダーパターンを使って設定されます)のベクトルとしてドキュメントのコレクションを取り入れます:
k
: トピックスの数(つまり、クラスタの中心)optimizer
: LADモデルを学習するために使われる最適器。EMLDAOptimizer
あるいはOnlineLDAOptimizer
。docConcentration
: トピック上のドキュメントの分散の優先度のためのディレクレパラメータ。大きな値は推測の分散の滑らかさを促します。topicConcentration
: 用語(単語)上のトピックの分散の優先度のためのディレクレパラメータ。大きな値は推測の分散の滑らかさを促します。maxIterations
: 繰り返しの数の制限。checkpointInterval
: (Spark設定の中で設定して)チェックポイントを使う場合、このパラメータはチェックポイントが生成される頻度を指定します。maxIterations
が大きい場合、チェックポイントを使うことでディスク上のシャッフルファイルサイズを削減するのに役立ち、障害時のリカバリに役に立ちます。
全てのspark.mllib
の LDA モデルは以下をサポートします:
describeTopics
: 最も重要な単語と単語の重みの配列としてトピックを返しますtopicsMatrix
: 各カラムがトピックのk
マトリックスによってvocabSize
を返します
Note: LDAはまだ活発に開発中の実験的な機能です。結果として、ある特徴はオプティマイザーによって生成された2つのオプティマイザー/モデルのうちの一つの中でのみ利用可能です。現在のところ、分散型モデルはローカルモデルに変換可能ですが、逆はそうではありません。
以下の議論で各最適器/モデルのペアを別々に説明するつもりです。
予測の最大化
EMLDAOptimizer
および DistributedLDAModel
の中で実装されています。
以下は LDA
に提供されているパラメータです:
docConcentration
: 対称的な優先度のみサポートされます。つまり、与えられるk
-次元ベクトルの全ての値は同一でなければなりません。また全ての値は $> 1.0$ でなければなりません。Vector(-1)
results in default behavior (uniformk
dimensional vector with value $(50 / k) + 1$topicConcentration
: 対称的な優先度のみサポートされます。値は $>1.0$ でなければなりません。もし-1
であれば、$0.1 + 1$ のデフォルト値になります。maxIterations
: EM繰り返しの最大数。
注意: 十分な繰り返しを行うことが重要です。前回の繰り返しの中で、EMはしばしば意味の無いトピックを持ちますが、それらのトピックは更に繰り返しをした後で動的に改善します。データセットによりますが、少なくとも20、時には50-100の繰り返しが妥当です。
EMLDAOptimizer
はDistributedLDAModel
を生成します。これは推測されたトピックだけではなく、完全な訓練コーパスおよび訓練コーパス内の各ドキュメントのためのトピックの分散を格納します。DistributedLDAModel
は以下をサポートします:
topTopicsPerDocument
: 訓練コーパス内の各ドキュメントのためのトップトピックスとそれらの重み付けtopDocumentsPerTopic
: 各トピックのためのトップドキュメントとドキュメント内のトピックに対応する重み付け。logPrior
: 指定されたハイパーパラメータdocConcentration
とtopicConcentration
の予測されたトピックとドキュメントトピックの分散の確率の対数logLikelihood
: 推測されたトピックとドキュメントトピックの分布と仮定した場合の訓練コープスの対数尤度
オンライン変動ベイズ
OnlineLDAOptimizer
および LocalLDAModel
の中で実装されています。
以下は LDA
に提供されているパラメータです:
docConcentration
: 各k
次元のディレクレパラメータに等しい値をもつベクトルで渡すことで対称的な優先度を使うことができます。値は $>= 0$ でなければなりません。Vector(-1)
の条件下でデフォルトの結果になります(値 $(1.0 / k)$ を持つk
次元のベクトルを揃えます)。topicConcentration
: 対称的な優先度のみサポートされます。値は $>= 0$ でなければなりません。もし-1
であれば、$(1.0 / k)$ のデフォルト値になります。maxIterations
: サブミットするためのミニバッチの最大数。
更にOnlineLDAOptimizer
は以下のパラエメータを受け付けます:
miniBatchFraction
: 各繰り返しで抽出され使用されるコーパスの断片optimizeDocConcentration
: trueに設定されると、各ミニバッチの後でハイパーパラメータdocConcentration
(別名alpha
) の最尤推定を実施し、返されたLocalLDAModel
内の最適化されたdocConcentration
を設定します。tau0
およびkappa
: 学習レートの減衰に使われ、$iter$ が現在の繰り返しの数とした時に $(\tau_0 + iter)^{-\kappa}$ によって計算されます。
OnlineLDAOptimizer
はLocalLDAModel
を生成します。これは推測されたトピックのみを格納します。LocalLDAModel
は以下をサポートします:
logLikelihood(documents)
: 与えられたdocuments
の指定された推測トピックの下限を計算します。logPerplexity(documents)
: 与えられたdocuments
の指定されたトピックの難問の上限を計算します。
例
以下の例では、ドキュメントのコープスを表すワードカウントベクトルをロードします。そしてドキュメントから3つのトピックスを推測するために LDA を使用します。要求するクラスタの数はアルゴリズムに渡されます。そして、単語の確率分散として表されるトピックスを出力します。
APIの詳細はLDA
Scala ドキュメント および DistributedLDAModel
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)
// Output topics. Each is a distribution over words (matching word count vectors)
println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
print("Topic " + topic + ":")
for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
println()
}
// Save and load model.
ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
val sameModel = DistributedLDAModel.load(sc,
"target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
APIの詳細はLDA
Java ドキュメント および DistributedLDAModel
Java ドキュメント を参照してください。
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.clustering.LDAModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse the data
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(
new Function<String, Vector>() {
public Vector call(String s) {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
}
}
);
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus =
JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
return doc_id.swap();
}
}
)
);
corpus.cache();
// Cluster the documents into three topics using LDA
LDAModel ldaModel = new LDA().setK(3).run(corpus);
// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
+ " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
System.out.print("Topic " + topic + ":");
for (int word = 0; word < ldaModel.vocabSize(); word++) {
System.out.print(" " + topics.apply(word, topic));
}
System.out.println();
}
ldaModel.save(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
APIについての詳細はLDA
Python ドキュメント およびLDAModel
Python ドキュメントを参照してください。
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
+ " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
print("Topic " + str(topic) + ":")
for word in range(0, ldaModel.vocabSize()):
print(" " + str(topics[word][topic]))
# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
.load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
二値k平均法
二値k平均法はしばしば通常のK平均法よりかなり速くなりますが、一般的に異なるクラスタリングを生成します。
二値k平均法は階層クラスタリングの一種です。階層クラスタリングはクラスタ階層を構築しようとするクラスタの解析に最もよく使われる方法です。階層クラスタリングのためのストラテジは一般的に2つの種類に分類されます:
- 集積: これは"ボトムアップ"のやり方です: 各観測は自身のクラスタ内で開始し、クラスタのペアが階層を上がる1つとしてマージされます。
- 分離: これは"トップダウン"のやり方です: 各観測は一つのクラスタの中で開始し、階層を下る1つとして分離が実行されます。
二値k平均法アルゴリズムはdivisiveアルゴリズムです。MLlibでの実装は以下のパラメータを持ちます:
- k: 期待する葉クラスタの数(デフォルト: 4)。分離できる葉クラスタが無い場合は実際の数は小さくなりえます。
- maxIterations: クラスタに分割するためのk平均法の繰り返しの最大数 (デフォルト: 20)
- minDivisibleClusterSize: (>=1.0であれば)点の最小の数。あるいは(<1.0であれば)分割可能なクラスタの点の最小比率 (デフォルト: 1)。
- seed: ランダムシード (デフォルト: クラス名のハッシュ値)
例
APIの詳細はBisectingKMeans
Scala ドキュメント および BisectingKMeansModel
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Loads and parses data
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()
// Clustering the data into 6 clusters by BisectingKMeans.
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)
// Show the compute cost and the cluster centers
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
println(s"Cluster Center ${idx}: ${center}")
}
APIの詳細はBisectingKMeans
Java ドキュメント および BisectingKMeans
Java ドキュメント を参照してください。
import com.google.common.collect.Lists;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.BisectingKMeans;
import org.apache.spark.mllib.clustering.BisectingKMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
ArrayList<Vector> localData = Lists.newArrayList(
Vectors.dense(0.1, 0.1), Vectors.dense(0.3, 0.3),
Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),
Vectors.dense(30.1, 30.1), Vectors.dense(30.3, 30.3)
);
JavaRDD<Vector> data = sc.parallelize(localData, 2);
BisectingKMeans bkm = new BisectingKMeans()
.setK(4);
BisectingKMeansModel model = bkm.run(data);
System.out.println("Compute Cost: " + model.computeCost(data));
Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
Vector clusterCenter = clusterCenters[i];
System.out.println("Cluster Center " + i + ": " + clusterCenter);
}
APIについての詳細はBisectingKMeans
Python ドキュメント およびBisectingKMeansModel
Python ドキュメントを参照してください。
from numpy import array
from pyspark.mllib.clustering import BisectingKMeans, BisectingKMeansModel
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
model = BisectingKMeans.train(parsedData, 2, maxIterations=5)
# Evaluate clustering
cost = model.computeCost(parsedData)
print("Bisecting K-means Cost = " + str(cost))
# Save and load model
path = "target/org/apache/spark/PythonBisectingKMeansExample/BisectingKMeansModel"
model.save(sc, path)
sameModel = BisectingKMeansModel.load(sc, path)
ストリーミング k-平均法
データがストリーミングで到着する場合、動的にクラスタを推測し、新しいデータの到着とともにそれらを更新したいと思うかも知れません。spark.mllib
は推測の衰退(あるいは"忘却")を制御するパラメータを持つストリーミングk-平均クラスタリングのサポートを提供します。アルゴリズムはmin-batch k-平均法 更新ルールの一般化を使用します。それぞれのデータの束について、それらの最も近いクラスタに全ての点を割り当て、新しいクラスタの中心を計算し、各クラスタを更新します:
\begin{equation}
c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t}
\end{equation}
\begin{equation}
n_{t+1} = n_t + m_t
\end{equation}
ここで$c_t$
はクラスタの以前の中心で、$n_t$
は今のところ割り当てられている点の数、$x_t$
は現在の束による新しいクラスタの中心、そして $m_t$
は現在の束でクラスタに追加される点の数です。減衰要素$\alpha$
は過去を無視するために使うことができます: $\alpha$=1
は最初からの全てのデータが使われるでしょう; $\alpha$=0
は最も最近のデータだけが使われるでしょう。これは対数移動平均に似ています。
The decay can be specified using a halfLife
parameter, which determines the correct decay factor a
such that, for data acquired at time t
, its contribution by time t + halfLife
will have dropped to 0.5. batches
あるいはpoints
のどちからで時間の単位を指定することができ、更新ルールはそれに応じて調整されるでしょう。
例
この例はストリーミングデータ上でクラスタを推測する方法を示します。
APIの詳細はStreamingKMeans
Scala ドキュメント を参照してください。そして、StreamingContextの詳細については、Spark ストリーミング プログラミングガイド を参照してください。
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
val model = new StreamingKMeans()
.setK(args(3).toInt)
.setDecayFactor(1.0)
.setRandomCenters(args(4).toInt, 0.0)
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()
APIの詳細はStreamingKMeans
Python ドキュメント を参照してください。そして、StreamingContextの詳細については、Spark ストリーミング プログラミングガイド を参照してください。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def parse(lp):
label = float(lp[lp.find('(') + 1: lp.find(')')])
vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
return LabeledPoint(label, vec)
trainingData = sc.textFile("data/mllib/kmeans_data.txt")\
.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)
trainingQueue = [trainingData]
testingQueue = [testingData]
trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)
# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
model.trainOn(trainingStream)
result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)
データを使って新しいテキストファイルを追加すると、クラスタの中心が更新されるでしょう。各訓練の点は[x1, x2, x3]
で、各テストデータの点は(y, [x1, x2, x3])
で整形されていなければなりません。ここでy
はなんらかの有用なラベルまたは識別子です(例えば、trueの分類割り当て)。/training/data/dir
にテキストファイルが置かれるといつでもモデルが更新されるでしょう。/testing/data/dir
にテキストファイルが置かれるといつでも予想を見ることができるでしょう。新しいデータを使ってクラスタの中心は変更されるでしょう!