Tencent Cloud

Cloud Log Service

Consumption Demo - Consumption Pre-filtering

Last updated: 2026-04-22 19:02:53
This operation guide walks you through how to use the Python SDK to pre-filter and consume log data.

Preparing the Environment

Complete SDK sample: continuous_sample_consumer.
Python version: Python 3.5 or later is recommended.
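The demo below reads its connection settings from environment variables. They can be exported before running; the variable names match the App class in this guide, while the values here are placeholders (the endpoint format shown is an assumption and should be replaced with the actual access endpoint for your region):

```shell
# Placeholder values - replace with your own before running the demo
export TENCENTCLOUD_LOG_SAMPLE_ENDPOINT="ap-guangzhou.cls.tencentcs.com"
export TENCENTCLOUD_LOG_SAMPLE_REGION="ap-guangzhou"
export TENCENTCLOUD_LOG_SAMPLE_ACCESSID="your-secret-id"
export TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY="your-secret-key"
export TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID="your-logset-id"
export TENCENTCLOUD_LOG_SAMPLE_TOPICS="topic-id-1,topic-id-2"
```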

Core Logic Description

Data Structure (LogGroup)

The consumer processes the consumed data; user log data is stored in the log_group structure.
log_group {
    source                  // Log source, generally the machine's IP
    filename                // Log filename
    logs {
        time                  // Log time, Unix timestamp in microseconds
        user_defined_log_kvs  // User log fields
    }
}
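The structure above flattens naturally into one dict per log entry. A minimal sketch of that flattening, using plain Python dataclasses as illustrative stand-ins for the SDK's log_group objects (the class definitions here are assumptions for demonstration, not the SDK's types; only the field names follow the structure above):

```python
from dataclasses import dataclass, field
from typing import List

# Illustrative stand-ins for the SDK's log_group structure
@dataclass
class Content:
    key: str
    value: str

@dataclass
class Log:
    time: int                      # Unix timestamp, microseconds
    contents: List[Content] = field(default_factory=list)

@dataclass
class LogGroup:
    source: str                    # log source, generally the machine's IP
    filename: str                  # log filename
    logs: List[Log] = field(default_factory=list)

def flatten(log_group: LogGroup):
    """Yield one flat dict per log entry, mirroring the consumer below."""
    for log in log_group.logs:
        item = {'filename': log_group.filename,
                'source': log_group.source,
                'time': log.time}
        for c in log.contents:
            item[c.key] = c.value
        yield item

group = LogGroup(source='127.0.0.1', filename='app.log',
                 logs=[Log(time=1713780000000000,
                           contents=[Content('status', '500')])])
print(list(flatten(group)))
```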

Consumer method implementation (SampleConsumer)

Define the specific log processing logic by extending the ConsumerProcessorBase class.
import json
import time


class SampleConsumer(ConsumerProcessorBase):
    last_check_time = 0

    def initialize(self, topic_id):
        self.topic_id = topic_id

    def process(self, log_groups, offset_tracker):
        for log_group in log_groups:
            for log in log_group.logs:
                # Process a single log entry
                item = dict()
                item['filename'] = log_group.filename
                item['source'] = log_group.source
                item['time'] = log.time
                for content in log.contents:
                    item[content.key] = content.value

                # Subsequent data processing
                # put your business logic here
                print(json.dumps(item))

        # Commit the offset at most once every 3 seconds
        current_time = time.time()
        if current_time - self.last_check_time > 3:
            try:
                self.last_check_time = current_time
                offset_tracker.save_offset(True)
            except Exception:
                import traceback
                traceback.print_exc()
        else:
            try:
                offset_tracker.save_offset(False)
            except Exception:
                import traceback
                traceback.print_exc()

        return None

Starting the Consumption Task

Configuration key points:
Pre-filtering: implemented by setting the query parameter. If it is not configured, all logs are consumed in full.
Concurrency: to ensure consumption performance, the number of consumers should match the number of partitions in the log topic. The demo below starts only a single consumer.
import os
import signal
import sys
import time


class App:
    def __init__(self):
        self.shutdown_flag = False
        # access endpoint
        self.endpoint = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ENDPOINT', '')
        # region
        self.region = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_REGION', '')
        # secret id
        self.access_key_id = os.environ.get(
            'TENCENTCLOUD_LOG_SAMPLE_ACCESSID', '')
        # secret key
        self.access_key = os.environ.get(
            'TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY', '')
        # logset id
        self.logset_id = os.environ.get(
            'TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID', '')
        # topic ids
        self.topic_ids = os.environ.get(
            'TENCENTCLOUD_LOG_SAMPLE_TOPICS', '').split(',')
        # Pre-filtering conditions: filtering before consumption is achieved by
        # configuring the query parameter; leaving it unset consumes all logs in full.
        # Example query:
        #   log_keep(op_and(op_gt(v("status"), 400), str_exist(v("cdb_message"), "pwd")))
        # Effect: only logs whose status is greater than 400 and whose
        # cdb_message field contains "pwd" are consumed.
        self.query = 'Your filter conditions'
        # consumer group name
        self.consumer_group = 'consumer-group-1'
        # consumer id; it is recommended that the number of consumers
        # equals the partition count of the log topic
        self.consumer_name1 = "consumer-group-1-A"
        assert self.endpoint and self.access_key_id and self.access_key \
            and self.logset_id, \
            "endpoint/access_id/access_key and logset_id cannot be empty"
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGINT, self.signal_handler)

    def signal_handler(self, signum, frame):
        print(f"catch signal {signum}, cleaning up...")
        self.shutdown_flag = True

    def run(self):
        print("*** start to run consumer...")
        self.consume()
        # wait for an exit signal
        while not self.shutdown_flag:
            time.sleep(1)
        # shut down the consumer
        print("*** stopping workers")
        self.consumer.shutdown()
        sys.exit(0)

    def consume(self):
        try:
            # consumer config
            option1 = LogHubConfig(self.endpoint, self.access_key_id, self.access_key,
                                   self.region, self.logset_id, self.topic_ids,
                                   self.consumer_group, self.consumer_name1,
                                   heartbeat_interval=3, data_fetch_interval=1,
                                   offset_start_time='begin',
                                   max_fetch_log_group_size=1048576,
                                   query=self.query)
            # init consumer
            self.consumer = ConsumerWorker(
                SampleConsumer, consumer_option=option1)

            # start consumer
            print("*** start to consume data...")
            self.consumer.start()
        except Exception:
            import traceback
            traceback.print_exc()
            raise


Pre-filtering Effect Test Example

When the query parameter is configured, logs are filtered on the server before being transmitted to the client, significantly reducing bandwidth consumption.
Raw Logs
{"cdb_message":"password:99743036","log_level":"ERROR","status":"400","__SOURCE__":"127.0.0.1"}
{"cdb_message":"pwd:3qJ0VaPn","log_level":"INFO","status":"500","__SOURCE__":"127.0.0.1"}

Pre-filtering Conditions (Query)
log_keep(op_and(op_gt(v("status"), 400), str_exist(v("cdb_message"), "pwd")))
Final Consumed Results
Only the second log, which meets the criteria, is downloaded to the client and handled by the process method.

{"cdb_message":"pwd:3qJ0VaPn","log_level":"INFO","status":"500","__SOURCE__":"127.0.0.1"}
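As a quick local sanity check of the filter semantics, the functions used in the query can be emulated in plain Python. This is only a sketch mimicking the documented behavior of log_keep / op_and / op_gt / str_exist / v against the two raw logs above; it is not the server-side implementation:

```python
import json

# Local stand-ins mimicking the query functions' documented behavior
def v(log, key):
    return log.get(key)

def op_gt(a, b):
    return float(a) > float(b)   # numeric comparison, as in op_gt(v("status"), 400)

def str_exist(s, sub):
    return sub in (s or '')

def op_and(*conds):
    return all(conds)

def log_keep(log, cond):
    return cond                  # keep the log only when the condition holds

raw_logs = [
    {"cdb_message": "password:99743036", "log_level": "ERROR",
     "status": "400", "__SOURCE__": "127.0.0.1"},
    {"cdb_message": "pwd:3qJ0VaPn", "log_level": "INFO",
     "status": "500", "__SOURCE__": "127.0.0.1"},
]

kept = [log for log in raw_logs
        if log_keep(log, op_and(op_gt(v(log, "status"), 400),
                                str_exist(v(log, "cdb_message"), "pwd")))]
# Only the second log survives: status 500 > 400 and "pwd" in cdb_message
print(json.dumps(kept))
```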


