当前位置: 首页 > 知识库问答 >
问题:

如何使用KSQL将KTable的所有行输出到Kafka主题

汪高岑
2023-03-14

我是Ksql新手,正在尝试聚合,已经创建了一个Kstream

kstream。。。

 CREATE STREAM test
  (id BIGINT,
   type VARCHAR,
   sales BIGINT
  WITH (KAFKA_TOPIC='test1',
        VALUE_FORMAT='JSON');

ktable...

CREATE TABLE test_total AS
SELECT ID,
       SUM(SALES) AS TOTAL_SALES
FROM test
GROUP BY ID
EMIT CHANGES;

将这些值发布到Kafka test1主题中

{"id": 1, "type": "test","sales": 200}
{"id": 1, "type": "test","sales": 300}

当我使用Kafka控制台使用者时,我只看到输出为

{“总销售额”:200}{“总销售额”:500}

我怎样才能看到身份证也印在Kafka主题中?我需要在桌子外创建某种视图吗?

共有2个答案

公冶鸣
2023-03-14

在ksqlDB中,基本类型作为基本类型序列化到主题中。

查阅https://docs.ksqldb.io/en/latest/reference/serialization/

你对此无能为力。

陈允晨
2023-03-14

在您的控制台-消费者命令中添加

--property print.key=true

如果已经在使用ksql,还可以从创建的表中进行选择

 类似资料:
  • 我是KSQL的新手。我想使用KSQL查询将kafka集群中存在的所有主题名称存储到另一个主题。来自KSQL CLI的给我主题列表。我想通过创建流将所有这些主题信息存储在另一个主题中。我将轮询这个新主题(使用消费者),每当在集群中创建新主题时,我的消费者都会收到一条消息。我想要一个KSQL查询来完成这一点。 提前感谢。

  • 我的流服务执行的操作很少: 在进行测试时,我发现我的服务在调用函数后中断了,该函数将把我的数据写入由Kafka Streams将KTable转换为Kafka Streams创建的新主题。 我检查了KStreams创建的主题,主题就在那里: 我发现有三个输入,即,我不知道第三个输入是什么: 为了确保所有内容都被覆盖,这里是我的配置: 我的问题是,我们的部署正在工作,突然所有的东西都开始出现这个错误:

  • 对于我的应用程序,我使用KTable-Ktable连接,这样无论何时在主数据流或子数据流上接收数据,它都可以为所有三个表设置带有setters和getters的复合对象。这三个传入流具有不同的键,但是在创建KTable时,我为所有三个KTable设置相同的键。 我有一个分区的所有主题。当我在单个实例上运行应用程序时,一切都运行良好。我可以看到compositeObject填充了所有三个表中的数据。

  • 我将一个KStream与一个KTable左联接,但我没有看到输出主题的任何输出: 如果我绕过连接,直接将输入主题输出到输出,我会看到消息到达。我已经将联接更改为左联接,添加了一些printlns以查看何时提取键(但控制台上没有打印任何内容)。而且我每次都使用Kafka流重置工具,所以从头开始。我的想法快用完了。此外,我还添加了一些对存储的测试访问,它可以工作,并且包含来自流的键(尽管这不应该因为左

  • 我能做到写作和阅读的中间主题: 有没有简单的方法从中获取?这是我第一个使用Kafka Streams的应用程序,所以我可能错过了一些明显的东西。

  • 我正在实验Kafka流,我有以下设置: null 有什么方法可以让我的KTable从我的主题中“继承”保留策略吗?这样当记录从主主题过期时,它们在KTable中就不再可用了? 我担心将所有记录转储到KTable中,并使StateStore无限增长。 我能想到的一个解决方案是转换成一个窗口流,其跳跃窗口等于记录的TimeToLive,但我想知道是否有更好的解决方案,以更原生的方式。 多谢了。