協調フィルタリング

協調フィルタリング

協調フィルタリング は一般的にレコメンドシステムで使われます。これらの技術はユーザ-アイテム関連のマトリックスの失われたエントリを満たすことを目的にしています。spark.ml は現在のところモデルベースの協調フィルタリングをサポートします。ユーザとプロダクトは失われたエントリを予測するために使うことができる見えない要素の小さなセットによって記述されます。spark.mlはこれらの見えない要素を学習するために 交互最小二乗法 (ALS) アルゴリズムを使用します。spark.mlでの実装は以下のパラメータを持ちます:

注意: ALSのためのDataFrameに基づいたAPI は現在のところユーザと項目のidとして整数のみをサポートします。他の数値タイプはユーザと項目のidカラムのためにサポートされますが、idは整数値の範囲内になければなりません。

明示的 vs 暗黙的 なフィードバック

協調フィルタリングに基づいたマトリックスの要素化の標準的なやり方はユーザ-アイテムのマトリックスのエントリをユーザによってアイテムに与えられる明示的な 選択、例えばユーザが映画に与えたレーティング、として扱われます。

実際の世界では暗黙的なフィードバックにのみアクセスする利用法が一般的です(例えば、ビュー、クリック、購入、好き、共有など)。spark.mlで使われるそのようなデータを扱うやり方は暗黙的なフィードバックデータセットのための協調フィルタリングから拝借されています。本質的に、直接レートのマトリックスをモデル化しようとする代わりに、この方法はデータをユーザのアクション(クリックの数、あるいは誰かが映画を見るのに費やす累積持続機関など)の観察での強度を表す数値として扱います。それらの数は項目に明示的に与えられたレートよりも観測されたユーザ選択での信頼レベルに関係します。そして、モデルはアイテムに対するユーザの期待された選択を予想するために使うことができる見えない要素を見つけようとします。

規則化パラメータのスケーリング

ユーザ要素の更新時に生成されたユーザのレート数、あるいは商品要素の更新時に受信された商品のレート数によって、各最小自乗問題の解決時の正規化パラメータregParamをスケールします。このやり方は"ALS-WR"と名づけられ、論文"Large-Scale Parallel Collaborative Filtering for the Netflix Prize"の中で議論されました。それはregParam のデータのスケールへの依存を少なくするため、標本化された部分集合から学習した最適なパラメータを完全なデータセットに適用し、似たパフォーマンスを期待することができます。

コールド スタート戦略

ALSModelを使って予測をする場合は、モデルの訓練中には存在しないテストデータセットの中でユーザと/または項目を出合わせることは一般的です。これは一般的には二つのシナリオで起こります:

  1. プロダクションでは、新しいユーザあるいは項目はレートの履歴を持たず、モデルは訓練されていません (これが "コールドスタート問題"です)。
  2. クロス検証の間、データは訓練と評価のセットに分割されます。Sparkの CrossValidator あるいは TrainValidationSplitのように単純なランダム分割の場合、訓練セットに無い評価セットの中でユーザと/あるいは項目が出くわすことが本当に一般的です。

デフォルトで、Sparkはユーザと/あるいは項目の要素がモデル内に無い場合に ALSModel.transform の間にNaN予想を割り当てます。それは新しいユーザあるいは項目を表し、システムは予想として使う幾つかのフォールバックで決定をすることができるため、これはプロダクションで便利です。

しかし、どの予測された値NaN も評価メトリック(例えばRegressionEvaluatorを使う時)としてNaN の結果になるので、クロス検証の間では望ましいものではありません。これによりモデルの選択が不可能になります。

NaNの値を含む予想のDataFrame内の全ての削除するために、Sparkはユーザに coldStartStrategy パラメータを “drop” にすることを許可します。評価の測定基準は非-NaN データ上で計算され、有効になるでしょう。このパラメータの使用方法は以下の例で説明されます。

注意: 現在のところ、サポートされているコールドスタート戦略は “nan” (上で述べられたデフォルトの挙動) と “drop” です。将来はもっと多くの戦略がサポートされるかも知れません。

以下の例の中で、レーティングのデータをMovieLens datasetからロードします。各行はユーザ、映画、レーティング、およびタイムスタンプから成ります。そして、デフォルトでレーティングが明示的(implicitPrefsfalse)と仮定するALSモデルを訓練します。レートの予想の平均二乗平方根誤差を計測することでレコメンドのモデルを評価します。

