特徴の抽出、変換および選択 - spark.ml

この章は以下のグループに大まかに分類される特徴のために動作するアルゴリズムを対象にします:

目次

特徴抽出器

TF-IDF (ハッシュTFとIDF)

Term Frequency-Inverse Document Frequency (TF-IDF) は一般的なテキストの前処理のステップです。Spark MLでは、TF-IDFは2つの部分に分割されます: TF (+hashing) と IDF。

TF: HashingTF は単語のセットを取り、それらのセットを固定長の特徴量ベクトルに設定する変換器です。In text processing, a “set of terms” might be a bag of words. アルゴリズムは次元削減のためにhashing trickを使って Term Frequency (TF) counts を組み合わせます。

IDF: IDF はデータセットに当てはまり IDFModelを生成するEstimatorです。IDFModel は特徴量ベクトル(一般的にHashingTFから生成されます)を取り、各カラムをスケールします。直感的にそれはコープス内で頻繁に現れるカラムの重み付けを減らします。

Term Frequency と Inverse Document Frequency についての詳細はTF-IDFのMLlib ユーザガイド を参照してください。

以下のコードの断片で、文章のセットから始めます。各文章を Tokenizerを使って単語に分割します。各文章 (bag of words)について、文章を特徴量ベクトルにハッシュするためにHashingTF を使います。特徴量ベクトルをリスケールするために IDF を使います; これは一般的にテキストを特徴として使用する場合にパフォーマンスを改善します。それから特徴量ベクトルは学習アルゴリズムに渡されることができます。

APIについての詳細はHashingTF Scala ドキュメント およびIDF Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

val sentenceData = sqlContext.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (0, "I wish Java could use case classes"),
  (1, "Logistic regression models are neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)
val hashingTF = new HashingTF()
  .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(wordsData)
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("features", "label").take(3).foreach(println)
例の完全なコードはSparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/TfIdfExample.scala" で見つかります。

APIについての詳細はHashingTF Java ドキュメント およびIDF Java ドキュメントを参照してください。

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(0, "Hi I heard about Spark"),
  RowFactory.create(0, "I wish Java could use case classes"),
  RowFactory.create(1, "Logistic regression models are neat")
));
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
DataFrame sentenceData = sqlContext.createDataFrame(jrdd, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
DataFrame wordsData = tokenizer.transform(sentenceData);
int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("rawFeatures")
  .setNumFeatures(numFeatures);
DataFrame featurizedData = hashingTF.transform(wordsData);
IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);
DataFrame rescaledData = idfModel.transform(featurizedData);
for (Row r : rescaledData.select("features", "label").take(3)) {
  Vector features = r.getAs(0);
  Double label = r.getDouble(1);
  System.out.println(features);
  System.out.println(label);
}
例の完全なコードはSparkリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaTfIdfExample.java" で見つかります。

APIについての詳細はHashingTF Python ドキュメント およびIDF Python ドキュメントを参照してください。

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = sqlContext.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
for features_label in rescaledData.select("features", "label").take(3):
    print(features_label)
例の完全なコードはSparkリポジトリの "examples/src/main/python/ml/tf_idf_example.py" で見つかります。

Word2Vec

Word2Vec は ドキュメントを表す単語の系列を取り、Word2VecModelを訓練する Estimatorです。モデルは各単語をユニークな固定長のベクトルにマップします。Word2VecModelは各ドキュメントをドキュメント内の全ての単語の平均を使ってベクトルに変換します; このベクトルは予想、ドキュメントの類似性の計算などのために特徴として使うことができます。詳細は Word2VecのMLlibユーザガイドを参照してください。

以下のコードの断片の中で、ドキュメントのセットを使って始めます。それぞれは単語の系列を表します。各ドキュメントのために、それを特徴量ベクトルに変換します。そして、この特徴量ベクトルは学習アルゴリズムに渡されることができます。

APIの詳細はWord2Vec Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.Word2Vec

// Input data: Each row is a bag of words from a sentence or document.
val documentDF = sqlContext.createDataFrame(Seq(
  "Hi I heard about Spark".split(" "),
  "I wish Java could use case classes".split(" "),
  "Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")

// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.select("result").take(3).foreach(println)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/Word2VecExample.scala" で見つかります。

APIの詳細はWord2Vec Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
  RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
  RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
));
StructType schema = new StructType(new StructField[]{
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
DataFrame documentDF = sqlContext.createDataFrame(jrdd, schema);

// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0);
Word2VecModel model = word2Vec.fit(documentDF);
DataFrame result = model.transform(documentDF);
for (Row r : result.select("result").take(3)) {
  System.out.println(r);
}
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaWord2VecExample.java" で見つかります。

APIの詳細はWord2Vec Python ドキュメント を参照してください。

from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = sqlContext.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for feature in result.select("result").take(3):
    print(feature)
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/word2vec_example.py" で見つかります。

CountVectorizer

CountVectorizerCountVectorizerModel はテキスト文章のコレクションをトークンのカウントのベクトルに変換することを目的としています。推測的な辞書を利用できない場合、CountVectorizer は語彙を抽出するためにEstimatorとしてつかうことができ、CountVectorizerModelを生成します。モデルは語彙上のドキュメントのための希薄な表現を生成します。これはLDAのような他のアルゴリズムに渡すことができます。

合致プロセスの中で、CountVectorizer はコープスに渡って単語の頻度が高い順に並べられた単語の中で最も長いvocabSizeを選択するでしょう。An optional parameter “minDF” also affect the fitting process by specifying the minimum number (or fraction if < 1.0) of documents a term must appear in to be included in the vocabulary.

idtextsのカラムを持つ以下のデータフレームを仮定します:

 id | texts
----|----------
 0  | Array("a", "b", "c")
 1  | Array("a", "b", "b", "c", "a")

texts中の各行はArray[String]タイプのドキュメントです。CountVectorizer の合致は語彙(a, b, c) を持つCountVectorizerModelを生成し、変換後の出力カラム"vector"は以下を含みます:

 id | texts                           | vector
----|---------------------------------|---------------
 0  | Array("a", "b", "c")            | (3,[0,1,2],[1.0,1.0,1.0])
 1  | Array("a", "b", "b", "c", "a")  | (3,[0,1,2],[2.0,2.0,1.0])

各ベクトルは語彙上のドキュメントのトークンの数を表します。

APIの詳細はCountVectorizer Scala ドキュメントCountVectorizerModel Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}

val df = sqlContext.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")

// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df)

// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
  .setInputCol("words")
  .setOutputCol("features")

cvModel.transform(df).select("features").show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala" で見つかります。

APIの詳細はCountVectorizer Java ドキュメントCountVectorizerModel Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(Arrays.asList("a", "b", "c")),
  RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
));
StructType schema = new StructType(new StructField [] {
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);

// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
  .setInputCol("text")
  .setOutputCol("feature")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df);

// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
  .setInputCol("text")
  .setOutputCol("feature");

cvModel.transform(df).show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaCountVectorizerExample.java" で見つかります。

特徴変換

Tokenizer

Tokenization は(文章のような)テキストを取るプロセスで、個々の(通常は単語の)用語に分解するプロセスです。単純なTokenizer クラスはこの機能を提供します。以下の例は文章を単語の列に分割する方法を説明します。

RegexTokenizer は正規表現(regex)一致に基づいた更に進んだtokenizationをすることができます。デフォルトでは、パラメータ"pattern"(regex, デフォルト: \s+)は入力テキストを分割するためのデリミタとして使われます。Alternatively, users can set parameter “gaps” to false indicating the regex “pattern” denotes “tokens” rather than splitting gaps, and find all matching occurrences as the tokenization result.

APIの詳細はTokenizer Scala ドキュメントRegexTokenizer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}

val sentenceDataFrame = sqlContext.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)

val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("words", "label").take(3).foreach(println)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("words", "label").take(3).foreach(println)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala"で見つかります。

APIについての詳細はTokenizer Java ドキュメントRegexTokenizer Java ドキュメントを参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(0, "Hi I heard about Spark"),
  RowFactory.create(1, "I wish Java could use case classes"),
  RowFactory.create(2, "Logistic,regression,models,are,neat")
));

StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});

DataFrame sentenceDataFrame = sqlContext.createDataFrame(jrdd, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");

DataFrame wordsDataFrame = tokenizer.transform(sentenceDataFrame);
for (Row r : wordsDataFrame.select("words", "label"). take(3)) {
  java.util.List<String> words = r.getList(0);
  for (String word : words) System.out.print(word + " ");
  System.out.println();
}

RegexTokenizer regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java" で見つかります。

APIについての詳細はTokenizer Python ドキュメントRegexTokenizer Python ドキュメントを参照してください。

from pyspark.ml.feature import Tokenizer, RegexTokenizer

sentenceDataFrame = sqlContext.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsDataFrame = tokenizer.transform(sentenceDataFrame)
for words_label in wordsDataFrame.select("words", "label").take(3):
    print(words_label)
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/tokenizer_example.py" で見つかります。

StopWordsRemover

Stop wordsは入力から除外されなければならない単語で、一般的にはその単語は頻繁に現れ、意味を持たないからです。

StopWordsRemover は入力として文字列の系列(例えば、Tokenizerの出力)を取り、入力の系列から全てのstop wordを取りこぼします。stopwordのリストはstopWords パラメータによって指定されます。デフォルトでstop wordsのリスト を提供し、新しくインスタンス化されたStopWordsRemoverインスタンス上でgetStopWordsを呼び出すことでアクセス可能です。真偽値パラメータ caseSensitive は一致が大文字小文字を区別するかを示します(デフォルトはfalse)。

idrawのカラムを持つ以下のデータフレームを仮定します:

 id | raw
----|----------
 0  | [I, saw, the, red, baloon]
 1  | [Mary, had, a, little, lamb]

入力カラムとしてrawを持ち、出力カラムとしてfilteredを持つStopWordsRemoverを適用すると、以下を取得するでしょう:

 id | raw                         | filtered
----|-----------------------------|--------------------
 0  | [I, saw, the, red, baloon]  |  [saw, red, baloon]
 1  | [Mary, had, a, little, lamb]|[Mary, little, lamb]

filteredでは、stop words “I”, “the”, “had” および “a” がフィルタされます。

APIの詳細はStopWordsRemover Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")

val dataSet = sqlContext.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "baloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/StopWordsRemoverExample.scala" で見つかります。

APIの詳細はStopWordsRemover Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StopWordsRemover remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered");

JavaRDD<Row> rdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(Arrays.asList("I", "saw", "the", "red", "baloon")),
  RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
));

