在上述步骤中,我们已经将业务数据和用户行为数据同步到了推荐系统数据库当中,接下来,我们就要对文章数据和用户数据进行分析,构建文章画像和用户画像,本文我们主要讲解如何构建文章画像。文章画像由关键词和主题词组成,我们将每个词的 TF-IDF 权重和 TextRank 权重的乘积作为关键词权重,筛选出权重最高的 K 个词作为关键词,将 TextRank 权重最高的 K 个词与 TF-IDF 权重最高的 K 个词的共现词作为主题词。

首先,在 Hive 中创建文章数据库 article 及相关表,其中表 article_data 用于存储完整的文章信息,表 idf_keywords_values 用于存储关键词和索引信息,表 tfidf_keywords_values 用于存储关键词和 TF-IDF 权重信息,表 textrank_keywords_values 用于存储关键词和 TextRank 权重信息,表 article_profile 用于存储文章画像信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
-- 创建文章数据库
create database if not exists article comment "artcile information" location '/user/hive/warehouse/article.db/';
-- 创建文章信息表
CREATE TABLE article_data
(
article_id BIGINT comment "article_id",
channel_id INT comment "channel_id",
channel_name STRING comment "channel_name",
title STRING comment "title",
content STRING comment "content",
sentence STRING comment "sentence"
)
COMMENT "toutiao news_channel"
LOCATION '/user/hive/warehouse/article.db/article_data';
-- 创建关键词索引信息表
CREATE TABLE idf_keywords_values
(
keyword STRING comment "article_id",
idf DOUBLE comment "idf",
index INT comment "index"
);
-- 创建关键词TF-IDF权重信息表
CREATE TABLE tfidf_keywords_values
(
article_id INT comment "article_id",
channel_id INT comment "channel_id",
keyword STRING comment "keyword",
tfidf DOUBLE comment "tfidf"
);
-- 创建关键词TextRank权重信息表
CREATE TABLE textrank_keywords_values
(
article_id INT comment "article_id",
channel_id INT comment "channel_id",
keyword STRING comment "keyword",
textrank DOUBLE comment "textrank"
);
-- 创建文章画像信息表
CREATE TABLE article_profile
(
article_id INT comment "article_id",
channel_id INT comment "channel_id",
keyword map comment "keyword",
topics array comment "topics"
);

计算文章完整信息

为了计算文章画像,需要将文章信息表(news_article_basic)、文章内容表(news_article_content)及频道表(news_channel)进行合并,从而得到完整的文章信息,通常使用 Spark SQL 进行处理。

通过关联表 news_article_basic, news_article_content 和 news_channel 获得文章完整信息,包括 article_id, channel_id, channel_name, title, content,这里获取一个小时内的文章信息。

1
2
3
4
5
6
7
8
9
10
11
spark.sql("use toutiao")
_now = datetime.today().replace(minute=0, second=0, microsecond=0)
start = datetime.strftime(_now + timedelta(days=0, hours=-1, minutes=0), "%Y-%m-%d %H:%M:%S")
end = datetime.strftime(_now, "%Y-%m-%d %H:%M:%S")
basic_content = spark.sql(
"select a.article_id, a.channel_id, a.title, b.content from news_article_basic a "
"inner join news_article_content b on a.article_id=b.article_id where a.review_time >= '{}' "
"and a.review_time < '{}' and a.status = 2".format(start, end))
basic_content.registerTempTable("temp_article")
channel_basic_content = spark.sql(
"select t.*, n.channel_name from temp_article t left join news_channel n on t.channel_id=n.channel_id")

channel_basic_content 结果如下所示

利用 concat_ws() 方法,将 channel_name, title, content 这 3 列数据合并为一列 sentence,并将结果写入文章完整信息表 article_data 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import pyspark.sql.functions as F

spark.sql("use article")
sentence_df = channel_basic_content.select("article_id", "channel_id", "channel_name", "title", "content", \
F.concat_ws(
",",
channel_basic_content.channel_name,
channel_basic_content.title,
channel_basic_content.content
).alias("sentence")
)
del basic_content
del channel_basic_content
gc.collect() # 垃圾回收

sentence_df.write.insertInto("article_data")

