在上一篇文章中介绍了ALS算法的原理(点击阅读),在这篇文章中主要介绍一下ALS算法在Spark中的实现。

概述

协同过滤(Collaborative Filtering)在推荐系统中应用的非常广,该算法的目标是去填充用户-物品评分矩阵中的缺失值,即未评分。该算法的Spark的ML包和MLlib包中均有实现。

其中涉及的参数如下:

  • numBlocks:数据分区的数目,默认为10
  • rank:隐向量的长度,默认是10(m n => m k - k * n)
  • maxIter:最大迭代次数,默认为10
  • regParam:正则化参数系数,默认为1.0
  • implicitPrefs:控制使用显式反馈还是隐式反馈,默认是false即显式反馈。
  • alpha:隐式反馈时的置信度参数,默认为1.0
  • nonnegative:是否对最小二乘使用非负约束,默认为false

隐式反馈与显式反馈

基于矩阵分解的协同过滤标准方法将用户-物品矩阵中的rate视为用户对项目给出的显式偏好,例如:用户对电影进行评分。

在许多实际的用例中,通常只能获取隐式反馈数据(例如:观看,点击,购买,喜欢,分享等)。spark.ml中用于处理此类数据的方法取自Collaborative Filtering for Implicit Feedback Datasets。本质上,这种方法不是试图直接对评级矩阵进行建模,而是将数据视为表示用户操作观察强度的数字(例如点击次数或某人花在观看电影上的累积持续时间)。然后,这些数字与观察到的用户偏好的置信水平相关,而不是与项目的明确评级相关。然后,该模型试图找到可用于预测用户对项目的预期偏好的潜在因素。

正则化参数

通过用户-物品的评分矩阵中用户的评分物品数和物品收到的评分个数来作为正则项,解决最小二乘更新过程中的问题。 这种方法被命名为“ALS-WR”,可以参考论文: Collaborative Filtering for Implicit Feedback Datasets。它减小来regParam对数据集规模的依赖,因此我们可以将从采样子集中学习的最佳参数应用于完整数据集,并获得较好的结果。

应用场景

Spark ALS算法支持输出item 或者user的隐向量,据此我们可以计算出用户或者物品的相似度,继而进行排序得到用户或者item的top N相似user或者item。这样在数据进行召回时便可以进行召回了。

比如根据用户用行为的物品召回,当用户浏览了若干了item时,便将这些item相似的item加入到召回池中,进行rank排序。

ML中的ALS实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
object ALSML {

def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[5]").appName("ALSML").enableHiveSupport().getOrCreate()
Logger.getRootLogger.setLevel(Level.WARN)

val input = "data/sample_movielens_ratings.txt"
val model_param = "maxIters:10,rank:5,numBlocks:10,regParam:0.01,alpha:0.618,userCol:userId,itemCol:movieId,rateCol:rating,implicitPrefs:true"
val output_model = "model/als_ml"
// 训练模型 找到合适的参数
runBasedML(spark,input,model_param,output_model)
}

def runBasedML(spark: SparkSession, input: String, param: String,output_model_path: String) = {
import spark.sqlContext.implicits._
val ratings = spark.read.textFile(input).map(parseRating).toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

println("创建并训练ALS模型 ...")
val als = ALSMLUtil.createModel(param)
val model = als.fit(training)
println("模型的效果评估 ...")
ALSMLUtil.evaluateModel(model, test)

println("为用户进行item推荐 ...")
model.recommendForAllUsers(10).show(10)

println("为指定用户进行top N item推荐 ...")
val users = ratings.select(als.getUserCol).distinct().limit(3)
model.recommendForUserSubset(users,10).show(10)

println("为item进行用户推荐 ...")
model.recommendForAllItems(10).show(10)
println("为指定的item进行top N 用户推荐 ...")
val movies = ratings.select(als.getItemCol).distinct().limit(3)
model.recommendForItemSubset(movies, 10).show(10)

println("输出隐向量 ...")
model.itemFactors.rdd.map(f => (f.get(0), f.getList(1).toArray.mkString(","))).take(10).foreach(println)

println("保存与加载模型 ...")
model.write.overwrite().save(output_model_path)
val newModel = ALSModel.load(output_model_path)
newModel.itemFactors.rdd.map(f => (f.get(0), f.getList(1).toArray.mkString(","))).take(10).foreach(println)
}

def parseRating(str: String): Rating = {
val fields = str.split("::")
assert(fields.size == 4)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
}

