当前位置: 首页 > 知识库问答 >
问题:

使用Spring云流测试错误流

云韬
2023-03-14

我已经使用Spring云流启动了一个小型微服务。

我只有两个流绑定,如下所示:

 cloud:
    stream:
      bindings:
        channelone:
          destination: org.queue.app.EventsOne
          contentType: application/json
          group: app
        channeltwo:
          destination: org.queue.app.EventsTwo
          contentType: application/json
          group: app

我用Serenity开发了组件测试,我将通道注入到我想要发送测试消息的地方:


@Autowired
@Qualifier(Channels.EVENTS_ONE_CHANNEL)
SubscribableChannel eventsOneChannel

@Autowired
@Qualifier(Channels.EVENTS_TWO_CHANNEL) 
SubscribableChannel eventsTwoChannel

哪里:

Channels.EVENTS_ONE_CHANNEL and EVENTS_TWO_CHANNEL 

只是定义为字符串常量:

@UtilityClass
public class Channels {
    public static final String EVENTS_ONE_CHANNEL= "channelone";
    public static final String EVENTS_TWO_CHANNEL= "channeltwo";
}

组件测试模块导入依赖项:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
</dependency>

我发送的信息如下:

eventsOneChannel.send(someMessage)

快乐流工作正常。但是,我想在侦听器无法处理消息时测试错误流。

这是一个监听器的例子:

@StreamListener(Channels.EVENTS_ONE_CHANNEL)
@SendTo(Channels.DTO_GENERATED)
public BonusDTO receive(Message<String> message) {
    try {
        log.info("Received Event event with payload [{}]", message.getPayload());
        return toDto(message.getPayload());
    } catch (Exception ex) {
        log.error("Error converting Event to DTO", ex);
        throw new EventHandlingException(ex);
    }
}

从try/catch引发异常时,错误由服务激活器处理:

@ServiceActivator(inputChannel = "org.queue.app.EventsOne.app.errors")
public void handle(ErrorMessage errorMessage) {
   log.info("Error");
}

在没有Spring-Cloud-Stream测试的情况下运行应用程序时,如果出现处理消息的错误,则会触发之前的服务激活并处理该错误。

但是,在测试期间不会发生同样的事情。使用spring-cloud-stream-test,当侦听器抛出异常时,不会调用从错误通道激活的服务。

我还想测试错误流。

这是Spring-Cloud-Stream测试的限制吗?在使用spring cloud stream测试时,是否有将错误消息发送到错误通道的配置(技巧或提示)?

谢谢

共有2个答案

高运诚
2023-03-14

spring cloud stream测试支持一个非常基本的测试绑定器;它没有真正活页夹的所有功能。

梁烨烨
2023-03-14

@Joao Pereira我认为这里有一个更大的问题,所以我会试着把它摆出来,希望能提供一些清晰

>

  • 使用ServiceActivator注释错误处理方法的能力是框架提供的合同,这意味着它的测试是我们的责任。此外,您使用的机制甚至不是来自Spring Cloud Stream,而是来自Spring Integr。但无论如何,我质疑应用程序是否应该测试它,因为您不能在应用程序级别以任何方式影响它,因为它不是您的功能。同样,这是我的观点,我很想看看你的想法。

    春云流3.0.0。RC1(及其后续版本)我们实际上不支持Spring-Cloud-Stream测试支持,而支持Gary提到的新测试绑定。我刚才提供的链接中记录了这样做的原因,但请随时提出问题。虽然它的用法有很好的文档记录,但这里是我们自己使用它的一个测试用例,供您参考。尽管ref文档中的示例显示了基于函数的消息处理程序,但它与基于注释的消息处理程序(您正在使用)的工作方式相同。

    谈到基于注释的编程模型,请参阅我们刚刚发布的以下博客(在工作中可以查看更多),其中我们阐述了为什么我们要放弃基于注释的编程模型,我认为您也应该开始考虑更改代码。毕竟,所有的更改都相当于删除所有注释,并稍微更改消息处理程序方法的签名,以表示为函数bean

    • https://spring.io/blog/2019/10/14/spring-cloud-stream-demystified-and-simplified
    • https://spring.io/blog/2019/10/17/spring-cloud-stream-functional-and-reactive
    • https://spring.io/blog/2019/10/25/spring-cloud-stream-and-spring-integration

    我说这些的原因很多,但您上面的代码和您表达的担忧再次提醒我,为什么我们要远离这种编程模型。

    我将在这里停下来,因为我相信这里有很多东西需要消化,但鉴于我刚才所说的内容,请随时提出更尖锐的问题。

  •  类似资料:
    • 一段时间以来,我一直试图让Spring Cloud Stream与Kafka Streams一起使用,我的项目使用嵌入式kafka进行Kafka DSL测试,我使用这个存储库作为我的测试实现的基础(它本身就是这个问题的测试用例)。 我在这里制作了一个存储库来演示这一点。 基本上,当使用“Processor.class”的“DemoApplicationTest.ExampleAppWorking.

    • 我需要在Spring云流kafka活页夹错误处理场景的帮助。我的应用程序有一个java 8消费者,其绑定在application.yaml中指定。 application.yaml: 现在,我正在处理错误,有两个问题: > 我正在尝试手动包装消息的消费,而不是使用自动提交偏移设置为真。因此,当我将自动提交偏移设置设为假并测试错误场景时,会面临奇怪的行为,即每当抛出异常时,消息都会重试n次,即使在服

    • 我设置了一个Spring云流Kafka制作人和消费者,有3个Kafka经纪人在运行。我已经设置了min.insync。将副本复制到4,以查看生产者错误处理的工作方式。消息通道。send(发送) 以上是我的生产者配置。虽然retries设置为3,但生产者仍会多次重试。虽然sync设置为true,但发送呼叫会立即发出。虽然定义了错误通道和目标,并且将errorChannelEnabled设置为true

    • 我正在尝试(单元)测试使用Kafka DSL的Spring Cloud Stream Kafka处理器,但收到以下错误“无法建立到节点1的连接。代理可能不可用。”。此外,测试不会停止。我尝试了EmbeddedKafka和TestBinder,但我有同样的行为。我试图从Spring Cloud团队(有效)给出的响应开始,我将应用程序改编为使用Kafka DSL,并将测试类保持原样。EmbeddedK

    • 我写了一个jUnit测试,但我在执行它时遇到了错误: 我的应用程序是这样配置的: 我的测试: 我的上下文配置: 我的网络。xml配置: 根据网络上的不同教程,配置似乎很好,但我不知道错误来自哪里。 我认为TestDao是一个上下文。我的测试类没有很好地实现xml,因为当我将上下文文件移动到另一个项目时,错误根本不会改变。此外,我测试了许多不同的方法来配置@ContextConfiguration,