APIの詳細はALS Scala ドキュメント を参照してください。

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
  .map(parseRating)
  .toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Build the recommendation model using ALS on the training data
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
val model = als.fit(training)

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)

// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
例の完全なコードは Spark のリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala" で見つかります。

レートマトリックスが他の情報元から由来する場合(つまり、他の信号から推測された)、より良い結果を得るためにimplicitPrefstrueに設定することができます。

val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")

以下の例の中で、レーティングのデータをMovieLens datasetからロードします。各行はユーザ、映画、レーティング、およびタイムスタンプから成ります。そして、デフォルトでレーティングが明示的(implicitPrefsfalse)と仮定するALSモデルを訓練します。レートの予想の平均二乗平方根誤差を計測することでレコメンドのモデルを評価します。

APIの詳細はALS Java ドキュメント を参照してください。

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;

public static class Rating implements Serializable {
  private int userId;
  private int movieId;
  private float rating;
  private long timestamp;

  public Rating() {}

  public Rating(int userId, int movieId, float rating, long timestamp) {
    this.userId = userId;
    this.movieId = movieId;
    this.rating = rating;
    this.timestamp = timestamp;
  }

  public int getUserId() {
    return userId;
  }

  public int getMovieId() {
    return movieId;
  }

  public float getRating() {
    return rating;
  }

  public long getTimestamp() {
    return timestamp;
  }

  public static Rating parseRating(String str) {
    String[] fields = str.split("::");
    if (fields.length != 4) {
      throw new IllegalArgumentException("Each line must contain 4 fields");
    }
    int userId = Integer.parseInt(fields[0]);
    int movieId = Integer.parseInt(fields[1]);
    float rating = Float.parseFloat(fields[2]);
    long timestamp = Long.parseLong(fields[3]);
    return new Rating(userId, movieId, rating, timestamp);
  }
}

JavaRDD<Rating> ratingsRDD = spark
  .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
  .map(Rating::parseRating);
Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> training = splits[0];
Dataset<Row> test = splits[1];

// Build the recommendation model using ALS on the training data
ALS als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating");
ALSModel model = als.fit(training);

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop");
Dataset<Row> predictions = model.transform(test);

RegressionEvaluator evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root-mean-square error = " + rmse);

// Generate top 10 movie recommendations for each user
Dataset<Row> userRecs = model.recommendForAllUsers(10);
// Generate top 10 user recommendations for each movie
Dataset<Row> movieRecs = model.recommendForAllItems(10);

// Generate top 10 movie recommendations for a specified set of users
Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3);
Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10);
// Generate top 10 user recommendations for a specified set of movies
Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3);
Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10);
例の完全なコードは Spark のリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java" で見つかります。

レートマトリックスが他の情報元から由来する場合(つまり、他の信号から推測された)、より良い結果を得るためにimplicitPrefstrueに設定することができます。

ALS als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating");

以下の例の中で、レーティングのデータをMovieLens datasetからロードします。各行はユーザ、映画、レーティング、およびタイムスタンプから成ります。そして、デフォルトでレーティングが明示的(implicitPrefsFalse)と仮定するALSモデルを訓練します。レートの予想の平均二乗平方根誤差を計測することでレコメンドのモデルを評価します。

APIの詳細はALS Python ドキュメント を参照してください。

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
例の完全なコードはSparkのリポジトリの "examples/src/main/python/ml/als_example.py" で見つかります。

レートマトリックスが他の情報元から由来する場合(つまり、他の信号から推測された)、より良い結果を得るためにimplicitPrefsTrueに設定することができます。

als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,
          userCol="userId", itemCol="movieId", ratingCol="rating")

詳細は R API ドキュメント を参照してください。

# Load training data
data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0),
             list(1, 2, 4.0), list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(data, c("userId", "movieId", "rating"))
training <- df
test <- df

# Fit a recommendation model using ALS with spark.als
model <- spark.als(training, maxIter = 5, regParam = 0.01, userCol = "userId",
                   itemCol = "movieId", ratingCol = "rating")

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
例の完全なコードはSparkのリポジトリの "examples/src/main/r/ml/als.R" で見つかります。
TOP
inserted by FC2 system