特徴の抽出、変換および選択

この章は以下のグループに大まかに分類される特徴のために動作するアルゴリズムを対象にします:

目次

特徴抽出器

TF-IDF

索引語頻度の逆出現頻度 (TF-IDF) はテキストマイニングで単語の重要度をコープス内のドキュメントに反映するために広く使われている特徴ベクトル化です。$t$によって単語を、$d$によってドキュメントを、$D$でコープスを示します。索引語頻度 $TF(t, d)$ は単語 $t$ がドキュメント $d$に現れる数で、ドキュメントの文章頻度 $DF(t, D)$は単語 $t$を含むドキュメントの数です。重要度を測定するために索引語頻度だけを使う場合、頻繁に現れる単語を過度に強調しすぎますが、ドキュメントについてほとんど情報を運びません。例えば "a"、"the" および "of"。コープスを横断してしばしば単語が現れる場合、それは特定のドキュメントに関する特別な情報を運ばないことを意味します。逆出現頻度はどれだけの情報を単語が提供するかの数値的な指標です: \[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \] ここで $|D|$ はコープス内のドキュメントの総数です。対数が使われるため、全てのドキュメントで単語が現れる場合、IDFは0になります。補正単語はコープス外の単語のためにゼロで割られることを避けるために適用されることに注意してください。TF-IDF 指標は単純にTF と IDFの産物です: \[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \] 索引語頻度と文章頻度の定義上の幾つかの変数があります。MLlibではそれらを柔軟にするためにTFとIDFを分割します。

TF: HashingTFCountVectorizer の両方は、単語の頻度ベクトルを生成するために使うことができます。

HashingTF は単語のセットを取り、それらのセットを固定長の特徴量ベクトルに設定する変換器です。テキスト処理において、"単語のセット"は bag of words です。HashingTFハッシュのトリックを使います。生の特徴はハッシュ関数を適用することでインデックス(単語)にマップされます。ここで使われるハッシュ関数はMurmurHash 3です。そして、索引語頻度はマップされたインデックスに基づいて計算されます。このやり方はグローバルな単語からインデックスへのマップの計算の必要性を避けます。これは大きなコープスには高くつくでしょうが、潜在的なハッシュの衝突の影響を受けます。生の特徴の違いはハッシュの後で同じ単語になるかも知れません。衝突の可能性を減らすために、目標の特徴次元を増やすことができます。つまり、ハッシュテーブルのバケットの数です。単純なmoduloはハッシュ関数をカラムインデックスに変換するために使われるため、特徴次元として二乗を使うことは当を得ています。そうでなければ、機能はカラムに等しくマップされるでしょう。デフォルトの特徴次元は $2^{18} = 262,144$です。任意の二値トグルパラメータは単語の頻度のカウントを制御します。trueに設定された場合は、全ての非ゼロの頻度のカウントは1に設定されます。これは、整数カウントよりも二値をモデルとする離散確率モデルを利用するのに特に便利です。

CountVectorizer はテキストドキュメントを単語のカウントのベクトルに変換します。詳細はCountVectorizer を参照してください。

IDF: IDF はデータセットに当てはまり IDFModelを生成するEstimatorです。IDFModel は特徴量ベクトル(一般的にHashingTFあるいはCountVectorizerから生成されます)を取り、各カラムをスケールします。直感的にそれはコープス内で頻繁に現れるカラムの重み付けを減らします。

注意: spark.ml はテキストの文節化のためのツールを提供しません。ユーザはStanford NLP Group および scalanlp/chalk を参照してください。

以下のコードの断片で、文章のセットから始めます。各文章を Tokenizerを使って単語に分割します。各文章 (bag of words)について、文章を特徴量ベクトルにハッシュするためにHashingTF を使います。特徴量ベクトルをリスケールするために IDF を使います; これは一般的にテキストを特徴として使用する場合にパフォーマンスを改善します。それから特徴量ベクトルは学習アルゴリズムに渡されることができます。

APIについての詳細はHashingTF Scala ドキュメント およびIDF Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

val sentenceData = spark.createDataFrame(Seq(
  (0.0, "Hi I heard about Spark"),
  (0.0, "I wish Java could use case classes"),
  (1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)

val hashingTF = new HashingTF()
  .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)

val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors

val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)

val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
例の完全なコードはSparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/TfIdfExample.scala" で見つかります。

APIについての詳細はHashingTF Java ドキュメント およびIDF Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, "Hi I heard about Spark"),
  RowFactory.create(0.0, "I wish Java could use case classes"),
  RowFactory.create(1.0, "Logistic regression models are neat")
);
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceData = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsData = tokenizer.transform(sentenceData);

int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("rawFeatures")
  .setNumFeatures(numFeatures);

Dataset<Row> featurizedData = hashingTF.transform(wordsData);
// alternatively, CountVectorizer can also be used to get term frequency vectors

IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);

Dataset<Row> rescaledData = idfModel.transform(featurizedData);
rescaledData.select("label", "features").show();
例の完全なコードはSparkリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaTfIdfExample.java" で見つかります。

APIについての詳細はHashingTF Python ドキュメント およびIDF Python ドキュメントを参照してください。

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()
例の完全なコードはSparkリポジトリの "examples/src/main/python/ml/tf_idf_example.py" で見つかります。

Word2Vec

Word2Vec は ドキュメントを表す単語の系列を取り、Word2VecModelを訓練する Estimatorです。モデルは各単語をユニークな固定長のベクトルにマップします。Word2VecModelは各ドキュメントをドキュメント内の全ての単語の平均を使ってベクトルに変換します; このベクトルは予想、ドキュメントの類似性の計算などのために特徴として使うことができます。詳細は Word2VecのMLlibユーザガイドを参照してください。

以下のコードの断片の中で、ドキュメントのセットを使って始めます。それぞれは単語の系列を表します。各ドキュメントのために、それを特徴量ベクトルに変換します。そして、この特徴量ベクトルは学習アルゴリズムに渡されることができます。

