前面我们完成了所有的数据准备,接下来,就要开始召回阶段的工作了,可以做离线召回,也可以做在线召回,召回算法通常包括基于内容的召回和基于协同过滤的召回。ALS 模型是一种基于模型的协同过滤召回算法,本文将通过 ALS 模型实现离线召回。

首先,我们在 Hbase 中创建召回结果表 cb_recall,这里用不同列族来存储不同方式的召回结果,其中 als 表示模型召回,content 表示内容召回,online 表示在线召回。通过设置多个版本来存储多次召回结果,通过设置生存期来清除长时间未被使用的召回结果。

1
2
3
4
5
6
7
8
create 'cb_recall', {NAME=>'als', TTL=>7776000, VERSIONS=>999999}
alter 'cb_recall', {NAME=>'content', TTL=>7776000, VERSIONS=>999999}
alter 'cb_recall', {NAME=>'online', TTL=>7776000, VERSIONS=>999999}

# 插入样例
put 'cb_recall', 'recall:user:5', 'als:2',[1,2,3,4,5,6,7,8,9,10]
put 'cb_recall', 'recall:user:2', 'content:1',[45,3,5,10,289,11,65,52,109,8]
put 'cb_recall', 'recall:user:2', 'online:2',[1,2,3,4,5,6,7,8,9,10]

在 Hive 中建立外部表,用于离线分析

1
2
3
4
5
6
7
8
9
10
11
create external table cb_recall_hbase
(
user_id STRING comment "userID",
als map<string, ARRAY<BIGINT>> comment "als recall",
content map<string, ARRAY<BIGINT>> comment "content recall",
online map<string, ARRAY<BIGINT>> comment "online recall"
)
COMMENT "user recall table"
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,als:,content:,online:")
TBLPROPERTIES ("hbase.table.name" = "cb_recall");

接着,在 Hbase 中创建历史召回结果表,用于过滤已历史召回结果,避免重复推荐。这里同样设置了多个版本来存储多次历史召回结果,设置了生存期来清除很长时间以前的历史召回结果

1
2
3
4
5
6
create 'history_recall', {NAME=>'channel', TTL=>7776000, VERSIONS=>999999}

# 插入示例
put 'history_recall', 'recall:user:5', 'als:1',[1,2,3]
put 'history_recall', 'recall:user:5', 'als:1',[4,5,6,7]
put 'history_recall', 'recall:user:5', 'als:1',[8,9,10]

ALS 原理

我们先简单了解一下 ALS 模型,上图为用户和物品的关系矩阵,其中,每一行代表一个用户,每一列代表一个物品。蓝色元素代表用户查看过该物品,灰色元素代表用户未查看过该物品,假设有 m 个用户,n 个物品,为了得到用户对物品的评分,我们可以利用矩阵分解将原本较大的稀疏矩阵拆分成两个较小的稠密矩阵,即 m x k 维的用户隐含矩阵和 k x n 维的物品隐含矩阵,如下所示:

其中,用户矩阵的每一行就包括了影响用户偏好的 k 个隐含因子,物品矩阵的每一列就包括了影响物品内容的 k 个隐含因子。这里用户矩阵和物品矩阵中每个隐含因子的值就是利用交替最小二乘(Alternating Least Squares,ALS)优化算法计算而得的,所以叫做 ALS 模型。接下来,再将用户矩阵和物品矩阵相乘即可得到用户对物品的 m x n 维的评分矩阵,其中就包括每一个用户对每一个物品的评分了,进而可以根据评分进行推荐。

ALS 模型训练和预测

Spark 已经实现了 ALS 模型,我们可以直接调用。首先,我们读取用户历史点击行为,构造训练集数据,其中只需要包括用户 ID、文章 ID 以及是否点击

1
2
spark.sql('use profile')
user_article_basic = spark.sql("select user_id, article_id, clicked from user_article_basic")

user_article_basic 结果如下所示,其中 clicked 表示用户对文章是否发生过点击

我们需要将 clicked 由 boolean 类型转成 int 类型,即 true 为 1,false 为 0

1
2
3
4
def convert_boolean_int(row):
return row.user_id, row.article_id, int(row.clicked)

