我有一个消费转换产品应用程序,在Kafka中有一次精确的权杖。(事务性)生成阶段在同一主题上生成新消息,然后使用该消息(事务性=read_committed)。只有一个线程执行此操作,并确保消费者轮询在生产者的事务提交之后发生。现在我每轮只有一份民意调查报告。
当我运行我的测试用例时,有时可能会有其他生产者在我的生产者的事务提交之前发送的消息。然后我经历了以下情况:
我的单个poll语句只返回这个外来消息,但不返回我刚才生成的消息,尽管上一轮的事务已成功提交。
>
final Map consumerConfig=新建LinkedHashMap
事务生产者
final Map producerConfig=新建LinkedHashMap
我的投票超时为2秒
为了让您了解poll的工作原理,我们传递给poll()的参数是一个超时时间间隔,它控制如果消费者缓冲区中没有数据,poll()将阻塞多长时间。如果设置为0,poll()将立即返回;否则,它将等待指定的毫秒数,以便数据从代理到达。因此,如果您将轮询配置为0毫秒,并且数据缓冲区中没有数据,您将不会收到任何数据。
至于您没有收到最近生成的数据,这取决于您对生产者的配置。除非生成的消息没有副本,并且基于acks参数,否则消费者可以使用该消息。
例如:如果已将副本设置为3,且acks=all,除非所有复制者都确认其已收到消息,否则消费者将无法使用此消息。
说到这个问题,你怎么知道你是否已经读取了整个分区,如果你的轮询不再给你任何记录(假设其余的都正常工作),那么它表明你已经使用了该主题的所有消息。
我从源主题收到一条消息。然后我将消息分成3个部分,并将每个部分发送到3个不同的主题。现在有2条消息成功传递到第2个主题。但是在发送第3条消息时,我们会收到异常(例如ProducerFencedException|OutOfOrderSequenceException|AuthorizationException|RecordLengthException) 如何回滚/还原前2个主题中的其他2条消息
我很感激你在这方面的帮助。 我正在构建一个ApacheKafka消费者,以订阅另一个已经运行的Kafka。现在,我的问题是,当我的制作人将消息推送到服务器时。。。我的消费者没有收到。。我在打印的日志中得到以下信息: 我不确定我是否遗漏了任何重要的配置。。。但是,我可以使用WireShark看到一些来自我的服务器的消息,但是我的消费者没有消费这些消息。。。。 我的代码是示例消费者示例的精确副本:ht
我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:
我们有一个基于spring boot的事务性Kafka制作人!使用的版本如下 spring-boot-starter-父-2.3.0。释放 spring-kafka-2.5.0。释放 我们的kafka(集群)版本是2.1. x! 作为生产者,我们启用了幂等性,定义了事务id前缀,并在事务中执行kafka模板调用。我们还有一个将隔离级别设置为只读的使用者! 现在我们遇到了一个行为,不知道如何推断,
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?
我有一个用例,希望在Spring云流应用程序中获得底层的Kafka生产者(KafkaTemplate)。在浏览代码时,我偶然发现了,它有一个方法。然而,它无法自动接线。 此外,如果我直接自动连接,模板将使用默认属性初始化,它将忽略SCSt配置的