我已经使用Spring云流启动了一个小型微服务。
我只有两个流绑定,如下所示:
cloud:
stream:
bindings:
channelone:
destination: org.queue.app.EventsOne
contentType: application/json
group: app
channeltwo:
destination: org.queue.app.EventsTwo
contentType: application/json
group: app
我用Serenity开发了组件测试,我将通道注入到我想要发送测试消息的地方:
@Autowired
@Qualifier(Channels.EVENTS_ONE_CHANNEL)
SubscribableChannel eventsOneChannel
@Autowired
@Qualifier(Channels.EVENTS_TWO_CHANNEL)
SubscribableChannel eventsTwoChannel
哪里:
Channels.EVENTS_ONE_CHANNEL and EVENTS_TWO_CHANNEL
只是定义为字符串常量:
@UtilityClass
public class Channels {
public static final String EVENTS_ONE_CHANNEL= "channelone";
public static final String EVENTS_TWO_CHANNEL= "channeltwo";
}
组件测试模块导入依赖项:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
</dependency>
我发送的信息如下:
eventsOneChannel.send(someMessage)
快乐流工作正常。但是,我想在侦听器无法处理消息时测试错误流。
这是一个监听器的例子:
@StreamListener(Channels.EVENTS_ONE_CHANNEL)
@SendTo(Channels.DTO_GENERATED)
public BonusDTO receive(Message<String> message) {
try {
log.info("Received Event event with payload [{}]", message.getPayload());
return toDto(message.getPayload());
} catch (Exception ex) {
log.error("Error converting Event to DTO", ex);
throw new EventHandlingException(ex);
}
}
从try/catch引发异常时,错误由服务激活器处理:
@ServiceActivator(inputChannel = "org.queue.app.EventsOne.app.errors")
public void handle(ErrorMessage errorMessage) {
log.info("Error");
}
在没有Spring-Cloud-Stream测试的情况下运行应用程序时,如果出现处理消息的错误,则会触发之前的服务激活并处理该错误。
但是,在测试期间不会发生同样的事情。使用spring-cloud-stream-test,当侦听器抛出异常时,不会调用从错误通道激活的服务。
我还想测试错误流。
这是Spring-Cloud-Stream测试的限制吗?在使用spring cloud stream测试时,是否有将错误消息发送到错误通道的配置(技巧或提示)?
谢谢
spring cloud stream测试支持一个非常基本的测试绑定器;它没有真正活页夹的所有功能。
@Joao Pereira我认为这里有一个更大的问题,所以我会试着把它摆出来,希望能提供一些清晰
>
使用ServiceActivator注释错误处理方法的能力是框架提供的合同,这意味着它的测试是我们的责任。此外,您使用的机制甚至不是来自Spring Cloud Stream,而是来自Spring Integr。但无论如何,我质疑应用程序是否应该测试它,因为您不能在应用程序级别以任何方式影响它,因为它不是您的功能。同样,这是我的观点,我很想看看你的想法。
春云流3.0.0。RC1(及其后续版本)我们实际上不支持Spring-Cloud-Stream测试支持,而支持Gary提到的新测试绑定。我刚才提供的链接中记录了这样做的原因,但请随时提出问题。虽然它的用法有很好的文档记录,但这里是我们自己使用它的一个测试用例,供您参考。尽管ref文档中的示例显示了基于函数的消息处理程序,但它与基于注释的消息处理程序(您正在使用)的工作方式相同。
谈到基于注释的编程模型,请参阅我们刚刚发布的以下博客(在工作中可以查看更多),其中我们阐述了为什么我们要放弃基于注释的编程模型,我认为您也应该开始考虑更改代码。毕竟,所有的更改都相当于删除所有注释,并稍微更改消息处理程序方法的签名,以表示为函数bean
我说这些的原因很多,但您上面的代码和您表达的担忧再次提醒我,为什么我们要远离这种编程模型。
我将在这里停下来,因为我相信这里有很多东西需要消化,但鉴于我刚才所说的内容,请随时提出更尖锐的问题。
一段时间以来,我一直试图让Spring Cloud Stream与Kafka Streams一起使用,我的项目使用嵌入式kafka进行Kafka DSL测试,我使用这个存储库作为我的测试实现的基础(它本身就是这个问题的测试用例)。 我在这里制作了一个存储库来演示这一点。 基本上,当使用“Processor.class”的“DemoApplicationTest.ExampleAppWorking.
我需要在Spring云流kafka活页夹错误处理场景的帮助。我的应用程序有一个java 8消费者,其绑定在application.yaml中指定。 application.yaml: 现在,我正在处理错误,有两个问题: > 我正在尝试手动包装消息的消费,而不是使用自动提交偏移设置为真。因此,当我将自动提交偏移设置设为假并测试错误场景时,会面临奇怪的行为,即每当抛出异常时,消息都会重试n次,即使在服
我设置了一个Spring云流Kafka制作人和消费者,有3个Kafka经纪人在运行。我已经设置了min.insync。将副本复制到4,以查看生产者错误处理的工作方式。消息通道。send(发送) 以上是我的生产者配置。虽然retries设置为3,但生产者仍会多次重试。虽然sync设置为true,但发送呼叫会立即发出。虽然定义了错误通道和目标,并且将errorChannelEnabled设置为true
我正在尝试(单元)测试使用Kafka DSL的Spring Cloud Stream Kafka处理器,但收到以下错误“无法建立到节点1的连接。代理可能不可用。”。此外,测试不会停止。我尝试了EmbeddedKafka和TestBinder,但我有同样的行为。我试图从Spring Cloud团队(有效)给出的响应开始,我将应用程序改编为使用Kafka DSL,并将测试类保持原样。EmbeddedK
我写了一个jUnit测试,但我在执行它时遇到了错误: 我的应用程序是这样配置的: 我的测试: 我的上下文配置: 我的网络。xml配置: 根据网络上的不同教程,配置似乎很好,但我不知道错误来自哪里。 我认为TestDao是一个上下文。我的测试类没有很好地实现xml,因为当我将上下文文件移动到另一个项目时,错误根本不会改变。此外,我测试了许多不同的方法来配置@ContextConfiguration,