StructType schema = new StructType(new StructField[]{
  new StructField(
    "raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

DataFrame dataset = jsql.createDataFrame(rdd, schema);
remover.transform(dataset).show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaStopWordsRemoverExample.java" で見つかります。

APIの詳細はStopWordsRemover Python ドキュメント を参照してください。

from pyspark.ml.feature import StopWordsRemover

sentenceData = sqlContext.createDataFrame([
    (0, ["I", "saw", "the", "red", "baloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["label", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/stopwords_remover_example.py" で見つかります。

$n$-gram

n-gramはなんらかの整数$n$のための$n$トークン(一般的に単語)の系列です。NGramクラスは入力の特徴量を$n$-gramに変換するために使うことができます。

NGram は入力として文字列の系列(例えば、Tokenizerの出力)を取ります。パラメータn は各$n$-gramの単語の数を決定するために使われます。出力は、$n$個の連続した単語の空白で区切られた文字列によって表される各$n$-gramの系列からなるでしょう。もし入力系列がn より少ない文字列からなる場合、何も出力されません。

APIの詳細はNGram Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.NGram

val wordDataFrame = sqlContext.createDataFrame(Seq(
  (0, Array("Hi", "I", "heard", "about", "Spark")),
  (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
  (2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("label", "words")

val ngram = new NGram().setInputCol("words").setOutputCol("ngrams")
val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.take(3).map(_.getAs[Stream[String]]("ngrams").toList).foreach(println)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/NGramExample.scala" で見つかります。

APIの詳細はNGram Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(0.0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
  RowFactory.create(1.0, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
  RowFactory.create(2.0, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
));

StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField(
    "words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

DataFrame wordDataFrame = sqlContext.createDataFrame(jrdd, schema);

NGram ngramTransformer = new NGram().setInputCol("words").setOutputCol("ngrams");

DataFrame ngramDataFrame = ngramTransformer.transform(wordDataFrame);

for (Row r : ngramDataFrame.select("ngrams", "label").take(3)) {
  java.util.List<String> ngrams = r.getList(0);
  for (String ngram : ngrams) System.out.print(ngram + " --- ");
  System.out.println();
}
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaNGramExample.java" で見つかります。

APIの詳細はNGram Python ドキュメント を参照してください。

from pyspark.ml.feature import NGram

wordDataFrame = sqlContext.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["label", "words"])
ngram = NGram(inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
for ngrams_label in ngramDataFrame.select("ngrams", "label").take(3):
    print(ngrams_label)
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/n_gram_example.py" で見つかります。

Binarizer

二分化は数値的な特徴の閾値化を二値 (0/1) の特徴にする処理です。

Binarizer は、二分化のためのthresholdと同様に、共通のパラメータ inputColoutputColを取ります。閾値より大きな特徴値は1.0に2値化されます; 閾値より小さい値は0.0に2値化されます。

APIの詳細はBinarizer Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame: DataFrame = sqlContext.createDataFrame(data).toDF("label", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)
val binarizedFeatures = binarizedDataFrame.select("binarized_feature")
binarizedFeatures.collect().foreach(println)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/BinarizerExample.scala" で見つかります。

APIの詳細はBinarizer Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(0, 0.1),
  RowFactory.create(1, 0.8),
  RowFactory.create(2, 0.2)
));
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
DataFrame continuousDataFrame = jsql.createDataFrame(jrdd, schema);
Binarizer binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5);
DataFrame binarizedDataFrame = binarizer.transform(continuousDataFrame);
DataFrame binarizedFeatures = binarizedDataFrame.select("binarized_feature");
for (Row r : binarizedFeatures.collect()) {
  Double binarized_value = r.getDouble(0);
  System.out.println(binarized_value);
}
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaBinarizerExample.java" で見つかります。

APIの詳細はBinarizer Python ドキュメント を参照してください。

from pyspark.ml.feature import Binarizer

continuousDataFrame = sqlContext.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["label", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
binarizedFeatures = binarizedDataFrame.select("binarized_feature")
for binarized_feature, in binarizedFeatures.collect():
    print(binarized_feature)
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/binarizer_example.py" で見つかります。

PCA

PCA is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables into a set of values of linearly uncorrelated variables called principal components. PCA クラスはPCAを使ってベクトルを低次元に投影するためにモデルを訓練します。以下の例は5次元の特徴値ベクトルを3次元の主要な成分に投影する方法を示します。

APIの詳細はPCA Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.PCA
import org.apache.spark.mllib.linalg.Vectors

val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)
val pcaDF = pca.transform(df)
val result = pcaDF.select("pcaFeatures")
result.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/PCAExample.scala" で見つかります。

APIの詳細はPCA Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> data = jsc.parallelize(Arrays.asList(
  RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
  RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
  RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
));

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

DataFrame df = jsql.createDataFrame(data, schema);

PCAModel pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df);

DataFrame result = pca.transform(df).select("pcaFeatures");
result.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaPCAExample.java" で見つかります。

APIの詳細はPCA Python ドキュメント を参照してください。

from pyspark.ml.feature import PCA
from pyspark.mllib.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
Find full example code at "examples/src/main/python/ml/pca_example.py" で見つかります。

PolynomialExpansion

Polynomial expansionは特徴値を多項式空間に拡張する処理です。これは元の次元のn-次の組み合わせによって定式化されます。PolynomialExpansion クラスはこの機能を提供します。以下の例は特徴値を3次元の多項式空間に拡張する方法を示します。

APIの詳細はPolynomialExpansion Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.mllib.linalg.Vectors

val data = Array(
  Vectors.dense(-2.0, 2.3),
  Vectors.dense(0.0, 0.0),
  Vectors.dense(0.6, -1.1)
)
val df = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val polynomialExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3)
val polyDF = polynomialExpansion.transform(df)
polyDF.select("polyFeatures").take(3).foreach(println)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/PolynomialExpansionExample.scala" で見つかります。

APIの詳細はPylynomialExpansion Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

PolynomialExpansion polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3);

JavaRDD<Row> data = jsc.parallelize(Arrays.asList(
  RowFactory.create(Vectors.dense(-2.0, 2.3)),
  RowFactory.create(Vectors.dense(0.0, 0.0)),
  RowFactory.create(Vectors.dense(0.6, -1.1))
));

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

DataFrame df = jsql.createDataFrame(data, schema);
DataFrame polyDF = polyExpansion.transform(df);

Row[] row = polyDF.select("polyFeatures").take(3);
for (Row r : row) {
  System.out.println(r.get(0));
}
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java" で見つかります。

APIの詳細はPolynomialExpansion Python ドキュメント を参照してください。

from pyspark.ml.feature import PolynomialExpansion
from pyspark.mllib.linalg import Vectors

df = sqlContext\
    .createDataFrame([(Vectors.dense([-2.0, 2.3]),),
                      (Vectors.dense([0.0, 0.0]),),
                      (Vectors.dense([0.6, -1.1]),)],
                     ["features"])
px = PolynomialExpansion(degree=2, inputCol="features", outputCol="polyFeatures")
polyDF = px.transform(df)
for expanded in polyDF.select("polyFeatures").take(3):
    print(expanded)
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/polynomial_expansion_example.py" で見つかります。

離散コサイン変換 (DCT)

離散コサイン変換 は時間領域の長さ$N$ リアル値の系列を他の周波数領域の長さ$N$ リアル値の系列に変換します。A DCT class provides this functionality, implementing the DCT-II and scaling the result by $1/\sqrt{2}$ such that the representing matrix for the transform is unitary. No shift is applied to the transformed sequence (e.g. the $0$th element of the transformed sequence is the $0$th DCT coefficient and not the $N/2$th).

APIの詳細はDCT Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.DCT
import org.apache.spark.mllib.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0),
  Vectors.dense(14.0, -2.0, -5.0, 1.0))

val df = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(3)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/DCTExample.scala" で見つかります。

APIの詳細はDCT Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.DCT;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> data = jsc.parallelize(Arrays.asList(
  RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
  RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
  RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
));
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
DataFrame df = jsql.createDataFrame(data, schema);
DCT dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false);
DataFrame dctDf = dct.transform(df);
dctDf.select("featuresDCT").show(3);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaDCTExample.java" で見つかります。

StringIndexer

StringIndexer はラベルの文字列カラムをラベルのインデックスのカラムにエンコードします。このインデックスは[0, numLabels)の中にあり、ラベルの頻度によって並べられます。つまり、最も頻度の高いラベルはインデックス 0です。入力カラムが数値の場合、それを文字列にキャストし、文字列値をインデックスします。Estimator または Transformer のようなダウンストリームパイプライン コンポーネントがこの文字列のインデックスラベルを利用する場合、コンポーネントの入力カラムをこの文字列のインデックスのカラム名に設定する必要があります。多くの場合において、setInputColを使って入力カラムを設定することができます。

idcategoryのカラムを持つ以下のデータフレームを仮定します:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | a
 4  | a
 5  | c

category は3つのラベルを持つ文字列のカラムです: “a”, “b”, and “c”. 入力カラムとしてcategoryを持ち、出力カラムとしてcategoryIndexを持つ StringIndexer を適用すると、以下を取得するでしょう:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0

"a" は最も頻度が高いためインデックス0 を取得し、インデックス1を持つ"c"とインデックス2を持つ"b"が続きます。

Additionaly, there are two strategies regarding how StringIndexer will handle unseen labels when you have fit a StringIndexer on one dataset and then use it to transform another:

前回の例に戻りますが、今回は以前定義したStringIndexer を以下のデータセットに再利用します:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | d

If you’ve not set how StringIndexer handles unseen labels or set it to “error”, an exception will be thrown. しかし、setHandleInvalid("skip")を呼んだ場合は、以下のデータセットが生成されるでしょう:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0

"d"を含む行が現れないことに注意してください。

APIの詳細はStringIndexer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.StringIndexer

val df = sqlContext.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexed = indexer.fit(df).transform(df)
indexed.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala" で見つかります。

APIの詳細はStringIndexer Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
));
StructType schema = new StructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("category", StringType, false)
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);
StringIndexer indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex");
DataFrame indexed = indexer.fit(df).transform(df);
indexed.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaStringIndexerExample.java" iで見つかります。

APIの詳細はStringIndexer Python ドキュメント を参照してください。

from pyspark.ml.feature import StringIndexer

df = sqlContext.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/string_indexer_example.py" で見つかります。

IndexToString

Symmetrically to StringIndexer, IndexToString maps a column of label indices back to a column containing the original labels as strings. The common use case is to produce indices from labels with StringIndexer, train a model with those indices and retrieve the original labels from the column of predicted indices with IndexToString. しかし、独自のラベルを提供しても良いです。

StringIndexer の例を基礎にして、カラム idcategoryIndexを持つ以下のデータフレームを持つと仮定しましょう:

 id | categoryIndex
----|---------------
 0  | 0.0
 1  | 2.0
 2  | 1.0
 3  | 0.0
 4  | 0.0
 5  | 1.0

Applying IndexToString with categoryIndex as the input column, originalCategory as the output column, we are able to retrieve our original labels (they will be inferred from the columns’ metadata):

 id | categoryIndex | originalCategory
----|---------------|-----------------
 0  | 0.0           | a
 1  | 2.0           | b
 2  | 1.0           | c
 3  | 0.0           | a
 4  | 0.0           | a
 5  | 1.0           | c

APIの詳細はIndexToString Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.{StringIndexer, IndexToString}

val df = sqlContext.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")

val converted = converter.transform(indexed)
converted.select("id", "originalCategory").show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/IndexToStringExample.scala" で見つかります。

APIの詳細はIndexToString Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
));
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);

StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
DataFrame indexed = indexer.transform(df);

IndexToString converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory");
DataFrame converted = converter.transform(indexed);
converted.select("id", "originalCategory").show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaIndexToStringExample.java" で見つかります。

APIの詳細はIndexToString Python ドキュメント を参照してください。

from pyspark.ml.feature import IndexToString, StringIndexer

df = sqlContext.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

converted.select("id", "originalCategory").show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/index_to_string_example.py" で見つかります。

OneHotEncoder

One-hot encoding はラベルのインデックスのカラムを、最大1の値を持つ2値のベクトルにマップします。この変換により、ロジスティック回帰のような連続する特徴値を期待するアルゴリズムが categorical featuresを使うことができます。

APIの詳細はOneHotEncoder Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val df = sqlContext.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderExample.scala" で見つかります。

APIの詳細はOneHotEncoder Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
));

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});

DataFrame df = sqlContext.createDataFrame(jrdd, schema);

StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
DataFrame indexed = indexer.transform(df);

OneHotEncoder encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec");
DataFrame encoded = encoder.transform(indexed);
encoded.select("id", "categoryVec").show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderExample.java" で見つかります。

APIの詳細はOneHotEncoder Python ドキュメント を参照してください。

from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = sqlContext.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/onehot_encoder_example.py" で見つかります。

