当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者接收相同消息

南宫正阳
2023-03-14

我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。

需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。

那么,我必须在哪里修复配置

  • 生产者级别,以便它不重新发送
  • 或者生产者不参与重新发送,而是代理-Kafka服务器,因此消费者必须在处理之前确认消息。

我的制作人具有以下属性:

<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
       <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
       <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />

我的消费者配置看起来像

<endpoint id="apolloKafkaJanitorEventListenerURI"
            uri="kafka:${kafka.bootstrap.servers}?topic=${apollo.janitor.event.topic}&amp;
                                                        groupId=${apollo.janitor.event.group.id}&amp;
                                                        consumersCount=${apollo.janitor.event.consumer.count}&amp;
                                                        consumerRequestTimeoutMs=${eventConsumerRequestTimeoutMs}&amp;
                                                        sessionTimeoutMs=${eventConsumerSessionTimeoutMs}&amp;
                                                        maxPartitionFetchBytes=${eventConsumerMaxPartitionFetchBytes}" />

我已经谷歌没有找到任何相关的问题。在生产者和使用者上找到“acks=0”属性如下。我还没有测试,但我想先看看我是否在正确的轨道上

KafkaManualCommit manual =
       exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
   manual.commitSync();

https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/docs/kafka-component.adoc

共有1个答案

时同
2023-03-14

问题可能出在生产者端。您可能不需要检查生产者是否正在重新发送消息。您可以使用日志记录语句。也可以对kafka Producer使用一次语义。您只需要为相同的添加一个额外的属性。

另一种可能是您的消费者没有提交抵消。在这一端你可能也需要做一些头脑Storm

 类似资料:
  • 我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是

  • 我正在使用spring kafka来消费来自kafka的消息。消费者监听器如下。 应用程序的单个实例,并发数为6。 该主题有6个分区。 从上面的日志中可以清楚地看到,两个用户在完全相同的时间收到了来自分区和偏移量的相同消息。 每个线程继续处理消息。最后,其中一个消费者失败了,错误如下 我知道上面的错误会在有负载或消息处理需要时间时出现。在这种情况下,处理不到一秒钟,kafka主题中的消息不到10条

  • 我不知道是怎么回事,我的java客户机消费者用@KafkaListener注释后没有收到任何消息。当我通过命令行创建消费者时,它可以工作。同样,Producer也能按预期工作(同样在java中)。有人能帮我理解这种行为吗? application.yml 生产者配置: 消费者配置: 制作人 Spring控制器: 这是我的控制台输出,正如您所看到的,它发送一条消息,但该方法不接收任何内容。如果我没有

  • Kafka消费者不接收在消费者开始之前产生的消息。 ConsumerRecords始终为空 虽然,如果我启动我的消费者比生产者比它接收消息。(Kafka-客户端版本2.4.1)

  • 我是Kafka的新手,运行一个简单的Kafka消费者/生产者的例子,就像在Kafka消费者和KafkaProducer上给出的那样。当我从终端运行消费者时,消费者正在接收消息,但我不能使用Java代码监听。我也在StackoverFlow上搜索了类似的问题(链接: Link1,Link2),并尝试了解决方案,但似乎没有什么对我有用。kafka版本:和相应的maven依赖在pom中使用。 Java生

  • 我可以在命令行上针对Kafka位置安装发送和接收消息。我也可以通过Java代码发送消息。这些消息显示在Kafka命令提示符中。我还有一个Kafka消费者的Java代码。代码昨天收到了消息。但是今天早上没有收到任何消息。代码没有更改。我想知道属性配置是否不太正确。这是我的配置: 制片人: 生产记录设置为 消费者: 对于Java代码: 少了什么?