クラスタリング

このページではMLlibでのクラスタリング アルゴリズムを説明します。RDDベースのAPIでのクラスタリングのためのガイド もこれらのアルゴリズムに関連する情報があります。

目次

K-平均法

k-平均法 は最も一般的に使われる、事前に定義したクラスタ数までデータを群にする、クラスタリング アルゴリズムです。MLlibの実装はkmeans||と呼ばれるk-means++メソッドの parallelized variant を含んでいます。

KMeansEstimatorとして実装され、基本モデルとしてKMeansModelを生成します。

入力のカラム

パラメータ名 タイプ デフォルト 説明
featuresCol Vector "features" 特徴ベクトル

出力のカラム

パラメータ名 タイプ デフォルト 説明
predictionCol Int "prediction" 予測されたクラスタの中心

詳細は Scala API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains a k-means model.
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)

// Make predictions
val predictions = model.transform(dataset)

// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
println("Cluster Centers: ")
model.clusterCenters.foreach(println)
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/KMeansExample.scala" で見つかります。

詳細は Java API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a k-means model.
KMeans kmeans = new KMeans().setK(2).setSeed(1L);
KMeansModel model = kmeans.fit(dataset);

// Make predictions
Dataset<Row> predictions = model.transform(dataset);

// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();

double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);

// Shows the result.
Vector[] centers = model.clusterCenters();
System.out.println("Cluster Centers: ");
for (Vector center: centers) {
  System.out.println(center);
}
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java" で見つかります。

詳細は Python API ドキュメントを参照してください。

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/kmeans_example.py" で見つかります。

詳細は R API ドキュメント を参照してください。

# Fit a k-means model with spark.kmeans
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
df_list <- randomSplit(training, c(7,3), 2)
kmeansDF <- df_list[[1]]
kmeansTestDF <- df_list[[2]]
kmeansModel <- spark.kmeans(kmeansDF, ~ Class + Sex + Age + Freq,
                            k = 3)

# Model summary
summary(kmeansModel)

# Get fitted result from the k-means model
head(fitted(kmeansModel))

# Prediction
kmeansPredictions <- predict(kmeansModel, kmeansTestDF)
head(kmeansPredictions)
例の完全なコードはSparkのリポジトリの "examples/src/main/r/ml/kmeans.R" で見つかります。

潜在的ディレクレ配分法 (LDA)

LDAEMLDAOptimizerOnlineLDAOptimizerの両方をサポートする予測器として実装され、基本モデルとしてLDAModel を生成します。熟練のユーザは必要であればEMLDAOptimizerによって生成されたLDAModelDistributedLDAModel にキャストするかも知れません。

詳細は Scala API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.LDA

// Loads data.
val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_lda_libsvm_data.txt")

// Trains a LDA model.
val lda = new LDA().setK(10).setMaxIter(10)
val model = lda.fit(dataset)

val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)
println(s"The lower bound on the log likelihood of the entire corpus: $ll")
println(s"The upper bound on perplexity: $lp")

// Describe topics.
val topics = model.describeTopics(3)
println("The topics described by their top-weighted terms:")
topics.show(false)

// Shows the result.
val transformed = model.transform(dataset)
transformed.show(false)
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala" で見つかります。

詳細は Java API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.LDA;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_lda_libsvm_data.txt");

// Trains a LDA model.
LDA lda = new LDA().setK(10).setMaxIter(10);
LDAModel model = lda.fit(dataset);

double ll = model.logLikelihood(dataset);
double lp = model.logPerplexity(dataset);
System.out.println("The lower bound on the log likelihood of the entire corpus: " + ll);
System.out.println("The upper bound on perplexity: " + lp);

// Describe topics.
Dataset<Row> topics = model.describeTopics(3);
System.out.println("The topics described by their top-weighted terms:");
topics.show(false);

// Shows the result.
Dataset<Row> transformed = model.transform(dataset);
transformed.show(false);
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java" で見つかります。

詳細は Python API ドキュメントを参照してください。

from pyspark.ml.clustering import LDA

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")

