当前位置: 首页 > 知识库问答 >
问题:

在@KafkaListener注释的方法中使用反应性网络流量代码

班承德
2023-03-14

我使用SpringKafka实现了一个消费者,它可以读取某个主题的消息。所有这些消息都由它们处理,并通过RESTAPI导出到另一个系统中。为此,代码使用Spring Webflux项目中的WebClient,从而生成反应式代码

  @KafkaListener(topics = "${some.topic}", groupId = "my-group-id")
  public void listenToTopic(final ConsumerRecord<String, String> record) {
    
    // minimal, non-reactive code here (logging, serizializing the string)

    webClient.get().uri(...).retrieve().bodyToMono(String.class)
       // long, reactive chain here
       .subscribe();
  }

现在我想知道这种设置是否合理,或者这是否会导致很多问题,因为来自spring kafka的KafkaListener逻辑本身并不是被动的。我想知道是否有必要用KafkaReactor来代替。我对整个被动世界以及Kafka世界的理解非常有限,但我目前假设上述设置将包含以下内容:

  1. listenToTopic函数几乎会立即返回,因为大部分工作是在反应性链中完成的,这不会阻止函数返回。这意味着,据我所知,KafkaListener逻辑会假设消息在那里被正确处理,所以它可能会承认它,并在某个时候提交它。如果我理解正确,那么这意味着消息的处理可能会出错。当KafkaListener已经获取下一个记录时,工作仍然可以在前一个反应性链中完成。这意味着如果应用程序依赖于以严格的顺序完全处理消息,则上述设置将是错误的。但是如果没有,那么上面的设置就可以了吗?
  2. 上述设置的另一个问题是,如果有大量消息进来,应用程序可能会用工作超载。因为监听器函数几乎立即返回,所以大量的消息可以同时在反应性链内部处理。
  3. @KafkaListener逻辑内置的重试逻辑在这里不会真正工作,因为反应性链内部的异常不会触发它。任何重试逻辑都必须由监听器函数本身内部的反应性代码来处理。
  4. 当使用reer-kafka而不是@KafkaListener注释时,可以更改第1点中描述的行为。因为侦听器现在将被集成到反应性链中,所以只有当反应性链实际完成时才可能确认消息。这样,根据我的理解,下一条消息只有在一条消息通过反应性链被完全处理后才会被提取。这也可能解决第2-4点中描述的问题/行为。

问题是:我对形势的理解正确吗?我错过了这个设置可能导致的其他问题吗?

共有1个答案

吴高畅
2023-03-14

您的理解是正确的;要么切换到非反应性的其余客户端(例如RestTem板),要么为消费者使用reer-kafka

 类似资料:
  • apache camel eip框架有一个支持反应流的组件。 从我可以从留档说,reactive流组件仅在单个JVM中工作,将反应性流从/路由到camel。 在一个应用程序中有一个骆驼生产者,在另一个应用程序中有一个骆驼消费者,通过网络产生和消费反应性流,什么是合适的机制? 我假设需要某种中间件,哪种中间件适合这种情况?

  • 我有一个带有KafkaListener方法的Spring组件: 现在,我想测试这个方法。我想确保此方法正确接收消息。我尝试创建: 但我不知道接下来会发生什么。如何测试此方法?

  • 我使用@KafkaListener,我需要一个动态的主题名,所以我使用SpEL'\uu listener'来实现这一点 它工作得非常好。 主要问题是当我想添加另一个注释时,它会触发某些方面的编程 @MyCustomAnnotationToRecordPerformance@KafkaListener(主题 = "#{__ listener.my道具}")公共无效监听器Kafka(@Payload

  • 当客户端发出请求时,“MonoJust”会打印在第4行,但“test”会在Http响应正文中返回。我知道发布者在订阅之前不会生成数据,那么为什么Http响应包含“test”而不是“MonoJust”?

  • 我在我的Spring Boot应用程序中使用KafkaListener接口,它工作得很好。偏移量由Kafka本身存储。 现在,让我们假设一个主题的消费者部署了一个新版本,并且浪费了2小时的消息。然后,他们修复了应用程序,并希望启动新版本,与两个小时前有所不同。 我可以在以前的consumer.offsetsForTimes()调用中使用consumer.seek(),但这只在使用轮询机制时是直接的

  • 我目前正在使用自定义JWT身份验证进行SpringCloudGateway。身份验证后,我希望使用GlobalFilter将标头中的JWT令牌字符串传递到下游服务: JWT令牌字符串可以通过调用主体来获得。getName(); 我的问题是:我如何实现