特徴抽出と変形 - spark.mllib

TF-IDF

索引語頻度の逆出現頻度 (TF-IDF) はテキストマイニングで単語の重要度をコープス内のドキュメントに反映するために広く使われている特徴ベクトル化です。$t$によって単語を、$d$によってドキュメントを、$D$でコープスを示します。索引語頻度 $TF(t, d)$ は単語 $t$ がドキュメント $d$に現れる数で、ドキュメントの文章頻度 $DF(t, D)$は単語 $t$を含むドキュメントの数です。重要度を測定するために索引語頻度だけを使う場合、頻繁に現れる単語を過度に強調しすぎますが、ドキュメントについてほとんど情報を運びません。例えば "a"、"the" および "of"。コープスを横断してしばしば単語が現れる場合、それは特定のドキュメントに関する特別な情報を運ばないことを意味します。逆出現頻度はどれだけの情報を単語が提供するかの数値的な指標です: \[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \] ここで $|D|$ はコープス内のドキュメントの総数です。対数が使われるため、全てのドキュメントで単語が現れる場合、IDFは0になります。補正単語はコープス外の単語のためにゼロで割られることを避けるために適用されることに注意してください。TF-IDF 指標は単純にTF と IDFの産物です: \[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \] 索引語頻度と文章頻度の定義上の幾つかの変数があります。spark.mllibではそれらを柔軟にするためにTFとIDFを分割します。

索引語頻度の実装は ハッシュのトリックを利用します。生の特徴はハッシュ関数を適用することでインデックス(単語)にマップされます。そして、索引語頻度はマップされたインデックスに基づいて計算されます。このやり方はグローバルな単語からインデックスへのマップの計算の必要性を避けます。これは大きなコープスには高くつくでしょうが、潜在的なハッシュの衝突の影響を受けます。生の特徴の違いはハッシュの後で同じ単語になるかも知れません。衝突の可能性を減らすために、目標の特徴次元を増やすことができます。つまり、ハッシュテーブルのバケットの数です。デフォルトの特徴次元は $2^{20} = 1,048,576$です。

注意: spark.mllib はテキストの文節化のためのツールを提供しません。ユーザはStanford NLP Group および scalanlp/chalk を参照してください。

TF と IDF は HashingTF および IDF で実装されています。HashingTF は入力として RDD[Iterable[_]] を取ります。各レコードは文字あるいは他のタイプの繰り返しかも知れません。

APIの詳細はHashingTF Scala ドキュメント を参照してください。

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector

val sc: SparkContext = ...

// Load documents (one per line).
val documents: RDD[Seq[String]] = sc.textFile("...").map(_.split(" ").toSeq)

val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)

HashingTF の適用はデータへの1回だけの通過を必要としますが、IDFの適用は2回の通過を必要とします: 一回目はIDFベクトルを計算し、二回目はIDFによって索引語頻度をスケールします。

import org.apache.spark.mllib.feature.IDF

// ... continue from the previous example
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)

spark.mllibのIDF実装はドキュメントの最小数より少ない数だけ発生する単語を無視するための選択肢を提供します。そのような場合、これらの単語のIDFは0に設定されます。この特徴は minDocFreq 値をIDFコンストラクタに渡すことで使われることができます。

import org.apache.spark.mllib.feature.IDF

// ... continue from the previous example
tf.cache()
val idf = new IDF(minDocFreq = 2).fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)

TF と IDF は HashingTF および IDF で実装されています。HashingTF は入力としてリストのRDDを取ります。各レコードは文字あるいは他のタイプの繰り返しかも知れません。

APIの詳細はHashingTF Python ドキュメント を参照してください。

from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF

sc = SparkContext()

# Load documents (one per line).
documents = sc.textFile("...").map(lambda line: line.split(" "))

hashingTF = HashingTF()
tf = hashingTF.transform(documents)

HashingTF の適用はデータへの1回だけの通過を必要としますが、IDFの適用は2回の通過を必要とします: 一回目はIDFベクトルを計算し、二回目はIDFによって索引語頻度をスケールします。

from pyspark.mllib.feature import IDF

# ... continue from the previous example
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

spark.mllibのIDF実装はドキュメントの最小数より少ない数だけ発生する単語を無視するための選択肢を提供します。そのような場合、これらの単語のIDFは0に設定されます。この特徴は minDocFreq 値をIDFコンストラクタに渡すことで使われることができます。

# ... continue from the previous example
tf.cache()
idf = IDF(minDocFreq=2).fit(tf)
tfidf = idf.transform(tf)

Word2Vec

Word2Vec は単語の分散型ベクトル表現を計算します。分散型表現の主な利点は似たような単語はベクトル空間内で近い場所にあるということで、小説のパターンへの一般化を容易にし、モデルの推定をもっと堅牢にします。分散型ベクトル表現は名前付きのエンティティ認識、明確化、パース、タグ付けおよび機械翻訳のような多くの自然言語処理アプリケーションで有効であると示されます。

