当前位置: 首页 > 知识库问答 >
问题:

Spring云流Kafka背压

葛海阳
2023-03-14

如何使用项目Reactor背压功能与Kafka粘合剂在斯佩林云流?

@Bean
public Function<Flux<String>, Flux<String>> processor() {
    return flux -> flux.delayElements(Duration.ofSeconds(1));
}

如果我使用这种方式,比发布者发送延迟1秒的消息,但消费者消费消息没有任何延迟。

有可能在春雨云流中使用cunsumer上的BackPereSure吗?

共有1个答案

仲涵亮
2023-03-14

你不能。背压意味着消息的生产者(从代理中退出队列)能够感知背压,而在本例中则不然。它依赖于Spring集成框架中的入站通道适配器。为了支持背压,这些消息代理需要真正的反应式API/方法,例如RabbitMQ中有一些倡议

 类似资料:
  • 我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放

  • 如何使用Spring Cloud Stream Kafka Binder为生产者启用压缩(例如GZIP)?

  • 我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。

  • 我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,

  • spring cloud stream如何将多个Kafka分区分配给属于同一消费者组的反应流? 我注意到,如果我使用普通的非反应流侦听器,每个线程将被分配到一个分区,这取决于使用者并发配置。然而,在流(流量输入)的情况下,我没有注意到任何这样的并行行为。似乎只定义了一个流来处理来自所有分区的消息。 我的期望是每个Kafka主题分区都有独立的流,即使在由不同线程备份的同一节点上也是如此。

  • 是否可以为Apache Kafka的Spring Cloud DataFlow配置身份验证?在哪里可以看到示例? 谢谢你。