Tencent Cloud TCHouse-D
Flink Connector (Real-time or Batch Data with Flink)

Last updated: 2024-09-29 09:38:16
Note
This document applies to flink-doris-connector 1.1.0 and later.

Basic Introduction

The Flink Doris Connector lets Flink read, insert, modify, and delete data stored in Doris; it is not limited to data import. Because Flink is a unified stream and batch computing engine, both real-time incremental data and batch data can be loaded into Doris through the connector. Repository: https://github.com/apache/doris-flink-connector. In essence, the connector maps a Doris table to a DataStream or a Table.
Note
Modification and deletion are supported only on the Unique Key model.
Currently, automatic deletion is supported only for data imported through Flink CDC. If other import methods are used, you need to delete data yourself. For details on deleting data with Flink CDC, see the last section of this document.

Version Compatibility

| Connector Version | Flink Version | Doris Version | Java Version | Scala Version |
| --- | --- | --- | --- | --- |
| 1.0.3 | 1.11+ | 0.15+ | 8 | 2.11, 2.12 |
| 1.1.0 | 1.14+ | 1.0+ | 8 | 2.11, 2.12 |
| 1.2.0 | 1.15+ | 1.0+ | 8 | - |

Usage

There are two main ways to read and write Doris data with Flink:
SQL
DataStream

Parameter Configuration

The Flink Doris Connector sink is implemented internally on top of the Stream Load service: data is written to Doris through Stream Load, and Stream Load request parameters can be configured as well. For the specific parameters, see the Stream Load manual. The configuration methods are:
In SQL, pass the parameters in the WITH clause as options prefixed with sink.properties.
In DataStream, configure them with DorisExecutionOptions.builder().setStreamLoadProp(Properties).
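For example, Stream Load parameters for CSV loading can be passed through the SQL WITH clause like this (a sketch; the table name and connection values are placeholders, and `column_separator` is a standard Stream Load parameter):

```sql
CREATE TABLE doris_csv_sink (
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = 'FE_IP:8030',
'table.identifier' = 'db.table',
'username' = 'root',
'password' = 'password',
'sink.label-prefix' = 'doris_label',
-- Stream Load parameters, passed through with the sink.properties. prefix
'sink.properties.format' = 'csv',
'sink.properties.column_separator' = ','
);
```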

SQL

Source (Doris table as data source)
CREATE TABLE flink_doris_source (
name STRING,
age INT,
price DECIMAL(5,2),
sale DOUBLE
)
WITH (
'connector' = 'doris',
'fenodes' = 'FE_IP:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = 'password'
);
Sink (Doris table as import target table)
-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE flink_doris_sink (
name STRING,
age INT,
price DECIMAL(5,2),
sale DOUBLE
)
WITH (
'connector' = 'doris',
'fenodes' = 'FE_IP:8030',
'table.identifier' = 'db.table',
'username' = 'root',
'password' = 'password',
'sink.label-prefix' = 'doris_label'
);
Insert
INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;

DataStream

Source
DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes("FE_IP:8030")
.setTableIdentifier("db.table")
.setUsername("root")
.setPassword("password");

DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
.setDorisOptions(builder.build())
.setDorisReadOptions(DorisReadOptions.builder().build())
.setDeserializer(new SimpleListDeserializationSchema())
.build();

env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
Sink String Data Stream
// enable checkpoint
env.enableCheckpointing(10000);

DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:8030")
.setTableIdentifier("db.table")
.setUsername("root")
.setPassword("password");


DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris"); //streamload label prefix

builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(new SimpleStringSerializer()) //serialize according to string
.setDorisOptions(dorisBuilder.build());


//mock string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris",1));
DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);

source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)
.sinkTo(builder.build());
RowData Data Stream
// enable checkpoint
env.enableCheckpointing(10000);

//doris sink option
DorisSink.Builder<RowData> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:8030")
.setTableIdentifier("db.table")
.setUsername("root")
.setPassword("password");

// json format to streamload
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
.setStreamLoadProp(properties); //streamload params

// Flink RowData's schema
String[] fields = {"city", "longitude", "latitude"};
DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE()};

builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(RowDataSerializer.builder() //serialize according to rowdata
.setFieldNames(fields)
.setType("json") //json format
.setFieldType(types).build())
.setDorisOptions(dorisBuilder.build());

//mock rowdata source
DataStream<RowData> source = env.fromElements("")
.map(new MapFunction<String, RowData>() {
@Override
public RowData map(String value) throws Exception {
GenericRowData genericRowData = new GenericRowData(3);
genericRowData.setField(0, StringData.fromString("beijing"));
genericRowData.setField(1, 116.405419);
genericRowData.setField(2, 39.916927);
return genericRowData;
}
});

source.sinkTo(builder.build());

Configuration

General Configuration Items

| Key | Default Value | Required | Comment |
| --- | --- | --- | --- |
| fenodes | -- | Y | Doris FE HTTP address |
| table.identifier | -- | Y | Doris table name, e.g., db.tbl |
| username | -- | Y | Username to access Doris |
| password | -- | Y | Password to access Doris |
| doris.request.retries | 3 | N | Number of retries for requests sent to Doris |
| doris.request.connect.timeout.ms | 30000 | N | Connection timeout for requests sent to Doris |
| doris.request.read.timeout.ms | 30000 | N | Read timeout for requests sent to Doris |
| doris.request.query.timeout.s | 3600 | N | Timeout for querying Doris. The default value is 1 hour; -1 means no time limit. |
| doris.request.tablet.size | Integer.MAX_VALUE | N | Number of Doris tablets per partition. A smaller value generates more partitions and raises parallelism on the Flink side, but also puts more pressure on Doris. |
| doris.batch.size | 1024 | N | Maximum rows of data read from BE at one time. Increasing this value reduces the number of connections established between Flink and Doris, lowering the extra cost of network latency. |
| doris.exec.mem.limit | 2147483648 | N | Memory limit for a single query, in bytes. The default value is 2 GB. |
| doris.deserialize.arrow.async | FALSE | N | Whether to asynchronously convert Arrow format to the RowBatch required for flink-doris-connector iteration |
| doris.deserialize.queue.size | 64 | N | Internal processing queue for asynchronous Arrow conversion. Takes effect when doris.deserialize.arrow.async is true. |
| doris.read.field | -- | N | Comma-separated list of column names to read from the Doris table |
| doris.filter.query | -- | N | Expression for filtering the data to be read. It is passed through to Doris, which uses it to filter data at the source end. |
| sink.label-prefix | -- | Y | Prefix of the Stream Load label. Must be globally unique in 2PC scenarios to guarantee Flink's exactly-once semantics. |
| sink.properties.* | -- | N | Stream Load parameters. For example, 'sink.properties.column_separator' = ',' defines the column separator; 'sink.properties.escape_delimiters' = 'true' treats special characters as separators ('\x01' is converted to binary 0x01). To import in JSON format: 'sink.properties.format' = 'json', 'sink.properties.read_json_by_line' = 'true'. |
| sink.enable-delete | TRUE | N | Whether to enable deletion. Requires the batch deletion feature (enabled by default in Doris 0.15+) on the Doris table; only the Unique model is supported. |
| sink.enable-2pc | TRUE | N | Whether to enable two-phase commit (2PC). Defaults to true, guaranteeing exactly-once semantics. |
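As a sketch of the read-side options above (connection values are placeholders), column pruning and filter pushdown can be combined on a source table:

```sql
CREATE TABLE doris_source_pruned (
name STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = 'FE_IP:8030',
'table.identifier' = 'db.table',
'username' = 'root',
'password' = 'password',
-- read only these columns from the Doris table
'doris.read.field' = 'name,age',
-- pushed down to Doris and evaluated at the source end
'doris.filter.query' = 'age > 18'
);
```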
Doris and Flink Column Type Mapping

| Doris Type | Flink Type |
| --- | --- |
| NULL_TYPE | NULL |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| DECIMAL | DECIMAL |
| CHAR | STRING |
| LARGEINT | STRING |
| VARCHAR | STRING |
| DECIMALV2 | DECIMAL |
| TIME | DOUBLE |
| HLL | Unsupported datatype |

Example of Using Flink CDC to Access Doris (Supports Insert/Update/Delete Events)
CREATE TABLE cdc_mysql_source (
id int
,name VARCHAR
,PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'database',
'table-name' = 'table'
);

-- To synchronize delete events (sink.enable-delete='true'), the batch deletion feature must be enabled on the Doris table
CREATE TABLE doris_sink (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true',
'sink.label-prefix' = 'doris_label'
);

insert into doris_sink select id,name from cdc_mysql_source;

Java Example

The samples/doris-demo/flink-demo/ directory of the repository provides a Java example for reference.

Best Practice

Application Scenario

The most suitable scenario for the Flink Doris Connector is synchronizing source data (from MySQL, Oracle, PostgreSQL, etc.) to Doris in real time or in batches. The connector can also be used with Flink to combine data in Doris with other data sources for analysis.

Other

The Flink Doris Connector relies mainly on checkpoints for streaming writes, so the checkpoint interval is the visibility delay of the data.
To guarantee Flink's exactly-once semantics, the connector enables two-phase commit by default in versions after 1.1. For version 1.0, it can be enabled by modifying a BE parameter; see Stream Load (local file) for details.

FAQs

1. Writing Bitmap Type
CREATE TABLE bitmap_sink (
dt int,
page string,
user_id int
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'test.bitmap_test',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label',
'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
)
2. errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]
In exactly-once scenarios, a Flink job must restart from the latest checkpoint/savepoint; otherwise, this error is reported. When exactly-once is not required, the problem can be solved by disabling two-phase commit (sink.enable-2pc=false) or by switching to a different sink.label-prefix.
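When exactly-once semantics are not needed, two-phase commit can be turned off directly in the sink's WITH options (a sketch; the table name and connection values are placeholders):

```sql
CREATE TABLE doris_sink_no_2pc (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = 'FE_IP:8030',
'table.identifier' = 'db.table',
'username' = 'root',
'password' = 'password',
'sink.label-prefix' = 'doris_label',
-- disable two-phase commit to avoid label conflicts on restart
'sink.enable-2pc' = 'false'
);
```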
