tencent cloud

流计算 Oceanus

动态与公告
产品动态
产品简介
产品概述
产品优势
应用场景
购买指南
计费概述
计费模式
退费说明
调整配置费用说明
快速入门
从零开始上手
创建独享集群
创建 SQL 作业
创建 JAR 作业
创建 ETL 作业
创建 Python 作业
操作指南
作业管理
作业开发
作业监控
作业日志
事件与诊断
元数据管理
快照管理
作业调优
依赖管理
集群管理
权限管理
SQL 开发指南
开发指南概述
术语和数据类型
DDL 数据定义语句
DML 数据操作语句
MySQL CDC 多 Source 复用
上下游开发指南
SET 控制语句
运算符和内置函数
标识符与保留字
Python 开发指南
ETL 开发指南
概述
ETL 作业术语表
上下游开发指南
常见问题
联系我们

时间窗口函数

PDF
聚焦模式
字号
最后更新时间: 2023-11-08 11:33:46
在流式计算中,流通常是无穷无尽的,我们无法知道什么时候数据源会继续/停止发送数据,所以在流上处理聚合事件(count、sum 等)的处理方式与批处理中的处理方式会有所差异。在流上一般用窗口(Windows)来限定聚合的范围,例如“过去2分钟网站点击量的计数”、“在最近100个人中点赞这个视频的总人数”。窗口的概念相当于帮我们收集了一张有限数据的动态表,我们可以对表中的数据进行聚合计算。
窗口函数是一种特殊的函数,它并不在 SELECT 的投影列表中使用,而是在 GROUP BY 子句中使用。流计算 Oceanus 支持三种类型的窗口函数 TUMBLE、HOP 和 SESSION。
Flink 流处理介绍详见 及时流处理

TUMBLE WINDOW

TUMBLE WINDOW(滚动窗口)将每个进入的数据分配到一个指定窗口大小的窗口中。滚动窗口可以自定义固定的大小,并且不会出现重叠。我们可以对窗口内的数据进行计算。

语法

TUMBLE(time_attr, interval)
time_attr 参数表示时间戳字段,表示每条记录被处理的时间戳。如果指定为 PROCTIME 是自动生成的时间戳,记录了数据被 flink 处理的时刻,一般用在 Processing Time 模式下
interval 参数用来设置窗口大小。例如,设置为1天:INTERVAL '1' DAY;设置为2小时:INTERVAL '2' HOUR,其他用法可参见 时间相关函数
注意
如果在 Event Time 时间模式下(使用 WATERMARK FOR 语句定义了时间戳字段),那么 TUMBLE、HOP、SESSION 窗口函数的第一个参数必须为该字段。
如果在 Processing Time 时间模式下,则 TUMBLE、HOP、SESSION 窗口函数的第一个参数必须为 proctime() 函数生成的计算列,下文用 PROCTIME 举例,请在实际作业中替换为实际的列名。

标识函数

标识函数用来表示窗口的起始与结束时间。
函数名
功能描述
TUMBLE_START(time-attr, size-interval)
返回该窗口的起始时间
TUMBLE_END(time-attr, size-interval)
返回该窗口的结束时间

模拟用例

下文以 TUMBLE WINDOW 为例,帮助您更容易地理解 TUMBLE WINDOW。使用 Event Time 模拟统计每小时各用户收入金额。
示例数据:
username(VARCHAR)
income(BIGINT)
times(TIMESTAMP)
Tom
20
2021-11-11 10:30:00.0
Jack
10
2021-11-11 10:35:00.0
Tom
10
2021-11-11 10:35:00.0
Tom
10
2021-11-11 10:40:00.0
Tom
15
2021-11-11 11:30:00.0
Jack
10
2021-11-11 11:30:00.0
Jack
15
2021-11-11 11:40:00.0
SQL 语句:
CREATE TABLE user_income (
username VARCHAR,
Income INT,
times TIMESTAMP(3),
WATERMARK FOR times AS times - INTERVAL '3' SECOND
) WITH (
...
);

CREATE TABLE output (
win_start TIMESTAMP,
win_end TIMESTAMP,
username VARCHAR,
hour_income BIGINT
)WITH(
...
);

