当前位置: 首页 > 知识库问答 >
问题:

Kafka Ktable changelog(使用toStream())在同时到达多个具有相同密钥的消息时丢失了一些Ktable更新

丁阎宝
2023-03-14

我有一个输入流,我用它来创建一个KTABLE。然后使用toStream()方法,使用ktable changelog创建一个输出流。问题是toStream()方法创建的流不包含更新了KTABLE的输入流中的所有消息。下面是我的代码:

final KTable<String, event> KTable = inputStream.groupByKey().aggregate(() -> null,
      aggregateKtableMethod,
      storageConf);

KStream<String, event> outputStream = KTable.toStream();

我想这是因为Ktable的更新是通过一些批处理操作进行的,但我找不到任何与之相关的配置或文档。是这些丢失事件的原因吗?你知道如何更改配置以便我不会丢失任何消息吗?

共有1个答案

姬俊驰
2023-03-14

我找到解决办法了。问题出在我用来创建ktable的“StorageConf”中,缓存是可以的。我只需要禁用它,函数是:

storageConf.withCachingDisabled();

final KTable<String, event> KTable = inputStream.groupByKey().aggregate(() -> null,
  aggregateKtableMethod,
  storageConf);

现在我在输出流中有了我的所有事件。

 类似资料:
  • 我想在js中制作一个网页,将加密明文,所以我可以把它发送给朋友,谁将使用相同的网页来解密它。 我们将共享同一密钥,并将其用于多条消息。 我知道当使用AES CBC-有需要随机iv为每个消息,但我喜欢使用AES CTR。 我将使用256键,而不是密码。 我有两个问题: 我可以在CTR和no iv中多次使用同一密码吗 如果我将使用CBC,在发送加密消息的同时发送明文iv是否安全 我使用的是aes js

  • 我找遍了,但没有找到是否可能。 我有一个MySQL查询: 字段id有一个“唯一索引”,所以不能有两个。现在,如果数据库中已经存在相同的id,我想更新它。但我真的必须再次指定所有这些字段吗,比如: 或: 我已经在插入中指定了所有内容。。。 一个额外的注意事项,我想使用周围的工作来获得ID! 我希望有人能告诉我什么是最有效的方法。

  • 我正在用SQS和JavaSDK发送和接收消息。几乎所有的消息都工作正常,但是其中一些丢失了,我不明白为什么。这是发送消息的代码: 以及接收代码(在循环中运行): 问题是,我能够接收到一些消息,但有些消息不是(总是相同类型的数据)。发送和接收的代码对于所有消息都是相同的。应用程序日志: 正在发送消息:{QueueUrl:https://sqs.us-east-1.amazonaws.com/0000

  • 请参阅下面的更新以显示潜在的解决方案 我们的应用程序使用2个主题作为KTables,执行左连接,并输出到一个主题。在测试过程中,我们发现当我们的输出主题只有一个分区时,这项功能可以正常工作。当我们增加分区的数量时,我们注意到生成到输出主题的消息数量减少了。 在启动应用程序之前,我们用多个分区配置测试了这一理论。使用1个分区,我们可以看到100%的消息。使用2,我们可以看到一些消息(少于50%)。对

  • 我试图用不同的密钥将消息存储到不同的分区。 例如: 但是当我尝试运行我的Producer类时,它总是存储在单个分区中。 根据文档,使用查找分区。我还看到这个问题Kafka分区键工作不正常‏, 但我在Kafka Client库的0.9.x版本中找不到<code>ByteArrayPartitioner</code>类。 更新:我正在使用代码动态创建主题。 如果我手动创建一个带有分区的主题,那么它可以

  • 我是Kafka的初学者。我知道具有相同组id的多个消费者不能在一个主题中使用来自同一个分区的消息。我想知道如果来自一个消费组的多个Kafka消费者从一个分区读取相同的消息会发生什么,为什么这是一件坏事。 。