```text
                 +
                 | 1. user create broker load
                 v
            +----+----+
            |         |
            |   FE    |
            |         |
            +----+----+
                 |
                 | 2. BE etl and load the data
    +--------------------------+
    |            |             |
+---v---+     +--v----+    +---v---+
|       |     |       |    |       |
|  BE   |     |  BE   |    |  BE   |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +--+-^--+
    | |           | |         | |
    | |           | |         | | 3. pull data from broker
+---v-+-+     +---v-+-+    +--v-+--+
|       |     |       |    |       |
|Broker |     |Broker |    |Broker |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +---+-^-+
    | |           | |          | |
+---v-+-----------v-+----------v-+-+
|       HDFS/BOS/AFS cluster       |
|                                  |
+----------------------------------+
```
Create the Hive table:

```sql
-- Data format: default (text); partition column: day
CREATE TABLE `ods_demo_detail`(
    `id` string,
    `store_id` string,
    `company_id` string,
    `tower_id` string,
    `commodity_id` string,
    `commodity_name` string,
    `commodity_price` double,
    `member_price` double,
    `cost_price` double,
    `unit` string,
    `quantity` double,
    `actual_price` double
)
PARTITIONED BY (day string)
row format delimited fields terminated by ','
lines terminated by '\n';
```
Then load data into the Hive table:

```sql
load data local inpath '/opt/custorm' into table ods_demo_detail;
```
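Before moving on, it can be worth a quick sanity check in Hive that the rows and partitions landed as expected (standard Hive statements, using the table created above):

```sql
-- Hive: verify that partitions exist and inspect a few rows
show partitions ods_demo_detail;
select * from ods_demo_detail limit 10;
```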
Create the Doris table:

```sql
CREATE TABLE `doris_ods_test_detail` (
    `rq` date NULL,
    `id` varchar(32) NOT NULL,
    `store_id` varchar(32) NULL,
    `company_id` varchar(32) NULL,
    `tower_id` varchar(32) NULL,
    `commodity_id` varchar(32) NULL,
    `commodity_name` varchar(500) NULL,
    `commodity_price` decimal(10, 2) NULL,
    `member_price` decimal(10, 2) NULL,
    `cost_price` decimal(10, 2) NULL,
    `unit` varchar(50) NULL,
    `quantity` int(11) NULL,
    `actual_price` decimal(10, 2) NULL
) ENGINE=OLAP
UNIQUE KEY(`rq`, `id`, `store_id`)
PARTITION BY RANGE(`rq`)
(
    PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01'))
)
DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "MONTH",
    "dynamic_partition.start" = "-2147483648",
    "dynamic_partition.end" = "2",
    "dynamic_partition.prefix" = "P_",
    "dynamic_partition.buckets" = "1",
    "in_memory" = "false",
    "storage_format" = "V2"
);
```
Start the Broker Load job:

```sql
LOAD LABEL broker_load_2022_03_23
(
    DATA INFILE("hdfs://192.168.20.123:8020/user/hive/warehouse/ods.db/ods_demo_detail/*/*")
    INTO TABLE doris_ods_test_detail
    COLUMNS TERMINATED BY ","
    (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
    COLUMNS FROM PATH AS (`day`)
    SET
    (
        rq = str_to_date(`day`,'%Y-%m-%d'),
        id=id,
        store_id=store_id,
        company_id=company_id,
        tower_id=tower_id,
        commodity_id=commodity_id,
        commodity_name=commodity_name,
        commodity_price=commodity_price,
        member_price=member_price,
        cost_price=cost_price,
        unit=unit,
        quantity=quantity,
        actual_price=actual_price
    )
)
WITH BROKER "broker_name_1"
(
    "username" = "hdfs",
    "password" = ""
)
PROPERTIES
(
    "timeout"="1200",
    "max_filter_ratio"="0.1"
);
```
Here, 192.168.20.123:8020 is the IP and port of the active NameNode of the HDFS cluster used by the Hive table.

To load a Hive table stored as ORC, first create the Hive table:

```sql
-- Data format: ORC; partition column: day
CREATE TABLE `ods_demo_orc_detail`(
    `id` string,
    `store_id` string,
    `company_id` string,
    `tower_id` string,
    `commodity_id` string,
    `commodity_name` string,
    `commodity_price` double,
    `member_price` double,
    `cost_price` double,
    `unit` string,
    `quantity` double,
    `actual_price` double
)
PARTITIONED BY (day string)
row format delimited fields terminated by ','
lines terminated by '\n'
STORED AS ORC;
```
Load the ORC data into Doris:

```sql
LOAD LABEL dish_2022_03_23
(
    DATA INFILE("hdfs://10.220.147.151:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*")
    INTO TABLE doris_ods_test_detail
    COLUMNS TERMINATED BY ","
    FORMAT AS "orc"
    (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
    COLUMNS FROM PATH AS (`day`)
    SET
    (
        rq = str_to_date(`day`,'%Y-%m-%d'),
        id=id,
        store_id=store_id,
        company_id=company_id,
        tower_id=tower_id,
        commodity_id=commodity_id,
        commodity_name=commodity_name,
        commodity_price=commodity_price,
        member_price=member_price,
        cost_price=cost_price,
        unit=unit,
        quantity=quantity,
        actual_price=actual_price
    )
)
WITH BROKER "broker_name_1"
(
    "username" = "hdfs",
    "password" = ""
)
PROPERTIES
(
    "timeout"="1200",
    "max_filter_ratio"="0.1"
);
```
FORMAT AS "orc" :指定了要导入的数据的格式。SET : 定义了 Hive 表和 Doris 表之间的字段映射关系及字段转换的一些操作。LOAD LABEL demo.label_20220402(DATA INFILE("hdfs://10.220.147.151:8020/tmp/test_hdfs.txt")INTO TABLE `ods_dish_detail_test`COLUMNS TERMINATED BY "\\t" (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price))with HDFS ("fs.defaultFS"="hdfs://10.220.147.151:8020","hdfs_user"="root")PROPERTIES("timeout"="1200","max_filter_ratio"="0.1");
Check the status of the load job:

```sql
mysql> show load order by createtime desc limit 1\G
*************************** 1. row ***************************
         JobId: 41326624
         Label: broker_load_2022_03_23
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
      TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
      ErrorMsg: NULL
    CreateTime: 2022-04-01 18:59:06
  EtlStartTime: 2022-04-01 18:59:11
 EtlFinishTime: 2022-04-01 18:59:11
 LoadStartTime: 2022-04-01 18:59:11
LoadFinishTime: 2022-04-01 18:59:11
           URL: NULL
    JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
1 row in set (0.01 sec)
```
A load job that has not yet finished can be cancelled manually by its label:

```sql
CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
```
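To confirm the cancellation took effect, you can query the job by label again; its State should now be CANCELLED (a quick check reusing the label above):

```sql
SHOW LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
```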
These values can be adjusted in fe.conf. The concurrency of a load job, and the volume of data each BE handles, are derived as follows:

```text
concurrency of this load    = Math.min(source file size / minimum processing volume, maximum concurrency, number of current BE nodes)
volume per BE for this load = source file size / concurrency of this load
```
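As a worked illustration of these two formulas (the numbers below are assumed for the example, not values from any particular cluster): with a 1 GB source file, a 64 MB minimum processing volume (min_bytes_per_broker_scanner), a maximum concurrency (max_broker_concurrency) of 10, and 3 BE nodes:

```text
concurrency of this load    = min(1024MB / 64MB, 10, 3) = min(16, 10, 3) = 3
volume per BE for this load = 1024MB / 3 ≈ 341MB
```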
The maximum amount of data a single load job can handle is max_bytes_per_broker_scanner * number of BE nodes. If you need to load a larger volume of data in one job, increase max_bytes_per_broker_scanner accordingly.
Default configuration:

- min_bytes_per_broker_scanner: default 64 MB, in bytes.
- max_broker_concurrency: default 10.
- max_bytes_per_broker_scanner: default 3 GB, in bytes.
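Overriding these defaults is a matter of setting them in fe.conf. A minimal sketch with illustrative values only (the sizing guidance below explains how to choose them):

```text
# fe.conf -- illustrative values, not recommendations
min_bytes_per_broker_scanner = 67108864      # 64 MB
max_broker_concurrency = 10
max_bytes_per_broker_scanner = 10737418240   # 10 GB
```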
Modify the configuration in fe.conf so that:

```text
max_broker_concurrency = number of BE nodes
volume per BE for this load = source file size / max_broker_concurrency
max_bytes_per_broker_scanner >= volume per BE for this load
```

For example, for a 100G file on a cluster with 10 BE nodes:

```text
max_broker_concurrency = 10
max_bytes_per_broker_scanner >= 10G = 100G / 10
```
Set the job timeout so that:

```text
volume per BE for this load / slowest load speed of the user's Doris cluster (MB/s) >= timeout >= volume per BE for this load / 10M/s
```

For example, for a 100G file on a cluster with 10 BE nodes:

```text
timeout >= 1000s = 10G / 10M/s
```
```text
expected maximum load file size = 14400s * 10M/s * number of BE nodes
```

For example, with 10 BE nodes:

```text
expected maximum load file size = 14400s * 10M/s * 10 = 1440000M ≈ 1440G
```

Note: most user environments will not reach 10M/s, so we recommend splitting files larger than 500G before loading them.
desired_max_waiting_jobs limits the number of Broker Load jobs in a cluster that have not yet started or are currently running (job state PENDING or LOADING). The default is 100. Jobs submitted beyond this threshold are rejected outright.

async_pending_load_task_pool_size limits the number of pending tasks that run at the same time, and thus effectively controls the number of load jobs actually executing. The default is 10. In other words, if a user submits 100 load jobs, only 10 of them will enter the LOADING state and start executing; the others wait in the PENDING state.

async_loading_load_task_pool_size limits the number of loading tasks that run at the same time. A Broker Load job has one pending task and multiple loading tasks (one per DATA INFILE clause in the LOAD statement), so async_loading_load_task_pool_size should be greater than or equal to async_pending_load_task_pool_size.

Enable the session variable with set enable_profile=true, then submit the load job. After the job completes, you can view its Profile in the Queries tab of the FE web UI. See the SHOW LOAD PROFILE documentation for more usage details.
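A minimal sketch of that profile workflow from the MySQL client (the LOAD statement itself is elided; "/" lists the collected job profiles):

```sql
-- Enable profile collection for this session
set enable_profile=true;

-- ... submit the LOAD LABEL ... job here and wait for it to finish ...

-- List collected profiles; drill down by appending a profile id to the path
SHOW LOAD PROFILE "/";
```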
The Profile helps analyze the running state of a load job. Currently, a Profile can only be viewed after the job has executed successfully.

Load error: Scan bytes per broker scanner exceed limit:xxx
See the Best Practices section of this document and adjust the FE configuration items max_bytes_per_broker_scanner and max_broker_concurrency.

Load error: failed to send batch or TabletWriter add batch with unknown id
Increase query_timeout and streaming_load_rpc_max_alive_time_sec as appropriate.
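For context, query_timeout is a session variable that can be inspected and raised from the MySQL client, while streaming_load_rpc_max_alive_time_sec is a BE configuration item set in be.conf; the values below are illustrative only:

```sql
-- Check and raise the session-level query timeout (in seconds)
SHOW VARIABLES LIKE "query_timeout";
SET query_timeout = 3600;
```

streaming_load_rpc_max_alive_time_sec would then be raised in be.conf, e.g. streaming_load_rpc_max_alive_time_sec = 1200.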
streaming_load_rpc_max_alive_time_sec: during a load, Doris opens a Writer for each Tablet to receive and write data. This parameter specifies the Writer's wait timeout. If a Writer receives no data within this period, it is destroyed automatically. When the system processes data slowly, a Writer may not receive its next batch for a long time, causing the load to fail with TabletWriter add batch with unknown id; in that case, increase this parameter. The default is 600 seconds.

Load error: LOAD_RUN_FAIL; msg:Invalid Column Name:xxx
For data in PARQUET or ORC format, the column names given in the load statement must match the column names in the file header, and SET maps them onto the Doris table's columns, for example:

```sql
(tmp_c1, tmp_c2)
SET
(
    id = tmp_c2,
    name = tmp_c1
)
```

This reads the columns named tmp_c1 and tmp_c2 from the parquet/orc file and maps them to the id and name columns of the Doris table.
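Putting that together, here is a minimal sketch of a broker load with such a mapping; the database, label, path, table, and broker names are placeholders, assuming a parquet file whose header columns are tmp_c1 and tmp_c2 and a Doris table with columns id and name:

```sql
LOAD LABEL example_db.label_parquet_mapping
(
    DATA INFILE("hdfs://host:8020/path/to/file.parquet")
    INTO TABLE example_tbl
    FORMAT AS "parquet"
    (tmp_c1, tmp_c2)
    SET
    (
        id = tmp_c2,
        name = tmp_c1
    )
)
WITH BROKER "broker_name"
(
    "username" = "hdfs",
    "password" = ""
);
```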