如何使用DSL为以下步骤设置无功流:
JsonSchemaValidator
类]BusinessService:business logic,state machine)
- 持久化对象R2DBC出站适配器
我在看这个:https://github.com/spring-projects/spring-integration/blob/master/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java
在上述示例中,创建了专用流,返回发布者,在测试中,发布者被订阅。然而,当SqsMessageDrivenChannelAdapter将消息引入通道时,将触发我的流。
对于上述步骤1到5的场景,如何实现无功流配置?
更新:添加示例代码
@Bean
public IntegrationFlow importFlow() {
IntegrationFlows.from(sqsInboundChannel())
.handle((payload, messageHeaders) -> jsonSchemaValidator.validate(payload.toString()))
.transform(Transformers.fromJson(Entity.class))
.handle((payload, messageHeaders) ->businessService.process((Entity) payload))
.handle(
Jpa.outboundAdapter(this.entityManagerFactory)
.entityClass(Entity)
.persistMode(PersistMode.PERSIST),
ConsumerEndpointSpec::transactional)
.get();
}
@Bean
public MessageProducer sqsMessageDrivenChannelAdapter() {
SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter =
new SqsMessageDrivenChannelAdapter(asyncSqsClient, queueName);
sqsMessageDrivenChannelAdapter.setAutoStartup(true);
sqsMessageDrivenChannelAdapter.setOutputChannel(sqsInboundChannel());
return sqsMessageDrivenChannelAdapter;
}
@Bean
public MessageChannel sqsInboundChannel() {
return MessageChannels.flux().get();
}
更新2:使用executor通道将JPA移动到diff线程
@Bean
public IntegrationFlow importFlow() {
IntegrationFlows.from(sqsInboundChannel())
.handle((payload, messageHeaders) -> jsonSchemaValidator.validate(payload.toString()))
.transform(Transformers.fromJson(Entity.class))
.handle((payload, messageHeaders) ->businessService.process((Entity) payload))
.channel(persistChannel())
.handle(
Jpa.outboundAdapter(this.entityManagerFactory)
.entityClass(Entity)
.persistMode(PersistMode.PERSIST),
ConsumerEndpointSpec::transactional)
.get();
}
@Bean
public MessageProducer sqsMessageDrivenChannelAdapter() {
SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter =
new SqsMessageDrivenChannelAdapter(asyncSqsClient, queueName);
sqsMessageDrivenChannelAdapter.setAutoStartup(true);
sqsMessageDrivenChannelAdapter.setOutputChannel(sqsInboundChannel());
return sqsMessageDrivenChannelAdapter;
}
@Bean
public MessageChannel sqsInboundChannel() {
return MessageChannels.flux().get();
}
@Bean
public MessageChannel persistChannel() {
return MessageChannels.executor(Executors.newCachedThreadPool()).get();
}
您可能需要更加熟悉到目前为止我们在Spring集成中针对反应流所做的工作:https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#reactive-溪流
您使用该测试类显示的示例与您的用例完全无关。在该测试中,我们试图涵盖我们在Spring集成中公开的一些API,有点像单元测试。它与整个流程无关。
您的用例实际上只是一个完整的黑盒流,从SQS侦听器开始,到R2DBC结束。因此,在你的流程中,没有必要尝试将它的一部分转换为发布者,然后将其带回流程的另一部分:你不会跟踪一些方式并自己订阅该发布者。
您可以考虑在流的endpoint之间放置一个FLuxMessageChannel
,但它对您的用例仍然没有意义。它不会像您期望的那样完全反应性,仅仅因为org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer
没有阻塞消费者线程以准备来自下游的反压力。
流中唯一真正反应性的部分是R2DBC出站通道适配器,但它可能不会给您带来太多价值,因为数据源不是反应性的。
正如我所说:您可以尝试放置一个通道(通道-
我不能用火花流运行Kafka。以下是我迄今为止采取的步骤: > 将此行添加到- Kafka版本:kafka_2.10-0.10.2.2 Jar文件版本:spark-streaming-kafka-0-8-assembly_2.10-2.2.0。罐子 Python代码: 但我仍然得到以下错误: 我做错了什么?
我已经建立了一个简单的Spring集成流程,该流程由以下步骤组成: 然后定期轮询一个rest api 对有效载荷做一些处理 并将其置于Kafka主题上。 请遵守以下代码: 这非常有效,然而,我正在努力想出一些好的测试。 我应该如何模拟外部RESTAPI
我的流在数据库中配置,我的程序不断创建和销毁流。 因此,流配置(例如cron配置)可以随时更改。 这些流是用方法IntegrationFlowContext注册的。使用IntegrationFlowRegistration方法注册并销毁。销毁。 流的运行从第0秒开始,可以在任何一分钟开始。销毁和创建新流从每分钟1秒开始。 这是一个好方法吗?当我测试这个时,它起作用了。但我在想,这是一种很好的方法吗
我下载了Sprind SAML示例应用程序,它在本地tomcat中运行良好(针对SSOCircle)。然后我添加了一个新的SP,以指向我们公司的ADFS。我有几个问题,一个接一个地解决了。现在,我能够发送请求并获得有效的saml响应和断言令牌。但是,我得到以下错误消息: null
使用SpringBoot2和SpringIntegration为面向流的侦听套接字/服务器开发非阻塞TCP,代码如下。 应用程序开始指示tcp端口正在按预期进行侦听,但几分钟后引发以下异常:
我正在致力于将Spring集成与AWS SQS队列集成。 当我用ServiceActivator注释的方法引发异常时,我遇到了一个问题。在这种情况下,消息似乎无论如何都会从队列中删除。我已在SqsMessageDrivenChannelAdapter中将MessageDeletionPolicy配置为成功时的。 这是我的通道/适配器配置https://github.com/sdusza1/spri