APIの詳細はWord2Vec Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Input data: Each row is a bag of words from a sentence or document.
val documentDF = spark.createDataFrame(Seq(
  "Hi I heard about Spark".split(" "),
  "I wish Java could use case classes".split(" "),
  "Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")

// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0)
val model = word2Vec.fit(documentDF)

val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
  println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/Word2VecExample.scala" で見つかります。

APIの詳細はWord2Vec Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
  RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
  RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
);
StructType schema = new StructType(new StructField[]{
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> documentDF = spark.createDataFrame(data, schema);

// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0);

Word2VecModel model = word2Vec.fit(documentDF);
Dataset<Row> result = model.transform(documentDF);

for (Row row : result.collectAsList()) {
  List<String> text = row.getList(0);
  Vector vector = (Vector) row.get(1);
  System.out.println("Text: " + text + " => \nVector: " + vector + "\n");
}
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaWord2VecExample.java" で見つかります。

APIの詳細はWord2Vec Python ドキュメント を参照してください。

from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/word2vec_example.py" で見つかります。

CountVectorizer

CountVectorizerCountVectorizerModel はテキスト文章のコレクションをトークンのカウントのベクトルに変換することを目的としています。推測的な辞書を利用できない場合、CountVectorizer は語彙を抽出するためにEstimatorとしてつかうことができ、CountVectorizerModelを生成します。モデルは語彙上のドキュメントのための希薄な表現を生成します。これはLDAのような他のアルゴリズムに渡すことができます。

合致プロセスの中で、CountVectorizer はコープスに渡って単語の頻度が高い順に並べられた単語の中で最も長いvocabSizeを選択するでしょう。語彙に含まれるべき単語がドキュメントに現れる最小の数(断片が<1.0)を指定することで、任意のパラメータ minDF も合致プロセスに影響します。他の任意の二値トグルのパラメータが出力ベクトルを制御します。trueに設定されると全ての非ゼロのカウントが1に設定されます。これは、整数カウントよりも二値をモデルとする離散確率モデルを利用するのに特に便利です。

idtextsのカラムを持つ以下のデータフレームを仮定します:

 id | texts
----|----------
 0  | Array("a", "b", "c")
 1  | Array("a", "b", "b", "c", "a")

texts の中の各行はArray[String] 型のドキュメントです。CountVectorizer の適合の開始は語彙(a, b, c)を持つ CountVectorizerModel を生成します。そして、変換後のカラム“vector” は以下を含みます:

 id | texts                           | vector
----|---------------------------------|---------------
 0  | Array("a", "b", "c")            | (3,[0,1,2],[1.0,1.0,1.0])
 1  | Array("a", "b", "b", "c", "a")  | (3,[0,1,2],[2.0,2.0,1.0])

各ベクトルは語彙上のドキュメントのトークンの数を表します。

APIの詳細はCountVectorizer Scala ドキュメントCountVectorizerModel Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}

val df = spark.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")

// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df)

// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
  .setInputCol("words")
  .setOutputCol("features")

cvModel.transform(df).show(false)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala" で見つかります。

APIの詳細はCountVectorizer Java ドキュメントCountVectorizerModel Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("a", "b", "c")),
  RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
);
StructType schema = new StructType(new StructField [] {
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
  .setInputCol("text")
  .setOutputCol("feature")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df);

// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
  .setInputCol("text")
  .setOutputCol("feature");

cvModel.transform(df).show(false);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaCountVectorizerExample.java" で見つかります。

APIの詳細はCountVectorizer Python ドキュメントCountVectorizerModel Python ドキュメント を参照してください。

from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/count_vectorizer_example.py" で見つかります。

特徴変換

Tokenizer

Tokenization は(文章のような)テキストを取るプロセスで、個々の(通常は単語の)用語に分解するプロセスです。単純なTokenizer クラスはこの機能を提供します。以下の例は文章を単語の列に分割する方法を説明します。

RegexTokenizer は正規表現(regex)一致に基づいた更に進んだtokenizationをすることができます。デフォルトでは、パラメータ"pattern"(regex, デフォルト: "\s+")は入力テキストを分割するためのデリミタとして使われます。もう一つの方法として、ユーザはパラメータ "gaps"をfalseに設定して、正規表現"pattern"がgapを分割するのではなく"tokens"を意味するようにして、全ての適合の存在をトークン化の結果として見つけることができます。

APIの詳細はTokenizer Scala ドキュメントRegexTokenizer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.functions._

val sentenceDataFrame = spark.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)

val countTokens = udf { (words: Seq[String]) => words.length }

val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)

val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala"で見つかります。

APIについての詳細はTokenizer Java ドキュメントRegexTokenizer Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import scala.collection.mutable.WrappedArray;

import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "Hi I heard about Spark"),
  RowFactory.create(1, "I wish Java could use case classes"),
  RowFactory.create(2, "Logistic,regression,models,are,neat")
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});

Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");

RegexTokenizer regexTokenizer = new RegexTokenizer()
    .setInputCol("sentence")
    .setOutputCol("words")
    .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);

spark.udf().register(
  "countTokens", (WrappedArray words) -> words.size(), DataTypes.IntegerType);

Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
    .withColumn("tokens", callUDF("countTokens", col("words")))
    .show(false);

Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
regexTokenized.select("sentence", "words")
    .withColumn("tokens", callUDF("countTokens", col("words")))
    .show(false);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java" で見つかります。

APIについての詳細はTokenizer Python ドキュメントRegexTokenizer Python ドキュメントを参照してください。

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/tokenizer_example.py" で見つかります。

StopWordsRemover

Stop wordsは入力から除外されなければならない単語で、一般的にはその単語は頻繁に現れ、意味を持たないからです。

StopWordsRemover は入力として文字列の系列(例えば、Tokenizerの出力)を取り、入力の系列から全てのstop wordを取りこぼします。stopwordのリストはstopWords パラメータによって指定されます。StopWordsRemover.loadDefaultStopWords(language)を呼び出すことで幾つかの言語のデフォルトの停止単語が利用可能です。利用可能なオプションは "danish", "dutch", "english", "finnish", "french", "german", "hungarian", "italian", "norwegian", "portuguese", "russian", "spanish", "swedish" および "turkish" です。真偽値パラメータ caseSensitive は一致が大文字小文字を区別するかを示します(デフォルトはfalse)。

idrawのカラムを持つ以下のデータフレームを仮定します:

 id | raw
----|----------
 0  | [I, saw, the, red, baloon]
 1  | [Mary, had, a, little, lamb]

入力カラムとしてrawを持ち、出力カラムとしてfilteredを持つStopWordsRemoverを適用すると、以下を取得するでしょう:

 id | raw                         | filtered
----|-----------------------------|--------------------
 0  | [I, saw, the, red, baloon]  |  [saw, red, baloon]
 1  | [Mary, had, a, little, lamb]|[Mary, little, lamb]

filteredでは、stop words "I", "the", "had" および "a" がフィルタされます。

APIの詳細はStopWordsRemover Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")

val dataSet = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "balloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show(false)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/StopWordsRemoverExample.scala" で見つかります。

APIの詳細はStopWordsRemover Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StopWordsRemover remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered");

List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("I", "saw", "the", "red", "balloon")),
  RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);

