下面是带有suppress运算符的窗口的简单定义:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.suppress(untilWindowCloses(unbounded())
.toStream()
// process last event here
...
所以我的问题是,suppress运算符如何检测一个事件是否是窗口的最后一个事件?让我们想象一下,我移除suppress运算符:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.toStream()
...
我知道,对于Ktable
的每一个更改,都将生成两个事件:
null
值的记录,以删除上一条记录我要做的是移除suppress
运算符,自己检测最后一条记录:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
.toStream()
.filter( /* detect the last record here */ )
这些信息是在DSL还是处理器API中公开的?
这些信息只是间接暴露的。suppress()
运算符使用状态存储来跟踪以前接收的消息。这允许相互比较旧消息/新消息,并决定何时实际发出某些消息。
请注意,无状态filter()
不能实现这一点。如果您想了解细节,您需要阅读源代码。
但是,主要的问题是:为什么首先要删除suppress()
?
问题内容: 我正在寻找连接2个表并仅显示明细表的最后一条记录的正确SQL代码。 我有一个带有2个表的数据库, 每个交易都有多个评论,但是我想创建一个显示所有交易的视图,并且仅显示每个交易的最后一条评论(由CommentTime确定)字段 问题答案: 编辑:我没有足够接近地阅读初始问题,也没有注意到视图中需要所有DEALS行。以下是我的修改后的答案:
问题内容: 我有一个这样的 LoginTime 表: 我想删除的最后一条记录。用户的最后一条记录可以通过识别。 如何使用一个查询执行此操作。 问题答案: 您需要按user_id(例如WHERE user_id = 1)过滤表,然后按时间(例如ORDER BY datetime)对其进行排序,然后将查询限制为一项(例如LIMIT1),然后删除此查询的结果。最后,您将获得如下查询:
null null
问题内容: 如标题所示,我想选择以分组的每组行的第一行和最后一行。 我在此表中包含以下数据: 我需要获取的是列开头的第一个值和列末尾的最后一个值,并按group by group列进行分组。 结果表应如下所示: 我使用和和进行了查询。它可以在SQL Server 2012中使用,但不能在SQL Server 2008中使用。我需要一个可以在SQL Server 2008中执行的查询。 问题答案:
问题内容: 我正在使用mysql并遇到一些问题。我想检索插入的最后一行。 <<以下是详细>> 以下是我创建表格的方式。 我在其中插入了四个值,如下所示 当我执行时,我得到如下输出 当我尝试下面的代码时, 我得到如下输出。 但是,当我使用代码时,出现错误 使用时,表中没有任何数据。 链接以播放数据 注意: 这里我使用4只是为了获得所需的输出。稍后我可以从查询中获取 如果我只想查看最后一条记录,请建议
我的工作是做以下事情: 根据事件时间使用Kafka主题中的事件 计算7天的窗口大小,以1天的幻灯片显示 将结果放入Redis 我有几个问题: 如果它从最近的记录中消耗Kafka事件,在作业存活1天后,作业关闭窗口并计算7天窗口。问题是作业只有1天的数据,因此结果是错误的。 如果我尝试让它从7天前的时间戳中消耗Kafka事件,当作业开始时,它从第一天开始计算整个窗口,并且花费了很多时间。另外,我只想