sentence_df 结果如下所示,文章完整信息包括 article_id, channel_id, channel_name, title, content, sentence,其中 sentence 为 channel_name, title, content 合并而成的长文本内容

计算 TF-IDF

前面我们得到了文章的完整内容信息,接下来,我们要先对文章进行分词,然后计算每个词的 TF-IDF 权重,将 TF-IDF 权重最高的 K 个词作为文章的关键词。首先,读取文章信息

1
2
spark.sql("use article")
article_dataframe = spark.sql("select * from article_data")

利用 mapPartitions() 方法,对每篇文章进行分词,这里使用的是 jieba 分词器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
words_df = article_dataframe.rdd.mapPartitions(segmentation).toDF(["article_id", "channel_id", "words"])

def segmentation(partition):
import os
import re
import jieba
import jieba.analyse
import jieba.posseg as pseg
import codecs

abspath = "/root/words"

# 结巴加载用户词典
userdict_path = os.path.join(abspath, "ITKeywords.txt")
jieba.load_userdict(userdict_path)

# 停用词文本
stopwords_path = os.path.join(abspath, "stopwords.txt")

def get_stopwords_list():
"""返回stopwords列表"""
stopwords_list = [i.strip() for i in codecs.open(stopwords_path).readlines()]
return stopwords_list

# 所有的停用词列表
stopwords_list = get_stopwords_list()

# 分词
def cut_sentence(sentence):
"""对切割之后的词语进行过滤,去除停用词,保留名词,英文和自定义词库中的词,长度大于2的词"""
seg_list = pseg.lcut(sentence)
seg_list = [i for i in seg_list if i.flag not in stopwords_list]
filtered_words_list = []
for seg in seg_list:
if len(seg.word) <= 1:
continue
elif seg.flag == "eng":
if len(seg.word) <= 2:
continue
else:
filtered_words_list.append(seg.word)
elif seg.flag.startswith("n"):
filtered_words_list.append(seg.word)
elif seg.flag in ["x", "eng"]: # 是自定一个词语或者是英文单词
filtered_words_list.append(seg.word)
return filtered_words_list

for row in partition:
sentence = re.sub("<.*?>", "", row.sentence) # 替换掉标签数据
words = cut_sentence(sentence)
yield row.article_id, row.channel_id, words

words_df 结果如下所示,words 为将 sentence 分词后的单词列表

使用分词结果对词频统计模型(CountVectorizer)进行词频统计训练,并将 CountVectorizer 模型保存到 HDFS 中

1
2
3
4
5
6
from pyspark.ml.feature import CountVectorizer
# vocabSize是总词汇的大小,minDF是文本中出现的最少次数
cv = CountVectorizer(inputCol="words", outputCol="countFeatures", vocabSize=200*10000, minDF=1.0)
# 训练词频统计模型
cv_model = cv.fit(words_df)
cv_model.write().overwrite().save("hdfs://hadoop-master:9000/headlines/models/CV.model")

加载 CountVectorizer 模型,计算词频向量

1
2
3
4
from pyspark.ml.feature import CountVectorizerModel
cv_model = CountVectorizerModel.load("hdfs://hadoop-master:9000/headlines/models/CV.model")
# 得出词频向量结果
cv_result = cv_model.transform(words_df)

cv_result 结果如下所示,countFeatures 为词频向量,如 (986, [2, 4, …], [3.0, 5.0, …]) 表示总词汇的大小为 986 个,索引为 2 和 4 的词在某篇文章中分别出现 3 次和 5 次,

得到词频向量后,再利用逆文本频率模型( IDF ),根据词频向量进行 IDF 统计训练,并将 IDF 模型保存到 HDFS

1
2
3
4
from pyspark.ml.feature import IDF
idf = IDF(inputCol="countFeatures", outputCol="idfFeatures")
idf_model = idf.fit(cv_result)
idf_model.write().overwrite().save("hdfs://hadoop-master:9000/headlines/models/IDF.model")

我们已经分别计算了文章信息中每个词的 TF 和 IDF,这时就可以加载 CountVectorizer 模型和 IDF 模型,计算每个词的 TF-IDF

