協調フィルタリング - RDDベースのAPI

協調フィルタリング

協調フィルタリング は一般的にレコメンドシステムで使われます。これらの技術はユーザ-アイテム関連のマトリックスの失われたエントリを満たすことを目的にしています。spark.mllib は現在のところモデルベースの協調フィルタリングをサポートします。ユーザとプロダクトは失われたエントリを予測するために使うことができる見えない要素の小さなセットによって記述されます。spark.mllibはこれらの見えない要素を学習するために 交互最小二乗法 (ALS) アルゴリズムを使用します。spark.mlでの実装は以下のパラメータを持ちます:

明示的 vs 暗黙的 なフィードバック

協調フィルタリングに基づいたマトリックスの要素化の標準的なやり方はユーザ-アイテムのマトリックスのエントリをユーザによってアイテムに与えられる明示的な 選択、例えばユーザが映画に与えたレーティング、として扱われます。

実際の世界では暗黙的なフィードバックにのみアクセスする利用法が一般的です(例えば、ビュー、クリック、購入、好き、共有など)。spark.mllibで使われるそのようなデータを扱うやり方は暗黙的なフィードバックデータセットのための協調フィルタリングから拝借されています。本質的に、直接レートのマトリックスをモデル化しようとする代わりに、この方法はデータをユーザのアクション(クリックの数、あるいは誰かが映画を見るのに費やす累積持続機関など)の観察での強度を表す数値として扱います。それらの数は項目に明示的に与えられたレートよりも観測されたユーザ選択での信頼レベルに関係します。そして、モデルはアイテムに対するユーザの期待された選択を予想するために使うことができる見えない要素を見つけようとします。

規則化パラメータのスケーリング

v1.1から、ユーザ要素の更新時に生成されたユーザのレート数、あるいは商品要素の更新時に受信された商品んのレート数によって、各最小自乗問題の解決時の正規化パラメータlambdaをスケールします。このやり方は"ALS-WR"と名づけられ、論文"Large-Scale Parallel Collaborative Filtering for the Netflix Prize"の中で議論されました。それはlambda のデータのスケールへの依存を少なくするため、標本化された部分集合から学習した最適なパラメータを完全なデータセットに適用し、似たパフォーマンスを期待することができます。

以下の例では、レートのデータをロードします。各行はユーザ、商品およびレートからなります。レートが明示的なものと仮定するデフォルトのALS.train()メソッドを使用します。レートの予想の平均二乗エラーを計測することでレコメンドのモデルを評価します。

APIの詳細はALS Scala ドキュメント を参照してください。

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
  Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
  (user, product)
}
val predictions =
  model.predict(usersProducts).map { case Rating(user, product, rate) =>
    ((user, product), rate)
  }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
  ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()
println("Mean Squared Error = " + MSE)

// Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/RecommendationExample.scala" で見つかります。

レートマトリックスが他の情報元から由来する場合(つまり、他の信号から推測された)、より良い結果を得るためにtrainImplicit を使うことができます。

val alpha = 0.01
val lambda = 0.01
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)

全てのMLlibのメソッドはJava-friendly なタイプを使用するため、Scalaでするのと同じような方法でimportおよびcallをすることができます。注意しなければならないことは、Spark Java APIは別個のJavaRDD クラスを使用するが、そのメソッドはScalaの RDDオブジェクトを取るということです。JavaRDDオブジェクト上で.rdd() を呼ぶことでJava RDDをScala RDDに変換することができます。Scalaで提供された例に等しい自己内包アプリケーションの例が以下です:

APIの詳細はALS Java ドキュメント を参照してください。

import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Load and parse the data
String path = "data/mllib/als/test.data";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Rating> ratings = data.map(
  new Function<String, Rating>() {
    public Rating call(String s) {
      String[] sarray = s.split(",");
      return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
        Double.parseDouble(sarray[2]));
    }
  }
);

// Build the recommendation model using ALS
int rank = 10;
int numIterations = 10;
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);

// Evaluate the model on rating data
JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
  new Function<Rating, Tuple2<Object, Object>>() {
    public Tuple2<Object, Object> call(Rating r) {
      return new Tuple2<Object, Object>(r.user(), r.product());
    }
  }
);
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
  model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
    new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
      public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
        return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
      }
    }
  ));
JavaRDD<Tuple2<Double, Double>> ratesAndPreds =
  JavaPairRDD.fromJavaRDD(ratings.map(
    new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
      public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
        return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
      }
    }
  )).join(predictions).values();
double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(
  new Function<Tuple2<Double, Double>, Object>() {
    public Object call(Tuple2<Double, Double> pair) {
      Double err = pair._1() - pair._2();
      return err * err;
    }
  }
).rdd()).mean();
System.out.println("Mean Squared Error = " + MSE);

// Save and load model
model.save(jsc.sc(), "target/tmp/myCollaborativeFilter");
MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(),
  "target/tmp/myCollaborativeFilter");
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java" で見つかります。

以下の例では、レートのデータをロードします。各行はユーザ、商品およびレートからなります。レートが明示的だと見なすことができるデフォルトのALS.train()メソッドを使います。レートの予想の平均二乗エラーを計測することでレコメンドのモデルを評価します。

APIの詳細はALS Python ドキュメント を参照してください。

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(','))\
    .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)

# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
例の完全なコードは Spark のリポジトリの "examples/src/main/python/mllib/recommendation_example.py" で見つかります。

レートマトリックスが他の情報元から由来する場合(つまり、他の信号から推測された)、より良い結果を得るために trainImplicit を使うことができます。

# Build the recommendation model using Alternating Least Squares based on implicit ratings
model = ALS.trainImplicit(ratings, rank, numIterations, alpha=0.01)

上のアプリケーションを実行するために、Spark クイックガイドの自己内包型アプリケーションの章で提供される指示に従います。. 依存性としてビルドファイルにspark-mllibも含めるようにしてください。

チュートリアル

Spark Summit 2014 の訓練の練習spark.mllibを使った映画のお勧めのパーソナライズを含んでいます。

TOP
inserted by FC2 system