user_article_basic = user_article_basic.rdd.map(convert_boolean_int).toDF(['user_id', 'article_id', 'clicked'])

user_article_basic 结果如下所示,clicked 已经是 int 类型

另外,Spark 的 ALS 模型还要求输入的用户 ID 和文章 ID 必须是从 1 开始递增的连续数字,所以需要利用 Spark 的 PipelineStringIndexer,将用户 ID 和文章 ID 建立从 1 开始递增的索引

1
2
3
4
5
6
7
8
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

user_indexer = StringIndexer(inputCol='user_id', outputCol='als_user_id')
article_indexer = StringIndexer(inputCol='article_id', outputCol='als_article_id')
pip = Pipeline(stages=[user_indexer, article_indexer])
pip_model = pip.fit(user_article_basic)
als_user_article = pip_model.transform(user_article_basic)

als_user_article 结果如下所示,als_user_id 和 als_article_id 即是 ALS 模型所需的用户索引和文章索引

接下来,将用户行为数据中的 als_user_id, als_article_id, clicked 三列作为训练集,对 ALS 模型进行训练,并利用 ALS 模型计算用户对文章的偏好得分,这里可以指定为每个用户保留偏好得分最高的 K 篇文章

1
2
3
4
5
6
7
from pyspark.ml.recommendation import ALS

top_k = 100
als = ALS(userCol='als_user_id', itemCol='als_article_id', ratingCol='clicked')
als_model = als.fit(als_user_article)

recall_res = als_model.recommendForAllUsers(top_k)

recall_res 结果如下所示,其中,als_user_id 为用户索引,recommendations 为每个用户的推荐列表,包括文章索引和偏好得分,如 [[255,0.1], [10,0.08], …]

预测结果处理

接着,我们要将推荐结果中的用户索引和文章索引还原为用户 ID 和文章 ID,这就需要建立用户 ID 与用户索引的映射及文章 ID 与文章索引的映射,可以将前面包含用户索引和文章索引的用户行为数据 als_user_article 分别按照 user_id 和 article_id 分组,即可得到用户 ID 与用户索引的映射以及文章 ID 与文章索引的映射

1
2
user_real_index = als_user_article.groupBy(['user_id']).max('als_user_id').withColumnRenamed('max(als_user_id)', 'als_user_id')
article_real_index = als_user_article.groupBy(['article_id']).max('als_article_id').withColumnRenamed('max(als_article_id)', 'als_article_id')

user_real_index 结果如下所示,即用户 ID 与用户索引的映射

再利用 als_user_idrecall_resuser_real_index 进行连接,加入用户 ID

1
recall_res = recall_res.join(user_real_index, on=['als_user_id'], how='left').select(['als_user_id', 'recommendations', 'user_id'])

recall_res 结果如下所示,得到用户索引,推荐列表和用户 ID

存储

接下来,我们要构建出用户和文章的关系,利用 explode() 方法将 recommendations 中的每篇文章都转换为单独的一条记录,并只保留用户 ID 和文章索引这两列数据

1
2
3
import pyspark.sql.functions as F

recall_res = recall_res.withColumn('als_article_id', F.explode('recommendations')).drop('recommendations').select(['user_id', 'als_article_id'])

recall_res 结果如下所示,als_article_id 包括文章索引和偏好得分

我们将 als_article_id 中的偏好得分去除,只保留文章索引

1
2
3
4
def get_article_index(row):
return row.user_id, row.als_article_id[0]

recall_res = recall_res.rdd.map(get_article_index).toDF(['user_id', 'als_article_id'])

recall_res 结果如下所示,得到用户 ID 和文章索引

之前我们将文章 ID 和文章索引保存到了 article_real_index,这里利用 als_article_id 将 recall_res 和 article_real_index 进行连接,得到文章 ID

1
recall_res = recall_res.join(article_real_index, on=['als_article_id'], how='left').select(['user_id', 'article_id'])

recall_res 结果如下所示,得到用户 ID 和要向其推荐的文章 ID

推荐结果存储

为了方便查询,我们需要将推荐结果按频道分别进行存储。首先,读取文章完整信息,得到频道 ID

1
2
spark.sql('use article')
article_data = spark.sql("select article_id, channel_id from article_data")

