当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka批处理侦听器没有接收到超过1或2条消息

穆理
2023-03-14

Spring Kafka批处理消费者只能收到一两条消息,我们已经将fetch.min.bytes增加到9000

即使在增加值后,我们只收到1或2条消息,我们是否还需要增加fetch.min.bytes和fetch.max.wait.ms的值,或者我们是否需要添加任何其他配置,或者我们是否需要减少最大轮询记录大小?在本地环境中,我们收到10条消息,但在AWS MSK集群中,我们收到1或2条消息

消费者配置信息日志:

2022-05-13 16:15:48.118,"2022-05-13 16:15:48.117 [,] INFO main org.apache.kafka.clients.consumer.ConsumerConfig361  ConsumerConfig values: "
2022-05-13 16:15:48.118,    allow.auto.create.topics = true
2022-05-13 16:15:48.118,    auto.commit.interval.ms = 5000
2022-05-13 16:15:48.118,    auto.offset.reset = latest
2022-05-13 16:15:48.118,"   bootstrap.servers = [xyz.amazonaws.com:yyyy, xxxxxyzxx.amazonaws.com:yyyy, xxxxxxzzz.amazonaws.com:yyyy]"
2022-05-13 16:15:48.118,    check.crcs = true
2022-05-13 16:15:48.118,    client.dns.lookup = use_all_dns_ips
2022-05-13 16:15:48.118,    client.id = consumer-consumer.group.qa-5
2022-05-13 16:15:48.118,    client.rack = 
2022-05-13 16:15:48.118,    connections.max.idle.ms = 540000
2022-05-13 16:15:48.118,    default.api.timeout.ms = 60000
2022-05-13 16:15:48.118,    enable.auto.commit = false
2022-05-13 16:15:48.118,    exclude.internal.topics = true
2022-05-13 16:15:48.118,    fetch.max.bytes = 52428800
2022-05-13 16:15:48.118,    fetch.max.wait.ms = 5000
2022-05-13 16:15:48.118,    fetch.min.bytes = 9000
2022-05-13 16:15:48.118,    group.id = consumer.group.qa
2022-05-13 16:15:48.118,    group.instance.id = null
2022-05-13 16:15:48.118,    heartbeat.interval.ms = 3000
2022-05-13 16:15:48.118,    interceptor.classes = []
2022-05-13 16:15:48.118,    internal.leave.group.on.close = true
2022-05-13 16:15:48.118,    internal.throw.on.fetch.stable.offset.unsupported = false
2022-05-13 16:15:48.118,    isolation.level = read_uncommitted
2022-05-13 16:15:48.118,    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2022-05-13 16:15:48.118,    max.partition.fetch.bytes = 1048576
2022-05-13 16:15:48.118,    max.poll.interval.ms = 300000
2022-05-13 16:15:48.118,    max.poll.records = 200
2022-05-13 16:15:48.118,    metadata.max.age.ms = 300000

共有1个答案

萧晔
2023-03-14

你能提供以下信息吗?

  • MAX TPS(在主题中生成事件时)
  • 每个事件的最大消息大小。

这些是理想的使用者配置值。

enable.auto.commit = false
auto.commit.interval.ms = 5000 <<Upto the Client, but this is ideal, if you enable Auto Commit>>
connections.max.idle.ms = 540000
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
heartbeat.interval.ms = 3000
max.poll.interval.ms = <<5 Sec is ideal>>
max.poll.records = <<Upto the Client>>
session.timeout.ms = 10000
 类似资料:
  • 我使用的是spring boot 2.2.4版本,spring-kafka 2.4.2版本 我的场景是以下一个: 所以我写了folloqing代码 生产者微服务 spring kafka配置: 在制作人方面所有的工作都很好。我能创造话题和发送信息。 消费者微服务 动态侦听器类 当我在生产者端发送消息时,我可以看到以下日志: 在消费者方面,我没有看到任何信息。我只看到下面的指纹: 谁能告诉我我错在哪

  • 我有一个自定义跳过管理步骤。我定义了一个跳过策略,其源代码如下: 我的Skip侦听器如下: 我的步骤定义如下: 我想跳过一个约束冲突异常。但是,不会调用侦听器或跳过策略。

  • 我正在使用以下配置的批处理侦听,但我的消息反序列化错误: 在yml中: 使用上面的方法,我轮询了5条消息,但收到了超过100条消息,当我选中它时,将列表中的一条消息反序列化为多条消息。 我检查了我的投票配置没有工作。有人能给我建议解决办法吗 以下是我的日志:

  • 我在Android上制作了一个客户端-服务器应用程序的简单原型 我设法将两个客户端连接到服务器上,服务器可以接收它们的消息。现在的问题是,我似乎无法向其他客户端广播/接收消息。 下面是Server类的完整代码 下面是Client类的完整代码

  • 我正在实现spring kafka批处理侦听器,它读取来自kafka主题的消息列表,并将数据发布到REST服务。我想了解在REST服务停止的情况下的偏移管理,不应该提交批处理的偏移,应该为下一次轮询处理消息。我已经阅读了spring kafka文档,但在理解侦听器错误处理程序和批量查找当前容器错误处理程序之间的区别时存在困惑。我使用的是spring-boot-2.0.0。M7及以下版本是我的代码。

  • 我的EJBTest有问题。 我安装了WildFly并配置了用户管理和应用程序管理。 我编写了一个EJB 3.0并进行了部署: 之后,我编写了一个简单的客户端来连接它: 用户名和密码都是应用程序用户凭据,而不是管理!对吗? 我收到以下错误: 线程“main”java中出现异常。lang.IllegalStateException:EJBClient00025:没有可用于处理调用上下文组织的[appN