1
2
3
4
5
6
7
from pyspark.ml.feature import CountVectorizerModel
cv_model = CountVectorizerModel.load("hdfs://hadoop-master:9000/headlines/models/countVectorizerOfArticleWords.model")
from pyspark.ml.feature import IDFModel
idf_model = IDFModel.load("hdfs://hadoop-master:9000/headlines/models/IDFOfArticleWords.model")

cv_result = cv_model.transform(words_df)
tfidf_result = idf_model.transform(cv_result)

tfidf_result 结果如下所示,idfFeatures 为 TF-IDF 权重向量,如 (986, [2, 4, …], [0.3, 0.5, …]) 表示总词汇的大小为 986 个,索引为 2 和 4 的词在某篇文章中的 TF-IDF 值分别为 0.3 和 0.5

对文章的每个词都根据 TF-IDF 权重排序,保留 TF-IDF 权重最高的前 K 个词作为关键词

1
2
3
4
5
6
7
8
9
10
11
def sort_by_tfidf(partition):
TOPK = 20
for row in partition:
# 找到索引与IDF值并进行排序
_dict = list(zip(row.idfFeatures.indices, row.idfFeatures.values))
_dict = sorted(_dict, key=lambda x: x[1], reverse=True)
result = _dict[:TOPK]
for word_index, tfidf in result:
yield row.article_id, row.channel_id, int(word_index), round(float(tfidf), 4)

keywords_by_tfidf = tfidf_result.rdd.mapPartitions(sort_by_tfidf).toDF(["article_id", "channel_id", "index", "weights"])

keywords_by_tfidf 结果如下所示,每篇文章保留了权重最高的 K 个单词,index 为单词索引,weights 为对应单词的 TF-IDF 权重

接下来,我们需要知道每个词的对应的 TF-IDF 值,可以利用 zip() 方法,将所有文章中的每个词及其 TF-IDF 权重组成字典,再加入索引列,由此得到每个词对应的 TF-IDF 值,将该结果保存到 idf_keywords_values 表

1
2
3
4
5
6
7
8
9
10
11
12
keywords_list_with_idf = list(zip(cv_model.vocabulary, idf_model.idf.toArray()))
def append_index(data):
for index in range(len(data)):
data[index] = list(data[index]) # 将元组转为list
data[index].append(index) # 加入索引
data[index][1] = float(data[index][1])
append_index(keywords_list_with_idf)
sc = spark.sparkContext
rdd = sc.parallelize(keywords_list_with_idf) # 创建rdd
idf_keywords = rdd.toDF(["keywords", "idf", "index"])

idf_keywords.write.insertInto('idf_keywords_values')

idf_keywords 结果如下所示,包含了所有单词的名称、TF-IDF 权重及索引

通过 index 列,将 keywords_by_tfidf 与表 idf_keywords_values 进行连接,选取文章 ID、频道 ID、关键词、TF-IDF 权重作为结果,并保存到 TF-IDF 关键词表 tfidf_keywords_values

1
2
3
keywords_index = spark.sql("select keyword, index idx from idf_keywords_values")
keywords_result = keywords_by_tfidf.join(keywords_index, keywords_index.idx == keywords_by_tfidf.index).select(["article_id", "channel_id", "keyword", "weights"])
keywords_result.write.insertInto("tfidf_keywords_values")

keywords_result 结果如下所示,keyword 和 weights 即为所有词在每个文章中的 TF-IDF 权重

计算 TextRank

前面我们已经计算好了每个词的 TF-IDF 权重,为了计算关键词,还需要得到每个词的 TextRank 权重,接下来,还是先读取文章完整信息

1
2
spark.sql("use article")
article_dataframe = spark.sql("select * from article_data")

对文章 sentence 列的内容进行分词,计算每个词的 TextRank 权重,并将每篇文章 TextRank 权重最高的 K 个词保存到 TextRank 结果表 textrank_keywords_values

1
2
3
textrank_keywords_df = article_dataframe.rdd.mapPartitions(textrank).toDF(["article_id", "channel_id", "keyword", "textrank"])

textrank_keywords_df.write.insertInto("textrank_keywords_values")