INSERT INTO output
SELECT
TUMBLE_START(times,INTERVAL '1' HOUR),
TUMBLE_END(times,INTERVAL '1' HOUR),
username,
SUM(Income)
FROM user_income
GROUP BY TUMBLE(times,INTERVAL '1' HOUR),username;
输出结果:
win_start(TIMESTAMP)
win_end(TIMESTAMP)
username(VARCHAR)
hour_income(BIGINT)
2021-11-11 10:00:00.0
2021-11-11 11:00:00.0
Tom
40
2021-11-11 10:00:00.0
2021-11-11 11:00:00.0
Jack
10
2021-11-11 11:00:00.0
2021-11-11 12:00:00.0
Tom
15
2021-11-11 11:00:00.0
2021-11-11 12:00:00.0
Jack
25

HOP WINDOW

HOP WINDOW(滑动窗口)将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。
HOP WINDOW(滑动窗口)保持窗口大小(Size)不变,每次滑动指定的时间周期(Slide),因而允许窗口之间的相互重叠。
Slide 的大小决定了 Flink 创建新窗口的频率。
当 Slide 小于 Size 时(如图 window1 与 window2),相邻窗口会重叠,一个时间会被分配到多个窗口。
当 Slide 大于 Size 时(如图 window1 与 window4),可能会导致有些事件被丢弃。
当 Slide 等于 Size 时(如图 window1 与 window3),等于是 TUMBLE WINDOW。

语法

HOP(time_attr, sliding_interval, window_size_interval)
time_attr 参数表示时间戳字段,表示每条记录被处理的时间戳。如果指定为 PROCTIME 是自动生成的时间戳,记录了数据被 flink 处理的时刻,一般用在 Processing Time 模式下。
window_size_interval 参数用来设置窗口大小。例如,设置为1天:INTERVAL '1' DAY;设置为2小时:INTERVAL '2' HOUR,其他用法可参见 时间相关函数
sliding_interval 参数用来设置滑动时间周期大小。例如,设置为1天:INTERVAL '1' DAY;设置为2小时:INTERVAL '2' HOUR,其他用法可参见 时间相关函数

标识函数

标识函数用来表示窗口的起始与结束时间。
函数名
功能描述
HOP_START(time-attr, slide-interval,size-interval)
返回该窗口的起始时间
HOP_END(time-attr, slide-interval,size-interval)
返回该窗口的结束时间

模拟用例

下文以 HOP WINDOW 为例,帮助您更容易地理解 HOP WINDOW。使用 Event Time 模拟统计每小时各用户收入金额,每30分钟更新一次。1小时的窗口,10分钟滑动一次。
样例数据:
username(VARCHAR)
income(BIGINT)
times(TIMESTAMP)
Tom
20
2021-11-11 10:30:00.0
Jack
10
2021-11-11 10:35:00.0
Tom
10
2021-11-11 10:35:00.0
Tom
10
2021-11-11 10:40:00.0
Tom
15
2021-11-11 11:35:00.0
Jack
10
2021-11-11 11:30:00.0
Jack
15
2021-11-11 11:40:00.0
SQL 语句:
CREATE TABLE user_income (
username VARCHAR,
Income INT,
times TIMESTAMP(3),
WATERMARK FOR times AS times - INTERVAL '3' MINUTE
)WITH(
...
);

CREATE TABLE output (
win_start TIMESTAMP,
win_end TIMESTAMP,
username VARCHAR,
hour_income BIGINT
)WITH(
...
);

INSERT INTO output
SELECT
HOP_START(times,INTERVAL '30' MINUTE,INTERVAL '1' HOUR),
HOP_END(times,INTERVAL '30' MINUTE,INTERVAL '1' HOUR),
username,
SUM(income)
FROM user_income
GROUP BY HOP(times,INTERVAL '30' MINUTE,INTERVAL '1' HOUR),username;
输出结果:
win_start(TIMESTAMP)
win_end(TIMESTAMP)
username(VARCHAR)
hour_income(BIGINT)
2021-11-11 10:00:00.0
2021-11-11 11:00:00.0
Tom
40
2021-11-11 10:00:00.0
2021-11-11 11:00:00.0
Jack
10
2021-11-11 10:30:00.0
2021-11-11 11:30:00.0
Jack
10
2021-11-11 10:30:00.0
2021-11-11 11:30:00.0
Tom
40
2021-11-11 11:00:00.0
2021-11-11 12:00:00.0
Tom
15
2021-11-11 11:00:00.0
2021-11-11 12:00:00.0
Jack
25
2021-11-11 11:30:00.0
2021-11-11 12:30:00.0
Jack
25
2021-11-11 11:30:00.0
2021-11-11 12:30:00.0
Tom
15