VectorIndexer

VectorIndexerVectorのデータセット内のカテゴリ特徴量のインデックスを手助けします。It can both automatically decide which features are categorical and convert original values to category indices. 厳密に言うと、以下のことを行います:

  1. タイプVectorとパラメータ maxCategoriesを取ります。
  2. Decide which features should be categorical based on the number of distinct values, where features with at most maxCategories are declared categorical.
  3. 各分類特徴について0から始まる分類インデックスを計算します。
  4. 分類的な特徴をインデックスし、元の特徴値をインデックスに変換します。

分類的な特徴をインデックスすることにより、決定木およびツリーアンサンブルのようなアルゴリズムが分類的な特徴を適切に扱うことができ、パフォーマンスを改善します。

以下の例では、ラベル付けされた点のデータセットを読み、どの毒長が分類的であるかを決めるために VectorIndexer を使います。分類的な特徴値をそれらのインデックスに変換します。この変換されたデータはカテゴリ特徴量を扱うDecisionTreeRegressorのようなアルゴリズムに渡すことができます。

APIの詳細はVectorIndexer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.VectorIndexer

val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)

val indexerModel = indexer.fit(data)

val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " +
  categoricalFeatures.mkString(", "))

// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/VectorIndexerExample.scala" で見つかります。

APIの詳細はVectorIndexer Java ドキュメント を参照してください。

import java.util.Map;

import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.DataFrame;

DataFrame data = jsql.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

VectorIndexer indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);

Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");

for (Integer feature : categoryMaps.keySet()) {
  System.out.print(" " + feature);
}
System.out.println();

// Create new column "indexed" with categorical values transformed to indices
DataFrame indexedData = indexerModel.transform(data);
indexedData.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorIndexerExample.java" で見つかります。

APIの詳細はVectorIndexer Python ドキュメント を参照してください。

from pyspark.ml.feature import VectorIndexer

data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/vector_indexer_example.py" で見つかります。

平均器

Normalizer は、各Vectorが単位ノルムを持つように正規化し、Vector行のデータセットを変換するTransformerです。パラメータ pを取り、それは正規化に使われる p-norm を指定します。(デフォルトは $p = 2$ ) この正規化は入力データの標準化の役に立ち、学習アルゴリズムの挙動を改善します。

以下の例はlibsvm形式のデータセットをロードし、各行がunit $L^2$ ノルムおよび unit $L^\infty$ ノルムを持つように正規化する方法を実演します。

APIの詳細はNormalizer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.Normalizer

val dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
l1NormData.show()

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
lInfNormData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/NormalizerExample.scala" で見つかります。

APIの詳細はNormalizer Java ドキュメント を参照してください。

import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.sql.DataFrame;

DataFrame dataFrame = jsql.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0);

DataFrame l1NormData = normalizer.transform(dataFrame);
l1NormData.show();

// Normalize each Vector using $L^\infty$ norm.
DataFrame lInfNormData =
  normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaNormalizerExample.java" で見つかります。

APIの詳細はNormalizer Python ドキュメント を参照してください。

from pyspark.ml.feature import Normalizer

dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
lInfNormData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/normalizer_example.py" で見つかります。

StandardScaler

StandardScalerVector 行のデータセットを変換し、各特長値が標準偏差1 および/あるいは 平均0を持つように正規化します。以下のパラメータを取ります:

StandardScalerStandardScalerModelを生成するためにデータセットにfitするEstimator です; これは概要の統計を計算することに該当します。モデルはデータセット中のVector が単位標準偏差 および/あるいは ゼロ平均特徴 を持つように変換することができます。

特徴の標準偏差がゼロの場合、その特徴のためのVectorの中でデフォルトの0.0 値を返すだろうことに注意してください。

以下の例はlibsvm形式のデータセットをロードし、各特長値がunit 標準偏差を持つように正規化する方法を実演します。

APIの詳細はStandardScaler Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.StandardScaler

val dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/StandardScalerExample.scala" で見つかります。

