Flink Dynamic CEP Quick Start

Last updated: 2026-06-04 11:00:48

Prerequisites

A Flink dynamic CEP job needs to run on a dedicated Stream Compute Service (SCS) cluster. If you do not have a cluster, see Creating a Private Cluster.

Operation Procedures

Step 1: Prepare test data.

Prepare upstream Kafka topics.
1. Log in to the TDMQ for CKafka console, then create or select an available CKafka instance.
2. Create a topic named topic_cep_demo to store the simulated user behavior logs.
Prepare MySQL databases.
1. Log in to the TencentDB for MySQL console, then create or select an available MySQL instance.
2. Create the d_cep_demo database and the t_mysql_demo rule table, which stores the rules that the Flink CEP job applies.
CREATE DATABASE d_cep_demo;
USE d_cep_demo;

CREATE TABLE t_mysql_demo (
`id` VARCHAR(64),
`version` INT,
`pattern` VARCHAR(4096),
`function` VARCHAR(512)
);
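The id and version columns allow a rule to be revised by inserting a new row with the same id and a higher version, as Step 4 does. The sketch below shows one way to pick the latest version of each rule from such a table. It rests on assumptions: in-memory SQLite stands in for MySQL so the example runs anywhere, the pattern and function values are dummy placeholders, and latest_rules is a hypothetical helper, not part of the flink-cep package. Whether the job itself selects rules this way is not stated here; the Step 4 logs merely suggest that the highest version is the one in effect.

```python
import sqlite3

# In-memory SQLite stands in for the MySQL instance (an assumption made so
# the sketch is runnable without a database server).
conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE t_mysql_demo ('
    '"id" VARCHAR(64), "version" INT, '
    '"pattern" VARCHAR(4096), "function" VARCHAR(512))'
)

# Dummy rows: the pattern and function values are placeholders, not real rules.
rows = [
    ("1", 1, "rule 1, first revision", "DemoFunction"),
    ("1", 2, "rule 1, second revision", "DemoFunction"),
    ("2", 1, "rule 2, first revision", "DemoFunction"),
]
conn.executemany("INSERT INTO t_mysql_demo VALUES (?, ?, ?, ?)", rows)


def latest_rules(connection):
    """Return (id, version) for the highest version stored under each id."""
    query = """
        SELECT t."id", t."version"
        FROM t_mysql_demo AS t
        JOIN (SELECT "id", MAX("version") AS v
              FROM t_mysql_demo GROUP BY "id") AS m
          ON t."id" = m."id" AND t."version" = m.v
        ORDER BY t."id"
    """
    return connection.execute(query).fetchall()


for rule_id, version in latest_rules(conn):
    print(f"rule {rule_id}: latest version {version}")
```

The same SELECT should also work against MySQL if the double-quoted identifiers are replaced with backticks.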

Step 2: Develop and start Flink CEP jobs.

1. Contact Online Customer Service to obtain the flink-cep JAR package, and add it as a dependency of your Maven project.
2. Develop job code.
3. Upload the job's JAR package in the SCS console, then deploy and start the JAR job.
Main class: com.tencent.cloud.oceanus.cep.demo.FlinkCepDemo. Pass the following input parameters to the main class, replacing the broker address, JDBC URL, and credentials with your own values:
--bootstrap-server 172.28.22.5:9092 --topic topic_cep_demo --properties.group.id flink_cep_demo --url jdbc:mysql://172.28.28.24:3306/d_cep_demo?user=root&password=<your_password> --table-name t_mysql_demo --pattern-update-interval-ms 3000
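Because the parameters are passed as one long string, a missing value or a stray token is easy to overlook. The following sketch splits such a string into key/value pairs so that it can be checked before it is pasted into the console. parse_params is a hypothetical helper written for this illustration, the password is a placeholder, and nothing here describes how the job itself parses its arguments.

```python
import shlex

# Example parameter string; the addresses and <your_password> are placeholders.
ARGS = (
    "--bootstrap-server 172.28.22.5:9092 --topic topic_cep_demo "
    "--properties.group.id flink_cep_demo "
    "--url jdbc:mysql://172.28.28.24:3306/d_cep_demo"
    "?user=root&password=<your_password> "
    "--table-name t_mysql_demo --pattern-update-interval-ms 3000"
)


def parse_params(arg_string):
    """Split '--key value' pairs into a dict; raise if a pair is malformed."""
    tokens = shlex.split(arg_string)
    if len(tokens) % 2 != 0:
        raise ValueError("every --key needs exactly one value")
    params = {}
    for key, value in zip(tokens[0::2], tokens[1::2]):
        if not key.startswith("--"):
            raise ValueError(f"expected an option name, got {key!r}")
        params[key[2:]] = value
    return params


params = parse_params(ARGS)
for name, value in params.items():
    print(f"{name} = {value}")
```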

Step 3: Insert rules.

1. Insert dynamic update rules into the MySQL table.
INSERT INTO t_mysql_demo (
`id`,
`version`,
`pattern`,
`function`
) values(
'1',
1,
'{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.tencent.cloud.oceanus.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}',
'com.tencent.cloud.oceanus.cep.demo.dynamic.DemoPatternProcessFunction')
;

To help you get started quickly and to keep the pattern field in the database readable, SCS defines a JSON format for describing rules. The value of the pattern field in the SQL statement above is a pattern serialized in that format. It matches the following sequence: 3 consecutive events with action equal to 0, followed by an event whose action is not equal to 1.
2. Send messages to the Kafka topic topic_cep_demo. In each message, the third comma-separated field is the action value that the rule conditions test.
1,ZhangSan,0,1,1697181992000
1,ZhangSan,0,1,1697181993000
1,ZhangSan,0,1,1697181994000
1,ZhangSan,0,1,1697181995000
3. Search the JobManager logs for "JDBCPeriodicPatternProcessorDiscoverer" to view the latest rule that the job has loaded.
"{\\"name\\":\\"end\\",\\"quantifier\\":{\\"consumingStrategy\\":\\"SKIP_TILL_NEXT\\",\\"properties\\":[\\"SINGLE\\"],\\"times\\":null,\\"untilCondition\\":null},\\"condition\\":null,\\"nodes\\":[{\\"name\\":\\"end\\",\\"quantifier\\":{\\"consumingStrategy\\":\\"SKIP_TILL_NEXT\\",\\"properties\\":[\\"SINGLE\\"],\\"times\\":null,\\"untilCondition\\":null},\\"condition\\":{\\"className\\":\\"com.tencent.cloud.oceanus.cep.demo.condition.EndCondition\\",\\"type\\":\\"CLASS\\"},\\"type\\":\\"ATOMIC\\"},{\\"name\\":\\"start\\",\\"quantifier\\":{\\"consumingStrategy\\":\\"SKIP_TILL_NEXT\\",\\"properties\\":[\\"LOOPING\\"],\\"times\\":{\\"from\\":3,\\"to\\":3,\\"windowTime\\":null},\\"untilCondition\\":null},\\"condition\\":{\\"expression\\":\\"action == 0\\",\\"type\\":\\"AVIATOR\\"},\\"type\\":\\"ATOMIC\\"}],\\"edges\\":[{\\"source\\":\\"start\\",\\"target\\":\\"end\\",\\"type\\":\\"SKIP_TILL_NEXT\\"}],\\"window\\":null,\\"afterMatchStrategy\\":{\\"type\\":\\"SKIP_PAST_LAST_EVENT\\",\\"patternName\\":null},\\"type\\":\\"COMPOSITE\\",\\"version\\":1}"
4. Search the TaskManager logs for "A match for Pattern of" to view the match output printed by the CEP job.
A match for Pattern of (id, version): (1, 1) is found. The event sequence: start: [Event(1, ZhangSan, 0, 1, 1697181992000), Event(1, ZhangSan, 0, 1, 1697181993000), Event(1, ZhangSan, 0, 1, 1697181994000)]end: [Event(1, ZhangSan, 0, 1, 1697181995000)]
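The serialized pattern used in this step is hard to read as a single line. As a rough aid, the sketch below loads the same JSON (reformatted with line breaks only) and prints one line per node and edge. summarize is a hypothetical helper written for this illustration; it only echoes fields that appear in the JSON and does not interpret them the way the CEP engine does.

```python
import json

# The rule inserted in Step 3, with whitespace added for readability.
PATTERN_JSON = """
{"name":"end",
 "quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],
               "times":null,"untilCondition":null},
 "condition":null,
 "nodes":[
  {"name":"end",
   "quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],
                 "times":null,"untilCondition":null},
   "condition":{"className":"com.tencent.cloud.oceanus.cep.demo.condition.EndCondition",
                "type":"CLASS"},
   "type":"ATOMIC"},
  {"name":"start",
   "quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],
                 "times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},
   "condition":{"expression":"action == 0","type":"AVIATOR"},
   "type":"ATOMIC"}],
 "edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],
 "window":null,
 "afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},
 "type":"COMPOSITE",
 "version":1}
"""


def summarize(pattern_json):
    """Return one descriptive line per node, per edge, and for the skip strategy."""
    pattern = json.loads(pattern_json)
    lines = []
    for node in pattern["nodes"]:
        cond = node["condition"]
        # AVIATOR conditions carry an expression; CLASS conditions name a Java class.
        rule = cond["expression"] if cond["type"] == "AVIATOR" else cond["className"]
        quantifier = node["quantifier"]
        props = ",".join(quantifier["properties"])
        times = quantifier["times"]
        times_text = f"{times['from']}..{times['to']}" if times else "none"
        lines.append(
            f"node {node['name']}: {cond['type']} {rule} "
            f"properties={props} times={times_text}"
        )
    for edge in pattern["edges"]:
        lines.append(f"edge {edge['source']} -> {edge['target']} [{edge['type']}]")
    lines.append(f"after match: {pattern['afterMatchStrategy']['type']}")
    return lines


for line in summarize(PATTERN_JSON):
    print(line)
```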

Step 4: Update the match rule and verify that it takes effect.

1. Update rules in the MySQL table.
1.1 Change the start condition from action == 0 to action == 0 || action == 2, and change the number of occurrences from >=3 to >=5. To apply the change, insert the rule again with id 1 and version 2. The corresponding SQL statement is as follows:
INSERT INTO t_mysql_demo(`id`, `version`, `pattern`, `function`) values('1', 2, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.tencent.cloud.oceanus.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":5,"to":5,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0 || action == 2","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.tencent.cloud.oceanus.cep.demo.dynamic.DemoPatternProcessFunction');
1.2 Insert a second rule with id 2. It is identical to version 1 of rule 1: the start condition remains action == 0 and the number of occurrences remains >=3.
INSERT INTO t_mysql_demo(`id`, `version`, `pattern`, `function`) values('2', 1, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.tencent.cloud.oceanus.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.tencent.cloud.oceanus.cep.demo.dynamic.DemoPatternProcessFunction');
2. Send data to the Kafka topic.
1,ZhangSan,0,1,1697181992000
1,ZhangSan,0,1,1697181993000
1,ZhangSan,0,1,1697181994000
1,ZhangSan,2,1,1697181995000
1,ZhangSan,0,1,1697181996000
1,ZhangSan,0,1,1697181997000
1,ZhangSan,0,1,1697181998000
1,ZhangSan,2,1,1697181999000
3. Search the TaskManager logs for "A match for Pattern of" to view the match output printed by the CEP job.
Rule matching log for ID 1:
A match for Pattern of (id, version): (1, 2) is found. The event sequence: start: [Event(1, ZhangSan, 0, 1, 1697181992000), Event(1, ZhangSan, 0, 1, 1697181993000), Event(1, ZhangSan, 0, 1, 1697181994000), Event(1, ZhangSan, 2, 1, 1697181995000), Event(1, ZhangSan, 0, 1, 1697181996000)]end: [Event(1, ZhangSan, 0, 1, 1697181997000)]
Rule matching log for ID 2:
A match for Pattern of (id, version): (2, 1) is found. The event sequence: start: [Event(1, ZhangSan, 0, 1, 1697181992000), Event(1, ZhangSan, 0, 1, 1697181993000), Event(1, ZhangSan, 0, 1, 1697181994000)]end: [Event(1, ZhangSan, 2, 1, 1697181995000)]
A match for Pattern of (id, version): (2, 1) is found. The event sequence: start: [Event(1, ZhangSan, 0, 1, 1697181996000), Event(1, ZhangSan, 0, 1, 1697181997000), Event(1, ZhangSan, 0, 1, 1697181998000)]end: [Event(1, ZhangSan, 2, 1, 1697181999000)]
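To see why these eight events yield one match for rule 1 and two for rule 2, the matching can be traced with a small simulation. The sketch below is a deliberately simplified stand-in for the CEP engine, not a reproduction of it: it collects the first N events that pass the start condition, accepts the next event that passes the end condition, and then resumes after that event. It assumes that EndCondition means action != 1, as described in Step 3. The Event field names other than action are hypothetical, as are parse and find_matches.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    # Only `action` is named in the rule expressions; the other field names
    # are assumptions chosen for this illustration.
    event_id: int
    user: str
    action: int
    field4: int
    timestamp: int


def parse(line):
    """Turn one comma-separated message into an Event."""
    event_id, user, action, field4, timestamp = line.split(",")
    return Event(int(event_id), user, int(action), int(field4), int(timestamp))


def find_matches(events, start_ok, count, end_ok):
    """Simplified matcher: gather `count` events passing start_ok, then take
    the next event passing end_ok, then start over after that event."""
    matches, start = [], []
    for event in events:
        if len(start) < count:
            if start_ok(event):
                start.append(event)
        elif end_ok(event):
            matches.append((start, event))
            start = []
    return matches


MESSAGES = """\
1,ZhangSan,0,1,1697181992000
1,ZhangSan,0,1,1697181993000
1,ZhangSan,0,1,1697181994000
1,ZhangSan,2,1,1697181995000
1,ZhangSan,0,1,1697181996000
1,ZhangSan,0,1,1697181997000
1,ZhangSan,0,1,1697181998000
1,ZhangSan,2,1,1697181999000"""

events = [parse(line) for line in MESSAGES.splitlines()]


def end_ok(event):
    # Assumed behavior of EndCondition, taken from the description in Step 3.
    return event.action != 1


# Rule 1, version 2: action == 0 || action == 2, five occurrences.
rule1 = find_matches(events, lambda e: e.action in (0, 2), 5, end_ok)
# Rule 2, version 1: action == 0, three occurrences.
rule2 = find_matches(events, lambda e: e.action == 0, 3, end_ok)

for label, matches in (("(1, 2)", rule1), ("(2, 1)", rule2)):
    for start, end in matches:
        stamps = [e.timestamp for e in start]
        print(f"{label} start={stamps} end={end.timestamp}")
```

Under these assumptions the simulation groups the events the same way as the logs above. After rule 1 matches at the sixth event, only two events remain, which is too few for another match.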