# Trains a LDA model.
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)

ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(dataset)
transformed.show(truncate=False)
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/lda_example.py" で見つかります。

詳細は R API ドキュメント を参照してください。

# Load training data
df <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a latent dirichlet allocation model with spark.lda
model <- spark.lda(training, k = 10, maxIter = 10)

# Model summary
summary(model)

# Posterior probabilities
posterior <- spark.posterior(model, test)
head(posterior)

# The log perplexity of the LDA model
logPerplexity <- spark.perplexity(model, test)
print(paste0("The upper bound bound on perplexity: ", logPerplexity))
例の完全なコードはSparkのリポジトリの "examples/src/main/r/ml/lda.R" で見つかります。

二値k平均法

二値k平均法はdivisive(あるいは"トップダウン")のやり方を使った階層的なクラスタリングの一種です: 各観測は一つのクラスタの中で開始し、階層を下る1つとして分離が実行されます。

二値k平均法はしばしば通常のK平均法よりかなり速くなりますが、一般的に異なるクラスタリングを生成します。

BisectingKMeansEstimatorとして実装され、基本モデルとしてBisectingKMeansModelを生成します。

詳細は Scala API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains a bisecting k-means model.
val bkm = new BisectingKMeans().setK(2).setSeed(1)
val model = bkm.fit(dataset)

// Make predictions
val predictions = model.transform(dataset)

// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
println("Cluster Centers: ")
val centers = model.clusterCenters
centers.foreach(println)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/BisectingKMeansExample.scala" で見つかります。

詳細は Java API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.BisectingKMeans;
import org.apache.spark.ml.clustering.BisectingKMeansModel;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a bisecting k-means model.
BisectingKMeans bkm = new BisectingKMeans().setK(2).setSeed(1);
BisectingKMeansModel model = bkm.fit(dataset);

// Make predictions
Dataset<Row> predictions = model.transform(dataset);

// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();

double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);

// Shows the result.
System.out.println("Cluster Centers: ");
Vector[] centers = model.clusterCenters();
for (Vector center : centers) {
  System.out.println(center);
}
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java" で見つかります。

詳細は Python API ドキュメントを参照してください。

from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a bisecting k-means model.
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
    print(center)
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/bisecting_k_means_example.py" で見つかります。

詳細は R API ドキュメント を参照してください。

t <- as.data.frame(Titanic)
training <- createDataFrame(t)

# Fit bisecting k-means model with four centers
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)

# get fitted result from a bisecting k-means model
fitted.model <- fitted(model, "centers")

# Model summary
head(summary(fitted.model))

# fitted values on training data
fitted <- predict(model, training)
head(select(fitted, "Class", "prediction"))
例の完全なコードはSparkのリポジトリの "examples/src/main/r/ml/bisectingKmeans.R" で見つかります。

ガウス混合モデル (GMM)

混合ガウスモデルはそれぞれが独自の確率を持つ kガウシアン副分布の一つから引き出される合成分布を表します。spark.mlの実装は、指定された標本のセットの最尤推定モデルを引き起こすために期待値最大化法アルゴリズムを使用します。

GaussianMixtureEstimatorとして実装され、基本モデルとしてGaussianMixtureModelを生成します。

入力のカラム

パラメータ名 タイプ デフォルト 説明
featuresCol Vector "features" 特徴ベクトル

出力のカラム

パラメータ名 タイプ デフォルト 説明
predictionCol Int "prediction" 予測されたクラスタの中心
probabilityCol Vector "probability" 各クラスタの確率

詳細は Scala API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.GaussianMixture

// Loads data
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains Gaussian Mixture Model
val gmm = new GaussianMixture()
  .setK(2)
val model = gmm.fit(dataset)

// output parameters of mixture model model
for (i <- 0 until model.getK) {
  println(s"Gaussian $i:\nweight=${model.weights(i)}\n" +
      s"mu=${model.gaussians(i).mean}\nsigma=\n${model.gaussians(i).cov}\n")
}
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala" で見つかります。