StructType schema = new StructType(new StructField[]{
  new StructField(
    "raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show(false);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaStopWordsRemoverExample.java" で見つかります。

APIの詳細はStopWordsRemover Python ドキュメント を参照してください。

from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/stopwords_remover_example.py" で見つかります。

$n$-gram

n-gramはなんらかの整数$n$のための$n$トークン(一般的に単語)の系列です。NGramクラスは入力の特徴量を$n$-gramに変換するために使うことができます。

NGram は入力として文字列の系列(例えば、Tokenizerの出力)を取ります。パラメータn は各$n$-gramの単語の数を決定するために使われます。出力は、$n$個の連続した単語の空白で区切られた文字列によって表される各$n$-gramの系列からなるでしょう。もし入力系列がn より少ない文字列からなる場合、何も出力されません。

APIの詳細はNGram Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.NGram

val wordDataFrame = spark.createDataFrame(Seq(
  (0, Array("Hi", "I", "heard", "about", "Spark")),
  (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
  (2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("id", "words")

val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")

val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(false)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/NGramExample.scala" で見つかります。

APIの詳細はNGram Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
  RowFactory.create(1, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
  RowFactory.create(2, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField(
    "words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);

NGram ngramTransformer = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams");

Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);
ngramDataFrame.select("ngrams").show(false);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaNGramExample.java" で見つかります。

APIの詳細はNGram Python ドキュメント を参照してください。

from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/n_gram_example.py" で見つかります。

Binarizer

二値化は数値的な特徴の閾値化を二値 (0/1) の特徴にする処理です。

Binarizer は、二値化のためのthresholdと同様に、共通のパラメータ inputColoutputColを取ります。閾値より大きな特徴値は1.0に2値化されます; 閾値より小さい値は0.0に2値化されます。Vector と Double の両方の種類がinputColのためにサポートされます。

APIの詳細はBinarizer Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)

println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/BinarizerExample.scala" で見つかります。

APIの詳細はBinarizer Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 0.1),
  RowFactory.create(1, 0.8),
  RowFactory.create(2, 0.2)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);

Binarizer binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5);

Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);

System.out.println("Binarizer output with Threshold = " + binarizer.getThreshold());
binarizedDataFrame.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaBinarizerExample.java" で見つかります。

APIの詳細はBinarizer Python ドキュメント を参照してください。

from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/binarizer_example.py" で見つかります。

PCA

PCA は直交変換をおそらく相互関係のある変数の観測のセットを主要なコンポーネントと呼ばれる線形的な相互関係の無い変数の値のセットに変換するために使用する統計学上の手法です。PCA クラスはPCAを使ってベクトルを低次元に投影するためにモデルを訓練します。以下の例は5次元の特徴値ベクトルを3次元の主要な成分に投影する方法を示します。

APIの詳細はPCA Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)

val result = pca.transform(df).select("pcaFeatures")
result.show(false)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/PCAExample.scala" で見つかります。

APIの詳細はPCA Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
  RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
  RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PCAModel pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df);

Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show(false);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaPCAExample.java" で見つかります。

APIの詳細はPCA Python ドキュメント を参照してください。

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
Find full example code at "examples/src/main/python/ml/pca_example.py" で見つかります。

PolynomialExpansion

Polynomial expansionは特徴値を多項式空間に拡張する処理です。これは元の次元のn-次の組み合わせによって定式化されます。PolynomialExpansion クラスはこの機能を提供します。以下の例は特徴値を3次元の多項式空間に拡張する方法を示します。

APIの詳細はPolynomialExpansion Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.dense(2.0, 1.0),
  Vectors.dense(0.0, 0.0),
  Vectors.dense(3.0, -1.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3)

val polyDF = polyExpansion.transform(df)
polyDF.show(false)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/PolynomialExpansionExample.scala" で見つかります。

APIの詳細はPylynomialExpansion Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

PolynomialExpansion polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(2.0, 1.0)),
  RowFactory.create(Vectors.dense(0.0, 0.0)),
  RowFactory.create(Vectors.dense(3.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);

Dataset<Row> polyDF = polyExpansion.transform(df);
polyDF.show(false);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java" で見つかります。

APIの詳細はPolynomialExpansion Python ドキュメント を参照してください。

from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/polynomial_expansion_example.py" で見つかります。

離散コサイン変換 (DCT)

離散コサイン変換 は時間領域の長さ$N$ リアル値の系列を他の周波数領域の長さ$N$ リアル値の系列に変換します。DCT クラスはこの機能を提供し、DCT-II を実装し、変換のための表現マトリックスが単位元になるように結果に $1/\sqrt{2}$ の倍率を掛けます。変換される手順には移動は適用されません(たとえば、変換手順の $0$th の要素は、DCTの $0$th の係数で $N/2$th ではありません

APIの詳細はDCT Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0),
  Vectors.dense(14.0, -2.0, -5.0, 1.0))

val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(false)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/DCTExample.scala" で見つかります。

APIの詳細はDCT Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
  RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
  RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);

DCT dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false);

Dataset<Row> dctDf = dct.transform(df);

dctDf.select("featuresDCT").show(false);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaDCTExample.java" で見つかります。

APIの詳細はDCT Python ドキュメント を参照してください。

from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

dctDf.select("featuresDCT").show(truncate=False)
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/dct_example.py" で見つかります。

StringIndexer

StringIndexer はラベルの文字列カラムをラベルのインデックスのカラムにエンコードします。このインデックスは[0, numLabels)の中にあり、ラベルの頻度によって並べられます。そして、最も頻度の高いラベルはインデックス0を得ます。ユーザがまだ見たことが無いラベルを維持することを選択した場合、それらはインデックス numLabels に格納されるでしょう。入力カラムが数値の場合、それを文字列にキャストし、文字列値をインデックスします。Estimator または Transformer のようなダウンストリームパイプライン コンポーネントがこの文字列のインデックスラベルを利用する場合、コンポーネントの入力カラムをこの文字列のインデックスのカラム名に設定する必要があります。多くの場合において、setInputColを使って入力カラムを設定することができます。

idcategoryのカラムを持つ以下のデータフレームを仮定します:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | a
 4  | a
 5  | c

category は3つのラベルを持つ文字列のカラムです: “a", “b", and “c". 入力カラムとしてcategoryを持ち、出力カラムとしてcategoryIndexを持つ StringIndexer を適用すると、以下を取得するでしょう:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0

"a" は最も頻度が高いためインデックス0 を取得し、インデックス1を持つ"c"とインデックス2を持つ"b"が続きます。

さらに、StringIndexerを1つのデータセットに適合し、それを他のものを変換するのに使う場合、どのようにStringIndexerが未発見のラベルを扱うかについて、3つの戦略があります。

前回の例に戻りますが、今回は以前定義したStringIndexer を以下のデータセットに再利用します:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | d
 4  | e

StringIndexerがどのように未発見のラベルを扱うかを設定していないか、それを"error"に設定した場合は、例外が投げられるでしょう。しかし、setHandleInvalid("skip")を呼んだ場合は、以下のデータセットが生成されるでしょう:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0

“d” あるいは “e” を含む行は現れないことに注意してください。

setHandleInvalid("keep") を呼び出す場合、以下のデータセットが生成されるでしょう:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | d        | 3.0
 4  | e        | 3.0

“d” あるいは “e” を含む行がインデックス“3.0” にマップされることに注意してください。

APIの詳細はStringIndexer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexed = indexer.fit(df).transform(df)
indexed.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala" で見つかります。

APIの詳細はStringIndexer Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("category", StringType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexer indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex");

Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaStringIndexerExample.java" iで見つかります。

APIの詳細はStringIndexer Python ドキュメント を参照してください。

from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/string_indexer_example.py" で見つかります。

IndexToString

StringIndexerと対象的に、IndexToString はラベルのインデックスのカラムを元のラベルのを含むカラムの文字列としてマップし直します。一般的な使い方は、StringIndexerを使ってラベルからインデックスを生成し、それらのインデックスを使ってモデルを訓練し、IndexToStringを使って予測されたインデックスのカラムから元のラベルを取り出します。しかし、自由に独自のラベルを提供しても良いです。

StringIndexer の例を基礎にして、カラム idcategoryIndexを持つ以下のデータフレームを持つと仮定しましょう:

 id | categoryIndex
----|---------------
 0  | 0.0
 1  | 2.0
 2  | 1.0
 3  | 0.0
 4  | 0.0
 5  | 1.0

入力カラムとして categoryIndex、出力カラムとして originalCategoryと一緒に IndexToStringを適用を適用すると、元のラベルを取り出すことができます(それらはカラムのメタデータから推測されるでしょう)。

 id | categoryIndex | originalCategory
----|---------------|-----------------
 0  | 0.0           | a
 1  | 2.0           | b
 2  | 1.0           | c
 3  | 0.0           | a
 4  | 0.0           | a
 5  | 1.0           | c

APIの詳細はIndexToString Scala ドキュメント を参照してください。

import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

println(s"Transformed string column '${indexer.getInputCol}' " +
    s"to indexed column '${indexer.getOutputCol}'")
indexed.show()

val inputColSchema = indexed.schema(indexer.getOutputCol)
println(s"StringIndexer will store labels in output column metadata: " +
    s"${Attribute.fromStructField(inputColSchema).toString}\n")

val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")

val converted = converter.transform(indexed)

println(s"Transformed indexed column '${converter.getInputCol}' back to original string " +
    s"column '${converter.getOutputCol}' using labels in metadata")
converted.select("id", "categoryIndex", "originalCategory").show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/IndexToStringExample.scala" で見つかります。

APIの詳細はIndexToString Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
Dataset<Row> indexed = indexer.transform(df);

System.out.println("Transformed string column '" + indexer.getInputCol() + "' " +
    "to indexed column '" + indexer.getOutputCol() + "'");
indexed.show();

StructField inputColSchema = indexed.schema().apply(indexer.getOutputCol());
System.out.println("StringIndexer will store labels in output column metadata: " +
    Attribute.fromStructField(inputColSchema).toString() + "\n");

IndexToString converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);

