当前位置: 首页 > 知识库问答 >
问题:

KSQL窗口聚合流

公孙森
2023-03-14

我有一个,它是由一个kafka主题创建的,并且指定了Timestamp属性。

当我试图创建一个时,会话窗口化了一个查询,如下所示:

CREATE STREAM SESSION_STREAM AS
SELECT ...
  FROM EVENT_STREAM
WINDOW SESSION (5 MINUTES)
   GROUP BY ...;

我总是得到错误:

CREATE STREAM SESSION_START_STREAM AS
SELECT *
  FROM SESSION_TABLE
 WHERE WINDOWSTART=WINDOWEND;

KSQL不支持对窗口表的持久查询

如何在KSQL中创建开始会话窗口的事件的

共有1个答案

鲍鸿波
2023-03-14

如果将create stream语句切换为create table语句,则将创建一个不断更新的表。接收器主题session_stream将包含对表的更改流,即它的changelog。

ksqlDB将其建模为表,因为它具有表语义,即表中只能存在具有任何特定键的单行。但是,changelog将包含已应用于表的更改流。

如果您想要的是一个包含所有会话的主题,那么类似这样的内容将创建:

-- create a stream with a new 'data' topic:
CREATE STREAM DATA (USER_ID INT) 
    WITH (kafka_topic='data', value_format='json');

-- create a table that tracks user interactions per session:
CREATE TABLE SESSION AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
  FROM DATA
WINDOW SESSION (5 SECONDS)
   GROUP BY USER_ID;
-- Create a stream over the existing `SESSIONS` topic.
-- Note it states the window_type is 'Session'.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT) 
   WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');

-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS 
    SELECT * FROM SESSION_STREAM 
    WHERE WINDOWSTART = WINDOWEND;
CREATE STREAM SESSION_STREAM (USER_ID INT KEY, COUNT BIGINT) 
   WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
 类似资料:
  • 我已经能够创建一个“会话开始信号”流,如本答案所述。 是否可以在每次窗口聚合结束时创建一个“会话结束信号”流?

  • 尝试合并多个 Kafka 流,聚合

  • 我想将一个交易流聚合成相同交易量的窗口,这是区间内所有交易的交易规模之和。 我能够编写一个自定义触发器,将数据分区到Windows中。代码如下: 上面的代码可以将其划分为大致相同大小的窗口: 现在我喜欢对数据进行分区,以便卷与触发器值完全匹配。为此,我需要稍微修改一下数据,方法是将区间结束时的交易分成两部分,一部分属于正在触发的实际窗口,剩余的超过触发器值的数量必须分配给下一个窗口。 那可以用一些

  • 我想使用早期触发逻辑进行窗口聚合(您可以认为聚合是由窗口关闭或特定事件触发的),我阅读了文档:https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows . html # incremental-window-aggregate-with-aggregate functi

  • 问题内容: 嗨,我有一张看起来像这样的桌子 我想要按周分组的页面视图聚合,但显示过去30天的聚合-(每周滑动窗口聚合,窗口大小为30天) 我正在使用Google bigquery 编辑:戈登-对您的“客户”发表评论,实际上我需要的是稍微复杂一点的,这就是为什么我在上表中包括客户的原因。我希望获得每周30天的浏览量> n的客户数量。像这样的东西 但是,为了简单起见,如果我能够获得页面浏览量的滑动窗口

  • 我有一个事件流,我想聚集基于时间窗口。我的解决方案提供增量聚合,而不是在定时窗口上提供聚合。我读到过,这对于stream来说是正常的,因为它会以更改日志的形式给出结果。另外,在研究过程中,我遇到了两步窗口聚合与Kafka Streams DSL和如何发送最终的kafka-streams聚合结果的时间窗口Ktable?.但是第一篇文章中的解决方案有些过时(使用不推荐的API)。我使用了在那些不推荐的