下面是我对kafka侦听器的方法定义,如果接收空或空的有效负载字符串,我想我会得到下面的错误...你能帮帮我吗。
@KafkaListener(topics = "${kafka.consumer-topic-name.reservation}", groupId = "${kafka.consumer-group-id.test}",
containerFactory = "kafkaListenerContainerFactory",autoStartup = "${kafka.auto-start.consumer.tets}")
public void consumeReservation(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String kafkaKey) {}
[org.springframework.kafka.KafkaListenerEndpointContainer #0-0-C-1] ERROR o.s.k.l.SeekToCurrentErrorHandler - Backoff none exhausted for ConsumerRecord(topic = test_topic, partition = 0, leaderEpoch = 2, offset = 453473, CreateTime = 1601962346576, serialized key size = 41, serialized value size = -1, headers = RecordHeaders(headers = [RecordHeader(key = OPERATION, value = [68, 69, 76, 69, 84, 69]), RecordHeader(key = __Key_TypeId__, value = [99, 108, 75, 101, 121])], isReadOnly = false), key = {
"orgId": "1",
"orderId": "U4000024004"}, value = null)
您需要指定有效负载不是必需的。
@Payload(required = false) String payload, ...
我是Apache Kafka的新手,能够从发送方发送消息(以JSON格式),但不能在消费者服务中消费。 有人能帮我吗?
我有课: 配置类:公共类RabbitConfiguration{ 听众: a仅启动应用程序有错误 2017-08-08 12:58:02.128警告5024---[cTaskExecutor-1]S.A.R.L.ConditionalRejectingErrorHandler:Rabbit消息侦听器执行失败。 原因:org.SpringFramework.Messaging.Handler.Ann
我在学Kafka春靴。我想在我的consumer类中添加第二个consumer,它订阅了与第一个主题相同的主题,但具有不同的groupID。这些类不是很复杂,当我只有第一个消费Json的消费者(或者至少输出是Json?)时,它们就可以工作。还有一点需要说明的是,我从一个生产者和消费者开始,他们使用@EnableBindings方法,但这种方法不受欢迎,所以我正在学习正确/新的方法。 任何提示!请让
问题内容: 我想监听方法调用,以便在调用周围动态附加附加行为。我已经在带有自定义批注和运行器的JUnit方法上完成了此操作。我正在尝试在标准的Java应用程序上执行此操作。 主要思想是: 这只是一个抽象的想法,我没有任何具体的示例,但是我很好奇Java中是否可能。 我发现了一些方法: 其中仅为接口定义了代理(但不用于装饰具体类)。似乎与注入模式相似,这是一个不同的关注点。 这里的另一种方法是将工厂
问题内容: 我正在使用spring amqp rabbitmq,并使用 我的消息处理程序是 我已经用钩住了 但是我变得无法执行 生产者端的replyHandler也有类似的问题 另外,如果importExchange中有任何异常,如何在ReplyHandler中获取异常? 问题答案: 由于您正在使用POJO收听消息,因此您无法处理消息。 您应该接受消息正文(在您的情况下),并返回适合于回复消息正文