在上篇文章中,我们已经完成了离线文章画像的构建,接下来,我们要为相似文章推荐做准备,那就是计算文章之间的相似度。首先,我们要计算出文章的词向量,然后利用文章的词向量来计算文章的相似度。

计算文章词向量

我们可以通过大量的历史文章数据,训练文章中每个词的词向量,由于文章数据过多,通常是分频道进行词向量训练,即每个频道训练一个词向量模型,我们包括的频道如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
channel_info = {
1: "html",
2: "开发者资讯",
3: "ios",
4: "c++",
5: "android",
6: "css",
7: "数据库",
8: "区块链",
9: "go",
10: "产品",
11: "后端",
12: "linux",
13: "人工智能",
14: "php",
15: "javascript",
16: "架构",
17: "前端",
18: "python",
19: "java",
20: "算法",
21: "面试",
22: "科技动态",
23: "js",
24: "设计",
25: "数码产品",
}

接下来,分别对各自频道内的文章进行分词处理,这里先选取 18 号频道内的所有文章,进行分词处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
spark.sql("use article")
article_data = spark.sql("select * from article_data where channel_id=18")
words_df = article_data.rdd.mapPartitions(segmentation).toDF(['article_id', 'channel_id', 'words'])

def segmentation(partition):
import os
import re
import jieba
import jieba.analyse
import jieba.posseg as pseg
import codecs

abspath = "/root/words"

# 结巴加载用户词典
userDict_path = os.path.join(abspath, "ITKeywords.txt")
jieba.load_userdict(userDict_path)

# 停用词文本
stopwords_path = os.path.join(abspath, "stopwords.txt")

def get_stopwords_list():
"""返回stopwords列表"""
stopwords_list = [i.strip() for i in codecs.open(stopwords_path).readlines()]
return stopwords_list

# 所有的停用词列表
stopwords_list = get_stopwords_list()

# 分词
def cut_sentence(sentence):
"""对切割之后的词语进行过滤,去除停用词,保留名词,英文和自定义词库中的词,长度大于2的词"""
# eg:[pair('今天', 't'), pair('有', 'd'), pair('雾', 'n'), pair('霾', 'g')]
seg_list = pseg.lcut(sentence)
seg_list = [i for i in seg_list if i.flag not in stopwords_list]
filtered_words_list = []
for seg in seg_list:
if len(seg.word) <= 1:
continue
elif seg.flag == "eng":
if len(seg.word) <= 2:
continue
else:
filtered_words_list.append(seg.word)
elif seg.flag.startswith("n"):
filtered_words_list.append(seg.word)
elif seg.flag in ["x", "eng"]: # 是自定一个词语或者是英文单词
filtered_words_list.append(seg.word)
return filtered_words_list

for row in partition:
sentence = re.sub("<.*?>", "", row.sentence) # 替换掉标签数据
words = cut_sentence(sentence)
yield row.article_id, row.channel_id, words

words_df 结果如下所示,words 为分词后的词语列表

接着,使用分词后的所有词语,对 Word2Vec 模型进行训练并将模型保存到 HDFS,其中 vectorSize 为词向量的长度,minCount 为词语的最小出现次数,windowSize 为训练窗口的大小,inputCol 为输入的列名,outputCol 为输出的列名

1
2
3
4
5
from pyspark.ml.feature import Word2Vec

w2v_model = Word2Vec(vectorSize=100, inputCol='words', outputCol='vector', minCount=3)
model = w2v_model.fit(words_df)
model.save("hdfs://hadoop-master:9000/headlines/models/word2vec_model/channel_18_python.word2vec")

加载训练好的 Word2Vec 模型

1
2
3
4
from pyspark.ml.feature import Word2VecModel

w2v_model = Word2VecModel.load("hdfs://hadoop-master:9000/headlines/models/word2vec_model/channel_18_python.word2vec")
vectors = w2v_model.getVectors()

vectors 结果如下所示,其中 vector 是训练后的每个词的 100 维词向量,是 vector 类型格式的,如 [0.2 -0.05 -0.1 …]

这里,我们计算出了所有词语的词向量,接下来,还要得到关键词的词向量,因为我们需要通过关键词的词向量来计算文章的词向量。那么,首先通过读取频道内的文章画像来得到关键词(实际场景应该只读取新增文章画像)

1
article_profile = spark.sql("select * from article_profile where channel_id=18")

在文章画像表中,关键词和权重是存储在同一列的,我们可以利用 LATERAL VIEW explode() 方法,将 map 类型的 keywords 列中的关键词和权重转换成单独的两列数据

