Scenarios
The TDMQ for CKafka (CKafka) connector supports transforming streaming data into searchable and analyzable structured storage, suitable for various scenarios such as real-time log monitoring, user behavior search, and IoT device status analysis. For example:
Business logs are synchronized to Elasticsearch Service (ES) via Kafka, and Ops monitoring is visualized via Kibana.
User clickstream or order data is written to ES in real time. Quick searches and queries under complex conditions are supported.
The connector provides near-real-time writing (data visible within seconds), automatic index management (dynamically adapting to JSON structures), and data preprocessing capabilities (JSON parsing and field pruning). It effectively isolates abnormal data through a dead letter queue mechanism and ensures transmission reliability with a retry policy. It is widely used for analyzing user learning behavior on online education platforms and for quickly locating and troubleshooting abnormal sensor data in smart hardware scenarios.
Constraints and Limitations
Only the 7.x version of ES is supported; the 8.x version is not yet supported. Stay tuned.
Prerequisites
This feature currently relies on ES. To use it, you need to activate the related product feature and prepare an ES cluster.
Creating an ES Connection
1. Log in to the CKafka console.
2. In the left sidebar, choose Connector > Connection List, select the target region, and click Create Connection.
3. Set the connection type to Elasticsearch Service, click Next, and set the connection information on the connection configuration page.
Connection Name: Enter a connection name to distinguish different ES connections.
Description: Optional. Enter a connection description, which must not exceed 128 characters.
ES Instance Cluster: Select a Tencent Cloud ES instance cluster.
Instance Username: Enter the username of the ES instance. The default username of the Tencent Cloud ES instance is elastic and cannot be changed.
Instance Password: Enter the password of the ES instance.
4. Click Next to initiate connection validation. Upon successful validation, the connection is created. You can view the created connection in the connection list.
Creating a Data Synchronization Task
1. In the left sidebar, choose Connector > Task List. Select the region and click Create Task.
2. Set the basic task information in the task creation pop-up window.
Task Name: Enter a task name to distinguish different data synchronization tasks. It can only contain letters, digits, underscores (_), hyphens (-), and periods (.).
Description: Optional. Enter a task description, which cannot exceed 128 characters.
Task Type: Select Data Distribution.
Data Target Type: Select Elasticsearch Service.
3. Click Next to configure data source information.
Configuring a Data Source
1. On the Data Source Configuration page, configure the data source information.
| Parameter | Description |
|---|---|
| Topic Type | Only CKafka instance topics are supported. |
| CKafka Instance | Select the pre-configured data source CKafka instance from the drop-down list. |
| Source Topic | Select the pre-configured data source topic from the drop-down list. If an ACL policy is configured for the data source instance, ensure that you have read/write permissions on the selected source topic. |
| Start Offset | Configure the topic offset to set the processing policy for historical messages during the dump. Three options are supported: Start consumption from the latest offset (the maximum offset; consumption starts from the latest data and historical messages are skipped), Start consumption from the start offset (the minimum offset; consumption starts from the earliest data and all historical messages are processed), and Start consumption from the specified time point (consumption starts from a user-defined point in time). |
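The three start offset policies map directly onto standard Kafka consumer positioning. The sketch below is a minimal illustration of what each policy means in Kafka terms; it uses the third-party kafka-python client with placeholder broker, topic, and timestamp values (assumptions for the example) and is not part of the connector configuration itself.

```python
# Illustration only: how the three start offset policies map to standard
# Kafka consumer positioning. Broker, topic, and timestamp are placeholders.
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="ckafka-broker:9092",
                         enable_auto_commit=False)
tp = TopicPartition("source-topic", 0)
consumer.assign([tp])

# Start consumption from the latest offset: skip historical messages.
consumer.seek_to_end(tp)

# Start consumption from the start offset: process all historical messages.
consumer.seek_to_beginning(tp)

# Start consumption from a specified time point: find the first offset
# at or after the given timestamp (in milliseconds) and seek to it.
offsets = consumer.offsets_for_times({tp: 1700000000000})
if offsets.get(tp) is not None:
    consumer.seek(tp, offsets[tp].offset)
```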
2. After confirming that the information is correct, click Next to configure data processing rules.
Configuring Data Processing Rules
1. On the Data Processing Rules page, click Preview Topic Messages in the source data section. The first message from the source topic will be selected for parsing.
Note
The messages to be parsed must meet the following conditions: they must be JSON strings, and the source data must be in single-layer JSON format. Nested JSON can be converted into a basic single-layer format by using the data processing feature.
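For reference, the snippet below sketches the difference between nested and single-layer JSON and one simple way a producer could flatten nested fields before sending them to the source topic. The field names and the dot-joined key convention are illustrative assumptions, not behavior defined by the connector.

```python
import json

# Nested JSON (needs conversion before it can be parsed directly).
nested = {"user": {"id": 10001, "name": "alice"}, "action": "login"}

def flatten(obj, parent_key="", sep="."):
    """Flatten nested dictionaries into a single-layer dict."""
    items = {}
    for key, value in obj.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            items.update(flatten(value, new_key, sep=sep))
        else:
            items[new_key] = value
    return items

# Single-layer JSON, directly parsable as key-value pairs.
print(json.dumps(flatten(nested)))
# {"user.id": 10001, "user.name": "alice", "action": "login"}
```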
2. If you need to cleanse the source data, enable Processing Source Data and proceed to Step 3. If no data cleansing is needed and you only require data synchronization, you can skip the subsequent steps and proceed directly to configure the data target.
3. (Optional) The data processing rule configuration supports importing templates from a local computer. If you have prepared a rule template, import it directly. If not, proceed to Step 4 to configure data processing rules. After completing the configuration, you can also export and save the rules as a template for reuse in other tasks.
4. In the Raw Data section, select the data source. You can select Pulled from Source Topic or Custom. In this example, Custom is selected.
5. In the Parsing Mode section, select the corresponding data parsing mode and click OK to view the data parsing result. In this example, JSON is selected. Click the parsing result on the left to generate a structured preview on the right.
| Parsing Mode | Description |
|---|---|
| JSON | Parses data in standard JSON format, supports nested fields, and outputs key-value pairs. |
| Separator | Parses unstructured text based on a specified delimiter. Supported delimiters include Space, Tab, comma (,), semicolon (;), vertical bar (\|), colon (:), and Custom. |
| Regex | Suitable for extracting specific fields from long array-type messages. You can manually enter a regular expression or use the regular expression auto-generation feature. For more information, see Regular Expression Extraction. Note: When the input regular expression contains capture groups such as (?<name>expr) or (?P<name>expr), it is treated as a pattern string for matching. When a message successfully matches the pattern string, the capture group content is parsed. Otherwise, the entire input regular expression is treated as a capture group, and all matching content is extracted from the message. |
| JSON object array - single-row output | Each object in the array has a consistent format. Only the first object is parsed, and the output is a single JSON object of map type. |
| JSON object array - multi-row output | Each object in the array has a consistent format. Only the first object is parsed, and the output is an array type. |
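As a reference for the Separator and Regex modes, the sketch below shows, with a made-up log line and field names, how delimiter splitting and named capture groups conceptually turn unstructured text into key-value output. The actual parsing is performed by the connector in the console; this is only an illustration.

```python
import re

raw = "2024-01-01 10:00:00|INFO|order created"

# Separator mode: split on a chosen delimiter; keys are assigned in the console.
values = raw.split("|")
print(dict(zip(["time", "level", "message"], values)))

# Regex mode: named capture groups become the keys of the parsed result.
pattern = r"(?P<time>[\d\- :]+)\|(?P<level>\w+)\|(?P<message>.+)"
match = re.match(pattern, raw)
if match:
    print(match.groupdict())
# {'time': '2024-01-01 10:00:00', 'level': 'INFO', 'message': 'order created'}
```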
6. If Secondary Key-Value Parsing is enabled, the data in the value will be parsed again as key-value pairs.
7. In the Data Processing section, set data processing rules; you can edit and delete fields, adjust timestamp formats, and add current system time fields. Click Process Value to add a processing chain for further processing of individual data entries.
| Option | Description |
|---|---|
| Mapping | You can select an existing key, and the final output value is mapped from the specified key. |
| JSONPATH | Parse multi-layer nested JSON data, starting with the $ symbol and using the . symbol to locate specific fields in multi-layer JSON. For more information, see JSONPath. |
| System preset - Current time | You can select a system-preset value. DATE (timestamp) is supported. |
| Custom | You can enter a custom value. |
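For reference, the example below shows how a JSONPath expression starting with $ locates a field in multi-layer JSON. The message content and field names are illustrative assumptions; the equivalent manual navigation is shown only to clarify what each expression selects.

```python
import json

message = json.loads('{"order": {"id": "A1001", "items": [{"sku": "S1", "price": 9.9}]}}')

# JSONPATH value source examples and their equivalent manual navigation:
order_id = message["order"]["id"]                      # $.order.id            -> "A1001"
first_price = message["order"]["items"][0]["price"]    # $.order.items[0].price -> 9.9
print(order_id, first_price)
```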
8. Click Test to view the test results of data processing. At this point, you can add a processing chain based on your actual business requirements to reprocess the above data processing results.
9. In the Filter section, choose whether to enable a filter. If enabled, only data matching the filter rules will be output. Filter matching modes include prefix match, suffix match, contains match, except match, numeric match, and IP match. For more information, see Filter Rule Description.
10. In the Retain Source Topic Metadata section, choose whether to retain the source topic metadata.
11. In the Output Format section, set the data output content. The default is JSON. The ROW format is also supported. If the ROW format is selected, you need to select the output row content.
| Row Content | Description |
|---|---|
| VALUE | Only output the values in the above test results, separated by delimiters. The delimiter between values is None by default. |
| KEY&VALUE | Output the key and value in the above test results. Neither the delimiter between the key and value nor the delimiter between values can be None. |
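The sketch below shows, for one illustrative parsed record, roughly how the default JSON output compares with ROW output in VALUE and KEY&VALUE modes. The record fields and the delimiters chosen here are assumptions made for the example.

```python
import json

record = {"time": "2024-01-01 10:00:00", "level": "INFO", "message": "order created"}

# JSON output (default): the record is delivered as a JSON string.
print(json.dumps(record))

# ROW output, VALUE: only the values, joined by a chosen delimiter (space here).
print(" ".join(str(v) for v in record.values()))

# ROW output, KEY&VALUE: both delimiters must be set ("=" and space here).
print(" ".join(f"{k}={v}" for k, v in record.items()))
```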
12. In the Failed Message Handling section, set the rules for handling delivery-failed messages. Three methods are supported: Discard, Retain, and Put to dead letter queue.
| Handling Method | Description |
|---|---|
| Discard | Suitable for production environments. When a task fails to run, the current failed message is ignored. It is recommended to use the Retain mode for testing until no errors occur, and then edit the task to use the Discard mode in production. |
| Retain | Suitable for test environments. When a task fails to run, it is terminated, no retries are performed, and the failure reasons are recorded in Event Center. |
| Put to dead letter queue | You need to specify a topic for the dead letter queue. Suitable for strict production environments. When a task fails to run, the failed messages along with metadata and failure reasons are delivered to the specified CKafka topic. |
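If you route failures to a dead letter queue, you can inspect them with any standard Kafka consumer. The sketch below uses the third-party kafka-python client with placeholder broker, topic, and consumer group names; the exact metadata fields the connector attaches are not specified here, so it simply prints whatever headers and payload arrive.

```python
# Inspect messages delivered to the dead letter queue topic.
# Broker, topic, and group names are placeholders.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "my-dead-letter-topic",
    bootstrap_servers="ckafka-broker:9092",
    group_id="dlq-inspector",
    auto_offset_reset="earliest",
)

for msg in consumer:
    print("offset:", msg.offset)
    print("headers:", msg.headers)          # failure metadata, if attached
    print("payload:", msg.value.decode("utf-8", errors="replace"))
```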
13. After configuring the data rules, you can click Export as Template at the top and reuse the configuration in subsequent data tasks, reducing the cost of repeating the setup.
14. Click Next to configure the data target.
Configuring a Data Target
1. On the data target configuration page, configure the data target.
Source Data: Click to pull source topic data. If no data is available in the source topic, you can also customize the data.
Data Target: Select the pre-created target ES connection for data output.
Index Name: Enter an index name. All its letters must be lowercase. It supports JSONPath syntax.
Split Index Name by Date: Optional. After enabling this option, select a date format. The index written to ES is named in the format %(index name)_%(date); see the sketch after this list.
Handle Failed Message: Select how to handle delivery-failed messages. Supported methods include Retain, Discard, Ship to CLS (you need to specify the target logset and log topic, and grant the permission to access Cloud Log Service (CLS)), and Put to dead letter queue.
Retain: It is suitable for test environments. When a task fails to run, it will be terminated, no retries will be performed, and the failure reasons will be recorded in Event Center.
Discard: It is suitable for production environments. When a task fails to run, the current failed message will be ignored. It is recommended to use the Retain mode for testing until no errors are detected, and then edit the task in the Discard mode for production.
Ship to CLS: It is suitable for strict production environments. When a task fails to run, the failed messages along with metadata and failure reasons will be uploaded to the specified CLS topic.
Put to dead letter queue: It is suitable for strict production environments. When a task fails to run, the failed messages along with metadata and failure reasons will be delivered to the specified CKafka topic.
Data Source Type:
Index Time: You can specify a field in the source data as the index time, which is the message delivery time by default.
ES Document ID Field: You can specify the value of this field as the value of the ES document ID, which is topic+kafkaPartition+kafkaOffset by default.
Retain Non-JSON Data: If this option is enabled, non-JSON data is assembled with the specified KEY and delivered. If it is disabled, non-JSON data will be discarded.
KEY: When data in the source topic is not in JSON format, a key can be specified to assemble it in JSON format and deliver it to ES.
This option is only used when the connector subscribes to a relational database and synchronizes the data changes (creations, deletions, and updates) in the topic to ES. It identifies data creations, deletions, and updates in the database to ensure that the data in ES remains consistent with that in the source table.
Sync Mode: If you select field-by-field matching, you can customize the mapping relationship between message field names and target index fields. If you select default field matching, the mapping in the ES index will use the message key as the field name.
Target Index Type: You can create an index or select an index from existing ES indices.
Primary Key: You can specify the primary key of the database table as the value of the ES document ID.
Index Time: You can specify a field in the source data as the index time, which is the message delivery time by default.
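As a reference for the Index Name, Split Index Name by Date, and ES Document ID Field settings above, the sketch below shows how the resulting index name and the default document ID could be composed for one message. The date format, the '+' separator, and all field values are illustrative assumptions.

```python
from datetime import datetime, timezone

# Illustrative message attributes from the source topic (assumed values).
topic, partition, offset = "source-topic", 0, 42
delivery_time = datetime.now(timezone.utc)

index_base = "app_logs"                      # index name: lowercase only

# Split Index Name by Date: %(index name)_%(date), here with a YYYYMMDD format.
index_name = f"{index_base}_{delivery_time.strftime('%Y%m%d')}"

# Default ES document ID: composed from topic, kafkaPartition, and kafkaOffset
# (joined with '+' here purely for illustration).
doc_id = f"{topic}+{partition}+{offset}"

print(index_name, doc_id)
```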
2. Click Submit to complete task creation. On the Task List page, you can view the created data synchronization task. After the task is successfully created, the task will automatically start data synchronization according to the task settings, replicating data in real time.
Viewing the Data Synchronization Progress
1. On the Task List page, click the ID of the created task to go to the basic task information page.
2. At the top of the page, select the Synchronization Progress tab to view the progress and details of data synchronization.