利用 article_id 将 recall_resarticle_data 进行连接,在推荐结果中加入频道 ID

1
recall_res = recall_res.join(article_data, on=['article_id'], how='left')

recall_res 结果如下所示,推荐结果加入了频道 ID

将推荐结果按照 user_id 和 channel_id 进行分组,利用 collect_list() 方法将文章 ID 合并为文章列表

1
recall_res = recall_res.groupBy(['user_id', 'channel_id']).agg(F.collect_list('article_id')).withColumnRenamed('collect_list(article_id)', 'article_list')

recall_res 结果如下所示,article_list 为某用户在某频道下的推荐文章列表

最后,将推荐结果按频道分别存入召回结果表 cb_recall 及历史召回结果表 history_recall。注意,在保存新的召回结果之前需要根据历史召回结果进行过滤,防止重复推荐

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
recall_res = recall_res.dropna()
recall_res.foreachPartition(save_offline_recall_hbase)

def save_offline_recall_hbase(partition):
"""ALS模型离线召回结果存储
"""
import happybase
pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
for row in partition:
with pool.connection() as conn:
# 读取历史召回结果表
history_table = conn.table('history_recall')
# 读取包含多个版本的历史召回结果
history_article_data = history_table.cells('reco:his:{}'.format(row.user_id).encode(),
'channel:{}'.format(row.channel_id).encode())

# 合并多个版本历史召回结果
history_article = [](比如有的用户会比较怀旧)
if len(history_article_data) >= 2:
for article in history_article_data[:-1]:
history_article.extend(eval(article))
else:
history_article = []

# 过滤history_article
recall_article = list(set(row.article_list) - set(history_article))

if recall_article:
table = conn.table('cb_recall')
table.put('recall:user:{}'.format(row.user_id).encode(), {'als:{}'.format(row.channel_id).encode(): str(recall_article).encode()})
history_table.put("reco:his:{}".format(row.user_id).encode(), {'channel:{}'.format(row.channel_id): str(recall_article).encode()})
conn.close()

可以根据用户 ID 和频道 ID 来查询召回结果

1
2
3
hbase(main):028:0> get 'cb_recall', 'recall:user:2'
COLUMN CELL
als:13 timestamp=1558041569201, value=[141431,14381, 17966, 17454, 14125, 16174]

Apscheduler 定时更新

在用户召回方法 update_user_recall() 中,增加基于模型的离线召回方法 update_content_recall(),首先读取用户行为日志,进行数据预处理,构建训练集,接着对 ALS 模型进行训练和预测,最后对预测出的推荐结果进行解析并按频道分别存入召回结果表和历史召回结果表

1
2
3
4
5
6
7
def update_user_recall():
"""
用户的频道推荐召回结果更新逻辑
:return:
"""
ur = UpdateRecall(500)
ur.update_als_recall()

添加定时更新用户召回结果的任务,每隔 3 小时运行一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor

# 创建scheduler,多进程执行
executors = {
'default': ProcessPoolExecutor(3)
}

scheduler = BlockingScheduler(executors=executors)

# 添加一个定时运行文章画像更新的任务, 每隔1个小时运行一次
scheduler.add_job(update_article_profile, trigger='interval', hours=1)
# 添加一个定时运行用户画像更新的任务, 每隔2个小时运行一次
scheduler.add_job(update_user_profile, trigger='interval', hours=2)
# 添加一个定时运行用户召回更新的任务,每隔3小时运行一次
scheduler.add_job(update_user_recall, trigger='interval', hours=3)
# 添加一个定时运行特征中心平台的任务,每隔4小时更新一次
scheduler.add_job(update_ctr_feature, trigger='interval', hours=4)

scheduler.start()

参考

https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(学习资源已保存至网盘, 提取码:eakp)


【技术服务】,详情点击查看: https://mp.weixin.qq.com/s/PtX9ukKRBmazAWARprGIAg


扫一扫 关注微信公众号!号主 专注于搜索和推荐系统,尝试使用算法去更好的服务于用户,包括但不局限于机器学习,深度学习,强化学习,自然语言理解,知识图谱,还不定时分享技术,资料,思考等文章!