log_group {
    source      // Log source, generally the machine's IP
    filename    // Log filename
    logs {
        time                 // Log time, unix timestamp, at microsecond level
        user_defined_log_kvs // User log fields
    }
}
class SampleConsumer(ConsumerProcessorBase):
    """Consumer that flattens each log into a dict and prints it as JSON.

    After every batch the consume offset is committed: a forced commit at
    most once every 3 seconds, a lightweight non-forced commit otherwise.
    """

    # Seconds-since-epoch of the last successful forced offset commit.
    # Class-level default; shadowed per instance after the first commit.
    last_check_time = 0

    def initialize(self, topic_id):
        """Record the topic this consumer instance is bound to."""
        self.topic_id = topic_id

    def process(self, log_groups, offset_tracker):
        """Handle one batch of log groups, then commit the consume offset.

        Args:
            log_groups: iterable of log groups; each exposes .source,
                .filename and .logs, and each log carries .time plus
                .contents key/value pairs.
            offset_tracker: object whose save_offset(force) persists the
                current offset (force=True commits immediately).

        Returns:
            None (expected by the consumer framework).
        """
        for log_group in log_groups:
            for log in log_group.logs:
                # Flatten a single log entry into a plain dict.
                item = {
                    'filename': log_group.filename,
                    'source': log_group.source,
                    'time': log.time,
                }
                for content in log.contents:
                    item[content.key] = content.value
                # Subsequent data processing — put your business logic here.
                print(json.dumps(item))

        # Offset commit: force a real commit at most once every 3 seconds,
        # otherwise request a cheap, non-forced commit.
        current_time = time.time()
        force = current_time - self.last_check_time > 3
        try:
            offset_tracker.save_offset(force)
            if force:
                # Advance the checkpoint only after a successful forced
                # commit, so a failed commit is retried on the next batch
                # instead of waiting another 3 seconds.
                self.last_check_time = current_time
        except Exception:
            # Best-effort: a commit failure must not kill the consumer.
            import traceback
            traceback.print_exc()
        return None
class App:
    """Sample CLS consumer application configured via environment variables."""

    def __init__(self):
        """Read connection settings from the environment and install
        SIGTERM/SIGINT handlers for graceful shutdown.

        Raises:
            ValueError: if endpoint, access id, access key or logset id
                is missing from the environment.
        """
        self.shutdown_flag = False
        # Access endpoint.
        self.endpoint = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ENDPOINT', '')
        # Region.
        self.region = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_REGION', '')
        # Secret id.
        self.access_key_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSID', '')
        # Secret key.
        self.access_key = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY', '')
        # Logset id.
        self.logset_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID', '')
        # Topic ids (comma separated).
        self.topic_ids = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_TOPICS', '').split(',')
        # Pre-filtering condition applied before consumption; leaving it
        # unconfigured consumes all logs in full. Example:
        #   log_keep(op_and(op_gt(v("status"), 400),
        #                   str_exist(v("cdb_message"), "pwd")))
        # which keeps only logs with status > 400 whose cdb_message
        # contains "pwd".
        self.query = 'Your filter conditions'
        # Consumer group name.
        self.consumer_group = 'consumer-group-1'
        # Consumer id; recommended: number of consumers == topic partition count.
        self.consumer_name1 = "consumer-group-1-A"
        # Explicit raise instead of `assert`: assertions are stripped
        # under `python -O`, and `assert cond, ValueError(...)` would
        # raise AssertionError anyway, never the ValueError.
        if not (self.endpoint and self.access_key_id and self.access_key
                and self.logset_id):
            raise ValueError("endpoint/access_id/access_key and "
                             "logset_id cannot be empty")
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGINT, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Signal handler: request a graceful shutdown of the main loop."""
        print(f"catch signal {signum},cleanup...")
        self.shutdown_flag = True

    def run(self):
        """Start the consumer, then block until a shutdown signal arrives."""
        print("*** start to run consumer...")
        self.consume()
        # Waiting for exit signal.
        while not self.shutdown_flag:
            time.sleep(1)
        # Shutdown consumer.
        print("*** stopping workers")
        self.consumer.shutdown()
        sys.exit(0)

    def consume(self):
        """Build the consumer option and start the worker (non-blocking)."""
        try:
            # Consumer config.
            option1 = LogHubConfig(
                self.endpoint, self.access_key_id, self.access_key,
                self.region, self.logset_id, self.topic_ids,
                self.consumer_group, self.consumer_name1,
                heartbeat_interval=3, data_fetch_interval=1,
                offset_start_time='begin',
                max_fetch_log_group_size=1048576, query=self.query)
            # Init consumer.
            self.consumer = ConsumerWorker(SampleConsumer,
                                           consumer_option=option1)
            # Start consumer.
            print("*** start to consume data...")
            self.consumer.start()
        except Exception:
            import traceback
            traceback.print_exc()
            # Bare `raise` preserves the original traceback.
            raise
{"cdb_message":"password:99743036","log_level":"ERROR","status":"400","__SOURCE__":"127.0.0.1"}
{"cdb_message":"pwd:3qJ0VaPn","log_level":"INFO","status":"500","__SOURCE__":"127.0.0.1"}
log_keep(op_and(op_gt(v("status"), 400), str_exist(v("cdb_message"), "pwd")))
{"cdb_message":"pwd:3qJ0VaPn","log_level":"INFO","status":"500","__SOURCE__":"127.0.0.1"}
Was this page helpful?
You can also Contact sales or Submit a Ticket for help.
Help us improve! Rate your documentation experience in 5 minutes.
Feedback