+---------------------------------------------+| Mysql |+----------------------+----------------------+| Binlog+----------------------v----------------------+| Canal Server |+-------------------+-----^-------------------+Get | | Ack+-------------------|-----|-------------------+| FE | | || +-----------------|-----|----------------+ || | Sync Job | | | || | +------------v-----+-----------+ | || | | Canal Client | | || | | +-----------------------+ | | || | | | Receiver | | | || | | +-----------------------+ | | || | | +-----------------------+ | | || | | | Consumer | | | || | | +-----------------------+ | | || | +------------------------------+ | || +----+---------------+--------------+----+ || | | | || +----v-----+ +-----v----+ +-----v----+ || | Channel1 | | Channel2 | | Channel3 | || | [Table1] | | [Table2] | | [Table3] | || +----+-----+ +-----+----+ +-----+----+ || | | | || +--|-------+ +---|------+ +---|------+|| +---v------+| +----v-----+| +----v-----+||| +----------+|+ +----------+|+ +----------+|+|| | Task |+ | Task |+ | Task |+ || +----------+ +----------+ +----------+ |+----------------------+----------------------+| | |+----v-----------------v------------------v---+| Coordinator || BE |+----+-----------------+------------------+---+| | |+----v---+ +---v----+ +----v---+| BE | | BE | | BE |+--------+ +--------+ +--------+
[mysqld]log-bin = mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式
--------------------- ------------------| Slave | read | Master || FileName/Position | <<<--------------------------- | Binlog Files |--------------------- ------------------
statement-based格式: Binlog只保存主节点上执行的sql语句,从节点将其复制到本地重新执行row-based格式: Binlog会记录主节点的每一行所有列的数据的变更信息,从节点会复制并执行每一行的变更到本地
begin;insert into canal_test.test_tbl values (3, 300);insert into canal_test.test_tbl values (4, 400);commit;
SET TIMESTAMP=1538238301/*!*/;BEGIN/*!*/.# at 211935643# at 211935698#180930 0:25:01 server id 1 end_log_pos 211935698 Table_map: 'canal_test'.'test_tbl' mapped to number 25#180930 0:25:01 server id 1 end_log_pos 211935744 Write_rows: table-id 25 flags: STMT_END_F...'/*!*/;### INSERT INTO canal_test.test_tbl### SET### @1=1### @2=100# at 211935744#180930 0:25:01 server id 1 end_log_pos 211935771 Xid = 2681726641...'/*!*/;### INSERT INTO canal_test.test_tbl### SET### @1=2### @2=200# at 211935771#180930 0:25:01 server id 1 end_log_pos 211939510 Xid = 2681726641COMMIT/*!*/;
gtid-mode=on // 开启gtid模式enforce-gtid-consistency=1 // 强制gtid和事务的一致性
source_id标识出主节点,transaction_id表示此事务在主节点上执行的顺序(最大263-1)。GTID = source_id:transaction_id
3E11FA47-71CA-11E1-9E33-C80AA9429562:23
vim conf/{your destination}/instance.properties
## canal instance serverIdcanal.instance.mysql.slaveId = 1234## mysql adresscanal.instance.master.address = 127.0.0.1:3306## mysql username/passwordcanal.instance.dbUsername = canalcanal.instance.dbPassword = canal
sh bin/startup.sh
cat logs/{your destination}/{your destination}.log
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [xxx/instance.properties]2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-xxx2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
-------------------------------------------------| Server || -------------------------------------------- || | Instance 1 | || | ----------- ----------- ----------- | || | | Parser | | Sink | | Store | | || | ----------- ----------- ----------- | || | ----------------------------------- | || | | MetaManager | | || | ----------------------------------- | || -------------------------------------------- |-------------------------------------------------

