```text
                 +
                 | 1. user create broker load
                 v
            +----+----+
            |         |
            |   FE    |
            |         |
            +----+----+
                 |
                 | 2. BE etl and load the data
    +--------------------------+
    |            |             |
+---v---+    +---v---+    +---v---+
|       |    |       |    |       |
|  BE   |    |  BE   |    |  BE   |
|       |    |       |    |       |
+---+-^-+    +---+-^-+    +--+-^--+
    | |          | |         | |
    | |          | |         | |  3. pull data from broker
+---v-+-+    +---v-+-+    +--v-+--+
|       |    |       |    |       |
|Broker |    |Broker |    |Broker |
|       |    |       |    |       |
+---+-^-+    +---+-^-+    +---+-^-+
    | |          | |          | |
+---v-+----------v-+----------v-+-+
|      HDFS/BOS/AFS cluster       |
|                                 |
+---------------------------------+
```
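The load statements in the examples that follow all share the same general shape. As a rough, non-authoritative skeleton distilled from those examples (here `db_name.label_name`, `namenode_host:port`, `target_table`, and `broker_name` are placeholders, not values from this document):

```sql
LOAD LABEL db_name.label_name
(
    DATA INFILE("hdfs://namenode_host:port/path/to/files")
    INTO TABLE target_table
    -- optional clauses: COLUMNS TERMINATED BY, FORMAT AS, a column list,
    -- COLUMNS FROM PATH AS, SET (...) mappings, and so on
)
WITH BROKER "broker_name"
(
    -- broker access properties, e.g. "username" / "password"
)
PROPERTIES
(
    -- job properties, e.g. "timeout", "max_filter_ratio"
);
```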
```sql
## The data format is: default; the partition field is: day
CREATE TABLE `ods_demo_detail`(
  `id` string,
  `store_id` string,
  `company_id` string,
  `tower_id` string,
  `commodity_id` string,
  `commodity_name` string,
  `commodity_price` double,
  `member_price` double,
  `cost_price` double,
  `unit` string,
  `quantity` double,
  `actual_price` double
)
PARTITIONED BY (day string)
row format delimited fields terminated by ','
lines terminated by '\n'
```
```sql
load data local inpath '/opt/custorm' into table ods_demo_detail;
```
```sql
CREATE TABLE `doris_ods_test_detail` (
  `rq` date NULL,
  `id` varchar(32) NOT NULL,
  `store_id` varchar(32) NULL,
  `company_id` varchar(32) NULL,
  `tower_id` varchar(32) NULL,
  `commodity_id` varchar(32) NULL,
  `commodity_name` varchar(500) NULL,
  `commodity_price` decimal(10, 2) NULL,
  `member_price` decimal(10, 2) NULL,
  `cost_price` decimal(10, 2) NULL,
  `unit` varchar(50) NULL,
  `quantity` int(11) NULL,
  `actual_price` decimal(10, 2) NULL
) ENGINE=OLAP
UNIQUE KEY(`rq`, `id`, `store_id`)
PARTITION BY RANGE(`rq`)
(
  PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01'))
)
DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
PROPERTIES (
  "replication_allocation" = "tag.location.default: 3",
  "dynamic_partition.enable" = "true",
  "dynamic_partition.time_unit" = "MONTH",
  "dynamic_partition.start" = "-2147483648",
  "dynamic_partition.end" = "2",
  "dynamic_partition.prefix" = "P_",
  "dynamic_partition.buckets" = "1",
  "in_memory" = "false",
  "storage_format" = "V2"
);
```
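Before loading, you can confirm that the target partitions (including the monthly partitions maintained by the dynamic partition settings above) exist. A minimal check, assuming the table was created in the current database:

```sql
-- List the partitions of the target Doris table
SHOW PARTITIONS FROM doris_ods_test_detail;
```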
```sql
LOAD LABEL broker_load_2022_03_23
(
    DATA INFILE("hdfs://192.168.20.123:8020/user/hive/warehouse/ods.db/ods_demo_detail/*/*")
    INTO TABLE doris_ods_test_detail
    COLUMNS TERMINATED BY ","
    (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
    COLUMNS FROM PATH AS (`day`)
    SET
    (
        rq = str_to_date(`day`,'%Y-%m-%d'),
        id=id,
        store_id=store_id,
        company_id=company_id,
        tower_id=tower_id,
        commodity_id=commodity_id,
        commodity_name=commodity_name,
        commodity_price=commodity_price,
        member_price=member_price,
        cost_price=cost_price,
        unit=unit,
        quantity=quantity,
        actual_price=actual_price
    )
)
WITH BROKER "broker_name_1"
(
    "username" = "hdfs",
    "password" = ""
)
PROPERTIES
(
    "timeout"="1200",
    "max_filter_ratio"="0.1"
);
```
192.168.20.123:8020 is the IP and port of the active NameNode of the HDFS cluster used by the Hive table.

```sql
# Data format: ORC; partition field: day
CREATE TABLE `ods_demo_orc_detail`(
  `id` string,
  `store_id` string,
  `company_id` string,
  `tower_id` string,
  `commodity_id` string,
  `commodity_name` string,
  `commodity_price` double,
  `member_price` double,
  `cost_price` double,
  `unit` string,
  `quantity` double,
  `actual_price` double
)
PARTITIONED BY (day string)
row format delimited fields terminated by ','
lines terminated by '\n'
STORED AS ORC
```
```sql
LOAD LABEL dish_2022_03_23
(
    DATA INFILE("hdfs://10.220.147.151:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*")
    INTO TABLE doris_ods_test_detail
    COLUMNS TERMINATED BY ","
    FORMAT AS "orc"
    (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
    COLUMNS FROM PATH AS (`day`)
    SET
    (
        rq = str_to_date(`day`,'%Y-%m-%d'),
        id=id,
        store_id=store_id,
        company_id=company_id,
        tower_id=tower_id,
        commodity_id=commodity_id,
        commodity_name=commodity_name,
        commodity_price=commodity_price,
        member_price=member_price,
        cost_price=cost_price,
        unit=unit,
        quantity=quantity,
        actual_price=actual_price
    )
)
WITH BROKER "broker_name_1"
(
    "username" = "hdfs",
    "password" = ""
)
PROPERTIES
(
    "timeout"="1200",
    "max_filter_ratio"="0.1"
);
```
FORMAT AS "orc": Specifies the format of the data to be imported.SET: Defines the field mapping relationship between the Hive table and the Doris table and some operations to convert the fields.LOAD LABEL demo.label_20220402(DATA INFILE("hdfs://10.220.147.151:8020/tmp/test_hdfs.txt")INTO TABLE `ods_dish_detail_test`COLUMNS TERMINATED BY "\\t" (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price))with HDFS ("fs.defaultFS"="hdfs://10.220.147.151:8020","hdfs_user"="root")PROPERTIES("timeout"="1200","max_filter_ratio"="0.1");
```sql
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
         JobId: 41326624
         Label: broker_load_2022_03_23
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
      TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
      ErrorMsg: NULL
    CreateTime: 2022-04-01 18:59:06
  EtlStartTime: 2022-04-01 18:59:11
 EtlFinishTime: 2022-04-01 18:59:11
 LoadStartTime: 2022-04-01 18:59:11
LoadFinishTime: 2022-04-01 18:59:11
           URL: NULL
    JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
1 row in set (0.01 sec)
```
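Besides sorting by creation time, a job can also be looked up directly by its label. For example, using the label of the job above:

```sql
-- Show only the job submitted with this label
SHOW LOAD WHERE LABEL = "broker_load_2022_03_23"\G
```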
```sql
CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
```
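If several related jobs need to be cancelled, the WHERE clause can typically also match labels by pattern rather than exact value. A hedged sketch (the pattern below is illustrative):

```sql
-- Cancel all unfinished jobs in the demo database whose labels match the pattern
CANCEL LOAD FROM demo WHERE LABEL LIKE "broker_load_2022%";
```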
fe.conf file.

The concurrency of this import = Math.min(source file size / minimum processing volume, maximum concurrency, current number of BE nodes)
The amount of data processed by a single BE for this import = source file size / the concurrency of this import
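As an illustration with hypothetical numbers, using the default values listed below: for a 1 GB source file on a cluster with 3 BE nodes,

```text
concurrency       = min(1024 MB / 64 MB, 10, 3) = min(16, 10, 3) = 3
data per BE node  = 1024 MB / 3 ≈ 341 MB
```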
max_bytes_per_broker_scanner * BE node count. If you need to import a larger amount of data, you need to adjust the max_bytes_per_broker_scanner parameter accordingly.
Default configuration:

- min_bytes_per_broker_scanner: default value 64 MB (unit: bytes).
- max_broker_concurrency: default value 10.
- max_bytes_per_broker_scanner: default value 3 GB (unit: bytes).
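If you want to confirm the values currently in effect on the FE, these configuration items can typically be inspected from a MySQL client. A minimal sketch (the LIKE patterns are just examples):

```sql
-- Inspect the broker-scanner related FE configuration items
ADMIN SHOW FRONTEND CONFIG LIKE "%broker_scanner%";
ADMIN SHOW FRONTEND CONFIG LIKE "max_broker_concurrency";
```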
Edit the configuration in fe.conf:

max_broker_concurrency = number of BEs
Data volume processed by a single BE for the current import task = original file size / max_broker_concurrency
max_bytes_per_broker_scanner >= data volume processed by a single BE for the current import task

For example, for a 100 GB file with 10 BEs in the cluster:

max_broker_concurrency = 10
max_bytes_per_broker_scanner >= 10 GB = 100 GB / 10
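For the 100 GB / 10 BE example above, the corresponding fe.conf entries might look like this (illustrative values only; max_bytes_per_broker_scanner is specified in bytes):

```text
max_broker_concurrency = 10
# 10 GB expressed in bytes
max_bytes_per_broker_scanner = 10737418240
```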
Data volume processed by a single BE for the current import task / the slowest import speed of the user's Doris cluster (MB/s) >= timeout for the current import task >= data volume processed by a single BE for the current import task / 10 MB/s

For example, for a 100 GB file with 10 BEs in the cluster:

timeout >= 1000s = 10 GB / 10 MB/s
Expected maximum import file volume = 14400s * 10 MB/s * number of BEs

For example, with 10 BEs in the cluster:

Expected maximum import file volume = 14400s * 10 MB/s * 10 = 1,440,000 MB ≈ 1440 GB

Note: In general, a user's environment may not reach a speed of 10 MB/s, so it is recommended to split files larger than 500 GB before importing them.
- desired_max_waiting_jobs limits the number of Broker Load jobs in a cluster that have not started or are still running (job state PENDING or LOADING). The default value is 100. If this threshold is exceeded, newly submitted jobs are rejected outright.
- async_pending_load_task_pool_size limits the number of pending tasks running at the same time, and therefore also controls the number of import tasks actually running. The default value is 10. That is, if a user submits 100 load jobs, only 10 of them enter the LOADING state and start executing at the same time, while the other jobs wait in the PENDING state.
- async_loading_load_task_pool_size limits the number of loading tasks running at the same time. A Broker Load job has one pending task and multiple loading tasks (equal to the number of DATA INFILE clauses in the LOAD statement). Therefore, async_loading_load_task_pool_size should be greater than or equal to async_pending_load_task_pool_size.
- Set the session variable enable_profile to true, then submit the task. Once the job is completed, you can view its profile on the Queries tab of the FE web page; see the SHOW LOAD PROFILE help document for more information (a minimal sketch follows this list).
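A minimal sketch of the profile workflow described in the last item above, assuming your version supports listing load profiles from the client:

```sql
-- Enable profiling for jobs submitted in this session
SET enable_profile = true;

-- Submit the Broker Load job in this session; after it finishes,
-- the collected profiles can be listed with:
SHOW LOAD PROFILE "/";
```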
This profile can help analyze the running status of import jobs. Currently, the profile can be viewed only after a job has completed.

Scan bytes per broker scanner exceed limit: xxx.
Please see the Best Practices section of this document and modify the FE configuration items max_bytes_per_broker_scanner and max_broker_concurrency.

failed to send batch or TabletWriter add batch with unknown id.
Appropriately modify query_timeout and streaming_load_rpc_max_alive_time_sec.
streaming_load_rpc_max_alive_time_sec: During the import process, Doris opens a Writer for each Tablet to receive and write data. This parameter specifies the Writer's wait timeout. If a Writer does not receive any data within this time, it is destroyed automatically. When the system's processing speed is slow, a Writer may not receive the next batch of data for a long time, which triggers the import error TabletWriter add batch with unknown id. In that case, increase this configuration appropriately. The default is 600 seconds.

LOAD_RUN_FAIL; msg: Invalid Column Name: xxx.
If the data is in PARQUET or ORC format, the column names in the file header need to be consistent with the column names in the Doris table, for example:

```sql
(tmp_c1, tmp_c2)
SET
(
    id = tmp_c2,
    name = tmp_c1
)
```
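As a hedged sketch of how such a mapping could appear in a full load statement (the label, path, and table name below are hypothetical, not values from this document):

```sql
-- Columns named tmp_c1 / tmp_c2 in the Parquet file are mapped to the
-- Doris columns name / id via the SET clause.
LOAD LABEL example_db.label_parquet_mapping
(
    DATA INFILE("hdfs://192.168.20.123:8020/tmp/example_file.parquet")
    INTO TABLE example_tbl
    FORMAT AS "parquet"
    (tmp_c1, tmp_c2)
    SET
    (
        id = tmp_c2,
        name = tmp_c1
    )
)
WITH BROKER "broker_name_1"
(
    "username" = "hdfs",
    "password" = ""
);
```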