tencent cloud

弹性 MapReduce

动态与公告
产品动态
产品公告
安全公告
产品简介
产品概述
产品优势
产品架构
产品功能
应用场景
约束与限制
技术支持范围
产品发行版
购买指南
EMR on CVM 计费说明
EMR on TKE 计费说明
EMR Serverless HBase 计费说明
快速入门
EMR on CVM 快速入门
EMR on TKE 快速入门
EMR on CVM 操作指南
规划集群
管理权限
配置集群
管理集群
管理服务
监控告警
智能管家
EMR on TKE 操作指南
EMR on TKE 简介
配置集群
管理集群
管理服务
监控运维
应用分析
EMR Serverless HBase 操作指南
EMR Serverless HBase 产品简介
配额与限制
规划实例
管理实例
监控告警
开发指南
EMR 开发指南
Hadoop开发指南
Spark 开发指南
HBASE开发指南
Phoenix on Hbase 开发指南
Hive 开发指南
Presto开发指南
Sqoop 开发指南
Hue 开发指南
Oozie 开发指南
Flume 开发指南
Kerberos 开发指南
Knox 开发指南
Alluxio 开发指南
Kylin 开发指南
Livy 开发指南
Kyuubi 开发指南
Zeppelin 开发指南
Hudi 开发指南
Superset 开发指南
Impala 开发指南
Druid 开发指南
Tensorflow 开发指南
Kudu 开发指南
Ranger 开发指南
Kafka 开发指南
Iceberg 开发指南
StarRocks 开发指南
Flink 开发指南
JupyterLab 开发指南
MLflow 开发指南
实践教程
EMR on CVM 运维实践
数据迁移实践
自定义伸缩实践教程
API 文档
History
Introduction
API Category
Cluster Resource Management APIs
Cluster Services APIs
User Management APIs
Data Inquiry APIs
Scaling APIs
Configuration APIs
Other APIs
Serverless HBase APIs
YARN Resource Scheduling APIs
Making API Requests
Data Types
Error Codes
常见问题
EMR on CVM常见问题
服务等级协议
联系我们

增量 DB 数据到 HDFS

PDF
聚焦模式
字号
最后更新时间: 2025-02-12 16:52:06
Sqoop 是一款开源的工具,主要用于在 Hadoop 和传统数据库(MySQL、PostgreSQL 等)之间进行数据传递,可以将一个关系型数据库(例如 MySQL、Oracle、Postgres 等)中的数据导入到 Hadoop 的 HDFS 中,也可以将 HDFS 的数据导入到关系型数据库中。Sqoop 中一大亮点就是可以通过 Hadoop 的 MapReduce 把数据从关系型数据库中导入数据到 HDFS。
本文介绍了 Sqoop 的增量导入操作,即在数据库中的数据增加或更新后,把数据库的改动同步到导入 HDFS 的数据中。其中分为 append 模式和 lastmodified 模式,append 模式只能用在数据库的数据增加但不更新的场景,lastmodified 模式用在数据增加并且更新的场景。

1. 开发准备

确认您已经开通了腾讯云,并且创建了一个 EMR 集群。在创建 EMR 集群的时候需要在软件配置界面选择 Sqoop 组件。
Sqoop 等相关软件安装在路径 EMR 云服务器的 /usr/local/service/ 路径下。

2. 使用 append 模式

本节将继续使用上一节的用例。 进入 EMR 控制台,复制目标集群的实例 ID,即集群的名字。再进入关系型数据库控制台,使用 Ctrl+F 进行搜索,找到集群对应的 MySQL 数据库,查看该数据库的内网地址 $mysqlIP。
登录 EMR 集群中的任意机器,最好是登录到 Master 节点。登录 EMR 的方式请参考 登录 Linux 实例。这里我们可以选择使用 WebShell 登录。单击对应云服务器右侧的登录,进入登录界面,用户名默认为 root,密码为创建 EMR 时用户自己输入的密码。输入正确后,即可进入命令行界面。
在 EMR 命令行先使用以下指令切换到 Hadoop 用户,并进入 Sqoop 文件夹:
[root@172 ~]# su hadoop
[hadoop@172 ~]# cd /usr/local/service/sqoop
连接 MySQL 数据库:
[hadoop@172 sqoop]$ mysql -h $mysqlIP –p
Enter password:
密码为您创建 EMR 集群的时候设置的密码。 连接了 MySQL 数据库之后,在表 sqoop_test 中新增一条数据,如下:
mysql> use test;
Database changed

