Clustering - spark.ml

この章では、mllibでのクラスタリングのためのパイプラインAPIを紹介します。

目次

K-平均法

k-平均法 は最も一般的に使われる、事前に定義したクラスタ数までデータを群にする、クラスタリング アルゴリズムです。MLlibの実装はkmeans||と呼ばれるk-means++メソッドの parallelized variant を含んでいます。

KMeansEstimatorとして実装され、基本モデルとしてKMeansModelを生成します。

入力のカラム

パラメータ名 タイプ デフォルト 解説
featuresCol Vector "features" 特徴ベクトル

出力のカラム

パラメータ名 タイプ デフォルト 解説
predictionCol Int "prediction" 予測されたクラスタの中心

詳細は Scala API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Crates a DataFrame
val dataset: DataFrame = sqlContext.createDataFrame(Seq(
  (1, Vectors.dense(0.0, 0.0, 0.0)),
  (2, Vectors.dense(0.1, 0.1, 0.1)),
  (3, Vectors.dense(0.2, 0.2, 0.2)),
  (4, Vectors.dense(9.0, 9.0, 9.0)),
  (5, Vectors.dense(9.1, 9.1, 9.1)),
  (6, Vectors.dense(9.2, 9.2, 9.2))
)).toDF("id", "features")

// Trains a k-means model
val kmeans = new KMeans()
  .setK(2)
  .setFeaturesCol("features")
  .setPredictionCol("prediction")
val model = kmeans.fit(dataset)

// Shows the result
println("Final Centers: ")
model.clusterCenters.foreach(println)
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/KMeansExample.scala" で見つかります。

詳細は Java API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Loads data
JavaRDD<Row> points = jsc.textFile(inputFile).map(new ParsePoint());
StructField[] fields = {new StructField("features", new VectorUDT(), false, Metadata.empty())};
StructType schema = new StructType(fields);
DataFrame dataset = sqlContext.createDataFrame(points, schema);

// Trains a k-means model
KMeans kmeans = new KMeans()
  .setK(k);
KMeansModel model = kmeans.fit(dataset);

// Shows the result
Vector[] centers = model.clusterCenters();
System.out.println("Cluster Centers: ");
for (Vector center: centers) {
  System.out.println(center);
}
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java" で見つかります。

潜在的ディレクレ配分法(LDA)

LDA is implemented as an Estimator that supports both EMLDAOptimizer and OnlineLDAOptimizer, and generates a LDAModel as the base models. 熟練のユーザは必要であればEMLDAOptimizerによって生成されたLDAModelDistributedLDAModel にキャストするかも知れません。

詳細は Scala API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.LDA
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructField, StructType}

// Loads data
val rowRDD = sc.textFile(input).filter(_.nonEmpty)
  .map(_.split(" ").map(_.toDouble)).map(Vectors.dense).map(Row(_))
val schema = StructType(Array(StructField(FEATURES_COL, new VectorUDT, false)))
val dataset = sqlContext.createDataFrame(rowRDD, schema)

// Trains a LDA model
val lda = new LDA()
  .setK(10)
  .setMaxIter(10)
  .setFeaturesCol(FEATURES_COL)
val model = lda.fit(dataset)
val transformed = model.transform(dataset)

val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)

// describeTopics
val topics = model.describeTopics(3)

// Shows the result
topics.show(false)
transformed.show(false)
例の完全なコードはSparkのリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala" で見つかります。

詳細は Java API ドキュメントを参照してください。

import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.ml.clustering.LDA;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

private static class ParseVector implements Function<String, Row> {
  private static final Pattern separator = Pattern.compile(" ");

  @Override
  public Row call(String line) {
    String[] tok = separator.split(line);
    double[] point = new double[tok.length];
    for (int i = 0; i < tok.length; ++i) {
      point[i] = Double.parseDouble(tok[i]);
    }
    Vector[] points = {Vectors.dense(point)};
    return new GenericRow(points);
  }
}

public static void main(String[] args) {

  String inputFile = "data/mllib/sample_lda_data.txt";

  // Parses the arguments
  SparkConf conf = new SparkConf().setAppName("JavaLDAExample");
  JavaSparkContext jsc = new JavaSparkContext(conf);
  SQLContext sqlContext = new SQLContext(jsc);

  // Loads data
  JavaRDD<Row> points = jsc.textFile(inputFile).map(new ParseVector());
  StructField[] fields = {new StructField("features", new VectorUDT(), false, Metadata.empty())};
  StructType schema = new StructType(fields);
  DataFrame dataset = sqlContext.createDataFrame(points, schema);

  // Trains a LDA model
  LDA lda = new LDA()
    .setK(10)
    .setMaxIter(10);
  LDAModel model = lda.fit(dataset);

  System.out.println(model.logLikelihood(dataset));
  System.out.println(model.logPerplexity(dataset));

  // Shows the result
  DataFrame topics = model.describeTopics(3);
  topics.show(false);
  model.transform(dataset).show(false);

  jsc.stop();
}
例の完全なコードはSparkのリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java" で見つかります。
TOP
inserted by FC2 system