@EnableBinding(MySink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {
@StreamListener(target = MySink.INPUT, condition = "headers['type']=='bogey'")
public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
// handle the message
}
@StreamListener(target = MySink.INPUT, condition = "headers['type']=='bacall'")
public void receiveBacall(@Payload BacallPojo bacallPojo) {
// handle the message
}
@StreamListener(target = MySink.ANOTHER_INPUT, condition = "headers['type']=='bacall'")
public void receiveBacall(@Payload BacallPojo bacallPojo) {
// handle the message
}
}
如何提供一个在条件不匹配时调用的处理程序?
如果我有两个处理程序,第一个有条件,第二个没有条件,当第一个条件匹配时,两个处理程序都被调用。我如何避免这种情况?
我们可能需要修改你提到的部分,因为它有点过时了。
此外,我们不能(不应该)基于有效负载类型进行任何类型的路由,因为数据是以串行形式从线路中传入的,比如byte[]
。我在这篇旧帖子中详细讨论了它。但是您肯定可以使用传入消息的其他部分作为路由条件。推荐的最佳实践是依赖消息头。
让我们来看看这个示例:
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
@Bean
public Function<String, String> lowercase() {
return v -> v.toLowerCase();
}
@Bean
public Function<String, String> reverse() {
return v -> new StringBuilder(v).reverse().toString();
}
--spring.cloud.function.routing-expression=headers['type'] == 'upper' ? 'uppercase' : (headers['type'] == 'lower' ? 'lowercase' : ''reverse)
Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?
我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,
我试图在我的应用程序中集成spring cloud stream kinesis,但我找不到手册中的所有配置选项。我看过这个链接: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/as
问题:我试图逐行读取一个大文件,并将消息放入RabbitMQ中。我想在文件末尾提交给rabbitMQ。如果文件中的任何记录是坏的,那么我想撤销发布到队列的消息。 技术:Spring boot、Spring cloud stream、RabbitMQ 你能帮我实现这个过渡的东西吗。我知道如何使用spring cloud Stream读取文件并发布到队列。
是否可以使用函数()样式,使用多个独立的函数/绑定来实现反应性SCS应用程序?我发现的所有示例总是只注册一个具有默认绑定的函数bean。我想注册多个,每个都有自己的绑定。 传统上,这可以使用来完成,但现在不推荐使用函数支持。
我正试图按照GitHub的建议设置测试 其中StreamProcessor设置为 -->line从不使用在我看来应该在主题“output”上的消息,因为@StreamProcessor有@Sendto(“output”) 我希望能够测试流处理的消息。