System.out.println("Transformed indexed column '" + converter.getInputCol() + "' back to " +
    "original string column '" + converter.getOutputCol() + "' using labels in metadata");
converted.select("id", "categoryIndex", "originalCategory").show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaIndexToStringExample.java" で見つかります。

APIの詳細はIndexToString Python ドキュメント を参照してください。

from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using "
      "labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/index_to_string_example.py" で見つかります。

OneHotEncoder

One-hot encoding はラベルのインデックスのカラムを、最大1の値を持つ2値のベクトルにマップします。この変換により、ロジスティック回帰のような連続する特徴値を期待するアルゴリズムが categorical featuresを使うことができます。

APIの詳細はOneHotEncoder Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")

val encoded = encoder.transform(indexed)
encoded.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderExample.scala" で見つかります。

APIの詳細はOneHotEncoder Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
Dataset<Row> indexed = indexer.transform(df);

OneHotEncoder encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec");

Dataset<Row> encoded = encoder.transform(indexed);
encoded.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderExample.java" で見つかります。

APIの詳細はOneHotEncoder Python ドキュメント を参照してください。

from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/onehot_encoder_example.py" で見つかります。

VectorIndexer

VectorIndexerVectorのデータセット内のカテゴリ特徴量のインデックスを手助けします。それは自動的にどの機能が絶対的かを決め、元の値を分類のインデックスに変換します。厳密に言うと、以下のことを行います:

  1. タイプVectorとパラメータ maxCategoriesを取ります。
  2. 明確な値の数に基づいてどの特徴が絶対的であるかを決定します。ここで最も大きいmaxCategoriesを持つ特徴が絶対的であると明らかにされます。
  3. 各分類特徴について0から始まる分類インデックスを計算します。
  4. 分類的な特徴をインデックスし、元の特徴値をインデックスに変換します。

分類的な特徴をインデックスすることにより、決定木およびツリーアンサンブルのようなアルゴリズムが分類的な特徴を適切に扱うことができ、パフォーマンスを改善します。

以下の例では、ラベル付けされた点のデータセットを読み、どの毒長が分類的であるかを決めるために VectorIndexer を使います。分類的な特徴値をそれらのインデックスに変換します。この変換されたデータはカテゴリ特徴量を扱うDecisionTreeRegressorのようなアルゴリズムに渡すことができます。

APIの詳細はVectorIndexer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.VectorIndexer

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)

val indexerModel = indexer.fit(data)

val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " +
  categoricalFeatures.mkString(", "))

// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/VectorIndexerExample.scala" で見つかります。

APIの詳細はVectorIndexer Java ドキュメント を参照してください。

import java.util.Map;

import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

VectorIndexer indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);

Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");

for (Integer feature : categoryMaps.keySet()) {
  System.out.print(" " + feature);
}
System.out.println();

// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorIndexerExample.java" で見つかります。

APIの詳細はVectorIndexer Python ドキュメント を参照してください。

from pyspark.ml.feature import VectorIndexer

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/vector_indexer_example.py" で見つかります。

Interaction

Interaction はベクトルあるいは二つの値のカラムを取る Transformer で、各入力カラムから1つの値の全ての組み合わせの積を含む1つのベクトルのカラムを生成します。

例えば、入力カラムとして3次元のそれぞれが2つのベクトル型のカラムを持つ場合、出力カラムとして9次元のベクトルを取得するでしょう。

“id1”, “vec1” と “vec2” のカラムを持つ以下のデータフレームがあると仮定します:

  id1|vec1          |vec2          
  ---|--------------|--------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] 
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] 
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] 
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] 
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0]     

それらの入力カラムを使ってInteraction を適用すると、出力カラムとしてのinteractedCol は以下を含みます:

  id1|vec1          |vec2          |interactedCol                                         
  ---|--------------|--------------|------------------------------------------------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]            
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]     
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]        
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0] 
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]       

APIの詳細はInteraction Scala docs を参照してください。

import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler

val df = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6),
  (4, 10, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 10, 7, 3),
  (6, 1, 1, 4, 2, 8, 4)
)).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")

val assembler1 = new VectorAssembler().
  setInputCols(Array("id2", "id3", "id4")).
  setOutputCol("vec1")

val assembled1 = assembler1.transform(df)

val assembler2 = new VectorAssembler().
  setInputCols(Array("id5", "id6", "id7")).
  setOutputCol("vec2")

val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

val interaction = new Interaction()
  .setInputCols(Array("id1", "vec1", "vec2"))
  .setOutputCol("interactedCol")

val interacted = interaction.transform(assembled2)

interacted.show(truncate = false)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/InteractionExample.scala" で見つかります。

APIの詳細はInteractionExample.scala Java ドキュメント を参照してください。

List<Row> data = Arrays.asList(
  RowFactory.create(1, 1, 2, 3, 8, 4, 5),
  RowFactory.create(2, 4, 3, 8, 7, 9, 8),
  RowFactory.create(3, 6, 1, 9, 2, 3, 6),
  RowFactory.create(4, 10, 8, 6, 9, 4, 5),
  RowFactory.create(5, 9, 2, 7, 10, 7, 3),
  RowFactory.create(6, 1, 1, 4, 2, 8, 4)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id1", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id2", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id3", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id4", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id5", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id6", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id7", DataTypes.IntegerType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

VectorAssembler assembler1 = new VectorAssembler()
        .setInputCols(new String[]{"id2", "id3", "id4"})
        .setOutputCol("vec1");

Dataset<Row> assembled1 = assembler1.transform(df);

VectorAssembler assembler2 = new VectorAssembler()
        .setInputCols(new String[]{"id5", "id6", "id7"})
        .setOutputCol("vec2");

Dataset<Row> assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2");

Interaction interaction = new Interaction()
        .setInputCols(new String[]{"id1","vec1","vec2"})
        .setOutputCol("interactedCol");

Dataset<Row> interacted = interaction.transform(assembled2);

interacted.show(false);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaInteractionExample.java" で見つかります。

平均器

Normalizer は、各Vectorが単位ノルムを持つように正規化し、Vector行のデータセットを変換するTransformerです。パラメータ pを取り、それは正規化に使われる p-norm を指定します。(デフォルトは $p = 2$ ) この正規化は入力データの標準化の役に立ち、学習アルゴリズムの挙動を改善します。

以下の例はlibsvm形式のデータセットをロードし、各行がunit $L^1$ ノルムおよび unit $L^\infty$ ノルムを持つように正規化する方法を実演します。

APIの詳細はNormalizer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.5, -1.0)),
  (1, Vectors.dense(2.0, 1.0, 1.0)),
  (2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")

// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/NormalizerExample.scala" で見つかります。

APIの詳細はNormalizer Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
    RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0);

Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();

// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
  normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaNormalizerExample.java" で見つかります。

APIの詳細はNormalizer Python ドキュメント を参照してください。

from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/normalizer_example.py" で見つかります。

StandardScaler

StandardScalerVector 行のデータセットを変換し、各特長値が標準偏差1 および/あるいは 平均0を持つように正規化します。以下のパラメータを取ります:

StandardScalerStandardScalerModelを生成するためにデータセットにfitするEstimator です; これは概要の統計を計算することに該当します。モデルはデータセット中のVector が単位標準偏差 および/あるいは ゼロ平均特徴 を持つように変換することができます。

特徴の標準偏差がゼロの場合、その特徴のためのVectorの中でデフォルトの0.0 値を返すだろうことに注意してください。

以下の例はlibsvm形式のデータセットをロードし、各特長値がunit 標準偏差を持つように正規化する方法を実演します。

APIの詳細はStandardScaler Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.StandardScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/StandardScalerExample.scala" で見つかります。