TextRank 计算细节:分词后只保留指定词性的词,滑动截取长度为 K 的窗口,计算窗口内的各个词的投票数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def textrank(partition):
import os
import jieba
import jieba.analyse
import jieba.posseg as pseg
import codecs

abspath = "/root/words"

# 结巴加载用户词典
userDict_path = os.path.join(abspath, "ITKeywords.txt")
jieba.load_userdict(userDict_path)

# 停用词文本
stopwords_path = os.path.join(abspath, "stopwords.txt")

def get_stopwords_list():
"""返回stopwords列表"""
stopwords_list = [i.strip() for i in codecs.open(stopwords_path).readlines()]
return stopwords_list

# 所有的停用词列表
stopwords_list = get_stopwords_list()

class TextRank(jieba.analyse.TextRank):
def __init__(self, window=20, word_min_len=2):
super(TextRank, self).__init__()
self.span = window # 窗口大小
self.word_min_len = word_min_len # 单词的最小长度
# 要保留的词性,根据jieba github ,具体参见https://github.com/baidu/lac
self.pos_filt = frozenset(
('n', 'x', 'eng', 'f', 's', 't', 'nr', 'ns', 'nt', "nw", "nz", "PER", "LOC", "ORG"))

def pairfilter(self, wp):
"""过滤条件,返回True或者False"""

if wp.flag == "eng":
if len(wp.word) <= 2:
return False

if wp.flag in self.pos_filt and len(wp.word.strip()) >= self.word_min_len \
and wp.word.lower() not in stopwords_list:
return True
# TextRank过滤窗口大小为5,单词最小为2
textrank_model = TextRank(window=5, word_min_len=2)
allowPOS = ('n', "x", 'eng', 'nr', 'ns', 'nt', "nw", "nz", "c")

for row in partition:
tags = textrank_model.textrank(row.sentence, topK=20, withWeight=True, allowPOS=allowPOS, withFlag=False)
for tag in tags:
yield row.article_id, row.channel_id, tag[0], tag[1]

textrank_keywords_df 结果如下所示,keyword 和 textrank 即为每个单词在文章中的 TextRank 权重

画像计算

我们计算出 TF-IDF 和 TextRank 后,就可以计算关键词和主题词了,读取 TF-IDF 权重

1
idf_keywords_values = oa.spark.sql("select * from idf_keywords_values")

读取 TextRank 权重

1
textrank_keywords_values = oa.spark.sql("select * from textrank_keywords_values")

通过 keyword 关联 TF-IDF 权重和 TextRank 权重

1
keywords_res = textrank_keywords_values.join(idf_keywords_values, on=['keyword'], how='left')

计算 TF-IDF 权重和 TextRank 权重的乘积作为关键词权重

1
keywords_weights = keywords_res.withColumn('weights', keywords_res.textrank * keywords_res.idf).select(["article_id", "channel_id", "keyword", "weights"])

keywords_weights 结果如下所示

这里,我们需要将相同文章的词都合并到一条记录中,将 keywords_weights 按照 article_id 分组,并利用 collect_list() 方法,分别将关键词和权重合并为列表

1
2
3
keywords_weights.registerTempTable('temp')

