当前位置: 首页 > 知识库问答 >
问题:

Kafka--曾经的生产者和消费者

廉展鹏
2023-03-14

在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?

共有1个答案

高兴贤
2023-03-14

“在消费方面,我需要保证消费者准确地阅读每条记录一次”

Gopinath的回答很好地解释了你如何在KafkaProducer和KafkaConsumer之间实现精确--一次。这些配置(连同KafkaProducer中事务API的应用)保证了生产者发送的所有数据都将在Kafka中存储一次。但是,它并不能保证使用者只读取一次数据。当然,这取决于您的抵销管理。

无论如何,我理解您的问题,您想知道使用者本身是如何处理一个被消耗的消息的。

    null

方法commitSync和commitAsync不会让您在这里走得更远,因为它们只能确保在使用者中进行最多一次或至少一次处理。此外,有益的是您的处理是幂等的。

有一个很好的博客解释了这样一个实现,它使用ConsumerReBalanceListener并将偏移量存储在本地文件系统中。还提供了完整的代码示例。

“我是否需要借助Kafka事务API在使用者轮询循环中创建事务生产者”

 类似资料:
  • 我正在创建一个系统,其中前端服务将消息推送到Kafka请求主题,并为一些下游后端消费者(实际上是一个最终推送回Kafka的复杂系统)监听另一个响应主题,以处理请求消息并最终推进到“回应”话题。 我试图找出最优雅的方法来确保消费者监听适当的分区并收到响应,并且后端推送到前端消费者正在监听的分区。我们总是需要确保响应到达产生初始消息的同一个消费者。 到目前为止,我有两种解决方案,但都不是特别令人满意的

  • 我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 我没有使用Spring Kafka模块来生成和使用消息。相反,我在生产者和消费者实现中使用Apache客户端库。由于我没有使用Spring Kafka,因此Spring Slueth自动配置不适用于生成跟踪。我已经提到https://docs.spring.io/spring-cloud-sleuth/docs/current-SNAPSHOT/reference/html/integration

  • 我正在尝试使用NodeJS从远程机器连接到远程Apache Kafka服务器。我无法从nodejs代码中生成所需的kafka主题的消息。我也无法消费任何数据从主题以及。 我使用的是Apache-kafka版本2.122.2.1和Java8。我也在使用节点版本8.11.0。我还启动了zookeeper服务器和kafka服务器。我在ubuntu机器上本地创建了一个主题和一个生产者和消费者,以检查apa