詳細は Java API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.GaussianMixture;
import org.apache.spark.ml.clustering.GaussianMixtureModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a GaussianMixture model
GaussianMixture gmm = new GaussianMixture()
  .setK(2);
GaussianMixtureModel model = gmm.fit(dataset);

// Output the parameters of the mixture model
for (int i = 0; i < model.getK(); i++) {
  System.out.printf("Gaussian %d:\nweight=%f\nmu=%s\nsigma=\n%s\n\n",
          i, model.weights()[i], model.gaussians()[i].mean(), model.gaussians()[i].cov());
}
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java" で見つかります。

詳細は Python API ドキュメントを参照してください。

from pyspark.ml.clustering import GaussianMixture

# loads data
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

gmm = GaussianMixture().setK(2).setSeed(538009335)
model = gmm.fit(dataset)

print("Gaussians shown as a DataFrame: ")
model.gaussiansDF.show(truncate=False)
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/gaussian_mixture_example.py" で見つかります。

詳細は R API ドキュメント を参照してください。

# Load training data
df <- read.df("data/mllib/sample_kmeans_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a gaussian mixture clustering model with spark.gaussianMixture
model <- spark.gaussianMixture(training, ~ features, k = 2)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
例の完全なコードはSparkのリポジトリの "examples/src/main/r/ml/gaussianMixture.R" で見つかります。

べき乗クラスタリング (PIC)

Power Iteration Clustering (PIC) is a scalable graph clustering algorithm developed by Lin and Cohen. From the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise similarity matrix of the data.

spark.mlの PowerIterationClustering 実装は以下の(ハイパー)パラメータを取ります:

詳細は Scala API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.PowerIterationClustering

val dataset = spark.createDataFrame(Seq(
  (0L, 1L, 1.0),
  (0L, 2L, 1.0),
  (1L, 2L, 1.0),
  (3L, 4L, 1.0),
  (4L, 0L, 0.1)
)).toDF("src", "dst", "weight")

val model = new PowerIterationClustering().
  setK(2).
  setMaxIter(20).
  setInitMode("degree").
  setWeightCol("weight")

val prediction = model.assignClusters(dataset).select("id", "cluster")

//  Shows the cluster assignment
prediction.show(false)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/PowerIterationClusteringExample.scala" で見つかります。

詳細は Java API ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.clustering.PowerIterationClustering;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0L, 1L, 1.0),
  RowFactory.create(0L, 2L, 1.0),
  RowFactory.create(1L, 2L, 1.0),
  RowFactory.create(3L, 4L, 1.0),
  RowFactory.create(4L, 0L, 0.1)
);

StructType schema = new StructType(new StructField[]{
  new StructField("src", DataTypes.LongType, false, Metadata.empty()),
  new StructField("dst", DataTypes.LongType, false, Metadata.empty()),
  new StructField("weight", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PowerIterationClustering model = new PowerIterationClustering()
  .setK(2)
  .setMaxIter(10)
  .setInitMode("degree")
  .setWeightCol("weight");

Dataset<Row> result = model.assignClusters(df);
result.show(false);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaPowerIterationClusteringExample.java" で見つかります。

詳細は Python API ドキュメントを参照してください。

from pyspark.ml.clustering import PowerIterationClustering

df = spark.createDataFrame([
    (0, 1, 1.0),
    (0, 2, 1.0),
    (1, 2, 1.0),
    (3, 4, 1.0),
    (4, 0, 0.1)
], ["src", "dst", "weight"])

pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", weightCol="weight")

# Shows the cluster assignment
pic.assignClusters(df).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/power_iteration_clustering_example.py" で見つかります。

詳細は R API ドキュメント を参照してください。

df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                           list(4L, 0L, 0.1)),
                      schema = c("src", "dst", "weight"))
# assign clusters
clusters <- spark.assignClusters(df, k = 2L, maxIter = 20L,
                                 initMode = "degree", weightCol = "weight")

showDF(arrange(clusters, clusters$id))
例の完全なコードはSparkのリポジトリの "examples/src/main/r/ml/powerIterationClustering.R" で見つかります。
TOP
inserted by FC2 system