モデル

Word2Vecの実装において、skip-gramモデルを使用しました。skip-gramの訓練の目的は同じ文章内でテキストの予測するのが得意な単語ベクトル表現を学習することです。数学的には、ある訓練ワードの系列 $w_1, w_2, \dots, w_T$においてskip-gramモデルの目的は対数尤度の平均の最大化です。 \[ \frac{1}{T} \sum_{t = 1}^{T}\sum_{j=-k}^{j=k} \log p(w_{t+j} | w_t) \] ここで $k$ は訓練ウィンドウのサイズです。

skip-gramモデルにおいて、各単語 $w$ はベクトル $u_w$ と $v_w$ に関係があり、それらはそれぞれ単語およびコンテキストとしての$w$のベクトル表現です。指定された単語 $w_j$ の単語が正しく予測される確率 $w_i$ は、softmaxモデルによって決定されます。$V$が語彙のサイズとした時、\[ p(w_i | w_j ) = \frac{\exp(u_{w_i}^{\top}v_{w_j})}{\sum_{l=1}^{V} \exp(u_l^{\top}v_{w_j})} \]です。vocabulary size.

$\log p(w_i | w_j)$ の計算コストは $V$ に比例するため、softmaxを使ったskip-gramモデルは高くつきます。簡単に数百万の単位になり目得ます。Word2Vecの訓練を高速可するために、階層的なsoftmaxを使いました。これは $\log p(w_i | w_j)$ の計算の複雑さを $O(\log(V))$ に減らします。

以下の例はテキストファイルをどうやってロードするかを説明し、それをSeq[String]のRDDとしてパースし、Word2Vec インスタンスを構築し、Word2VecModel を入力データに適合させます。最後に、指定された単語のトップ40の類義語を表示します。例を実行するには、最初にtext8データをダウンロードし、それを好きなディレクトリに解凍します。ここで、解凍されたファイルをtext8とし、同じディレクトリでspark shellを実行できると仮定します。

APIの詳細はWord2Vec Scala ドキュメント を参照してください。

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

val input = sc.textFile("text8").map(line => line.split(" ").toSeq)

val word2vec = new Word2Vec()

val model = word2vec.fit(input)

val synonyms = model.findSynonyms("china", 40)

for((synonym, cosineSimilarity) <- synonyms) {
  println(s"$synonym $cosineSimilarity")
}

// Save and load model
model.save(sc, "myModelPath")
val sameModel = Word2VecModel.load(sc, "myModelPath")

APIの詳細はWord2Vec Python ドキュメント を参照してください。

from pyspark import SparkContext
from pyspark.mllib.feature import Word2Vec

sc = SparkContext(appName='Word2Vec')
inp = sc.textFile("text8_lines").map(lambda row: row.split(" "))

word2vec = Word2Vec()
model = word2vec.fit(inp)

synonyms = model.findSynonyms('china', 40)

for word, cosine_distance in synonyms:
    print("{}: {}".format(word, cosine_distance))

StandardScaler

Standardizes features by scaling to unit variance and/or removing the mean using column summary statistics on the samples in the training set. これは事前処理のステップでとても一般的です。

例えば、全ての特徴が分散1 および/あるいは ゼロ平均を持つ場合、サポートベクトルマシーンのRBFカーネル あるいは L1およびL2正規化線形モデルは一般的に良く動作します。

Standardization can improve the convergence rate during the optimization process, and also prevents against features with very large variances exerting an overly large influence during model training.

モデルのフィッティング

StandardScaler はコンストラクタに以下のパラメータを持ちます:

StandardScalerにはRDD[Vector]の入力を取ることができる fit メソッドを提供し、要約の統計を学習し、 StandardScalerをどうやって設定するかに依存して入力データセットを標準分散1 および/あるいは ゼロ平均特徴に変換できます。

このモデルは変換されたVectorを生成するために Vectorに正規化を適用、あるいは変換されたRDD[Vector]を生成するためにRDD[Vector]に適用できる、VectorTransformer を実装します。

もし特徴の分散がゼロの場合、その特徴に対してVectorの中にデフォルトの 0.0 値を返すだろうことに注意してください。

以下の例はlibsvm形式のデータセットをロードし、新しい特徴が標準偏差1 および/あるいは ゼロの平均を持つように、特徴を標準化します。

APIの詳細はStandardScaler Scala ドキュメント を参照してください。

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

val scaler1 = new StandardScaler().fit(data.map(x => x.features))
val scaler2 = new StandardScaler(withMean = true, withStd = true).fit(data.map(x => x.features))
// scaler3 is an identical model to scaler2, and will produce identical transformations
val scaler3 = new StandardScalerModel(scaler2.std, scaler2.mean)