APIの詳細はStandardScaler Java ドキュメント を参照してください。

import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

StandardScaler scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false);

// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);

// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaStandardScalerExample.java" で見つかります。

APIの詳細はStandardScaler Python ドキュメント を参照してください。

from pyspark.ml.feature import StandardScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
例の完全なコードは Spark のリポジトリの"examples/src/main/python/ml/standard_scaler_example.py" で見つかります。

MinMaxScaler

MinMaxScalerVector 行のデータセットを変換し、各特長値を特定の範囲(よくあるのは[0, 1])に再スケールします。以下のパラメータを取ります:

MinMaxScaler はデータセットの総統計を計算し、MinMaxScalerModelを生成します。モデルはそれぞれの特徴値を指定された領域に入るように変換することができます。

特徴Eの再スケールされた値は、\begin{equation} Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min \end{equation}として計算されます。$E_{max} == E_{min}$の場合は $Rescaled(e_i) = 0.5 * (max + min)$

ゼロの値はおそらく非ゼロ値に変換されるため、変換器の出力は薄い入力に対してもDenseVectorになるでしょう。

以下の例はlibsvm形式のデータセットをロードし、各特長値を [0, 1] に再スケールします。

APIの詳細はMinMaxScaler Scala ドキュメントMinMaxScalerModel Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -1.0)),
  (1, Vectors.dense(2.0, 1.1, 1.0)),
  (2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/MinMaxScalerExample.scala" で見つかります。

APIについての詳細はMinMaxScaler Java ドキュメントMinMaxScalerModel Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -1.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.1, 1.0)),
    RowFactory.create(2, Vectors.dense(3.0, 10.1, 3.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

MinMaxScaler scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
System.out.println("Features scaled to range: [" + scaler.getMin() + ", "
    + scaler.getMax() + "]");
scaledData.select("features", "scaledFeatures").show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaMinMaxScalerExample.java" で見つかります。

APIの詳細はMinMaxScaler Python ドキュメントMinMaxScalerModel Python ドキュメント を参照してください。

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/min_max_scaler_example.py" で見つかります。

MaxAbsScaler

MaxAbsScalerVector の行のデータセットを変換し、各特徴の最大の絶対値を使って割ることで各特徴を範囲 [-1, 1] 再び倍率を掛けます。それはデータを移動/中央に寄せないため、まばらな度合を破壊しません。

MaxAbsScaler はデータセット上の総統計を計算し、MaxAbsScalerModelを生成します。モデルはそれぞれの特徴値を個々に範囲[-1, 1] に変換することができます。

以下の例はlibsvm形式のデータセットをロードし、各特長値を [-1, 1] に再スケールします。

APIの詳細はMaxAbsScaler Scala ドキュメントMaxAbsScalerModel Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -8.0)),
  (1, Vectors.dense(2.0, 1.0, -4.0)),
  (2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")

val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/MaxAbsScalerExample.scala" で見つかります。

APIについての詳細はMaxAbsScaler Java ドキュメントMaxAbsScalerModel Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
    RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

MaxAbsScaler scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.select("features", "scaledFeatures").show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaMaxAbsScalerExample.java" で見つかります。

APIの詳細はMaxAbsScaler Python ドキュメントMaxAbsScalerModel Python ドキュメント を参照してください。

from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/max_abs_scaler_example.py" で見つかります。

Bucketizer

Bucketizerは連続する特徴値のカラムをユーザによって指定される特徴値のバケットに変換します。以下のパラメータを取ります:

目的とするカラムの上限と下限に何も意図が無い場合は、splitのBucketizer境界例外の可能性を避けるために境界としてDouble.NegativeInfinityDouble.PositiveInfinity を追加するべきです。

指定するsplitは厳密に増加する順番でなければならないことにも注意してください。つまりs0 < s1 < s2 < ... < sn

Bucketizerについての詳細はAPIドキュメントの中で見つかります。

以下の例はDoubleのカラムを他のインデックス化されたカラムにバケット化する方法を実演します。

APIの詳細はBucketizer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)

println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/BucketizerExample.scala" で見つかります。

APIの詳細はBucketizer Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};

List<Row> data = Arrays.asList(
  RowFactory.create(-999.9),
  RowFactory.create(-0.5),
  RowFactory.create(-0.3),
  RowFactory.create(0.0),
  RowFactory.create(0.2),
  RowFactory.create(999.9)
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Bucketizer bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits);

// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);

System.out.println("Bucketizer output with " + (bucketizer.getSplits().length-1) + " buckets");
bucketedData.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java" で見つかります。

APIの詳細はBucketizer Python ドキュメント を参照してください。

from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
bucketedData.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/bucketizer_example.py" で見つかります。

ElementwiseProduct

ElementwiseProduct は 要素ごとの積を使って各入力ベクトルを指定された"weight"で増やします。別の言い方をすると、データセットの各カラムを数値倍にスケールします。これは結果のベクトルを生成するための、入力ベクトル v と 変換ベクトル wアダマール積を表します。

\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]

以下の例は変換ベクトル値を使ってベクトルを変換する方法を実演します。

APIの詳細はElementwiseProduct Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector")

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/ElementwiseProductExample.scala" で見つかります。

APIの詳細はElementwiseProduct Java ドキュメント を参照してください。

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
  RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
  RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);

List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));

StructType schema = DataTypes.createStructType(fields);

Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);

ElementwiseProduct transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector");

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaElementwiseProductExample.java" で見つかります。

APIの詳細はElementwiseProduct Python ドキュメント を参照してください。

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/elementwise_product_example.py" で見つかります。

SQLTransformer

SQLTransformer はSQL文によって定義される変換を実装します。現在のところ、以下のようなSQL構文だけをサポートします。"SELECT ... FROM __THIS__ ..." where "__THIS__" は入力データセットの基礎をなすテーブルを表します。select句は出力で表示するフィールド、定数および表現を指定します。Spark SQLがサポートするどのようなselect句もありえます。ユーザは選択されるカラム上で操作するSpark SQLのビルトイン関数とUDFも使うことができます。例えば、SQLTransformer は以下のような文をサポートします:

idv1 および categoryのカラムを持つ以下のデータフレームを仮定します:

 id |  v1 |  v2
----|-----|-----
 0  | 1.0 | 3.0  
 2  | 2.0 | 5.0

これは"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"SQLTransformerの出力です :

 id |  v1 |  v2 |  v3 |  v4
----|-----|-----|-----|-----
 0  | 1.0 | 3.0 | 4.0 | 3.0
 2  | 2.0 | 5.0 | 7.0 |10.0

APIの詳細はSQLTransformer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/SQLTransformerExample.scala" で見つかります。

APIの詳細はSQLTransformer Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 1.0, 3.0),
  RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField [] {
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

SQLTransformer sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");

sqlTrans.transform(df).show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java" で見つかります。

APIの詳細はSQLTransformer Python ドキュメント を参照してください。

from pyspark.ml.feature import SQLTransformer

df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/sql_transformer.py" で見つかります。

VectorAssembler

VectorAssembler は指定されたカラムのリストを一つのベクトルのカラムに合成する変換器です。ロジスティック回帰や決定木のようなMLモデルを訓練するためには、生の特徴と、異なる特徴変換器によって生成された特徴を、1つの特徴ベクトルに結合すると便利です。VectorAssembler は以下の入力カラムタイプを受け入れます: 全ての数値型、真偽値、およびベクトル型。各行内で入力カラムの値は指定された順でベクトルに結合されます。

カラム id, hour, mobile, userFeatures および clicked を持つデータフレームがあると仮定します:

 id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0

userFeatures は3つのユーザ特徴値を持つベクトルカラムです。hour, mobile および userFeaturesfeaturesと呼ばれる一つの特徴ベクトルに結合し、それを使ってclicked かそうでないかを予想するために使いたいとします。VectorAssemblerの入力カラムを hour, mobile および userFeatures に設定し、出力カラムをfeaturesに設定した場合、変換後に以下のデータフレームを取得するはずです:

 id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

APIの詳細はVectorAssembler Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/VectorAssemblerExample.scala" で見つかります。

APIの詳細はVectorAssembler Java ドキュメント を参照してください。

import java.util.Arrays;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

Dataset<Row> output = assembler.transform(dataset);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
    "'features'");
