当前位置: 首页 > 知识库问答 >
问题:

如果我在Kafka中有Transactional Producer,我可以使用Kafka Streams阅读一次消息吗?

阎英朗
2023-03-14

我希望只使用一次语义,但我不想和消费者一起阅读信息。我宁愿读Kafka的留言。如果我加上处理。保证=恰好一次流配置,将恰好一次语义保留?

共有1个答案

羊舌勇
2023-03-14

精确一次处理是基于读-处理-写模式的。Kafka Streams使用这种模式,因此,如果您编写一个常规的Kafka Streams应用程序,将结果写回Kafka主题,您将得到一次处理保证。

注意,副作用不在本保证范围内。还要注意,如果失败,可能会在内部重试。恰好一次意味着,您在输出主题中看到的结果与不会发生错误(因此不会重试)的结果相同。

想了解更多细节,你可能想看一段关于Kafka一次担保的谈话录音。Confluent的网页上有多个可用页面(免责声明:我是Confluent的员工):https://www.confluent.io/resources/

 类似资料:
  • 我想做以下几点: 将iPhone放在设备的NFC标签前 读取标记的NDEF消息 读取后,标记的NDEF消息将被我的设备(而不是iPhone)覆盖。转到%2. 读取“不再有数据”消息后,停止读取并转到3。 有没有可能用CoreNFC做到这一点,而不必来回移动iPhone来再次识别“新的”NFC标签呢?

  • 我正在编写一个使用Kafka流的应用程序。它从主题A读取,进行一些转换,然后写入主题B。在转换期间,值按键分组,因此输出键、值类型不同于输入值类型。Kafka流使用特定类型的Serdes(例如String serdes序列化和反序列化字符串)进行序列化和反序列化,因此在数据转换后它将无法工作。如何在Streams API中定义不同的序列化器和反序列化器?

  • Kafka consumer有一个配置< code>max.poll.records,它控制对poll()的单次调用中返回的最大记录数,其默认值为500。我将它设置为一个很高的数字,这样我就可以在一次轮询中获得所有的消息。然而,即使这个主题有更多的信息,在一次呼叫中,民意调查只返回几千条信息(大约6000条)。< br> 如何进一步增加单个消费者阅读的邮件数量?

  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 我们使用的是spring集成kafka版本3.1.2。RELEASE and int kafka:消息驱动的通道适配器,用于使用来自远程kafka主题的消息。生产者发送加密消息,我们使用反序列化器解密实际消息。我们可以使用主题中发布的所有消息。我们将自动提交用作false。我们想知道在成功处理消息后如何从我们的服务提交或确认消息。有人能帮助我们如何提交从消息驱动通道读取的消息并提供一些参考实现吗?

  • 拥有发布者和N个消费者,如果消费者使用,那么他们将错过订阅主题之前发布到主题的所有消息...众所周知,使用的消费者不会重播订阅主题之前存在的消息... 所以我需要: null 我想使用者必须检查现有消息的主题,如果有消息就使用它们,然后启动使用。对我来说这是最好的方法...