当前位置: 首页 > 知识库问答 >
问题:

Kafka使用者无法间歇解析侦听器方法

鲁杜吟
2023-03-14

我一直在Kafka消费者方面面临下面的异常。令人惊讶的是,这个问题不一致,旧版本的代码(具有完全相同的配置,但有一些新的不相关功能)按预期工作。有人能帮助确定是什么导致了这种情况吗?

[ERROR][938f3c68-f481-4224-b2c6-43af5fb27ada-0-C-1][org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mycompany.listener.KafkaBatchListener.onMessage(java.lang.Object,org.springframework.kafka.support.Acknowledgment)]
Bean [com.mycompany.listener.KafkaBatchListener@7a59780b]; nested exception is org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver, failedMessage=GenericMessage [payload=[[B@21bc784f, MyPOJO(), [B@33bb5851], headers={kafka_offset=[4046, 4047, 4048], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4871203f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[[B@295620f1, MyPOJOKey(id=0), [B@5d3d6361], kafka_batchConvertedHeaders=[{myFirstHeader=[B@1f011689, myUUIDHeader=[B@7691bce8, myMetadataHeader=[B@6e585b63, myRequestIdHeader=[B@58c81ba2, myMetricsHeader=[B@4f6aeb6c, myTargetHeader=[B@34677895}, {myUUIDHeader=[B@1848ae39, myMetadataHeader=[B@c5b399, myRequestIdHeader=[B@186c1966, myMetricsHeader=[B@1740692e, myTargetHeader=[B@4a242499}, {myUUIDHeader=[B@67d01f3f, myMetadataHeader=[B@1f0f9d8a, myRequestIdHeader=[B@b928e5c, isLastMessage=[B@6079735b, myMetricsHeader=[B@7b7b18c, myTargetHeader=[B@64378f3d}], kafka_receivedTopic=[my_topic, my_topic, my_topic], kafka_receivedTimestamp=[1623420136620, 1623420137255, 1623420137576], kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@7bc81d89, kafka_groupId=dev-consumer-grp}]
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:77) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.ContainerAwareBatchErrorHandler.handle(ContainerAwareBatchErrorHandler.java:56) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2010) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1854) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1720) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1699) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) [spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) [spring-kafka-2.7.1.jar:2.7.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.mycompany.listener.KafkaBatchListener.onMessage(java.lang.Object,org.springframework.kafka.support.Acknowledgment)]
Bean [com.mycompany.listener.KafkaBatchListener@7a59780b]; nested exception is org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver, failedMessage=GenericMessage [payload=[[B@21bc784f, MyPOJO(), [B@33bb5851], headers={kafka_offset=[4046, 4047, 4048], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4871203f, kafka_timestampType=[CREATE_TIME, CREATE_TIME, CREATE_TIME], kafka_receivedPartitionId=[0, 0, 0], kafka_receivedMessageKey=[[B@295620f1, MyPOJOKey(id=0), [B@5d3d6361], kafka_batchConvertedHeaders=[{myFirstHeader=[B@1f011689, myUUIDHeader=[B@7691bce8, myMetadataHeader=[B@6e585b63, myRequestIdHeader=[B@58c81ba2, myMetricsHeader=[B@4f6aeb6c, myTargetHeader=[B@34677895}, {myUUIDHeader=[B@1848ae39, myMetadataHeader=[B@c5b399, myRequestIdHeader=[B@186c1966, myMetricsHeader=[B@1740692e, myTargetHeader=[B@4a242499}, {myUUIDHeader=[B@67d01f3f, myMetadataHeader=[B@1f0f9d8a, myRequestIdHeader=[B@b928e5c, isLastMessage=[B@6079735b, myMetricsHeader=[B@7b7b18c, myTargetHeader=[B@64378f3d}], kafka_receivedTopic=[my_topic, my_topic, my_topic], kafka_receivedTimestamp=[1623420136620, 1623420137255, 1623420137576], kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@7bc81d89, kafka_groupId=dev-consumer-grp}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2367) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2003) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1925) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1837) ~[spring-kafka-2.7.1.jar:2.7.1]
    ... 8 more
Caused by: org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): Could not resolve parameter [0] in public void com.mycompany.listener.KafkaBatchListener.onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K, V>>,org.springframework.kafka.support.Acknowledgment): No suitable resolver
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:145) ~[spring-messaging-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.12.RELEASE.jar:5.2.12.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1983) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1973) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1925) ~[spring-kafka-2.7.1.jar:2.7.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1837) ~[spring-kafka-2.7.1.jar:2.7.1]
    ... 8 more

我的应用程序使用以下内容:

  1. 自定义侦听器类com。我的公司。听众。Kafka巴奇列斯特纳

附加查询:即使设置了ContainerProperties.setOnlyLogRecordMetadata(true),异常堆栈跟踪仍然包含我省略的完整有效负载。知道为什么吗?

提前感谢!

更新:

  1. KafkaBatchListener
package com.mycompany.listener;

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

public class KafkaBatchListener<K, V> implements BatchAcknowledgingMessageListener<K, V> {

    @Override
    @com.mycompany.listener.KafkaListener
    public void onMessage(final List<ConsumerRecord<K, V>> consumerRecords, final Acknowledgment acknowledgment) {

        // process batch using MyService<K, V>.process(consumerRecords)
        acknowledgment.acknowledge();
    }
}

package com.mycompany.listener;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

@Retention(RUNTIME)
@Target(METHOD)
public @interface KafkaListener {

}
package com.mycompany.factory;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import com.mycompany.errorhandler.ListenerContainerRecoveringBatchErrorHandler;

public class KafkaBatchListenerContainerFactory<K, V>
        extends ConcurrentKafkaListenerContainerFactory<K, V> {

    public KafkaBatchListenerContainerFactory(final DefaultKafkaConsumerFactory<K, V> consumerFactory,
            final ListenerContainerRecoveringBatchErrorHandler errorHandler, final int concurrency) {

        super.setConsumerFactory(consumerFactory);
        super.setBatchErrorHandler(errorHandler);
        super.setConcurrency(concurrency);
        super.setBatchListener(true);
        super.setAutoStartup(true);

        final ContainerProperties containerProperties = super.getContainerProperties();
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
        containerProperties.setOnlyLogRecordMetadata(true);
    }

}
package com.mycompany.errorhandler;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.listener.RecoveringBatchErrorHandler;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;

@Component
public class ListenerContainerRecoveringBatchErrorHandler extends RecoveringBatchErrorHandler {

    public ListenerContainerRecoveringBatchErrorHandler(
            @Value("${spring.kafka.consumer.properties.backOffMS:0}") final int backOffTimeMS,
            @Value("${spring.kafka.consumer.properties.retries:3}") final int retries) {

        super(new FixedBackOff(backOffTimeMS, retries));
    }

}
package com.mycompany.config;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import com.mycompany.errorhandler.ListenerContainerRecoveringBatchErrorHandler;
import com.mycompany.factory.KafkaBatchListenerContainerFactory;
import com.mycompany.listener.KafkaBatchListener;

@Configuration
public class KafkaBatchListenerConfigurer<K, V> implements KafkaListenerConfigurer {

    private final List<KafkaBatchListener<K, V>> listeners;
    private final BeanFactory beanFactory;
    private final ListenerContainerRecoveringBatchErrorHandler errorHandler;
    private final int concurrency;

    @Autowired
    public KafkaBatchListenerConfigurer(final List<KafkaBatchListener<K, V>> listeners, final BeanFactory beanFactory,
            final ListenerContainerRecoveringBatchErrorHandler errorHandler,
            @Value("${spring.kafka.listener.concurrency:1}") final int concurrency) {
        this.listeners = listeners;
        this.beanFactory = beanFactory;
        this.errorHandler = errorHandler;
        this.concurrency = concurrency;
    }

    @Override
    public void configureKafkaListeners(final KafkaListenerEndpointRegistrar registrar) {

        final Method listenerMethod = lookUpBatchListenerMethod();

        listeners.forEach(listener -> {
            registerListenerEndpoint(listener, listenerMethod, registrar);
        });
    }

    private void registerListenerEndpoint(final KafkaBatchListener<K, V> listener, final Method listenerMethod,
            final KafkaListenerEndpointRegistrar registrar) {

        // final Map<String, Object> consumerConfig = get ConsumerConfig from a custom provider;
        registrar.setContainerFactory(createContainerFactory(consumerConfig));
        registrar.registerEndpoint(createListenerEndpoint(listener, listenerMethod, consumerConfig));
    }

    private KafkaBatchListenerContainerFactory<K, V> createContainerFactory(final Map<String, Object> consumerConfig) {

        final DefaultKafkaConsumerFactory<K, V> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfig);

        final KafkaBatchListenerContainerFactory<K, V> containerFactory = new KafkaBatchListenerContainerFactory<>(
                consumerFactory, errorHandler, concurrency);

        return containerFactory;
    }

    private MethodKafkaListenerEndpoint<String, String> createListenerEndpoint(final KafkaBatchListener<K, V> listener,
            final Method listenerMethod, final Map<String, Object> consumerConfig) {

        final MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setId(UUID.randomUUID().toString());
        endpoint.setBean(listener);
        endpoint.setMethod(listenerMethod);
        endpoint.setBeanFactory(beanFactory);
        endpoint.setGroupId("my-group-id");
        endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());

        // final String topicName = get TopicName for this key-value from a custom utility;
        endpoint.setTopics(topicName);

        final Properties properties = new Properties();
        properties.putAll(consumerConfig);
        endpoint.setConsumerProperties(properties);

        return endpoint;
    }

    private Method lookUpBatchListenerMethod() {
        return Arrays.stream(com.mycompany.listener.KafkaBatchListener.class.getMethods())
                .filter(m -> m.isAnnotationPresent(com.mycompany.listener.KafkaListener.class))
                .findAny()
                .orElseThrow(() -> new IllegalStateException(
                        String.format("[%s] class should have at least 1 method with [%s] annotation.",
                                com.mycompany.listener.KafkaBatchListener.class.getCanonicalName(),
                                com.mycompany.listener.KafkaListener.class.getCanonicalName())));
    }

}

共有1个答案

邵繁
2023-03-14

当监听器已经实现了一个消息监听器接口时,您不需要所有调用基础结构的标准方法;与其为每个侦听器注册endpoint,不如从工厂为每个侦听器创建一个容器,并将侦听器添加到容器属性中。

val container = containerFactory.createContainer("topic1");
container.getContainerProperties().set...
...
container.getContainerProperies().setMessageListener(myListenerInstance);
...
container.start();
 类似资料:
  • 这是创建ListenerContainerFactory的类 这是我用@KafKalistener注释的Listener类 这是KafkaListenerConfig类,它接受引导服务器、主题名称等。

  • 我是Apache Kafka的新手,能够从发送方发送消息(以JSON格式),但不能在消费者服务中消费。 有人能帮我吗?

  • 我有一个关于正确配置kafka侦听器属性的问题-侦听器和advertised.listers。 在我的配置中,我设置了以下道具: 客户端使用 进行连接。我是否需要在侦听器和广告侦听器中具有相同的值。这里 是指向运行 kafka 代理的主机的 dns 记录。 在什么情况下,我希望它们保持不变和不同? 谢谢!

  • 我已经使用Spring Kafka创建了一个Kafka消费者,并将其部署在云铸造中。该主题有10个分区。我计划将应用程序扩展到10个实例,以便每个实例可以使用来自一个分区的消息。Spring Kafka支持并发消息侦听器容器,我猜它支持从每个分区创建多个线程来使用。例如,如果我有5个消费者实例,每个消费者实例可能有2个线程从分区消耗。因为我计划为每个分区创建一个应用实例,所以使用并发消费者有什么好

  • 我有课: 配置类:公共类RabbitConfiguration{ 听众: a仅启动应用程序有错误 2017-08-08 12:58:02.128警告5024---[cTaskExecutor-1]S.A.R.L.ConditionalRejectingErrorHandler:Rabbit消息侦听器执行失败。 原因:org.SpringFramework.Messaging.Handler.Ann