Users can subscribe to message data in Kafka by submitting a routine import job for near real-time data synchronization.
Doris guarantees that messages subscribed from Kafka are neither lost nor duplicated, that is, it provides Exactly-Once consumption semantics.
Subscribing to Kafka Messages
Subscription of Kafka messages uses the Routine Load feature in Doris.
Users need to create a routine import job first. The job will send a series of tasks continuously through routine scheduling, and each task will consume a certain number of messages in Kafka.
Please note the following usage restrictions:
1. Both unauthenticated Kafka clusters and Kafka clusters authenticated through SSL are supported.
2. The following message formats are supported:
   - CSV text format. Each message is a single line, and the line does not end with a newline character.
   - JSON format.
3. Only Kafka 0.10.0.0 and later versions are supported.
Accessing an SSL-Authenticated Kafka Cluster
The Routine Load feature supports both unauthenticated Kafka clusters and SSL-authenticated Kafka clusters.
To access an SSL-authenticated Kafka cluster, you must provide the certificate file (ca.pem) used to verify the Kafka Broker's public key. If client authentication is also enabled for the Kafka cluster, you must additionally provide the client's public key (client.pem), the private key file (client.key), and the key password. These files must first be uploaded to Doris using the CREATE FILE command, and the catalog name must be "kafka". For detailed help on the CREATE FILE command, refer to the CREATE FILE command manual. An example is provided below.
Uploading Files
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
CREATE FILE "client.key" PROPERTIES("url" = "https://example_url/kafka-key/client.key", "catalog" = "kafka");
CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
Once the upload is complete, you can view the uploaded files using the SHOW FILES command.
Create Routine Import Job
For the specific command for creating a routine import job, please see the ROUTINE LOAD command manual. Two examples follow.
1. Access an unauthenticated Kafka cluster.
CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1
COLUMNS TERMINATED BY ","
PROPERTIES
(
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
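max_batch_interval, max_batch_rows, and max_batch_size control how long a single subtask runs. They cap, respectively, the subtask's maximum running time, the maximum number of rows it consumes, and the maximum volume of data it consumes. A subtask ends as soon as any one of these limits is reached. In the example above, that is 20 seconds, 300000 rows, or 209715200 bytes (200 MB), whichever comes first.
2. Access an SSL-authenticated Kafka cluster.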
CREATE ROUTINE LOAD demo.my_first_routine_load_job ON test_1
COLUMNS TERMINATED BY ","
PROPERTIES
(
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list"= "broker1:9091,broker2:9091",
"kafka_topic" = "my_topic",
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg"
);
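Both examples above load CSV messages. For JSON messages, a job might look like the following sketch. It assumes that the message format is selected with the "format" property and that the JSON keys match the column names of the target table; the job name and topic name are hypothetical placeholders. Check the ROUTINE LOAD command manual for the exact properties your version supports.

```sql
-- Sketch only: my_json_routine_load_job and my_json_topic are placeholder names.
CREATE ROUTINE LOAD demo.my_json_routine_load_job ON test_1
PROPERTIES
(
"format" = "json",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_json_topic",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```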
Viewing Import Job Status
For the command and examples for checking the status of a job, please see the SHOW ROUTINE LOAD command document. For the command and examples for viewing the status of a job's tasks, please see the SHOW ROUTINE LOAD TASK command document. You can only view tasks that are currently running. Tasks that have ended or have not yet started cannot be viewed.
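As a rough illustration, the two commands might be used as follows for the job created in the examples above. The statement forms are a sketch based on the command names given here; the referenced command documents remain the authority on the exact syntax and output columns.

```sql
-- Switch to the database that owns the job (database name from the examples above).
USE demo;

-- Status of a single job.
SHOW ROUTINE LOAD FOR demo.my_first_routine_load_job;

-- Currently running tasks of that job.
SHOW ROUTINE LOAD TASK WHERE JobName = "my_first_routine_load_job";
```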
Modify Job Properties
Users can modify some properties of already created jobs. For specific instructions, please see the ALTER ROUTINE LOAD command manual.
Job Control
Users can stop, pause, and resume jobs using three commands: STOP, PAUSE, and RESUME.
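A minimal sketch of these commands, reusing the job name from the earlier examples. The ALTER step is included on the assumption that a job's properties can only be changed while it is paused, and the final comment assumes that a stopped job cannot be resumed. Both points should be confirmed against the command manuals for your version.

```sql
-- Temporarily pause the job; it can be resumed later.
PAUSE ROUTINE LOAD FOR demo.my_first_routine_load_job;

-- Optionally change a property while the job is paused (assumed requirement).
ALTER ROUTINE LOAD FOR demo.my_first_routine_load_job
PROPERTIES
(
"max_batch_interval" = "30"
);

-- Resume consumption where the job left off.
RESUME ROUTINE LOAD FOR demo.my_first_routine_load_job;

-- Stop the job permanently (assumed not to be resumable afterwards).
STOP ROUTINE LOAD FOR demo.my_first_routine_load_job;
```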
More Help
For more detailed syntax and best practices about ROUTINE LOAD, please see the ROUTINE LOAD command manual.