1
2
article_profile.registerTempTable('profile')
keyword_weight = spark.sql("select article_id, channel_id, keyword, weight from profile LATERAL VIEW explode(keywords) AS keyword, weight")

keyword_weight 结果如下所示,keyword 为关键词,weight 为对应的权重

这时就可以利用关键词 keyword 列,将文章关键词 keyword_weight 与词向量结果 vectors 进行内连接,从而得到每个关键词的词向量

1
keywords_vector = keyword_weight.join(vectors, vectors.word==keyword_weight.keyword, 'inner')

keywords_vector 结果如下所示,vector 即对应关键词的 100 维词向量

接下来,将文章每个关键词的词向量加入权重信息,这里使每个关键词的词向量 = 关键词的权重 x 关键词的词向量,即 weight_vector = weight x vector,注意这里的 vector 为 vector 类型,所以 weight x vector 是权重和向量的每个元素相乘,向量的长度保持不变

1
2
3
4
def compute_vector(row):
return row.article_id, row.channel_id, row.keyword, row.weight * row.vector

article_keyword_vectors = keywords_vector.rdd.map(compute_vector).toDF(["article_id", "channel_id", "keyword", "weightingVector"])

article_keyword_vectors 结果如下所示,weightingVector 即为加入权重信息后的关键词的词向量

再将上面的结果按照 article_id 进行分组,利用 collect_set() 方法,将一篇文章内所有关键词的词向量合并为一个列表

1
2
article_keyword_vectors.registerTempTable('temptable')
article_keyword_vectors = spark.sql("select article_id, min(channel_id) channel_id, collect_set(weightingVector) vectors from temptable group by article_id")

article_keyword_vectors 结果如下所示,vectors 即为文章内所有关键词向量的列表,如 [[0.6 0.2 …], [0.1 -0.07 …], …]

接下来,利用上面得出的二维列表,计算每篇文章内所有关键词的词向量的平均值,作为文章的词向量。注意,这里的 vectors 是包含多个词向量的列表,词向量列表的平均值等于其中每个词向量的对应元素相加再除以词向量的个数

1
2
3
4
5
6
7
8
def compute_avg_vectors(row):
x = 0
for i in row.vectors:
x += i
# 求平均值
return row.article_id, row.channel_id, x / len(row.vectors)

article_vector = article_keyword_vectors.rdd.map(compute_avg_vectors).toDF(['article_id', 'channel_id', 'vector'])

article_vector 结果如下所示

此时,article_vector 中的 vector 列还是 vector 类型,而 Hive 不支持该数据类型,所以需要将 vector 类型转成 array 类型(list)

1
2
3
4
def to_list(row):
return row.article_id, row.channel_id, [float(i) for i in row.vector.toArray()]

article_vector = article_vector.rdd.map(to_list).toDF(['article_id', 'channel_id', 'vector'])

在 Hive 中创建文章词向量表 article_vector

1
2
3
4
5
6
CREATE TABLE article_vector
(
article_id INT comment "article_id",
channel_id INT comment "channel_id",
articlevector ARRAY comment "keyword"
);

最后,将 18 号频道内的所有文章的词向量存储到 Hive 的文章词向量表 article_vector 中

1
article_vector.write.insertInto("article_vector")

这样,我们就计算出了 18 号频道下每篇文章的词向量,在实际场景中,我们还要分别计算出其他所有频道下每篇文章的词向量。

计算文章相似度

前面我们计算出了文章的词向量,接下来就可以根据文章的词向量来计算文章的相似度了。通常我们会有几百万、几千万甚至上亿规模的文章数据,为了优化计算性能,我们可以只计算每个频道内文章之间的相似度,因为通常只有相同频道的文章关联性较高,而不同频道之间的文章通常关联性较低。在每个频道内,我们还可以用聚类或局部敏感哈希对文章进行分桶,将文章相似度的计算限制在更小的范围,只计算相同分类内或相同桶内的文章相似度。

  • 聚类(Clustering),对每个频道内的文章进行聚类,可以使用 KMeans 算法,需要提前设定好类别个数 K,聚类算法的时间复杂度并不小,也可以使用一些优化的聚类算法,比如二分聚类、层次聚类等。但通常聚类算法也比较耗时,所以通常被使用更多的是局部敏感哈希。

Spark 的 BisectingKMeans 模型训练代码示例

1
2
3
4
5
from pyspark.ml.clustering import BisectingKMeans

