我是Ksql新手,正在尝试聚合,已经创建了一个Kstream
kstream。。。
CREATE STREAM test
(id BIGINT,
type VARCHAR,
sales BIGINT
WITH (KAFKA_TOPIC='test1',
VALUE_FORMAT='JSON');
ktable...
CREATE TABLE test_total AS
SELECT ID,
SUM(SALES) AS TOTAL_SALES
FROM test
GROUP BY ID
EMIT CHANGES;
将这些值发布到Kafka test1主题中
{"id": 1, "type": "test","sales": 200}
{"id": 1, "type": "test","sales": 300}
当我使用Kafka控制台使用者时,我只看到输出为
{“总销售额”:200}{“总销售额”:500}
我怎样才能看到身份证也印在Kafka主题中?我需要在桌子外创建某种视图吗?
我是KSQL的新手。我想使用KSQL查询将kafka集群中存在的所有主题名称存储到另一个主题。来自KSQL CLI的给我主题列表。我想通过创建流将所有这些主题信息存储在另一个主题中。我将轮询这个新主题(使用消费者),每当在集群中创建新主题时,我的消费者都会收到一条消息。我想要一个KSQL查询来完成这一点。 提前感谢。
我的流服务执行的操作很少: 在进行测试时,我发现我的服务在调用函数后中断了,该函数将把我的数据写入由Kafka Streams将KTable转换为Kafka Streams创建的新主题。 我检查了KStreams创建的主题,主题就在那里: 我发现有三个输入,即,我不知道第三个输入是什么: 为了确保所有内容都被覆盖,这里是我的配置: 我的问题是,我们的部署正在工作,突然所有的东西都开始出现这个错误:
对于我的应用程序,我使用KTable-Ktable连接,这样无论何时在主数据流或子数据流上接收数据,它都可以为所有三个表设置带有setters和getters的复合对象。这三个传入流具有不同的键,但是在创建KTable时,我为所有三个KTable设置相同的键。 我有一个分区的所有主题。当我在单个实例上运行应用程序时,一切都运行良好。我可以看到compositeObject填充了所有三个表中的数据。
我将一个KStream与一个KTable左联接,但我没有看到输出主题的任何输出: 如果我绕过连接,直接将输入主题输出到输出,我会看到消息到达。我已经将联接更改为左联接,添加了一些printlns以查看何时提取键(但控制台上没有打印任何内容)。而且我每次都使用Kafka流重置工具,所以从头开始。我的想法快用完了。此外,我还添加了一些对存储的测试访问,它可以工作,并且包含来自流的键(尽管这不应该因为左
我能做到写作和阅读的中间主题: 有没有简单的方法从中获取?这是我第一个使用Kafka Streams的应用程序,所以我可能错过了一些明显的东西。
我正在实验Kafka流,我有以下设置: null 有什么方法可以让我的KTable从我的主题中“继承”保留策略吗?这样当记录从主主题过期时,它们在KTable中就不再可用了? 我担心将所有记录转储到KTable中,并使StateStore无限增长。 我能想到的一个解决方案是转换成一个窗口流,其跳跃窗口等于记录的TimeToLive,但我想知道是否有更好的解决方案,以更原生的方式。 多谢了。