特徴の抽出、変換および選択
この章は以下のグループに大まかに分類される特徴のために動作するアルゴリズムを対象にします:
- 抽出: "raw"データから特徴を抽出
- 変換: スケーリング、変換、あるいは特徴の修正
- 選択: 特徴の大きなセットから下位集合を選択
- 局所性鋭敏型ハッシュ (LSH): このアルゴリズムのクラスは特徴変換の様相を他のアルゴリズムと組み合わせます。
目次
- 特徴抽出器
- 特徴変換
- Tokenizer
- StopWordsRemover
- $n$-gram
- Binarizer
- PCA
- PolynomialExpansion
- 離散コサイン変換 (DCT)
- StringIndexer
- IndexToString
- OneHotEncoder
- VectorIndexer
- Interaction
- 平均器
- StandardScaler
- RobustScaler
- MinMaxScaler
- MaxAbsScaler
- Bucketizer
- ElementwiseProduct
- SQLTransformer
- VectorAssembler
- VectorSizeHint
- QuantileDiscretizer
- Imputer
- Feature Selectors
- 一貫性のあるハッシュ
特徴抽出器
TF-IDF
索引語頻度の逆出現頻度 (TF-IDF) はテキストマイニングで単語の重要度をコープス内のドキュメントに反映するために広く使われている特徴ベクトル化です。$t$
によって単語を、$d$
によってドキュメントを、$D$
でコープスを示します。索引語頻度 $TF(t, d)$
は単語 $t$
がドキュメント $d$
に現れる数で、ドキュメントの文章頻度 $DF(t, D)$
は単語 $t$
を含むドキュメントの数です。重要度を測定するために索引語頻度だけを使う場合、頻繁に現れる単語を過度に強調しすぎますが、ドキュメントについてほとんど情報を運びません。例えば "a"、"the" および "of"。コープスを横断してしばしば単語が現れる場合、それは特定のドキュメントに関する特別な情報を運ばないことを意味します。逆出現頻度はどれだけの情報を単語が提供するかの数値的な指標です: \[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \]
ここで $|D|$
はコープス内のドキュメントの総数です。対数が使われるため、全てのドキュメントで単語が現れる場合、IDFは0になります。補正単語はコープス外の単語のためにゼロで割られることを避けるために適用されることに注意してください。TF-IDF 指標は単純にTF と IDFの産物です: \[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \]
索引語頻度と文章頻度の定義上の幾つかの変数があります。MLlibではそれらを柔軟にするためにTFとIDFを分割します。
TF: HashingTF
とCountVectorizer
の両方は、単語の頻度ベクトルを生成するために使うことができます。
HashingTF
は単語のセットを取り、それらのセットを固定長の特徴量ベクトルに設定する変換器
です。テキスト処理において、"単語のセット"は bag of words です。HashingTF
は ハッシュのトリックを使います。生の特徴はハッシュ関数を適用することでインデックス(単語)にマップされます。ここで使われるハッシュ関数はMurmurHash 3です。そして、索引語頻度はマップされたインデックスに基づいて計算されます。このやり方はグローバルな単語からインデックスへのマップの計算の必要性を避けます。これは大きなコープスには高くつくでしょうが、潜在的なハッシュの衝突の影響を受けます。生の特徴の違いはハッシュの後で同じ単語になるかも知れません。衝突の可能性を減らすために、目標の特徴次元を増やすことができます。つまり、ハッシュテーブルのバケットの数です。ハッシュ化された値の単純なmoduloはベクトル インデックスを決定するために使われるため、特徴次元として二乗を使うことは当を得ています。そうでなければ、機能はベクトル インデックスに等しくマップされないでしょう。デフォルトの特徴次元は $2^{18} = 262,144$
です。任意の二値トグルパラメータは単語の頻度のカウントを制御します。trueに設定された場合は、全ての非ゼロの頻度のカウントは1に設定されます。これは、整数カウントよりも二値をモデルとする離散確率モデルを利用するのに特に便利です。
CountVectorizer
はテキストドキュメントを単語のカウントのベクトルに変換します。詳細はCountVectorizer を参照してください。
IDF: IDF
はデータセットに当てはまり IDFModel
を生成するEstimator
です。IDFModel
は特徴量ベクトル(一般的にHashingTF
あるいはCountVectorizer
から生成されます)を取り、各特徴をスケールします。直感的にそれはコープス内で頻繁に現れる特徴の重み付けを減らします。
注意: spark.ml
はテキストの文節化のためのツールを提供しません。ユーザはStanford NLP Group および scalanlp/chalk を参照してください。
例
以下のコードの断片で、文章のセットから始めます。各文章を Tokenizer
を使って単語に分割します。各文章 (bag of words)について、文章を特徴量ベクトルにハッシュするためにHashingTF
を使います。特徴量ベクトルをリスケールするために IDF
を使います; これは一般的にテキストを特徴として使用する場合にパフォーマンスを改善します。それから特徴量ベクトルは学習アルゴリズムに渡されることができます。
APIについての詳細はHashingTF Scala ドキュメント およびIDF Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
val sentenceData = spark.createDataFrame(Seq(
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)
val hashingTF = new HashingTF()
.setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
APIについての詳細はHashingTF Java ドキュメント およびIDF Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, "Hi I heard about Spark"),
RowFactory.create(0.0, "I wish Java could use case classes"),
RowFactory.create(1.0, "Logistic regression models are neat")
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceData = spark.createDataFrame(data, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsData = tokenizer.transform(sentenceData);
int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("rawFeatures")
.setNumFeatures(numFeatures);
Dataset<Row> featurizedData = hashingTF.transform(wordsData);
// alternatively, CountVectorizer can also be used to get term frequency vectors
IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);
Dataset<Row> rescaledData = idfModel.transform(featurizedData);
rescaledData.select("label", "features").show();
APIについての詳細はHashingTF Python ドキュメント およびIDF Python ドキュメントを参照してください。
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
Word2Vec
Word2Vec
は ドキュメントを表す単語の系列を取り、Word2VecModel
を訓練する Estimator
です。モデルは各単語をユニークな固定長のベクトルにマップします。Word2VecModel
は各ドキュメントをドキュメント内の全ての単語の平均を使ってベクトルに変換します; このベクトルは予想、ドキュメントの類似性の計算などのために特徴として使うことができます。詳細は Word2VecのMLlibユーザガイドを参照してください。
例
以下のコードの断片の中で、ドキュメントのセットを使って始めます。それぞれは単語の系列を表します。各ドキュメントのために、それを特徴量ベクトルに変換します。そして、この特徴量ベクトルは学習アルゴリズムに渡されることができます。
APIの詳細はWord2Vec Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// Input data: Each row is a bag of words from a sentence or document.
val documentDF = spark.createDataFrame(Seq(
"Hi I heard about Spark".split(" "),
"I wish Java could use case classes".split(" "),
"Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
APIの詳細はWord2Vec Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
);
StructType schema = new StructType(new StructField[]{
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> documentDF = spark.createDataFrame(data, schema);
// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0);
Word2VecModel model = word2Vec.fit(documentDF);
Dataset<Row> result = model.transform(documentDF);
for (Row row : result.collectAsList()) {
List<String> text = row.getList(0);
Vector vector = (Vector) row.get(1);
System.out.println("Text: " + text + " => \nVector: " + vector + "\n");
}
APIの詳細はWord2Vec Python ドキュメント を参照してください。
from pyspark.ml.feature import Word2Vec
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
("Hi I heard about Spark".split(" "), ),
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for row in result.collect():
text, vector = row
print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
CountVectorizer
CountVectorizer
と CountVectorizerModel
はテキスト文章のコレクションをトークンのカウントのベクトルに変換することを目的としています。推測的な辞書を利用できない場合、CountVectorizer
は語彙を抽出するためにEstimator
としてつかうことができ、CountVectorizerModel
を生成します。モデルは語彙上のドキュメントのための希薄な表現を生成します。これはLDAのような他のアルゴリズムに渡すことができます。
合致プロセスの中で、CountVectorizer
はコープスに渡って単語の頻度が高い順に並べられた単語の中で最も長いvocabSize
を選択するでしょう。語彙に含まれるべき単語がドキュメントに現れる最小の数(断片が<1.0)を指定することで、任意のパラメータ minDF
も合致プロセスに影響します。他の任意の二値トグルのパラメータが出力ベクトルを制御します。trueに設定されると全ての非ゼロのカウントが1に設定されます。これは、整数カウントよりも二値をモデルとする離散確率モデルを利用するのに特に便利です。
例
id
と texts
のカラムを持つ以下のデータフレームを仮定します:
id | texts
----|----------
0 | Array("a", "b", "c")
1 | Array("a", "b", "b", "c", "a")
texts
の中の各行はArray[String] 型のドキュメントです。CountVectorizer
の適合の開始は語彙(a, b, c)を持つ CountVectorizerModel
を生成します。そして、変換後のカラム“vector” は以下を含みます:
id | texts | vector
----|---------------------------------|---------------
0 | Array("a", "b", "c") | (3,[0,1,2],[1.0,1.0,1.0])
1 | Array("a", "b", "b", "c", "a") | (3,[0,1,2],[2.0,2.0,1.0])
各ベクトルは語彙上のドキュメントのトークンの数を表します。
APIの詳細はCountVectorizer Scala ドキュメント と CountVectorizerModel Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
val df = spark.createDataFrame(Seq(
(0, Array("a", "b", "c")),
(1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")
// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
.setInputCol("words")
.setOutputCol("features")
.setVocabSize(3)
.setMinDF(2)
.fit(df)
// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
.setInputCol("words")
.setOutputCol("features")
cvModel.transform(df).show(false)
APIの詳細はCountVectorizer Java ドキュメント と CountVectorizerModel Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("a", "b", "c")),
RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
);
StructType schema = new StructType(new StructField [] {
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
.setInputCol("text")
.setOutputCol("feature")
.setVocabSize(3)
.setMinDF(2)
.fit(df);
// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
.setInputCol("text")
.setOutputCol("feature");
cvModel.transform(df).show(false);
APIの詳細はCountVectorizer Python ドキュメント と CountVectorizerModel Python ドキュメント を参照してください。
from pyspark.ml.feature import CountVectorizer
# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
(0, "a b c".split(" ")),
(1, "a b b c a".split(" "))
], ["id", "words"])
# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)
result = model.transform(df)
result.show(truncate=False)
FeatureHasher
特徴ハッシュはカテゴリあるいは数値特徴量を指定された次元の特徴ベクトルに写像します (大抵は実質的に元の特徴空間のものより小さいです)。これは特徴を特徴空間内の指標にマップするハッシュの策略を使って行われます。
FeatureHasher
変換器は複数のカラムを操作します。各カラムは数値あるいはカテゴリ特徴のどちらかを含むかもしれません。カラムデータ型の挙動および処理は以下の通りです:
- 数値カラム: 数値特徴量については、カラム名のハッシュ値が特徴値を特徴ベクトル内のインデックスへマップするために使われます。デフォルトでは、数値特徴量は(もしそれが整数値の時でも)カテゴリとして扱われません。それらをカテゴリとして扱うためには、
categoricalCols
パラメータを使って適切なカラムを指定してください。 - 文字列カラム: カテゴリ特徴量については、文字列 “column_name=value” のハッシュ値が
1.0
の指標値を持つベクトルインデックスへマップするために使われます。従って、カテゴリ特徴量は “one-hot” エンコードされます (dropLast=false
のOneHotEncoderの使用と似ています)。 - Boolean カラム: Boolean値は文字列カラムと同じ方法で扱われます。つまり、boolean特徴は “column_name=true” あるいは “column_name=false” として、
1.0
の指標として表現されます。
Null (missing) 値は無視されます (結果の特徴ベクトルが暗黙的に0)。
ここで使われるハッシュ関数もHashingTFで使われるMurmurHash 3です。ハッシュ化された値の単純なmoduloはベクトル インデックスを決定するために使われるため、numFeatures パラメータとして二乗を使うことは当を得ています。そうでなければ、機能はベクトル インデックスに等しくマップされないでしょう。
例
4つの入力カラム real
, bool
, stringNum
および string
を持つデータフレームを持っていると仮定します。入力としてのこれらの異なるデータ型は特徴ベクトルのカラムを生成するための変換の挙動を表すでしょう。
real| bool|stringNum|string
----|-----|---------|------
2.2| true| 1| foo
3.3|false| 2| bar
4.4|false| 3| baz
5.5|false| 4| foo
そして、このデータフレーム上のFeatureHasher.transform
の出力は:
real|bool |stringNum|string|features
----|-----|---------|------|-------------------------------------------------------
2.2 |true |1 |foo |(262144,[51871, 63643,174475,253195],[1.0,1.0,2.2,1.0])
3.3 |false|2 |bar |(262144,[6031, 80619,140467,174475],[1.0,1.0,1.0,3.3])
4.4 |false|3 |baz |(262144,[24279,140467,174475,196810],[1.0,1.0,4.4,1.0])
5.5 |false|4 |foo |(262144,[63643,140467,168512,174475],[1.0,1.0,1.0,5.5])
結果の特徴量ベクトルは学習アルゴリズムに渡すことができます。
APIの詳細は FeatureHasher Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.FeatureHasher
val dataset = spark.createDataFrame(Seq(
(2.2, true, "1", "foo"),
(3.3, false, "2", "bar"),
(4.4, false, "3", "baz"),
(5.5, false, "4", "foo")
)).toDF("real", "bool", "stringNum", "string")
val hasher = new FeatureHasher()
.setInputCols("real", "bool", "stringNum", "string")
.setOutputCol("features")
val featurized = hasher.transform(dataset)
featurized.show(false)
APIの詳細はFeatureHasher Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.FeatureHasher;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(2.2, true, "1", "foo"),
RowFactory.create(3.3, false, "2", "bar"),
RowFactory.create(4.4, false, "3", "baz"),
RowFactory.create(5.5, false, "4", "foo")
);
StructType schema = new StructType(new StructField[]{
new StructField("real", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("bool", DataTypes.BooleanType, false, Metadata.empty()),
new StructField("stringNum", DataTypes.StringType, false, Metadata.empty()),
new StructField("string", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);
FeatureHasher hasher = new FeatureHasher()
.setInputCols(new String[]{"real", "bool", "stringNum", "string"})
.setOutputCol("features");
Dataset<Row> featurized = hasher.transform(dataset);
featurized.show(false);
APIの詳細はFeatureHasher Python ドキュメント を参照してください。
from pyspark.ml.feature import FeatureHasher
dataset = spark.createDataFrame([
(2.2, True, "1", "foo"),
(3.3, False, "2", "bar"),
(4.4, False, "3", "baz"),
(5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])
hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
outputCol="features")
featurized = hasher.transform(dataset)
featurized.show(truncate=False)
特徴変換
Tokenizer
Tokenization は(文章のような)テキストを取るプロセスで、個々の(通常は単語の)用語に分解するプロセスです。単純なTokenizer クラスはこの機能を提供します。以下の例は文章を単語の列に分割する方法を説明します。
RegexTokenizer は正規表現(regex)一致に基づいた更に進んだtokenizationをすることができます。デフォルトでは、パラメータ"pattern"(regex, デフォルト: "\s+"
)は入力テキストを分割するためのデリミタとして使われます。もう一つの方法として、ユーザはパラメータ "gaps"をfalseに設定して、正規表現"pattern"がgapを分割するのではなく"tokens"を意味するようにして、全ての適合の存在をトークン化の結果として見つけることができます。
例
APIの詳細はTokenizer Scala ドキュメント と RegexTokenizer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val sentenceDataFrame = spark.createDataFrame(Seq(
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)
val countTokens = udf { (words: Seq[String]) => words.length }
val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words"))).show(false)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words"))).show(false)
APIについての詳細はTokenizer Java ドキュメント と RegexTokenizer Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import scala.collection.mutable.Seq;
import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.call_udf;
import static org.apache.spark.sql.functions.col;
List<Row> data = Arrays.asList(
RowFactory.create(0, "Hi I heard about Spark"),
RowFactory.create(1, "I wish Java could use case classes"),
RowFactory.create(2, "Logistic,regression,models,are,neat")
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
RegexTokenizer regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);
spark.udf().register(
"countTokens", (Seq> words) -> words.size(), DataTypes.IntegerType);
Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
.withColumn("tokens", call_udf("countTokens", col("words")))
.show(false);
Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
regexTokenized.select("sentence", "words")
.withColumn("tokens", call_udf("countTokens", col("words")))
.show(false);
APIについての詳細はTokenizer Python ドキュメント と RegexTokenizer Python ドキュメントを参照してください。
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
countTokens = udf(lambda words: len(words), IntegerType())
tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
StopWordsRemover
Stop wordsは入力から除外されなければならない単語で、一般的にはその単語は頻繁に現れ、意味を持たないからです。
StopWordsRemover
は入力として文字列の系列(例えば、Tokenizerの出力)を取り、入力の系列から全てのstop wordを取りこぼします。stopwordのリストはstopWords
パラメータによって指定されます。StopWordsRemover.loadDefaultStopWords(language)
を呼び出すことで幾つかの言語のデフォルトの停止単語が利用可能です。利用可能なオプションは "danish", "dutch", "english", "finnish", "french", "german", "hungarian", "italian", "norwegian", "portuguese", "russian", "spanish", "swedish" および "turkish" です。真偽値パラメータ caseSensitive
は一致が大文字小文字を区別するかを示します(デフォルトはfalse)。
例
id
と raw
のカラムを持つ以下のデータフレームを仮定します:
id | raw
----|----------
0 | [I, saw, the, red, balloon]
1 | [Mary, had, a, little, lamb]
入力カラムとしてraw
を持ち、出力カラムとしてfiltered
を持つStopWordsRemover
を適用すると、以下を取得するでしょう:
id | raw | filtered
----|-----------------------------|--------------------
0 | [I, saw, the, red, balloon] | [saw, red, balloon]
1 | [Mary, had, a, little, lamb]|[Mary, little, lamb]
filtered
では、stop words "I", "the", "had" および "a" がフィルタされます。
APIの詳細はStopWordsRemover Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.StopWordsRemover
val remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered")
val dataSet = spark.createDataFrame(Seq(
(0, Seq("I", "saw", "the", "red", "balloon")),
(1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")
remover.transform(dataSet).show(false)
APIの詳細はStopWordsRemover Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
StopWordsRemover remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered");
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("I", "saw", "the", "red", "balloon")),
RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);
StructType schema = new StructType(new StructField[]{
new StructField(
"raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show(false);
APIの詳細はStopWordsRemover Python ドキュメント を参照してください。
from pyspark.ml.feature import StopWordsRemover
sentenceData = spark.createDataFrame([
(0, ["I", "saw", "the", "red", "balloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
$n$-gram
n-gramはなんらかの整数$n$のための$n$トークン(一般的に単語)の系列です。NGram
クラスは入力の特徴量を$n$-gramに変換するために使うことができます。
NGram
は入力として文字列の系列(例えば、Tokenizerの出力)を取ります。パラメータn
は各$n$-gramの単語の数を決定するために使われます。出力は、$n$個の連続した単語の空白で区切られた文字列によって表される各$n$-gramの系列からなるでしょう。もし入力系列がn
より少ない文字列からなる場合、何も出力されません。
例
APIの詳細はNGram Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.NGram
val wordDataFrame = spark.createDataFrame(Seq(
(0, Array("Hi", "I", "heard", "about", "Spark")),
(1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
(2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("id", "words")
val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")
val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(false)
APIの詳細はNGram Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
RowFactory.create(1, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
RowFactory.create(2, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField(
"words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);
NGram ngramTransformer = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams");
Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);
ngramDataFrame.select("ngrams").show(false);
APIの詳細はNGram Python ドキュメント を参照してください。
from pyspark.ml.feature import NGram
wordDataFrame = spark.createDataFrame([
(0, ["Hi", "I", "heard", "about", "Spark"]),
(1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
(2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
Binarizer
二値化は数値的な特徴の閾値化を二値 (0/1) の特徴にする処理です。
Binarizer
は、二値化のためのthreshold
と同様に、共通のパラメータ inputCol
とoutputCol
を取ります。閾値より大きな特徴値は1.0に2値化されます; 閾値より小さい値は0.0に2値化されます。Vector と Double の両方の種類がinputCol
のためにサポートされます。
例
APIの詳細はBinarizer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.Binarizer
val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")
val binarizer: Binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5)
val binarizedDataFrame = binarizer.transform(dataFrame)
println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
APIの詳細はBinarizer Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, 0.1),
RowFactory.create(1, 0.8),
RowFactory.create(2, 0.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);
Binarizer binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5);
Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);
System.out.println("Binarizer output with Threshold = " + binarizer.getThreshold());
binarizedDataFrame.show();
APIの詳細はBinarizer Python ドキュメント を参照してください。
from pyspark.ml.feature import Binarizer
continuousDataFrame = spark.createDataFrame([
(0, 0.1),
(1, 0.8),
(2, 0.2)
], ["id", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
PCA
PCA は直交変換をおそらく相互関係のある変数の観測のセットを主要なコンポーネントと呼ばれる線形的な相互関係の無い変数の値のセットに変換するために使用する統計学上の手法です。PCA クラスはPCAを使ってベクトルを低次元に投影するためにモデルを訓練します。以下の例は5次元の特徴値ベクトルを3次元の主要な成分に投影する方法を示します。
例
APIの詳細はPCA Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df)
val result = pca.transform(df).select("pcaFeatures")
result.show(false)
APIの詳細はPCA Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
PCAModel pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df);
Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show(false);
APIの詳細はPCA Python ドキュメント を参照してください。
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
PolynomialExpansion
Polynomial expansionは特徴値を多項式空間に拡張する処理です。これは元の次元のn-次の組み合わせによって定式化されます。PolynomialExpansion クラスはこの機能を提供します。以下の例は特徴値を3次元の多項式空間に拡張する方法を示します。
例
APIの詳細はPolynomialExpansion Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.dense(2.0, 1.0),
Vectors.dense(0.0, 0.0),
Vectors.dense(3.0, -1.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3)
val polyDF = polyExpansion.transform(df)
polyDF.show(false)
APIの詳細はPylynomialExpansion Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
PolynomialExpansion polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3);
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.dense(2.0, 1.0)),
RowFactory.create(Vectors.dense(0.0, 0.0)),
RowFactory.create(Vectors.dense(3.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
Dataset<Row> polyDF = polyExpansion.transform(df);
polyDF.show(false);
APIの詳細はPolynomialExpansion Python ドキュメント を参照してください。
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([2.0, 1.0]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([3.0, -1.0]),)
], ["features"])
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)
polyDF.show(truncate=False)
離散コサイン変換 (DCT)
離散コサイン変換 は時間領域の長さ$N$ リアル値の系列を他の周波数領域の長さ$N$ リアル値の系列に変換します。DCT クラスはこの機能を提供し、DCT-II を実装し、変換のための表現マトリックスが単位元になるように結果に $1/\sqrt{2}$ の倍率を掛けます。変換される手順には移動は適用されません(たとえば、変換手順の $0$th の要素は、DCTの $0$th の係数で $N/2$th ではありません。
例
APIの詳細はDCT Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
Vectors.dense(0.0, 1.0, -2.0, 3.0),
Vectors.dense(-1.0, 2.0, 4.0, -7.0),
Vectors.dense(14.0, -2.0, -5.0, 1.0))
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false)
val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(false)
APIの詳細はDCT Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
DCT dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false);
Dataset<Row> dctDf = dct.transform(df);
dctDf.select("featuresDCT").show(false);
APIの詳細はDCT Python ドキュメント を参照してください。
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
(Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
(Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])
dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")
dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(truncate=False)
StringIndexer
StringIndexer
はラベルの文字列カラムをラベルのインデックスのカラムにエンコードします。StringIndexer
は複数のカラムをエンコードします。インデックスは [0, numLabels)
の中にあり、4つの順番オプションがサポートされます: “frequencyDesc”: ラベルの頻度の降順 (最も頻度の多いラベルは0に割り当てられます)、“frequencyAsc”: ラベルの頻度の昇順 (最も頻度の少ないラベルは0に割り当てられます)、 “alphabetDesc”: アルファベットの降順、そして “alphabetAsc”: アルファベットの昇順 (default = “frequencyDesc”)。“frequencyDesc”/”frequencyAsc”で頻度が等しい場合は、文字列はさらにアルファベット順にソートされます。
ユーザがまだ見たことが無いラベルを維持することを選択した場合、それらはインデックス numLabels に格納されるでしょう。入力カラムが数値の場合、それを文字列にキャストし、文字列値をインデックスします。Estimator
または Transformer
のようなダウンストリームパイプライン コンポーネントがこの文字列のインデックスラベルを利用する場合、コンポーネントの入力カラムをこの文字列のインデックスのカラム名に設定する必要があります。多くの場合において、setInputCol
を使って入力カラムを設定することができます。
例
id
と category
のカラムを持つ以下のデータフレームを仮定します:
id | category
----|----------
0 | a
1 | b
2 | c
3 | a
4 | a
5 | c
category
は3つのラベルを持つ文字列のカラムです: “a", “b", and “c". 入力カラムとしてcategory
を持ち、出力カラムとしてcategoryIndex
を持つ StringIndexer
を適用すると、以下を取得するでしょう:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | a | 0.0
4 | a | 0.0
5 | c | 1.0
"a" は最も頻度が高いためインデックス0
を取得し、インデックス1
を持つ"c"とインデックス2
を持つ"b"が続きます。
さらに、StringIndexer
を1つのデータセットに適合し、それを他のものを変換するのに使う場合、どのようにStringIndexer
が未発見のラベルを扱うかについて、3つの戦略があります。
- 例外を投げる (これがデフォルトです)
- 現在未発見のラベルを含んでいる行を完全にスキップする
- 未発見のラベルをインデックス numLabels の特別な追加のバケットに入れる
例
前回の例に戻りますが、今回は以前定義したStringIndexer
を以下のデータセットに再利用します:
id | category
----|----------
0 | a
1 | b
2 | c
3 | d
4 | e
StringIndexer
がどのように未発見のラベルを扱うかを設定していないか、それを"error"に設定した場合は、例外が投げられるでしょう。しかし、setHandleInvalid("skip")
を呼んだ場合は、以下のデータセットが生成されるでしょう:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
“d” あるいは “e” を含む行は現れないことに注意してください。
setHandleInvalid("keep")
を呼び出す場合、以下のデータセットが生成されるでしょう:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | d | 3.0
4 | e | 3.0
“d” あるいは “e” を含む行がインデックス“3.0” にマップされることに注意してください。
APIの詳細はStringIndexer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.StringIndexer
val df = spark.createDataFrame(
Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
val indexed = indexer.fit(df).transform(df)
indexed.show()
APIの詳細はStringIndexer Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
List<Row> data = Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("category", StringType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexer indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex");
Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();
APIの詳細はStringIndexer Python ドキュメント を参照してください。
from pyspark.ml.feature import StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
IndexToString
StringIndexer
と対象的に、IndexToString
はラベルのインデックスのカラムを元のラベルのを含むカラムの文字列としてマップし直します。一般的な使い方は、StringIndexer
を使ってラベルからインデックスを生成し、それらのインデックスを使ってモデルを訓練し、IndexToString
を使って予測されたインデックスのカラムから元のラベルを取り出します。しかし、自由に独自のラベルを提供しても良いです。
例
StringIndexer
の例を基礎にして、カラム id
と categoryIndex
を持つ以下のデータフレームを持つと仮定しましょう:
id | categoryIndex
----|---------------
0 | 0.0
1 | 2.0
2 | 1.0
3 | 0.0
4 | 0.0
5 | 1.0
入力カラムとして categoryIndex
、出力カラムとして originalCategory
と一緒に IndexToString
を適用を適用すると、元のラベルを取り出すことができます(それらはカラムのメタデータから推測されるでしょう)。
id | categoryIndex | originalCategory
----|---------------|-----------------
0 | 0.0 | a
1 | 2.0 | b
2 | 1.0 | c
3 | 0.0 | a
4 | 0.0 | a
5 | 1.0 | c
APIの詳細はIndexToString Scala ドキュメント を参照してください。
import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
val df = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
println(s"Transformed string column '${indexer.getInputCol}' " +
s"to indexed column '${indexer.getOutputCol}'")
indexed.show()
val inputColSchema = indexed.schema(indexer.getOutputCol)
println(s"StringIndexer will store labels in output column metadata: " +
s"${Attribute.fromStructField(inputColSchema).toString}\n")
val converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory")
val converted = converter.transform(indexed)
println(s"Transformed indexed column '${converter.getInputCol}' back to original string " +
s"column '${converter.getOutputCol}' using labels in metadata")
converted.select("id", "categoryIndex", "originalCategory").show()
APIの詳細はIndexToString Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexerModel indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df);
Dataset<Row> indexed = indexer.transform(df);
System.out.println("Transformed string column '" + indexer.getInputCol() + "' " +
"to indexed column '" + indexer.getOutputCol() + "'");
indexed.show();
StructField inputColSchema = indexed.schema().apply(indexer.getOutputCol());
System.out.println("StringIndexer will store labels in output column metadata: " +
Attribute.fromStructField(inputColSchema).toString() + "\n");
IndexToString converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);
System.out.println("Transformed indexed column '" + converter.getInputCol() + "' back to " +
"original string column '" + converter.getOutputCol() + "' using labels in metadata");
converted.select("id", "categoryIndex", "originalCategory").show();
APIの詳細はIndexToString Python ドキュメント を参照してください。
from pyspark.ml.feature import IndexToString, StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)
print("Transformed string column '%s' to indexed column '%s'"
% (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()
print("StringIndexer will store labels in output column metadata\n")
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)
print("Transformed indexed column '%s' back to original string column '%s' using "
"labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
OneHotEncoder
One-hot encoding は、全ての特徴値のセットから特定の特徴の存在を表す1つの最大1の値を持つ2値のベクトルに、ラベルのインデックスとして表されるカテゴリ特徴をマップします。この変換により、ロジスティック回帰のような連続する特徴値を期待するアルゴリズムが categorical featuresを使うことができます。文字列型の入力データについては、最初にStringIndexer を使ってカテゴリ特徴へ符号化するのが一般的です。
OneHotEncoder
は、各入力カラムについて one-hot-encoded 出力ベクトルを返しながら、複数のカラムを変換することができます。これらのベクトルをVectorAssembler を使って1つの特徴ベクトルにマージするのが一般的です。
OneHotEncoder
はデータの変換時に無効な入力をどうやって扱うかを選択するためにhandleInvalid
パラメータをサポートします。利用可能なオプションは ‘keep’ (全ての無効な入力は特別なカテゴリインデックスに割り当てられます) と ‘error’ (エラーを投げる)です。
例
APIの詳細はOneHotEncoder Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.OneHotEncoder
val df = spark.createDataFrame(Seq(
(0.0, 1.0),
(1.0, 0.0),
(2.0, 1.0),
(0.0, 2.0),
(0.0, 1.0),
(2.0, 0.0)
)).toDF("categoryIndex1", "categoryIndex2")
val encoder = new OneHotEncoder()
.setInputCols(Array("categoryIndex1", "categoryIndex2"))
.setOutputCols(Array("categoryVec1", "categoryVec2"))
val model = encoder.fit(df)
val encoded = model.transform(df)
encoded.show()
APIの詳細はOneHotEncoder Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.OneHotEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, 1.0),
RowFactory.create(1.0, 0.0),
RowFactory.create(2.0, 1.0),
RowFactory.create(0.0, 2.0),
RowFactory.create(0.0, 1.0),
RowFactory.create(2.0, 0.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
OneHotEncoder encoder = new OneHotEncoder()
.setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
.setOutputCols(new String[] {"categoryVec1", "categoryVec2"});
OneHotEncoderModel model = encoder.fit(df);
Dataset<Row> encoded = model.transform(df);
encoded.show();
APIの詳細はOneHotEncoder Python ドキュメント を参照してください。
from pyspark.ml.feature import OneHotEncoder
df = spark.createDataFrame([
(0.0, 1.0),
(1.0, 0.0),
(2.0, 1.0),
(0.0, 2.0),
(0.0, 1.0),
(2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])
encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
VectorIndexer
VectorIndexer
はVector
のデータセット内のカテゴリ特徴量のインデックスを手助けします。それは自動的にどの機能が絶対的かを決め、元の値を分類のインデックスに変換します。厳密に言うと、以下のことを行います:
- タイプVectorとパラメータ
maxCategories
を取ります。 - 明確な値の数に基づいてどの特徴が絶対的であるかを決定します。ここで最も大きい
maxCategories
を持つ特徴が絶対的であると明らかにされます。 - 各分類特徴について0から始まる分類インデックスを計算します。
- 分類的な特徴をインデックスし、元の特徴値をインデックスに変換します。
分類的な特徴をインデックスすることにより、決定木およびツリーアンサンブルのようなアルゴリズムが分類的な特徴を適切に扱うことができ、パフォーマンスを改善します。
例
以下の例では、ラベル付けされた点のデータセットを読み、どの毒長が分類的であるかを決めるために VectorIndexer
を使います。分類的な特徴値をそれらのインデックスに変換します。この変換されたデータはカテゴリ特徴量を扱うDecisionTreeRegressor
のようなアルゴリズムに渡すことができます。
APIの詳細はVectorIndexer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.VectorIndexer
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10)
val indexerModel = indexer.fit(data)
val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} " +
s"categorical features: ${categoricalFeatures.mkString(", ")}")
// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
APIの詳細はVectorIndexer Java ドキュメント を参照してください。
import java.util.Map;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
VectorIndexer indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);
Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");
for (Integer feature : categoryMaps.keySet()) {
System.out.print(" " + feature);
}
System.out.println();
// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();
APIの詳細はVectorIndexer Python ドキュメント を参照してください。
from pyspark.ml.feature import VectorIndexer
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)
categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
(len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))
# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
Interaction
Interaction
はベクトルあるいは二つの値のカラムを取る Transformer
で、各入力カラムから1つの値の全ての組み合わせの積を含む1つのベクトルのカラムを生成します。
例えば、入力カラムとして3次元のそれぞれが2つのベクトル型のカラムを持つ場合、出力カラムとして9次元のベクトルを取得するでしょう。
例
“id1”, “vec1” と “vec2” のカラムを持つ以下のデータフレームがあると仮定します:
id1|vec1 |vec2
---|--------------|--------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0]
それらの入力カラムを使ってInteraction
を適用すると、出力カラムとしてのinteractedCol
は以下を含みます:
id1|vec1 |vec2 |interactedCol
---|--------------|--------------|------------------------------------------------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]
APIの詳細はInteraction Scala docs を参照してください。
import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler
val df = spark.createDataFrame(Seq(
(1, 1, 2, 3, 8, 4, 5),
(2, 4, 3, 8, 7, 9, 8),
(3, 6, 1, 9, 2, 3, 6),
(4, 10, 8, 6, 9, 4, 5),
(5, 9, 2, 7, 10, 7, 3),
(6, 1, 1, 4, 2, 8, 4)
)).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")
val assembler1 = new VectorAssembler().
setInputCols(Array("id2", "id3", "id4")).
setOutputCol("vec1")
val assembled1 = assembler1.transform(df)
val assembler2 = new VectorAssembler().
setInputCols(Array("id5", "id6", "id7")).
setOutputCol("vec2")
val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")
val interaction = new Interaction()
.setInputCols(Array("id1", "vec1", "vec2"))
.setOutputCol("interactedCol")
val interacted = interaction.transform(assembled2)
interacted.show(truncate = false)
APIの詳細はInteractionExample.scala Java ドキュメント を参照してください。
List<Row> data = Arrays.asList(
RowFactory.create(1, 1, 2, 3, 8, 4, 5),
RowFactory.create(2, 4, 3, 8, 7, 9, 8),
RowFactory.create(3, 6, 1, 9, 2, 3, 6),
RowFactory.create(4, 10, 8, 6, 9, 4, 5),
RowFactory.create(5, 9, 2, 7, 10, 7, 3),
RowFactory.create(6, 1, 1, 4, 2, 8, 4)
);
StructType schema = new StructType(new StructField[]{
new StructField("id1", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id2", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id3", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id4", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id5", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id6", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id7", DataTypes.IntegerType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
VectorAssembler assembler1 = new VectorAssembler()
.setInputCols(new String[]{"id2", "id3", "id4"})
.setOutputCol("vec1");
Dataset<Row> assembled1 = assembler1.transform(df);
VectorAssembler assembler2 = new VectorAssembler()
.setInputCols(new String[]{"id5", "id6", "id7"})
.setOutputCol("vec2");
Dataset<Row> assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2");
Interaction interaction = new Interaction()
.setInputCols(new String[]{"id1","vec1","vec2"})
.setOutputCol("interactedCol");
Dataset<Row> interacted = interaction.transform(assembled2);
interacted.show(false);
APIの詳細はInteraction Python ドキュメント を参照してください。
from pyspark.ml.feature import Interaction, VectorAssembler
df = spark.createDataFrame(
[(1, 1, 2, 3, 8, 4, 5),
(2, 4, 3, 8, 7, 9, 8),
(3, 6, 1, 9, 2, 3, 6),
(4, 10, 8, 6, 9, 4, 5),
(5, 9, 2, 7, 10, 7, 3),
(6, 1, 1, 4, 2, 8, 4)],
["id1", "id2", "id3", "id4", "id5", "id6", "id7"])
assembler1 = VectorAssembler(inputCols=["id2", "id3", "id4"], outputCol="vec1")
assembled1 = assembler1.transform(df)
assembler2 = VectorAssembler(inputCols=["id5", "id6", "id7"], outputCol="vec2")
assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")
interaction = Interaction(inputCols=["id1", "vec1", "vec2"], outputCol="interactedCol")
interacted = interaction.transform(assembled2)
interacted.show(truncate=False)
平均器
Normalizer
は、各Vector
が単位ノルムを持つように正規化し、Vector
行のデータセットを変換するTransformer
です。パラメータ p
を取り、それは正規化に使われる p-norm を指定します。(デフォルトは $p = 2$ ) この正規化は入力データの標準化の役に立ち、学習アルゴリズムの挙動を改善します。
例
以下の例はlibsvm形式のデータセットをロードし、各行がunit $L^1$ ノルムおよび unit $L^\infty$ ノルムを持つように正規化する方法を実演します。
APIの詳細はNormalizer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.5, -1.0)),
(1, Vectors.dense(2.0, 1.0, 1.0)),
(2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")
// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0)
val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()
// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()
APIの詳細はNormalizer Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0);
Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();
// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
APIの詳細はNormalizer Python ドキュメント を参照してください。
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.5, -1.0]),),
(1, Vectors.dense([2.0, 1.0, 1.0]),),
(2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()
# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
StandardScaler
StandardScaler
はVector
行のデータセットを変換し、各特長値が標準偏差1 および/あるいは 平均0を持つように正規化します。以下のパラメータを取ります:
withStd
: デフォルトはtrueデータを標準偏差1にスケールします。withMean
: デフォルトはfalse。スケーリングする前に平均を使ってデータを中心化します。dense出力を構築するため、sparse入力に適用する場合には注意してください。
StandardScaler
は StandardScalerModel
を生成するためにデータセットにfit
するEstimator
です; これは概要の統計を計算することに該当します。モデルはデータセット中のVector
が単位標準偏差 および/あるいは ゼロ平均特徴 を持つように変換することができます。
特徴の標準偏差がゼロの場合、その特徴のためのVector
の中でデフォルトの0.0
値を返すだろうことに注意してください。
例
以下の例はlibsvm形式のデータセットをロードし、各特長値がunit 標準偏差を持つように正規化する方法を実演します。
APIの詳細はStandardScaler Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.StandardScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false)
// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)
// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
APIの詳細はStandardScaler Java ドキュメント を参照してください。
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
StandardScaler scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false);
// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);
// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
APIの詳細はStandardScaler Python ドキュメント を参照してください。
from pyspark.ml.feature import StandardScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)
# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
RobustScaler
RobustScaler
は、Vector
行のデータセットを変換し、中央値を削除して、特定の分位範囲(デフォルトはIQR: 四分位範囲、第一と第三分位間の分位範囲)に従ってデータをスケーリングします。その動作はStandardScaler
と非常によく似ていますが、平均と標準偏差の代わりに中央値と分位範囲が使われるため、外れ値に対して堅牢になります。以下のパラメータを取ります:
lower
: デフォルトは 0.25。分位範囲を計算するための下限分位。全ての特徴量で共有されます。upper
: デフォルトは 0.75。分位範囲を計算するための上限分位。全ての特徴量で共有されます。withScaling
: デフォルトは True。データを分位範囲にスケールします。withCentering
: デフォルトは False。スケーリングする前に中央値を使ってデータを中心化します。dense出力を構築するため、sparse入力に適用する場合には注意してください。
RobustScaler
は RobustScalerModel
を生成するためにデータセットにfit
するEstimator
です; これは変位の統計を計算することに該当します。モデルはデータセット中のVector
が単位変位幅 および/あるいは ゼロ中央値特徴 を持つように変換することができます。
特徴の変位幅がゼロの場合、その特徴のためのVector
の中でデフォルトの0.0
値を返すだろうことに注意してください。
例
以下の例はlibsvm形式のデータセットをロードし、各特長値が単位変位幅を持つように正規化する方法を実演します。
APIの詳細はRobustScaler Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.RobustScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new RobustScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithScaling(true)
.setWithCentering(false)
.setLower(0.25)
.setUpper(0.75)
// Compute summary statistics by fitting the RobustScaler.
val scalerModel = scaler.fit(dataFrame)
// Transform each feature to have unit quantile range.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
APIの詳細はRobustScaler Java ドキュメント を参照してください。
import org.apache.spark.ml.feature.RobustScaler;
import org.apache.spark.ml.feature.RobustScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
RobustScaler scaler = new RobustScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithScaling(true)
.setWithCentering(false)
.setLower(0.25)
.setUpper(0.75);
// Compute summary statistics by fitting the RobustScaler
RobustScalerModel scalerModel = scaler.fit(dataFrame);
// Transform each feature to have unit quantile range.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
APIの詳細はRobustScaler Python ドキュメント を参照してください。
from pyspark.ml.feature import RobustScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",
withScaling=True, withCentering=False,
lower=0.25, upper=0.75)
# Compute summary statistics by fitting the RobustScaler
scalerModel = scaler.fit(dataFrame)
# Transform each feature to have unit quantile range.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
MinMaxScaler
MinMaxScaler
は Vector
行のデータセットを変換し、各特長値を特定の範囲(よくあるのは[0, 1])に再スケールします。以下のパラメータを取ります:
min
: デフォルトは0.0。変換後の下限値は全ての特徴値によって共有されます。max
: デフォルトは1.0。変換後の上限値は全ての特徴値によって共有されます。
MinMaxScaler
はデータセットの総統計を計算し、MinMaxScalerModel
を生成します。モデルはそれぞれの特徴値を指定された領域に入るように変換することができます。
特徴Eの再スケールされた値は、\begin{equation} Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min \end{equation}
として計算されます。$E_{max} == E_{min}$
の場合は $Rescaled(e_i) = 0.5 * (max + min)$
ゼロの値はおそらく非ゼロ値に変換されるため、変換器の出力は薄い入力に対してもDenseVector
になるでしょう。
例
以下の例はlibsvm形式のデータセットをロードし、各特長値を [0, 1] に再スケールします。
APIの詳細はMinMaxScaler Scala ドキュメント と MinMaxScalerModel Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -1.0)),
(1, Vectors.dense(2.0, 1.1, 1.0)),
(2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()
APIについての詳細はMinMaxScaler Java ドキュメント と MinMaxScalerModel Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -1.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.1, 1.0)),
RowFactory.create(2, Vectors.dense(3.0, 10.1, 3.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
MinMaxScaler scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
System.out.println("Features scaled to range: [" + scaler.getMin() + ", "
+ scaler.getMax() + "]");
scaledData.select("features", "scaledFeatures").show();
APIの詳細はMinMaxScaler Python ドキュメント と MinMaxScalerModel Python ドキュメント を参照してください。
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -1.0]),),
(1, Vectors.dense([2.0, 1.1, 1.0]),),
(2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
MaxAbsScaler
MaxAbsScaler
はVector
の行のデータセットを変換し、各特徴の最大の絶対値を使って割ることで各特徴を範囲 [-1, 1] 再び倍率を掛けます。それはデータを移動/中央に寄せないため、まばらな度合を破壊しません。
MaxAbsScaler
はデータセット上の総統計を計算し、MaxAbsScalerModel
を生成します。モデルはそれぞれの特徴値を個々に範囲[-1, 1] に変換することができます。
例
以下の例はlibsvm形式のデータセットをロードし、各特長値を [-1, 1] に再スケールします。
APIの詳細はMaxAbsScaler Scala ドキュメント と MaxAbsScalerModel Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -8.0)),
(1, Vectors.dense(2.0, 1.0, -4.0)),
(2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")
val scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
APIについての詳細はMaxAbsScaler Java ドキュメント と MaxAbsScalerModel Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
MaxAbsScaler scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.select("features", "scaledFeatures").show();
APIの詳細はMaxAbsScaler Python ドキュメント と MaxAbsScalerModel Python ドキュメント を参照してください。
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -8.0]),),
(1, Vectors.dense([2.0, 1.0, -4.0]),),
(2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
Bucketizer
Bucketizer
は連続する特徴値のカラムをユーザによって指定される特徴値のバケットに変換します。以下のパラメータを取ります:
splits
: 連続する特徴値をバケットにマップするためのパラメータ。n+1 分割の場合、nバケットになります。split x,y によって定義されたバケットは最後のバケットを除いて [x,y) の範囲の値を持ちます。最後のバケットはyも含みます。splits は厳密に増加しなければなりません。全てのdouble値の範囲をカバーするために、-inf, inf の値も明示的に提供されなければなりません; そうでなければ、指定されたsplit外の値はエラーとして扱われるでしょう。splits
の2つの例としてArray(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity)
とArray(0.0, 1.0, 2.0)
があります。
目的とするカラムの上限と下限に何も意図が無い場合は、splitのBucketizer境界例外の可能性を避けるために境界としてDouble.NegativeInfinity
と Double.PositiveInfinity
を追加するべきです。
指定するsplitは厳密に増加する順番でなければならないことにも注意してください。つまりs0 < s1 < s2 < ... < sn
。
Bucketizerについての詳細はAPIドキュメントの中で見つかります。
例
以下の例はDouble
のカラムを他のインデックス化されたカラムにバケット化する方法を実演します。
APIの詳細はBucketizer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.Bucketizer
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits)
// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()
val splitsArray = Array(
Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))
val data2 = Array(
(-999.9, -999.9),
(-0.5, -0.2),
(-0.3, -0.1),
(0.0, 0.0),
(0.2, 0.4),
(999.9, 999.9))
val dataFrame2 = spark.createDataFrame(data2).toDF("features1", "features2")
val bucketizer2 = new Bucketizer()
.setInputCols(Array("features1", "features2"))
.setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
.setSplitsArray(splitsArray)
// Transform original data into its bucket index.
val bucketedData2 = bucketizer2.transform(dataFrame2)
println(s"Bucketizer output with [" +
s"${bucketizer2.getSplitsArray(0).length-1}, " +
s"${bucketizer2.getSplitsArray(1).length-1}] buckets for each input column")
bucketedData2.show()
APIの詳細はBucketizer Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};
List<Row> data = Arrays.asList(
RowFactory.create(-999.9),
RowFactory.create(-0.5),
RowFactory.create(-0.3),
RowFactory.create(0.0),
RowFactory.create(0.2),
RowFactory.create(999.9)
);
StructType schema = new StructType(new StructField[]{
new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Bucketizer bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits);
// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);
System.out.println("Bucketizer output with " + (bucketizer.getSplits().length-1) + " buckets");
bucketedData.show();
// Bucketize multiple columns at one pass.
double[][] splitsArray = {
{Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY},
{Double.NEGATIVE_INFINITY, -0.3, 0.0, 0.3, Double.POSITIVE_INFINITY}
};
List<Row> data2 = Arrays.asList(
RowFactory.create(-999.9, -999.9),
RowFactory.create(-0.5, -0.2),
RowFactory.create(-0.3, -0.1),
RowFactory.create(0.0, 0.0),
RowFactory.create(0.2, 0.4),
RowFactory.create(999.9, 999.9)
);
StructType schema2 = new StructType(new StructField[]{
new StructField("features1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame2 = spark.createDataFrame(data2, schema2);
Bucketizer bucketizer2 = new Bucketizer()
.setInputCols(new String[] {"features1", "features2"})
.setOutputCols(new String[] {"bucketedFeatures1", "bucketedFeatures2"})
.setSplitsArray(splitsArray);
// Transform original data into its bucket index.
Dataset<Row> bucketedData2 = bucketizer2.transform(dataFrame2);
System.out.println("Bucketizer output with [" +
(bucketizer2.getSplitsArray()[0].length-1) + ", " +
(bucketizer2.getSplitsArray()[1].length-1) + "] buckets for each input column");
bucketedData2.show();
APIの詳細はBucketizer Python ドキュメント を参照してください。
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")
# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)
print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
bucketedData.show()
ElementwiseProduct
ElementwiseProduct は 要素ごとの積を使って各入力ベクトルを指定された"weight"で増やします。別の言い方をすると、データセットの各カラムを数値倍にスケールします。これは結果のベクトルを生成するための、入力ベクトル v
と 変換ベクトル w
のアダマール積を表します。
\[ \begin{pmatrix}
v_1 \\
\vdots \\
v_N
\end{pmatrix} \circ \begin{pmatrix}
w_1 \\
\vdots \\
w_N
\end{pmatrix}
= \begin{pmatrix}
v_1 w_1 \\
\vdots \\
v_N w_N
\end{pmatrix}
\]
例
以下の例は変換ベクトル値を使ってベクトルを変換する方法を実演します。
APIの詳細はElementwiseProduct Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors
// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
("a", Vectors.dense(1.0, 2.0, 3.0)),
("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector")
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
APIの詳細はElementwiseProduct Java ドキュメント を参照してください。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);
List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector");
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
APIの詳細はElementwiseProduct Python ドキュメント を参照してください。
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
SQLTransformer
SQLTransformer
はSQL文によって定義される変換を実装します。現在のところ、以下のようなSQL構文だけをサポートします。"SELECT ... FROM __THIS__ ..."
where "__THIS__"
は入力データセットの基礎をなすテーブルを表します。select句は出力で表示するフィールド、定数および表現を指定します。Spark SQLがサポートするどのようなselect句もありえます。ユーザは選択されるカラム上で操作するSpark SQLのビルトイン関数とUDFも使うことができます。例えば、SQLTransformer
は以下のような文をサポートします:
SELECT a, a + b AS a_b FROM __THIS__
SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
例
id
、v1
および category
のカラムを持つ以下のデータフレームを仮定します:
id | v1 | v2
----|-----|-----
0 | 1.0 | 3.0
2 | 2.0 | 5.0
これは"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"
のSQLTransformer
の出力です :
id | v1 | v2 | v3 | v4
----|-----|-----|-----|-----
0 | 1.0 | 3.0 | 4.0 | 3.0
2 | 2.0 | 5.0 | 7.0 |10.0
APIの詳細はSQLTransformer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.SQLTransformer
val df = spark.createDataFrame(
Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
val sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
APIの詳細はSQLTransformer Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(0, 1.0, 3.0),
RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField [] {
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
SQLTransformer sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");
sqlTrans.transform(df).show();
APIの詳細はSQLTransformer Python ドキュメント を参照してください。
from pyspark.ml.feature import SQLTransformer
df = spark.createDataFrame([
(0, 1.0, 3.0),
(2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
VectorAssembler
VectorAssembler
は指定されたカラムのリストを一つのベクトルのカラムに合成する変換器です。ロジスティック回帰や決定木のようなMLモデルを訓練するためには、生の特徴と、異なる特徴変換器によって生成された特徴を、1つの特徴ベクトルに結合すると便利です。VectorAssembler
は以下の入力カラムタイプを受け入れます: 全ての数値型、真偽値、およびベクトル型。各行内で入力カラムの値は指定された順でベクトルに結合されます。
例
カラム id
, hour
, mobile
, userFeatures
および clicked
を持つデータフレームがあると仮定します:
id | hour | mobile | userFeatures | clicked
----|------|--------|------------------|---------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0
userFeatures
は3つのユーザ特徴値を持つベクトルカラムです。hour
, mobile
および userFeatures
を features
と呼ばれる一つの特徴ベクトルに結合し、それを使ってclicked
かそうでないかを予想するために使いたいとします。VectorAssembler
の入力カラムを hour
, mobile
および userFeatures
に設定し、出力カラムをfeatures
に設定した場合、変換後に以下のデータフレームを取得するはずです:
id | hour | mobile | userFeatures | clicked | features
----|------|--------|------------------|---------|-----------------------------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]
APIの詳細はVectorAssembler Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
val assembler = new VectorAssembler()
.setInputCols(Array("hour", "mobile", "userFeatures"))
.setOutputCol("features")
val output = assembler.transform(dataset)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
APIの詳細はVectorAssembler Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("hour", IntegerType, false),
createStructField("mobile", DoubleType, false),
createStructField("userFeatures", new VectorUDT(), false),
createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"hour", "mobile", "userFeatures"})
.setOutputCol("features");
Dataset<Row> output = assembler.transform(dataset);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
"'features'");
output.select("features", "clicked").show(false);
APIの詳細はVectorAssembler Python ドキュメント を参照してください。
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
VectorSizeHint
VectorType
のカラムについてのベクトルのサイズを明示的に指定することは、時には有用かもしれません。例えば、VectorAssembler
は出力カラムのサイズ情報とメタデータを生成するために、入力カラムからサイズの情報を使います。幾つかの場合においてはこの情報はカラムの内容を調べることで取得することができますが、ストリーミング データフレーム内ではストリームが開始されるまでは内容が利用できません。VectorSizeHint
によりユーザはVectorAssembler
あるいはベクトルサイズを知る必要があるかもしれない他の変換器が入力としてそのカラムを使うことができるように、明示的にカラムのベクトルサイズを指定することができます。
VectorSizeHint
を使うには、ユーザはinputCol
と size
パラメータを設定しなければなりません。この変換器のデータフレームへの適用は、ベクトルサイズを指定するinputCol
のためのメタデータが更新された新しいデータフレームを生成します。結果のデータフレーム上のDownstream操作はメタデータを使ってこのサイズを取得することができます。
VectorSizeHint
は、ベクトルのカラムがnullあるいは間違ったサイズのベクトルを含む時の挙動を制御する任意のhandleInvalid
パラメータを取ることもできます。デフォルトでは handleInvalid
は、例外が投げられなければならない、“rror” に設定されます。このパラメータは、無効な値を含む行が結果のデータフレームからフィルタアウトされなければならない “skip”、あるいはカラムが無効値のチェックをされてはならず全ての行が維持されなければならない “optimistic” に設定することもできます。“optimistic”の使用は、カラムのためのメタデータ VectorSizeHint
が適用されたものはそのカラムの内容に一致しないことを意味し、結果のデータフレームが一貫性の無い状態になる結果を引き起こすかも知れません。ユーザはこの種類の矛盾の状態を避けるように気を付ける必要があります。
APIの詳細については VectorSizeHint Scala docs を参照してください。
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq(
(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0),
(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
val sizeHint = new VectorSizeHint()
.setInputCol("userFeatures")
.setHandleInvalid("skip")
.setSize(3)
val datasetWithSize = sizeHint.transform(dataset)
println("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(false)
val assembler = new VectorAssembler()
.setInputCols(Array("hour", "mobile", "userFeatures"))
.setOutputCol("features")
// This dataframe can be used by downstream transformers as before
val output = assembler.transform(datasetWithSize)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
APIの詳細はVectorSizeHint Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorSizeHint;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("hour", IntegerType, false),
createStructField("mobile", DoubleType, false),
createStructField("userFeatures", new VectorUDT(), false),
createStructField("clicked", DoubleType, false)
});
Row row0 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Row row1 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row0, row1), schema);
VectorSizeHint sizeHint = new VectorSizeHint()
.setInputCol("userFeatures")
.setHandleInvalid("skip")
.setSize(3);
Dataset<Row> datasetWithSize = sizeHint.transform(dataset);
System.out.println("Rows where 'userFeatures' is not the right size are filtered out");
datasetWithSize.show(false);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"hour", "mobile", "userFeatures"})
.setOutputCol("features");
// This dataframe can be used by downstream transformers as before
Dataset<Row> output = assembler.transform(datasetWithSize);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
"'features'");
output.select("features", "clicked").show(false);
APIの詳細はVectorSizeHint Python ドキュメント を参照してください。
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
(0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
sizeHint = VectorSizeHint(
inputCol="userFeatures",
handleInvalid="skip",
size=3)
datasetWithSize = sizeHint.transform(dataset)
print("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(truncate=False)
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
# This dataframe can be used by downstream transformers as before
output = assembler.transform(datasetWithSize)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
QuantileDiscretizer
QuantileDiscretizer
は連続する特徴を持つカラムを取り、ビンされたカテゴリの機能を持つカラムを出力します。ビンの数はnumBuckets
パラメータによって設定されます。使用されるバケットの数はこの値より小さいかも知れません。例えば、十分にはっきりと異なる変異を生成するには入力の個別の値があまりにも少ない場合。
NaN 値: NaN 値はQuantileDiscretizer
フィッティングの間にカラムから削除されるでしょう。これは予想するためのBucketizer
モデルを生成するでしょう。変換の間に、Bucketizer
がデータセット内にNaN値を見つけるとエラーを上げるかも知れませんが、ユーザはhandleInvalid
を設定してデータセット内のNaN値を維持するか削除するかのどちらかを選択することもできます。ユーザがNaN値を維持することを選択した場合、それらは特別に扱われ、自身のバケット内に配置されるでしょう。例えば、4つのバケットが使われる場合、非NaNデータはバケット[0-3]に入れられるでしょうが、NaNは特別なバケット[4]に入れられるでしょう。
アルゴリズム: ビンの範囲は近似アルゴリズムを使って選択されます(詳細な説明はapproxQuantile のドキュメントを見てください)。近似の精度はrelativeError
パラメータを使って制御することができます。0に設定した場合は、正確な変位量が計算されます(注意: 正確な変位量の計算は高価な操作です)。下限および上限のbinの境界は全ての実世界の値をカバーする-Infinity
と +Infinity
です。
例
id
, hour
のカラムを持つデータフレームがあるとします:
id | hour
----|------
0 | 18.0
----|------
1 | 19.0
----|------
2 | 8.0
----|------
3 | 5.0
----|------
4 | 2.2
hour
はDouble
型の連続する特徴です。連続する特徴を明確な1つに変化したいとします。numBuckets = 3
の場合、以下のデータフレームを取得するはずです:
id | hour | result
----|------|------
0 | 18.0 | 2.0
----|------|------
1 | 19.0 | 2.0
----|------|------
2 | 8.0 | 1.0
----|------|------
3 | 5.0 | 1.0
----|------|------
4 | 2.2 | 0.0
APIの詳細はQuantileDiscretizer Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.QuantileDiscretizer
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")
val discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3)
val result = discretizer.fit(df).transform(df)
result.show(false)
APIの詳細はQuantileDiscretizer Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, 18.0),
RowFactory.create(1, 19.0),
RowFactory.create(2, 8.0),
RowFactory.create(3, 5.0),
RowFactory.create(4, 2.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
QuantileDiscretizer discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3);
Dataset<Row> result = discretizer.fit(df).transform(df);
result.show(false);
APIの詳細はQuantileDiscretizer Python ドキュメント を参照してください。
from pyspark.ml.feature import QuantileDiscretizer
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
result = discretizer.fit(df).transform(df)
result.show()
Imputer
Imputer
estimatorは、失われた値の箇所の平均、中央値、あるいはカラムのモードを使って、データセット中の失われた値を補完します。入力カラムは数値型でなければなりません。現在のところ、Imputer
はカテゴリ特徴量をサポートせず、カテゴリ特徴量を含むカラムについてはたぶん正しくない値を生成します。Imputerは.setMissingValue(custom_value)
によって‘NaN’以外の独自の値を補完することができます。例えば、.setMissingValue(0)
は全ての出来事を(0)に補完するでしょう。
入力カラム中の全てのnull
値は失われたものとして扱われ、補完されることに注意してください。
例
カラム a
と b
を持つデータフレームがあると仮定します:
a | b
------------|-----------
1.0 | Double.NaN
2.0 | Double.NaN
Double.NaN | 3.0
4.0 | 4.0
5.0 | 5.0
この例では、Imputer は存在する全てのDouble.NaN
(失われた値のデフォルト) を対応するカラムの他の値から計算した平均(デフォルトの補完ストラテジ)で置き換えるでしょう。この例では、カラムa
と b
の代理の値は それぞれ 3.0 と 4.0 です。変換の後で、出力カラム内の失われた値は関連するカラムの代理の値によって置き換えられるでしょう。
a | b | out_a | out_b
------------|------------|-------|-------
1.0 | Double.NaN | 1.0 | 4.0
2.0 | Double.NaN | 2.0 | 4.0
Double.NaN | 3.0 | 3.0 | 3.0
4.0 | 4.0 | 4.0 | 4.0
5.0 | 5.0 | 5.0 | 5.0
APIの詳細はImputer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.Imputer
val df = spark.createDataFrame(Seq(
(1.0, Double.NaN),
(2.0, Double.NaN),
(Double.NaN, 3.0),
(4.0, 4.0),
(5.0, 5.0)
)).toDF("a", "b")
val imputer = new Imputer()
.setInputCols(Array("a", "b"))
.setOutputCols(Array("out_a", "out_b"))
val model = imputer.fit(df)
model.transform(df).show()
APIの詳細はImputer Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Imputer;
import org.apache.spark.ml.feature.ImputerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1.0, Double.NaN),
RowFactory.create(2.0, Double.NaN),
RowFactory.create(Double.NaN, 3.0),
RowFactory.create(4.0, 4.0),
RowFactory.create(5.0, 5.0)
);
StructType schema = new StructType(new StructField[]{
createStructField("a", DoubleType, false),
createStructField("b", DoubleType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);
Imputer imputer = new Imputer()
.setInputCols(new String[]{"a", "b"})
.setOutputCols(new String[]{"out_a", "out_b"});
ImputerModel model = imputer.fit(df);
model.transform(df).show();
APIの詳細はImputer Python ドキュメント を参照してください。
from pyspark.ml.feature import Imputer
df = spark.createDataFrame([
(1.0, float("nan")),
(2.0, float("nan")),
(float("nan"), 3.0),
(4.0, 4.0),
(5.0, 5.0)
], ["a", "b"])
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)
model.transform(df).show()
Feature Selectors
VectorSlicer
VectorSlicer
は特徴値ベクトルを取る変換器で、元の特徴値のsub配列を持つ新しい特徴ベクトルを出力します。ベクトルのカラムから特徴値を抽出するのに役立ちます。
VectorSlicer
は指定されたインデックスを持つベクトルカラムを受け付け、それらのインデックスを使って選択された値を持つ新しいベクトルカラムを出力します。インデックスには2つの種類があります
-
インデックスがベクトルを表す数値インデックス
setIndices()
; -
特徴値の名前がベクトルを表す文字列インデックス
setNames()
。これは実装がAttribute
の名前部分に一致するため、AttributeGroup
を持つためにベクトルカラムを必要とします。
数値および文字列の両方による指定が受け付けられます。更に、同時に数値と文字列による名前を使用することができます。少なくとも1つの特徴量が選択されなければなりません。特徴量の複製は許可されません。つまり選択されたインデックスと名前の間には重複は無いでしょう。特徴の名前が選択されると、もし空の入力属性に遭遇した場合に例外が投げられることに注意してください。
出力ベクトルは選択されたインデックスを最初(指定された順番)に、続いて選択された名前を次(指定された順番)に、整列するでしょう。
例
userFeatures
のカラムを持つデータフレームを持つと仮定します:
userFeatures
------------------
[0.0, 10.0, 0.5]
userFeatures
は3つのユーザ特徴値を持つベクトルカラムです。userFeatures
の最初のカラムはゼロでそれを削除し、最後の2つのカラムだけが選択されるようにしたいとします。VectorSlicer
はsetIndices(1, 2)
を持つ最後の2つの要素を選択し、features
という名前の新しいベクトルを生成します:
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
userFeatures
として ["f1", "f2", "f3"]
の可能性がある入力属性があるとした場合、それらを選択するためにsetNames("f2", "f3")
を使うことができます。
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
["f1", "f2", "f3"] | ["f2", "f3"]
APIの詳細はVectorSlicer Scala ドキュメント を参照してください。
import java.util.Arrays
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
val data = Arrays.asList(
Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
Row(Vectors.dense(-2.0, 2.3, 0.0))
)
val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])
val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))
val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")
slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))
val output = slicer.transform(dataset)
output.show(false)
APIの詳細はVectorSlicer Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
Attribute[] attrs = {
NumericAttribute.defaultAttr().withName("f1"),
NumericAttribute.defaultAttr().withName("f2"),
NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
);
Dataset<Row> dataset =
spark.createDataFrame(data, (new StructType()).add(group.toStructField()));
VectorSlicer vectorSlicer = new VectorSlicer()
.setInputCol("userFeatures").setOutputCol("features");
vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})
Dataset<Row> output = vectorSlicer.transform(dataset);
output.show(false);
APIの詳細はVectorSlicer Python ドキュメント を参照してください。
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row
df = spark.createDataFrame([
Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])
output = slicer.transform(df)
output.select("userFeatures", "features").show()
RFormula
RFormula
はR model formulaで指定されたカラムを選択します。現在のところ、‘~’, ‘.’, ‘:’, ‘+’ および ‘-‘ を含む R オペレータの制限された部分集合をサポートします。基本的なオペレータは以下の通りです:
~
目的のものと単語を分割します+
単語を結合します。“+ 0" は切片の削除を意味します-
単語を削除します。“- 1" は切片の削除を意味します:
相互作用 (数値の掛け算、あるいは2値化された分類の値).
目的のものを除く全てのカラム
a
とb
が double のカラムと仮定した場合、RFormula
の結果を説明するために以下の単純な例を使います:
y ~ a + b
はモデルy ~ w0 + w1 * a + w2 * b
を意味し、w0
は切片で、w1, w2
は係数です。y ~ a + b + a:b - 1
はモデルy ~ w1 * a + w2 * b + w3 * a * b
を意味し、w1, w2, w3
は係数です。
RFormula
は、特徴のカラムと、doubleあるいは文字列のカラムのベクトルを生成します。Rで線形回帰のために公式が使われる時のように、数値カラムはdoubleにキャストされるでしょう。As to string input columns, they will first be transformed with StringIndexer using ordering determined by stringOrderType
, and the last category after ordering is dropped, then the doubles will be one-hot encoded.
値 {'b', 'a', 'b', 'a', 'c', 'b'}
を含む文字列の特徴カラムを仮定すると、符号化を制御するためにstringOrderType
を設定します:
stringOrderType | Category mapped to 0 by StringIndexer | Category dropped by RFormula
----------------|---------------------------------------|---------------------------------
'frequencyDesc' | most frequent category ('b') | least frequent category ('c')
'frequencyAsc' | least frequent category ('c') | most frequent category ('b')
'alphabetDesc' | last alphabetical category ('c') | first alphabetical category ('a')
'alphabetAsc' | first alphabetical category ('a') | last alphabetical category ('c')
ラベルのカラムが文字列型の場合、frequencyDesc
を使ってStringIndexerで2倍に最初に変換されるでしょうデータフレーム中にラベルのカラムが存在しない場合は、出力ラベルのカラムは公式内の指定された応答変数から生成されるでしょう。
注意: 順番付けのオプションstringOrderType
はラベルカラムのために使われません。ラベルのカラムがインデックスされる場合、StringIndexer
の中でデフォルトの降順頻度の順序付けを使います。
例
id
, country
, hour
および clicked
のカラムを持つデータフレームがあるとします:
id | country | hour | clicked
---|---------|------|---------
7 | "US" | 18 | 1.0
8 | "CA" | 12 | 0.0
9 | "NZ" | 15 | 0.0
clicked ~ country + hour
の公式文字列を使ってRFormula
を使う場合は、これはcountry
および hour
に基づいて clicked
を予想したいことを意味し、変換後に以下のデータフレームを取得するでしょう:
id | country | hour | clicked | features | label
---|---------|------|---------|------------------|-------
7 | "US" | 18 | 1.0 | [0.0, 0.0, 18.0] | 1.0
8 | "CA" | 12 | 0.0 | [0.0, 1.0, 12.0] | 0.0
9 | "NZ" | 15 | 0.0 | [1.0, 0.0, 15.0] | 0.0
APIの詳細はRFormula Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.RFormula
val dataset = spark.createDataFrame(Seq(
(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")
val formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label")
val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
APIの詳細はRFormula Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("country", StringType, false),
createStructField("hour", IntegerType, false),
createStructField("clicked", DoubleType, false)
});
List<Row> data = Arrays.asList(
RowFactory.create(7, "US", 18, 1.0),
RowFactory.create(8, "CA", 12, 0.0),
RowFactory.create(9, "NZ", 15, 0.0)
);
Dataset<Row> dataset = spark.createDataFrame(data, schema);
RFormula formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label");
Dataset<Row> output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();
APIの詳細はRFormula Python ドキュメント を参照してください。
from pyspark.ml.feature import RFormula
dataset = spark.createDataFrame(
[(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)],
["id", "country", "hour", "clicked"])
formula = RFormula(
formula="clicked ~ country + hour",
featuresCol="features",
labelCol="label")
output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
ChiSqSelector
ChiSqSelector
はカイ二乗特徴抽出を意味します。分類特徴を持つラベル付けされたデータ上で操作します。ChiSqSelector はどの特徴を選択するかを決めるために 独立性のカイ二乗検定 を使います。5つの選択メソッドをサポートします: numTopFeatures
, percentile
, fpr
, fdr
, fwe
:
numTopFeatures
カイ二乗検定に従って固定数のトップの特徴を選択します。これは最も予知力が高い特徴に明け渡すことに似ています。percentile
はnumTopFeatures
に似ていますが、固定数の代わりに全ての特徴の分数を選択します。fpr
はp値が閾値以下の全ての特徴を選択します。従って選択のfalse positiveレートを制御します。fdr
はfalse discoveryレートが閾値より低い全ての特徴を選択するために Benjamini-Hochberg procedure を使います。fwe
はp値が閾値以下の全ての特徴を選択します。閾値は 1/numFeatures によってスケールされ、従って選択のfamily-withエラーが制御されます。デフォルトでは、選択メソッドはデフォルトのトップの特徴の数が50に設定されたnumTopFeatures
です。ユーザはsetSelectorType
を使って選択メソッドを選択することができます。
例
id
, features
および clicked
のカラムを持つデータフレームがあると仮定し、これが予想される目的として使われます:
id | features | clicked
---|-----------------------|---------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0
8 | [0.0, 1.0, 12.0, 0.0] | 0.0
9 | [1.0, 0.0, 15.0, 0.1] | 0.0
numTopFeatures = 1
を持つ ChiSqSelector
を使う場合、ラベルclicked
に応じて、features
内の最後のカラムは最も有用な特徴として選択されます。
id | features | clicked | selectedFeatures
---|-----------------------|---------|------------------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0]
8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0]
9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1]
APIの詳細はChiSqSelector Scala ドキュメント を参照してください。
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)
val df = spark.createDataset(data).toDF("id", "features", "clicked")
val selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()
APIの詳細はChiSqSelector Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
ChiSqSelector selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("ChiSqSelector output with top " + selector.getNumTopFeatures()
+ " features selected");
result.show();
APIの詳細はChiSqSelector Python ドキュメント を参照してください。
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
(8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
(9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
outputCol="selectedFeatures", labelCol="clicked")
result = selector.fit(df).transform(df)
print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()
UnivariateFeatureSelector
UnivariateFeatureSelector
operates on categorical/continuous labels with categorical/continuous features.
User can set featureType
and labelType
, and Spark will pick the score function to use based on the specified
featureType
and labelType
.
featureType | labelType |score function
------------|------------|--------------
categorical |categorical | chi-squared (chi2)
continuous |categorical | ANOVATest (f_classif)
continuous |continuous | F-value (f_regression)
It supports five selection modes: numTopFeatures
, percentile
, fpr
, fdr
, fwe
:
numTopFeatures
chooses a fixed number of top features.percentile
はnumTopFeatures
に似ていますが、固定数の代わりに全ての特徴の分数を選択します。fpr
はp値が閾値以下の全ての特徴を選択します。従って選択のfalse positiveレートを制御します。fdr
はfalse discoveryレートが閾値より低い全ての特徴を選択するために Benjamini-Hochberg procedure を使います。fwe
はp値が閾値以下の全ての特徴を選択します。閾値は 1/numFeatures によってスケールされ、従って選択のfamily-withエラーが制御されます。
デフォルトでは、選択モードはデフォルトのselectionThresholdが50に設定されたnumTopFeatures
です。
例
id
, features
および label
のカラムを持つデータフレームがあると仮定し、これが予想される目的として使われます:
id | features | label
---|--------------------------------|---------
1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0
2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0
3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0
4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0
5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0
6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0
featureType
をcontinuous
に設定し、labelType
をnumTopFeatures = 1
なcategorical
に設定した場合、features
の最後のカラムは最も便利な特徴量として選択されます:
id | features | label | selectedFeatures
---|--------------------------------|---------|------------------
1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0 | [2.3]
2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0 | [4.1]
3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0 | [2.5]
4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0 | [3.8]
5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0 | [3.0]
6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0 | [2.1]
APIの詳細はUnivariateFeatureSelector Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.UnivariateFeatureSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
)
val df = spark.createDataset(data).toDF("id", "features", "label")
val selector = new UnivariateFeatureSelector()
.setFeatureType("continuous")
.setLabelType("categorical")
.setSelectionMode("numTopFeatures")
.setSelectionThreshold(1)
.setFeaturesCol("features")
.setLabelCol("label")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"UnivariateFeatureSelector output with top ${selector.getSelectionThreshold}" +
s" features selected using f_classif")
result.show()
APIの詳細はUnivariateFeatureSelector Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.UnivariateFeatureSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
RowFactory.create(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
RowFactory.create(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
RowFactory.create(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
RowFactory.create(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
RowFactory.create(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
new StructField("label", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
UnivariateFeatureSelector selector = new UnivariateFeatureSelector()
.setFeatureType("continuous")
.setLabelType("categorical")
.setSelectionMode("numTopFeatures")
.setSelectionThreshold(1)
.setFeaturesCol("features")
.setLabelCol("label")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("UnivariateFeatureSelector output with top "
+ selector.getSelectionThreshold() + " features selected using f_classif");
result.show();
APIの詳細はUnivariateFeatureSelector Python ドキュメント を参照してください。
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(1, Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0,),
(2, Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0,),
(3, Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 3.0,),
(4, Vectors.dense([3.7, 9.2, 6.1, 4.1, 7.5, 3.8]), 2.0,),
(5, Vectors.dense([8.9, 5.2, 7.8, 8.3, 5.2, 3.0]), 4.0,),
(6, Vectors.dense([7.9, 8.5, 9.2, 4.0, 9.4, 2.1]), 4.0,)], ["id", "features", "label"])
selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
labelCol="label", selectionMode="numTopFeatures")
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(1)
result = selector.fit(df).transform(df)
print("UnivariateFeatureSelector output with top %d features selected using f_classif"
% selector.getSelectionThreshold())
result.show()
VarianceThresholdSelector
VarianceThresholdSelector
は変動が少ない特徴量を削除するselectorです。varianceThreshold
以下の変動を持つ特徴量は削除されます。varianceThreshold
が設定されない場合、デフォルトは0で、変動が0の特徴量(つまり、全ての標本で同じ値の特徴量)だけが削除されることを意味します。
例
id
とfeatures
のカラムを持つデータフレームがあると仮定し、これが予想される目的として使われます:
id | features
---|--------------------------------
1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0]
2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0]
3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0]
4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0]
5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0]
6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0]
6つの特徴量の変動はそれぞれ、16.67, 0.67, 8.17, 10.17, 5.07, 11.47 です。varianceThreshold = 8.0
のVarianceThresholdSelector
を使う場合、変動が8.0以下の特徴量は削除されます:
id | features | selectedFeatures
---|--------------------------------|-------------------
1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0] | [6.0,0.0,7.0,0.0]
2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0] | [0.0,6.0,0.0,9.0]
3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0] | [0.0,3.0,0.0,5.0]
4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0] | [0.0,8.0,5.0,4.0]
5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0] | [8.0,6.0,5.0,4.0]
6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0] | [8.0,6.0,0.0,0.0]
APIの詳細はVarianceThresholdSelector Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.VarianceThresholdSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
)
val df = spark.createDataset(data).toDF("id", "features")
val selector = new VarianceThresholdSelector()
.setVarianceThreshold(8.0)
.setFeaturesCol("features")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"Output: Features with variance lower than" +
s" ${selector.getVarianceThreshold} are removed.")
result.show()
APIの詳細はVarianceThresholdSelector Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.VarianceThresholdSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
RowFactory.create(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
RowFactory.create(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
RowFactory.create(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
RowFactory.create(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
RowFactory.create(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
VarianceThresholdSelector selector = new VarianceThresholdSelector()
.setVarianceThreshold(8.0)
.setFeaturesCol("features")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("Output: Features with variance lower than "
+ selector.getVarianceThreshold() + " are removed.");
result.show();
Refer to the VarianceThresholdSelector Python docs for more details on the API.
from pyspark.ml.feature import VarianceThresholdSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(1, Vectors.dense([6.0, 7.0, 0.0, 7.0, 6.0, 0.0])),
(2, Vectors.dense([0.0, 9.0, 6.0, 0.0, 5.0, 9.0])),
(3, Vectors.dense([0.0, 9.0, 3.0, 0.0, 5.0, 5.0])),
(4, Vectors.dense([0.0, 9.0, 8.0, 5.0, 6.0, 4.0])),
(5, Vectors.dense([8.0, 9.0, 6.0, 5.0, 4.0, 4.0])),
(6, Vectors.dense([8.0, 9.0, 6.0, 0.0, 0.0, 0.0]))], ["id", "features"])
selector = VarianceThresholdSelector(varianceThreshold=8.0, outputCol="selectedFeatures")
result = selector.fit(df).transform(df)
print("Output: Features with variance lower than %f are removed." %
selector.getVarianceThreshold())
result.show()
一貫性のあるハッシュ
局所性鋭敏型ハッシュ (LSH) はハッシュ技術の重要なクラスです。クラスタリング、近似近傍検索および大きなデータセットの外れ値検知で一般的に使われます。
LSHの全体的な考えは、お互いに近いデータポイントが高い確率を持つ同じバケット内にあり一方でお互いに遠くにあるデータポイントが異なるバケット内にあるように、ハッシュデータポイントがバケットを指す関数のファミリー(“LSH families”)を使うことです。LSH ファミリーは正式には以下のように定義されます。
距離空間 (M, d)
、M
はセットでd
はM
上での距離関数、の中で、LSHファミリーは以下の特性を満たす関数のファミリー h
です: \[ \forall p, q \in M,\\ d(p,q) \leq r1 \Rightarrow Pr(h(p)=h(q)) \geq p1\\ d(p,q) \geq r2 \Rightarrow Pr(h(p)=h(q)) \leq p2 \]
この LSH ファミリーは (r1, r2, p1, p2)
-センシティブ と呼ばれます。
Sparkでは、異なるLSHファミリーは別個のクラス (例えば MinHash
)内で実装され、特徴の変換のためのAPI、approximate similarity join および 近似最近傍 は各クラス内で提供されます。
LSHでは、false positiveを入力特徴の距離のペア ($d(p,q) \geq r2$
を持つ) として定義します。これは同じパケットにハッシュ化されます。false negativeを 近くの特徴のペア($d(p,q) \leq r1$
を持つ) と定義します。これは異なるバケットにハッシュ化されます。
LSH オペレーション
LSHが代用することができる主要なオペレータを説明します。合致したLSHモデルはこれらのオペレーションのそれぞれのためのメソッドを持ちます。
特徴変換
特徴変換はハッシュ化された値を新しいカラムとして追加する基本的な機能です。これは次元削減に役に立つかも知れません。ユーザはinputCol
および outputCol
を設定することで入力と出力のカラム名を指定することができます。
LSH は複数のLSHハッシュテーブルもサポートします。ユーザはnumHashTables
を設定することでハッシュテーブルの数を指定することができます。これは類似結合近似と近似最近傍においてOR-amplificationに使う事もできます。ハッシュテーブルの数を増やすと精度が高まりますが、通信コストと実行時間も増えるでしょう。
outputCol
の型は配列の次元が numHashTables
に等しく、ベクトルの次元が現在ところ1に設定されている Seq[Vector]
です。将来のリリースでは、ユーザがこれらのベクトルの次元を指定できるように AND-amplification を実装するつもりです。
類似結合近似
類似結合近似は二つのデータセットを取り、ユーザ定義の閾値より小さい距離のデータセット中の行のペアを大体返します。類似結合近似は二つの異なるデータセットと自身への結合の両方をサポートします。自身への結合は幾つかの重複ペアを生成するでしょう。
類似結合近似は入力として変換および非変換されたデータセットの両方を受け付けます。非変換のデータセットが使われた場合、自動的に変換されるでしょう。この場合、ハッシュの署名はoutputCol
として生成されるでしょう。
結合されたデータセットの中で、元のデータセットは datasetA
および datasetB
の中でクエリすることができます。返された各行のペア間の本当の距離を示すために、出力データセットに距離カラムが追加されるでしょう。
近傍結合検索近似
近似最近傍検索は(特徴ベクトルの)データセットと(1つの特徴ベクトルの)キーを取り、ベクトルに最も近いデータセット内の指定された数の行を大体返します。
近似最近傍検索は入力として変換および非変換されたデータセットの両方を受け付けます。非変換のデータセットが使われた場合、自動的に変換されるでしょう。この場合、ハッシュの署名はoutputCol
として生成されるでしょう。
各出力行と検索されたキーの間の本当の距離を示すために、出力データセットに距離カラムが追加されるでしょう。
注意: 近似最近傍検索は、ハッシュバケット内に十分な候補が無い場合にk
より少ない行を返すでしょう。
LSH アルゴリズム
ユークリッド距離についてのバケット化ランダム写像
バケット化ランダム写像はユークリッド距離のためのLSHファミリーです。ユークリッド距離は以下のように定義されます: \[ d(\mathbf{x}, \mathbf{y}) = \sqrt{\sum_i (x_i - y_i)^2} \]
LSHファミリーは特徴ベクトル $\mathbf{x}$
をランダムな単位ベクトル $\mathbf{v}$
上に写像し、写像された結果をハッシュバケットに割り当てます: \[ h(\mathbf{x}) = \Big\lfloor \frac{\mathbf{x} \cdot \mathbf{v}}{r} \Big\rfloor \]
r
はユーザ定義のバケットの長さです。バケットの長さはハッシュバケットの平均長(と、したがってバケットの数)を制御するために使うことができます。バケット長が大きくなる(つまり、バケットが少なくなる)と、同じバケットにハッシュ化される特徴の確率が増加します(trueおよびfalse positiveの数が増えます)。
バケット化ランダム写像は任意のベクトルと入力特徴として受け取り、sparseとdenseベクトルの両方をサポートします。
APIの詳細はBucketedRandomProjectionLSH Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
val dfA = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 1.0)),
(1, Vectors.dense(1.0, -1.0)),
(2, Vectors.dense(-1.0, -1.0)),
(3, Vectors.dense(-1.0, 1.0))
)).toDF("id", "features")
val dfB = spark.createDataFrame(Seq(
(4, Vectors.dense(1.0, 0.0)),
(5, Vectors.dense(-1.0, 0.0)),
(6, Vectors.dense(0.0, 1.0)),
(7, Vectors.dense(0.0, -1.0))
)).toDF("id", "features")
val key = Vectors.dense(1.0, 0.0)
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(2.0)
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes")
val model = brp.fit(dfA)
// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
APIの詳細はBucketedRandomProjectionLSH Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
List<Row> dataA = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 1.0)),
RowFactory.create(1, Vectors.dense(1.0, -1.0)),
RowFactory.create(2, Vectors.dense(-1.0, -1.0)),
RowFactory.create(3, Vectors.dense(-1.0, 1.0))
);
List<Row> dataB = Arrays.asList(
RowFactory.create(4, Vectors.dense(1.0, 0.0)),
RowFactory.create(5, Vectors.dense(-1.0, 0.0)),
RowFactory.create(6, Vectors.dense(0.0, 1.0)),
RowFactory.create(7, Vectors.dense(0.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
Vector key = Vectors.dense(1.0, 0.0);
BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
.setBucketLength(2.0)
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes");
BucketedRandomProjectionLSHModel model = mh.fit(dfA);
// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
APIの詳細はBucketedRandomProjectionLSH Python ドキュメントを参照してください。
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
dataA = [(0, Vectors.dense([1.0, 1.0]),),
(1, Vectors.dense([1.0, -1.0]),),
(2, Vectors.dense([-1.0, -1.0]),),
(3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])
dataB = [(4, Vectors.dense([1.0, 0.0]),),
(5, Vectors.dense([-1.0, 0.0]),),
(6, Vectors.dense([0.0, 1.0]),),
(7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])
key = Vectors.dense([1.0, 0.0])
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
numHashTables=3)
model = brp.fit(dfA)
# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
ジャッカード距離のためのMinHash
MinHash は入力特徴が自然数のセットの場合のジャッカー度距離のLSHファミリーです。二つのセットのジャッカード距離はそれらの和と積の濃度によって定義されます: \[ d(\mathbf{A}, \mathbf{B}) = 1 - \frac{|\mathbf{A} \cap \mathbf{B}|}{|\mathbf{A} \cup \mathbf{B}|} \]
MinHash はランダムハッシュ関数g
をセット中の各要素に適用し、すべてのハッシュ化された値の最小値を取ります: \[ h(\mathbf{A}) = \min_{a \in \mathbf{A}}(g(a)) \]
MinHashの入力セットは二元のベクトルを表します。ベクトルのインデックスは要素自身を表し、ベクトル内の非0の値はセット内の要素の存在を表します。denseとsparseベクトルの両方がサポートされますが、一般的には効率化のためにsparseベクトルがお勧めです。例えば、Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)])
は空間内に10個の要素があることを意味します。このセットは elem 2, elem 3 および elem 5 を含みます。全ての非ゼロの値は2値の “1” の値として扱われます。
注意: 空のセットはMinHashによって変換することができません。このことは任意の入力ベクトルは少なくとも一つの非ゼロのエントリを持つ必要があることを意味します。
APIの詳細はMinHashLSH Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
val dfA = spark.createDataFrame(Seq(
(0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
(1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
(2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")
val dfB = spark.createDataFrame(Seq(
(3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
(4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
(5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")
val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))
val mh = new MinHashLSH()
.setNumHashTables(5)
.setInputCol("features")
.setOutputCol("hashes")
val model = mh.fit(dfA)
// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
APIの詳細はMinHashLSH Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
List<Row> dataA = Arrays.asList(
RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);
List<Row> dataB = Arrays.asList(
RowFactory.create(0, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(2, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
int[] indices = {1, 3};
double[] values = {1.0, 1.0};
Vector key = Vectors.sparse(6, indices, values);
MinHashLSH mh = new MinHashLSH()
.setNumHashTables(5)
.setInputCol("features")
.setOutputCol("hashes");
MinHashLSHModel model = mh.fit(dfA);
// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:");
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
APIの詳細はMinHashLSH Python ドキュメント を参照してください。
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
(1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
(2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])
dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
(4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
(5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])
key = Vectors.sparse(6, [1, 3], [1.0, 1.0])
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)
# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()