canal client异步获取store中数据get 0 get 1 get 2 put| | | ...... |v v v v--------------------------------------------------------------------- store环形队列^ ^| |ack 0 ack 1
--create Mysql tableCREATE TABLE `source_test` (`id` int(11) NOT NULL COMMENT "",`name` int(11) NOT NULL COMMENT "") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;-- create Doris tableCREATE TABLE `target_test` (`id` int(11) NOT NULL COMMENT "",`name` int(11) NOT NULL COMMENT "") ENGINE=OLAPUNIQUE KEY(`id`)COMMENT "OLAP"DISTRIBUTED BY HASH(`id`) BUCKETS 8;-- enable batch deleteALTER TABLE target_test ENABLE FEATURE "BATCH_DELETE";
CREATE SYNC `demo`.`job`(FROM `source_test` INTO `target_test`(id,name))FROM BINLOG("type" = "canal","canal.server.ip" = "127.0.0.1","canal.server.port" = "11111","canal.destination" = "xxx","canal.username" = "canal","canal.password" = "canal");
CREATE SYNC [db.]job_name(channel_desc,channel_desc...)binlog_desc
job_name是数据同步作业在当前数据库内的唯一标识,相同job_name的作业只能有一个在运行。channel_desc用来定义任务下的数据通道,可表示mysql源表到doris目标表的映射关系。在设置此项时,如果存在多个映射关系,必须满足 mysql 源表应该与 doris 目标表是一一对应关系,其他的任何映射关系(如一对多关系),检查语法时都被视为不合法。column_mapping主要指 mysql 源表和 doris 目标表的列之间的映射关系,如果不指定,FE会默认源表和目标表的列按顺序一一对应。但是我们依然建议显式的指定列的映射关系,这样当目标表的结构发生变化(例如增加一个 nullable 的列),数据同步作业依然可以进行。否则,当发生上述变动后,因为列映射关系不再一一对应,导入将报错。binlog_desc中的属性定义了对接远端 Binlog 地址的一些必要信息,目前可支持的对接类型只有 canal 方式,所有的配置项前都需要加上 canal 前缀。canal.server.ip:canal server 的地址。canal.server.port:canal server 的端口。canal.destination:前文提到的 instance 的字符串标识。canal.batchSize:每批从 canal server 处获取的 batch 大小的最大值,默认8192。canal.username:instance 的用户名。canal.password:instance 的密码。canal.debug:设置为 true 时,会将 batch 和每一行数据的详细信息都打印出来,会影响性能。+-------------+create job | PENDING | resume job+-----------+ <-------------+| +-------------+ |+----v-------+ +-------+----+| RUNNING | pause job | PAUSED || +-----------------------> |+----+-------+ run error +-------+----+| +-------------+ || | CANCELLED | |+-----------> <-------------+stop job +-------------+ stop jobsystem error
canal.ip
canal server 的 IP 地址。canal.port
canal server 的端口。canal.instance.memory.buffer.size
canal 端的 store 环形队列的队列长度,必须设为2的幂次方,默认长度16384。此值等于 canal 端能缓存 event 数量的最大值,也直接决定了 Doris 端一个事务内所能容纳的最大 event 数量。建议将它改的足够大,防止 Doris 端一个事务内能容纳的数据量上限太小,导致提交事务太过频繁造成数据的版本堆积。canal.instance.memory.buffer.memunit
canal 端默认一个 event 所占的空间,默认空间为1024 bytes。此值乘以 store 环形队列的队列长度等于 store 的空间最大值,例如 store 队列长度为16384,则 store 的空间为16MB。但是,一个 event 的实际大小并不等于此值,而是由这个 event 内有多少行数据和每行数据的长度决定的,例如一张只有两列的表的 insert event 只有30字节,但 delete event 可能达到数千字节,这是因为通常 delete event 的行数比 insert event 多。sync_commit_interval_second
提交事务的最大时间间隔。若超过了这个时间 channel 中还有数据没有提交,consumer 会通知 channel 提交事务。min_sync_commit_size提交事务需满足的最小event数量。若Fe接收到的event数量小于它,会继续等待下一批数据直到时间超过了`sync_commit_interval_second `为止。默认值是10000个events,如果您想修改此配置,请确保此值小于canal端的`canal.instance.memory.buffer.size`配置(默认16384),否则在ack前Fe会尝试获取比store队列长度更多的event,导致store队列阻塞至超时为止。
min_bytes_sync_commit
提交事务需满足的最小数据大小。若 FE 接收到的数据大小小于它,会继续等待下一批数据直到时间超过了sync_commit_interval_second为止。默认值是15MB,如果您想修改此配置,请确保此值小于 canal 端的canal.instance.memory.buffer.size和canal.instance.memory.buffer.memunit的乘积(默认16MB),否则在 ack 前 Fe 会尝试获取比 store 空间更大的数据,导致 store 队列阻塞至超时为止。max_bytes_sync_commit
提交事务时的数据大小的最大值。若 Fe 接收到的数据大小大于它,会立即提交事务并发送已积累的数据。默认值是64MB,如果您想修改此配置,请确保此值大于 canal 端的 canal.instance.memory.buffer.size 和 canal.instance.memory.buffer.memunit 的乘积(默认16MB)和 min_bytes_sync_commit。max_sync_task_threads_num
数据同步作业线程池中的最大线程数量。此线程池整个FE中只有一个,用于处理 FE 中所有数据同步作业向 BE 发送数据的任务 task,线程池的实现在 SyncTaskPool 类。alter table的操作,当表结构发生了变化,如果列的映射无法匹配,可能导致作业发生错误暂停,建议通过在数据同步作业中显式指定列映射关系,或者通过增加 Nullable 列或带 Default 值的列来减少这类问题。ip:port + destination吗?
不能。创建数据同步作业时会检查 ip:port + destination 与已存在的作业是否重复,防止出现多个作业连接到同一个 instance 的情况。文档反馈