当前位置: 首页 > 知识库问答 >
问题:

在Spring Boot应用程序中实现反应式Kafka侦听器

阙俊友
2023-03-14

我试图在我的Spring Boot应用程序中实现反应性kafka消费者,我正在看这些例子:https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleScenarios.java

看起来在被动Kafka中还没有对Spring的支持

我理解SpringKafka监听器如何在非反应式KafkaAPI中工作:最简单的解决方案是为ConcurrentKafkaListenerContainerFactory和ConsumerFactory配置bean,然后使用@KafkaListener annotation,瞧

但我不知道如何正确使用SpringKafka。

基本上我需要一个听众来讨论这个话题。我应该创建自己的循环或调度程序吗?或者我错过了什么。任何人都可以分享他们的知识和最佳实践吗?

共有1个答案

微生嘉
2023-03-14

我还没有现成的解决方案,但我正在尝试这个(Kotlin代码,Spring Boot)。有人在这里发布了部分代码片段https://github.com/reactor/reactor-kafka/issues/100

@EventListener(ApplicationStartedEvent::class)
fun onSomeEvent() {
    kafkaReceiver
        .receive()
        .doOnNext { record ->
            val myEvent = record.value()
            processMyEvent(myEvent).thenEmpty {
                record.receiverOffset().acknowledge()
            }
        }
        .doOnError {
            /* todo */
        }
        .subscribe()
}

查看其他堆栈溢出问题。虽然不多,但可能会给你一些想法

  • 使用onErrorResume来处理问题负载发布到Kafka使用ReactorKafka
  • 反序列化异常后继续消耗kafkaReactor中的后续记录
 类似资料:
  • 我试图在我的基于微服务的Spring启动应用程序中实现普罗米修斯,部署在weblogic服务器上。作为POC的一部分,我已经将配置作为一场战争的一部分。为了启用它,我在下面设置了配置- 应用属性 格拉德尔- 但执行器请求被现有的拦截器阻止。它要求在特定于我们项目的标题中传递值。通过postman(http:localhost:8080/abc/activator/prometheus),我可以测试

  • 反应式编程与在事件监听器(鼠标、键)中调用函数有何不同,因为两者都是异步事件流,那么反应式编程相对于传统事件监听器调用有何优势?

  • 问题内容: 我的应用程序中有一个通知表的Firebase 事件侦听器,当该应用程序在后台时,我想触发推送通知。 这是监听器: 当应用程序位于前台时,这非常有用。然后,在我的应用程序委托方法中添加一个后台观察器,以在后台对其进行观察: 但是当应用程序在后台运行时,事件观察器不会触发。Ive调查了Firebase Cloud Messenger以解决此问题,并遇到了类似这样的帖子: 是否可以使用Fir

  • 本文向大家介绍Vue3 响应式侦听与计算的实现,包括了Vue3 响应式侦听与计算的实现的使用技巧和注意事项,需要的朋友参考一下 响应式侦听和计算 有时我们需要依赖于其他状态的状态——在 Vue 中,这是用组件 计算属性 处理的,以直接创建计算值,我们可以使用 computed 方法:它接受 getter 函数并为 getter 返回的值返回一个不可变的响应式 ref 对象。 我们先来看看一个简单的

  • 我试图在SpringMVC中运行SpringBoot应用程序,在SpringMVCPOM中添加SpringBoot应用程序依赖项,并扫描SpringBoot包,但我面临以下问题

  • 我正在尝试使用public void onLocationChanged(Location-Location)制作一个应用程序,在谷歌地图上跟踪用户的路径,但每次我尝试实现位置侦听器时,我的应用程序都会崩溃。这是我的密码。。如果您看到任何需要修复的问题,请告诉我。谢谢 Android清单文件 这是我的地图活动。java文件