在推荐系统架构中,推荐系统的数据库和业务系统的数据库是分离的,这样才能有效避免推荐系统的数据读写、计算等对业务系统的影响,所以同步业务数据往往也是推荐系统要做的第一件事情。通常业务数据是存储在关系型数据库中,比如 MySQL,而推荐系统通常需要使用大数据平台,比如 Hadoop,我们可以利用 Sqoop 将 MySQL 中的业务数据同步到 Hive 中,在我们的文章推荐系统中,需要同步的数据包括:

  • 用户信息表(user_profile):包括 user_id | gender | birthday | real_name | create_time | update_time | register_media_time | id_number | id_card_front | id_card_back | id_card_handheld | area | company | career 等字段。
  • 用户基础信息表(user_basic):包括 user_id | mobile | password | user_name | profile_photo | last_login | is_media | article_count | following_count | fans_count | like_count | read_count | introduction | certificate | is_verified | account | email | status 等字段。
  • 文章信息表(news_article_basic):包括 article_id | user_id | channel_id | title | cover | is_advertising | create_time | status | reviewer_id | review_time | delete_time | comment_count | allow_comment | update_time | reject_reason 等字段。
  • 文章内容表(news_article_content):包括 article_id | content | update_time 等字段。
  • 频道信息表(news_channel):包括 channel_id | channel_name | create_time | update_time | sequence | is_visible | is_default 等字段。

首先,在 Hive 中创建业务数据库 toutiao,并创建相关表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
create database if not exists toutiao comment "user,news information of mysql" location '/user/hive/warehouse/toutiao.db/';
-- 用户信息表
create table user_profile
(
user_id BIGINT comment "userid",
gender BOOLEAN comment "gender",
birthday STRING comment "birthday",
real_name STRING comment "real_name",
create_time STRING comment "create_time",
update_time STRING comment "update_time",
register_media_time STRING comment "register_media_time",
id_number STRING comment "id_number",
id_card_front STRING comment "id_card_front",
id_card_back STRING comment "id_card_back",
id_card_handheld STRING comment "id_card_handheld",
area STRING comment "area",
company STRING comment "company",
career STRING comment "career"
)
COMMENT "toutiao user profile"
row format delimited fields terminated by ',' # 指定分隔符
LOCATION '/user/hive/warehouse/toutiao.db/user_profile';
-- 用户基础信息表
create table user_basic
(
user_id BIGINT comment "user_id",
mobile STRING comment "mobile",
password STRING comment "password",
profile_photo STRING comment "profile_photo",
last_login STRING comment "last_login",
is_media BOOLEAN comment "is_media",
article_count BIGINT comment "article_count",
following_count BIGINT comment "following_count",
fans_count BIGINT comment "fans_count",
like_count BIGINT comment "like_count",
read_count BIGINT comment "read_count",
introduction STRING comment "introduction",
certificate STRING comment "certificate",
is_verified BOOLEAN comment "is_verified"
)
COMMENT "toutiao user basic"
row format delimited fields terminated by ',' # 指定分隔符
LOCATION '/user/hive/warehouse/toutiao.db/user_basic';
-- 文章基础信息表
create table news_article_basic
(
article_id BIGINT comment "article_id",
user_id BIGINT comment "user_id",
channel_id BIGINT comment "channel_id",
title STRING comment "title",
status BIGINT comment "status",
update_time STRING comment "update_time"
)
COMMENT "toutiao news_article_basic"
row format delimited fields terminated by ',' # 指定分隔符
LOCATION '/user/hive/warehouse/toutiao.db/news_article_basic';
-- 文章频道表
create table news_channel
(
channel_id BIGINT comment "channel_id",
channel_name STRING comment "channel_name",
create_time STRING comment "create_time",
update_time STRING comment "update_time",
sequence BIGINT comment "sequence",
is_visible BOOLEAN comment "is_visible",
is_default BOOLEAN comment "is_default"
)
COMMENT "toutiao news_channel"
row format delimited fields terminated by ',' # 指定分隔符
LOCATION '/user/hive/warehouse/toutiao.db/news_channel';
-- 文章内容表
create table news_article_content
(
article_id BIGINT comment "article_id",
content STRING comment "content",
update_time STRING comment "update_time"
)
COMMENT "toutiao news_article_content"
row format delimited fields terminated by ',' # 指定分隔符
LOCATION '/user/hive/warehouse/toutiao.db/news_article_content';

查看 Sqoop 连接到 MySQL 的数据库列表

1
2
3
4
5
sqoop list-databases --connect jdbc:mysql://192.168.19.137:3306/ --username root -P

mysql
sys
toutiao

Sqoop 可以指定全量导入和增量导入,通常我们可以先执行一次全量导入,将历史数据全部导入进来,后面再通过定时任务执行增量导入,来保持 MySQL 和 Hive 的数据同步,全量导入不需要提前创建 Hive 表,可以自动创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
array=(user_profile user_basic news_article_basic news_channel news_article_content)

for table_name in ${array[@]};
do
sqoop import \
--connect jdbc:mysql://192.168.19.137/toutiao \
--username root \
--password password \
--table $table_name \
--m 5 \ # 线程数
--hive-home /root/bigdata/hive \ # hive路径
--hive-import \ # 导入形式
--create-hive-table \ # 自动创建表
--hive-drop-import-delims \
--warehouse-dir /user/hive/warehouse/toutiao.db \ # 导入地址
--hive-table toutiao.$table_name
done