output.select("features", "clicked").show(false);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorAssemblerExample.java" で見つかります。

APIの詳細はVectorAssembler Python ドキュメント を参照してください。

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/vector_assembler_example.py"で見つかります。

QuantileDiscretizer

QuantileDiscretizer は連続する特徴を持つカラムを取り、ビンされたカテゴリの機能を持つカラムを出力します。ビンの数はnumBuckets パラメータによって設定されます。使用されるバケットの数はこの値より小さいかも知れません。例えば、十分にはっきりと異なる変異を生成するには入力の個別の値があまりにも少ない場合。

NaN 値: NaN 値はQuantileDiscretizer フィッティングの間にカラムから削除されるでしょう。これは予想するためのBucketizer モデルを生成するでしょう。変換の間に、Bucketizer がデータセット内にNaN値を見つけるとエラーを上げるかも知れませんが、ユーザはhandleInvalidを設定してデータセット内のNaN値を維持するか削除するかのどちらかを選択することもできます。ユーザがNaN値を維持することを選択した場合、それらは特別に扱われ、自身のバケット内に配置されるでしょう。例えば、4つのバケットが使われる場合、非NaNデータはバケット[0-3]に入れられるでしょうが、NaNは特別なバケット[4]に入れられるでしょう。

アルゴリズム: ビンの範囲は近似アルゴリズムを使って選択されます(詳細な説明はapproxQuantile のドキュメントを見てください)。近似の精度はrelativeError パラメータを使って制御することができます。0に設定した場合は、正確な変位量が計算されます(注意: 正確な変位量の計算は高価な操作です)。下限および上限のbinの境界は全ての実世界の値をカバーする-Infinity+Infinity です。

id, hourのカラムを持つデータフレームがあるとします:

 id | hour
----|------
 0  | 18.0
----|------
 1  | 19.0
----|------
 2  | 8.0
----|------
 3  | 5.0
----|------
 4  | 2.2

hourDouble型の連続する特徴です。連続する特徴を明確な1つに変化したいとします。numBuckets = 3の場合、以下のデータフレームを取得するはずです:

 id | hour | result
----|------|------
 0  | 18.0 | 2.0
----|------|------
 1  | 19.0 | 2.0
----|------|------
 2  | 8.0  | 1.0
----|------|------
 3  | 5.0  | 1.0
----|------|------
 4  | 2.2  | 0.0

APIの詳細はQuantileDiscretizer Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/QuantileDiscretizerExample.scala" で見つかります。

APIの詳細はQuantileDiscretizer Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 18.0),
  RowFactory.create(1, 19.0),
  RowFactory.create(2, 8.0),
  RowFactory.create(3, 5.0),
  RowFactory.create(4, 2.2)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

QuantileDiscretizer discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3);

Dataset<Row> result = discretizer.fit(df).transform(df);
result.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java" で見つかります。

APIの詳細はQuantileDiscretizer Python ドキュメント を参照してください。

from pyspark.ml.feature import QuantileDiscretizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/quantile_discretizer_example.py" で見つかります。

Imputer

Imputer 変換器は、失われた値の箇所の平均あるいは中央値のどちらかを使って、データセット中の失われた値を補完します。入力カラムは DoubleType または FloatType でなければなりません。現在のところ、Imputer はカテゴリ特徴量をサポートせず、カテゴリ特徴量を含むカラムについてはたぶん正しくない値を生成します。

入力カラム中の全てのnull 値は失われたものとして扱われ、推測されることに注意してください。

カラム ab を持つデータフレームがあると仮定します:

      a     |      b      
------------|-----------
     1.0    | Double.NaN
     2.0    | Double.NaN
 Double.NaN |     3.0   
     4.0    |     4.0   
     5.0    |     5.0   

この例では、Imputer は存在する全てのDouble.NaN (失われた値のデフォルト) を対応するカラムの他の値から計算した平均(デフォルトの補完ストラテジ)で置き換えるでしょう。この例では、カラムab の代理の値は それぞれ 3.0 と 4.0 です。変換の後で、出力カラム内の失われた値は関連するカラムの代理の値によって置き換えられるでしょう。

      a     |      b     | out_a | out_b   
------------|------------|-------|-------
     1.0    | Double.NaN |  1.0  |  4.0 
     2.0    | Double.NaN |  2.0  |  4.0 
 Double.NaN |     3.0    |  3.0  |  3.0 
     4.0    |     4.0    |  4.0  |  4.0
     5.0    |     5.0    |  5.0  |  5.0 

APIの詳細はImputer Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.Imputer

val df = spark.createDataFrame(Seq(
  (1.0, Double.NaN),
  (2.0, Double.NaN),
  (Double.NaN, 3.0),
  (4.0, 4.0),
  (5.0, 5.0)
)).toDF("a", "b")

val imputer = new Imputer()
  .setInputCols(Array("a", "b"))
  .setOutputCols(Array("out_a", "out_b"))

val model = imputer.fit(df)
model.transform(df).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/ImputerExample.scala" で見つかります。

APIの詳細はImputer Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Imputer;
import org.apache.spark.ml.feature.ImputerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1.0, Double.NaN),
  RowFactory.create(2.0, Double.NaN),
  RowFactory.create(Double.NaN, 3.0),
  RowFactory.create(4.0, 4.0),
  RowFactory.create(5.0, 5.0)
);
StructType schema = new StructType(new StructField[]{
  createStructField("a", DoubleType, false),
  createStructField("b", DoubleType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);

Imputer imputer = new Imputer()
  .setInputCols(new String[]{"a", "b"})
  .setOutputCols(new String[]{"out_a", "out_b"});

ImputerModel model = imputer.fit(df);
model.transform(df).show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaImputerExample.java" で見つかります。

APIの詳細はImputer Python ドキュメント を参照してください。

from pyspark.ml.feature import Imputer

df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)

model.transform(df).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/imputer_example.py" で見つかります。

Feature Selectors

VectorSlicer

VectorSlicer は特徴値ベクトルを取る変換器で、元の特徴値のsub配列を持つ新しい特徴ベクトルを出力します。ベクトルのカラムから特徴値を抽出するのに役立ちます。

VectorSlicer は指定されたインデックスを持つベクトルカラムを受け付け、それらのインデックスを使って選択された値を持つ新しいベクトルカラムを出力します。インデックスには2つの種類があります

  1. インデックスがベクトルを表す数値インデックス setIndices();

  2. 特徴値の名前がベクトルを表す文字列インデックス setNames()これは実装がAttributeの名前部分に一致するため、AttributeGroupを持つためにベクトルカラムを必要とします。

数値および文字列の両方による指定が受け付けられます。更に、同時に数値と文字列による名前を使用することができます。少なくとも1つの特徴量が選択されなければなりません。特徴量の複製は許可されません。つまり選択されたインデックスと名前の間には重複は無いでしょう。特徴の名前が選択されると、もし空の入力属性に遭遇した場合に例外が投げられることに注意してください。

出力ベクトルは選択されたインデックスを最初(指定された順番)に、続いて選択された名前を次(指定された順番)に、整列するでしょう。

userFeaturesのカラムを持つデータフレームを持つと仮定します:

 userFeatures
------------------
 [0.0, 10.0, 0.5]

userFeatures は3つのユーザ特徴値を持つベクトルカラムです。userFeaturesの最初のカラムはゼロでそれを削除し、最後の2つのカラムだけが選択されるようにしたいとします。VectorSlicersetIndices(1, 2) を持つ最後の2つの要素を選択し、featuresという名前の新しいベクトルを生成します:

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]

