在上篇文章中,我们实现了基于模型的离线召回,属于基于协同过滤的召回算法。接下来,本文就讲一下另一个经典的召回方式,那就是如何实现基于内容的离线召回。相比于协同过滤来说,基于内容的召回会简单很多,主要思路就是召回用户点击过的文章的相似文章,通常也被叫做 u2i2i。

离线召回

首先,读取用户历史行为数据,得到用户历史点击过的文章

1
2
3
spark.sql('use profile')
user_article_basic = spark.sql("select * from user_article_basic")
user_article_basic = user_article_basic.filter('clicked=True')

user_article_basic 结果如下所示

接下来,遍历用户历史点击过的文章,获取与之相似度最高的 K 篇文章即可。可以根据之前计算好的文章相似度表 article_similar 进行相似文章查询,接着根据历史召回结果进行过滤,防止重复推荐。最后将召回结果按照频道分别存入召回结果表及历史召回结果表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
user_article_basic.foreachPartition(get_clicked_similar_article)

def get_clicked_similar_article(partition):
"""召回用户点击文章的相似文章
"""
import happybase
pool = happybase.ConnectionPool(size=10, host='hadoop-master')

with pool.connection() as conn:
similar_table = conn.table('article_similar')
for row in partition:
# 读取文章相似度表,根据文章ID获取相似文章
similar_article = similar_table.row(str(row.article_id).encode(),
columns=[b'similar'])
# 按照相似度进行排序
similar_article_sorted = sorted(similar_article.items(), key=lambda item: item[1], reverse=True)
if similar_article_sorted:
# 每次行为推荐10篇文章
similar_article_topk = [int(i[0].split(b':')[1]) for i in similar_article_sorted][:10]

# 根据历史召回结果进行过滤
history_table = conn.table('history_recall')
history_article_data = history_table.cells('reco:his:{}'.format(row.user_id).encode(), 'channel:{}'.format(row.channel_id).encode())
# 将多个版本都加入历史文章ID列表
history_article = []
if len(history_article_data) >= 2:
for article in history_article_data[:-1]:
history_article.extend(eval(article))
else:
history_article = []

# 过滤history_article
recall_article = list(set(similar_article_topk) - set(history_article))

# 存储到召回结果表及历史召回结果表
if recall_article:
content_table = conn.table('cb_recall')
content_table.put("recall:user:{}".format(row.user_id).encode(), {'content:{}'.format(row.channel_id).encode(): str(recall_article).encode()})

# 放入历史召回结果表
history_table.put("reco:his:{}".format(row.user_id).encode(), {'channel:{}'.format(row.channel_id).encode(): str(recall_article).encode()})

可以根据用户 ID 和频道 ID 来查询召回结果

1
2
3
hbase(main):028:0> get 'cb_recall', 'recall:user:2'
COLUMN CELL
content:13 timestamp=1558041569201, value=[141431,14381, 17966, 17454, 14125, 16174]

最后,使用 Apscheduler 定时更新。在用户召回方法 update_user_recall() 中,增加基于内容的离线召回方法 update_content_recall(),首先读取用户行为日志,并筛选用户点击的文章,接着读取文章相似表,获取相似度最高的 K 篇文章,然后根据历史召回结果进行过滤,防止重复推荐,最后,按频道分别存入召回结果表及历史召回结果表

1
2
3
4
5
6
7
8
def update_user_recall():
"""
用户的频道推荐召回结果更新逻辑
:return:
"""
ur = UpdateRecall(500)
ur.update_als_recall()
ur.update_content_recall()

之前已经添加好了定时更新用户召回结果的任务,每隔 3 小时运行一次,这样就完成了基于内容的离线召回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor

# 创建scheduler,多进程执行
executors = {
'default': ProcessPoolExecutor(3)
}

scheduler = BlockingScheduler(executors=executors)

# 添加一个定时运行文章画像更新的任务, 每隔1个小时运行一次
scheduler.add_job(update_article_profile, trigger='interval', hours=1)
# 添加一个定时运行用户画像更新的任务, 每隔2个小时运行一次
scheduler.add_job(update_user_profile, trigger='interval', hours=2)
# 添加一个定时运行用户召回更新的任务,每隔3小时运行一次
scheduler.add_job(update_user_recall, trigger='interval', hours=3)
# 添加一个定时运行特征中心平台的任务,每隔4小时更新一次
scheduler.add_job(update_ctr_feature, trigger='interval', hours=4)

scheduler.start()

在线召回

前面我们实现了基于内容的离线召回,接下来我们将实现基于内容的在线召回。在线召回的实时性更好,能够根据用户的线上行为实时反馈,快速跟踪用户的偏好,也能够解决用户冷启动问题。离线召回和在线召回唯一的不同就是,离线召回读取的是用户历史行为数据,而在线召回读取的是用户实时的行为数据,从而召回用户当前正在阅读的文章的相似文章。

首先,我们通过 Spark Streaming 读取 Kafka 中的用户实时行为数据,Spark Streaming 配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from setting.default import DefaultConfig
import happybase

SPARK_ONLINE_CONFIG = (
("spark.app.name", "onlineUpdate"),
("spark.master", "yarn"),
("spark.executor.instances", 4)
)

