spring.kafka.listener.type
可以看到下面的代码,我们是使用默认的监听器模式(spring.kafka.listener.type: single),接口参数的时候使用的是单个User对象(一条消息记录),而不是List集合对象。
spring.kafka.listener.type: single
我正在实现spring kafka批处理侦听器,它读取来自kafka主题的消息列表,并将数据发布到REST服务。我想了解在REST服务停止的情况下的偏移管理,不应该提交批处理的偏移,应该为下一次轮询处理消息。我已经阅读了spring kafka文档,但在理解侦听器错误处理程序和批量查找当前容器错误处理程序之间的区别时存在困惑。我使用的是spring-boot-2.0.0。M7及以下版本是我的代码。
我很难理解如何解决这个问题,所以我在这里问这个问题,希望其他人已经面临同样的问题。我们正在以手动确认模式运行@KafkaListener,死信恢复程序的重试限制为3。由于业务逻辑,在特定情况下(外部依赖),我们不确认消息并暂停消费5分钟,因此需要手动确认模式。 此外,我们确实需要由于某种原因无法处理的消息的死信队列。 现在,在手动确认模式下的问题是,当侦听器/消费者达到重试限制并将其移动到dl队列
我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用
我有一个2分区的主题。我在我的消费者应用程序中使用kafka批处理监听器模式。因为我使用的是单个消费者应用程序,所以我将从两个分区接收消息。一旦消费者应用程序处理了这些消息列表,我希望手动提交每个分区的最大偏移量。 如果我使用MANUAL_IMMEDIATE模式,它会提交每个分区的最高偏移量吗?如果不是,我应该使用什么方法?
我正在使用Spring靴和活动MQ设置一个持久的JMS主题使用者。我能够使用Spring靴@JmsListener注释来使一切正常工作(作为耐用消费者成功运行)。但是,因为我想动态创建侦听器,所以我尝试使用 JmsListener 配置接口来创建它们。 使用主题“消费者”下面的代码可以成功地创建和使用消息。但是,问题是它创造的消费者并不持久。我在工厂中将clientId、setSubscripti
我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。 我知道有两种方法可以解决这个问题- > 将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.ack
高级使用者 API 似乎一次读取一条消息。 如果消费者想要处理这些消息并提交给其他下游消费者(如Solr或Elastic-Search ),这可能会给他们带来很大的问题,因为他们更喜欢批量接收消息,而不是一次接收一条。 在内存中批处理这些消息也并非易事,因为只有当批处理已经提交时,Kafka中的偏移量也需要同步,否则具有未提交下游消息的崩溃的 kafka 使用者(如在Solr或ES中)将已经更新其
我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导) 问题是:即使我注释掉ack.acknow