当前位置: 首页 > 知识库问答 >
问题:

KafkaListener和u listener@KafkaListener无法使用方面注释解析“null”

岳俊雅
2023-03-14

我使用@KafkaListener,我需要一个动态的主题名,所以我使用SpEL'\uu listener'来实现这一点

@PostConstruct
public void init() {
    myProps= generateTopicDynamically();
}

@KafkaListener(topics = "#{__listener.myProps}")
public void listenerKafka(@Payload MyObject myObject) {
       //Do something with my event
}

它工作得非常好。

主要问题是当我想添加另一个注释时,它会触发某些方面的编程

@MyCustomAnnotationToRecordPerformance@KafkaListener(主题 = "#{__ listener.my道具}")公共无效监听器Kafka(@Payload MyObject myObject)

这里是aspect类

@Aspect
@Configuration
@Slf4j
public class MyCustomAnnotationToRecordPerformanceAspect {

    @Pointcut("@annotation(MyCustomAnnotationToRecordPerformance)")
    public void annotationMyCustomAnnotationToRecordPerformance() {
    }

    @Around("annotationMyCustomAnnotationToRecordPerformance()")
    public Object doSomething(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        return proceedingJoinPoint.proceed();
    }
}

我之所以有这个问题,是因为Spring试图在调用@PostConstruct之前解决侦听器。

Caused by: java.lang.IllegalArgumentException: @KafKaListener can't resolve 'null' as a String
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.resolveAsString(KafkaListenerAnnotationBeanPostProcessor.java:648)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.resolveTopics(KafkaListenerAnnotationBeanPostProcessor.java:520)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processListener(KafkaListenerAnnotationBeanPostProcessor.java:419)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(KafkaListenerAnnotationBeanPostProcessor.java:370)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:298)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:431)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:595)
    ... 41 common frames omitted

目前,我试图延迟管理@KafkaListener的处理器,但我无法找到我可以在不重新定义完全Kafka配置的情况下改变它的地方

@enableKafka导入KafkaListenerConfigurationSelector,这是DeferredConfigurationSelector。

下面是对这门课的评论

A {@link DeferredImportSelector} implementation with the lowest order to import a {@link KafkaBootstrapConfiguration} as late as possible.

因此,根据评论,我认为它已经尽可能推迟了

我用@Transactional测试它,我也有同样的问题。

@Transactional 
@KafkaListener(topics = "#{__listener.myProps}")
public void listenerKafka(@Payload MyObject myObject)

你知道吗?

我现在看到的唯一选择是将我的类拆分为2个,并创建2个bean。KafkaListener方法调用另一个bean。但我觉得这样做很奇怪。

提前感谢你的帮助。

共有2个答案

万俟皓
2023-03-14

多亏了加里的帮助,我找到了解决办法。一旦我们有了方面,类就会被代理,CGLIB对象中的属性就会变为null。我们需要调用getter以获得来自原始对象的值,而不是来自代理对象的值

SpEL能够读取将在原始对象上执行的公共getter,而不是CGLIB对象

因此,解决方案就是为我的私人计算机创建一个公共getter

public String getMyProps(){
    return this.myProps;
}

谢谢大家。

穆文斌
2023-03-14

我刚刚用@Transactional对它进行了测试,它对我来说就像预期的那样工作-我已经确认,当我们到达@KafkaListener注释BPP时,我们已经有了一个CGLIB代理。。。

@SpringBootApplication
@EnableTransactionManagement
public class So69817946Application {

    public static void main(String[] args) {
        SpringApplication.run(So69817946Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so69817946").partitions(1).replicas(1).build();
    }

}

@Component
class listener {

    public String getTopic() {
        return "so69817946";
    }

    @Transactional
    @KafkaListener(id = "so69817946", topics = "#{__listener.topic}")
    public void listen(String in) {
        System.out.println(in);
    }

}

@Component
class TM extends AbstractPlatformTransactionManager {

    @Override
    protected Object doGetTransaction() throws TransactionException {
        return new Object();
    }

    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
    }

    @Override
    protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
    }

    @Override
    protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
    }

}
so69817946: partitions assigned: [so69817946-0]

我可以在调用堆栈中看到事务拦截器。

所以,是的,MCVE会很有帮助。

 类似资料:
  • 下面有一个类似的问题: 一个Spring的Kafka消费者听众能听多个话题吗? 现在我明白了,我可以为KafkaListener注释的topics参数提供一个字符串数组,但是我想知道以下几点: 如何从属性文件中获取主题名称作为字符串数组? 从多个主题中读取如何影响偏移?客户(SpringKafka)会保持每个主题的补偿吗?

  • 我在我的Spring Boot应用程序中使用KafkaListener接口,它工作得很好。偏移量由Kafka本身存储。 现在,让我们假设一个主题的消费者部署了一个新版本,并且浪费了2小时的消息。然后,他们修复了应用程序,并希望启动新版本,与两个小时前有所不同。 我可以在以前的consumer.offsetsForTimes()调用中使用consumer.seek(),但这只在使用轮询机制时是直接的

  • 我有一个带有KafkaListener方法的Spring组件: 现在,我想测试这个方法。我想确保此方法正确接收消息。我尝试创建: 但我不知道接下来会发生什么。如何测试此方法?

  • 我正在从数据库中提取数据,以检查我是否有可用的系统资源来处理来自KafkaListener的进一步消息。如果我的条件没有满足,那么我希望@KafkaListener暂停,当条件满足时,我希望@KafkaListener恢复。我如何在SpringKafka实现这一点? 另外,为特定分区暂停消费者有什么缺点吗?

  • 我使用SpringKafka实现了一个消费者,它可以读取某个主题的消息。所有这些消息都由它们处理,并通过RESTAPI导出到另一个系统中。为此,代码使用Spring Webflux项目中的WebClient,从而生成反应式代码: 现在我想知道这种设置是否合理,或者这是否会导致很多问题,因为来自spring kafka的KafkaListener逻辑本身并不是被动的。我想知道是否有必要用KafkaR

  • 我正在使用spring boot 2.1.7.Release和spring-kafka 2.2.7.Release.并且我正在使用@KafKalistener注释来创建一个消费者,并且我正在使用消费者的所有默认设置。 根据apache kafka文档,'max.poll.records'的默认值是500。 在这里,我试图理解spring是如何处理记录处理的。现在我的问题是,如果我们已经在主题a上发