KAFKA_SERVER = "192.168.19.137:9092"

# 用于读取hbase缓存结果配置
pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
conf = SparkConf()
conf.setAll(SPARK_ONLINE_CONFIG)
sc = SparkContext(conf=conf)
stream_c = StreamingContext(sc, 60)

# 基于内容召回配置,用于收集用户行为
similar_kafkaParams = {"metadata.broker.list": DefaultConfig.KAFKA_SERVER, "group.id": 'similar'}
SIMILAR_DS = KafkaUtils.createDirectStream(stream_c, ['click-trace'], similar_kafkaParams)

Kafka 中的用户行为数据,如下所示

1
{"actionTime":"2019-12-10 21:04:39","readTime":"","channelId":18,"param":{"action": "click", "userId": "2", "articleId": "116644", "algorithmCombine": "C2"}}

接下来,利用 Spark Streaming 将用户行为数据传入到 get_similar_online_recall() 方法中,这里利用 json.loads() 方法先将其转换为了 json 格式,注意用户行为数据在每条 Kafka 消息的第二个位置

1
SIMILAR_DS.map(lambda x: json.loads(x[1])).foreachRDD(get_similar_online_recall)

接着,遍历用户行为数据,这里可能每次读取到多条用户行为数据。筛选出被点击、收藏或分享过的文章,并获取与其相似度最高的 K 篇文章,再根据历史召回结果表进行过滤,防止重复推荐,最后,按频道分别存入召回结果表及历史召回结果表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def get_online_similar_recall(rdd):
"""
获取在线相似文章
:param rdd:
:return:
"""
import happybase

topk = 10
# 初始化happybase连接
pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
for data in rdd.collect():

# 根据用户行为筛选文章
if data['param']['action'] in ["click", "collect", "share"]:
with pool.connection() as conn:
similar_table = conn.table("article_similar")

# 根据用户行为数据涉及文章找出与之最相似文章(基于内容的相似)
similar_article = similar_table.row(str(data["param"]["articleId"]).encode(), columns=[b"similar"])
similar_article = sorted(similar_article.items(), key=lambda x: x[1], reverse=True) # 按相似度排序

if similar_article:
similar_article_topk = [int(i[0].split(b":")[1]) for i in similar_article[:topk]] # 选取K篇作为召回推荐结果

# 根据历史召回结果进行过滤
history_table = conn.table('history_recall')
history_article_data = history_table.cells(b"reco:his:%s" % data["param"]["userId"].encode(), b"channel:%d" % data["channelId"])
# 将多个版本都加入历史文章ID列表
history_article = []
if len(history_article_data) >1:
for article in history_article_data[:-1]:
history_article.extend(eval(article))
else:
history_article = []

# 过滤history_article
recall_article = list(set(similar_article_topk) - set(history_article))

# 如果有召回结果,按频道分别存入召回结果表及历史召回结果表
if recall_article:
recall_table = conn.table("cb_recall")
recall_table.put(b"recall:user:%s" % data["param"]["userId"].encode(), {b"online:%d" % data["channelId"]: str(recall_article).encode()})
history_table.put(b"reco:his:%s" % data["param"]["userId"].encode(), {b"channel:%d" % data["channelId"]: str(recall_article).encode()})

conn.close()

可以根据用户 ID 和频道 ID 来查询召回结果

1
2
3
hbase(main):028:0> get 'cb_recall', 'recall:user:2'
COLUMN CELL
online:13 timestamp=1558041569201, value=[141431,14381, 17966, 17454, 14125, 16174]

创建 online_update.py,加入基于内容的在线召回逻辑

1
2
3
4
5
6
7
8
9
10
if __name__ == '__main__':
ore = OnlineRecall()
ore.update_content_recall()
stream_sc.start()
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
pass

利用 Supervisor 进行进程管理,并开启实时运行,配置如下,其中 environment 需要指定运行所需环境

1
2
3
4
5
6
7
8
9
10
11
12
[program:online]
environment=JAVA_HOME=/root/bigdata/jdk,SPARK_HOME=/root/bigdata/spark,HADOOP_HOME=/root/bigdata/hadoop,PYSPARK_PYTHON=/miniconda2/envs/reco_sys/bin/python ,PYSPARK_DRIVER_PYTHON=/miniconda2/envs/reco_sys/bin/python,PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.2 pyspark-shell'
command=/miniconda2/envs/reco_sys/bin/python /root/toutiao_project/reco_sys/online/online_update.py
directory=/root/toutiao_project/reco_sys/online
user=root
autorestart=true
redirect_stderr=true
stdout_logfile=/root/logs/onlinesuper.log
loglevel=info
stopsignal=KILL
stopasgroup=true
killasgroup=true

参考

https://www.bilibili.com/video/av68356229
https://pan.baidu.com/s/1-uvGJ-mEskjhtaial0Xmgw(学习资源已保存至网盘, 提取码:eakp)


【技术服务】,详情点击查看: https://mp.weixin.qq.com/s/PtX9ukKRBmazAWARprGIAg


扫一扫 关注微信公众号!号主 专注于搜索和推荐系统,尝试使用算法去更好的服务于用户,包括但不局限于机器学习,深度学习,强化学习,自然语言理解,知识图谱,还不定时分享技术,资料,思考等文章!