特徴の抽出、変換および選択 - spark.ml
この章は以下のグループに大まかに分類される特徴のために動作するアルゴリズムを対象にします:
- 抽出: "raw"データから特徴を抽出
- 変換: スケーリング、変換、あるいは特徴の修正
- 選択: 特徴の大きなセットから下位集合を選択
目次
特徴抽出器
TF-IDF (ハッシュTFとIDF)
Term Frequency-Inverse Document Frequency (TF-IDF) は一般的なテキストの前処理のステップです。Spark MLでは、TF-IDFは2つの部分に分割されます: TF (+hashing) と IDF。
TF: HashingTF
は単語のセットを取り、それらのセットを固定長の特徴量ベクトルに設定する変換器
です。In text processing, a “set of terms” might be a bag of words. アルゴリズムは次元削減のためにhashing trickを使って Term Frequency (TF) counts を組み合わせます。
IDF: IDF
はデータセットに当てはまり IDFModel
を生成するEstimator
です。IDFModel
は特徴量ベクトル(一般的にHashingTF
から生成されます)を取り、各カラムをスケールします。直感的にそれはコープス内で頻繁に現れるカラムの重み付けを減らします。
Term Frequency と Inverse Document Frequency についての詳細はTF-IDFのMLlib ユーザガイド を参照してください。
以下のコードの断片で、文章のセットから始めます。各文章を Tokenizer
を使って単語に分割します。各文章 (bag of words)について、文章を特徴量ベクトルにハッシュするためにHashingTF
を使います。特徴量ベクトルをリスケールするために IDF
を使います; これは一般的にテキストを特徴として使用する場合にパフォーマンスを改善します。それから特徴量ベクトルは学習アルゴリズムに渡されることができます。
APIについての詳細はHashingTF Scala ドキュメント およびIDF Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
val sentenceData = sqlContext.createDataFrame(Seq(
(0, "Hi I heard about Spark"),
(0, "I wish Java could use case classes"),
(1, "Logistic regression models are neat")
)).toDF("label", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)
val hashingTF = new HashingTF()
.setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(wordsData)
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("features", "label").take(3).foreach(println)
APIについての詳細はHashingTF Java ドキュメント およびIDF Java ドキュメントを参照してください。
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
RowFactory.create(0, "Hi I heard about Spark"),
RowFactory.create(0, "I wish Java could use case classes"),
RowFactory.create(1, "Logistic regression models are neat")
));
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
DataFrame sentenceData = sqlContext.createDataFrame(jrdd, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
DataFrame wordsData = tokenizer.transform(sentenceData);
int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("rawFeatures")
.setNumFeatures(numFeatures);
DataFrame featurizedData = hashingTF.transform(wordsData);
IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);
DataFrame rescaledData = idfModel.transform(featurizedData);
for (Row r : rescaledData.select("features", "label").take(3)) {
Vector features = r.getAs(0);
Double label = r.getDouble(1);
System.out.println(features);
System.out.println(label);
}
APIについての詳細はHashingTF Python ドキュメント およびIDF Python ドキュメントを参照してください。
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = sqlContext.createDataFrame([
(0, "Hi I heard about Spark"),
(0, "I wish Java could use case classes"),
(1, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
for features_label in rescaledData.select("features", "label").take(3):
print(features_label)
Word2Vec
Word2Vec
は ドキュメントを表す単語の系列を取り、Word2VecModel
を訓練する Estimator
です。モデルは各単語をユニークな固定長のベクトルにマップします。Word2VecModel
は各ドキュメントをドキュメント内の全ての単語の平均を使ってベクトルに変換します; このベクトルは予想、ドキュメントの類似性の計算などのために特徴として使うことができます。詳細は Word2VecのMLlibユーザガイドを参照してください。
以下のコードの断片の中で、ドキュメントのセットを使って始めます。それぞれは単語の系列を表します。各ドキュメントのために、それを特徴量ベクトルに変換します。そして、この特徴量ベクトルは学習アルゴリズムに渡されることができます。
APIの詳細はWord2Vec Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.Word2Vec
// Input data: Each row is a bag of words from a sentence or document.
val documentDF = sqlContext.createDataFrame(Seq(
"Hi I heard about Spark".split(" "),
"I wish Java could use case classes".split(" "),
"Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.select("result").take(3).foreach(println)
APIの詳細はWord2Vec Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
));
StructType schema = new StructType(new StructField[]{
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
DataFrame documentDF = sqlContext.createDataFrame(jrdd, schema);
// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0);
Word2VecModel model = word2Vec.fit(documentDF);
DataFrame result = model.transform(documentDF);
for (Row r : result.select("result").take(3)) {
System.out.println(r);
}
APIの詳細はWord2Vec Python ドキュメント を参照してください。
from pyspark.ml.feature import Word2Vec
# Input data: Each row is a bag of words from a sentence or document.
documentDF = sqlContext.createDataFrame([
("Hi I heard about Spark".split(" "), ),
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for feature in result.select("result").take(3):
print(feature)
CountVectorizer
CountVectorizer
と CountVectorizerModel
はテキスト文章のコレクションをトークンのカウントのベクトルに変換することを目的としています。推測的な辞書を利用できない場合、CountVectorizer
は語彙を抽出するためにEstimator
としてつかうことができ、CountVectorizerModel
を生成します。モデルは語彙上のドキュメントのための希薄な表現を生成します。これはLDAのような他のアルゴリズムに渡すことができます。
合致プロセスの中で、CountVectorizer
はコープスに渡って単語の頻度が高い順に並べられた単語の中で最も長いvocabSize
を選択するでしょう。An optional parameter “minDF” also affect the fitting process by specifying the minimum number (or fraction if < 1.0) of documents a term must appear in to be included in the vocabulary.
例
id
と texts
のカラムを持つ以下のデータフレームを仮定します:
id | texts
----|----------
0 | Array("a", "b", "c")
1 | Array("a", "b", "b", "c", "a")
texts
中の各行はArray[String]タイプのドキュメントです。CountVectorizer
の合致は語彙(a, b, c) を持つCountVectorizerModel
を生成し、変換後の出力カラム"vector"は以下を含みます:
id | texts | vector
----|---------------------------------|---------------
0 | Array("a", "b", "c") | (3,[0,1,2],[1.0,1.0,1.0])
1 | Array("a", "b", "b", "c", "a") | (3,[0,1,2],[2.0,2.0,1.0])
各ベクトルは語彙上のドキュメントのトークンの数を表します。
APIの詳細はCountVectorizer Scala ドキュメント と CountVectorizerModel Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
val df = sqlContext.createDataFrame(Seq(
(0, Array("a", "b", "c")),
(1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")
// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
.setInputCol("words")
.setOutputCol("features")
.setVocabSize(3)
.setMinDF(2)
.fit(df)
// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
.setInputCol("words")
.setOutputCol("features")
cvModel.transform(df).select("features").show()
APIの詳細はCountVectorizer Java ドキュメント と CountVectorizerModel Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
RowFactory.create(Arrays.asList("a", "b", "c")),
RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
));
StructType schema = new StructType(new StructField [] {
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);
// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
.setInputCol("text")
.setOutputCol("feature")
.setVocabSize(3)
.setMinDF(2)
.fit(df);
// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
.setInputCol("text")
.setOutputCol("feature");
cvModel.transform(df).show();
特徴変換
Tokenizer
Tokenization は(文章のような)テキストを取るプロセスで、個々の(通常は単語の)用語に分解するプロセスです。単純なTokenizer クラスはこの機能を提供します。以下の例は文章を単語の列に分割する方法を説明します。
RegexTokenizer は正規表現(regex)一致に基づいた更に進んだtokenizationをすることができます。デフォルトでは、パラメータ"pattern"(regex, デフォルト: \s+)は入力テキストを分割するためのデリミタとして使われます。Alternatively, users can set parameter “gaps” to false indicating the regex “pattern” denotes “tokens” rather than splitting gaps, and find all matching occurrences as the tokenization result.
APIの詳細はTokenizer Scala ドキュメント と RegexTokenizer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
val sentenceDataFrame = sqlContext.createDataFrame(Seq(
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
)).toDF("label", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)
val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("words", "label").take(3).foreach(println)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("words", "label").take(3).foreach(println)
APIについての詳細はTokenizer Java ドキュメント と RegexTokenizer Java ドキュメントを参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
RowFactory.create(0, "Hi I heard about Spark"),
RowFactory.create(1, "I wish Java could use case classes"),
RowFactory.create(2, "Logistic,regression,models,are,neat")
));
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
DataFrame sentenceDataFrame = sqlContext.createDataFrame(jrdd, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
DataFrame wordsDataFrame = tokenizer.transform(sentenceDataFrame);
for (Row r : wordsDataFrame.select("words", "label"). take(3)) {
java.util.List<String> words = r.getList(0);
for (String word : words) System.out.print(word + " ");
System.out.println();
}
RegexTokenizer regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);
APIについての詳細はTokenizer Python ドキュメント と RegexTokenizer Python ドキュメントを参照してください。
from pyspark.ml.feature import Tokenizer, RegexTokenizer
sentenceDataFrame = sqlContext.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsDataFrame = tokenizer.transform(sentenceDataFrame)
for words_label in wordsDataFrame.select("words", "label").take(3):
print(words_label)
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
StopWordsRemover
Stop wordsは入力から除外されなければならない単語で、一般的にはその単語は頻繁に現れ、意味を持たないからです。
StopWordsRemover
は入力として文字列の系列(例えば、Tokenizerの出力)を取り、入力の系列から全てのstop wordを取りこぼします。stopwordのリストはstopWords
パラメータによって指定されます。デフォルトでstop wordsのリスト を提供し、新しくインスタンス化されたStopWordsRemover
インスタンス上でgetStopWords
を呼び出すことでアクセス可能です。真偽値パラメータ caseSensitive
は一致が大文字小文字を区別するかを示します(デフォルトはfalse)。
例
id
と raw
のカラムを持つ以下のデータフレームを仮定します:
id | raw
----|----------
0 | [I, saw, the, red, baloon]
1 | [Mary, had, a, little, lamb]
入力カラムとしてraw
を持ち、出力カラムとしてfiltered
を持つStopWordsRemover
を適用すると、以下を取得するでしょう:
id | raw | filtered
----|-----------------------------|--------------------
0 | [I, saw, the, red, baloon] | [saw, red, baloon]
1 | [Mary, had, a, little, lamb]|[Mary, little, lamb]
filtered
では、stop words “I”, “the”, “had” および “a” がフィルタされます。
APIの詳細はStopWordsRemover Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.StopWordsRemover
val remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered")
val dataSet = sqlContext.createDataFrame(Seq(
(0, Seq("I", "saw", "the", "red", "baloon")),
(1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")
remover.transform(dataSet).show()
APIの詳細はStopWordsRemover Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
StopWordsRemover remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered");
JavaRDD<Row> rdd = jsc.parallelize(Arrays.asList(
RowFactory.create(Arrays.asList("I", "saw", "the", "red", "baloon")),
RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
));
StructType schema = new StructType(new StructField[]{
new StructField(
"raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
DataFrame dataset = jsql.createDataFrame(rdd, schema);
remover.transform(dataset).show();
APIの詳細はStopWordsRemover Python ドキュメント を参照してください。
from pyspark.ml.feature import StopWordsRemover
sentenceData = sqlContext.createDataFrame([
(0, ["I", "saw", "the", "red", "baloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["label", "raw"])
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
$n$-gram
n-gramはなんらかの整数$n$のための$n$トークン(一般的に単語)の系列です。NGram
クラスは入力の特徴量を$n$-gramに変換するために使うことができます。
NGram
は入力として文字列の系列(例えば、Tokenizerの出力)を取ります。パラメータn
は各$n$-gramの単語の数を決定するために使われます。出力は、$n$個の連続した単語の空白で区切られた文字列によって表される各$n$-gramの系列からなるでしょう。もし入力系列がn
より少ない文字列からなる場合、何も出力されません。
APIの詳細はNGram Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.NGram
val wordDataFrame = sqlContext.createDataFrame(Seq(
(0, Array("Hi", "I", "heard", "about", "Spark")),
(1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
(2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("label", "words")
val ngram = new NGram().setInputCol("words").setOutputCol("ngrams")
val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.take(3).map(_.getAs[Stream[String]]("ngrams").toList).foreach(println)
APIの詳細はNGram Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
RowFactory.create(0.0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
RowFactory.create(1.0, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
RowFactory.create(2.0, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
));
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField(
"words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
DataFrame wordDataFrame = sqlContext.createDataFrame(jrdd, schema);
NGram ngramTransformer = new NGram().setInputCol("words").setOutputCol("ngrams");
DataFrame ngramDataFrame = ngramTransformer.transform(wordDataFrame);
for (Row r : ngramDataFrame.select("ngrams", "label").take(3)) {
java.util.List<String> ngrams = r.getList(0);
for (String ngram : ngrams) System.out.print(ngram + " --- ");
System.out.println();
}
APIの詳細はNGram Python ドキュメント を参照してください。
from pyspark.ml.feature import NGram
wordDataFrame = sqlContext.createDataFrame([
(0, ["Hi", "I", "heard", "about", "Spark"]),
(1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
(2, ["Logistic", "regression", "models", "are", "neat"])
], ["label", "words"])
ngram = NGram(inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
for ngrams_label in ngramDataFrame.select("ngrams", "label").take(3):
print(ngrams_label)
Binarizer
二分化は数値的な特徴の閾値化を二値 (0/1) の特徴にする処理です。
Binarizer
は、二分化のためのthreshold
と同様に、共通のパラメータ inputCol
とoutputCol
を取ります。閾値より大きな特徴値は1.0に2値化されます; 閾値より小さい値は0.0に2値化されます。
APIの詳細はBinarizer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.Binarizer
val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame: DataFrame = sqlContext.createDataFrame(data).toDF("label", "feature")
val binarizer: Binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5)
val binarizedDataFrame = binarizer.transform(dataFrame)
val binarizedFeatures = binarizedDataFrame.select("binarized_feature")
binarizedFeatures.collect().foreach(println)
APIの詳細はBinarizer Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
RowFactory.create(0, 0.1),
RowFactory.create(1, 0.8),
RowFactory.create(2, 0.2)
));
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
DataFrame continuousDataFrame = jsql.createDataFrame(jrdd, schema);
Binarizer binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5);
DataFrame binarizedDataFrame = binarizer.transform(continuousDataFrame);
DataFrame binarizedFeatures = binarizedDataFrame.select("binarized_feature");
for (Row r : binarizedFeatures.collect()) {
Double binarized_value = r.getDouble(0);
System.out.println(binarized_value);
}
APIの詳細はBinarizer Python ドキュメント を参照してください。
from pyspark.ml.feature import Binarizer
continuousDataFrame = sqlContext.createDataFrame([
(0, 0.1),
(1, 0.8),
(2, 0.2)
], ["label", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
binarizedFeatures = binarizedDataFrame.select("binarized_feature")
for binarized_feature, in binarizedFeatures.collect():
print(binarized_feature)
PCA
PCA is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables into a set of values of linearly uncorrelated variables called principal components. PCA クラスはPCAを使ってベクトルを低次元に投影するためにモデルを訓練します。以下の例は5次元の特徴値ベクトルを3次元の主要な成分に投影する方法を示します。
APIの詳細はPCA Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.PCA
import org.apache.spark.mllib.linalg.Vectors
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df)
val pcaDF = pca.transform(df)
val result = pcaDF.select("pcaFeatures")
result.show()
APIの詳細はPCA Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> data = jsc.parallelize(Arrays.asList(
RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
));
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
DataFrame df = jsql.createDataFrame(data, schema);
PCAModel pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df);
DataFrame result = pca.transform(df).select("pcaFeatures");
result.show();
APIの詳細はPCA Python ドキュメント を参照してください。
from pyspark.ml.feature import PCA
from pyspark.mllib.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
PolynomialExpansion
Polynomial expansionは特徴値を多項式空間に拡張する処理です。これは元の次元のn-次の組み合わせによって定式化されます。PolynomialExpansion クラスはこの機能を提供します。以下の例は特徴値を3次元の多項式空間に拡張する方法を示します。
APIの詳細はPolynomialExpansion Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.mllib.linalg.Vectors
val data = Array(
Vectors.dense(-2.0, 2.3),
Vectors.dense(0.0, 0.0),
Vectors.dense(0.6, -1.1)
)
val df = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val polynomialExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3)
val polyDF = polynomialExpansion.transform(df)
polyDF.select("polyFeatures").take(3).foreach(println)
APIの詳細はPylynomialExpansion Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
PolynomialExpansion polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3);
JavaRDD<Row> data = jsc.parallelize(Arrays.asList(
RowFactory.create(Vectors.dense(-2.0, 2.3)),
RowFactory.create(Vectors.dense(0.0, 0.0)),
RowFactory.create(Vectors.dense(0.6, -1.1))
));
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
DataFrame df = jsql.createDataFrame(data, schema);
DataFrame polyDF = polyExpansion.transform(df);
Row[] row = polyDF.select("polyFeatures").take(3);
for (Row r : row) {
System.out.println(r.get(0));
}
APIの詳細はPolynomialExpansion Python ドキュメント を参照してください。
from pyspark.ml.feature import PolynomialExpansion
from pyspark.mllib.linalg import Vectors
df = sqlContext\
.createDataFrame([(Vectors.dense([-2.0, 2.3]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([0.6, -1.1]),)],
["features"])
px = PolynomialExpansion(degree=2, inputCol="features", outputCol="polyFeatures")
polyDF = px.transform(df)
for expanded in polyDF.select("polyFeatures").take(3):
print(expanded)
離散コサイン変換 (DCT)
離散コサイン変換 は時間領域の長さ$N$ リアル値の系列を他の周波数領域の長さ$N$ リアル値の系列に変換します。A DCT class provides this functionality, implementing the DCT-II and scaling the result by $1/\sqrt{2}$ such that the representing matrix for the transform is unitary. No shift is applied to the transformed sequence (e.g. the $0$th element of the transformed sequence is the $0$th DCT coefficient and not the $N/2$th).
APIの詳細はDCT Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.DCT
import org.apache.spark.mllib.linalg.Vectors
val data = Seq(
Vectors.dense(0.0, 1.0, -2.0, 3.0),
Vectors.dense(-1.0, 2.0, 4.0, -7.0),
Vectors.dense(14.0, -2.0, -5.0, 1.0))
val df = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false)
val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(3)
APIの詳細はDCT Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.DCT;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> data = jsc.parallelize(Arrays.asList(
RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
));
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
DataFrame df = jsql.createDataFrame(data, schema);
DCT dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false);
DataFrame dctDf = dct.transform(df);
dctDf.select("featuresDCT").show(3);
StringIndexer
StringIndexer
はラベルの文字列カラムをラベルのインデックスのカラムにエンコードします。このインデックスは[0, numLabels)
の中にあり、ラベルの頻度によって並べられます。つまり、最も頻度の高いラベルはインデックス 0
です。入力カラムが数値の場合、それを文字列にキャストし、文字列値をインデックスします。Estimator
または Transformer
のようなダウンストリームパイプライン コンポーネントがこの文字列のインデックスラベルを利用する場合、コンポーネントの入力カラムをこの文字列のインデックスのカラム名に設定する必要があります。多くの場合において、setInputCol
を使って入力カラムを設定することができます。
例
id
と category
のカラムを持つ以下のデータフレームを仮定します:
id | category
----|----------
0 | a
1 | b
2 | c
3 | a
4 | a
5 | c
category
は3つのラベルを持つ文字列のカラムです: “a”, “b”, and “c”. 入力カラムとしてcategory
を持ち、出力カラムとしてcategoryIndex
を持つ StringIndexer
を適用すると、以下を取得するでしょう:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | a | 0.0
4 | a | 0.0
5 | c | 1.0
"a" は最も頻度が高いためインデックス0
を取得し、インデックス1
を持つ"c"とインデックス2
を持つ"b"が続きます。
Additionaly, there are two strategies regarding how StringIndexer
will handle
unseen labels when you have fit a StringIndexer
on one dataset and then use it
to transform another:
- 例外を投げる (これがデフォルトです)
- skip the row containing the unseen label entirely
例
前回の例に戻りますが、今回は以前定義したStringIndexer
を以下のデータセットに再利用します:
id | category
----|----------
0 | a
1 | b
2 | c
3 | d
If you’ve not set how StringIndexer
handles unseen labels or set it to “error”, an exception will be thrown. しかし、setHandleInvalid("skip")
を呼んだ場合は、以下のデータセットが生成されるでしょう:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
"d"を含む行が現れないことに注意してください。
APIの詳細はStringIndexer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.StringIndexer
val df = sqlContext.createDataFrame(
Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
val indexed = indexer.fit(df).transform(df)
indexed.show()
APIの詳細はStringIndexer Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
));
StructType schema = new StructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("category", StringType, false)
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);
StringIndexer indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex");
DataFrame indexed = indexer.fit(df).transform(df);
indexed.show();
APIの詳細はStringIndexer Python ドキュメント を参照してください。
from pyspark.ml.feature import StringIndexer
df = sqlContext.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
IndexToString
Symmetrically to StringIndexer
, IndexToString
maps a column of label indices back to a column containing the original labels as strings. The common use case is to produce indices from labels with StringIndexer
, train a model with those indices and retrieve the original labels from the column of predicted indices with IndexToString
. しかし、独自のラベルを提供しても良いです。
例
StringIndexer
の例を基礎にして、カラム id
と categoryIndex
を持つ以下のデータフレームを持つと仮定しましょう:
id | categoryIndex
----|---------------
0 | 0.0
1 | 2.0
2 | 1.0
3 | 0.0
4 | 0.0
5 | 1.0
Applying IndexToString
with categoryIndex
as the input column,
originalCategory
as the output column, we are able to retrieve our original
labels (they will be inferred from the columns’ metadata):
id | categoryIndex | originalCategory
----|---------------|-----------------
0 | 0.0 | a
1 | 2.0 | b
2 | 1.0 | c
3 | 0.0 | a
4 | 0.0 | a
5 | 1.0 | c
APIの詳細はIndexToString Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.{StringIndexer, IndexToString}
val df = sqlContext.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
val converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory")
val converted = converter.transform(indexed)
converted.select("id", "originalCategory").show()
APIの詳細はIndexToString Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
));
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);
StringIndexerModel indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df);
DataFrame indexed = indexer.transform(df);
IndexToString converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory");
DataFrame converted = converter.transform(indexed);
converted.select("id", "originalCategory").show();
APIの詳細はIndexToString Python ドキュメント を参照してください。
from pyspark.ml.feature import IndexToString, StringIndexer
df = sqlContext.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)
converted.select("id", "originalCategory").show()
OneHotEncoder
One-hot encoding はラベルのインデックスのカラムを、最大1の値を持つ2値のベクトルにマップします。この変換により、ロジスティック回帰のような連続する特徴値を期待するアルゴリズムが categorical featuresを使うことができます。
APIの詳細はOneHotEncoder Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
val df = sqlContext.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
val encoder = new OneHotEncoder()
.setInputCol("categoryIndex")
.setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()
APIの詳細はOneHotEncoder Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
));
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);
StringIndexerModel indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df);
DataFrame indexed = indexer.transform(df);
OneHotEncoder encoder = new OneHotEncoder()
.setInputCol("categoryIndex")
.setOutputCol("categoryVec");
DataFrame encoded = encoder.transform(indexed);
encoded.select("id", "categoryVec").show();
APIの詳細はOneHotEncoder Python ドキュメント を参照してください。
from pyspark.ml.feature import OneHotEncoder, StringIndexer
df = sqlContext.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()
VectorIndexer
VectorIndexer
はVector
のデータセット内のカテゴリ特徴量のインデックスを手助けします。It can both automatically decide which features are categorical and convert original values to category indices. 厳密に言うと、以下のことを行います:
- タイプVectorとパラメータ
maxCategories
を取ります。 - Decide which features should be categorical based on the number of distinct values, where features with at most
maxCategories
are declared categorical. - 各分類特徴について0から始まる分類インデックスを計算します。
- 分類的な特徴をインデックスし、元の特徴値をインデックスに変換します。
分類的な特徴をインデックスすることにより、決定木およびツリーアンサンブルのようなアルゴリズムが分類的な特徴を適切に扱うことができ、パフォーマンスを改善します。
以下の例では、ラベル付けされた点のデータセットを読み、どの毒長が分類的であるかを決めるために VectorIndexer
を使います。分類的な特徴値をそれらのインデックスに変換します。この変換されたデータはカテゴリ特徴量を扱うDecisionTreeRegressor
のようなアルゴリズムに渡すことができます。
APIの詳細はVectorIndexer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.VectorIndexer
val data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10)
val indexerModel = indexer.fit(data)
val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " +
categoricalFeatures.mkString(", "))
// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
APIの詳細はVectorIndexer Java ドキュメント を参照してください。
import java.util.Map;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.DataFrame;
DataFrame data = jsql.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
VectorIndexer indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);
Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");
for (Integer feature : categoryMaps.keySet()) {
System.out.print(" " + feature);
}
System.out.println();
// Create new column "indexed" with categorical values transformed to indices
DataFrame indexedData = indexerModel.transform(data);
indexedData.show();
APIの詳細はVectorIndexer Python ドキュメント を参照してください。
from pyspark.ml.feature import VectorIndexer
data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)
# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
平均器
Normalizer
は、各Vector
が単位ノルムを持つように正規化し、Vector
行のデータセットを変換するTransformer
です。パラメータ p
を取り、それは正規化に使われる p-norm を指定します。(デフォルトは $p = 2$ ) この正規化は入力データの標準化の役に立ち、学習アルゴリズムの挙動を改善します。
以下の例はlibsvm形式のデータセットをロードし、各行がunit $L^2$ ノルムおよび unit $L^\infty$ ノルムを持つように正規化する方法を実演します。
APIの詳細はNormalizer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.Normalizer
val dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0)
val l1NormData = normalizer.transform(dataFrame)
l1NormData.show()
// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
lInfNormData.show()
APIの詳細はNormalizer Java ドキュメント を参照してください。
import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.sql.DataFrame;
DataFrame dataFrame = jsql.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0);
DataFrame l1NormData = normalizer.transform(dataFrame);
l1NormData.show();
// Normalize each Vector using $L^\infty$ norm.
DataFrame lInfNormData =
normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
APIの詳細はNormalizer Python ドキュメント を参照してください。
from pyspark.ml.feature import Normalizer
dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
l1NormData.show()
# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
lInfNormData.show()
StandardScaler
StandardScaler
はVector
行のデータセットを変換し、各特長値が標準偏差1 および/あるいは 平均0を持つように正規化します。以下のパラメータを取ります:
withStd
: デフォルトはtrueデータを標準偏差1にスケールします。withMean
: デフォルトはfalse。スケーリングする前に平均を使ってデータを中心化します。dense出力を構築するため、これはsparse入力では動作せず、例外を発生するでしょう。
StandardScaler
は StandardScalerModel
を生成するためにデータセットにfit
するEstimator
です; これは概要の統計を計算することに該当します。モデルはデータセット中のVector
が単位標準偏差 および/あるいは ゼロ平均特徴 を持つように変換することができます。
特徴の標準偏差がゼロの場合、その特徴のためのVector
の中でデフォルトの0.0
値を返すだろうことに注意してください。
以下の例はlibsvm形式のデータセットをロードし、各特長値がunit 標準偏差を持つように正規化する方法を実演します。
APIの詳細はStandardScaler Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.StandardScaler
val dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false)
// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)
// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
APIの詳細はStandardScaler Java ドキュメント を参照してください。
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.DataFrame;
DataFrame dataFrame = jsql.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
StandardScaler scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false);
// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);
// Normalize each feature to have unit standard deviation.
DataFrame scaledData = scalerModel.transform(dataFrame);
scaledData.show();
APIの詳細はStandardScaler Python ドキュメント を参照してください。
from pyspark.ml.feature import StandardScaler
dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)
# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
MinMaxScaler
MinMaxScaler
は Vector
行のデータセットを変換し、各特長値を特定の範囲(よくあるのは[0, 1])に再スケールします。以下のパラメータを取ります:
min
: デフォルトは0.0。変換後の下限値は全ての特徴値によって共有されます。max
: デフォルトは1.0。変換後の上限値は全ての特徴値によって共有されます。
MinMaxScaler
はデータセットの総統計を計算し、MinMaxScalerModel
を生成します。モデルはそれぞれの特徴値を指定された領域に入るように変換することができます。
特徴Eの再スケールされた値は \begin{equation} Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min \end{equation}
のように計算されます。E_{max} == E_{min}
の場合は Rescaled(e_i) = 0.5 * (max + min)
ゼロの値はおそらく非ゼロ値に変換されるため、変換器の出力は薄い入力に対してもDenseVectorになるでしょう。
以下の例はlibsvm形式のデータセットをロードし、各特長値を [0, 1] に再スケールします。
APIの詳細はMinMaxScaler Scala ドキュメント と MinMaxScalerModel Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.MinMaxScaler
val dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
APIについての詳細はMinMaxScaler Java ドキュメント と MinMaxScalerModel Java ドキュメントを参照してください。
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.sql.DataFrame;
DataFrame dataFrame = jsql.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
MinMaxScaler scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [min, max].
DataFrame scaledData = scalerModel.transform(dataFrame);
scaledData.show();
Bucketizer
Bucketizer
は連続する特徴値のカラムをユーザによって指定される特徴値のバケットに変換します。以下のパラメータを取ります:
splits
: 連続する特徴値をバケットにマップするためのパラメータ。n+1 分割の場合、nバケットになります。split x,y によって定義されたバケットは最後のバケットを除いて [x,y) の範囲の値を持ちます。最後のバケットはyも含みます。splits は厳密に増加しなければなりません。全てのdouble値の範囲をカバーするために、-inf, inf の値も明示的に提供されなければなりません; そうでなければ、指定されたsplit外の値はエラーとして扱われるでしょう。splits
の2つの例としてArray(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity)
とArray(0.0, 1.0, 2.0)
があります。
目的とするカラムの上限と下限に何も意図が無い場合は、splitのBucketizer境界例外の可能性を避けるために境界としてDouble.NegativeInfinity
と Double.PositiveInfinity
を追加したほうが良いでしょう。
指定するsplitは厳密に増加する順番でなければならないことにも注意してください。つまりs0 < s1 < s2 < ... < sn
。
Bucketizerについての詳細はAPIドキュメントの中で見つかります。
以下の例はDouble
のカラムを他のインデックス化されたカラムにバケット化する方法を実演します。
APIの詳細はBucketizer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.Bucketizer
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits)
// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()
APIの詳細はBucketizer Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};
JavaRDD<Row> data = jsc.parallelize(Arrays.asList(
RowFactory.create(-0.5),
RowFactory.create(-0.3),
RowFactory.create(0.0),
RowFactory.create(0.2)
));
StructType schema = new StructType(new StructField[]{
new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
DataFrame dataFrame = jsql.createDataFrame(data, schema);
Bucketizer bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits);
// Transform original data into its bucket index.
DataFrame bucketedData = bucketizer.transform(dataFrame);
bucketedData.show();
APIの詳細はBucketizer Python ドキュメント を参照してください。
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [(-0.5,), (-0.3,), (0.0,), (0.2,)]
dataFrame = sqlContext.createDataFrame(data, ["features"])
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")
# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()
ElementwiseProduct
ElementwiseProduct は 要素ごとの積を使って各入力ベクトルを指定された"weight"で増やします。別の言い方をすると、データセットの各カラムを数値倍にスケールします。This represents the Hadamard product between the input vector, v
and transforming vector, w
, to yield a result vector.
\[ \begin{pmatrix}
v_1 \\
\vdots \\
v_N
\end{pmatrix} \circ \begin{pmatrix}
w_1 \\
\vdots \\
w_N
\end{pmatrix}
= \begin{pmatrix}
v_1 w_1 \\
\vdots \\
v_N w_N
\end{pmatrix}
\]
以下の例は変換ベクトル値を使ってベクトルを変換する方法を実演します。
APIの詳細はElementwiseProduct Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors
// Create some vector data; also works for sparse vectors
val dataFrame = sqlContext.createDataFrame(Seq(
("a", Vectors.dense(1.0, 2.0, 3.0)),
("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector")
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
APIの詳細はElementwiseProduct Java ドキュメント を参照してください。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Create some vector data; also works for sparse vectors
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
));
List<StructField> fields = new ArrayList<StructField>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));
StructType schema = DataTypes.createStructType(fields);
DataFrame dataFrame = sqlContext.createDataFrame(jrdd, schema);
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector");
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
APIの詳細はElementwiseProduct Python ドキュメント を参照してください。
from pyspark.ml.feature import ElementwiseProduct
from pyspark.mllib.linalg import Vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = sqlContext.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
inputCol="vector", outputCol="transformedVector")
transformer.transform(df).show()
SQLTransformer
SQLTransformer
はSQL文によって定義される変換を実装します。Currently we only support SQL syntax like "SELECT ... FROM __THIS__ ..."
where "__THIS__"
represents the underlying table of the input dataset. select句は出力で表示するフィールド、定数および表現を指定します。Spark SQLがサポートするどのようなselect句もありえます。ユーザは選択されるカラム上で操作するSpark SQLのビルトイン関数とUDFも使うことができます。例えば、SQLTransformer
は以下のような文をサポートします:
SELECT a, a + b AS a_b FROM __THIS__
SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
例
id
、v1
および category
のカラムを持つ以下のデータフレームを仮定します:
id | v1 | v2
----|-----|-----
0 | 1.0 | 3.0
2 | 2.0 | 5.0
これは"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"
のSQLTransformer
の出力です :
id | v1 | v2 | v3 | v4
----|-----|-----|-----|-----
0 | 1.0 | 3.0 | 4.0 | 3.0
2 | 2.0 | 5.0 | 7.0 |10.0
APIの詳細はSQLTransformer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.SQLTransformer
val df = sqlContext.createDataFrame(
Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
val sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
APIの詳細はSQLTransformer Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
RowFactory.create(0, 1.0, 3.0),
RowFactory.create(2, 2.0, 5.0)
));
StructType schema = new StructType(new StructField [] {
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);
SQLTransformer sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");
sqlTrans.transform(df).show();
APIの詳細はSQLTransformer Python ドキュメント を参照してください。
from pyspark.ml.feature import SQLTransformer
df = sqlContext.createDataFrame([
(0, 1.0, 3.0),
(2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
VectorAssembler
VectorAssembler
は指定されたカラムのリストを一つのベクトルのカラムに合成する変換器です。ロジスティック回帰や決定木のようなMLモデルを訓練するためには、生の特徴と、異なる特徴変換器によって生成された特徴を、1つの特徴ベクトルに結合すると便利です。VectorAssembler
は以下の入力カラムタイプを受け入れます: 全ての数値型、真偽値、およびベクトル型。各行内で入力カラムの値は指定された順でベクトルに結合されます。
例
カラム id
, hour
, mobile
, userFeatures
および clicked
を持つデータフレームがあると仮定します:
id | hour | mobile | userFeatures | clicked
----|------|--------|------------------|---------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0
userFeatures
は3つのユーザ特徴値を持つベクトルカラムです。hour
, mobile
および userFeatures
を features
と呼ばれる一つの特徴ベクトルに結合し、それを使ってclicked
かそうでないかを予想するために使いたいとします。VectorAssembler
の入力カラムを hour
, mobile
および userFeatures
に設定し、出力カラムをfeatures
に設定した場合、変換後に以下のデータフレームを取得するはずです:
id | hour | mobile | userFeatures | clicked | features
----|------|--------|------------------|---------|-----------------------------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]
APIの詳細はVectorAssembler Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.Vectors
val dataset = sqlContext.createDataFrame(
Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
val assembler = new VectorAssembler()
.setInputCols(Array("hour", "mobile", "userFeatures"))
.setOutputCol("features")
val output = assembler.transform(dataset)
println(output.select("features", "clicked").first())
APIの詳細はVectorAssembler Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("hour", IntegerType, false),
createStructField("mobile", DoubleType, false),
createStructField("userFeatures", new VectorUDT(), false),
createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
JavaRDD<Row> rdd = jsc.parallelize(Arrays.asList(row));
DataFrame dataset = sqlContext.createDataFrame(rdd, schema);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"hour", "mobile", "userFeatures"})
.setOutputCol("features");
DataFrame output = assembler.transform(dataset);
System.out.println(output.select("features", "clicked").first());
APIの詳細はVectorAssembler Python ドキュメント を参照してください。
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
dataset = sqlContext.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
output = assembler.transform(dataset)
print(output.select("features", "clicked").first())
QuantileDiscretizer
QuantileDiscretizer
takes a column with continuous features and outputs a column with binned
categorical features.
The bin ranges are chosen by taking a sample of the data and dividing it into roughly equal parts.
The lower and upper bin bounds will be -Infinity
and +Infinity
, covering all real values.
This attempts to find numBuckets
partitions based on a sample of the given input data, but it may
find fewer depending on the data sample values.
背後の標本ストラテジは決定的ではないため、実行するたびに異なる値を得ることに注意してください。
例
id
, hour
のカラムを持つデータフレームがあるとします:
id | hour
----|------
0 | 18.0
----|------
1 | 19.0
----|------
2 | 8.0
----|------
3 | 5.0
----|------
4 | 2.2
hour
はDouble
型の連続する特徴です。連続する特徴を明確な1つに変化したいとします。numBuckets = 3
の場合、以下のデータフレームを取得するはずです:
id | hour | result
----|------|------
0 | 18.0 | 2.0
----|------|------
1 | 19.0 | 2.0
----|------|------
2 | 8.0 | 1.0
----|------|------
3 | 5.0 | 1.0
----|------|------
4 | 2.2 | 0.0
APIの詳細はQuantileDiscretizer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.QuantileDiscretizer
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = sc.parallelize(data).toDF("id", "hour")
val discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3)
val result = discretizer.fit(df).transform(df)
result.show()
APIの詳細はQuantileDiscretizer Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> jrdd = jsc.parallelize(
Arrays.asList(
RowFactory.create(0, 18.0),
RowFactory.create(1, 19.0),
RowFactory.create(2, 8.0),
RowFactory.create(3, 5.0),
RowFactory.create(4, 2.2)
)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);
QuantileDiscretizer discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3);
DataFrame result = discretizer.fit(df).transform(df);
result.show();
Feature Selectors
VectorSlicer
VectorSlicer
は特徴値ベクトルを取る変換器で、元の特徴値のsub配列を持つ新しい特徴ベクトルを出力します。ベクトルのカラムから特徴値を抽出するのに役立ちます。
VectorSlicer
は指定されたインデックスを持つベクトルカラムを受け付け、それらのインデックスを使って選択された値を持つ新しいベクトルカラムを出力します。インデックスには2つの種類があります
-
インデックスがベクトルを表す数値インデックス
setIndices()
; -
特徴値の名前がベクトルを表す文字列インデックス
setNames()
。これは実装がAttribute
の名前部分に一致するため、AttributeGroup
を持つためにベクトルカラムを必要とします。
数値および文字列の両方による指定が受け付けられます。更に、同時に数値と文字列による名前を使用することができます。少なくとも1つの特徴量が選択されなければなりません。特徴量の複製は許可されません。つまり選択されたインデックスと名前の間には重複は無いでしょう。特徴量の名前が選択された場合、空の入力属性に遭遇した場合に例外が投げられるだろうことに注意してください。
出力ベクトルは選択されたインデックスを最初(指定された順番)に、続いて選択された名前を次(指定された順番)に、整列するでしょう。
例
userFeatures
のカラムを持つデータフレームを持つと仮定します:
userFeatures
------------------
[0.0, 10.0, 0.5]
userFeatures
は3つのユーザ特徴値を持つベクトルカラムです。userFeatures
の最初のカラムはゼロでそれを削除し、最後の2つのカラムだけが選択されるようにしたいとします。VectorSlicer
はsetIndices(1, 2)
を持つ最後の2つの要素を選択し、features
という名前の新しいベクトルを生成します:
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
Suppose also that we have a potential input attributes for the userFeatures
, i.e.
["f1", "f2", "f3"]
, then we can use setNames("f2", "f3")
to select them.
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
["f1", "f2", "f3"] | ["f2", "f3"]
APIの詳細はVectorSlicer Scala ドキュメント を参照してください。
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
val data = Array(Row(Vectors.dense(-2.0, 2.3, 0.0)))
val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])
val dataRDD = sc.parallelize(data)
val dataset = sqlContext.createDataFrame(dataRDD, StructType(Array(attrGroup.toStructField())))
val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")
slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))
val output = slicer.transform(dataset)
println(output.select("userFeatures", "features").first())
APIの詳細はVectorSlicer Java ドキュメント を参照してください。
import com.google.common.collect.Lists;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
Attribute[] attrs = new Attribute[]{
NumericAttribute.defaultAttr().withName("f1"),
NumericAttribute.defaultAttr().withName("f2"),
NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);
JavaRDD<Row> jrdd = jsc.parallelize(Lists.newArrayList(
RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
));
DataFrame dataset = jsql.createDataFrame(jrdd, (new StructType()).add(group.toStructField()));
VectorSlicer vectorSlicer = new VectorSlicer()
.setInputCol("userFeatures").setOutputCol("features");
vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})
DataFrame output = vectorSlicer.transform(dataset);
System.out.println(output.select("userFeatures", "features").first());
RFormula
RFormula
はR model formulaで指定されたカラムを選択します。それは特徴量のベクトルカラムと2つのラベルのカラムを生成します。Rで線形回帰のために公式が使われるように、文字列入力カラムはone-hotエンコードされ、数値カラムはdoubleにキャストされるでしょう。DataFrameに既に存在しない場合は、出力ラベルカラムは公式内の指定された応答変数から生成されるでしょう。
例
id
, country
, hour
および clicked
のカラムを持つデータフレームがあるとします:
id | country | hour | clicked
---|---------|------|---------
7 | "US" | 18 | 1.0
8 | "CA" | 12 | 0.0
9 | "NZ" | 15 | 0.0
clicked ~ country + hour
の公式文字列を使ってRFormula
を使う場合は、これはcountry
および hour
に基づいて clicked
を予想したいことを意味し、変換後に以下のデータフレームを取得するでしょう:
id | country | hour | clicked | features | label
---|---------|------|---------|------------------|-------
7 | "US" | 18 | 1.0 | [0.0, 0.0, 18.0] | 1.0
8 | "CA" | 12 | 0.0 | [0.0, 1.0, 12.0] | 0.0
9 | "NZ" | 15 | 0.0 | [1.0, 0.0, 15.0] | 0.0
APIの詳細はRFormula Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.RFormula
val dataset = sqlContext.createDataFrame(Seq(
(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")
val formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label")
val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
APIの詳細はRFormula Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("country", StringType, false),
createStructField("hour", IntegerType, false),
createStructField("clicked", DoubleType, false)
});
JavaRDD<Row> rdd = jsc.parallelize(Arrays.asList(
RowFactory.create(7, "US", 18, 1.0),
RowFactory.create(8, "CA", 12, 0.0),
RowFactory.create(9, "NZ", 15, 0.0)
));
DataFrame dataset = sqlContext.createDataFrame(rdd, schema);
RFormula formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label");
DataFrame output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();
APIの詳細はRFormula Python ドキュメント を参照してください。
from pyspark.ml.feature import RFormula
dataset = sqlContext.createDataFrame(
[(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)],
["id", "country", "hour", "clicked"])
formula = RFormula(
formula="clicked ~ country + hour",
featuresCol="features",
labelCol="label")
output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
ChiSqSelector
ChiSqSelector
はカイ二乗特徴抽出を意味します。分類特徴を持つラベル付けされたデータ上で操作します。ChiSqSelectorはクラスから独立したカイ二乗検定に基づいて特徴を整列し、クラスのラベルが最も依存する一番上の特徴をフィルター(選択)します。これは最も予知力が高い特徴に明け渡すことに似ています。
例
id
, features
および clicked
のカラムを持つデータフレームがあると仮定し、これが予想される目的として使われます:
id | features | clicked
---|-----------------------|---------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0
8 | [0.0, 1.0, 12.0, 0.0] | 0.0
9 | [1.0, 0.0, 15.0, 0.1] | 0.0
If we use ChiSqSelector
with a numTopFeatures = 1
, then according to our label clicked
the
last column in our features
chosen as the most useful feature:
id | features | clicked | selectedFeatures
---|-----------------------|---------|------------------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0]
8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0]
9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1]
APIの詳細はChiSqSelector Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors
val data = Seq(
(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)
val df = sc.parallelize(data).toDF("id", "features", "clicked")
val selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
result.show()
APIの詳細はChiSqSelector Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
));
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});
DataFrame df = sqlContext.createDataFrame(jrdd, schema);
ChiSqSelector selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures");
DataFrame result = selector.fit(df).transform(df);
result.show();