Customized Consumption
Last updated: 2025-12-03 18:30:49

Prerequisites

1. Cloud Log Service (CLS) is activated, a logset and a log topic have been created, and log data has been collected successfully.
2. Sub-accounts and collaborators must be authorized by the root account. For authorization steps, see CAM-Based Permission Management; for ready-made authorization policies, see CLS Access Policy Templates.

Consumption Process within a Consumer Group

When consuming data within a consumer group, the server manages the consumption tasks of all consumers in the group. It automatically balances these tasks across consumers according to the number of topic partitions and the number of consumers, and records the consumption progress of each partition so that different consumers do not consume the same data twice. The detailed process of consumption within a consumer group is as follows:
1. Create a consumer group.
2. Every consumer periodically sends heartbeats to the server.
3. The consumer group automatically assigns topic partitions to consumers according to the load balancing situation of the topic partitions.
4. Consumers retrieve the partition offsets and consume the data according to the list of allocated partitions.
5. Consumers periodically update their consumption progress for each partition to the consumer group, facilitating the next round of task allocation by the group.
6. Repeat steps 2 through 5 until consumption is completed.
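The steps above can be sketched as a toy simulation. All names here are illustrative stand-ins, not part of the CLS SDK; the point is that per-partition offsets are kept by the consumer group, so they survive a reassignment and no data is re-read:

```python
def rebalance(partitions, consumers):
    """Spread partitions across consumers. A simple stand-in for the
    server-side balancing algorithm (the real scheme may differ)."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[sorted(consumers)[i % len(consumers)]].append(p)
    return assignment

# Per-partition offsets belong to the consumer group, not to any
# single consumer, so they survive reassignment.
offsets = {p: 0 for p in [1, 2, 3, 4]}

# Round 1: consumers A and B are alive and send heartbeats.
round1 = rebalance(offsets.keys(), ["A", "B"])
for consumer, parts in round1.items():
    for p in parts:
        offsets[p] += 100   # pretend each consumer read 100 records

# Round 2: B misses two heartbeat intervals and is considered offline.
round2 = rebalance(offsets.keys(), ["A"])
# A picks up B's partitions at their committed offsets -- nothing is re-read.
print(round2, offsets)
```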

Consumption Balancing

The consumer group will dynamically adjust the consumption tasks of each consumer according to the number of active consumers and topic partitions to ensure balanced consumption. At the same time, consumers can save the consumption progress in each topic partition to ensure that they can continue to consume data after fault recovery and avoid repeated consumption.

Example 1: Topic Partition Change

For example, a log topic has two consumers. Consumer A consumes data in partitions 1 and 2, and consumer B in partitions 3 and 4. After partition 5 is added through partition splitting, the consumer group will automatically allocate partition 5 to consumer B for consumption, as shown in the figure below:




Example 2: Consumer Change

For example, a log topic has two consumers. Consumer A consumes data in partitions 1, 2, and 3, and consumer B in partitions 4, 5, and 6. To keep the consumption speed in line with the log generation speed, consumer C is added. The consumer group then reallocates partitions, and partitions 3 and 6 are assigned to consumer C, as shown in the figure below:
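Both examples follow the same invariant: after any change in partition or consumer count, no consumer holds more than one partition above any other. A minimal sketch (round-robin is used here for illustration; the exact partitions each consumer ends up with may differ from the figures, only the balance property is guaranteed):

```python
def balanced_assignment(partitions, consumers):
    # Round-robin: a simple scheme with the same balance property
    # as the server's rebalancing (the server's exact mapping may differ).
    out = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        out[consumers[i % len(consumers)]].append(p)
    return out

# Example 1: partition 5 is added to a 4-partition topic with 2 consumers.
ex1 = balanced_assignment([1, 2, 3, 4, 5], ["A", "B"])

# Example 2: consumer C joins a 6-partition topic.
ex2 = balanced_assignment([1, 2, 3, 4, 5, 6], ["A", "B", "C"])

for name, a in (("ex1", ex1), ("ex2", ex2)):
    sizes = [len(v) for v in a.values()]
    print(name, a, "spread:", max(sizes) - min(sizes))
```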




Consumption Demo (Python)

For the complete demo, see tencentcloud-cls-sdk-python. Python 3.5 or later is recommended for data consumption. Usage of the demo is as follows:
1. Install the SDK. For details, see tencentcloud-cls-sdk-python.
pip install git+https://github.com/TencentCloud/tencentcloud-cls-sdk-python.git
2. Define how the consumer processes the data to be consumed. User log data is delivered in the log_group struct, which is as follows:
log_group {
    source    // Log source, usually the machine's IP address.
    filename  // Log file name.
    logs {
        time                  // Log time: a Unix timestamp in microseconds.
        user_defined_log_kvs  // User log fields.
    }
}
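The struct maps to one flat dict per log line. A sketch of that flattening, using types.SimpleNamespace as a stand-in for the protobuf objects the SDK actually hands to the consumer (field names follow the struct above; contents is the list of user-defined key-value pairs):

```python
from types import SimpleNamespace

def flatten(log_group):
    """Turn one log_group into a list of flat dicts, one per log line."""
    items = []
    for log in log_group.logs:
        item = {"filename": log_group.filename,
                "source": log_group.source,
                "time": log.time}
        for content in log.contents:
            item[content.key] = content.value
        items.append(item)
    return items

# A stand-in log_group (the real SDK hands you protobuf objects).
group = SimpleNamespace(
    source="10.0.0.1",
    filename="/var/log/app.log",
    logs=[SimpleNamespace(
        time=1711607794000000,   # microsecond Unix timestamp
        contents=[SimpleNamespace(key="level", value="INFO"),
                  SimpleNamespace(key="msg", value="started")])])

print(flatten(group))
```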
The implementation using the SampleConsumer class is as follows:
import json
import time

# ConsumerProcessorBase is provided by the tencentcloud-cls-sdk-python package.

class SampleConsumer(ConsumerProcessorBase):
    last_check_time = 0

    def initialize(self, topic_id):
        self.topic_id = topic_id

    def process(self, log_groups, offset_tracker):
        for log_group in log_groups:
            for log in log_group.logs:
                # Process a single row of data.
                item = dict()
                item['filename'] = log_group.filename
                item['source'] = log_group.source
                item['time'] = log.time
                for content in log.contents:
                    item[content.key] = content.value

                # Subsequent data processing
                # put your business logic here
                print(json.dumps(item))

        # Offset commit: persist at most once every 3 seconds,
        # otherwise only cache the offset locally.
        current_time = time.time()
        if current_time - self.last_check_time > 3:
            try:
                self.last_check_time = current_time
                offset_tracker.save_offset(True)
            except Exception:
                import traceback
                traceback.print_exc()
        else:
            try:
                offset_tracker.save_offset(False)
            except Exception:
                import traceback
                traceback.print_exc()

        return None
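The commit pattern at the end of process() — persist the offset at most every three seconds, otherwise only cache it — can be isolated as a small helper. This is illustrative only; save(True/False) mirrors the offset_tracker.save_offset call above, and the injectable clock exists purely to make the sketch testable:

```python
import time

class ThrottledCommitter:
    """Persist at most once per `interval` seconds; cache otherwise."""
    def __init__(self, save, interval=3.0, clock=time.time):
        self.save = save          # callable: save(persist: bool)
        self.interval = interval
        self.clock = clock
        self.last = 0.0

    def commit(self):
        now = self.clock()
        persist = now - self.last > self.interval
        if persist:
            self.last = now
        self.save(persist)
        return persist

calls = []
fake_now = [100.0]
c = ThrottledCommitter(lambda p: calls.append(p), clock=lambda: fake_now[0])
c.commit()            # first call: persisted
fake_now[0] += 1
c.commit()            # only 1 s later: cached, not persisted
fake_now[0] += 4
c.commit()            # 5 s after the last persist: persisted again
print(calls)
```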
3. Create a consumer and start the consumer thread. The consumer then consumes data from the specified topic.
| Parameter | Description | Default Value | Value Range |
| --- | --- | --- | --- |
| endpoint | Request domain: the API domain name shown on the Log Upload tab. | - | Supported regions: ALL |
| access_key_id | Your Secret_id; obtain it in CAM. | - | - |
| access_key | Your Secret_key; obtain it in CAM. | - | - |
| region | The topic's region, e.g., ap-beijing, ap-guangzhou, ap-shanghai. For details, see Regions and Access Domains. | - | Supported regions: ALL |
| logset_id | Logset ID. Only one logset is supported. | - | - |
| topic_ids | Log topic ID. Separate multiple topic IDs with commas. | - | - |
| consumer_group_name | Consumer group name. | - | - |
| internal | Private network: TRUE; public network: FALSE. Note: for private/public network read traffic fees, see Product Pricing. | FALSE | TRUE/FALSE |
| consumer_name | Consumer name. Must be unique within the same consumer group. | - | A string consisting of 0-9, a-z, A-Z, '-', '_', '.' |
| heartbeat_interval | Heartbeat interval. A consumer that misses two consecutive heartbeats is considered offline. | 20 | 0-30 minutes |
| data_fetch_interval | Data pull interval. Cannot be less than 1 second. | 2 | - |
| offset_start_time | Start time for data pulling: a string UNIX timestamp with second precision (e.g., 1711607794), or "begin"/"end". begin: the earliest data within the log topic lifetime; end: the latest data within the log topic lifetime. | "end" | "begin"/"end"/UNIX timestamp |
| max_fetch_log_group_size | Data size per pull per consumer. Defaults to 2 MB; up to 10 MB. | 2097152 | 2 MB - 10 MB |
| offset_end_time | End time for data pulling: a string UNIX timestamp with second precision (e.g., 1711607794). Leave empty for continuous pulling. | - | - |
Note:
Start time: if you have already consumed from a specified time point (e.g., offset_start_time=1711607794) and want to start consuming from that time point again, change the consumer group name.
The following demo uses only one consumer. We recommend keeping the consumer count consistent with the log topic partition count.
import os
import signal
import sys
import time

# ConsumerWorker and LogHubConfig are provided by the tencentcloud-cls-sdk-python package.

class App:
    def __init__(self):
        self.shutdown_flag = False
        # access endpoint
        self.endpoint = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ENDPOINT', '')
        # region
        self.region = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_REGION', '')
        # secret id
        self.access_key_id = os.environ.get(
            'TENCENTCLOUD_LOG_SAMPLE_ACCESSID', '')
        # secret key
        self.access_key = os.environ.get(
            'TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY', '')
        # logset id
        self.logset_id = os.environ.get(
            'TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID', '')
        # topic ids
        self.topic_ids = os.environ.get(
            'TENCENTCLOUD_LOG_SAMPLE_TOPICS', '').split(',')
        # consumer group name
        self.consumer_group = 'consumer-group-1'
        # consumer name; we recommend setting the consumer count equal to the log topic partition count.
        self.consumer_name1 = "consumer-group-1-A"
        assert self.endpoint and self.access_key_id and self.access_key and self.logset_id, \
            "endpoint/access_id/access_key and logset_id cannot be empty"
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGINT, self.signal_handler)

    def signal_handler(self, signum, frame):
        print(f"catch signal {signum}, cleaning up...")
        self.shutdown_flag = True

    def run(self):
        print("*** start to run consumer...")
        self.consume()
        # wait for an exit signal
        while not self.shutdown_flag:
            time.sleep(1)
        # shut down the consumer
        print("*** stopping workers")
        self.consumer.shutdown()
        sys.exit(0)

    def consume(self):
        try:
            # consumer config
            option1 = LogHubConfig(self.endpoint, self.access_key_id, self.access_key,
                                   self.region, self.logset_id, self.topic_ids,
                                   self.consumer_group, self.consumer_name1,
                                   heartbeat_interval=3, data_fetch_interval=1,
                                   offset_start_time='begin',
                                   max_fetch_log_group_size=1048576)
            # init consumer
            self.consumer = ConsumerWorker(SampleConsumer, consumer_option=option1)

            # start consumer
            print("*** start to consume data...")
            self.consumer.start()
        except Exception as e:
            import traceback
            traceback.print_exc()
            raise e
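To run the demo end to end, the environment variables read in __init__ must be set first. A sketch, with placeholder values throughout; main.py is assumed to be a file containing the classes above plus an App().run() entry point:

```shell
# Placeholder values -- replace with your own account and topic details.
export TENCENTCLOUD_LOG_SAMPLE_ENDPOINT="<your-endpoint>"
export TENCENTCLOUD_LOG_SAMPLE_REGION="ap-guangzhou"
export TENCENTCLOUD_LOG_SAMPLE_ACCESSID="<your-secret-id>"
export TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY="<your-secret-key>"
export TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID="<your-logset-id>"
export TENCENTCLOUD_LOG_SAMPLE_TOPICS="<topic-id-1>,<topic-id-2>"

# main.py is assumed to define the classes above and call App().run()
python3 main.py
```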
