产品动态
公告


pip install git+https://github.com/TencentCloud/tencentcloud-cls-sdk-python.git
log_group {source //日志来源,一般为机器的 IPfilename //日志文件名logs {time //日志时间,unix 时间戳,微秒级别user_defined_log_kvs //用户日志字段}}
class SampleConsumer(ConsumerProcessorBase):last_check_time = 0def initialize(self, topic_id):self.topic_id = topic_iddef process(self, log_groups, offset_tracker):for log_group in log_groups:for log in log_group.logs:# 处理单行数据item = dict()item['filename'] = log_group.filenameitem['source'] = log_group.sourceitem['time'] = log.timefor content in log.contents:item[content.key] = content.value# Subsequent data processing# put your business logic hereprint(json.dumps(item))# offset commitcurrent_time = time.time()if current_time - self.last_check_time > 3:try:self.last_check_time = current_timeoffset_tracker.save_offset(True)except Exception:import tracebacktraceback.print_exc()else:try:offset_tracker.save_offset(False)except Exception:import tracebacktraceback.print_exc()return None
配置参数 | 说明 | 默认值 | 取值范围 |
endpoint | - | 支持地域:ALL | |
access_key_id | - | - | |
access_key | - | - | |
region | - | 支持地域:ALL | |
logset_id | 日志集 ID,仅支持一个日志集。 | - | - |
topic_ids | 日志主题 ID,多个主题请使用','隔开。 | - | - |
consumer_group_name | 消费者组名称。 | - | - |
internal | 内网:TRUE 公网:FALSE 说明: | FALSE | TRUE/FALSE |
consumer_name | 消费者名称。同一个消费者组内,消费者名称不可重复。 | - | 0-9、aA-zZ、 '-'、'_'、'.'组成的字符串 |
heartbeat_interval | 消费者心跳上报间隔,2个间隔没有上报心跳,会被认为是消费者下线。 | 20 | 0-30分钟 |
data_fetch_interval | 消费者拉取数据间隔,不小于1秒。 | 2 | - |
offset_start_time | 拉取数据的开始时间,字符串类型的 UNIX 时间戳,精度为秒,例如 "1711607794",也可以直接可配置为"begin"、"end"。 begin:日志主题生命周期内的最早数据 end:日志主题生命周期内的最新数据 | "end" | "begin"/"end"/UNIX 时间戳 |
max_fetch_log_group_size | 消费者单次拉取数据大小,默认2M,最大10M。 | 2097152 | 2M - 10M |
offset_end_time | 拉取数据的结束时间,支持字符串类型的 UNIX 时间戳,精度为秒,例如"1711607794"。不填写代表持续拉取。 | - | - |
class App:def __init__(self):self.shutdown_flag = False# access endpointself.endpoint = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ENDPOINT', '')# regionself.region = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_REGION', '')# secret idself.access_key_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSID', '')# secret keyself.access_key = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY', '')# logset idself.logset_id = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID', '')# topic idsself.topic_ids = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_TOPICS', '').split(',')# consumer group name,self.consumer_group = 'consumer-group-1'# consumer id, 建议您的消费者个数=日志主题分区数.self.consumer_name1 = "consumer-group-1-A"assert self.endpoint and self.access_key_id and self.access_key and self.logset_id, ValueError("endpoint/access_id/access_key and ""logset_id cannot be empty")signal.signal(signal.SIGTERM, self.signal_handler)signal.signal(signal.SIGINT, self.signal_handler)def signal_handler(self, signum, frame):print(f"catch signal {signum},cleanup...")self.shutdown_flag = Truedef run(self):print("*** start to run consumer...")self.consume()# waiting for exit signalwhile not self.shutdown_flag:time.sleep(1)# shutdown consumerprint("*** stopping workers")self.consumer.shutdown()sys.exit(0)def consume(self):try:# consumer configoption1 = LogHubConfig(self.endpoint, self.access_key_id, self.access_key, self.region, self.logset_id, self.topic_ids, self.consumer_group,self.consumer_name1, heartbeat_interval=3, data_fetch_interval=1,offset_start_time='begin', max_fetch_log_group_size=1048576)# init consumerself.consumer = ConsumerWorker(SampleConsumer, consumer_option=option1)# start consumerprint("*** start to consume data...")self.consumer.start()except Exception as e:import tracebacktraceback.print_exc()raise e
文档反馈