userFeaturesとして ["f1", "f2", "f3"]の可能性がある入力属性があるとした場合、それらを選択するためにsetNames("f2", "f3") を使うことができます。

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]
 ["f1", "f2", "f3"] | ["f2", "f3"]

APIの詳細はVectorSlicer Scala ドキュメント を参照してください。

import java.util.Arrays

import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

val data = Arrays.asList(
  Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
  Row(Vectors.dense(-2.0, 2.3, 0.0))
)

val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])

val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))

val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")

slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))

val output = slicer.transform(dataset)
output.show(false)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/VectorSlicerExample.scala" で見つかります。

APIの詳細はVectorSlicer Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

Attribute[] attrs = {
  NumericAttribute.defaultAttr().withName("f1"),
  NumericAttribute.defaultAttr().withName("f2"),
  NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
  RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
);

Dataset<Row> dataset =
  spark.createDataFrame(data, (new StructType()).add(group.toStructField()));

VectorSlicer vectorSlicer = new VectorSlicer()
  .setInputCol("userFeatures").setOutputCol("features");

vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})

Dataset<Row> output = vectorSlicer.transform(dataset);
output.show(false);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java" で見つかります。

APIの詳細はVectorSlicer Python ドキュメント を参照してください。

from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row

df = spark.createDataFrame([
    Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
    Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])

slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

output = slicer.transform(df)

output.select("userFeatures", "features").show()
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/vector_slicer_example.py" で見つかります。

RFormula

RFormulaR model formulaで指定されたカラムを選択します。現在のところ、‘~’, ‘.’, ‘:’, ‘+’ および ‘-‘ を含む R オペレータの制限された部分集合をサポートします。基本的なオペレータは以下の通りです:

ab が double のカラムと仮定した場合、RFormulaの結果を説明するために以下の単純な例を使います:

RFormula は、特徴のカラムと、doubleあるいは文字列のカラムのベクトルを生成します。Rで線形回帰のために公式が使われるように、文字列入力カラムはone-hotエンコードされ、数値カラムはdoubleにキャストされるでしょう。ラベルのカラムが文字の種類の場合、それはStringIndexerを使ってまず二倍に変換されるでしょう。データフレーム中にラベルのカラムが存在しない場合は、出力ラベルのカラムは公式内の指定された応答変数から生成されるでしょう。

id, country, hour および clickedのカラムを持つデータフレームがあるとします:

id | country | hour | clicked
---|---------|------|---------
 7 | "US"    | 18   | 1.0
 8 | "CA"    | 12   | 0.0
 9 | "NZ"    | 15   | 0.0

clicked ~ country + hourの公式文字列を使ってRFormulaを使う場合は、これはcountry および hourに基づいて clickedを予想したいことを意味し、変換後に以下のデータフレームを取得するでしょう:

id | country | hour | clicked | features         | label
---|---------|------|---------|------------------|-------
 7 | "US"    | 18   | 1.0     | [0.0, 0.0, 18.0] | 1.0
 8 | "CA"    | 12   | 0.0     | [0.0, 1.0, 12.0] | 0.0
 9 | "NZ"    | 15   | 0.0     | [1.0, 0.0, 15.0] | 0.0

APIの詳細はRFormula Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.RFormula

val dataset = spark.createDataFrame(Seq(
  (7, "US", 18, 1.0),
  (8, "CA", 12, 0.0),
  (9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")

val formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label")

val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/RFormulaExample.scala" で見つかります。

APIの詳細はRFormula Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("country", StringType, false),
  createStructField("hour", IntegerType, false),
  createStructField("clicked", DoubleType, false)
});

List<Row> data = Arrays.asList(
  RowFactory.create(7, "US", 18, 1.0),
  RowFactory.create(8, "CA", 12, 0.0),
  RowFactory.create(9, "NZ", 15, 0.0)
);

Dataset<Row> dataset = spark.createDataFrame(data, schema);
RFormula formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label");
Dataset<Row> output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java" で見つかります。

APIの詳細はRFormula Python ドキュメント を参照してください。

from pyspark.ml.feature import RFormula

dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])

formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")

output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/rformula_example.py" で見つかります。

ChiSqSelector

ChiSqSelector はカイ二乗特徴抽出を意味します。分類特徴を持つラベル付けされたデータ上で操作します。ChiSqSelector はどの特徴を選択するかを決めるために 独立性のカイ二乗検定 を使います。5つの選択メソッドをサポートします: numTopFeatures, percentile, fpr, fdr, fwe: * numTopFeatures カイ二乗検定に従って固定数のトップの特徴を選択します。これは最も予知力が高い特徴に明け渡すことに似ています。* percentilenumTopFeatures に似ていますが、固定数の代わりに全ての特徴の分数を選択します。* fpr はp値が閾値以下の全ての特徴を選択します。従って選択のfalse positiveレートを制御します。* fdr はfalse discoveryレートが閾値より低い全ての特徴を選択するために Benjamini-Hochberg procedure を使います。* fwe はp値が閾値以下の全ての特徴を選択します。閾値は 1/numFeatures によってスケールされ、従って選択のfamily-withエラーが制御されます。デフォルトでは、選択メソッドはデフォルトのトップの特徴の数が50に設定されたnumTopFeaturesです。ユーザは setSelectorTypeを使って選択メソッドを選択することができます。

id, features および clickedのカラムを持つデータフレームがあると仮定し、これが予想される目的として使われます:

id | features              | clicked
---|-----------------------|---------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0

numTopFeatures = 1を持つ ChiSqSelector を使う場合、ラベルclicked に応じて、features内の最後のカラムは最も有用な特徴として選択されます。

id | features              | clicked | selectedFeatures
---|-----------------------|---------|------------------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0     | [1.0]
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0     | [0.0]
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0     | [0.1]

APIの詳細はChiSqSelector Scala ドキュメント を参照してください。

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)

val df = spark.createDataset(data).toDF("id", "features", "clicked")

val selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/ChiSqSelectorExample.scala" で見つかります。

APIの詳細はChiSqSelector Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
  new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

ChiSqSelector selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("ChiSqSelector output with top " + selector.getNumTopFeatures()
    + " features selected");
result.show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java" で見つかります。

APIの詳細はChiSqSelector Python ドキュメント を参照してください。

from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])

selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

result = selector.fit(df).transform(df)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/chisq_selector_example.py" で見つかります。

一貫性のあるハッシュ

局所性鋭敏型ハッシュ (LSH) はハッシュ技術の重要なクラスです。クラスタリング、近似近傍検索および大きなデータセットの外れ値検知で一般的に使われます。

LSHの全体的な考えは、お互いに近いデータポイントが高い確率を持つ同じバケット内にあり一方でお互いに遠くにあるデータポイントが異なるバケット内にあるように、ハッシュデータポイントがバケットを指す関数のファミリー(“LSH families”)を使うことです。LSH ファミリーは正式には以下のように定義されます。

距離空間 (M, d)M はセットでdM上での距離関数、の中で、LSHファミリーは以下の特性を満たす関数のファミリー h です: \[ \forall p, q \in M,\\ d(p,q) \leq r1 \Rightarrow Pr(h(p)=h(q)) \geq p1\\ d(p,q) \geq r2 \Rightarrow Pr(h(p)=h(q)) \leq p2 \] この LSH ファミリーは (r1, r2, p1, p2)-センシティブ と呼ばれます。

