当前位置: 首页 > 知识库问答 >
问题:

Spring integration Java DSL-动态创建IntegrationFlow

鲜于子琪
2023-03-14

我正在使用Spring Boot 1.5.13.release和Spring Integration 4.3.16.release开发一个应用程序

我对Spring Integration相当陌生,我遇到了一个问题。

因此,基本思想是,在一些外部触发器(可能是和HTTP调用)上,我需要创建一个IntegrationFlow来使用来自rabbitMQ队列的消息,对它们进行一些处理,然后(可能)生成到另一个rabbitMQendpoint。

IntegrationFlowContext flowContext;
...
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(uuid);
...
flowContext.registration(integrationFlow).id(callUUID).register();

我必须澄清,这可以同时发生,同时创建多个IntegrationFlow。

因此,每次我试图创建IntegrationFlow时,我的“源”是一个如下所示的网关:

MessagingGatewaySupport sourceGateway = Amqp
        .inboundGateway(rabbitTemplate.getConnectionFactory(), rabbitTemplate, dynamicQueuePrefix+uuid)
        .concurrentConsumers(1)
        .adviceChain(retryInterceptor)
        .autoStartup(false)
        .id("sgX-" + uuid)
        .get();

它还不是@bean,但我希望在注册每个IntegrationFlow时注册它。

@Bean
public AmqpOutboundEndpoint outboundAdapter(
        RabbitTemplate rabbitTemplate,
        ApplicationMessagingProperties applicationMessagingProperties
) {
    return Amqp.outboundAdapter(rabbitTemplate)
            .exchangeName("someStandardExchange")
            .routingKeyExpression("headers.get('rabbitmq.ROUTING_KEY')")
            .get();
}
public IntegrationFlow configure() {
    return IntegrationFlows
            .from(sourceGateway)
            .transform(Transformers.fromJson(HashMap.class, jsonObjectMapper))
            .filter(injectedGenericSelectorFilter)
            .<HashMap<String, String>>handle((payload, headers) -> {

                String uuid = payload.get("uuid");

                boolean shouldForwardMessage = myInjectedApplicationService.isForForwarding(payload);
                myInjectedApplicationService.handlePayload(payload);

                return MessageBuilder
                        .withPayload(payload)
                        .setHeader("shouldForward", shouldForwardMessage)
                        .setHeader("rabbitmq.ROUTING_KEY", uuid)
                        .build();
            })
            .filter("headers.get('shouldForward').equals(true)")
            .transform(Transformers.toJson(jsonObjectMapper))
            .handle(outboundAdapter)
            .get();
}
.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper), tf -> tf.id("tf1-"+uuid))

但是,即使像.filter这样的组件的bean名称问题已经解决,我仍然会得到关于MessageTransFormingHandler的相同异常。

我没有提到这样一个事实,即一旦每个IntegrationFlow完成其工作,就会使用IntegrationFlow Context删除它,如下所示:

flowContext.remove(flowId);

因此,通过使用相同的对象作为锁来同步流注册块和流移除块,似乎已经(某种程度上)奏效了。

...
private final Object lockA = new Object();
...

public void appendNewFlow(String callUUID){
    IntegrationFlow integrationFlow = myFlowFactory.makeFlow(callUUID);

    synchronized (lockA) {
        flowContext.registration(integrationFlow).id(callUUID).register();
    }
}

public void removeFlow(String flowId){

    synchronized (lockA) {
        flowContext.remove(flowId); 
    }

}
...
...Waiting for workers to finish.
...
...Successfully waited for workers to finish.

但我想这是意料之中的,因为每次线程获取锁时,注册流及其所有组件或取消注册流及其所有组件都需要一些时间。

共有1个答案

吕和风
2023-03-14

你还有这个:

.transform(Transformers.toJson(jsonObjectMapper))

如果您也在其中添加.id()将如何工作?

另一方面,既然您说这是并发发生的,那么我想知道是否可以将代码中的某些部分合并,例如包装flowcontext.registration(integrationFlow).id(callUUID).register();

 类似资料:
  • 问题内容: 你好,我有这个设置 我需要为每个按钮获取以下内容 在Java中是否可以为我声明的每个按钮动态创建此按钮?因为当我有5个按钮时,我不需要3x5 = 15行代码,而是只有几行具有动态创建的按钮。 问题答案: 编写一个小循环并将您的按钮存储在数组中:

  • 你好,我已经准备好了 我需要为每个按钮获得以下内容 在Java中,是否可以为我声明的每个按钮动态创建它?因为当我有5个按钮时,我不希望3x5=15行代码,而只希望有几行动态创建的按钮。

  • 问题内容: 给定一个类名,我想动态创建一个Groovy类,向其添加属性和方法。我使用创建新类 对于我使用的方法 其中it.key是字符串(方法名),it.value是闭包。这很方便,因为我可以指定方法参数类型并进行类型检查。但是,如果不给它赋值,就无法指定动态创建的属性类型。我可以通过显式定义属性的getter和setter来解决此问题。这可行,但是metaClass.name = value或m

  • 问题内容: 我在mysql上创建数据库。首先创建主体表,每个表平均有30列。日志表的标准是引用表的pk加上每列*2。像这样: 参考表: 日志表: 现在,我想要创建一个过程,在该过程中,我将表名作为参数传递,并生成表日志查询并执行它。 做这个的最好方式是什么? 问题答案: 为了使一个字符串代表一个表(或数据库)名称,您将需要用变量连接查询字符串,并在存储过程中准备/执行一条语句。这是一个基本示例。

  • 问题内容: 我陷入GWT CellTable的问题。我需要动态创建单元表,而我没有实体(Bean)类。我已经看到了所有celltable的示例,并且在没有实体类的情况下进行了大量搜索。 我需要根据数据库中存储的一些元数据动态填充表。我可以创建表结构 考虑有两个类,一个是GRID,另一个是COLUMN,用于元数据和列定义。GRID将具有COLUMNS的列表作为列定义 现在,我需要从数据库中获取网格并

  • 问题内容: 我需要动态创建一个类。为了更详细,我需要动态创建Django类的子类。 通过“动态”,我打算基于用户提供的配置创建一个类。 例如 我想要一个命名为该类的子类的类。 该类应具有所选属性的列表。 ....在这种情况下 有什么有用的提示吗?:) 问题答案: 您可以通过调用内置函数并传递适当的参数来动态创建类,例如: 它适用于新型类。我不确定这是否也适用于老式类。

  • 我正在工作的一个项目,允许人们列出手机出售,并能够查看所有的手机出售。我的工作正常。jsp返回一个包含所有电话及其所有详细信息的列表。如品牌、型号、网络、存储、颜色、屏幕大小等。 问题是,它需要设置,以便您只看到“品牌”和“型号”(例如“i Phone”“6”),然后能够点击“更多信息”查看其余的细节。这个'more info'按钮应该会打开一个包含手机全部细节的新jsp。 抱歉,如果我没有提供足

  • 问题内容: 我正在尝试做的是动态创建子组件,这些子组件应该注入到父组件中。例如父组件是包含共享的内容为所有课程如如按钮,和其他东西。根据路线参数,应将作为 子组件的 课程内容动态注入到父组件中。子组件(课程内容)的HTML定义为外部某处的纯字符串,它可以是类似以下内容的对象: 通过在 父组件 模板中进行以下操作可以轻松解决问题。 在每次更改路线参数时,父组件的属性都会更改(内容(新模板)将从对象中