Using Kafka Data Subscription for Log Collection
Last updated: 2025-11-17 09:35:25
Cloud Log Service (CLS) supports actively subscribing to logs produced by self-built Kafka or Tencent Cloud CKafka. This document describes how to ship logs into CLS using Kafka data subscription.

Prerequisites

An available self-built Kafka cluster or Tencent Cloud CKafka cluster.
Kafka version later than 0.10.2.0.
A logset and log topic have been created. For detailed instructions, see logset and log topic.

Operation Steps

Step 1: Logging In to the Console

1. Log in to the CLS console.
2. In the left sidebar, click Log Topic to enter the Log Topic Management page.

Step 2: Creating a Kafka Data Subscription Task

1. On the Log Topic Management page, find the target log topic and click its name to enter the Log Topic Detail page.
2. On the Log Topic Detail page, select the Collection Configuration tab and find the Kafka data subscription configuration.

3. Click Add to create a Kafka data subscription task.


Step 3: Configuring the Kafka Data Subscription Task

1. In the Configure cluster step, first select the target Kafka type: CKafka or Self-built Kafka.
2. Based on the selected Kafka type, configure the corresponding parameters as described below:
CKafka

CKafka instance (required): Select the target CKafka instance.

Kafka topics (required): Select one or more Kafka topics.

Consumer group (optional): If left empty, a consumer group is automatically created using the naming convention cls-${taskid}. If specified, the designated consumer group is used for consumption.
Note:
1. If left empty, ensure that the Kafka cluster has permission to auto-create consumer groups.
2. If specified, verify that the designated consumer group is not actively used by other tasks, to prevent data loss.

Start position (required): Earliest starts consuming from the earliest offset; Latest starts consuming from the latest offset.
Note: The start position can only be configured when the subscription task is created and cannot be modified afterward.

Self-built Kafka

Access mode (required): Access your self-built Kafka cluster over a private network or over the public network.

Network service type (required): If the access mode is private network, specify the network service type of the target self-built Kafka cluster: CVM, CLB, Cloud Connect Network (CCN) (currently in beta; submit a ticket if you need to use it), or Direct Connect gateway (currently in beta; submit a ticket if you need to use it).
Note: For the differences between and usage of the network service types, see Self-built Kafka Private Network Access Configuration Guide.

Network (VPC) (required): When the network service type is CVM or CLB, select the VPC where the CVM or CLB instance resides.

Service address (required): Enter the public IP address or domain name of the target Kafka cluster.
Note: To consume logs from other log topics across regions or accounts over the Kafka protocol, use the target log topic's Cross-Account Log Sync via Kafka Data Subscription.

Private domain resolution (optional): When Kafka brokers deployed on CVM communicate using internal domain names, specify the domain name and IP address of each broker here. For detailed configuration scenarios, see Self-built Kafka Private Network Access Configuration Guide.

Authentication (required): Whether authentication is required to access the target Kafka cluster.

Protocol (required): If the target Kafka cluster requires authentication, select the authentication protocol type: plaintext, sasl_plaintext, sasl_ssl, or ssl.

Authentication mechanism (required): If the target Kafka cluster requires authentication and the protocol type is sasl_plaintext or sasl_ssl, select the authentication mechanism: PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.

Username/Password (required): If the target Kafka cluster requires authentication and the protocol type is sasl_plaintext or sasl_ssl, enter the username and password required to access the cluster.

Client SSL authentication (required): If the protocol type is sasl_ssl or ssl and client CA certificates are required for access, enable this configuration and choose an existing certificate, or go to SSL Certificate Service to upload the CA certificate.

Server SSL authentication (required): If the protocol type is sasl_ssl or ssl and server certificates are required for access, enable this configuration and choose an existing certificate, or go to SSL Certificate Service to upload the server certificate.

Kafka topics (required): Enter one or more Kafka topics. Separate multiple topics with commas.

Consumer group (optional): If left empty, a consumer group is automatically created using the naming convention cls-${taskid}. If specified, the designated consumer group is used for consumption.
Note:
1. If left empty, ensure that the Kafka cluster has permission to auto-create consumer groups.
2. If specified, verify that the designated consumer group is not actively used by other tasks, to prevent data loss.

Start position (required): Earliest starts consuming from the earliest offset; Latest starts consuming from the latest offset.
Note: The start position can only be configured when the subscription task is created and cannot be modified afterward.
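For orientation, the console fields above correspond closely to standard Kafka client settings. The sketch below is illustrative only — it is not CLS's implementation, and the task ID, address, and credentials are made-up placeholders:

```python
# Sketch: how the console fields map onto standard Kafka consumer settings.
# Not CLS's implementation; the task ID and all values are placeholders.

def build_consumer_config(service_address, consumer_group=None, task_id="abc123",
                          protocol="sasl_plaintext", mechanism="SCRAM-SHA-256",
                          username=None, password=None, start_position="Earliest"):
    """Translate CLS console fields into a Kafka-client-style config dict."""
    config = {
        "bootstrap.servers": service_address,
        # If no consumer group is given, CLS creates one named cls-${taskid}.
        "group.id": consumer_group or f"cls-{task_id}",
        # Start position corresponds to the standard auto.offset.reset setting.
        "auto.offset.reset": "earliest" if start_position == "Earliest" else "latest",
        "security.protocol": protocol.upper(),
    }
    if protocol in ("sasl_plaintext", "sasl_ssl"):
        config["sasl.mechanism"] = mechanism
        config["sasl.username"] = username
        config["sasl.password"] = password
    return config

cfg = build_consumer_config("10.0.0.2:9092", username="log-reader", password="secret")
print(cfg["group.id"])  # cls-abc123
```

Reusing a consumer group that another task already consumes with would split partitions between consumers, which is why the console warns about data loss.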
3. After completing the cluster configuration, you can click Back to return to the previous step if you need to make changes.
4. After confirming that the cluster configuration is correct, click Next to proceed to the Subscription rule configuration step.
5. In the subscription rule configuration step, configure the following parameters:
Configuration name (required): The name of the Kafka data subscription configuration.

Data extraction mode (required): Choose one of three extraction modes: JSON, single-line full-text, or single-line full regular expression. For details, see Data Extraction Mode.

Log sample (required): If the data extraction mode is single-line full regular expression, manually enter or automatically obtain a log sample, which is used to validate the regular expression and extract key-value pairs.

Regular expression (required): If the data extraction mode is single-line full regular expression, manually enter or automatically generate a regular expression. The system validates it and extracts key-value pairs based on the regular expression you provide. For instructions on automatic generation, see Regular Expression Auto Generation.

Log extraction result (required): If the data extraction mode is single-line full regular expression, configure or modify the names of the fields extracted by the regular expression.

Manual verification (optional): If the data extraction mode is single-line full regular expression, you can provide one or more log samples to verify the correctness of the regular expression.

Upload parsing-failed logs (required): If the data extraction mode is JSON or single-line full regular expression and this option is enabled, logs that fail to be parsed are still uploaded; if it is disabled, such logs are discarded.

Key name of parsing-failed logs (required): If uploading parsing-failed logs is enabled, you can specify a field name as the key; logs that fail to be parsed are uploaded as the value of that field.

Encoding format (required): Based on your logs, choose one of two encoding formats: UTF-8 or GBK.

Use default time (required): When enabled, the current system time or the Kafka message timestamp is used as the log timestamp. When disabled, the timestamp is taken from the log's time field.

Default time source (required): When Use default time is enabled, choose one of two default times as the log timestamp: current system time or Kafka message timestamp.

Time field (required): When Use default time is disabled and the data extraction mode is JSON or single-line full regular expression, specify the name of the field in the log that represents the time. The value of this field is used as the log's timestamp.

Time extraction regex (required): When Use default time is disabled and the data extraction mode is single-line full-text, define the time portion of the log using a regular expression.
Note: If the regular expression matches multiple substrings, the first one is used.
Example: If the original log is "message with time 2022-08-08 14:20:20", you can set the time extraction regex to \d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d.

Time field format (required): When Use default time is disabled and the time field in the log is confirmed, further specify the time format used to parse the value of the time field. For details, see Configure Time Format.

Time zone of the time field (required): When Use default time is disabled and the time field and format are confirmed, choose one of two time zone standards: UTC (Coordinated Universal Time) or GMT (Greenwich Mean Time).

Time used when parsing fails (required): When Use default time is disabled and the time extraction regex or time field format fails to parse, choose one of two default times as the log timestamp: current system time or Kafka message timestamp.

Filter (optional): Filters add log collection filtering rules based on business needs, helping you collect only valuable log data. The following rules are supported:
Equal to: Only collect logs whose specified field values match the specified characters. Exact or regular-expression matching is supported.
Not equal to: Only collect logs whose specified field values do not match the specified characters. Exact or regular-expression matching is supported.
Field exists: Only collect logs in which the specified field exists.
Field does not exist: Only collect logs in which the specified field does not exist.
For example, to collect only JSON logs whose response_code is 400 or 500, enter response_code as the key, select Equal to as the filtering rule, and enter 400|500 as the value.
Note: Multiple filter conditions are combined with AND logic. If multiple filter conditions are configured for the same key name, the rules are overwritten.

Kafka metadata (optional): The following four types of Kafka-related metadata can be uploaded along with the logs: kafka_topic, kafka_partition, kafka_offset, and kafka_timestamp.
Note: If the original log contains fields with the same names as this metadata, those fields are overwritten.
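The time extraction regex and filter rules above behave like ordinary regular-expression matching, so you can try them out locally. A minimal sketch using Python's re module (the field name response_code and the values follow the examples above; this is not CLS's internal matcher):

```python
import re

# Time extraction: the regex from the example above pulls the first
# timestamp-like substring out of a raw single-line log.
time_regex = r"\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d"
raw = "message with time 2022-08-08 14:20:20"
match = re.search(time_regex, raw)
log_time = match.group(0) if match else None  # CLS falls back to a default time on failure
print(log_time)  # 2022-08-08 14:20:20

# Filter rule "Equal to" with regular-expression matching: keep only logs
# whose response_code matches 400|500.
def keep(log, key="response_code", value_pattern=r"400|500"):
    return key in log and re.fullmatch(value_pattern, str(log[key])) is not None

print(keep({"response_code": "400"}))  # True
print(keep({"response_code": "200"}))  # False
```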
6. After completing the subscription rule configuration, you can preview the extraction results.
If you need to further process the collected CLS logs, such as structuring, masking, or filtering, before writing them into the log topic, you can click Data Processing at the bottom of the page, add data processing, and then configure the index.
Note:
For data processing operations, see the Create Processing Task document, specifically the Preprocessing of Data tab.
For writing data processing scripts, see Overview of data processing functions, or Practical processing case.
Data processing will incur fees. For more details, see Billing Overview.
7. After confirming the preview is correct, click Next to proceed to the Index Configuration step.
8. On the Index Configuration page, set the following information.

Index Status: Confirm whether to enable the index.
Note:
Logs can be retrieved only when the index is enabled.
Full-text index: The full-text index splits the entire log into multiple tokens and builds the index on them. Retrieval is then performed with keywords (full-text search). For example, searching for the keyword error retrieves all logs that contain error.
Full-text delimiter: A set of characters that splits the log into segments (tokens). Only English symbols are supported. The default delimiter on the console is @&? |#()='",;:<>[]{}/ \n\t\r\\.

Case sensitive: Whether retrieval is case-sensitive. For example, if the log contains Error and case sensitivity is enabled, searching for error cannot retrieve it.

Allow Chinese characters: Enable this feature when the log contains Chinese text that needs to be retrieved. For example, if the log is "User log-in API timeout" and this feature is disabled, searching "Timeout" cannot retrieve the log; only searching the complete "User log-in API timeout" can. After the feature is enabled, searching "Timeout" retrieves the log.
Key-value index: The key-value index splits the raw log into multiple tokens per field (key:value) and builds the index on them. Retrieval then uses key-value queries. For example, the query level:error AND timeCost:>1000 retrieves logs whose log level (level) is error and whose time cost (timeCost) is greater than 1,000 ms. Some logs also contain a special type of metadata field; the index configuration for these fields is the same as for regular fields.
Field name: The field name. A single log topic's key-value index supports up to 300 fields. Only letters, digits, underscores, and the characters -./@ are supported, and the field name cannot start with an underscore.

Field type: The data type of the field: text, long, or double. The text type supports fuzzy retrieval using wildcards but does not support range comparison. The long and double types support range comparison but not fuzzy retrieval.

Delimiter: A set of characters that splits the field value into segments (tokens). Only English symbols are supported. The default delimiter on the console is @&? |#()='",;:<>[]{}/ \n\t\r\\.

Allow Chinese characters: Enable this feature when the field contains Chinese text that needs to be retrieved. For example, if the log is message: "User log-in API timeout" and this feature is disabled, message:"Timeout" cannot retrieve the log; only message:"User log-in API timeout" can. After the feature is enabled, message:"Timeout" retrieves the log.

Statistics: If enabled, you can analyze this field with SQL. When statistics are enabled for a text field and the value is too long, only the first 32,766 characters participate in statistical calculations. Enabling statistics does not incur additional fees, so enabling it is recommended.

Case sensitive: Whether retrieval is case-sensitive. For example, if the log is level:Error and case sensitivity is enabled, retrieving with level:error will not work.
Note:
For more details on indexing, see Index Configuration.
9. After completing the index configuration, click Submit to finish creating the Kafka data subscription task.

Step 4: Viewing the Kafka Data Subscription Task

After creating the Kafka data subscription task, you can find all created Kafka data subscription tasks on the Log Topic Detail page > Collection Configuration tab.


Step 5: Retrieving and Analyzing Logs

After creating the Kafka data subscription task, you can use log search and analysis, as well as advanced features such as dashboards and alarms.

Specifications and Limits

For specifications and limitations, see Kafka Data Subscription Specifications and Limits.

Appendix

Regular Expression Auto Generation

1. In the pop-up Auto-Generate Regular Expression window, select the log content that needs key-value extraction based on your retrieval and analysis requirements, enter the key name in the text box that appears, and click Confirm.
2. The system automatically generates a regular expression for the selected content, and the extraction result appears in the key-value table.
3. Repeat step 1 until all key-value pairs are extracted.
4. Click OK, and the system generates a complete regular expression based on the extracted key-value pairs.


Data Extraction Mode

Kafka data subscription provides the following data extraction modes.

JSON

Assume that a raw JSON log entry is:
{"remote_ip":"10.135.46.111","time_local":"22/Jan/2019:19:19:34 +0800","body_sent":23,"responsetime":0.232,"upstreamtime":"0.232","upstreamhost":"unix:/tmp/php-cgi.sock","http_host":"127.0.0.1","method":"POST","url":"/event/dispatch","request":"POST /event/dispatch HTTP/1.1","xff":"-","referer":"http://127.0.0.1/my/course/4","agent":"Mozilla/5.0 (Windows NT 10.0; WOW64; rv:64.0) Gecko/20100101 Firefox/64.0","response_code":"200"}
After being processed and structured by CLS, this log will become as follows:
agent: Mozilla/5.0 (Windows NT 10.0; WOW64; rv:64.0) Gecko/20100101 Firefox/64.0
body_sent: 23
http_host: 127.0.0.1
method: POST
referer: http://127.0.0.1/my/course/4
remote_ip: 10.135.46.111
request: POST /event/dispatch HTTP/1.1
response_code: 200
responsetime: 0.232
time_local: 22/Jan/2019:19:19:34 +0800
upstreamhost: unix:/tmp/php-cgi.sock
upstreamtime: 0.232
url: /event/dispatch
xff: -
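JSON extraction amounts to parsing each message as a top-level JSON object, with each key becoming a structured field. A minimal sketch of the behavior with Python's json module (shortened sample; not CLS internals):

```python
import json

# Shortened version of the raw JSON log sample above.
raw = ('{"remote_ip":"10.135.46.111","method":"POST",'
       '"responsetime":0.232,"response_code":"200"}')

# JSON extraction mode: each top-level key becomes a structured log field.
fields = json.loads(raw)
for key in sorted(fields):
    print(f"{key}: {fields[key]}")
```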
Single-Line Full-Text

A single-line full-text log is a log whose complete content occupies a single line. When collecting single-line full-text logs, CLS uses the line break \n as the end-of-log identifier. For unified structured management, each log is given a default key __CONTENT__, but the log data itself is not structured and no fields are extracted. The time attribute of a log is determined by the time when the log is collected.
A sample raw data entry of a log is as follows:
Tue Jan 22 12:08:15 CST 2019 Installed: libjpeg-turbo-static-1.2.90-6.el7.x86_64
The data collected by CLS is:
__CONTENT__:Tue Jan 22 12:08:15 CST 2019 Installed: libjpeg-turbo-static-1.2.90-6.el7.x86_64
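The behavior is small enough to sketch directly: in single-line full-text mode the raw line simply becomes the value of the default key __CONTENT__ (illustrative Python, not CLS internals):

```python
# Single-line full-text mode: the raw line is not parsed; it becomes the
# value of the default key __CONTENT__.
def collect_single_line(line):
    return {"__CONTENT__": line.rstrip("\n")}

entry = collect_single_line(
    "Tue Jan 22 12:08:15 CST 2019 Installed: libjpeg-turbo-static-1.2.90-6.el7.x86_64\n"
)
print(entry["__CONTENT__"])
```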
Single-Line Full Regex

The single-line full regular expression mode is usually used for structured logs. It extracts multiple key-value pairs from a complete log using a regular expression.
A sample raw data entry of a log is as follows:
10.135.46.111 - - [22/Jan/2019:19:19:30 +0800] "GET /my/course/1 HTTP/1.1" 127.0.0.1 200 782 9703 "http://127.0.0.1/course/explore?filter%5Btype%5D=all&filter%5Bprice%5D=all&filter%5BcurrentLevelId%5D=all&orderBy=studentNum" "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:64.0) Gecko/20100101 Firefox/64.0" 0.354 0.354
The configured regular expression is as follows:
(\S+)[^\[]+(\[[^:]+:\d+:\d+:\d+\s\S+)\s"(\w+)\s(\S+)\s([^"]+)"\s(\S+)\s(\d+)\s(\d+)\s(\d+)\s"([^"]+)"\s"([^"]+)"\s+(\S+)\s(\S+).*
The data collected by CLS is:
body_bytes_sent: 9703
http_host: 127.0.0.1
http_protocol: HTTP/1.1
http_referer: http://127.0.0.1/course/explore?filter%5Btype%5D=all&filter%5Bprice%5D=all&filter%5BcurrentLevelId%5D=all&orderBy=studentNum
http_user_agent: Mozilla/5.0 (Windows NT 10.0; WOW64; rv:64.0) Gecko/20100101 Firefox/64.0
remote_addr: 10.135.46.111
request_length: 782
request_method: GET
request_time: 0.354
request_url: /my/course/1
status: 200
time_local: [22/Jan/2019:19:19:30 +0800]
upstream_response_time: 0.354
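You can reproduce this extraction locally with the same regular expression. In the sketch below, the key list mirrors the extraction result above in capture-group order (Python's re module is used for illustration only):

```python
import re

# The regular expression from the example above, with 13 capture groups.
pattern = (r'(\S+)[^\[]+(\[[^:]+:\d+:\d+:\d+\s\S+)\s"(\w+)\s(\S+)\s([^"]+)"\s'
           r'(\S+)\s(\d+)\s(\d+)\s(\d+)\s"([^"]+)"\s"([^"]+)"\s+(\S+)\s(\S+).*')

# The raw sample log from above.
line = ('10.135.46.111 - - [22/Jan/2019:19:19:30 +0800] "GET /my/course/1 HTTP/1.1" '
        '127.0.0.1 200 782 9703 "http://127.0.0.1/course/explore?filter%5Btype%5D=all'
        '&filter%5Bprice%5D=all&filter%5BcurrentLevelId%5D=all&orderBy=studentNum" '
        '"Mozilla/5.0 (Windows NT 10.0; WOW64; rv:64.0) Gecko/20100101 Firefox/64.0" '
        '0.354 0.354')

# Field names in capture-group order, matching the extraction result above.
keys = ["remote_addr", "time_local", "request_method", "request_url",
        "http_protocol", "http_host", "status", "request_length",
        "body_bytes_sent", "http_referer", "http_user_agent",
        "request_time", "upstream_response_time"]

m = re.match(pattern, line)
fields = dict(zip(keys, m.groups()))
print(fields["status"])       # 200
print(fields["request_url"])  # /my/course/1
```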

Self-built Kafka Private Network Access Configuration Guide

CVM Type Access

1. If your Kafka broker nodes are all deployed on CVMs within the same VPC, you can use this method for access.
2. The broker node configuration on a single CVM is as follows:
For example, if the CVM node's IP address within the VPC is 10.0.0.2 and the broker port is 9092, other nodes can use the same configuration by simply replacing the IP address.
listener.security.protocol.map=CVM:PLAINTEXT
listeners=CVM://10.0.0.2:9092
advertised.listeners=CVM://10.0.0.2:9092
3. When configuring a subscription task in the CLS Console under Self-built Kafka > Private Network Access, select the CVM type for the network service type, choose the VPC corresponding to the CVM's VPC ID, and enter the private network service address as 10.0.0.2:9092.
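Before creating the subscription task, it can help to confirm that the advertised broker address is reachable from inside the VPC. A minimal TCP reachability check (the host and port are the example values from step 2; replace them with your own):

```python
import socket

def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example values from the broker configuration above; replace with your own.
if can_reach("10.0.0.2", 9092, timeout=1.0):
    print("broker reachable")
else:
    print("broker NOT reachable; check listeners/advertised.listeners and security groups")
```

Note that a successful TCP connection only proves network reachability; authentication and protocol settings are validated separately when the task runs.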

CLB Type Access

1. If your Kafka broker nodes are all deployed on CVM/TKE nodes within the same VPC, you can use this method for access.
2. You need to first create a corresponding CLB instance for each broker on a one-to-one basis and configure a TCP listener. It is recommended to use the same port as the Kafka broker's port for easier management.

3. The broker node configuration on a single CVM/TKE is as follows: For example, if the CVM/TKE node's IP address within the VPC is 10.0.0.2, the corresponding CLB address for the node is 10.0.0.12, and the broker port is 29092.
listener.security.protocol.map=CLB:PLAINTEXT
listeners=CLB://10.0.0.2:29092
advertised.listeners=CLB://10.0.0.12:29092
Other nodes can use the same configuration, simply replacing the IP address accordingly.
4. When configuring a subscription task in the CLS Console under Self-built Kafka > Private Network Access, select the CLB type for the network service type, choose the VPC corresponding to the CLB's VPC ID, and enter the private network service address as 10.0.0.12:29092.
5. In the TKE scenario, you may use a private network domain name as the listening address in the broker configuration. In this case, you will need to use private network domain name resolution (DNS) capabilities for processing. For more details, see Private network domain name resolution.

Private Network Domain Name Resolution

1. In some scenarios, such as the TKE scenario, where the node IP address is not fixed, you may want to use a private network domain name to access Kafka. The configuration is as follows:
listener.security.protocol.map=DOMAIN:PLAINTEXT
listeners=DOMAIN://10.0.0.2:9092
advertised.listeners=DOMAIN://broker1.cls.tencent.com:9092
2. To enable CLS to access Kafka directly in such scenarios, CLS provides a private network DNS feature for domain name mapping.
3. For example, if a CLB instance is associated with the backend real servers (RS) and the CLB address is 10.0.0.12, add a private network DNS entry in the self-built Kafka subscription task configuration, like this:
Domain name: broker1.cls.tencent.com
IP: 10.0.0.12
4. If multiple broker nodes use this access policy, private network DNS should be added for each of them.
5. After configuring the private network DNS, in the Self-built Kafka Subscription Task Configuration, you only need to enter the private network service address as broker1.cls.tencent.com:9092.