增量导入,有 append 和 lastmodified 两种模式

  • append:通过指定一个递增的列进行更新,只能追加,不能修改

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    num=0
    declare -A check
    check=([user_profile]=user_id [user_basic]=user_id [news_article_basic]=article_id [news_channel]=channel_id [news_article_content]=channel_id)

    for k in ${!check[@]}
    do
    sqoop import \
    --connect jdbc:mysql://192.168.19.137/toutiao \
    --username root \
    --password password \
    --table $k \
    --m 4 \
    --hive-home /root/bigdata/hive \ # hive路径
    --hive-import \ # 导入到hive
    --create-hive-table \ # 自动创建表
    --incremental append \ # 按照id更新
    --check-column ${check[$k]} \ # 指定id列
    --last-value ${num} # 指定最后更新的id
    done
  • lastmodified:按照最后修改时间更新,支持追加和修改

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    time=`date +"%Y-%m-%d" -d "-1day"`
    declare -A check
    check=([user_profile]=update_time [user_basic]=last_login [news_article_basic]=update_time [news_channel]=update_time)
    declare -A merge
    merge=([user_profile]=user_id [user_basic]=user_id [news_article_basic]=article_id [news_channel]=channel_id)

    for k in ${!check[@]}
    do
    sqoop import \
    --connect jdbc:mysql://192.168.19.137/toutiao \
    --username root \
    --password password \
    --table $k \
    --m 4 \
    --target-dir /user/hive/warehouse/toutiao.db/$k \ # hive路径
    --incremental lastmodified \ # 按照最后修改时间更新
    --check-column ${check[$k]} \ # 指定时间列
    --merge-key ${merge[$k]} \ # 根据指定列合并重复或修改数据
    --last-value ${time} # 指定最后修改时间
    done

Sqoop 可以直接导入到 Hive,自动创建 Hive 表,但是 lastmodified 模式不支持
Sqoop 也可以先导入到 HDFS,然后再建立 Hive 表关联,为了使用 lastmodified 模式,通常使用这种方法

1
--target-dir /user/hive/warehouse/toutiao.db/user_profile # 指定导入的HDFS目录

若先导入到 HDFS,需要注意 HDFS 默认分隔符为 , 而 Hive 默认分隔符为 /u0001,所以需要在创建 Hive 表时指定分隔符为 HDFS 分隔符 ,,若不指定分隔符,查询结果将全部为 NULL

1
row format delimited fields terminated by ',' # 指定分隔符

如果 MySQL 数据中存在特殊字符,如 , \t \n 都会导致 Hive 读取失败(但不影响导入到 HDFS 中),可以利用 --query 参数,在读取时使用 REPLACE() CHAR() CHR() 进行字符替换。如果特殊字符过多,比如 news_article_content 表,可以选择全量导入

1
--query 'select article_id, user_id, channel_id, REPLACE(REPLACE(REPLACE(title, CHAR(13),""),CHAR(10),""), ",", " ") title, status, update_time from news_article_basic WHERE $CONDITIONS' \

如果 MySQL 数据中存在 tinyint 类型,必须在 --connet 中加入 ?tinyInt1isBit=false,防止 Hive 将 tinyint 类型默认转化为 boolean 类型

1
--connect jdbc:mysql://192.168.19.137/toutiao?tinyInt1isBit=false

MySQL 与 Hive 对应类型如下
MySQL|Hive
-|-
bigint|bigint
tinyint|boolean
int|int
double|double
bit|boolean
varchar|string
decimal|double
date / timestamp|string
我们可以利用 crontab 来创建定时任务,将更新命令写入脚本 import_incremental.sh,输入 crontab -e 打开编辑界面,输入如下内容,即可定时执行数据同步任务

1
2
# 每隔半小时增量导入一次
*/30 * * * * /root/toutiao_project/scripts/import_incremental.sh

crontab 命令格式为 * * * * * shell,其中前五个 * 分别代表分钟 (0-59)、小时(0-59)、月内的某天(1-31)、月(1-12)、周内的某天(0-7,周日为 0 或 7),shell 表示要执行的命令或脚本,使用方法如下

1
2
3
4
5
6
# 每隔5分钟运行一次backupscript脚本
*/5 * * * * /root/backupscript.sh
# 每天的凌晨1点运行backupscript脚本
0 1 * * * /root/backupscript.sh
# 每月的第一个凌晨3:15运行backupscript脚本
15 3 1 * * /root/backupscript.sh

crontab 常用命令如下

1
2
3
4
5
6
7
8
9
10
crontab -e      # 修改 crontab 文件,如果文件不存在会自动创建。 
crontab -l # 显示 crontab 文件。
crontab -r # 删除 crontab 文件。
crontab -ir # 删除 crontab 文件前提醒用户。

service crond status # 查看crontab服务状态
service crond start # 启动服务
service crond stop # 关闭服务
service crond restart # 重启服务
service crond reload # 重新载入配置

原文出自(已授权):https://www.jianshu.com/u/ac833cc5146e

参考


【技术服务】,详情点击查看: https://mp.weixin.qq.com/s/PtX9ukKRBmazAWARprGIAg


扫一扫 关注微信公众号!号主 专注于搜索和推荐系统,尝试使用算法去更好的服务于用户,包括但不局限于机器学习,深度学习,强化学习,自然语言理解,知识图谱,还不定时分享技术,资料,思考等文章!