MLlib中的ALS实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
object ALSMLlib {

def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[5]").appName("ALS").enableHiveSupport().getOrCreate()
Logger.getRootLogger.setLevel(Level.WARN)

val input = "data/sample_movielens_ratings.txt"
val model_param = "maxIters:10,rank:5,numBlocks:10,regParam:0.01,alpha:0.618,implicitPrefs:true"
val output_model_path = "model/als_ml"

run(spark, input, model_param, output_model_path)
}

def run(spark: SparkSession, input: String, model_param: String, output_model_path: String): Unit = {
println("加载数据 ...")
val ratings = spark.sparkContext.textFile(input)
.map(_.split("::").slice(0,3) match { case Array(userId, movieId, rating) =>
Rating(userId.toString.toInt, movieId.toString.toInt, rating.toString.toDouble)
})
println("训练模型 ...")
val param = new ALSMLlibParam()
param.parseString(model_param)
val model = ALS.train(ratings,param.getRank, param.getMaxIters,param.getAlpha,param.getNumBlocks)

println("评估模型 ...")
val usersProducts = ratings.map { case Rating(user, product, rate) => (user, product) }
val predictions = model.predict(usersProducts).map{ case Rating(user, product, rate) => ((user,product),rate)}
val rateAndPre = ratings.map { case Rating(user, product, rate) => ((user, product), rate) }.join(predictions)
val MSE = rateAndPre.map { case ((user, product), (r1, r2)) =>
val err = (r1 - r2)
err * err
}.mean()
println("Mean Squared Error = " + MSE)


println(s"用户(2)对 物品(2)的预测评分为:${model.predict(2,2)}")
println("用户纬度的特征向量为:")
model.userFeatures.map(f => (f._1,f._2.mkString(","))).take(10).foreach(println)
println("物品纬度的特征向量为:")
model.productFeatures.map(f => (f._1,f._2.mkString(","))).take(10).foreach(println)
}
}

问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1364)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
at com.kk.recommend.tools.model.ALSBasedMLUtil$.evaluateModel(ALSBasedMLUtil.scala:51)
at com.kk.recommend.topic.follow.ItemCFV2$.testBasedML(ItemCFV2.scala:104)
at com.kk.recommend.topic.follow.ItemCFV2$.main(ItemCFV2.scala:40)
at com.kk.recommend.topic.follow.ItemCFV2.main(ItemCFV2.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)
Caused by: java.lang.ArrayStoreException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:90)
at scala.collection.IndexedSeqOptimized$class.copyToArray(IndexedSeqOptimized.scala:180)
at scala.collection.mutable.WrappedArray.copyToArray(WrappedArray.scala:35)
at scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:278)
at scala.collection.AbstractTraversable.copyToArray(Traversable.scala:104)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:286)
at scala.collection.mutable.WrappedArray.toArray(WrappedArray.scala:73)
at com.kk.recommend.tools.model.ALSBasedMLUtil$$anonfun$2.apply(ALSBasedMLUtil.scala:48)
at com.kk.recommend.tools.model.ALSBasedMLUtil$$anonfun$2.apply(ALSBasedMLUtil.scala:46)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1364)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1364)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

结局办法:

解决办法:点击阅读

打印的Schema信息:

1
2
3
4
5
6
root
|-- userId: integer (nullable = false)
|-- recommendations: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- topicId: integer (nullable = true)
| | |-- rating: float (nullable = true)

在row 中的这些列取得时候,要根据类型取,简单的像String,Seq[Double] 这种类型就可以直接取出来,但是像 Seq[(Double,Double)] 这种类型直接取得花就会丢失schema信息,虽然值能取到,但是schema信息丢了,在dataFrame中操作的时候就会抛错

ALS测试结果数据的格式如下:

userId recommendations
148 [[1972, 0.0334868…

原始的写法是:

1
2
3
4
5
6
7
result.select("userId", "recommendations")
.filter(row => !(row.isNullAt(0) || row.isNullAt(1)))
.rdd.flatMap( l=>{
val uid = l.get(0).toString
val itemList = l.getAs[mutable.WrappedArray[(Int,Double)]]("recommendations")
for(item<- itemList) yield (uid, item._1.toString)
})

修改后为:

1
2
3
4
5
6
7
result.select("userId", "recommendations")
.filter(row => !(row.isNullAt(0) || row.isNullAt(1)))
.rdd.flatMap( l=>{
val uid = l.get(0).toString
val itemList= l.getAs[Seq[Row]](1).map(x=>{(x.getInt(0),x.getFloat(1))})
for(item<- itemList) yield (uid, item._1.toString)
})


【搜索与推荐Wiki】专注于搜索和推荐系统,尝试使用算法去更好的服务于用户,包括但不局限于机器学习,深度学习,强化学习,自然语言理解,知识图谱,还不定时分享技术,资料,思考等文章!