// data1 will be unit variance.
val data1 = data.map(x => (x.label, scaler1.transform(x.features)))

// Without converting the features into dense vectors, transformation with zero mean will raise
// exception on sparse vector.
// data2 will be unit variance and zero mean.
val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))

APIの詳細はStandardScaler Python ドキュメント を参照してください。

from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)

scaler1 = StandardScaler().fit(features)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)
# scaler3 is an identical model to scaler2, and will produce identical transformations
scaler3 = StandardScalerModel(scaler2.std, scaler2.mean)


# data1 will be unit variance.
data1 = label.zip(scaler1.transform(features))

# Without converting the features into dense vectors, transformation with zero mean will raise
# exception on sparse vector.
# data2 will be unit variance and zero mean.
data2 = label.zip(scaler1.transform(features.map(lambda x: Vectors.dense(x.toArray()))))

平均器

Normalizer は各標本が単位 $L^p$ ノルムを持つようにスケールします。これはテキストの分類あるいはクラスタリングで一般的な操作です。For example, the dot product of two $L^2$ normalized TF-IDF vectors is the cosine similarity of the vectors.

Normalizer はコンストラクタに以下のパラメータを持ちます:

Normalizer は変換されたVectorを生成するためにVectorに正規化を適用、あるいは変換されたRDD[Vector]を生成するためにRDD[Vector]に適用できる、 VectorTransformerを実装します。

入力のノルムがゼロの場合、入力ベクトルを返すだろうことに注意してください。

以下の例はlibsvm形式のデータセットをロードし、$L^2$ ノルム および $L^\infty$ ノルムを持つ特徴を正規化します。

APIの詳細はNormalizer Scala ドキュメント を参照してください。

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

val normalizer1 = new Normalizer()
val normalizer2 = new Normalizer(p = Double.PositiveInfinity)

// Each sample in data1 will be normalized using $L^2$ norm.
val data1 = data.map(x => (x.label, normalizer1.transform(x.features)))

// Each sample in data2 will be normalized using $L^\infty$ norm.
val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))

APIの詳細はNormalizer Python ドキュメント を参照してください。

from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import Normalizer

data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
labels = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)

normalizer1 = Normalizer()
normalizer2 = Normalizer(p=float("inf"))

# Each sample in data1 will be normalized using $L^2$ norm.
data1 = labels.zip(normalizer1.transform(features))

# Each sample in data2 will be normalized using $L^\infty$ norm.
data2 = labels.zip(normalizer2.transform(features))

ChiSqSelector

特徴の選択 はモデルの構築時に使うために関連する特徴を識別しようとします。それは特徴空間のサイズを減らし、速度と統計的な学習挙動の両方を改善します。

ChiSqSelector はカイ二乗特徴抽出を実装します。分類特徴を持つラベル付けされたデータ上で操作します。ChiSqSelector はクラスから独立したカイ二乗検定に基づいて特徴を整列し、クラスのラベルが最も依存する一番上の特徴をフィルター(選択)します。これは最も予知力が高い特徴に明け渡すことに似ています。

選択する特徴の数は提出される検証セットを使って調整することができます。

モデルのフィッティング

ChiSqSelector はselectorが選択するだろうトップの特徴の数を指定する numTopFeatures を取ります。

fit メソッドは分類特徴を持つRDD[LabeledPoint]の入力を取り、要約統計量を学習し、入力データセットを削減された特徴空間に変換することができるChiSqSelectorModelを返します。ChiSqSelectorModelは、削減されたVectorを生成するためにVectorへ、あるいは削減されたRDD[Vector]を生成するためにRDD[Vector]へ適用することができます。

ユーザは選択された特徴インデックス(昇順に整列していなければなりません)の配列を提供することで手動で ChiSqSelectorModelを生成することもできることに注意してください。

以下の例はChiSqSelectorの基本的な使い方を示します:使用されるデータセットは各特長ごとに0から255まで変動するグレースケールの値からなる特徴マトリックスを持ちます。

APIの詳細はChiSqSelector Scala ドキュメント を参照してください。

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.feature.ChiSqSelector

// Load some data in libsvm format
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Even though features are doubles, the ChiSqSelector treats each unique value as a category
val discretizedData = data.map { lp =>
  LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => (x / 16).floor } ) )
}
// Create ChiSqSelector that will select top 50 of 692 features
val selector = new ChiSqSelector(50)
// Create ChiSqSelector model (selecting features)
val transformer = selector.fit(discretizedData)
// Filter the top 50 features from each feature vector
val filteredData = discretizedData.map { lp => 
  LabeledPoint(lp.label, transformer.transform(lp.features)) 
}