APIの詳細はStandardScaler Java ドキュメント を参照してください。

import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.DataFrame;

DataFrame dataFrame = jsql.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

StandardScaler scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false);

// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);

// Normalize each feature to have unit standard deviation.
DataFrame scaledData = scalerModel.transform(dataFrame);
scaledData.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaStandardScalerExample.java" で見つかります。

APIの詳細はStandardScaler Python ドキュメント を参照してください。

from pyspark.ml.feature import StandardScaler

dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
例の完全なコードは Spark のリポジトリの"examples/src/main/python/ml/standard_scaler_example.py" で見つかります。

MinMaxScaler

MinMaxScalerVector 行のデータセットを変換し、各特長値を特定の範囲(よくあるのは[0, 1])に再スケールします。以下のパラメータを取ります:

MinMaxScaler はデータセットの総統計を計算し、MinMaxScalerModelを生成します。モデルはそれぞれの特徴値を指定された領域に入るように変換することができます。

特徴Eの再スケールされた値は \begin{equation} Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min \end{equation} のように計算されます。E_{max} == E_{min}の場合は Rescaled(e_i) = 0.5 * (max + min)

ゼロの値はおそらく非ゼロ値に変換されるため、変換器の出力は薄い入力に対してもDenseVectorになるでしょう。

以下の例はlibsvm形式のデータセットをロードし、各特長値を [0, 1] に再スケールします。

APIの詳細はMinMaxScaler Scala ドキュメントMinMaxScalerModel Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.MinMaxScaler

val dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/MinMaxScalerExample.scala" で見つかります。

APIについての詳細はMinMaxScaler Java ドキュメントMinMaxScalerModel Java ドキュメントを参照してください。

import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.sql.DataFrame;

DataFrame dataFrame = jsql.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
MinMaxScaler scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [min, max].
DataFrame scaledData = scalerModel.transform(dataFrame);
scaledData.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaMinMaxScalerExample.java" で見つかります。

Bucketizer

Bucketizerは連続する特徴値のカラムをユーザによって指定される特徴値のバケットに変換します。以下のパラメータを取ります:

目的とするカラムの上限と下限に何も意図が無い場合は、splitのBucketizer境界例外の可能性を避けるために境界としてDouble.NegativeInfinityDouble.PositiveInfinity を追加したほうが良いでしょう。

指定するsplitは厳密に増加する順番でなければならないことにも注意してください。つまりs0 < s1 < s2 < ... < sn

Bucketizerについての詳細はAPIドキュメントの中で見つかります。

以下の例はDoubleのカラムを他のインデックス化されたカラムにバケット化する方法を実演します。

APIの詳細はBucketizer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/BucketizerExample.scala" で見つかります。

APIの詳細はBucketizer Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};

JavaRDD<Row> data = jsc.parallelize(Arrays.asList(
  RowFactory.create(-0.5),
  RowFactory.create(-0.3),
  RowFactory.create(0.0),
  RowFactory.create(0.2)
));
StructType schema = new StructType(new StructField[]{
  new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
DataFrame dataFrame = jsql.createDataFrame(data, schema);

Bucketizer bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits);

// Transform original data into its bucket index.
DataFrame bucketedData = bucketizer.transform(dataFrame);
bucketedData.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java" で見つかります。

APIの詳細はBucketizer Python ドキュメント を参照してください。

from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-0.5,), (-0.3,), (0.0,), (0.2,)]
dataFrame = sqlContext.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/bucketizer_example.py" で見つかります。

ElementwiseProduct

ElementwiseProduct は 要素ごとの積を使って各入力ベクトルを指定された"weight"で増やします。別の言い方をすると、データセットの各カラムを数値倍にスケールします。This represents the Hadamard product between the input vector, v and transforming vector, w, to yield a result vector.

\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]

以下の例は変換ベクトル値を使ってベクトルを変換する方法を実演します。

APIの詳細はElementwiseProduct Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = sqlContext.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector")

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/ElementwiseProductExample.scala" で見つかります。

APIの詳細はElementwiseProduct Java ドキュメント を参照してください。

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create some vector data; also works for sparse vectors
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
  RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
));

List<StructField> fields = new ArrayList<StructField>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));

StructType schema = DataTypes.createStructType(fields);

DataFrame dataFrame = sqlContext.createDataFrame(jrdd, schema);

Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);

ElementwiseProduct transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector");

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaElementwiseProductExample.java" で見つかります。

APIの詳細はElementwiseProduct Python ドキュメント を参照してください。

from pyspark.ml.feature import ElementwiseProduct
from pyspark.mllib.linalg import Vectors

data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = sqlContext.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
transformer.transform(df).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/elementwise_product_example.py" で見つかります。

SQLTransformer

SQLTransformer はSQL文によって定義される変換を実装します。Currently we only support SQL syntax like "SELECT ... FROM __THIS__ ..." where "__THIS__" represents the underlying table of the input dataset. select句は出力で表示するフィールド、定数および表現を指定します。Spark SQLがサポートするどのようなselect句もありえます。ユーザは選択されるカラム上で操作するSpark SQLのビルトイン関数とUDFも使うことができます。例えば、SQLTransformer は以下のような文をサポートします:

idv1 および categoryのカラムを持つ以下のデータフレームを仮定します:

 id |  v1 |  v2
----|-----|-----
 0  | 1.0 | 3.0  
 2  | 2.0 | 5.0

これは"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"SQLTransformerの出力です :

 id |  v1 |  v2 |  v3 |  v4
