頻出パターンマイニング - spark.mllib
Mining frequent items, itemsets, subsequences, or other substructures is usually among the first steps to analyze a large-scale dataset, which has been an active research topic in data mining for years. 更に詳しい情報は Wikipediaの 関連ルール学習を見てください。spark.mllib
はFP-growthの並行実装を提供します。これは頻出アイテム集合のマイニングで人気のあるアルゴリズムです。
FP-growth
FP-growth アルゴリズムは Han et al., Mining frequent patterns without candidate generationで説明されています。"FP"はリクエストのパターンを意味します。処理のデータセットが与えられた場合、FP-growthの最初のステップはアイテム頻度と頻度が高いアイテムを識別です。同じ目的のために設計されたApriori-likeアルゴリズムと異なり、FP-growthの二つ目のステップは明示的に候補セットを生成することなしに処理をエンコードするためにサフィックスツリー(FP-tree)を使います。これは通常生成するのが高くつきます。二つ目のステップの後で、頻出アイテム集合をFP-treeから抽出することができます。spark.mllib</c1>では、Li et al., PFP: Parallel FP-growth for query recommendationで説明される、PFPと呼ばれるFP-growthの並行バージョンを実装しています。PFP は処理のサフィックスに基づいて成長するFP-treeの作業を分散し、従って1つのマシーンの実装よりもスケーラブルです。詳細は論文を参照してください。
<c6>spark.mllib</c6>の FP-growth 実装は以下の(ハイパー)パラメータを取ります:
minSupport
: 頻度として識別されるアイテム集合のための最小限のサポート。例えば、もしアイテムが5処理のうち3つで現れた場合、3/5=0.6のサポートを持ちます。numPartitions
: 作業を分散するために使われるパーティションの数。
例
FPGrowth
はFP-growthアルゴリズムを実装します。トランザクションのRDD
を取り、各トランザクションは一般的な種類の単語の配列
です。トランザクションと一緒に FPGrowth.run
を呼び出すと、頻度付きの頻出アイテム集合を格納する FPGrowthModel
を返します。以下の例は、頻出アイテム集合および関連ルール(詳細は関連ルール を見てください)をトランザクション
から採掘する方法を説明します。
APIの詳細はFPGrowth
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
val data = sc.textFile("data/mllib/sample_fpgrowth.txt")
val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))
val fpg = new FPGrowth()
.setMinSupport(0.2)
.setNumPartitions(10)
val model = fpg.run(transactions)
model.freqItemsets.collect().foreach { itemset =>
println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}
val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
println(
rule.antecedent.mkString("[", ",", "]")
+ " => " + rule.consequent .mkString("[", ",", "]")
+ ", " + rule.confidence)
}
FPGrowth
はFP-growthアルゴリズムを実装します。トランザクションのJavaRDD
を取り、各トランザクションは一般的な種類のアイテムのIterable
です。トランザクションと一緒に FPGrowth.run
を呼び出すと、頻度付きの頻出アイテム集合を格納する FPGrowthModel
を返します。以下の例は、頻出アイテム集合および関連ルール(詳細は関連ルール を見てください)をトランザクション
から採掘する方法を説明します。
APIの詳細はFPGrowth
Java ドキュメント を参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;
JavaRDD<String> data = sc.textFile("data/mllib/sample_fpgrowth.txt");
JavaRDD<List<String>> transactions = data.map(
new Function<String, List<String>>() {
public List<String> call(String line) {
String[] parts = line.split(" ");
return Arrays.asList(parts);
}
}
);
FPGrowth fpg = new FPGrowth()
.setMinSupport(0.2)
.setNumPartitions(10);
FPGrowthModel<String> model = fpg.run(transactions);
for (FPGrowth.FreqItemset<String> itemset: model.freqItemsets().toJavaRDD().collect()) {
System.out.println("[" + itemset.javaItems() + "], " + itemset.freq());
}
double minConfidence = 0.8;
for (AssociationRules.Rule<String> rule
: model.generateAssociationRules(minConfidence).toJavaRDD().collect()) {
System.out.println(
rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
FPGrowth
はFP-growthアルゴリズムを実装します。トランザクションのRDD
を取り、各トランザクションは一般的な種類の単語のリスト
です。トランザクションと一緒に FPGrowth.train
を呼び出すと、頻度付きの頻出アイテム集合を格納する FPGrowthModel
を返します。
APIの詳細はFPGrowth
Python ドキュメント を参照してください。
from pyspark.mllib.fpm import FPGrowth
data = sc.textFile("data/mllib/sample_fpgrowth.txt")
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect()
for fi in result:
print(fi)
関連ルール
AssociationRules implements a parallel rule generation algorithm for constructing rules that have a single item as the consequent.
APIの詳細はAssociationRules
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
val freqItemsets = sc.parallelize(Seq(
new FreqItemset(Array("a"), 15L),
new FreqItemset(Array("b"), 35L),
new FreqItemset(Array("a", "b"), 12L)
))
val ar = new AssociationRules()
.setMinConfidence(0.8)
val results = ar.run(freqItemsets)
results.collect().foreach { rule =>
println("[" + rule.antecedent.mkString(",")
+ "=>"
+ rule.consequent.mkString(",") + "]," + rule.confidence)
}
AssociationRules implements a parallel rule generation algorithm for constructing rules that have a single item as the consequent.
APIの詳細はAssociationRules
Java ドキュメント を参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset;
JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = sc.parallelize(Arrays.asList(
new FreqItemset<String>(new String[] {"a"}, 15L),
new FreqItemset<String>(new String[] {"b"}, 35L),
new FreqItemset<String>(new String[] {"a", "b"}, 12L)
));
AssociationRules arules = new AssociationRules()
.setMinConfidence(0.8);
JavaRDD<AssociationRules.Rule<String>> results = arules.run(freqItemsets);
for (AssociationRules.Rule<String> rule : results.collect()) {
System.out.println(
rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
PrefixSpan
PrefixSpan はPei et al., Mining Sequential Patterns by Pattern-Growth: The PrefixSpan Approachで説明される連続するパターンのマイニングアルゴリズムです。頻出系列パターン マイニング問題の形式化については参照される論文を参照してください。
spark.mllib
のPrefixSpan 実装は以下の(ハイパー)パラメータを取ります:
minSupport
: 頻出系列パターンを考慮するために必要とされる最小限のサポート。maxPatternLength
: 頻出系列パターンの最大の長さ。この長さを超えるどのような頻出系列パターンも結果には含まれないでしょう。maxLocalProjDBSize
: the maximum number of items allowed in a prefix-projected database before local iterative processing of the projected databse begins. このパラメータはexecutorのサイズに関して調整するべきです。
例
以下の例は系列上の PrefixSpan実行(Pei et alとして同じ表記を使用)を説明します。
<(12)3>
<1(32)(12)>
<(12)5>
<6>
PrefixSpan
は PrefixSpan アルゴリズムを実装します。PrefixSpan.run
の呼び出しは、頻度付きの頻出系列を格納するPrefixSpanModel
を返します。
APIの詳細はPrefixSpan
Scala ドキュメント および PrefixSpanModel
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.fpm.PrefixSpan
val sequences = sc.parallelize(Seq(
Array(Array(1, 2), Array(3)),
Array(Array(1), Array(3, 2), Array(1, 2)),
Array(Array(1, 2), Array(5)),
Array(Array(6))
), 2).cache()
val prefixSpan = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5)
val model = prefixSpan.run(sequences)
model.freqSequences.collect().foreach { freqSequence =>
println(
freqSequence.sequence.map(_.mkString("[", ", ", "]")).mkString("[", ", ", "]") +
", " + freqSequence.freq)
}
PrefixSpan
は PrefixSpan アルゴリズムを実装します。PrefixSpan.run
の呼び出しは、頻度付きの頻出系列を格納するPrefixSpanModel
を返します。
APIの詳細はPrefixSpan
Java ドキュメント および PrefixSpanModel
Java ドキュメント を参照してください。d
import java.util.Arrays;
import java.util.List;
import org.apache.spark.mllib.fpm.PrefixSpan;
import org.apache.spark.mllib.fpm.PrefixSpanModel;
JavaRDD<List<List<Integer>>> sequences = sc.parallelize(Arrays.asList(
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)),
Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1, 2)),
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5)),
Arrays.asList(Arrays.asList(6))
), 2);
PrefixSpan prefixSpan = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5);
PrefixSpanModel<Integer> model = prefixSpan.run(sequences);
for (PrefixSpan.FreqSequence<Integer> freqSeq: model.freqSequences().toJavaRDD().collect()) {
System.out.println(freqSeq.javaSequence() + ", " + freqSeq.freq());
}