特徴の抽出、変換および選択
この章は以下のグループに大まかに分類される特徴のために動作するアルゴリズムを対象にします:
- 抽出: "raw"データから特徴を抽出
- 変換: スケーリング、変換、あるいは特徴の修正
- 選択: 特徴の大きなセットから下位集合を選択
目次
特徴抽出器
TF-IDF
索引語頻度の逆出現頻度 (TF-IDF) はテキストマイニングで単語の重要度をコープス内のドキュメントに反映するために広く使われている特徴ベクトル化です。$t$
によって単語を、$d$
によってドキュメントを、$D$
でコープスを示します。索引語頻度 $TF(t, d)$
は単語 $t$
がドキュメント $d$
に現れる数で、ドキュメントの文章頻度 $DF(t, D)$
は単語 $t$
を含むドキュメントの数です。重要度を測定するために索引語頻度だけを使う場合、頻繁に現れる単語を過度に強調しすぎますが、ドキュメントについてほとんど情報を運びません。例えば "a"、"the" および "of"。コープスを横断してしばしば単語が現れる場合、それは特定のドキュメントに関する特別な情報を運ばないことを意味します。逆出現頻度はどれだけの情報を単語が提供するかの数値的な指標です: \[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \]
ここで $|D|$
はコープス内のドキュメントの総数です。対数が使われるため、全てのドキュメントで単語が現れる場合、IDFは0になります。補正単語はコープス外の単語のためにゼロで割られることを避けるために適用されることに注意してください。TF-IDF 指標は単純にTF と IDFの産物です: \[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \]
索引語頻度と文章頻度の定義上の幾つかの変数があります。MLlibではそれらを柔軟にするためにTFとIDFを分割します。
TF: HashingTF
とCountVectorizer
の両方は、単語の頻度ベクトルを生成するために使うことができます。
HashingTF
は単語のセットを取り、それらのセットを固定長の特徴量ベクトルに設定する変換器
です。テキスト処理において、"単語のセット"は bag of words です。HashingTF
は ハッシュのトリックを使います。生の特徴はハッシュ関数を適用することでインデックス(単語)にマップされます。ここで使われるハッシュ関数はMurmurHash 3です。そして、索引語頻度はマップされたインデックスに基づいて計算されます。このやり方はグローバルな単語からインデックスへのマップの計算の必要性を避けます。これは大きなコープスには高くつくでしょうが、潜在的なハッシュの衝突の影響を受けます。生の特徴の違いはハッシュの後で同じ単語になるかも知れません。衝突の可能性を減らすために、目標の特徴次元を増やすことができます。つまり、ハッシュテーブルのバケットの数です。単純なmoduloはハッシュ関数をカラムインデックスに変換するために使われるため、特徴次元として二乗を使うことは当を得ています。そうでなければ、機能はカラムに等しくマップされるでしょう。デフォルトの特徴次元は $2^{18} = 262,144$
です。任意の二値トグルパラメータは単語の頻度のカウントを制御します。trueに設定された場合は、全ての非ゼロの頻度のカウントは1に設定されます。これは、整数カウントよりも二値をモデルとする離散確率モデルを利用するのに特に便利です。
CountVectorizer
はテキストドキュメントを単語のカウントのベクトルに変換します。詳細はCountVectorizer を参照してください。
IDF: IDF
はデータセットに当てはまり IDFModel
を生成するEstimator
です。IDFModel
は特徴量ベクトル(一般的にHashingTF
あるいはCountVectorizer
から生成されます)を取り、各カラムをスケールします。直感的にそれはコープス内で頻繁に現れるカラムの重み付けを減らします。
注意: spark.ml
はテキストの文節化のためのツールを提供しません。ユーザはStanford NLP Group および scalanlp/chalk を参照してください。
例
以下のコードの断片で、文章のセットから始めます。各文章を Tokenizer
を使って単語に分割します。各文章 (bag of words)について、文章を特徴量ベクトルにハッシュするためにHashingTF
を使います。特徴量ベクトルをリスケールするために IDF
を使います; これは一般的にテキストを特徴として使用する場合にパフォーマンスを改善します。それから特徴量ベクトルは学習アルゴリズムに渡されることができます。
APIについての詳細はHashingTF Scala ドキュメント およびIDF Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
val sentenceData = spark.createDataFrame(Seq(
(0, "Hi I heard about Spark"),
(0, "I wish Java could use case classes"),
(1, "Logistic regression models are neat")
)).toDF("label", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)
val hashingTF = new HashingTF()
.setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("features", "label").take(3).foreach(println)
APIについての詳細はHashingTF Java ドキュメント およびIDF Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, "Hi I heard about Spark"),
RowFactory.create(0.0, "I wish Java could use case classes"),
RowFactory.create(1.0, "Logistic regression models are neat")
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceData = spark.createDataFrame(data, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsData = tokenizer.transform(sentenceData);
int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("rawFeatures")
.setNumFeatures(numFeatures);
Dataset<Row> featurizedData = hashingTF.transform(wordsData);
// alternatively, CountVectorizer can also be used to get term frequency vectors
IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);
Dataset<Row> rescaledData = idfModel.transform(featurizedData);
for (Row r : rescaledData.select("features", "label").takeAsList(3)) {
Vector features = r.getAs(0);
Double label = r.getDouble(1);
System.out.println(features);
System.out.println(label);
}
APIについての詳細はHashingTF Python ドキュメント およびIDF Python ドキュメントを参照してください。
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(0, "I wish Java could use case classes"),
(1, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
for features_label in rescaledData.select("features", "label").take(3):
print(features_label)
Word2Vec
Word2Vec
は ドキュメントを表す単語の系列を取り、Word2VecModel
を訓練する Estimator
です。モデルは各単語をユニークな固定長のベクトルにマップします。Word2VecModel
は各ドキュメントをドキュメント内の全ての単語の平均を使ってベクトルに変換します; このベクトルは予想、ドキュメントの類似性の計算などのために特徴として使うことができます。詳細は Word2VecのMLlibユーザガイドを参照してください。
以下のコードの断片の中で、ドキュメントのセットを使って始めます。それぞれは単語の系列を表します。各ドキュメントのために、それを特徴量ベクトルに変換します。そして、この特徴量ベクトルは学習アルゴリズムに渡されることができます。
APIの詳細はWord2Vec Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.Word2Vec
// Input data: Each row is a bag of words from a sentence or document.
val documentDF = spark.createDataFrame(Seq(
"Hi I heard about Spark".split(" "),
"I wish Java could use case classes".split(" "),
"Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.select("result").take(3).foreach(println)
APIの詳細はWord2Vec Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
);
StructType schema = new StructType(new StructField[]{
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> documentDF = spark.createDataFrame(data, schema);
// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0);
Word2VecModel model = word2Vec.fit(documentDF);
Dataset<Row> result = model.transform(documentDF);
for (Row r : result.select("result").takeAsList(3)) {
System.out.println(r);
}
APIの詳細はWord2Vec Python ドキュメント を参照してください。
from pyspark.ml.feature import Word2Vec
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
("Hi I heard about Spark".split(" "), ),
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for feature in result.select("result").take(3):
print(feature)
CountVectorizer
CountVectorizer
と CountVectorizerModel
はテキスト文章のコレクションをトークンのカウントのベクトルに変換することを目的としています。推測的な辞書を利用できない場合、CountVectorizer
は語彙を抽出するためにEstimator
としてつかうことができ、CountVectorizerModel
を生成します。モデルは語彙上のドキュメントのための希薄な表現を生成します。これはLDAのような他のアルゴリズムに渡すことができます。
合致プロセスの中で、CountVectorizer
はコープスに渡って単語の頻度が高い順に並べられた単語の中で最も長いvocabSize
を選択するでしょう。語彙に含まれるべき単語がドキュメントに現れる最小の数(断片が<1.0)を指定することで、任意のパラメータ minDF
も合致プロセスに影響します。他の任意の二値トグルのパラメータが出力ベクトルを制御します。trueに設定されると全ての非ゼロのカウントが1に設定されます。これは、整数カウントよりも二値をモデルとする離散確率モデルを利用するのに特に便利です。
例
id
と texts
のカラムを持つ以下のデータフレームを仮定します:
id | texts
----|----------
0 | Array("a", "b", "c")
1 | Array("a", "b", "b", "c", "a")
each row in texts
is a document of type Array[String].
Invoking fit of CountVectorizer
produces a CountVectorizerModel
with vocabulary (a, b, c).
Then the output column “vector” after transformation contains:
id | texts | vector
----|---------------------------------|---------------
0 | Array("a", "b", "c") | (3,[0,1,2],[1.0,1.0,1.0])
1 | Array("a", "b", "b", "c", "a") | (3,[0,1,2],[2.0,2.0,1.0])
各ベクトルは語彙上のドキュメントのトークンの数を表します。
APIの詳細はCountVectorizer Scala ドキュメント と CountVectorizerModel Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
val df = spark.createDataFrame(Seq(
(0, Array("a", "b", "c")),
(1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")
// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
.setInputCol("words")
.setOutputCol("features")
.setVocabSize(3)
.setMinDF(2)
.fit(df)
// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
.setInputCol("words")
.setOutputCol("features")
cvModel.transform(df).select("features").show()
APIの詳細はCountVectorizer Java ドキュメント と CountVectorizerModel Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("a", "b", "c")),
RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
);
StructType schema = new StructType(new StructField [] {
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
.setInputCol("text")
.setOutputCol("feature")
.setVocabSize(3)
.setMinDF(2)
.fit(df);
// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
.setInputCol("text")
.setOutputCol("feature");
cvModel.transform(df).show();
APIの詳細はCountVectorizer Python ドキュメント と CountVectorizerModel Python ドキュメント を参照してください。
from pyspark.ml.feature import CountVectorizer
# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
(0, "a b c".split(" ")),
(1, "a b b c a".split(" "))
], ["id", "words"])
# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)
result = model.transform(df)
result.show()
特徴変換
Tokenizer
Tokenization は(文章のような)テキストを取るプロセスで、個々の(通常は単語の)用語に分解するプロセスです。単純なTokenizer クラスはこの機能を提供します。以下の例は文章を単語の列に分割する方法を説明します。
RegexTokenizer は正規表現(regex)一致に基づいた更に進んだtokenizationをすることができます。By default, the parameter “pattern” (regex, default: "\\s+"
) is used as delimiters to split the input text. もう一つの方法として、ユーザはパラメータ "gaps"をfalseに設定して、正規表現"pattern"がgapを分割するのではなく"tokens"を意味するようにして、全ての適合の存在をトークン化の結果として見つけることができます。
APIの詳細はTokenizer Scala ドキュメント と RegexTokenizer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
val sentenceDataFrame = spark.createDataFrame(Seq(
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
)).toDF("label", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)
val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("words", "label").take(3).foreach(println)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("words", "label").take(3).foreach(println)
APIについての詳細はTokenizer Java ドキュメント と RegexTokenizer Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, "Hi I heard about Spark"),
RowFactory.create(1, "I wish Java could use case classes"),
RowFactory.create(2, "Logistic,regression,models,are,neat")
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsDataFrame = tokenizer.transform(sentenceDataFrame);
for (Row r : wordsDataFrame.select("words", "label").takeAsList(3)) {
java.util.List<String> words = r.getList(0);
for (String word : words) System.out.print(word + " ");
System.out.println();
}
RegexTokenizer regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);
APIについての詳細はTokenizer Python ドキュメント と RegexTokenizer Python ドキュメントを参照してください。
from pyspark.ml.feature import Tokenizer, RegexTokenizer
sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsDataFrame = tokenizer.transform(sentenceDataFrame)
for words_label in wordsDataFrame.select("words", "label").take(3):
print(words_label)
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
StopWordsRemover
Stop wordsは入力から除外されなければならない単語で、一般的にはその単語は頻繁に現れ、意味を持たないからです。
StopWordsRemover
は入力として文字列の系列(例えば、Tokenizerの出力)を取り、入力の系列から全てのstop wordを取りこぼします。stopwordのリストはstopWords
パラメータによって指定されます。StopWordsRemover.loadDefaultStopWords(language)
を呼び出すことで幾つかの言語のデフォルトの停止単語が利用可能です。利用可能なオプションは "danish", "dutch", "english", "finnish", "french", "german", "hungarian", "italian", "norwegian", "portuguese", "russian", "spanish", "swedish" および "turkish" です。真偽値パラメータ caseSensitive
は一致が大文字小文字を区別するかを示します(デフォルトはfalse)。
例
id
と raw
のカラムを持つ以下のデータフレームを仮定します:
id | raw
----|----------
0 | [I, saw, the, red, baloon]
1 | [Mary, had, a, little, lamb]
入力カラムとしてraw
を持ち、出力カラムとしてfiltered
を持つStopWordsRemover
を適用すると、以下を取得するでしょう:
id | raw | filtered
----|-----------------------------|--------------------
0 | [I, saw, the, red, baloon] | [saw, red, baloon]
1 | [Mary, had, a, little, lamb]|[Mary, little, lamb]
filtered
では、stop words "I", "the", "had" および "a" がフィルタされます。
APIの詳細はStopWordsRemover Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.StopWordsRemover
val remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered")
val dataSet = spark.createDataFrame(Seq(
(0, Seq("I", "saw", "the", "red", "baloon")),
(1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")
remover.transform(dataSet).show()
APIの詳細はStopWordsRemover Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
StopWordsRemover remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered");
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("I", "saw", "the", "red", "baloon")),
RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);
StructType schema = new StructType(new StructField[]{
new StructField(
"raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show();
APIの詳細はStopWordsRemover Python ドキュメント を参照してください。
from pyspark.ml.feature import StopWordsRemover
sentenceData = spark.createDataFrame([
(0, ["I", "saw", "the", "red", "baloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["label", "raw"])
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
$n$-gram
n-gramはなんらかの整数$n$のための$n$トークン(一般的に単語)の系列です。NGram
クラスは入力の特徴量を$n$-gramに変換するために使うことができます。
NGram
は入力として文字列の系列(例えば、Tokenizerの出力)を取ります。パラメータn
は各$n$-gramの単語の数を決定するために使われます。出力は、$n$個の連続した単語の空白で区切られた文字列によって表される各$n$-gramの系列からなるでしょう。もし入力系列がn
より少ない文字列からなる場合、何も出力されません。
APIの詳細はNGram Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.NGram
val wordDataFrame = spark.createDataFrame(Seq(
(0, Array("Hi", "I", "heard", "about", "Spark")),
(1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
(2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("label", "words")
val ngram = new NGram().setInputCol("words").setOutputCol("ngrams")
val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.take(3).map(_.getAs[Stream[String]]("ngrams").toList).foreach(println)
APIの詳細はNGram Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
RowFactory.create(1.0, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
RowFactory.create(2.0, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField(
"words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);
NGram ngramTransformer = new NGram().setInputCol("words").setOutputCol("ngrams");
Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);
for (Row r : ngramDataFrame.select("ngrams", "label").takeAsList(3)) {
java.util.List<String> ngrams = r.getList(0);
for (String ngram : ngrams) System.out.print(ngram + " --- ");
System.out.println();
}
APIの詳細はNGram Python ドキュメント を参照してください。
from pyspark.ml.feature import NGram
wordDataFrame = spark.createDataFrame([
(0, ["Hi", "I", "heard", "about", "Spark"]),
(1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
(2, ["Logistic", "regression", "models", "are", "neat"])
], ["label", "words"])
ngram = NGram(inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
for ngrams_label in ngramDataFrame.select("ngrams", "label").take(3):
print(ngrams_label)
Binarizer
二値化は数値的な特徴の閾値化を二値 (0/1) の特徴にする処理です。
Binarizer
は、二値化のためのthreshold
と同様に、共通のパラメータ inputCol
とoutputCol
を取ります。閾値より大きな特徴値は1.0に2値化されます; 閾値より小さい値は0.0に2値化されます。Vector と Double の両方の種類がinputCol
のためにサポートされます。
APIの詳細はBinarizer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.Binarizer
val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("label", "feature")
val binarizer: Binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5)
val binarizedDataFrame = binarizer.transform(dataFrame)
val binarizedFeatures = binarizedDataFrame.select("binarized_feature")
binarizedFeatures.collect().foreach(println)
APIの詳細はBinarizer Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, 0.1),
RowFactory.create(1, 0.8),
RowFactory.create(2, 0.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);
Binarizer binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5);
Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);
Dataset<Row> binarizedFeatures = binarizedDataFrame.select("binarized_feature");
for (Row r : binarizedFeatures.collectAsList()) {
Double binarized_value = r.getDouble(0);
System.out.println(binarized_value);
}
APIの詳細はBinarizer Python ドキュメント を参照してください。
from pyspark.ml.feature import Binarizer
continuousDataFrame = spark.createDataFrame([
(0, 0.1),
(1, 0.8),
(2, 0.2)
], ["label", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
binarizedFeatures = binarizedDataFrame.select("binarized_feature")
for binarized_feature, in binarizedFeatures.collect():
print(binarized_feature)
PCA
PCA は直交変換をおそらく相互関係のある変数の観測のセットを主要なコンポーネントと呼ばれる線形的な相互関係の無い変数の値のセットに変換するために使用する統計学上の手法です。PCA クラスはPCAを使ってベクトルを低次元に投影するためにモデルを訓練します。以下の例は5次元の特徴値ベクトルを3次元の主要な成分に投影する方法を示します。
APIの詳細はPCA Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df)
val pcaDF = pca.transform(df)
val result = pcaDF.select("pcaFeatures")
result.show()
APIの詳細はPCA Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
PCAModel pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df);
Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show();
APIの詳細はPCA Python ドキュメント を参照してください。
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
PolynomialExpansion
Polynomial expansionは特徴値を多項式空間に拡張する処理です。これは元の次元のn-次の組み合わせによって定式化されます。PolynomialExpansion クラスはこの機能を提供します。以下の例は特徴値を3次元の多項式空間に拡張する方法を示します。
APIの詳細はPolynomialExpansion Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.dense(-2.0, 2.3),
Vectors.dense(0.0, 0.0),
Vectors.dense(0.6, -1.1)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val polynomialExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3)
val polyDF = polynomialExpansion.transform(df)
polyDF.select("polyFeatures").take(3).foreach(println)
APIの詳細はPylynomialExpansion Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
PolynomialExpansion polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3);
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.dense(-2.0, 2.3)),
RowFactory.create(Vectors.dense(0.0, 0.0)),
RowFactory.create(Vectors.dense(0.6, -1.1))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
Dataset<Row> polyDF = polyExpansion.transform(df);
List<Row> rows = polyDF.select("polyFeatures").takeAsList(3);
for (Row r : rows) {
System.out.println(r.get(0));
}
APIの詳細はPolynomialExpansion Python ドキュメント を参照してください。
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors
df = spark\
.createDataFrame([(Vectors.dense([-2.0, 2.3]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([0.6, -1.1]),)],
["features"])
px = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = px.transform(df)
for expanded in polyDF.select("polyFeatures").take(3):
print(expanded)
離散コサイン変換 (DCT)
離散コサイン変換 は時間領域の長さ$N$ リアル値の系列を他の周波数領域の長さ$N$ リアル値の系列に変換します。DCT クラスはこの機能を提供し、DCT-II を実装し、変換のための表現マトリックスが単位元になるように結果に $1/\sqrt{2}$ の倍率を掛けます。変換される手順には移動は適用されません(たとえば、変換手順の $0$th の要素は、DCTの $0$th の係数で $N/2$th ではありません。
APIの詳細はDCT Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
Vectors.dense(0.0, 1.0, -2.0, 3.0),
Vectors.dense(-1.0, 2.0, 4.0, -7.0),
Vectors.dense(14.0, -2.0, -5.0, 1.0))
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false)
val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(3)
APIの詳細はDCT Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
DCT dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false);
Dataset<Row> dctDf = dct.transform(df);
dctDf.select("featuresDCT").show(3);
APIの詳細はDCT Python ドキュメント を参照してください。
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
(Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
(Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])
dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")
dctDf = dct.transform(df)
for dcts in dctDf.select("featuresDCT").take(3):
print(dcts)
StringIndexer
StringIndexer
はラベルの文字列カラムをラベルのインデックスのカラムにエンコードします。このインデックスは[0, numLabels)
の中にあり、ラベルの頻度によって並べられます。そして、最も頻度の高いラベルはインデックス0
を得ます。入力カラムが数値の場合、それを文字列にキャストし、文字列値をインデックスします。Estimator
または Transformer
のようなダウンストリームパイプライン コンポーネントがこの文字列のインデックスラベルを利用する場合、コンポーネントの入力カラムをこの文字列のインデックスのカラム名に設定する必要があります。多くの場合において、setInputCol
を使って入力カラムを設定することができます。
例
id
と category
のカラムを持つ以下のデータフレームを仮定します:
id | category
----|----------
0 | a
1 | b
2 | c
3 | a
4 | a
5 | c
category
は3つのラベルを持つ文字列のカラムです: “a", “b", and “c". 入力カラムとしてcategory
を持ち、出力カラムとしてcategoryIndex
を持つ StringIndexer
を適用すると、以下を取得するでしょう:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | a | 0.0
4 | a | 0.0
5 | c | 1.0
"a" は最も頻度が高いためインデックス0
を取得し、インデックス1
を持つ"c"とインデックス2
を持つ"b"が続きます。
さらに、StringIndexer
を1つのデータセットに適合し、それを他のものを変換するのに使う場合、どのようにStringIndexer
が未発見のラベルを扱うかについて、二つの戦略があります。
- 例外を投げる (これがデフォルトです)
- 現在未発見のラベルを含んでいる行を完全にスキップする
例
前回の例に戻りますが、今回は以前定義したStringIndexer
を以下のデータセットに再利用します:
id | category
----|----------
0 | a
1 | b
2 | c
3 | d
StringIndexer
がどのように未発見のラベルを扱うかを設定していないか、それを"error"に設定した場合は、例外が投げられるでしょう。しかし、setHandleInvalid("skip")
を呼んだ場合は、以下のデータセットが生成されるでしょう:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
"d"を含む行が現れないことに注意してください。
APIの詳細はStringIndexer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.StringIndexer
val df = spark.createDataFrame(
Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
val indexed = indexer.fit(df).transform(df)
indexed.show()
APIの詳細はStringIndexer Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
List<Row> data = Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("category", StringType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexer indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex");
Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();
APIの詳細はStringIndexer Python ドキュメント を参照してください。
from pyspark.ml.feature import StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
IndexToString
StringIndexer
と対象的に、IndexToString
はラベルのインデックスのカラムを元のラベルのを含むカラムの文字列としてマップし直します。一般的な使い方は、StringIndexer
を使ってラベルからインデックスを生成し、それらのインデックスを使ってモデルを訓練し、IndexToString
を使って予測されたインデックスのカラムから元のラベルを取り出します。しかし、自由に独自のラベルを提供しても良いです。
例
StringIndexer
の例を基礎にして、カラム id
と categoryIndex
を持つ以下のデータフレームを持つと仮定しましょう:
id | categoryIndex
----|---------------
0 | 0.0
1 | 2.0
2 | 1.0
3 | 0.0
4 | 0.0
5 | 1.0
入力カラムとして categoryIndex
、出力カラムとして originalCategory
と一緒に IndexToString
を適用を適用すると、元のラベルを取り出すことができます(それらはカラムのメタデータから推測されるでしょう)。
id | categoryIndex | originalCategory
----|---------------|-----------------
0 | 0.0 | a
1 | 2.0 | b
2 | 1.0 | c
3 | 0.0 | a
4 | 0.0 | a
5 | 1.0 | c
APIの詳細はIndexToString Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
val df = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
val converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory")
val converted = converter.transform(indexed)
converted.select("id", "originalCategory").show()
APIの詳細はIndexToString Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexerModel indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df);
Dataset<Row> indexed = indexer.transform(df);
IndexToString converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);
converted.select("id", "originalCategory").show();
APIの詳細はIndexToString Python ドキュメント を参照してください。
from pyspark.ml.feature import IndexToString, StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)
converted.select("id", "originalCategory").show()
OneHotEncoder
One-hot encoding はラベルのインデックスのカラムを、最大1の値を持つ2値のベクトルにマップします。この変換により、ロジスティック回帰のような連続する特徴値を期待するアルゴリズムが categorical featuresを使うことができます。
APIの詳細はOneHotEncoder Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
val df = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
val encoder = new OneHotEncoder()
.setInputCol("categoryIndex")
.setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()
APIの詳細はOneHotEncoder Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexerModel indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df);
Dataset<Row> indexed = indexer.transform(df);
OneHotEncoder encoder = new OneHotEncoder()
.setInputCol("categoryIndex")
.setOutputCol("categoryVec");
Dataset<Row> encoded = encoder.transform(indexed);
encoded.select("id", "categoryVec").show();
APIの詳細はOneHotEncoder Python ドキュメント を参照してください。
from pyspark.ml.feature import OneHotEncoder, StringIndexer
df = spark.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()
VectorIndexer
VectorIndexer
はVector
のデータセット内のカテゴリ特徴量のインデックスを手助けします。それは自動的にどの機能が絶対的かを決め、元の値を分類のインデックスに変換します。厳密に言うと、以下のことを行います:
- タイプVectorとパラメータ
maxCategories
を取ります。 - 明確な値の数に基づいてどの特徴が絶対的であるかを決定します。ここで最も大きい
maxCategories
を持つ特徴が絶対的であると明らかにされます。 - 各分類特徴について0から始まる分類インデックスを計算します。
- 分類的な特徴をインデックスし、元の特徴値をインデックスに変換します。
分類的な特徴をインデックスすることにより、決定木およびツリーアンサンブルのようなアルゴリズムが分類的な特徴を適切に扱うことができ、パフォーマンスを改善します。
以下の例では、ラベル付けされた点のデータセットを読み、どの毒長が分類的であるかを決めるために VectorIndexer
を使います。分類的な特徴値をそれらのインデックスに変換します。この変換されたデータはカテゴリ特徴量を扱うDecisionTreeRegressor
のようなアルゴリズムに渡すことができます。
APIの詳細はVectorIndexer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.VectorIndexer
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10)
val indexerModel = indexer.fit(data)
val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " +
categoricalFeatures.mkString(", "))
// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
APIの詳細はVectorIndexer Java ドキュメント を参照してください。
import java.util.Map;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
VectorIndexer indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);
Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");
for (Integer feature : categoryMaps.keySet()) {
System.out.print(" " + feature);
}
System.out.println();
// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();
APIの詳細はVectorIndexer Python ドキュメント を参照してください。
from pyspark.ml.feature import VectorIndexer
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)
# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
平均器
Normalizer
は、各Vector
が単位ノルムを持つように正規化し、Vector
行のデータセットを変換するTransformer
です。パラメータ p
を取り、それは正規化に使われる p-norm を指定します。(デフォルトは $p = 2$ ) この正規化は入力データの標準化の役に立ち、学習アルゴリズムの挙動を改善します。
以下の例はlibsvm形式のデータセットをロードし、各行がunit $L^2$ ノルムおよび unit $L^\infty$ ノルムを持つように正規化する方法を実演します。
APIの詳細はNormalizer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.Normalizer
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0)
val l1NormData = normalizer.transform(dataFrame)
l1NormData.show()
// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
lInfNormData.show()
APIの詳細はNormalizer Java ドキュメント を参照してください。
import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0);
Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();
// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
APIの詳細はNormalizer Python ドキュメント を参照してください。
from pyspark.ml.feature import Normalizer
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
l1NormData.show()
# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
lInfNormData.show()
StandardScaler
StandardScaler
はVector
行のデータセットを変換し、各特長値が標準偏差1 および/あるいは 平均0を持つように正規化します。以下のパラメータを取ります:
withStd
: デフォルトはtrueデータを標準偏差1にスケールします。withMean
: デフォルトはfalse。スケーリングする前に平均を使ってデータを中心化します。dense出力を構築するため、これはsparse入力では動作せず、例外を発生するでしょう。
StandardScaler
は StandardScalerModel
を生成するためにデータセットにfit
するEstimator
です; これは概要の統計を計算することに該当します。モデルはデータセット中のVector
が単位標準偏差 および/あるいは ゼロ平均特徴 を持つように変換することができます。
特徴の標準偏差がゼロの場合、その特徴のためのVector
の中でデフォルトの0.0
値を返すだろうことに注意してください。
以下の例はlibsvm形式のデータセットをロードし、各特長値がunit 標準偏差を持つように正規化する方法を実演します。
APIの詳細はStandardScaler Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.StandardScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false)
// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)
// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
APIの詳細はStandardScaler Java ドキュメント を参照してください。
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
StandardScaler scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false);
// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);
// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
APIの詳細はStandardScaler Python ドキュメント を参照してください。
from pyspark.ml.feature import StandardScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)
# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
MinMaxScaler
MinMaxScaler
は Vector
行のデータセットを変換し、各特長値を特定の範囲(よくあるのは[0, 1])に再スケールします。以下のパラメータを取ります:
min
: デフォルトは0.0。変換後の下限値は全ての特徴値によって共有されます。max
: デフォルトは1.0。変換後の上限値は全ての特徴値によって共有されます。
MinMaxScaler
はデータセットの総統計を計算し、MinMaxScalerModel
を生成します。モデルはそれぞれの特徴値を指定された領域に入るように変換することができます。
The rescaled value for a feature E is calculated as,
\begin{equation}
Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min
\end{equation}
For the case $E_{max} == E_{min}$
, $Rescaled(e_i) = 0.5 * (max + min)$
ゼロの値はおそらく非ゼロ値に変換されるため、変換器の出力は薄い入力に対してもDenseVector
になるでしょう。
以下の例はlibsvm形式のデータセットをロードし、各特長値を [0, 1] に再スケールします。
APIの詳細はMinMaxScaler Scala ドキュメント と MinMaxScalerModel Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.MinMaxScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
APIについての詳細はMinMaxScaler Java ドキュメント と MinMaxScalerModel Java ドキュメントを参照してください。
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame = spark
.read()
.format("libsvm")
.load("data/mllib/sample_libsvm_data.txt");
MinMaxScaler scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
APIの詳細はMinMaxScaler Python ドキュメント と MinMaxScalerModel Python ドキュメント を参照してください。
from pyspark.ml.feature import MinMaxScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
MaxAbsScaler
MaxAbsScaler
はVector
の行のデータセットを変換し、各特徴の最大の絶対値を使って割ることで各特徴を範囲 [-1, 1] 再び倍率を掛けます。それはデータを移動/中央に寄せないため、まばらな度合を破壊しません。
MaxAbsScaler
はデータセット上の総統計を計算し、MaxAbsScalerModel
を生成します。モデルはそれぞれの特徴値を個々に範囲[-1, 1] に変換することができます。
以下の例はlibsvm形式のデータセットをロードし、各特長値を [-1, 1] に再スケールします。
APIの詳細はMaxAbsScaler Scala ドキュメント と MaxAbsScalerModel Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.MaxAbsScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
APIについての詳細はMaxAbsScaler Java ドキュメント と MaxAbsScalerModel Java ドキュメントを参照してください。
import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame = spark
.read()
.format("libsvm")
.load("data/mllib/sample_libsvm_data.txt");
MaxAbsScaler scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
APIの詳細はMaxAbsScaler Python ドキュメント と MaxAbsScalerModel Python ドキュメント を参照してください。
from pyspark.ml.feature import MaxAbsScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Bucketizer
Bucketizer
は連続する特徴値のカラムをユーザによって指定される特徴値のバケットに変換します。以下のパラメータを取ります:
splits
: 連続する特徴値をバケットにマップするためのパラメータ。n+1 分割の場合、nバケットになります。split x,y によって定義されたバケットは最後のバケットを除いて [x,y) の範囲の値を持ちます。最後のバケットはyも含みます。splits は厳密に増加しなければなりません。全てのdouble値の範囲をカバーするために、-inf, inf の値も明示的に提供されなければなりません; そうでなければ、指定されたsplit外の値はエラーとして扱われるでしょう。splits
の2つの例としてArray(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity)
とArray(0.0, 1.0, 2.0)
があります。
目的とするカラムの上限と下限に何も意図が無い場合は、splitのBucketizer境界例外の可能性を避けるために境界としてDouble.NegativeInfinity
と Double.PositiveInfinity
を追加するべきです。
指定するsplitは厳密に増加する順番でなければならないことにも注意してください。つまりs0 < s1 < s2 < ... < sn
。
Bucketizerについての詳細はAPIドキュメントの中で見つかります。
以下の例はDouble
のカラムを他のインデックス化されたカラムにバケット化する方法を実演します。
APIの詳細はBucketizer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.Bucketizer
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits)
// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()
APIの詳細はBucketizer Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};
List<Row> data = Arrays.asList(
RowFactory.create(-0.5),
RowFactory.create(-0.3),
RowFactory.create(0.0),
RowFactory.create(0.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Bucketizer bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits);
// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);
bucketedData.show();
APIの詳細はBucketizer Python ドキュメント を参照してください。
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [(-0.5,), (-0.3,), (0.0,), (0.2,)]
dataFrame = spark.createDataFrame(data, ["features"])
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")
# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()
ElementwiseProduct
ElementwiseProduct は 要素ごとの積を使って各入力ベクトルを指定された"weight"で増やします。別の言い方をすると、データセットの各カラムを数値倍にスケールします。これは結果のベクトルを生成するための、入力ベクトル v
と 変換ベクトル w
のアダマール積を表します。
\[ \begin{pmatrix}
v_1 \\
\vdots \\
v_N
\end{pmatrix} \circ \begin{pmatrix}
w_1 \\
\vdots \\
w_N
\end{pmatrix}
= \begin{pmatrix}
v_1 w_1 \\
\vdots \\
v_N w_N
\end{pmatrix}
\]
以下の例は変換ベクトル値を使ってベクトルを変換する方法を実演します。
APIの詳細はElementwiseProduct Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors
// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
("a", Vectors.dense(1.0, 2.0, 3.0)),
("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector")
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
APIの詳細はElementwiseProduct Java ドキュメント を参照してください。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);
List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector");
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
APIの詳細はElementwiseProduct Python ドキュメント を参照してください。
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
SQLTransformer
SQLTransformer
はSQL文によって定義される変換を実装します。現在のところ、以下のようなSQL構文だけをサポートします。"SELECT ... FROM __THIS__ ..."
where "__THIS__"
は入力データセットの基礎をなすテーブルを表します。select句は出力で表示するフィールド、定数および表現を指定します。Spark SQLがサポートするどのようなselect句もありえます。ユーザは選択されるカラム上で操作するSpark SQLのビルトイン関数とUDFも使うことができます。例えば、SQLTransformer
は以下のような文をサポートします:
SELECT a, a + b AS a_b FROM __THIS__
SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
例
id
、v1
および category
のカラムを持つ以下のデータフレームを仮定します:
id | v1 | v2
----|-----|-----
0 | 1.0 | 3.0
2 | 2.0 | 5.0
これは"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"
のSQLTransformer
の出力です :
id | v1 | v2 | v3 | v4
----|-----|-----|-----|-----
0 | 1.0 | 3.0 | 4.0 | 3.0
2 | 2.0 | 5.0 | 7.0 |10.0
APIの詳細はSQLTransformer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.SQLTransformer
val df = spark.createDataFrame(
Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
val sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
APIの詳細はSQLTransformer Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(0, 1.0, 3.0),
RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField [] {
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
SQLTransformer sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");
sqlTrans.transform(df).show();
APIの詳細はSQLTransformer Python ドキュメント を参照してください。
from pyspark.ml.feature import SQLTransformer
df = spark.createDataFrame([
(0, 1.0, 3.0),
(2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
VectorAssembler
VectorAssembler
は指定されたカラムのリストを一つのベクトルのカラムに合成する変換器です。ロジスティック回帰や決定木のようなMLモデルを訓練するためには、生の特徴と、異なる特徴変換器によって生成された特徴を、1つの特徴ベクトルに結合すると便利です。VectorAssembler
は以下の入力カラムタイプを受け入れます: 全ての数値型、真偽値、およびベクトル型。各行内で入力カラムの値は指定された順でベクトルに結合されます。
例
カラム id
, hour
, mobile
, userFeatures
および clicked
を持つデータフレームがあると仮定します:
id | hour | mobile | userFeatures | clicked
----|------|--------|------------------|---------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0
userFeatures
は3つのユーザ特徴値を持つベクトルカラムです。hour
, mobile
および userFeatures
を features
と呼ばれる一つの特徴ベクトルに結合し、それを使ってclicked
かそうでないかを予想するために使いたいとします。VectorAssembler
の入力カラムを hour
, mobile
および userFeatures
に設定し、出力カラムをfeatures
に設定した場合、変換後に以下のデータフレームを取得するはずです:
id | hour | mobile | userFeatures | clicked | features
----|------|--------|------------------|---------|-----------------------------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]
APIの詳細はVectorAssembler Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
val assembler = new VectorAssembler()
.setInputCols(Array("hour", "mobile", "userFeatures"))
.setOutputCol("features")
val output = assembler.transform(dataset)
println(output.select("features", "clicked").first())
APIの詳細はVectorAssembler Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("hour", IntegerType, false),
createStructField("mobile", DoubleType, false),
createStructField("userFeatures", new VectorUDT(), false),
createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"hour", "mobile", "userFeatures"})
.setOutputCol("features");
Dataset<Row> output = assembler.transform(dataset);
System.out.println(output.select("features", "clicked").first());
APIの詳細はVectorAssembler Python ドキュメント を参照してください。
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
output = assembler.transform(dataset)
print(output.select("features", "clicked").first())
QuantileDiscretizer
QuantileDiscretizer
は連続する特徴を持つカラムを取り、ビンされたカテゴリの機能を持つカラムを出力します。ビンの数はnumBuckets
パラメータによって設定されます。ビンの範囲は近似アルゴリズムを使って選択されます(詳細な説明はapproxQuantile のドキュメントを見てください)。近似の精度はrelativeError
パラメータを使って制御することができます。0に設定した場合は、正確な変位量が計算されます(注意: 正確な変位量の計算は高価な操作です)。下限および上限のbinの境界は全ての実世界の値をカバーする-Infinity
と +Infinity
です。
例
id
, hour
のカラムを持つデータフレームがあるとします:
id | hour
----|------
0 | 18.0
----|------
1 | 19.0
----|------
2 | 8.0
----|------
3 | 5.0
----|------
4 | 2.2
hour
はDouble
型の連続する特徴です。連続する特徴を明確な1つに変化したいとします。numBuckets = 3
の場合、以下のデータフレームを取得するはずです:
id | hour | result
----|------|------
0 | 18.0 | 2.0
----|------|------
1 | 19.0 | 2.0
----|------|------
2 | 8.0 | 1.0
----|------|------
3 | 5.0 | 1.0
----|------|------
4 | 2.2 | 0.0
APIの詳細はQuantileDiscretizer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.QuantileDiscretizer
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
var df = spark.createDataFrame(data).toDF("id", "hour")
val discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3)
val result = discretizer.fit(df).transform(df)
result.show()
APIの詳細はQuantileDiscretizer Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, 18.0),
RowFactory.create(1, 19.0),
RowFactory.create(2, 8.0),
RowFactory.create(3, 5.0),
RowFactory.create(4, 2.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
QuantileDiscretizer discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3);
Dataset<Row> result = discretizer.fit(df).transform(df);
result.show();
APIの詳細はQuantileDiscretizer Python ドキュメント を参照してください。
from pyspark.ml.feature import QuantileDiscretizer
data = [(0, 18.0,), (1, 19.0,), (2, 8.0,), (3, 5.0,), (4, 2.2,)]
df = spark.createDataFrame(data, ["id", "hour"])
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
result = discretizer.fit(df).transform(df)
result.show()
Feature Selectors
VectorSlicer
VectorSlicer
は特徴値ベクトルを取る変換器で、元の特徴値のsub配列を持つ新しい特徴ベクトルを出力します。ベクトルのカラムから特徴値を抽出するのに役立ちます。
VectorSlicer
は指定されたインデックスを持つベクトルカラムを受け付け、それらのインデックスを使って選択された値を持つ新しいベクトルカラムを出力します。インデックスには2つの種類があります
-
インデックスがベクトルを表す数値インデックス
setIndices()
; -
特徴値の名前がベクトルを表す文字列インデックス
setNames()
。これは実装がAttribute
の名前部分に一致するため、AttributeGroup
を持つためにベクトルカラムを必要とします。
数値および文字列の両方による指定が受け付けられます。更に、同時に数値と文字列による名前を使用することができます。少なくとも1つの特徴量が選択されなければなりません。特徴量の複製は許可されません。つまり選択されたインデックスと名前の間には重複は無いでしょう。特徴の名前が選択されると、もし空の入力属性に遭遇した場合に例外が投げられることに注意してください。
出力ベクトルは選択されたインデックスを最初(指定された順番)に、続いて選択された名前を次(指定された順番)に、整列するでしょう。
例
userFeatures
のカラムを持つデータフレームを持つと仮定します:
userFeatures
------------------
[0.0, 10.0, 0.5]
userFeatures
は3つのユーザ特徴値を持つベクトルカラムです。userFeatures
の最初のカラムはゼロでそれを削除し、最後の2つのカラムだけが選択されるようにしたいとします。VectorSlicer
はsetIndices(1, 2)
を持つ最後の2つの要素を選択し、features
という名前の新しいベクトルを生成します:
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
userFeatures
として ["f1", "f2", "f3"]
の可能性がある入力属性があるとした場合、それらを選択するためにsetNames("f2", "f3")
を使うことができます。
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
["f1", "f2", "f3"] | ["f2", "f3"]
APIの詳細はVectorSlicer Scala ドキュメント を参照してください。
import java.util.Arrays
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
val data = Arrays.asList(Row(Vectors.dense(-2.0, 2.3, 0.0)))
val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])
val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))
val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")
slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))
val output = slicer.transform(dataset)
println(output.select("userFeatures", "features").first())
APIの詳細はVectorSlicer Java ドキュメント を参照してください。
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
Attribute[] attrs = new Attribute[]{
NumericAttribute.defaultAttr().withName("f1"),
NumericAttribute.defaultAttr().withName("f2"),
NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);
List<Row> data = Lists.newArrayList(
RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
);
Dataset<Row> dataset =
spark.createDataFrame(data, (new StructType()).add(group.toStructField()));
VectorSlicer vectorSlicer = new VectorSlicer()
.setInputCol("userFeatures").setOutputCol("features");
vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})
Dataset<Row> output = vectorSlicer.transform(dataset);
System.out.println(output.select("userFeatures", "features").first());
APIの詳細はVectorSlicer Python ドキュメント を参照してください。
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row
df = spark.createDataFrame([
Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3}),),
Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]),)])
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])
output = slicer.transform(df)
output.select("userFeatures", "features").show()
RFormula
RFormula
はR model formulaで指定されたカラムを選択します。現在のところ、‘~’, ‘.’, ‘:’, ‘+’ および ‘-‘ を含む R オペレータの制限された部分集合をサポートします。基本的なオペレータは以下の通りです:
~
目的のものと単語を分割します+
単語を結合します。“+ 0" は切片の削除を意味します-
単語を削除します。“- 1" は切片の削除を意味します:
相互作用 (数値の掛け算、あるいは2値化された分類の値).
目的のものを除く全てのカラム
a
とb
が double のカラムと仮定した場合、RFormula
の結果を説明するために以下の単純な例を使います:
y ~ a + b
はモデルy ~ w0 + w1 * a + w2 * b
を意味し、w0
は切片で、w1, w2
は係数です。y ~ a + b + a:b - 1
はモデルy ~ w1 * a + w2 * b + w3 * a * b
を意味し、w1, w2, w3
は係数です。
RFormula
は、特徴のカラムと、doubleあるいは文字列のカラムのベクトルを生成します。Rで線形回帰のために公式が使われるように、文字列入力カラムはone-hotエンコードされ、数値カラムはdoubleにキャストされるでしょう。ラベルのカラムが文字の種類の場合、それはStringIndexer
を使ってまず二倍に変換されるでしょう。データフレーム中にラベルのカラムが存在しない場合は、出力ラベルのカラムは公式内の指定された応答変数から生成されるでしょう。
例
id
, country
, hour
および clicked
のカラムを持つデータフレームがあるとします:
id | country | hour | clicked
---|---------|------|---------
7 | "US" | 18 | 1.0
8 | "CA" | 12 | 0.0
9 | "NZ" | 15 | 0.0
clicked ~ country + hour
の公式文字列を使ってRFormula
を使う場合は、これはcountry
および hour
に基づいて clicked
を予想したいことを意味し、変換後に以下のデータフレームを取得するでしょう:
id | country | hour | clicked | features | label
---|---------|------|---------|------------------|-------
7 | "US" | 18 | 1.0 | [0.0, 0.0, 18.0] | 1.0
8 | "CA" | 12 | 0.0 | [0.0, 1.0, 12.0] | 0.0
9 | "NZ" | 15 | 0.0 | [1.0, 0.0, 15.0] | 0.0
APIの詳細はRFormula Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.RFormula
val dataset = spark.createDataFrame(Seq(
(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")
val formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label")
val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
APIの詳細はRFormula Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("country", StringType, false),
createStructField("hour", IntegerType, false),
createStructField("clicked", DoubleType, false)
});
List<Row> data = Arrays.asList(
RowFactory.create(7, "US", 18, 1.0),
RowFactory.create(8, "CA", 12, 0.0),
RowFactory.create(9, "NZ", 15, 0.0)
);
Dataset<Row> dataset = spark.createDataFrame(data, schema);
RFormula formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label");
Dataset<Row> output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();
APIの詳細はRFormula Python ドキュメント を参照してください。
from pyspark.ml.feature import RFormula
dataset = spark.createDataFrame(
[(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)],
["id", "country", "hour", "clicked"])
formula = RFormula(
formula="clicked ~ country + hour",
featuresCol="features",
labelCol="label")
output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
ChiSqSelector
ChiSqSelector
はカイ二乗特徴抽出を意味します。分類特徴を持つラベル付けされたデータ上で操作します。ChiSqSelectorはクラスから独立したカイ二乗検定に基づいて特徴を整列し、クラスのラベルが最も依存する一番上の特徴をフィルター(選択)します。これは最も予知力が高い特徴に明け渡すことに似ています。
例
id
, features
および clicked
のカラムを持つデータフレームがあると仮定し、これが予想される目的として使われます:
id | features | clicked
---|-----------------------|---------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0
8 | [0.0, 1.0, 12.0, 0.0] | 0.0
9 | [1.0, 0.0, 15.0, 0.1] | 0.0
numTopFeatures = 1
を持つ ChiSqSelector
を使う場合、ラベルclicked
に応じて、features
内の最後のカラムは最も有用な特徴として選択されます。
id | features | clicked | selectedFeatures
---|-----------------------|---------|------------------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0]
8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0]
9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1]
APIの詳細はChiSqSelector Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)
val df = spark.createDataset(data).toDF("id", "features", "clicked")
val selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
result.show()
APIの詳細はChiSqSelector Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
ChiSqSelector selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
result.show();
APIの詳細はChiSqSelector Python ドキュメント を参照してください。
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
(8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
(9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
outputCol="selectedFeatures", labelCol="clicked")
result = selector.fit(df).transform(df)
result.show()