----|-----|-----|-----|-----
 0  | 1.0 | 3.0 | 4.0 | 3.0
 2  | 2.0 | 5.0 | 7.0 |10.0

APIの詳細はSQLTransformer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.SQLTransformer

val df = sqlContext.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/SQLTransformerExample.scala" で見つかります。

APIの詳細はSQLTransformer Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;

JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(0, 1.0, 3.0),
  RowFactory.create(2, 2.0, 5.0)
));
StructType schema = new StructType(new StructField [] {
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);

SQLTransformer sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");

sqlTrans.transform(df).show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java" で見つかります。

APIの詳細はSQLTransformer Python ドキュメント を参照してください。

from pyspark.ml.feature import SQLTransformer

df = sqlContext.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/sql_transformer.py" で見つかります。

VectorAssembler

VectorAssembler は指定されたカラムのリストを一つのベクトルのカラムに合成する変換器です。ロジスティック回帰や決定木のようなMLモデルを訓練するためには、生の特徴と、異なる特徴変換器によって生成された特徴を、1つの特徴ベクトルに結合すると便利です。VectorAssembler は以下の入力カラムタイプを受け入れます: 全ての数値型、真偽値、およびベクトル型。各行内で入力カラムの値は指定された順でベクトルに結合されます。

カラム id, hour, mobile, userFeatures および clicked を持つデータフレームがあると仮定します:

 id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0

userFeatures は3つのユーザ特徴値を持つベクトルカラムです。hour, mobile および userFeaturesfeaturesと呼ばれる一つの特徴ベクトルに結合し、それを使ってclicked かそうでないかを予想するために使いたいとします。VectorAssemblerの入力カラムを hour, mobile および userFeatures に設定し、出力カラムをfeaturesに設定した場合、変換後に以下のデータフレームを取得するはずです:

 id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

APIの詳細はVectorAssembler Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.Vectors

val dataset = sqlContext.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println(output.select("features", "clicked").first())
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/VectorAssemblerExample.scala" で見つかります。

APIの詳細はVectorAssembler Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
JavaRDD<Row> rdd = jsc.parallelize(Arrays.asList(row));
DataFrame dataset = sqlContext.createDataFrame(rdd, schema);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

DataFrame output = assembler.transform(dataset);
System.out.println(output.select("features", "clicked").first());
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorAssemblerExample.java" で見つかります。

APIの詳細はVectorAssembler Python ドキュメント を参照してください。

from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = sqlContext.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")
output = assembler.transform(dataset)
print(output.select("features", "clicked").first())
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/vector_assembler_example.py"で見つかります。

QuantileDiscretizer

QuantileDiscretizer takes a column with continuous features and outputs a column with binned categorical features. The bin ranges are chosen by taking a sample of the data and dividing it into roughly equal parts. The lower and upper bin bounds will be -Infinity and +Infinity, covering all real values. This attempts to find numBuckets partitions based on a sample of the given input data, but it may find fewer depending on the data sample values.

背後の標本ストラテジは決定的ではないため、実行するたびに異なる値を得ることに注意してください。

id, hourのカラムを持つデータフレームがあるとします:

 id | hour
----|------
 0  | 18.0
----|------
 1  | 19.0
----|------
 2  | 8.0
----|------
 3  | 5.0
----|------
 4  | 2.2

hourDouble型の連続する特徴です。連続する特徴を明確な1つに変化したいとします。numBuckets = 3の場合、以下のデータフレームを取得するはずです:

 id | hour | result
----|------|------
 0  | 18.0 | 2.0
----|------|------
 1  | 19.0 | 2.0
----|------|------
 2  | 8.0  | 1.0
----|------|------
 3  | 5.0  | 1.0
----|------|------
 4  | 2.2  | 0.0

APIの詳細はQuantileDiscretizer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = sc.parallelize(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/QuantileDiscretizerExample.scala" で見つかります。

APIの詳細はQuantileDiscretizer Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> jrdd = jsc.parallelize(
  Arrays.asList(
    RowFactory.create(0, 18.0),
    RowFactory.create(1, 19.0),
    RowFactory.create(2, 8.0),
    RowFactory.create(3, 5.0),
    RowFactory.create(4, 2.2)
  )
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});

DataFrame df = sqlContext.createDataFrame(jrdd, schema);

QuantileDiscretizer discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3);

DataFrame result = discretizer.fit(df).transform(df);
result.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java" で見つかります。

Feature Selectors

VectorSlicer

VectorSlicer は特徴値ベクトルを取る変換器で、元の特徴値のsub配列を持つ新しい特徴ベクトルを出力します。ベクトルのカラムから特徴値を抽出するのに役立ちます。

VectorSlicer は指定されたインデックスを持つベクトルカラムを受け付け、それらのインデックスを使って選択された値を持つ新しいベクトルカラムを出力します。インデックスには2つの種類があります

  1. インデックスがベクトルを表す数値インデックス setIndices();

  2. 特徴値の名前がベクトルを表す文字列インデックス setNames()これは実装がAttributeの名前部分に一致するため、AttributeGroupを持つためにベクトルカラムを必要とします。

数値および文字列の両方による指定が受け付けられます。更に、同時に数値と文字列による名前を使用することができます。少なくとも1つの特徴量が選択されなければなりません。特徴量の複製は許可されません。つまり選択されたインデックスと名前の間には重複は無いでしょう。特徴量の名前が選択された場合、空の入力属性に遭遇した場合に例外が投げられるだろうことに注意してください。

出力ベクトルは選択されたインデックスを最初(指定された順番)に、続いて選択された名前を次(指定された順番)に、整列するでしょう。