Sparkでは、異なるLSHファミリーは別個のクラス (例えば MinHash)内で実装され、特徴の変換のためのAPI、approximate similarity join および 近似最近傍 は各クラス内で提供されます。

LSHでは、false positiveを入力特徴の距離のペア ($d(p,q) \geq r2$を持つ) として定義します。これは同じパケットにハッシュ化されます。false negativeを 近くの特徴のペア($d(p,q) \leq r1$を持つ) と定義します。これは異なるバケットにハッシュ化されます。

LSH オペレーション

LSHが代用することができる主要なオペレータを説明します。合致したLSHモデルはこれらのオペレーションのそれぞれのためのメソッドを持ちます。

特徴変換

特徴変換はハッシュ化された値を新しいカラムとして追加する基本的な機能です。これは次元削減に役に立つかも知れません。ユーザはinputCol および outputColを設定することで入力と出力のカラム名を指定することができます。

LSH は複数のLSHハッシュテーブルもサポートします。ユーザはnumHashTablesを設定することでハッシュテーブルの数を指定することができます。これは類似結合近似と近似最近傍においてOR-amplificationに使う事もできます。ハッシュテーブルの数を増やすと精度が高まりますが、通信コストと実行時間も増えるでしょう。

outputColの型は配列の次元が numHashTablesに等しく、ベクトルの次元が現在ところ1に設定されている Seq[Vector]です。将来のリリースでは、ユーザがこれらのベクトルの次元を指定できるように AND-amplification を実装するつもりです。

類似結合近似

類似結合近似は二つのデータセットを取り、ユーザ定義の閾値より小さい距離のデータセット中の行のペアを大体返します。類似結合近似は二つの異なるデータセットと自身への結合の両方をサポートします。自身への結合は幾つかの重複ペアを生成するでしょう。

類似結合近似は入力として変換および非変換されたデータセットの両方を受け付けます。非変換のデータセットが使われた場合、自動的に変換されるでしょう。この場合、ハッシュの署名はoutputColとして生成されるでしょう。

結合されたデータセットの中で、元のデータセットは datasetA および datasetB の中でクエリすることができます。返された各行のペア間の本当の距離を示すために、出力データセットに距離カラムが追加されるでしょう。

近似最近傍検索は(特徴ベクトルの)データセットと(1つの特徴ベクトルの)キーを取り、ベクトルに最も近いデータセット内の指定された数の行を大体返します。

近似最近傍検索は入力として変換および非変換されたデータセットの両方を受け付けます。非変換のデータセットが使われた場合、自動的に変換されるでしょう。この場合、ハッシュの署名はoutputColとして生成されるでしょう。

各出力行と検索されたキーの間の本当の距離を示すために、出力データセットに距離カラムが追加されるでしょう。

注意: 近似最近傍検索は、ハッシュバケット内に十分な候補が無い場合にk より少ない行を返すでしょう。

LSH アルゴリズム

ユークリッド距離についてのバケット化ランダム写像

バケット化ランダム写像はユークリッド距離のためのLSHファミリーです。ユークリッド距離は以下のように定義されます: \[ d(\mathbf{x}, \mathbf{y}) = \sqrt{\sum_i (x_i - y_i)^2} \] LSHファミリーは特徴ベクトル $\mathbf{x}$ をランダムな単位ベクトル $\mathbf{v}$ 上に写像し、写像された結果をハッシュバケットに割り当てます: \[ h(\mathbf{x}) = \Big\lfloor \frac{\mathbf{x} \cdot \mathbf{v}}{r} \Big\rfloor \] r はユーザ定義のバケットの長さです。バケットの長さはハッシュバケットの平均長(と、したがってバケットの数)を制御するために使うことができます。バケット長が大きくなる(つまり、バケットが少なくなる)と、同じバケットにハッシュ化される特徴の確率が増加します(trueおよびfalse positiveの数が増えます)。

バケット化ランダム写像は任意のベクトルと入力特徴として受け取り、sparseとdenseベクトルの両方をサポートします。

APIの詳細はBucketedRandomProjectionLSH Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 1.0)),
  (1, Vectors.dense(1.0, -1.0)),
  (2, Vectors.dense(-1.0, -1.0)),
  (3, Vectors.dense(-1.0, 1.0))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (4, Vectors.dense(1.0, 0.0)),
  (5, Vectors.dense(-1.0, 0.0)),
  (6, Vectors.dense(0.0, 1.0)),
  (7, Vectors.dense(0.0, -1.0))
)).toDF("id", "features")

val key = Vectors.dense(1.0, 0.0)

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = brp.fit(dfA)

// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala" で見つかります。

APIの詳細はBucketedRandomProjectionLSH Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

List<Row> dataA = Arrays.asList(
  RowFactory.create(0, Vectors.dense(1.0, 1.0)),
  RowFactory.create(1, Vectors.dense(1.0, -1.0)),
  RowFactory.create(2, Vectors.dense(-1.0, -1.0)),
  RowFactory.create(3, Vectors.dense(-1.0, 1.0))
);

List<Row> dataB = Arrays.asList(
    RowFactory.create(4, Vectors.dense(1.0, 0.0)),
    RowFactory.create(5, Vectors.dense(-1.0, 0.0)),
    RowFactory.create(6, Vectors.dense(0.0, 1.0)),
    RowFactory.create(7, Vectors.dense(0.0, -1.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);

Vector key = Vectors.dense(1.0, 0.0);

BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes");

BucketedRandomProjectionLSHModel model = mh.fit(dfA);

// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java" で見つかります。

APIの詳細はBucketedRandomProjectionLSH Python ドキュメントを参照してください。

from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/bucketed_random_projection_lsh_example.py" で見つかります。

ジャッカード距離のためのMinHash

MinHash は入力特徴が自然数のセットの場合のジャッカー度距離のLSHファミリーです。二つのセットのジャッカード距離はそれらの和と積の濃度によって定義されます: \[ d(\mathbf{A}, \mathbf{B}) = 1 - \frac{|\mathbf{A} \cap \mathbf{B}|}{|\mathbf{A} \cup \mathbf{B}|} \] MinHash はランダムハッシュ関数g をセット中の各要素に適用し、すべてのハッシュ化された値の最小値を取ります: \[ h(\mathbf{A}) = \min_{a \in \mathbf{A}}(g(a)) \]

MinHashの入力セットは二元のベクトルを表します。ベクトルのインデックスは要素自身を表し、ベクトル内の非0の値はセット内の要素の存在を表します。denseとsparseベクトルの両方がサポートされますが、一般的には効率化のためにsparseベクトルがお勧めです。例えば、Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)]) は空間内に10個の要素があることを意味します。このセットは elem 2, elem 3 および elem 5 を含みます。全ての非ゼロの値は2値の “1” の値として扱われます。

注意: 空のセットはMinHashによって変換することができません。このことは任意の入力ベクトルは少なくとも一つの非ゼロのエントリを持つ必要があることを意味します。

APIの詳細はMinHashLSH Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
  (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
  (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))

val mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = mh.fit(dfA)

// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala" で見つかります。

APIの詳細はMinHashLSH Java ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

List<Row> dataA = Arrays.asList(
  RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);

List<Row> dataB = Arrays.asList(
  RowFactory.create(0, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(2, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);

int[] indices = {1, 3};
double[] values = {1.0, 1.0};
Vector key = Vectors.sparse(6, indices, values);

MinHashLSH mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes");

MinHashLSHModel model = mh.fit(dfA);

// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:");
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/MinHashLSH" で見つかります。

APIの詳細はMinHashLSH Python ドキュメント を参照してください。

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
例の完全なコードは Spark のリポジトリの "examples/src/main/python/ml/min_hash_lsh_example.py" で見つかります。
TOP
inserted by FC2 system