APIの詳細はChiSqSelector Java ドキュメント を参照してください。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.feature.ChiSqSelector;
import org.apache.spark.mllib.feature.ChiSqSelectorModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;

SparkConf sparkConf = new SparkConf().setAppName("JavaChiSqSelector");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<LabeledPoint> points = MLUtils.loadLibSVMFile(sc.sc(),
    "data/mllib/sample_libsvm_data.txt").toJavaRDD().cache();

// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Even though features are doubles, the ChiSqSelector treats each unique value as a category
JavaRDD<LabeledPoint> discretizedData = points.map(
    new Function<LabeledPoint, LabeledPoint>() {
      @Override
      public LabeledPoint call(LabeledPoint lp) {
        final double[] discretizedFeatures = new double[lp.features().size()];
        for (int i = 0; i < lp.features().size(); ++i) {
          discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
        }
        return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
      }
    });

// Create ChiSqSelector that will select top 50 of 692 features
ChiSqSelector selector = new ChiSqSelector(50);
// Create ChiSqSelector model (selecting features)
final ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
// Filter the top 50 features from each feature vector
JavaRDD<LabeledPoint> filteredData = discretizedData.map(
    new Function<LabeledPoint, LabeledPoint>() {
      @Override
      public LabeledPoint call(LabeledPoint lp) {
        return new LabeledPoint(lp.label(), transformer.transform(lp.features()));
      }
    }
);

sc.stop();

ElementwiseProduct

ElementwiseProduct は 要素ごとの積を使って各入力ベクトルを指定された"weight"で増やします。別の言い方をすると、データセットの各カラムを数値倍にスケールします。This represents the Hadamard product between the input vector, v and transforming vector, scalingVec, to yield a result vector. Qu8T948*1# Denoting the scalingVec as “w,” this transformation may be written as:

\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]

ElementwiseProduct はコンストラクタに以下のパラメータを持ちます:

ElementwiseProduct は変換されたVectorを生成するためにVectorに正規化を適用、あるいは変換されたRDD[Vector]を生成するためにRDD[Vector]に適用できる、 VectorTransformerを実装します。

以下の例は変換ベクトル値を使ってベクトルを変換する方法を実演します。

APIの詳細はElementwiseProduct Scala ドキュメント を参照してください。

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors

// Create some vector data; also works for sparse vectors
val data = sc.parallelize(Array(Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)))

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct(transformingVector)

// Batch transform and per-row transform give the same results:
val transformedData = transformer.transform(data)
val transformedData2 = data.map(x => transformer.transform(x))

APIの詳細はElementwiseProduct Java ドキュメント を参照してください。

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.feature.ElementwiseProduct;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Create some vector data; also works for sparse vectors
JavaRDD<Vector> data = sc.parallelize(Arrays.asList(
  Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)));
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);

// Batch transform and per-row transform give the same results:
JavaRDD<Vector> transformedData = transformer.transform(data);
JavaRDD<Vector> transformedData2 = data.map(
  new Function<Vector, Vector>() {
    @Override
    public Vector call(Vector v) {
      return transformer.transform(v);
    }
  }
);

APIの詳細はElementwiseProduct Python ドキュメント を参照してください。

from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import ElementwiseProduct

# Load and parse the data
sc = SparkContext()
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda x: [float(t) for t in x.split(" ")])

# Create weight vector.
transformingVector = Vectors.dense([0.0, 1.0, 2.0])
transformer = ElementwiseProduct(transformingVector)

# Batch transform
transformedData = transformer.transform(parsedData)
# Single-row transform
transformedData2 = transformer.transform(parsedData.first())

PCA

PCAを使ってベクトルを低次元空間に投影する特徴transformer。詳細は次元削減で読むことができます。

以下のコードはベクトル上の主成分をどうやって計算するか、線形回帰を計算するためにそれらを使って関係するラベルを維持したままベクトルを低次元空間に投影する方法を実演します:

APIの詳細はPCA Scala ドキュメント を参照してください。

import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.PCA

val data = sc.textFile("data/mllib/ridge-data/lpsa.data").map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()

val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

val pca = new PCA(training.first().features.size/2).fit(data.map(_.features))
val training_pca = training.map(p => p.copy(features = pca.transform(p.features)))
val test_pca = test.map(p => p.copy(features = pca.transform(p.features)))

val numIterations = 100
val model = LinearRegressionWithSGD.train(training, numIterations)
val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations)

val valuesAndPreds = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

val valuesAndPreds_pca = test_pca.map { point =>
  val score = model_pca.predict(point.features)
  (score, point.label)
}

val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()
val MSE_pca = valuesAndPreds_pca.map{case(v, p) => math.pow((v - p), 2)}.mean()

println("Mean Squared Error = " + MSE)
println("PCA Mean Squared Error = " + MSE_pca)
TOP
inserted by FC2 system