我使用@KafkaListener,我需要一个动态的主题名,所以我使用SpEL'\uu listener'来实现这一点
@PostConstruct
public void init() {
myProps= generateTopicDynamically();
}
@KafkaListener(topics = "#{__listener.myProps}")
public void listenerKafka(@Payload MyObject myObject) {
//Do something with my event
}
它工作得非常好。
主要问题是当我想添加另一个注释时,它会触发某些方面的编程
@MyCustomAnnotationToRecordPerformance@KafkaListener(主题 = "#{__ listener.my道具}")公共无效监听器Kafka(@Payload MyObject myObject)
这里是aspect类
@Aspect
@Configuration
@Slf4j
public class MyCustomAnnotationToRecordPerformanceAspect {
@Pointcut("@annotation(MyCustomAnnotationToRecordPerformance)")
public void annotationMyCustomAnnotationToRecordPerformance() {
}
@Around("annotationMyCustomAnnotationToRecordPerformance()")
public Object doSomething(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
return proceedingJoinPoint.proceed();
}
}
我之所以有这个问题,是因为Spring试图在调用@PostConstruct之前解决侦听器。
Caused by: java.lang.IllegalArgumentException: @KafKaListener can't resolve 'null' as a String
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.resolveAsString(KafkaListenerAnnotationBeanPostProcessor.java:648)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.resolveTopics(KafkaListenerAnnotationBeanPostProcessor.java:520)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processListener(KafkaListenerAnnotationBeanPostProcessor.java:419)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(KafkaListenerAnnotationBeanPostProcessor.java:370)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:298)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:431)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:595)
... 41 common frames omitted
目前,我试图延迟管理@KafkaListener的处理器,但我无法找到我可以在不重新定义完全Kafka配置的情况下改变它的地方
@enableKafka导入KafkaListenerConfigurationSelector,这是DeferredConfigurationSelector。
下面是对这门课的评论
A {@link DeferredImportSelector} implementation with the lowest order to import a {@link KafkaBootstrapConfiguration} as late as possible.
因此,根据评论,我认为它已经尽可能推迟了
我用@Transactional测试它,我也有同样的问题。
@Transactional
@KafkaListener(topics = "#{__listener.myProps}")
public void listenerKafka(@Payload MyObject myObject)
你知道吗?
我现在看到的唯一选择是将我的类拆分为2个,并创建2个bean。KafkaListener方法调用另一个bean。但我觉得这样做很奇怪。
提前感谢你的帮助。
多亏了加里的帮助,我找到了解决办法。一旦我们有了方面,类就会被代理,CGLIB对象中的属性就会变为null。我们需要调用getter以获得来自原始对象的值,而不是来自代理对象的值
SpEL能够读取将在原始对象上执行的公共getter,而不是CGLIB对象
因此,解决方案就是为我的私人计算机创建一个公共getter
public String getMyProps(){
return this.myProps;
}
谢谢大家。
我刚刚用@Transactional
对它进行了测试,它对我来说就像预期的那样工作-我已经确认,当我们到达@KafkaListener
注释BPP时,我们已经有了一个CGLIB代理。。。
@SpringBootApplication
@EnableTransactionManagement
public class So69817946Application {
public static void main(String[] args) {
SpringApplication.run(So69817946Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so69817946").partitions(1).replicas(1).build();
}
}
@Component
class listener {
public String getTopic() {
return "so69817946";
}
@Transactional
@KafkaListener(id = "so69817946", topics = "#{__listener.topic}")
public void listen(String in) {
System.out.println(in);
}
}
@Component
class TM extends AbstractPlatformTransactionManager {
@Override
protected Object doGetTransaction() throws TransactionException {
return new Object();
}
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
}
@Override
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
}
@Override
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
}
}
so69817946: partitions assigned: [so69817946-0]
我可以在调用堆栈中看到事务拦截器。
所以,是的,MCVE会很有帮助。
下面有一个类似的问题: 一个Spring的Kafka消费者听众能听多个话题吗? 现在我明白了,我可以为KafkaListener注释的topics参数提供一个字符串数组,但是我想知道以下几点: 如何从属性文件中获取主题名称作为字符串数组? 从多个主题中读取如何影响偏移?客户(SpringKafka)会保持每个主题的补偿吗?
我在我的Spring Boot应用程序中使用KafkaListener接口,它工作得很好。偏移量由Kafka本身存储。 现在,让我们假设一个主题的消费者部署了一个新版本,并且浪费了2小时的消息。然后,他们修复了应用程序,并希望启动新版本,与两个小时前有所不同。 我可以在以前的consumer.offsetsForTimes()调用中使用consumer.seek(),但这只在使用轮询机制时是直接的
我有一个带有KafkaListener方法的Spring组件: 现在,我想测试这个方法。我想确保此方法正确接收消息。我尝试创建: 但我不知道接下来会发生什么。如何测试此方法?
我正在从数据库中提取数据,以检查我是否有可用的系统资源来处理来自KafkaListener的进一步消息。如果我的条件没有满足,那么我希望@KafkaListener暂停,当条件满足时,我希望@KafkaListener恢复。我如何在SpringKafka实现这一点? 另外,为特定分区暂停消费者有什么缺点吗?
我使用SpringKafka实现了一个消费者,它可以读取某个主题的消息。所有这些消息都由它们处理,并通过RESTAPI导出到另一个系统中。为此,代码使用Spring Webflux项目中的WebClient,从而生成反应式代码: 现在我想知道这种设置是否合理,或者这是否会导致很多问题,因为来自spring kafka的KafkaListener逻辑本身并不是被动的。我想知道是否有必要用KafkaR
我正在使用spring boot 2.1.7.Release和spring-kafka 2.2.7.Release.并且我正在使用@KafKalistener注释来创建一个消费者,并且我正在使用消费者的所有默认设置。 根据apache kafka文档,'max.poll.records'的默认值是500。 在这里,我试图理解spring是如何处理记录处理的。现在我的问题是,如果我们已经在主题a上发