<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.14.4</version>
</dependency>
CREATE TABLE `nginx_source`
(
    -- Fields in the log
    `@metadata` STRING,
    `@timestamp` TIMESTAMP,
    `agent` STRING,
    `ecs` STRING,
    `host` STRING,
    `input` STRING,
    `log` STRING,
    `message` STRING,
    -- Kafka partition
    `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
    'connector' = 'kafka',
    -- Name of the CLS Kafka protocol consumption topic provided by the console, such as XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX; it can be copied from the console
    'topic' = 'Your consumption topic',
    -- Service address + port: port 9096 for the public network, port 9095 for the private network. This example uses private network consumption; fill in the value according to your actual situation
    'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',
    -- Replace with your consumer group name
    'properties.group.id' = 'Consumer group ID',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    -- username is the logset ID, for example ca5cXXXXdd2e-4ac0af12-92d4b677d2c6.
    -- password is a string composed of the user's SecretId#SecretKey, such as AKID********************************#XXXXuXtymIXT0Lac. Be careful not to lose the #.
    -- It is recommended to use sub-account keys. When the root account authorizes a sub-account, follow the principle of least privilege and restrict the action and resource in the sub-account's access policy to the minimum scope required for the operation.
    -- Note that jaas.config must end with a semicolon; otherwise an error will be reported.
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN'
);
SELECT count(*), host FROM nginx_source GROUP BY host;
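To run the aggregation continuously instead of as an ad-hoc query, the result can be written to a sink table. The following is a minimal sketch, assuming the `nginx_source` table defined above; the sink table name `host_pv_print` is illustrative and uses Flink's built-in print connector, which writes the result stream to the TaskManager logs (or the SQL client output) for verification.
-- Hypothetical sink table for verification; the print connector accepts the changelog produced by the GROUP BY aggregation
CREATE TABLE `host_pv_print`
(
    `host` STRING,
    `pv` BIGINT
) WITH (
    'connector' = 'print'
);

-- Continuously emit per-host counts from the CLS Kafka source
INSERT INTO `host_pv_print`
SELECT `host`, count(*) AS pv
FROM nginx_source
GROUP BY `host`;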