SESSION WINDOW

SESSION WINDOW(会话窗口)通过 session 活动对元素进行分组,session 窗口与滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置。这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中。
Session Window 并非以长度来划分窗口,而是以非活跃时间来划分。例如超过30分钟不活跃(没有新数据),则之前的窗口结束,下一个来到的数据将会形成一个新窗口。

语法

SESSION(time_attr, interval)
time_attr 参数表示时间戳字段,表示每条记录被处理的时间戳。如果指定为 PROCTIME 是自动生成的时间戳,记录了数据被 flink 处理的时刻,一般用在 Processing Time 模式下。
interval 参数用来设置窗口大小。例如,设置为1天:INTERVAL '1' DAY;设置为2小时:INTERVAL '2' HOUR,其他用法可参见 时间相关函数

标识函数

标识函数用来表示窗口的起始与结束时间。
函数名
功能描述
SESSION_START(time-attr, size-interval)
返回该窗口的起始时间
SESSION_END(time-attr, size-interval)
返回该窗口的结束时间

模拟用例

下文以 SESSION WINDOW 为例,帮助您更容易地理解 SESSION WINDOW。使用 Event Time 模拟统计每小时各用户收入金额,会话超时时长为30分钟。
样例数据:
username(VARCHAR)
income(BIGINT)
times(TIMESTAMP)
Tom
20
2021-11-11 10:30:00.0
Jack
10
2021-11-11 10:35:00.0
Tom
10
2021-11-11 10:35:00.0
Tom
10
2021-11-11 10:40:00.0
Tom
15
2021-11-11 11:50:00.0
Jack
10
2021-11-11 11:40:00.0
Jack
15
2021-11-11 11:45:00.0
SQL 语句:
CREATE TABLE user_income (
username VARCHAR,
Income INT,
times TIMESTAMP(3),
WATERMARK FOR times AS times - INTERVAL '3' MINUTE
)WITH(
...
);

CREATE TABLE output (
win_start TIMESTAMP,
win_end TIMESTAMP,
username VARCHAR,
hour_income BIGINT
)WITH(
...
);

INSERT INTO output
SELECT
SESSION_START(times,INTERVAL '30' MINUTE),
SESSION_END(times,INTERVAL '30' MINUTE),
username,
SUM(Income)
FORM user_income
GROUP BY SESSION(times,INTERVAL '30' MINUTE),username;
输出结果:
win_start(TIMESTAMP)
win_end(TIMESTAMP)
username(VARCHAR)
hour_income(BIGINT)
2021-11-11 10:30:00.0
2021-11-11 11:10:00.0
Tom
40
2021-11-11 10:35:00.0
2021-11-11 11:05:00.0
Jack
10
2021-11-11 11:30:00.0
2021-11-11 12:00:00.0
Tom
15
2021-11-11 11:30:00.0
2021-11-11 12:10:00.0
Jack
25

更多说明

以上三种窗口都有对应的辅助函数。以 TUMBLE 窗口为例(HOP、SESSION 也一样,只是前缀不同),辅助函数如下:
TUMBLE_ROWTIME:表示 TUMBLE 窗口的末端界限(包含,可用作 JOIN 或 GROUP 以及 OVER 条件,Event Time 时间模式下使用)。示例如下:
SELECT user,
TUMBLE_START(rowtime, INTERVAL '12' HOUR) AS sStart,
TUMBLE_ROWTIME(rowtime, INTERVAL '12' HOUR) AS snd,
SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '12' HOUR), user
TUMBLE_PROCTIME:表示 TUMBLE 窗口的末端界限(包含,可用作 JOIN 或 GROUP 以及 OVER 条件,Processing Time 时间模式下使用)。示例如下:
SELECT user,
TUMBLE_START(PROCTIME, INTERVAL '12' HOUR) AS sStart,
TUMBLE_PROCTIME(PROCTIME, INTERVAL '12' HOUR) AS snd,
SUM(amount)
FROM Orders
GROUP BY TUMBLE(PROCTIME, INTERVAL '12' HOUR), user


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