```
                 +
                 | 1. user create broker load
                 v
            +----+----+
            |         |
            |   FE    |
            |         |
            +----+----+
                 |
                 | 2. BE etl and load the data
    +--------------------------+
    |            |             |
+---v---+     +--v----+    +---v---+
|       |     |       |    |       |
|  BE   |     |  BE   |    |   BE  |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +--+-^--+
    | |           | |         | |
    | |           | |         | | 3. pull data from broker
+---v-+-+     +---v-+-+    +--v-+--+
|       |     |       |    |       |
|Broker |     |Broker |    |Broker |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +---+-^-+
    | |           | |          | |
+---v-+-----------v-+----------v-+-+
|       HDFS/BOS/AFS cluster       |
|                                  |
+----------------------------------+
```
```sql
-- Data format: default (TEXTFILE); partition field: day
CREATE TABLE `ods_demo_detail`(
  `id` string,
  `store_id` string,
  `company_id` string,
  `tower_id` string,
  `commodity_id` string,
  `commodity_name` string,
  `commodity_price` double,
  `member_price` double,
  `cost_price` double,
  `unit` string,
  `quantity` double,
  `actual_price` double
)
PARTITIONED BY (day string)
row format delimited fields terminated by ','
lines terminated by '\n';

load data local inpath '/opt/custorm' into table ods_demo_detail;
```
```sql
CREATE TABLE `doris_ods_test_detail` (
  `rq` date NULL,
  `id` varchar(32) NOT NULL,
  `store_id` varchar(32) NULL,
  `company_id` varchar(32) NULL,
  `tower_id` varchar(32) NULL,
  `commodity_id` varchar(32) NULL,
  `commodity_name` varchar(500) NULL,
  `commodity_price` decimal(10, 2) NULL,
  `member_price` decimal(10, 2) NULL,
  `cost_price` decimal(10, 2) NULL,
  `unit` varchar(50) NULL,
  `quantity` int(11) NULL,
  `actual_price` decimal(10, 2) NULL
) ENGINE=OLAP
UNIQUE KEY(`rq`, `id`, `store_id`)
PARTITION BY RANGE(`rq`)
(PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
PROPERTIES (
  "replication_allocation" = "tag.location.default: 3",
  "dynamic_partition.enable" = "true",
  "dynamic_partition.time_unit" = "MONTH",
  "dynamic_partition.start" = "-2147483648",
  "dynamic_partition.end" = "2",
  "dynamic_partition.prefix" = "P_",
  "dynamic_partition.buckets" = "1",
  "in_memory" = "false",
  "storage_format" = "V2"
);
```
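The dynamic partition properties above keep monthly partitions such as P_202204 created ahead of time. The Python sketch below illustrates the resulting names and ranges. It assumes that MONTH partitions are named `prefix + yyyyMM`, that each covers one calendar month, and that `"dynamic_partition.end" = "2"` means the current month plus the next two months are pre-created; `month_partitions` is a hypothetical helper, not a Doris API.

```python
from datetime import date


def month_partitions(today: date, end: int, prefix: str = "P_"):
    """Sketch of MONTH dynamic partitions: the current month plus `end` months ahead.

    Assumption: each partition is named prefix + yyyyMM and covers
    [first day of the month, first day of the next month).
    """
    partitions = []
    for offset in range(end + 1):
        # Count months from year 0 so that adding `offset` carries into the year.
        idx = today.year * 12 + (today.month - 1) + offset
        lower = date(idx // 12, idx % 12 + 1, 1)
        upper = date((idx + 1) // 12, (idx + 1) % 12 + 1, 1)
        partitions.append((f"{prefix}{lower:%Y%m}", lower.isoformat(), upper.isoformat()))
    return partitions


if __name__ == "__main__":
    # Mirrors "dynamic_partition.end" = "2" and "dynamic_partition.prefix" = "P_".
    for name, lower, upper in month_partitions(date(2022, 4, 15), end=2):
        print(f"PARTITION {name} VALUES [('{lower}'), ('{upper}'))")
```

For a date in April 2022, the first line printed matches the P_202204 partition declared explicitly in the CREATE TABLE statement.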
```sql
LOAD LABEL broker_load_2022_03_23
(
  DATA INFILE("hdfs://192.168.20.123:8020/user/hive/warehouse/ods.db/ods_demo_detail/*/*")
  INTO TABLE doris_ods_test_detail
  COLUMNS TERMINATED BY ","
  (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
  COLUMNS FROM PATH AS (`day`)
  SET
  (
    rq = str_to_date(`day`,'%Y-%m-%d'),
    id=id,
    store_id=store_id,
    company_id=company_id,
    tower_id=tower_id,
    commodity_id=commodity_id,
    commodity_name=commodity_name,
    commodity_price=commodity_price,
    member_price=member_price,
    cost_price=cost_price,
    unit=unit,
    quantity=quantity,
    actual_price=actual_price
  )
)
WITH BROKER "broker_name_1"
(
  "username" = "hdfs",
  "password" = ""
)
PROPERTIES
(
  "timeout"="1200",
  "max_filter_ratio"="0.1"
);
```
192.168.20.123:8020 is the IP address and port of the active NameNode of the HDFS cluster that stores the Hive table.

```sql
-- Data format: ORC; partition field: day
CREATE TABLE `ods_demo_orc_detail`(
  `id` string,
  `store_id` string,
  `company_id` string,
  `tower_id` string,
  `commodity_id` string,
  `commodity_name` string,
  `commodity_price` double,
  `member_price` double,
  `cost_price` double,
  `unit` string,
  `quantity` double,
  `actual_price` double
)
PARTITIONED BY (day string)
row format delimited fields terminated by ','
lines terminated by '\n'
STORED AS ORC;
```
```sql
LOAD LABEL dish_2022_03_23
(
  DATA INFILE("hdfs://10.220.147.151:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*")
  INTO TABLE doris_ods_test_detail
  COLUMNS TERMINATED BY ","
  FORMAT AS "orc"
  (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
  COLUMNS FROM PATH AS (`day`)
  SET
  (
    rq = str_to_date(`day`,'%Y-%m-%d'),
    id=id,
    store_id=store_id,
    company_id=company_id,
    tower_id=tower_id,
    commodity_id=commodity_id,
    commodity_name=commodity_name,
    commodity_price=commodity_price,
    member_price=member_price,
    cost_price=cost_price,
    unit=unit,
    quantity=quantity,
    actual_price=actual_price
  )
)
WITH BROKER "broker_name_1"
(
  "username" = "hdfs",
  "password" = ""
)
PROPERTIES
(
  "timeout"="1200",
  "max_filter_ratio"="0.1"
);
```
FORMAT AS "orc": specifies the format of the data to be imported.
SET: defines the field mapping between the Hive table and the Doris table, along with any conversions applied to the fields.

The following statement loads a tab-separated file directly from HDFS, using WITH HDFS instead of a Broker:

```sql
LOAD LABEL demo.label_20220402
(
  DATA INFILE("hdfs://10.220.147.151:8020/tmp/test_hdfs.txt")
  INTO TABLE `ods_dish_detail_test`
  COLUMNS TERMINATED BY "\t"
  (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
)
with HDFS
(
  "fs.defaultFS"="hdfs://10.220.147.151:8020",
  "hdfs_user"="root"
)
PROPERTIES
(
  "timeout"="1200",
  "max_filter_ratio"="0.1"
);
```
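In the Broker Load statements that read the Hive tables, COLUMNS FROM PATH AS (`day`) takes the value of `day` from the partition directory in each file path, and SET converts it into `rq` with str_to_date. The Python sketch below mimics that derivation. It assumes Hive-style `day=YYYY-MM-DD` directory names; the helpers `partition_values` and `rq_from_path` and the file name `000000_0` are hypothetical.

```python
from datetime import date, datetime


def partition_values(path: str) -> dict:
    """Collect Hive-style key=value directory segments from a file path."""
    values = {}
    # Skip the last segment, which is the data file name.
    for segment in path.split("/")[:-1]:
        key, sep, value = segment.partition("=")
        if sep and key:
            values[key] = value
    return values


def rq_from_path(path: str) -> date:
    """Python analogue of SET (rq = str_to_date(`day`, '%Y-%m-%d'))."""
    day = partition_values(path)["day"]
    return datetime.strptime(day, "%Y-%m-%d").date()


if __name__ == "__main__":
    # Hypothetical file under the Hive table directory used in the LOAD statement.
    sample = ("hdfs://192.168.20.123:8020/user/hive/warehouse/ods.db/"
              "ods_demo_detail/day=2022-03-23/000000_0")
    print(partition_values(sample))
    print(rq_from_path(sample))
```

Because `rq` is derived from the path, every row in a given `day=` directory lands in the same Doris date, and therefore in the same monthly partition.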
```
mysql> show load order by createtime desc limit 1\G
*************************** 1. row ***************************
         JobId: 41326624
         Label: broker_load_2022_03_23
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
      TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
      ErrorMsg: NULL
    CreateTime: 2022-04-01 18:59:06
  EtlStartTime: 2022-04-01 18:59:11
 EtlFinishTime: 2022-04-01 18:59:11
 LoadStartTime: 2022-04-01 18:59:11
LoadFinishTime: 2022-04-01 18:59:11
           URL: NULL
    JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
1 row in set (0.01 sec)
```
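When checking jobs from a script, the EtlInfo counters and the JobDetails JSON in the SHOW LOAD output can be parsed directly. The Python sketch below is a minimal example using values copied from the output above. It assumes that dpp.norm.ALL counts accepted rows, dpp.abnorm.ALL counts rows rejected for data quality, and that the filtered ratio compared against max_filter_ratio is rejected / (accepted + rejected); the helper names are hypothetical.

```python
import json


def parse_etl_info(etl_info: str) -> dict:
    """Parse the 'key=value; key=value' counters shown in the EtlInfo column."""
    counters = {}
    for item in etl_info.split(";"):
        key, sep, value = item.strip().partition("=")
        if sep:
            counters[key] = int(value)
    return counters


def filtered_ratio(counters: dict) -> float:
    """Assumed ratio: rows rejected for data quality / (accepted + rejected rows)."""
    rejected = counters.get("dpp.abnorm.ALL", 0)
    total = rejected + counters.get("dpp.norm.ALL", 0)
    return rejected / total if total else 0.0


if __name__ == "__main__":
    # Values copied from the SHOW LOAD output above.
    etl = parse_etl_info("unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27")
    details = json.loads(
        '{"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},'
        '"ScannedRows":27,"TaskNumber":1,'
        '"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},'
        '"FileNumber":1,"FileSize":5540}'
    )
    # Assumption: a non-empty list under "Unfinished backends" means BEs still working.
    unfinished = [k for k, v in details["Unfinished backends"].items() if v]
    print(etl, filtered_ratio(etl) <= 0.1, unfinished, details["ScannedRows"])
```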
```sql
CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
```
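Broker Load concurrency is controlled by FE parameters set in the fe.conf file. Together with the source file size and the number of BEs in the cluster, they determine how an import is split:

The concurrency count for this import = Math.min(source file size / minimum processing volume (min_bytes_per_broker_scanner), maximum concurrency (max_broker_concurrency), number of BE nodes in the cluster)
The amount of data processed by a single BE for this import = source file size / the concurrency count for this import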
In general, the maximum amount of data a single import job can handle is max_bytes_per_broker_scanner * number of BE nodes. To import more data than this, increase max_bytes_per_broker_scanner accordingly.
Default configuration:
- min_bytes_per_broker_scanner: default value 64 MB, unit: bytes.
- max_broker_concurrency: default value 10.
- max_bytes_per_broker_scanner: default value 3 GB, unit: bytes.
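With the default values listed above, the two formulas can be checked with a short Python sketch. It assumes integer division for source file size / min_bytes_per_broker_scanner, a floor of one concurrent BE for very small files, and 1 GB = 1024³ bytes. It also assumes, without confirmation from this page, that a per-BE volume above max_bytes_per_broker_scanner is what the "Scan bytes per broker scanner exceed limit" error reports. `broker_load_plan` is a hypothetical helper, not part of Doris.

```python
MB = 1024 * 1024
GB = 1024 * MB


def broker_load_plan(file_size: int, be_count: int,
                     min_bytes_per_broker_scanner: int = 64 * MB,
                     max_broker_concurrency: int = 10,
                     max_bytes_per_broker_scanner: int = 3 * GB):
    """Return (concurrency, bytes_per_be, within_limit) for one Broker Load job."""
    concurrency = min(file_size // min_bytes_per_broker_scanner,
                      max_broker_concurrency, be_count)
    # Assumption: a job always runs on at least one BE, even for tiny files.
    concurrency = max(concurrency, 1)
    bytes_per_be = file_size / concurrency
    # Assumption: exceeding this per-BE limit triggers
    # "Scan bytes per broker scanner exceed limit".
    within_limit = bytes_per_be <= max_bytes_per_broker_scanner
    return concurrency, bytes_per_be, within_limit


if __name__ == "__main__":
    print(broker_load_plan(5540, be_count=3))        # the 5540-byte file from SHOW LOAD
    print(broker_load_plan(100 * GB, be_count=10))   # too large for the defaults
```

With the defaults, a 100 GB file on 10 BEs gives each BE about 10 GB, well above the 3 GB per-BE limit, which is why the parameters are raised in the next step.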
Edit the configuration in fe.conf as follows:

max_broker_concurrency = number of BEs
Data volume processed by a single BE for the current import task = original file size / max_broker_concurrency
max_bytes_per_broker_scanner >= data volume processed by a single BE for the current import task

For example, for a 100G file on a cluster with 10 BEs:

max_broker_concurrency = 10
max_bytes_per_broker_scanner >= 10G = 100G / 10
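The rule of thumb above can be turned into fe.conf lines with a small Python sketch. It assumes 1G = 1024³ bytes (max_bytes_per_broker_scanner is specified in bytes) and rounds up so the limit is never below the per-BE volume; `recommend_fe_conf` is a hypothetical helper.

```python
import math

GiB = 1024 ** 3


def recommend_fe_conf(file_size_gb: float, be_count: int) -> dict:
    """Suggest fe.conf values so that one large file is spread over every BE."""
    max_broker_concurrency = be_count
    per_be_gb = file_size_gb / max_broker_concurrency
    return {
        "max_broker_concurrency": max_broker_concurrency,
        # Must be >= the data volume each BE processes, expressed in bytes.
        "max_bytes_per_broker_scanner": math.ceil(per_be_gb * GiB),
    }


if __name__ == "__main__":
    # The 100G file / 10 BE example from the text.
    for key, value in recommend_fe_conf(100, 10).items():
        print(f"{key} = {value}")
```

For the 100G / 10 BE example this prints max_broker_concurrency = 10 and max_bytes_per_broker_scanner = 10737418240, that is, 10G in bytes.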
Set the timeout property of the import task so that:

data volume processed by a single BE / slowest import speed of the user's Doris cluster (MB/s) >= timeout of the current import task >= data volume processed by a single BE / 10M/s

For example, for a 100G file on a cluster with 10 BEs:

timeout >= 1000s = 10G / 10M/s
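The two bounds for the timeout property can be computed with the Python sketch below. The 10 MB/s baseline comes from the text; the 5 MB/s slowest speed in the example is a made-up value, and 10G is treated as 10000 MB, as the text does. `timeout_bounds_s` is a hypothetical helper.

```python
import math


def timeout_bounds_s(per_be_mb: float, slowest_mb_per_s: float,
                     baseline_mb_per_s: float = 10.0) -> tuple:
    """Return (lower, upper) bounds in seconds for the load's "timeout" property."""
    if slowest_mb_per_s <= 0 or slowest_mb_per_s > baseline_mb_per_s:
        raise ValueError("slowest speed must be in (0, baseline]")
    # Round up so the timeout never undershoots the computed time.
    lower = math.ceil(per_be_mb / baseline_mb_per_s)
    upper = math.ceil(per_be_mb / slowest_mb_per_s)
    return lower, upper


if __name__ == "__main__":
    # 100G over 10 BEs -> 10G (10000 MB) per BE; 5 MB/s is an illustrative slowest speed.
    print(timeout_bounds_s(10_000, slowest_mb_per_s=5))
```

Any value in the returned range can go into PROPERTIES("timeout"="…") of the LOAD statement.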
Expected maximum import file volume = 14400s * 10M/s * number of BEs, where 14400s is the default maximum import timeout (4 hours).

For example, with 10 BEs in the cluster:

Expected maximum import file volume = 14400s * 10M/s * 10 = 1440000M ≈ 1440G

Note: in practice, many environments do not reach 10M/s, so it is recommended to split files larger than 500G before importing them.
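The estimate and the 500G splitting rule can be expressed as a Python sketch. It uses 1G = 1000M to match the ≈ 1440G figure in the text; both helpers are hypothetical.

```python
def expected_max_volume_mb(be_count: int, timeout_s: int = 14400,
                           mb_per_s: float = 10.0) -> float:
    """Expected maximum file volume (MB) one job can load within timeout_s."""
    return timeout_s * mb_per_s * be_count


def should_split(file_size_gb: float, threshold_gb: float = 500) -> bool:
    """Rule of thumb from the note: split files larger than 500G before loading."""
    return file_size_gb > threshold_gb


if __name__ == "__main__":
    volume = expected_max_volume_mb(10)
    print(volume, volume / 1000)   # MB, then G using 1G = 1000M as in the text
    print(should_split(800), should_split(300))
```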
desired_max_waiting_jobs limits the number of Broker Load jobs in a cluster that are waiting or running (job state PENDING or LOADING). The default value is 100. Once this threshold is exceeded, newly submitted jobs are rejected immediately.

async_pending_load_task_pool_size limits the number of pending tasks running at the same time, which in effect limits the number of import jobs actually executing. The default value is 10: if a user submits 100 Load jobs, only 10 of them enter the LOADING state and run concurrently, while the others stay in the PENDING state.

async_loading_load_task_pool_size limits the number of loading tasks running at the same time. A Broker Load job has one pending task and several loading tasks (one per DATA INFILE clause in the LOAD statement). Therefore, async_loading_load_task_pool_size should be greater than or equal to async_pending_load_task_pool_size.

To profile an import, run set enable_profile=true to enable the session variable, then submit the import job. After the job completes, its profile appears on the Queries tab of the FE web page. See the SHOW LOAD PROFILE help for more usage information.

Error: Scan bytes per broker scanner exceed limit:xxx
Refer to the Best Practices section of this document and adjust the FE configuration items max_bytes_per_broker_scanner and max_broker_concurrency.

Error: failed to send batch or TabletWriter add batch with unknown id
Increase query_timeout and streaming_load_rpc_max_alive_time_sec as appropriate.
streaming_load_rpc_max_alive_time_sec: during an import, Doris opens a Writer for each tablet to receive and write data. This parameter sets how long a Writer waits for data: if it receives nothing within this period, it is destroyed automatically. When the system processes data slowly, a Writer may not receive its next batch for a long time, and the import fails with TabletWriter add batch with unknown id. In that case, increase this value. The default is 600 seconds.

Error: LOAD_RUN_FAIL; msg:Invalid Column Name:xxx
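For data in PARQUET or ORC format, the column names in the file header must match the column names in the Doris table; otherwise, map them explicitly with a column list and a SET clause, for example:

```sql
(tmp_c1,tmp_c2)
SET
(
  id=tmp_c2,
  name=tmp_c1
)
```

This reads the columns tmp_c1 and tmp_c2 from the file and maps them to the name and id columns of the Doris table, respectively.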