bkmeans = BisectingKMeans(k=100, minDivisibleClusterSize=50, featuresCol="articlevector", predictionCol='group')
bkmeans_model = bkmeans.fit(article_vector)
bkmeans_model.save("hdfs://hadoop-master:9000/headlines/models/articleBisKmeans/channel_%d_%s.bkmeans" % (channel_id, channel))

  • 局部敏感哈希 LSH(Locality Sensitive Hashing),LSH 算法是基于一个假设,如果两个文本在原有的数据空间是相似的,那么经过哈希函数转换以后,它们仍然具有很高的相似度,即越相似的文本在哈希之后,落到相同的桶内的概率就越高。所以,我们只需要将目标文章进行哈希映射并得到其桶号,然后取出该桶内的所有文章,再进行线性匹配即可查找到与目标文章相邻的文章。其实 LSH 并不能保证一定能够查找到与目标文章最相邻的文章,而是在减少需要匹配的文章个数的同时,保证查找到最近邻的文章的概率很大。

下面我们将使用 LSH 模型来计算文章相似度,首先,读取 18 号频道内所有文章的 ID 和词向量作为训练集

1
2
article_vector = spark.sql("select article_id, articlevector from article_vector where channel_id=18")
train = articlevector.select(['article_id', 'articlevector'])

文章词向量表中的词向量是被存储为 array 类型的,我们利用 Spark 的 Vectors.dense() 方法,将 array 类型(list)转为 vector 类型

1
2
3
4
5
6
from pyspark.ml.linalg import Vectors

def list_to_vector(row):
return row.article_id, Vectors.dense(row.articlevector)

train = train.rdd.map(list_to_vector).toDF(['article_id', 'articlevector'])

使用训练集 train 对 Spark 的 BucketedRandomProjectionLSH 模型进行训练,其中 inputCol 为输入特征列,outputCol 为输出特征列,numHashTables 为哈希表数量,bucketLength 为桶的数量,数量越多,相同数据进入到同一个桶的概率就越高

1
2
3
4
from pyspark.ml.feature import BucketedRandomProjectionLSH

brp = BucketedRandomProjectionLSH(inputCol='articlevector', outputCol='hashes', numHashTables=4.0, bucketLength=10.0)
model = brp.fit(train)

训练好模型后,调用 approxSimilarityJoin() 方法即可计算数据之间的相似度,如 model.approxSimilarityJoin(df1, df2, 2.0, distCol='EuclideanDistance') 就是利用欧几里得距离作为相似度,计算在 df1 与 df2 每条数据的相似度,这里我们计算训练集中所有文章之间的相似度

1
similar = model.approxSimilarityJoin(train, train, 2.0, distCol='EuclideanDistance')

similar 结果如下所示,EuclideanDistance 就是两篇文章的欧几里得距离,即相似度

在后面的推荐流程中,会经常查询文章相似度,所以出于性能考虑,我们选择将文章相似度结果存储到 Hbase 中。首先创建文章相似度表

1
create 'article_similar', 'similar'

然后存储文章相似度结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def save_hbase(partition):
import happybase
pool = happybase.ConnectionPool(size=3, host='hadoop-master')

with pool.connection() as conn:
# 建立表连接
table = conn.table('article_similar')
for row in partition:
if row.datasetA.article_id != row.datasetB.article_id:
table.put(str(row.datasetA.article_id).encode(), {"similar:{}".format(row.datasetB.article_id).encode(): b'%0.4f' % (row.EuclideanDistance)})

# 手动关闭所有的连接
conn.close()

similar.foreachPartition(save_hbase)

Apscheduler 定时更新

将文章相似度计算加入到文章画像更新方法中,首先合并最近一个小时的文章完整信息,接着计算 TF-IDF 和 TextRank 权重,并根据 TF-IDF 和 TextRank 权重计算得出关键词和主题词,最后计算文章的词向量及文章的相似度

1
2
3
4
5
6
7
8
9
10
11
def update_article_profile():
"""
定时更新文章画像及文章相似度
:return:
"""
ua = UpdateArticle()
sentence_df = ua.merge_article_data()
if sentence_df.rdd.collect():
textrank_keywords_df, keywordsIndex = ua.generate_article_label()
article_profile = ua.get_article_profile(textrank_keywords_df, keywordsIndex)
ua.compute_article_similar(article_profile)

原文出自(已授权):https://www.jianshu.com/u/ac833cc5146e

参考


【技术服务】,详情点击查看: https://mp.weixin.qq.com/s/PtX9ukKRBmazAWARprGIAg


扫一扫 关注微信公众号!号主 专注于搜索和推荐系统,尝试使用算法去更好的服务于用户,包括但不局限于机器学习,深度学习,强化学习,自然语言理解,知识图谱,还不定时分享技术,资料,思考等文章!