当前位置: 首页 > 知识库问答 >
问题:

无输出的Spring云流反应侦听器

沈琛
2023-03-14

我使用的是Reactive Spring Cloud Stream,在创建没有输出的StreamListener时遇到了困难。只要没有收到格式错误的消息,下面的代码就可以工作。当收到格式错误的消息时,流量将关闭。

@StreamListener
public void handleMessage(@Input(MessagingConfig.INPUT) Flux<String> payloads) {
    payloads.flatMap(objectToSave -> reactiveMongoTemplate.insert(objectToSave)).subscribe();
}

如果我理解正确的话,最好让框架订阅流量,而不是手动订阅流量。当侦听器有输出时,这不是一个问题,因为我可以简单地返回流量,如下所示:

@StreamListener
@Output(MessagingConfig.OUTPUT)
public Flux<String> handleMessage(@Input(MessagingConfig.INPUT) Flux<String> payloads) {
    return payloads.flatMap(objectToSave -> reactiveMongoTemplate.insert(objectToSave));
}

框架似乎以一种在返回时不会关闭流量的方式处理坏消息。当侦听器没有指定输出时,是否有任何方法可以让框架处理流量?

共有1个答案

龙永福
2023-03-14

考虑切换到使用我们最近采用的Spring Cloud Function(SCF)编程模型。基本上,只要您有最新的代码库(2.1.0.RC4是最新的,几天后发布)就可以了。下面是使用SCF编程模型的代码示例:

@SpringBootApplication
@EnableBinding(Sink.class)
public class SampleReactiveConsumer {

    public static void main(String[] args) {
        SpringApplication.run(SampleReactiveConsumer.class, 
                   "--spring.cloud.stream.function.definition=consume");
    }

    @Bean
    public Consumer<Flux<String>> consume(){
        return payloads -> payloads.flatMap(objectToSave -> reactiveMongoTemplate.insert(objectToSave)).subscribe();
    }
}

您还可以从类路径中删除reactive模块,因为我们也在考虑一起反对它

 类似资料:
  • 我应该说我对spring-cloud-sleuth和Zipkin的简单实用印象深刻。 然而,我正在研究一个POC,我正在考虑为其开发反应工具包。VertX3是我列表中第一个尝试的项目(使用spring cloud生态系统)。我想知道Sleuth日志跟踪是否可以在反应上下文中工作,因为我猜它依赖于ThreadLocals在上下文中传递?渴望了解Sleuth在反应性环境中的地位。

  • spring cloud stream如何将多个Kafka分区分配给属于同一消费者组的反应流? 我注意到,如果我使用普通的非反应流侦听器,每个线程将被分配到一个分区,这取决于使用者并发配置。然而,在流(流量输入)的情况下,我没有注意到任何这样的并行行为。似乎只定义了一个流来处理来自所有分区的消息。 我的期望是每个Kafka主题分区都有独立的流,即使在由不同线程备份的同一节点上也是如此。

  • 嘿,当我运行应用程序时,它会给出一个错误java.lang.IllegalArgumentException:Invalid listener:null,这说明侦听器是空的。我是初学者,所以请大家帮忙解决这个问题。在这一行中出现错误:LocationManager.RequestLocationUpdates(provider,2000,0,locationListener);//这里是我的示例代

  • 我已经使用Spring Kafka创建了一个Kafka消费者,并将其部署在云铸造中。该主题有10个分区。我计划将应用程序扩展到10个实例,以便每个实例可以使用来自一个分区的消息。Spring Kafka支持并发消息侦听器容器,我猜它支持从每个分区创建多个线程来使用。例如,如果我有5个消费者实例,每个消费者实例可能有2个线程从分区消耗。因为我计划为每个分区创建一个应用实例,所以使用并发消费者有什么好

  • 我试图在我的Spring Boot应用程序中实现反应性kafka消费者,我正在看这些例子:https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleScenarios.java 看起来在被动Kafka中还没有对Spring的

  • 问题内容: 我下面发布的示例代码显示了两个类。一个实现KeyListener,另一个实现Runnable,并在每20 ms休眠的无限循环中运行。当按下键时,采用int形式的keyChar用作索引,设置布尔数组的索引true或false,表示是否按下了该键。同时,过程循环正在键数组中搜索其true或false值,并将true设置为false,然后打印出char。我的问题是我是否需要使用带锁的同步来访