0%

Scala开发|电商推荐系统-基于Item的协同过滤

实习做了一个个性化推荐项目,尝试了协同过滤和word2vec两种算法,现在实习结束了做点记录。本篇介绍协同过滤算法,公司大数据平台开发环境简陋,没有Python编译环境,脚本是用Scala写的。

协同过滤


协同过滤(collaborative filtering)是一种在推荐系统中广泛使用的技术。该技术通过分析用户或者事物之间的相似性(“协同”),来预测用户可能感兴趣的内容并将此内容推荐给用户。这里的相似性可以是人口特征(性别、年龄、居住地等)的相似性,也可以是历史浏览内容的相似性(比如都关注过和中餐相关的内容),还可以是个人通过一定机制给予某个事物的回应(比如一些教学网站会让用户对授课人进行评分)。比如,用户A和B都是居住在北京的年龄在20-30岁的女性,并且都关注过化妆品和衣物相关的内容。这种情况下,协同过滤可能会认为,A和B相似程度很高。于是可能会把A关注B没有关注的内容推荐给B,反之亦然。

在协同过滤算法中,基于用户的相似性推荐叫UserCF,基于商品的推荐叫ItemCF

算法思路


电商场景中产品规模大而用户购买行为矩阵稀疏,应选用ItemCF进行相似性推荐。

算法思想:
使用用户订单记录计算商品相似性,如果一个用户同时购买了商品1和商品2,则认为这两个商品具有比较高的相似性。根据商品相似性进行推荐,如果计算得出商品1和商品2的相似性较高,就对已购买了商品1且未购买商品2的用户推荐商品2。

实现思路如下:

  1. 构建共现矩阵。根据用户的行为,构建以用户为行坐标,物品为纵坐标的共现矩阵。
  2. 构建物品相似度矩阵。根据共现矩阵计算两两物品之间的相似度,得到物品相似度矩阵。
  3. 获取Topn相似物品。根据用户历史正反馈物品,找出最相似的n件物品。
  4. 计算用户对Topn物品的喜好度。用户对物品的喜好度定义为:当前物品和用户历史物品评分的加权和,加权系数是前面计算的物品相似度。在我的问题背景中,用户只是购买了产品而未对其进行评分,因此使用用户对某件物品的购买次数作为偏好。计算方法如下所示:
    $$r_{u,p}=\sum_{i}w_{p,i} \cdot r_{u,i}.$$
  5. 根据喜好度生成排序结果。

完整代码


首先定义一些通用类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package recommend

import scala.math._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

/**
* 用户评分.
* @param userid 用户
* @param itemid 评分物品
* @param pref 评分
*/
case class ItemPref(
val userid: String,
val itemid: String,
val pref: Double) extends Serializable
/**
* 用户推荐.
* @param userid 用户
* @param itemid 推荐物品
* @param pref 评分/或购买次数
*/
case class UserRecomm(
val userid: String,
val itemid: String,
val pref: Double) extends Serializable
/**
* 相似度.
* @param itemid1 物品
* @param itemid2 物品
* @param similar 相似度
*/
case class ItemSimi(
val itemid1: String,
val itemid2: String,
val similar: Double) extends Serializable

/**
* 相似度计算.
* 这里使用的是余弦相似度
*
*/
class ItemSimilarity extends Serializable {

/**
* 相似度计算.
* @param user_rdd 用户评分
* @param stype 计算相似度公式
* @param RDD[ItemSimi] 返回物品相似度
*
*/
def Similarity(user_rdd: RDD[ItemPref], stype: String): (RDD[ItemSimi]) = {
val simil_rdd = stype match {
case "cooccurrence" =>
ItemSimilarity.CooccurrenceSimilarity(user_rdd)
case _ =>
ItemSimilarity.CooccurrenceSimilarity(user_rdd)
}
simil_rdd
}

}

