当前位置: 首页 > 知识库问答 >
问题:

如何将网关中直接通道的消息有效载荷复制到恢复通道?

梁丘琛
2023-03-14

我有一个“DataChannel”,它使用入站通道适配器从DB获取结果集。在这里,我从db获得了一个名为'process_id'的字段。在通过int:HTTP-Outbound网关调用外部系统之后,我定义了一个恢复通道。我只想对该process_id执行更新查询。但是我无法在恢复通道中获得进程id。获取一个异常无效属性“payload[process_id]”...无论如何,是否可以将process_id传递到恢复通道,以便我可以在如下的

为了清楚起见,下面是spring-integration xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:task="http://www.springframework.org/schema/task" xmlns:int-http="http://www.springframework.org/schema/integration/http"
	xmlns:stream="http://www.springframework.org/schema/integration/stream"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
              http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.1.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
            http://www.springframework.org/schema/integration/stream 
		http://www.springframework.org/schema/integration/stream/spring-integration-stream-4.1.xsd
               http://www.springframework.org/schema/integration/http
    http://www.springframework.org/schema/integration/http/spring-integration-http-4.1.xsd
            http://www.springframework.org/schema/integration/jdbc
            http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-4.1.xsd
            http://www.springframework.org/schema/jdbc
            http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd">

	<int:channel id="requestchannel"></int:channel>
	<int:channel id="xtifyrequestchannel"></int:channel>
	<int:channel id="xtifyresponsechannel"></int:channel>
	<int:channel id="tpgrequestchannel"></int:channel>
	<int:channel id="tpgresponsechannel"></int:channel>
   <int:channel id="xtifyerrorchannel">
	</int:channel>

	<int:channel id="tpgerrorchannel">
	</int:channel>

	<int:channel id="executerchannel">
		<int:dispatcher task-executor="taskExecutor" />
	</int:channel>

	<task:executor id="taskExecutor" pool-size="2" />

	<bean id="pollerdatamapper" class="main.java.com.as.poller.PollerDataMapper" />

	<bean id="pollerservice" class="main.java.com.as.poller.PollerService" />

	<bean id="requestFactory"
		class="org.springframework.http.client.SimpleClientHttpRequestFactory">
		<property name="connectTimeout" value="10000" />
		<property name="readTimeout" value="10000" />
	</bean>

	<int:logging-channel-adapter id="logger"
		level="INFO" />

	<int-jdbc:inbound-channel-adapter id="datachannel"
		query="select loyalty_id,process_id,mobile_uid,mobile_os from TBL_RECEIPT where r_cre_time=(select min(r_cre_time) from TBL_RECEIPT where receipt_status=0)"
		data-source="dataSource" max-rows-per-poll="1" row-mapper="pollerdatamapper">

		<int:poller fixed-rate="5000">
		</int:poller>

	</int-jdbc:inbound-channel-adapter>


	<int:gateway id="requestGateway" service-interface="main.java.com.as.poller.RequestGateway"
		default-request-channel="requestchannel" default-reply-timeout="20000">
		<int:method name="pushNotification" />
		<int:method name="sendTPGRequest" request-channel="tpgrequestchannel">
			<int:header name="Content-Type" value="multipart/form-data" />
		</int:method>
	</int:gateway>

	<int:object-to-json-transformer
		input-channel="requestchannel" output-channel="xtifyrequestchannel"></int:object-to-json-transformer>

	<int-http:outbound-gateway id="xtifygateway"
		request-channel="xtifyrequestchannel" reply-channel="xtifyresponsechannel" request-factory="requestFactory"
		url="${xtifyUrl}" http-method="POST">
		<int-http:request-handler-advice-chain>
			<int:retry-advice max-attempts="3" recovery-channel="xtifyerrorchannel">
			</int:retry-advice>
		</int-http:request-handler-advice-chain>
	</int-http:outbound-gateway>

	<int-http:outbound-gateway id="tpggateway"
		request-channel="tpgrequestchannel" reply-channel="tpgresponsechannel"
		request-factory="requestFactory" expected-response-type="java.lang.String"
		url="${tpg_url}" http-method="POST">
		<int-http:request-handler-advice-chain>
			<int:retry-advice max-attempts="3" recovery-channel="tpgerrorchannel">
			</int:retry-advice>
		</int-http:request-handler-advice-chain>
	</int-http:outbound-gateway>

	<int:json-to-object-transformer
		input-channel="tpgresponsechannel" type="main.java.com.as.rest.response.TPGResponse" />


	<int:service-activator input-channel="datachannel"
		output-channel="executerchannel" ref="pollerservice" method="getRecordFromPoller">
	</int:service-activator>

	<int:service-activator input-channel="executerchannel"
		ref="pollerservice" method="getDataFromExecuterChannel">
	</int:service-activator>

	<int-jdbc:outbound-channel-adapter
		id="tpgsystemfailure"
		query="update TBL_RECEIPT set receipt_status=1 
		where process_id in (:payload.failedMessage.payload[process_id])"
		data-source="dataSource" channel="tpgerrorchannel" />

	<int-jdbc:outbound-channel-adapter
		id="xtifysystemfailure"
		query="update TBL_RECEIPT set receipt_status=4 where process_id in (:payload.failedMessage.payload[process_id])"
		data-source="dataSource" channel="xtifyerrorchannel" />
		
		<int-jdbc:outbound-channel-adapter
		id="xtifysystemsuccess"
		query="update TBL_RECEIPT set receipt_status=5 where process_id in (:payload.process_id)"
		data-source="dataSource" channel="xtifyresponsechannel" />
		
		
