我有一个流
,它是由一个kafka主题创建的,并且指定了Timestamp
属性。
当我试图创建一个流
时,会话窗口化了一个查询,如下所示:
CREATE STREAM SESSION_STREAM AS
SELECT ...
FROM EVENT_STREAM
WINDOW SESSION (5 MINUTES)
GROUP BY ...;
我总是得到错误:
CREATE STREAM SESSION_START_STREAM AS
SELECT *
FROM SESSION_TABLE
WHERE WINDOWSTART=WINDOWEND;
KSQL不支持对窗口表的持久查询
如何在KSQL中创建开始会话窗口的事件的流
?
如果将create stream语句切换为create table语句,则将创建一个不断更新的表。接收器主题session_stream
将包含对表的更改流,即它的changelog。
ksqlDB将其建模为表,因为它具有表语义,即表中只能存在具有任何特定键的单行。但是,changelog将包含已应用于表的更改流。
如果您想要的是一个包含所有会话的主题,那么类似这样的内容将创建:
-- create a stream with a new 'data' topic:
CREATE STREAM DATA (USER_ID INT)
WITH (kafka_topic='data', value_format='json');
-- create a table that tracks user interactions per session:
CREATE TABLE SESSION AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
FROM DATA
WINDOW SESSION (5 SECONDS)
GROUP BY USER_ID;
-- Create a stream over the existing `SESSIONS` topic.
-- Note it states the window_type is 'Session'.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS
SELECT * FROM SESSION_STREAM
WHERE WINDOWSTART = WINDOWEND;
CREATE STREAM SESSION_STREAM (USER_ID INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
我已经能够创建一个“会话开始信号”流,如本答案所述。 是否可以在每次窗口聚合结束时创建一个“会话结束信号”流?
尝试合并多个 Kafka 流,聚合
我想将一个交易流聚合成相同交易量的窗口,这是区间内所有交易的交易规模之和。 我能够编写一个自定义触发器,将数据分区到Windows中。代码如下: 上面的代码可以将其划分为大致相同大小的窗口: 现在我喜欢对数据进行分区,以便卷与触发器值完全匹配。为此,我需要稍微修改一下数据,方法是将区间结束时的交易分成两部分,一部分属于正在触发的实际窗口,剩余的超过触发器值的数量必须分配给下一个窗口。 那可以用一些
我想使用早期触发逻辑进行窗口聚合(您可以认为聚合是由窗口关闭或特定事件触发的),我阅读了文档:https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows . html # incremental-window-aggregate-with-aggregate functi
问题内容: 嗨,我有一张看起来像这样的桌子 我想要按周分组的页面视图聚合,但显示过去30天的聚合-(每周滑动窗口聚合,窗口大小为30天) 我正在使用Google bigquery 编辑:戈登-对您的“客户”发表评论,实际上我需要的是稍微复杂一点的,这就是为什么我在上表中包括客户的原因。我希望获得每周30天的浏览量> n的客户数量。像这样的东西 但是,为了简单起见,如果我能够获得页面浏览量的滑动窗口
我有一个事件流,我想聚集基于时间窗口。我的解决方案提供增量聚合,而不是在定时窗口上提供聚合。我读到过,这对于stream来说是正常的,因为它会以更改日志的形式给出结果。另外,在研究过程中,我遇到了两步窗口聚合与Kafka Streams DSL和如何发送最终的kafka-streams聚合结果的时间窗口Ktable?.但是第一篇文章中的解决方案有些过时(使用不推荐的API)。我使用了在那些不推荐的