定义相似度计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
object ItemSimilarity{
/**
* 共现相似度矩阵计算
* w(i,j) = N(i)∩N(j)/sqrt(N(i)*N(j))
* @param user_rdd 用户评分或用户观看/购买记录
* @param RDD[ItemSimi] 返回用户相似度
*/
def CooccurrenceSimilarity(user_rdd: RDD[ItemPref]): (RDD[ItemSimi]) = {
val user_rdd1 = user_rdd.map(f=>(f.user, f.item, f.pref))
val user_rdd2 = user_rdd.map(f=>(f._1, f._2))
// 1. (用户:物品)笛卡尔积(用户:物品)=>物品:物品组合 (同时出现在某一用户的记录中)
val user_rdd3 = user_rdd2.join(user_rdd2)
val user_rdd4 = user_rdd3.map(f=>(f._2, 1)) //((物品, 物品), 1)
// 2. 物品:物品:物品频次
val user_rdd5 = user_rdd4.reduceByKey((x,y)=>x+y)
// 3. 对角矩阵:只留下x=y的
val user_rdd6 = user_rdd5.filter(f=>f._1._1==f._1._2) //对角线上的((物品, 物品), 频次)
// 4. 非对角矩阵
val user_rdd7 = user_rdd5.filter(f=>f._1._1!=f._1._2) //非对角线上的((物品, 物品), 频次)
// 5. 计算共现相似度 (物品1,物品2,共现频次)
val user_rdd8 = user_rdd7.map(f => (f._1._1, (f._1._1, f._1._2, f._2))).join(user_rdd6.map(f => (f._1._1, f._2)))
//(物品1,((物品1,物品2,物品1和2共现频次),物品1出现的次数))
val user_rdd9 = user_rdd8.map(f => (f._2._1._2, (f._2._1._1, f._2._1._2, f._2._1._3, f._2._2)))
//(物品2, (物品1, 物品2, 物品1和2共现频次, 物品1出现的次数))
val user_rdd10 = user_rdd9.join(user_rdd6.map(f => (f._1._1, f._2)))
//(物品2, ((物品1, 物品2, 物品1和2共现频次, 物品1出现的次数), 物品2出现的次数))
val user_rdd11 = user_rdd10.map(f => (f._2._1._1, f._2._1._2, f._2._1._3, f._2._1._4, f._2._2))
//(物品1,物品2,物品1和2共现频次,物品1出现的次数,物品2出现的次数)
val user_rdd12 = user_rdd11.map(f => (f._1, f._2, (f._3 / sqrt(f._4 * f._5))))
//(物品1,物品2,物品1和2的共现相似度)
// 6 结果返回
user_rdd12.map(f => ItemSimi(f._1, f._2, f._3))//将计算结果存为一个ItemSimi类
}

模型调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package recommend

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD

object ItemCF {
def main(args: Array[String]) {

//0 构建Spark对象
val conf = new SparkConf().setAppName("ItemCF")
val sc = new SparkContext(conf)
Logger.getRootLogger.setLevel(Level.WARN)

//1 读取样本数据
val data_path = "data.txt"
val data = sc.textFile(data_path)
val userdata = data.map(_.split(",")).map(f => (ItemPref(f(0), f(1), f(2).toDouble))).cache()
//整理成itemPref

//2 建立模型
val mysimil = new ItemSimilarity()
val simil_rdd1 = mysimil.Similarity(userdata, "cooccurrence")
val recommd = new RecommendedItem
val recommd_rdd1 = recommd.Recommend(simil_rdd1, userdata, 30)

//3 打印结果
println(s"物品相似度矩阵: ${simil_rdd1.count()}")
simil_rdd1.collect().foreach { ItemSimi =>
println(ItemSimi.itemid1 + ", " + ItemSimi.itemid2 + ", " + ItemSimi.similar)
}
println(s"用戶推荐列表: ${recommd_rdd1.count()}")
recommd_rdd1.collect().foreach { UserRecomm =>
println(UserRecomm.userid + ", " + UserRecomm.itemid + ", " + UserRecomm.pref)
}

}
}

进行推荐

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
   package recommend

import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._

class RecommendedItem {

/**
* 用户推荐计算.
* @param items_similar 物品相似度
* @param user_prefer 用户评分
* @param r_number 推荐数量
* @param RDD[UserRecomm] 返回用户推荐物品
*/
def Recommend(items_similar: RDD[ItemSimi],
user_prefer: RDD[ItemPref],
r_number: Int): (RDD[UserRecomm]) = {
// 0 数据准备
val rdd_app1_R1 = items_similar.map(f => (f.itemid1, f.itemid2, f.similar))
val user_prefer1 = user_prefer.map(f => (f.userid, f.itemid, f.pref))
// 1 矩阵计算——i行与j列join
val rdd_app1_R2 = rdd_app1_R1.map(f => (f._1, (f._2, f._3))).
join(user_prefer1.map(f => (f._2, (f._1, f._3)))) //(itemid1, ((itemid2,similarity), (user, preference)))
// 2 矩阵计算——i行与j列元素相乘
val rdd_app1_R3 = rdd_app1_R2.map(f => ((f._2._2._1, f._2._1._1), f._2._2._2 * f._2._1._2))
//((user,itemid2),similarity*preference)
// 3 矩阵计算——用户:元素累加求和,将不同的item1计算出的相似评分加总
val rdd_app1_R4 = rdd_app1_R3.reduceByKey((x, y) => x + y)
// 4 矩阵计算——用户:对结果过滤已有I2
val rdd_app1_R5 = rdd_app1_R4.leftOuterJoin(user_prefer1.map(f => ((f._1, f._2), 1))).
filter(f => f._2._2.isEmpty).map(f => (f._1._1, (f._1._2, f._2._1)))
//(user, (itemid2, similarity*preference))
// 5 矩阵计算——用户:用户对结果排序,过滤
val rdd_app1_R6 = rdd_app1_R5.groupByKey()
val rdd_app1_R7 = rdd_app1_R6.map(f => {
val i2 = f._2.toBuffer
val i2_2 = i2.sortBy(_._2)
if (i2_2.length > r_number) i2_2.remove(0, (i2_2.length - r_number))
(f._1, i2_2.toIterable)
})//(user, (itemid2, similarity*preference))
val rdd_app1_R8 = rdd_app1_R7.flatMap(f => {
val id2 = f._2
//此处id2是一个iterable的对象
for (w <- id2) yield (f._1, w._1, w._2)
})
rdd_app1_R8.map(f => UserRecomm(f._1, f._2, f._3))
}
}

/**
* 用户推荐计算.
* @param items_similar 物品相似度
* @param user_prefer 用户评分
* @param RDD[UserRecomm] 返回用户推荐物品
*/
def Recommend(items_similar: RDD[ItemSimi],
user_prefer: RDD[ItemPref]): (RDD[UserRecomm]) = {
// 0 数据准备
val rdd_app1_R1 = items_similar.map(f => (f.itemid1, f.itemid2, f.similar))
val user_prefer1 = user_prefer.map(f => (f.userid, f.itemid, f.pref))
// 1 矩阵计算——i行与j列join
val rdd_app1_R2 = rdd_app1_R1.map(f => (f._1, (f._2, f._3))).
join(user_prefer1.map(f => (f._2, (f._1, f._3))))
// 2 矩阵计算——i行与j列元素相乘
val rdd_app1_R3 = rdd_app1_R2.map(f => ((f._2._2._1, f._2._1._1), f._2._2._2 * f._2._1._2))
// 3 矩阵计算——用户:元素累加求和
val rdd_app1_R4 = rdd_app1_R3.reduceByKey((x, y) => x + y)
// 4 矩阵计算——用户:对结果过滤已有I2
val rdd_app1_R5 = rdd_app1_R4.leftOuterJoin(user_prefer1.map(f => ((f._1, f._2), 1))).
filter(f => f._2._2.isEmpty).map(f => (f._1._1, (f._1._2, f._2._1)))
// 5 矩阵计算——用户:用户对结果排序,过滤
val rdd_app1_R6 = rdd_app1_R5.map(f => (f._1, f._2._1, f._2._2)).
sortBy(f => (f._1, f._3))
rdd_app1_R6.map(f => UserRecomm(f._1, f._2, f._3))
}
}

这个算法实现之后在公司跑了一下,发现虽然算法很简单,但需要的计算资源很大,即使在Spark集群中,跑100万条记录的数据量也很费劲(当然可能跟分配的资源有关系),所以后来就没有继续使用了。

写着写着发现word2vec的代码忘记自己备份了,嘿嘿,那就先发这个吧。
word2vec因为spark中封装了相应的方法,实现起来比ItemCF容易许多,计算速度也很快。但很让人头痛的地方是spark提供的包中封装的方法太少,如果要在该算法上做一些调整,可能还需要从更底层开始实现。