userFeaturesのカラムを持つデータフレームを持つと仮定します:

 userFeatures
------------------
 [0.0, 10.0, 0.5]

userFeatures は3つのユーザ特徴値を持つベクトルカラムです。userFeaturesの最初のカラムはゼロでそれを削除し、最後の2つのカラムだけが選択されるようにしたいとします。VectorSlicersetIndices(1, 2) を持つ最後の2つの要素を選択し、featuresという名前の新しいベクトルを生成します:

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]

Suppose also that we have a potential input attributes for the userFeatures, i.e. ["f1", "f2", "f3"], then we can use setNames("f2", "f3") to select them.

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]
 ["f1", "f2", "f3"] | ["f2", "f3"]

APIの詳細はVectorSlicer Scala ドキュメント を参照してください。

import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

val data = Array(Row(Vectors.dense(-2.0, 2.3, 0.0)))

val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])

val dataRDD = sc.parallelize(data)
val dataset = sqlContext.createDataFrame(dataRDD, StructType(Array(attrGroup.toStructField())))

val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")

slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))

val output = slicer.transform(dataset)
println(output.select("userFeatures", "features").first())
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/VectorSlicerExample.scala" で見つかります。

APIの詳細はVectorSlicer Java ドキュメント を参照してください。

import com.google.common.collect.Lists;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

Attribute[] attrs = new Attribute[]{
  NumericAttribute.defaultAttr().withName("f1"),
  NumericAttribute.defaultAttr().withName("f2"),
  NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);

JavaRDD<Row> jrdd = jsc.parallelize(Lists.newArrayList(
  RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
  RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
));

DataFrame dataset = jsql.createDataFrame(jrdd, (new StructType()).add(group.toStructField()));

VectorSlicer vectorSlicer = new VectorSlicer()
  .setInputCol("userFeatures").setOutputCol("features");

vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})

DataFrame output = vectorSlicer.transform(dataset);

System.out.println(output.select("userFeatures", "features").first());
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java" で見つかります。

RFormula

RFormulaR model formulaで指定されたカラムを選択します。それは特徴量のベクトルカラムと2つのラベルのカラムを生成します。Rで線形回帰のために公式が使われるように、文字列入力カラムはone-hotエンコードされ、数値カラムはdoubleにキャストされるでしょう。DataFrameに既に存在しない場合は、出力ラベルカラムは公式内の指定された応答変数から生成されるでしょう。

id, country, hour および clickedのカラムを持つデータフレームがあるとします:

id | country | hour | clicked
---|---------|------|---------
 7 | "US"    | 18   | 1.0
 8 | "CA"    | 12   | 0.0
 9 | "NZ"    | 15   | 0.0

clicked ~ country + hourの公式文字列を使ってRFormulaを使う場合は、これはcountry および hourに基づいて clickedを予想したいことを意味し、変換後に以下のデータフレームを取得するでしょう:

id | country | hour | clicked | features         | label
---|---------|------|---------|------------------|-------
 7 | "US"    | 18   | 1.0     | [0.0, 0.0, 18.0] | 1.0
 8 | "CA"    | 12   | 0.0     | [0.0, 1.0, 12.0] | 0.0
 9 | "NZ"    | 15   | 0.0     | [1.0, 0.0, 15.0] | 0.0

APIの詳細はRFormula Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.RFormula

val dataset = sqlContext.createDataFrame(Seq(
  (7, "US", 18, 1.0),
  (8, "CA", 12, 0.0),
  (9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")
val formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label")
val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/RFormulaExample.scala" で見つかります。

APIの詳細はRFormula Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("country", StringType, false),
  createStructField("hour", IntegerType, false),
  createStructField("clicked", DoubleType, false)
});

JavaRDD<Row> rdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(7, "US", 18, 1.0),
  RowFactory.create(8, "CA", 12, 0.0),
  RowFactory.create(9, "NZ", 15, 0.0)
));

DataFrame dataset = sqlContext.createDataFrame(rdd, schema);
RFormula formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label");
DataFrame output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java" で見つかります。

APIの詳細はRFormula Python ドキュメント を参照してください。

from pyspark.ml.feature import RFormula

dataset = sqlContext.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])
formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")
output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/rformula_example.py" で見つかります。

ChiSqSelector

ChiSqSelector はカイ二乗特徴抽出を意味します。分類特徴を持つラベル付けされたデータ上で操作します。ChiSqSelectorはクラスから独立したカイ二乗検定に基づいて特徴を整列し、クラスのラベルが最も依存する一番上の特徴をフィルター(選択)します。これは最も予知力が高い特徴に明け渡すことに似ています。

id, features および clickedのカラムを持つデータフレームがあると仮定し、これが予想される目的として使われます:

id | features              | clicked
---|-----------------------|---------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0

If we use ChiSqSelector with a numTopFeatures = 1, then according to our label clicked the last column in our features chosen as the most useful feature:

id | features              | clicked | selectedFeatures
---|-----------------------|---------|------------------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0     | [1.0]
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0     | [0.0]
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0     | [0.1]

APIの詳細はChiSqSelector Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors

val data = Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)

val df = sc.parallelize(data).toDF("id", "features", "clicked")

val selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)
result.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/ChiSqSelectorExample.scala" で見つかります。

APIの詳細はChiSqSelector Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
  RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
));
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
  new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});

DataFrame df = sqlContext.createDataFrame(jrdd, schema);

ChiSqSelector selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures");

DataFrame result = selector.fit(df).transform(df);
result.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java" で見つかります。
TOP
inserted by FC2 system