keywords_weights = spark.sql("select article_id, min(channel_id) channel_id, collect_list(keyword) keywords, collect_list(weights) weights from temp group by article_id")`

keywords_weights 结果如下所示,keywords 为每篇文章的关键词列表,weights 为关键词对应的权重列表

为了方便查询,我们需要将关键词和权重合并为一列,并存储为 map 类型,这里利用 dict()zip() 方法,将每个关键词及其权重组合成字典

1
2
3
4
def to_map(row):
return row.article_id, row.channel_id, dict(zip(row.keywords, row.weights))

article_keywords = keywords_weights.rdd.map(to_map).toDF(['article_id', 'channel_id', 'keywords'])

article_keywords 结果如下所示,keywords 即为每篇文章的关键词和对应权重

前面我们计算完了关键词,接下来我们将 TF-IDF 和 TextRank 的共现词作为主题词,将 TF-IDF 权重表 tfidf_keywords_values 和 TextRank 权重表 textrank_keywords_values 进行关联,并利用 collect_set() 对结果进行去重,即可得到 TF-IDF 和 TextRank 的共现词,即主题词

1
2
3
4
5
6
7
8
topic_sql = """
select t.article_id article_id2, collect_set(t.keyword) topics from tfidf_keywords_values t
inner join
textrank_keywords_values r
where t.keyword=r.keyword
group by article_id2
"""
article_topics = spark.sql(topic_sql)

article_topics 结果如下所示,topics 即为每篇文章的主题词列表

最后,将主题词结果和关键词结果合并,即为文章画像,保存到表 article_profile

1
2
3
article_profile = article_keywords.join(article_topics, article_keywords.article_id==article_topics.article_id2).select(["article_id", "channel_id", "keywords", "topics"])

article_profile.write.insertInto("article_profile")

文章画像数据查询测试

1
2
3
4
hive> select * from article_profile limit 1;
OK
26 17 {"策略":0.3973770571351729,"jpg":0.9806348975390871,"用户":1.2794959063944176,"strong":1.6488457985625076,"文件":0.28144603583387057,"逻辑":0.45256526469610714,"形式":0.4123994242601279,"全自":0.9594604850547191,"h2":0.6244481634710125,"版本":0.44280276959510817,"Adobe":0.8553618185108718,"安装":0.8305037437573172,"检查更新":1.8088946300014435,"产品":0.774842382276899,"下载页":1.4256311032544344,"过程":0.19827163395829256,"json":0.6423301791599972,"方式":0.582762869780791,"退出应用":1.2338671268242603,"Setup":1.004399549339134} ["Electron","全自动","产品","版本号","安装包","检查更新","方案","版本","退出应用","逻辑","安装过程","方式","定性","新版本","Setup","静默","用户"]
Time taken: 0.322 seconds, Fetched: 1 row(s)

Apscheduler 定时更新

定义离线更新文章画像的方法,首先合并最近一个小时的文章信息,接着计算每个词的 TF-IDF 和 TextRank 权重,并根据 TF-IDF 和 TextRank 权重计算得出文章关键词和主题词,最后将文章画像信息保存到 Hive

1
2
3
4
5
6
7
8
9
10
11
def update_article_profile():
"""
定时更新文章画像
:return:
"""
ua = UpdateArticle()
# 合并文章信息
sentence_df = ua.merge_article_data()
if sentence_df.rdd.collect():
textrank_keywords_df, keywordsIndex = ua.generate_article_label()
ua.get_article_profile(textrank_keywords_df, keywordsIndex)

利用 Apscheduler 添加定时更新文章画像任务,设定每隔 1 个小时更新一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor

# 创建scheduler,多进程执行
executors = {
'default': ProcessPoolExecutor(3)
}

scheduler = BlockingScheduler(executors=executors)

# 添加一个定时更新文章画像的任务,每隔1个小时运行一次
scheduler.add_job(update_article_profile, trigger='interval', hours=1)

scheduler.start()

利用 Supervisor 进行进程管理,配置文件如下

1
2
3
4
5
6
7
8
9
10
11
12
[program:offline]
environment=JAVA_HOME=/root/bigdata/jdk,SPARK_HOME=/root/bigdata/spark,HADOOP_HOME=/root/bigdata/hadoop,PYSPARK_PYTHON=/miniconda2/envs/reco_sys/bin/python,PYSPARK_DRIVER_PYTHON=/miniconda2/envs/reco_sys/bin/python
command=/miniconda2/envs/reco_sys/bin/python /root/toutiao_project/scheduler/main.py
directory=/root/toutiao_project/scheduler
user=root
autorestart=true
redirect_stderr=true
stdout_logfile=/root/logs/offlinesuper.log
loglevel=info
stopsignal=KILL
stopasgroup=true
killasgroup=true

原文出自(已授权):https://www.jianshu.com/u/ac833cc5146e

参考


【技术服务】,详情点击查看: https://mp.weixin.qq.com/s/PtX9ukKRBmazAWARprGIAg


扫一扫 关注微信公众号!号主 专注于搜索和推荐系统,尝试使用算法去更好的服务于用户,包括但不局限于机器学习,深度学习,强化学习,自然语言理解,知识图谱,还不定时分享技术,资料,思考等文章!