</beans>
	
 

共有1个答案

曾晨
2023-03-14

recovery-channel获取errormessage。有效负载是一个MessagingException,具有两个属性FailedMessageCause

使用payload.failedmessage.payload[process_id]

 类似资料:
  • 我有一个spring boot service(2.4.5)显示了一个checkmarx错误,我们需要清理请求有效负载。我们如何对请求有效载荷进行消毒? 对于“@RequestBody final MyInput Input”,我得到以下checkmarx错误消息: 我想消毒我的有效载荷。或者除了使用DTO,然后将其转换为我的数据库实体之外,没有其他选择

  • 我正在创建一个不和谐机器人,它每天在某个时间向公会发送消息。我使用为预定的消息,但不确定为机器人编写什么代码来将消息发送到公会的一般频道,如果他们没有一般频道,则发送到带有大多数活动或消息。 由于已被删除,我不确定该如何执行此操作。另外,我不想使用通道id将消息发送到特定的通道,因为我想将此bot公开,以便许多不同的协会可以使用。 它不一定要发送到“默认”频道或包含大多数消息的频道。这些正是我想到

  • 我曾经能够做到这一点,但我正在努力应对0.3的期货。 下面是我从WebSocket获得的一个sink和stream: 我创建了一个在异步tokio任务之间通信的无界通道: 这是我被卡住的部分。我生成了一个异步任务,它应该连接无界接收器和接收器;我的想法是通过< code>unbounded_sender发送消息: 对于<code>send_all</code>,错误消息显示: 而且 而且 查看文档

  • 我有下面的代码从消息生成嵌入,它现在可以正常工作。我想要的是,在创建嵌入后,bot应该要求用户提到一个频道,在用户提到一个频道后,bot应该将该嵌入发送到那里。我该怎么做?

  • 在RabbitMQ总线上使用带重载的spring amqp,我们有时会从org获取日志。springframework。amqp。兔子联系CachingConnectionFactory说:通道关闭:清洁通道关闭;协议方法:#方法 你能解释一下这个日志吗?为什么它处于错误级别?我们有什么调整吗?提前谢谢你的回答。

  • Kafka MQ源连接器可以将事件从MQ带到1个Kafka主题,我们可以在Kafka MQ连接器内部进行基于消息的路由吗? 还是我们必须编写一个KStream应用程序来根据内容负载进行路由