mysql> insert into sqoop_test values(null, 'forth', now(), 'hbase');
Query ok, 1 row affected(0.00 sec)
查看表中的数据:
Mysql> select * from sqoop_test;
+----+--------+---------------------+---------+
| id | title | time | content |
+----+--------+---------------------+---------+
| 1 | first | 2018-07-03 15:29:37 | hdfs |
| 2 | second | 2018-07-03 15:30:57 | mr |
| 3 | third | 2018-07-03 15:31:07 | yarn |
| 4 | forth | 2018-07-03 15:39:38 | hbase |
+----+--------+---------------------+---------+
4 rows in set (0.00 sec)
使用 append 模式将新增的数据同步到上一节中储存数据的 HDFS 路径中:
[hadoop@172 sqoop]$ bin/sqoop-import --connect jdbc:mysql://$mysqlIP/test --username
root -P --table sqoop_test --check-column id --incremental append --last-value 3 --target-dir
/sqoop
其中 $mysqlIP 为您的 MySQL 数据库的内网地址。
执行命令会需要您输入数据库的密码,默认为您创建 EMR 集群时设置的密码。比普通的 sqoop-import 命令多出一些参数,其中 --check-column 为导入时参照的数据,--incremental 为导入的模式,在此例中为 append,--last-value 为参考数据的参考值,比该值更新的数据都会导入到 HDFS 中。
执行成功后,可以查看 HDFS 相应目录下更新后的数据:
[hadoop@172 sqoop]$ hadoop fs -cat /sqoop/*
1, first, 2018-07-03 15:29:37.0,hdfs
2, second, 2018-07-03 15:30:57.0,mr
3, third, 2018-07-03 15:31:07.0,yarn
4,forth,2018-07-03 15:39:38.0,hbase

使用 Sqoop job

使用 append 同步 HDFS 中的数据每次需要手动输入 --last-value,也可以使用 sqoop job 的方式,Sqoop 会自动保存上次导入成功的 last-value 值。如果要使用 sqoop job。需要启动 sqoop-metastore 进程,操作步骤如下:
首先在 conf/sqoop-site.xml 中启动 sqoop-metastore 进程:
<property>
<name>sqoop.metastore.client.enable.autoconnect</name>
<value>true</value>
</property>
然后在 bin 目录下启动 sqoop-metastore 服务:
./sqoop-metastore &
使用如下指令创建 Sqoop job:
说明
此命令适用于 Sqoop 1.4.6 版本。
[hadoop@172 sqoop]$ bin/sqoop job --create job1 -- import --connect
jdbc:mysql://$mysqlIP/test --username root -P --table sqoop_test --check-column id
--incremental append --last-value 4 --target-dir /sqoop
其中 $mysqlIP 为您的 MySQL 的内网地址。使用该命令就成功创建了一个 Sqoop job,每一次执行,会自动从上次更新的 last-value 值自动更新。
为 MySQL 中的 sqoop_test 表格新增一条记录:
mysql> insert into sqoop_test values(null, 'fifth', now(), 'hive');
Query ok, 1 row affected(0.00 sec)

Mysql> select * from sqoop_test;
+----+--------+---------------------+---------+
| id | title | time | content |
+----+--------+---------------------+---------+
| 1 | first | 2018-07-03 15:29:37 | hdfs |
| 2 | second | 2018-07-03 15:30:57 | mr |
| 3 | third | 2018-07-03 15:31:07 | yarn |
| 4 | forth | 2018-07-03 15:39:38 | hbase |
| 5 | fifth | 2018-07-03 16:02:29 | hive |
+----+--------+---------------------+---------+
5 rows in set (0.00 sec)
然后执行 Sqoop job:
[hadoop@172 sqoop]$ bin/sqoop job --exec job1
执行该命令会让您输入 MySQL 的密码。执行成功后,可以查看 HDFS 相应目录下更新后的数据:
[hadoop@172 sqoop]$ hadoop fs -cat /sqoop/*
1, first, 2018-07-03 15:29:37.0,hdfs
2, second, 2018-07-03 15:30:57.0,mr
3, third, 2018-07-03 15:31:07.0,yarn
4,forth,2018-07-03 15:39:38.0,hbase
5,fifth,2018-07-03 16:02:29.0,hive

3. 使用 lastmodified 模式

直接创建一个 sqoop-import 的 lastmodified 模式的 Sqoop job,首先查询 sqoop_test 中最后更新的时间:
mysql> select max(time) from sqoop_test;
创建一个 Sqoop job:
[hadoop@172 sqoop]$ bin/sqoop job --create job2 -- import --connect jdbc:mysql://$mysqlIP/test --username root -P --table sqoop_test --check-column time --incremental lastmodified --merge-key id --last-value '2018-07-03 16:02:29' --target-dir /sqoop
参数说明:
$mysqlIP 为您的 MySQL 的内网地址。
--check-column 必须使用 timestamp。
--incremental 模式选择 lastmodified。
--merge-key 选择 ID。
--last-value 为我们查询到的表中的最后更新时间。在此时间后做出的更新都会被同步到 HDFS 中,而 Sqoop job 每次会自动保存和更新该值。
对 MySQL 中的 sqoop_test 表添加数据并做出更改:
mysql> insert into sqoop_test values(null, 'sixth', now(), 'sqoop');
Query ok, 1 row affected(0.00 sec)

mysql> update sqoop_test set time=now(), content='spark' where id = 1;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 changed: 1 warnings: 0

Mysql> select * from sqoop_test;
+----+--------+---------------------+---------+
| id | title | time | content |
+----+--------+---------------------+---------+
| 1 | first | 2018-07-03 16:07:46 | spark |
| 2 | second | 2018-07-03 15:30:57 | mr |
| 3 | third | 2018-07-03 15:31:07 | yarn |
| 4 | forth | 2018-07-03 15:39:38 | hbase |
| 5 | fifth | 2018-07-03 16:02:29 | hive |
| 6 | fifth | 2018-07-03 16:09:58 | sqoop |
+----+--------+---------------------+---------+
6 rows in set (0.00 sec)
执行 Sqoop job:
[hadoop@172 sqoop]$ bin/sqoop job --exec job2
执行该命令会让您输入 MySQL 的密码。执行成功后,可以查看 HDFS 相应目录下更新后的数据:
[hadoop@172 sqoop]$ hdfs dfs -cat /sqoop/*
1,first,2018-07-03 16:07:46.0,spark
2,second,2018-07-03 15:30:57.0,mr
3,third,2018-07-03 15:31:07.0,yarn
4,forth,2018-07-03 15:39:38.0,hbase
5,fifth,2018-07-03 16:02:29.0,hive
6,sixth,2018-07-03 16:09:58.0,sqoop
更多的 Sqoop 操作可以查看 官方文档

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