我使用SpringKafka实现了一个消费者,它可以读取某个主题的消息。所有这些消息都由它们处理,并通过RESTAPI导出到另一个系统中。为此,代码使用Spring Webflux项目中的WebClient,从而生成反应式代码:
@KafkaListener(topics = "${some.topic}", groupId = "my-group-id")
public void listenToTopic(final ConsumerRecord<String, String> record) {
// minimal, non-reactive code here (logging, serizializing the string)
webClient.get().uri(...).retrieve().bodyToMono(String.class)
// long, reactive chain here
.subscribe();
}
现在我想知道这种设置是否合理,或者这是否会导致很多问题,因为来自spring kafka的KafkaListener逻辑本身并不是被动的。我想知道是否有必要用KafkaReactor来代替。我对整个被动世界以及Kafka世界的理解非常有限,但我目前假设上述设置将包含以下内容:
问题是:我对形势的理解正确吗?我错过了这个设置可能导致的其他问题吗?
您的理解是正确的;要么切换到非反应性的其余客户端(例如RestTem板
),要么为消费者使用reer-kafka
。
apache camel eip框架有一个支持反应流的组件。 从我可以从留档说,reactive流组件仅在单个JVM中工作,将反应性流从/路由到camel。 在一个应用程序中有一个骆驼生产者,在另一个应用程序中有一个骆驼消费者,通过网络产生和消费反应性流,什么是合适的机制? 我假设需要某种中间件,哪种中间件适合这种情况?
我有一个带有KafkaListener方法的Spring组件: 现在,我想测试这个方法。我想确保此方法正确接收消息。我尝试创建: 但我不知道接下来会发生什么。如何测试此方法?
我使用@KafkaListener,我需要一个动态的主题名,所以我使用SpEL'\uu listener'来实现这一点 它工作得非常好。 主要问题是当我想添加另一个注释时,它会触发某些方面的编程 @MyCustomAnnotationToRecordPerformance@KafkaListener(主题 = "#{__ listener.my道具}")公共无效监听器Kafka(@Payload
当客户端发出请求时,“MonoJust”会打印在第4行,但“test”会在Http响应正文中返回。我知道发布者在订阅之前不会生成数据,那么为什么Http响应包含“test”而不是“MonoJust”?
我在我的Spring Boot应用程序中使用KafkaListener接口,它工作得很好。偏移量由Kafka本身存储。 现在,让我们假设一个主题的消费者部署了一个新版本,并且浪费了2小时的消息。然后,他们修复了应用程序,并希望启动新版本,与两个小时前有所不同。 我可以在以前的consumer.offsetsForTimes()调用中使用consumer.seek(),但这只在使用轮询机制时是直接的
我目前正在使用自定义JWT身份验证进行SpringCloudGateway。身份验证后,我希望使用GlobalFilter将标头中的JWT令牌字符串传递到下游服务: JWT令牌字符串可以通过调用主体来获得。getName(); 我的问题是:我如何实现