特徴量抽出と変換 - RDDベースのAPI
TF-IDF
注意 データフレームベースのAPIを使うことをお勧めします。それはTF-IDFでのMLユーザガイドで詳述されます。
索引語頻度の逆出現頻度 (TF-IDF) はテキストマイニングで単語の重要度をコープス内のドキュメントに反映するために広く使われている特徴ベクトル化です。$t$
によって単語を、$d$
によってドキュメントを、$D$
でコープスを示します。索引語頻度 $TF(t, d)$
は単語 $t$
がドキュメント $d$
に現れる数で、ドキュメントの文章頻度 $DF(t, D)$
は単語 $t$
を含むドキュメントの数です。重要度を測定するために索引語頻度だけを使う場合、頻繁に現れる単語を過度に強調しすぎますが、ドキュメントについてほとんど情報を運びません。例えば "a"、"the" および "of"。コープスを横断してしばしば単語が現れる場合、それは特定のドキュメントに関する特別な情報を運ばないことを意味します。逆出現頻度はどれだけの情報を単語が提供するかの数値的な指標です: \[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \]
ここで $|D|$
はコープス内のドキュメントの総数です。対数が使われるため、全てのドキュメントで単語が現れる場合、IDFは0になります。補正単語はコープス外の単語のためにゼロで割られることを避けるために適用されることに注意してください。TF-IDF 指標は単純にTF と IDFの産物です: \[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \]
索引語頻度と文章頻度の定義上の幾つかの変数があります。spark.mllib
ではそれらを柔軟にするためにTFとIDFを分割します。
索引語頻度の実装は ハッシュのトリックを利用します。生の特徴はハッシュ関数を適用することでインデックス(単語)にマップされます。そして、索引語頻度はマップされたインデックスに基づいて計算されます。このやり方はグローバルな単語からインデックスへのマップの計算の必要性を避けます。これは大きなコープスには高くつくでしょうが、潜在的なハッシュの衝突の影響を受けます。生の特徴の違いはハッシュの後で同じ単語になるかも知れません。衝突の可能性を減らすために、目標の特徴次元を増やすことができます。つまり、ハッシュテーブルのバケットの数です。デフォルトの特徴次元は $2^{20} = 1,048,576$
です。
注意: spark.mllib
はテキストの文節化のためのツールを提供しません。ユーザはStanford NLP Group および scalanlp/chalk を参照してください。
TF と IDF は HashingTF および IDF で実装されています。HashingTF
は入力として RDD[Iterable[_]]
を取ります。各レコードは文字あるいは他のタイプの繰り返しかも知れません。
APIの詳細はHashingTF
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
// Load documents (one per line).
val documents: RDD[Seq[String]] = sc.textFile("data/mllib/kmeans_data.txt")
.map(_.split(" ").toSeq)
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
// While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
// First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
// spark.mllib IDF implementation provides an option for ignoring terms which occur in less than
// a minimum number of documents. In such cases, the IDF for these terms is set to 0.
// This feature can be used by passing the minDocFreq value to the IDF constructor.
val idfIgnore = new IDF(minDocFreq = 2).fit(tf)
val tfidfIgnore: RDD[Vector] = idfIgnore.transform(tf)
TF と IDF は HashingTF および IDF で実装されています。HashingTF
は入力としてリストのRDDを取ります。各レコードは文字あるいは他のタイプの繰り返しかも知れません。
APIの詳細はHashingTF
Python ドキュメント を参照してください。
from pyspark.mllib.feature import HashingTF, IDF
# Load documents (one per line).
documents = sc.textFile("data/mllib/kmeans_data.txt").map(lambda line: line.split(" "))
hashingTF = HashingTF()
tf = hashingTF.transform(documents)
# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)
Word2Vec
Word2Vec は単語の分散型ベクトル表現を計算します。分散型表現の主な利点は似たような単語はベクトル空間内で近い場所にあるということで、小説のパターンへの一般化を容易にし、モデルの推定をもっと堅牢にします。分散型ベクトル表現は名前付きのエンティティ認識、明確化、パース、タグ付けおよび機械翻訳のような多くの自然言語処理アプリケーションで有効であると示されます。
モデル
Word2Vecの実装において、skip-gramモデルを使用しました。skip-gramの訓練の目的は同じ文章内でテキストの予測するのが得意な単語ベクトル表現を学習することです。数学的には、ある訓練ワードの系列 $w_1, w_2, \dots, w_T$
においてskip-gramモデルの目的は対数尤度の平均の最大化です。 \[ \frac{1}{T} \sum_{t = 1}^{T}\sum_{j=-k}^{j=k} \log p(w_{t+j} | w_t) \]
ここで $k$ は訓練ウィンドウのサイズです。
skip-gramモデルにおいて、各単語 $w$ はベクトル $u_w$ と $v_w$ に関係があり、それらはそれぞれ単語およびコンテキストとしての$w$のベクトル表現です。指定された単語 $w_j$ の単語が正しく予測される確率 $w_i$ は、softmaxモデルによって決定されます。$V$が語彙のサイズとした時、\[ p(w_i | w_j ) = \frac{\exp(u_{w_i}^{\top}v_{w_j})}{\sum_{l=1}^{V} \exp(u_l^{\top}v_{w_j})} \]
です。
$\log p(w_i | w_j)$ の計算コストは $V$ に比例するため、softmaxを使ったskip-gramモデルは高くつきます。簡単に数百万の単位になり目得ます。Word2Vecの訓練を高速可するために、階層的なsoftmaxを使いました。これは $\log p(w_i | w_j)$ の計算の複雑さを $O(\log(V))$ に減らします。
例
以下の例はテキストファイルをどうやってロードするかを説明し、それをSeq[String]
のRDDとしてパースし、Word2Vec
インスタンスを構築し、Word2VecModel
を入力データに適合させます。最後に、指定された単語のトップ40の類義語を表示します。例を実行するには、最初にtext8データをダウンロードし、それを好きなディレクトリに解凍します。ここで、解凍されたファイルをtext8
とし、同じディレクトリでspark shellを実行できると仮定します。
APIの詳細はWord2Vec
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
val input = sc.textFile("data/mllib/sample_lda_data.txt").map(line => line.split(" ").toSeq)
val word2vec = new Word2Vec()
val model = word2vec.fit(input)
val synonyms = model.findSynonyms("1", 5)
for((synonym, cosineSimilarity) <- synonyms) {
println(s"$synonym $cosineSimilarity")
}
// Save and load model
model.save(sc, "myModelPath")
val sameModel = Word2VecModel.load(sc, "myModelPath")
APIの詳細はWord2Vec
Python ドキュメント を参照してください。
from pyspark.mllib.feature import Word2Vec
inp = sc.textFile("data/mllib/sample_lda_data.txt").map(lambda row: row.split(" "))
word2vec = Word2Vec()
model = word2vec.fit(inp)
synonyms = model.findSynonyms('1', 5)
for word, cosine_distance in synonyms:
print("{}: {}".format(word, cosine_distance))
StandardScaler
単位分散にスケール および/あるいは 訓練セット内の標本のカラムのサマリ統計を使って平均を削除することで、特徴を標準化します。これは事前処理のステップでとても一般的です。
例えば、全ての特徴が分散1 および/あるいは ゼロ平均を持つ場合、サポートベクトルマシーンのRBFカーネル あるいは L1およびL2正規化線形モデルは一般的に良く動作します。
標準化は最適化プロセス時の収斂レートを改善し、モデル訓練時にあまりにも大きな影響を及ぼすとても大きな分散を持つ特徴を避けることもできます。
モデルのフィッティング
StandardScaler
はコンストラクタに以下のパラメータを持ちます:
withMean
デフォルトはfalse。スケーリングする前に平均を使ってデータを中心化します。dense出力を構築するため、sparse入力に適用する場合には注意してください。withStd
デフォルトはtrue。データを標準偏差1にスケールします。
StandardScaler
にはRDD[Vector]
の入力を取ることができる fit
メソッドを提供し、要約の統計を学習し、 StandardScaler
をどうやって設定するかに依存して入力データセットを標準分散1 および/あるいは ゼロ平均特徴に変換できます。
このモデルは変換されたVector
を生成するために Vector
に正規化を適用、あるいは変換されたRDD[Vector]
を生成するためにRDD[Vector]
に適用できる、VectorTransformer
を実装します。
もし特徴の分散がゼロの場合、その特徴に対してVector
の中にデフォルトの 0.0
値を返すだろうことに注意してください。
例
以下の例はlibsvm形式のデータセットをロードし、新しい特徴が標準偏差1 および/あるいは ゼロの平均を持つように、特徴を標準化します。
APIの詳細はStandardScaler
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.feature.{StandardScaler, StandardScalerModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val scaler1 = new StandardScaler().fit(data.map(x => x.features))
val scaler2 = new StandardScaler(withMean = true, withStd = true).fit(data.map(x => x.features))
// scaler3 is an identical model to scaler2, and will produce identical transformations
val scaler3 = new StandardScalerModel(scaler2.std, scaler2.mean)
// data1 will be unit variance.
val data1 = data.map(x => (x.label, scaler1.transform(x.features)))
// data2 will be unit variance and zero mean.
val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))
APIの詳細はStandardScaler
Python ドキュメント を参照してください。
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
scaler1 = StandardScaler().fit(features)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)
# data1 will be unit variance.
data1 = label.zip(scaler1.transform(features))
# data2 will be unit variance and zero mean.
data2 = label.zip(scaler2.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
平均器
Normalizer は各標本が単位 $L^p$ ノルムを持つようにスケールします。これはテキストの分類あるいはクラスタリングで一般的な操作です。例えば、TF-IDFベクトルで正規化された2つの $L^2$ ドット積は、ベクトルのコサイン類似度です。
Normalizer
はコンストラクタに以下のパラメータを持ちます:
p
$L^p$ 空間を正規化します。デフォルトは $p = 2$。
Normalizer
は変換されたVector
を生成するためにVector
に正規化を適用、あるいは変換されたRDD[Vector]
を生成するためにRDD[Vector]
に適用できる、 VectorTransformer
を実装します。
入力のノルムがゼロの場合、入力ベクトルを返すだろうことに注意してください。
例
以下の例はlibsvm形式のデータセットをロードし、$L^2$ ノルム および $L^\infty$ ノルムを持つ特徴を正規化します。
APIの詳細はNormalizer
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val normalizer1 = new Normalizer()
val normalizer2 = new Normalizer(p = Double.PositiveInfinity)
// Each sample in data1 will be normalized using $L^2$ norm.
val data1 = data.map(x => (x.label, normalizer1.transform(x.features)))
// Each sample in data2 will be normalized using $L^\infty$ norm.
val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
APIの詳細はNormalizer
Python ドキュメント を参照してください。
from pyspark.mllib.feature import Normalizer
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
labels = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
normalizer1 = Normalizer()
normalizer2 = Normalizer(p=float("inf"))
# Each sample in data1 will be normalized using $L^2$ norm.
data1 = labels.zip(normalizer1.transform(features))
# Each sample in data2 will be normalized using $L^\infty$ norm.
data2 = labels.zip(normalizer2.transform(features))
ChiSqSelector
特徴の選択 はモデルの構築時に使うために関連する特徴を識別しようとします。それは特徴空間のサイズを減らし、速度と統計的な学習挙動の両方を改善します。
ChiSqSelector
はカイ二乗特徴抽出を実装します。分類特徴を持つラベル付けされたデータ上で操作します。ChiSqSelector はどの特徴を選択するかを決めるために独立性のカイ二乗検定 を使います。5つの選択メソッドをサポートします: numTopFeatures
, percentile
, fpr
, fdr
, fwe
:
numTopFeatures
カイ二乗検定に従って固定数のトップの特徴を選択します。これは最も予知力が高い特徴に明け渡すことに似ています。percentile
はnumTopFeatures
に似ていますが、固定数の代わりに全ての特徴の分数を選択します。fpr
はp値が閾値以下の全ての特徴を選択します。従って選択のfalse positiveレートを制御します。fdr
はfalse discoveryレートが閾値より低い全ての特徴を選択するために Benjamini-Hochberg procedure を使います。fwe
はp値が閾値以下の全ての特徴を選択します。閾値は 1/numFeatures によってスケールされ、従って選択のfamily-withエラーが制御されます。
デフォルトでは、選択メソッドはデフォルトのトップの特徴の数が50に設定されたnumTopFeatures
です。ユーザは setSelectorType
を使って選択メソッドを選択することができます。
選択する特徴の数は提出される検証セットを使って調整することができます。
モデルのフィッティング
fit
メソッドは分類特徴を持つRDD[LabeledPoint]
の入力を取り、要約統計量を学習し、入力データセットを削減された特徴空間に変換することができるChiSqSelectorModel
を返します。ChiSqSelectorModel
は、削減されたVector
を生成するためにVector
へ、あるいは削減されたRDD[Vector]
を生成するためにRDD[Vector]
へ適用することができます。
ユーザは選択された特徴インデックス(昇順に整列していなければなりません)の配列を提供することで手動で ChiSqSelectorModel
を生成することもできることに注意してください。
例
以下の例はChiSqSelectorの基本的な使い方を示します:使用されるデータセットは各特長ごとに0から255まで変動するグレースケールの値からなる特徴マトリックスを持ちます。
APIの詳細はChiSqSelector
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
// Load some data in libsvm format
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Even though features are doubles, the ChiSqSelector treats each unique value as a category
val discretizedData = data.map { lp =>
LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => (x / 16).floor }))
}
// Create ChiSqSelector that will select top 50 of 692 features
val selector = new ChiSqSelector(50)
// Create ChiSqSelector model (selecting features)
val transformer = selector.fit(discretizedData)
// Filter the top 50 features from each feature vector
val filteredData = discretizedData.map { lp =>
LabeledPoint(lp.label, transformer.transform(lp.features))
}
APIの詳細はChiSqSelector
Java ドキュメント を参照してください。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ChiSqSelector;
import org.apache.spark.mllib.feature.ChiSqSelectorModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
JavaRDD<LabeledPoint> points = MLUtils.loadLibSVMFile(jsc.sc(),
"data/mllib/sample_libsvm_data.txt").toJavaRDD().cache();
// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Although features are doubles, the ChiSqSelector treats each unique value as a category
JavaRDD<LabeledPoint> discretizedData = points.map(lp -> {
double[] discretizedFeatures = new double[lp.features().size()];
for (int i = 0; i < lp.features().size(); ++i) {
discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
}
return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
});
// Create ChiSqSelector that will select top 50 of 692 features
ChiSqSelector selector = new ChiSqSelector(50);
// Create ChiSqSelector model (selecting features)
ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
// Filter the top 50 features from each feature vector
JavaRDD<LabeledPoint> filteredData = discretizedData.map(lp ->
new LabeledPoint(lp.label(), transformer.transform(lp.features())));
ElementwiseProduct
ElementwiseProduct
は 要素ごとの積を使って各入力ベクトルを指定された"weight"で増やします。別の言い方をすると、データセットの各カラムを数値倍にスケールします。これは結果のベクトルを生成するための、入力ベクトル v
と 変換ベクトル scalingVec のアダマール積を表します。
scalingVec
を “w
” と表すと、この変換は以下のように書くことができます:
\[ \begin{pmatrix}
v_1 \\
\vdots \\
v_N
\end{pmatrix} \circ \begin{pmatrix}
w_1 \\
\vdots \\
w_N
\end{pmatrix}
= \begin{pmatrix}
v_1 w_1 \\
\vdots \\
v_N w_N
\end{pmatrix}
\]
ElementwiseProduct
はコンストラクタに以下のパラメータを持ちます:
scalingVec
: 変換ベクトル。
ElementwiseProduct
は変換されたVector
を生成するためにVector
に正規化を適用、あるいは変換されたRDD[Vector]
を生成するためにRDD[Vector]
に適用できる、 VectorTransformer
を実装します。
例
以下の例は変換ベクトル値を使ってベクトルを変換する方法を実演します。
APIの詳細はElementwiseProduct
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors
// Create some vector data; also works for sparse vectors
val data = sc.parallelize(Seq(Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)))
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct(transformingVector)
// Batch transform and per-row transform give the same results:
val transformedData = transformer.transform(data)
val transformedData2 = data.map(x => transformer.transform(x))
APIの詳細はElementwiseProduct
Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ElementwiseProduct;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Create some vector data; also works for sparse vectors
JavaRDD<Vector> data = jsc.parallelize(Arrays.asList(
Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)));
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);
// Batch transform and per-row transform give the same results:
JavaRDD<Vector> transformedData = transformer.transform(data);
JavaRDD<Vector> transformedData2 = data.map(transformer::transform);
APIの詳細はElementwiseProduct
Python ドキュメント を参照してください。
from pyspark.mllib.feature import ElementwiseProduct
from pyspark.mllib.linalg import Vectors
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda x: [float(t) for t in x.split(" ")])
# Create weight vector.
transformingVector = Vectors.dense([0.0, 1.0, 2.0])
transformer = ElementwiseProduct(transformingVector)
# Batch transform
transformedData = transformer.transform(parsedData)
# Single-row transform
transformedData2 = transformer.transform(parsedData.first())
PCA
PCAを使ってベクトルを低次元空間に投影する特徴transformer。詳細は次元削減で読むことができます。