当前位置: 首页 > 知识库问答 >
问题:

KSQL窗口聚合流,会话结束

曹和正
2023-03-14

我已经能够创建一个“会话开始信号”流,如本答案所述。

-- create a stream with a new 'data' topic:
CREATE STREAM DATA (USER_ID INT) 
    WITH (kafka_topic='data', value_format='json', partitions=2);

-- create a table that tracks user interactions per session:
CREATE TABLE SESSION AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
  FROM DATA
WINDOW SESSION (5 SECONDS)
   GROUP BY USER_ID;

-- Create a stream over the existing `SESSIONS` topic.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT) 
   WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');

-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS 
    SELECT * FROM SESSION_STREAM 
    WHERE WINDOWSTART = WINDOWEND;

是否可以在每次窗口聚合结束时创建一个“会话结束信号”流?

共有1个答案

寿高阳
2023-03-14

我假设您的意思是,当会话窗口在为窗口配置的5秒内没有看到任何适合会话的新消息时,您希望发出事件/行?

我认为目前这是不可能的。

因为源数据可能有无序的记录,即时间戳比已处理的行早得多的事件,所以一旦5 seconds窗口过去,就不能“关闭”会话窗口。

 类似资料:
  • 我有一个,它是由一个kafka主题创建的,并且指定了属性。 当我试图创建一个时,会话窗口化了一个查询,如下所示: 我总是得到错误: KSQL不支持对窗口表的持久查询 如何在KSQL中创建开始会话窗口的事件的?

  • 你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点

  • 尝试合并多个 Kafka 流,聚合

  • 我想使用窗口聚合,然后在中生成的结果之上运行窗口聚合。 是否可以在第一次聚合后修改属性以使其等于会话中最后观察到的事件的? 我正在尝试这样做: 关键部分是: 因此,我想将会话中最新事件的 重新分配给记录,而没有会话间隔(在本例中为 )。 这在BatchTable中运行良好,但在StreamTable中不起作用: 是的,我知道,感觉上我不想发明时间机器来改变时间顺序。但实际上有可能以某种方式实现所描

  • 有人能帮我理解一下在flink中的窗口(会话)是什么时候和如何发生的吗?或者样品是如何加工的? 例如:假设定义的时间窗口是30秒,如果一个事件在t时间到达,另一个事件在t+30,那么这两个事件都将被处理,但是在t+31到达的事件将被忽略。 如果我说的不对,请纠正。 上面的问题是:如果一个事件在t时间到达,而另一个事件在t+3时间到达,是否还会等待整个30秒来汇总并最终确定结果? DTO: ====

  • 我有一个事件流,我想聚集基于时间窗口。我的解决方案提供增量聚合,而不是在定时窗口上提供聚合。我读到过,这对于stream来说是正常的,因为它会以更改日志的形式给出结果。另外,在研究过程中,我遇到了两步窗口聚合与Kafka Streams DSL和如何发送最终的kafka-streams聚合结果的时间窗口Ktable?.但是第一篇文章中的解决方案有些过时(使用不推荐的API)。我使用了在那些不推荐的