当前位置: 首页 > 知识库问答 >
问题:

使Kafka ConcurrentMessageListenerContainer作为批处理工作

支铭晨
2023-03-14

我知道ConcurrentKafkaListenerContainerFactory具有属性“setBatchListener(true)”,但就我对Kafka文档的理解而言,这种类型的工厂配置仅与@KafKalistener注释一起使用。

任何帮助都将不胜感激。

共有1个答案

濮阳君浩
2023-03-14

只需将侦听器更改为实现BatchMessageListener或其子接口之一。容器将自动检测其类型。

请参阅消息侦听器。

@kafkalistener代码使用工厂标志来确定要创建哪种类型的侦听器适配器。

 类似资料:
  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 我正在使用hibernate jpa执行批处理更新。 更新:我得到了解决方案:问题是我正在刷新已经刷新的事务,因此它没有给我任何事务正在进行中的错误以及我上面发布的错误,所以我只是删除了`getem().flush();和getEm().clear();从finally块开始工作:)

  • 我正在使用lambda、API网关和Dynamodb 我正在创建一个batchWrite函数,在一个函数中上载25个以上的项目 但我发现,当我上传35个项目时,前25个项目成功上传到dynamodb,但最后10个项目失败,但没有记录任何错误 该函数在node.js中编写 日志如下所示。

  • 知道为什么下面的代码会出现以下错误吗? java.lang.NullPointerException:org.springframework.batch.item.database.HibernateCursoritemReader.DoRead处为null(HibernateCursoritemReader.java:155)

  • 我使用的是spring-kafka“2.1.7.Release”,我试图理解max.poll.interval.ms是如何将AckMode作为批处理而将enable.auto.commit作为“false”工作的。这里是我的消费者设置。 这是我的出厂设置 这是我的消费者,我增加了2分钟的延迟 现在,我发布了5条消息,并观察到它处理了所有记录,没有任何问题。但是,如果我将AckMode设置为记录它在

  • 我在我的应用程序中使用Hibernate ORM和PostgreSQL,有时我使用批处理操作。起初我不明白为什么在批处理大小=25的日志中,会生成25个查询,起初我认为它不能正常工作。但是在那之后,我查看了pg驱动程序的源代码,在Pg语句类中发现了以下几行: 在PgPreparedStatement类中 我注意到,如果批处理的大小变为25,则会发送25个查询并附带参数。 数据库日志证实了这一点,例