
Tencent Cloud TCHouse-D

Broker Load (HDFS Data)

Last updated: 2026-05-12 21:40:27
Broker load is an asynchronous import method. The data sources it supports depend on the Broker process in use; typically there are Brokers for community HDFS and Brokers for S3-protocol Cloud Object Storage. This document explains how to use Broker load to import data from HDFS.
Because data in a Doris table is ordered, Broker load uses the Doris cluster's own resources to sort the data during import. For migrating massive historical data, it therefore consumes more Doris cluster resources than Spark load. Broker load is mostly used when no Spark-like computing resources are available; if Spark computing resources are available, Spark load is recommended.
Users create a Broker load job over the MySQL protocol and check the import result with the SHOW LOAD command.

Applicable scenario

The source data is in the storage system that Broker can access, such as HDFS.
The data volume is at the level of tens to hundreds of GB.

Basic Principles

After the user submits the import task, the FE will generate the corresponding plan and distribute the plan to multiple BEs for execution, according to the current number of BEs and the size of the file. Each BE executes a part of the import data. During the execution, the BE will pull data from Broker, transform the data, and import the data into the system. After all BEs have completed the import, the FE will finally decide whether the import is successful.
                 +
                 | 1. user create broker load
                 v
            +----+----+
            |         |
            |   FE    |
            |         |
            +----+----+
                 |
                 | 2. BE etl and load the data
    +--------------------------+
    |            |             |
+---v---+     +--v----+    +---v---+
|       |     |       |    |       |
|  BE   |     |  BE   |    |  BE   |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +--+-^--+
    | |           | |         | |
    | |           | |         | | 3. pull data from broker
+---v-+-+     +---v-+-+    +--v-+--+
|       |     |       |    |       |
|Broker |     |Broker |    |Broker |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +---+-^-+
    | |           | |          | |
+---v-+-----------v-+----------v-+-+
|       HDFS/BOS/AFS cluster       |
|                                  |
+----------------------------------+


Start Import

Let's look at the use of Broker Load through a few practical scenario examples.

Importing Data from a Hive Partitioned Table

1. Create a Hive table.
-- Data format: default (delimited text); partition field: day
CREATE TABLE `ods_demo_detail`(
`id` string,
`store_id` string,
`company_id` string,
`tower_id` string,
`commodity_id` string,
`commodity_name` string,
`commodity_price` double,
`member_price` double,
`cost_price` double,
`unit` string,
`quantity` double,
`actual_price` double
)
PARTITIONED BY (day string)
row format delimited fields terminated by ','
lines terminated by '\n';
Use Hive's load command to import your data into the Hive table:
load data local inpath '/opt/custorm' into table ods_demo_detail;
2. Create a Doris table. For the specific table creation syntax, see CREATE TABLE.
CREATE TABLE `doris_ods_test_detail` (
`rq` date NULL,
`id` varchar(32) NOT NULL,
`store_id` varchar(32) NULL,
`company_id` varchar(32) NULL,
`tower_id` varchar(32) NULL,
`commodity_id` varchar(32) NULL,
`commodity_name` varchar(500) NULL,
`commodity_price` decimal(10, 2) NULL,
`member_price` decimal(10, 2) NULL,
`cost_price` decimal(10, 2) NULL,
`unit` varchar(50) NULL,
`quantity` int(11) NULL,
`actual_price` decimal(10, 2) NULL
) ENGINE=OLAP
UNIQUE KEY(`rq`, `id`, `store_id`)
PARTITION BY RANGE(`rq`)
(
PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "P_",
"dynamic_partition.buckets" = "1",
"in_memory" = "false",
"storage_format" = "V2"
);
3. Start importing data. For the specific syntax, refer to Broker Load.
LOAD LABEL broker_load_2022_03_23
(
DATA INFILE("hdfs://192.168.20.123:8020/user/hive/warehouse/ods.db/ods_demo_detail/*/*")
INTO TABLE doris_ods_test_detail
COLUMNS TERMINATED BY ","
(id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
COLUMNS FROM PATH AS (`day`)
SET
(rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
)
WITH BROKER "broker_name_1"
(
"username" = "hdfs",
"password" = ""
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
Note:
192.168.20.123:8020 is the IP address and Port of the active namenode for the HDFS cluster used by the Hive table.
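The COLUMNS FROM PATH AS and SET clauses above can be illustrated outside Doris. The following is a minimal Python sketch, not Doris code: it assumes the Hive partition directories follow the usual key=value layout (for example day=2022-04-01), and the file name 000000_0 is a hypothetical example. It shows how a `day` value taken from the path would be converted to a date, mirroring str_to_date(`day`,'%Y-%m-%d').

```python
from datetime import date, datetime


def columns_from_path(file_path: str, wanted: list[str]) -> dict[str, str]:
    """Collect key=value path segments of file_path for the wanted keys."""
    found = {}
    for segment in file_path.split("/"):
        key, sep, value = segment.partition("=")
        if sep and key in wanted:
            found[key] = value
    return found


def to_rq(day: str) -> date:
    """Mirror str_to_date(`day`, '%Y-%m-%d') from the SET clause."""
    return datetime.strptime(day, "%Y-%m-%d").date()


# Hypothetical data file under the Hive warehouse directory used above.
example_path = (
    "hdfs://192.168.20.123:8020/user/hive/warehouse/ods.db/"
    "ods_demo_detail/day=2022-04-01/000000_0"
)

path_columns = columns_from_path(example_path, ["day"])
rq = to_rq(path_columns["day"])
print(path_columns, rq)
```

The remaining columns in the SET clause (id=id, store_id=store_id, and so on) are plain one-to-one mappings, so only `rq` needs a conversion.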

Importing Hive Partitioned Tables (ORC Format)

1. Create a Hive partitioned table in ORC format.
-- Data format: ORC; partition field: day
CREATE TABLE `ods_demo_orc_detail`(
`id` string,
`store_id` string,
`company_id` string,
`tower_id` string,
`commodity_id` string,
`commodity_name` string,
`commodity_price` double,
`member_price` double,
`cost_price` double,
`unit` string,
`quantity` double,
`actual_price` double
)
PARTITIONED BY (day string)
row format delimited fields terminated by ','
lines terminated by '\n'
STORED AS ORC;
2. Create a Doris table. The CREATE TABLE statement is the same as the Doris CREATE TABLE statement in the previous example.
3. Use Broker Load to import the data.
LOAD LABEL dish_2022_03_23
(
DATA INFILE("hdfs://10.220.147.151:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*")
INTO TABLE doris_ods_test_detail
COLUMNS TERMINATED BY ","
FORMAT AS "orc"
(id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
COLUMNS FROM PATH AS (`day`)
SET
(rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
)
WITH BROKER "broker_name_1"
(
"username" = "hdfs",
"password" = ""
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
Note:
FORMAT AS "orc": Specifies the format of the data to be imported.
SET: Defines the field mapping relationship between the Hive table and the Doris table and some operations to convert the fields.

HDFS File System Data Import

Let's continue with the Doris table created above to demonstrate importing data from HDFS through Broker Load. The import statement is as follows:
LOAD LABEL demo.label_20220402
(
DATA INFILE("hdfs://10.220.147.151:8020/tmp/test_hdfs.txt")
INTO TABLE `ods_dish_detail_test`
COLUMNS TERMINATED BY "\t"
(id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
)
with HDFS (
"fs.defaultFS"="hdfs://10.220.147.151:8020",
"hdfs_user"="root"
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);
For the specific parameters, see Broker and Broker Load documents.

Viewing the Import Status

We can use the following command to view the status information of the above import task. For the specific syntax for viewing the import status, see SHOW LOAD.
mysql> show load order by createtime desc limit 1\G
*************************** 1. row ***************************
JobId: 41326624
Label: broker_load_2022_03_23
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
ErrorMsg: NULL
CreateTime: 2022-04-01 18:59:06
EtlStartTime: 2022-04-01 18:59:11
EtlFinishTime: 2022-04-01 18:59:11
LoadStartTime: 2022-04-01 18:59:11
LoadFinishTime: 2022-04-01 18:59:11
URL: NULL
JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
1 row in set (0.01 sec)
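The JobDetails field is a JSON string, so it can be inspected programmatically. The sketch below is a minimal Python example that parses the exact JobDetails value shown above and counts the entries listed under "All backends" and "Unfinished backends". The helper name summarize_job_details and the summary keys are illustrative, not part of Doris.

```python
import json

# JobDetails value copied from the SHOW LOAD output above.
job_details_text = (
    '{"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},'
    '"ScannedRows":27,"TaskNumber":1,'
    '"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},'
    '"FileNumber":1,"FileSize":5540}'
)


def summarize_job_details(text: str) -> dict:
    """Parse JobDetails and count the backend ids listed under each key."""
    details = json.loads(text)
    unfinished = sum(len(ids) for ids in details["Unfinished backends"].values())
    total = sum(len(ids) for ids in details["All backends"].values())
    return {
        "scanned_rows": details["ScannedRows"],
        "file_number": details["FileNumber"],
        "file_size_bytes": details["FileSize"],
        "backends_total": total,
        "backends_unfinished": unfinished,
    }


summary = summarize_job_details(job_details_text)
print(summary)
```

For the FINISHED job above, no backend remains in the unfinished list, which is consistent with Progress showing LOAD:100%.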

Canceling Import

A Broker load job can be manually canceled by the user as long as its status is not CANCELLED or FINISHED. To cancel it, you must specify the Label of the import job. For the syntax of the cancel command, see CANCEL LOAD.
For example, to cancel the import job with the label broker_load_2022_03_23 in the database demo:
CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

Related System Configuration

Broker parameters

The Broker load needs to access remote storage with the Broker process. Different Brokers require different parameters. For details, see Broker.

FE configuration

The following configurations are system-level configurations for Broker load, i.e., those that apply to all Broker load import tasks. They can be adjusted by changing the fe.conf file.
min_bytes_per_broker_scanner/max_bytes_per_broker_scanner/max_broker_concurrency
The first two configurations limit the smallest and largest amount of data processed by a single BE node. The third configuration limits the maximum concurrency count for an import job. The minimum processed data volume, the maximum concurrency count, the size of the source file, and the number of BE nodes in the current cluster collectively determine the concurrency count for this import.
Concurrency count for this import = Math.min(Source file size / min_bytes_per_broker_scanner, max_broker_concurrency, Number of BE nodes)
Data volume processed by a single BE for this import = Source file size / Concurrency count for this import
Typically, the maximum amount of data supported by a single import job is max_bytes_per_broker_scanner * BE node count. If you need to import larger amounts of data, you need to adjust the size of the max_bytes_per_broker_scanner parameter appropriately. Default configuration:
Parameter: min_bytes_per_broker_scanner. Default value: 64MB. Unit: bytes.
Parameter: max_broker_concurrency. Default value: 10.
Parameter: max_bytes_per_broker_scanner. Default value: 3GB. Unit: bytes.
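As a rough illustration of the two formulas above, the following Python sketch computes the concurrency count and the per-BE data volume using the default values listed. It is a simplified model, not the FE implementation: the function name plan_import is hypothetical, integer division and a floor of 1 for very small files are assumptions, and 1 MB is taken as 1024 * 1024 bytes.

```python
MB = 1024 * 1024
GB = 1024 * MB


def plan_import(
    file_size: int,
    be_count: int,
    min_bytes_per_broker_scanner: int = 64 * MB,
    max_broker_concurrency: int = 10,
    max_bytes_per_broker_scanner: int = 3 * GB,
):
    """Return (concurrency, bytes per BE, whether it fits the per-BE limit)."""
    concurrency = min(
        file_size // min_bytes_per_broker_scanner,
        max_broker_concurrency,
        be_count,
    )
    # Assumption: a file smaller than the minimum still gets one scanner.
    concurrency = max(1, concurrency)
    per_be = file_size / concurrency
    return concurrency, per_be, per_be <= max_bytes_per_broker_scanner


# 9 GB on 3 BEs fits the defaults; 100 GB on 10 BEs does not.
small = plan_import(9 * GB, be_count=3)
large = plan_import(100 * GB, be_count=10)
print(small, large)
```

In the second case each BE would have to process 10 GB, which is above the 3 GB default, so max_bytes_per_broker_scanner would need to be raised before such a job is submitted.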

Best Practice

Application Scenario

Broker load is best suited to cases where the source data resides in a file system (HDFS, BOS, AFS). In addition, because Broker load is the only method that runs a single import asynchronously, it is also worth considering when a large file must be imported asynchronously.

Data Volume

This discussion assumes a single BE. If the cluster has multiple BEs, multiply the thresholds below by the number of BEs. For instance, with 3 BE nodes, the threshold of 3 GB or less becomes 9 GB or less.
3 GB or less: The user can submit a Broker load import request directly.
Over 3 GB: Because a single BE can process at most 3 GB in one import, importing files larger than 3 GB requires adjusting the Broker load import parameters as follows.
1.1 Modify the maximum scan volume of a single BE and the maximum concurrency, based on the number of current BE nodes and the size of the original file.
Edit the configuration in fe.conf
max_broker_concurrency = Number of BEs
Data volume processed by a single BE for the current import task = Original file size / max_broker_concurrency
max_bytes_per_broker_scanner >= Data volume processed by a single BE for the current import task

For example, for a 100G file and a cluster with 10 BEs:
max_broker_concurrency = 10
max_bytes_per_broker_scanner >= 10G = 100G / 10
After modification, all of the BEs will handle the import task concurrently, and each BE will handle a portion of the original file.
Note
The above two configurations in FE are system configurations, which means that their modifications apply to all Broker load tasks.
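The sizing rule in step 1.1 can be written as a small helper. This is a minimal Python sketch under stated assumptions: the function name suggest_fe_settings and the result keys are hypothetical, and the byte value assumes 1G = 1024^3 bytes, which the text does not specify.

```python
import math


def suggest_fe_settings(file_size_gb: float, be_count: int) -> dict:
    """Derive the two FE settings from the file size and the number of BEs."""
    per_be_gb = file_size_gb / be_count
    return {
        "max_broker_concurrency": be_count,
        # Lower bound for max_bytes_per_broker_scanner, in G and in bytes.
        "min_scanner_limit_gb": per_be_gb,
        "min_scanner_limit_bytes": math.ceil(per_be_gb * 1024 ** 3),
    }


settings = suggest_fe_settings(file_size_gb=100, be_count=10)
print(settings)
```

For the 100G example this reproduces max_broker_concurrency = 10 and a per-BE lower bound of 10G.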
1.2 Set the timeout time for the current import task upon creation.
Data volume processed by a single BE for the current import task / Slowest import speed of the user's Doris cluster (MB/s) >= Timeout for the current import task >= Data volume processed by a single BE for the current import task / (10M/s)

For example, for a 100G file and a cluster with 10 BEs:
timeout >= 1000s = 10G / 10M/s
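The timeout bounds in step 1.2 can be checked with a few lines of Python. This is a minimal sketch: the function name timeout_bounds_s is hypothetical, the slowest speed of 5 MB/s is an invented example value, and 1G is treated as 1000M to match the arithmetic in the example above.

```python
def timeout_bounds_s(
    per_be_mb: float,
    slowest_mb_per_s: float,
    reference_mb_per_s: float = 10.0,
):
    """Return (lower bound, upper bound) for the import timeout in seconds."""
    lower = per_be_mb / reference_mb_per_s
    upper = per_be_mb / slowest_mb_per_s
    return lower, upper


# 100G file on 10 BEs: each BE handles 10G, taken here as 10000M.
per_be_mb = 100 * 1000 / 10
lower, upper = timeout_bounds_s(per_be_mb, slowest_mb_per_s=5.0)
print(lower, upper)
```

With these inputs, the timeout property of the LOAD statement would be chosen between 1000 and 2000 seconds.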
1.3 If the timeout calculated in step 1.2 exceeds the system's default maximum import timeout of 4 hours, it is not recommended to simply raise the maximum import timeout. Instead, split the file and import it in multiple batches, because when a single import runs longer than 4 hours, the time cost of retrying after a failure is very high. You can use the following formula to calculate the expected maximum volume of data to import at one time into a Doris cluster:
Expected maximum import file volume = 14400s * 10M/s * Number of BEs
For example: The number of BEs in the cluster is 10
Expected maximum import file volume = 14400s * 10M/s * 10 = 1440000M ≈ 1440G

Note: Generally, the user's environment may not reach a speed of 10M/s, so it's recommended to split files larger than 500G and then import them.
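The limit above also tells you how many batches a large file would need. The following Python sketch applies the formula and rounds up. The function names and the 3000000M example file are hypothetical, and the 10M/s speed is the reference value from the formula, which real environments may not reach.

```python
import math

MAX_TIMEOUT_S = 4 * 60 * 60  # default maximum import timeout: 14400 s


def expected_max_import_mb(be_count: int, speed_mb_per_s: float = 10.0) -> float:
    """Expected maximum import file volume in M for one job."""
    return MAX_TIMEOUT_S * speed_mb_per_s * be_count


def batches_needed(file_mb: float, be_count: int, speed_mb_per_s: float = 10.0) -> int:
    """Number of batches so that each batch stays within the expected maximum."""
    return math.ceil(file_mb / expected_max_import_mb(be_count, speed_mb_per_s))


limit_mb = expected_max_import_mb(be_count=10)
batches = batches_needed(file_mb=3_000_000, be_count=10)
print(limit_mb, batches)
```

Passing a lower speed_mb_per_s models a slower cluster and yields a smaller limit and more batches.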

Job Scheduling

The system restricts the number of Broker load jobs running in a cluster to prevent running too many load jobs simultaneously.
First, the FE configuration parameter desired_max_waiting_jobs limits the number of Broker load jobs in a cluster that have not started or are running (job status PENDING or LOADING). Default value: 100. If this threshold is exceeded, newly submitted jobs are rejected directly.
A Broker load job is divided into a pending task phase and a loading task phase. The pending task is responsible for obtaining information about the files to be imported, while the loading task is sent to the BE to execute the specific import job.
The FE configuration parameter async_pending_load_task_pool_size is used to limit the number of concurrently running pending tasks. This also effectively controls the number of actual running import jobs. This parameter defaults to 10. In other words, if a user submits 100 Load jobs, only 10 jobs will go to the LOADING state and start execution at the same time, while the other jobs remain in the PENDING waiting state.
FE's configuration parameter async_loading_load_task_pool_size is used to limit the number of loading tasks running simultaneously. A Broker load job will have one pending task and multiple loading tasks (equal to the number of DATA INFILE clauses in the LOAD statement). Therefore, async_loading_load_task_pool_size should be greater than or equal to the async_pending_load_task_pool_size.

Performance Analysis

Before submitting a LOAD job, you can execute set enable_profile=true to enable profiling for the session, and then submit the import job. After the import job is completed, you can view its profile on the Queries tab of the FE web page. You can also view the SHOW LOAD PROFILE help document for more usage information.
This Profile can help analyze the running status of import jobs. Currently, the Profile can be viewed only after a job is successfully executed.

FAQs

Import error: Scan bytes per broker scanner exceed limit:xxx. Please refer to the Best Practices section of the document and modify the FE configuration items max_bytes_per_broker_scanner and max_broker_concurrency.
Import error: failed to send batch or TabletWriter add batch with unknown id. Appropriately modify query_timeout and streaming_load_rpc_max_alive_time_sec. streaming_load_rpc_max_alive_time_sec: During the import process, Doris enables a Writer for each Tablet to receive and write data. This parameter specifies the Writer's waiting timeout. If the Writer does not receive any data within this period, it is automatically destroyed. When the system processing speed is slow, the Writer may fail to receive the next batch of data for an extended period, causing an import error: TabletWriter add batch with unknown id. In this case, you can appropriately increase this configuration. The default value is 600 seconds.
Import error: LOAD_RUN_FAIL; msg:Invalid Column Name:xxx. For data in PARQUET or ORC format, the column names in the file header must match the column names in the Doris table. If they differ, map them with a SET clause, for example:
(tmp_c1,tmp_c2)
SET
(
id=tmp_c2,
name=tmp_c1
)
This means obtaining the columns named (tmp_c1, tmp_c2) in the parquet or orc file and mapping them to the (id, name) columns in the Doris table. If no set clause is specified, the columns in the column clause are used for the mapping.
Note:
If you use orc files directly generated by certain Hive versions, the headers in the orc files are not Hive metadata but (_col0, _col1, _col2, ...). This may cause an Invalid Column Name error, requiring the use of a set clause for mapping.
