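Spring Integration headers are not being injected into the service activator POJO. This only happens under load testing (less than 10 TPS); the normal flow works as expected. The Spring Integration flow is: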
Splitter -> executor channel -> service activator -> aggregator.
SI version 4.1.2.RELEASE
Spring Version 4.1.2.RELEASE
Java version 1.8.0_60
//service activator -Method
public ProductDetail getProductDetails(final String productID,
@Header(STATS) final Stats statsType,
@Header(SKU) final String sku,
@Header(PRODUCT_LIST) final Collection<String> productList,
@Header(NO_OF_PRODUCTS) final String size)
final MessageBuilder<Map<String, Object>> msgBuilder = MessageBuilder
.withPayload(payload);//Map<String, Object> payload
this.addMessageHeaders(msgBuilder, httpHeadersMap, statsType);
private final boolean addMessageHeaders(MessageBuilder<?> msgBuilder,
final Map<String, String> httpHeadersMap, final Stats statsType) {
if ((httpHeadersMap != null) && (httpHeadersMap.size() > 0)) {
for (Map.Entry<String, String> entry : httpHeadersMap.entrySet()) {
msgBuilder.setHeader(entry.getKey(), entry.getValue());
}
}
if (statsType != null) {
msgBuilder.setHeader(STATS, statsType);
}
return true;
}
org.springframework.messaging.MessageHandlingException: nested exception is java.lang.IllegalArgumentException: required header not available: STATS
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:71) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:48) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:92) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) ~[spring-integration-core-4.1.2.RELEASE.jar:na]
at xx.run(DelegatingContextRunnable.java:40) ~[xx-core-3.xx.jar:3.24.0.52]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_60]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_60]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_60]
Caused by: java.lang.IllegalArgumentException: required header not available: STATS
at org.springframework.util.Assert.isTrue(Assert.java:68) ~[spring-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_60]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_60]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_60]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_60]
<int:channel id="retrieve-product.in" />
<int:channel id="retrieve-product.out" />
<int:channel id="retrieve-product.err" />
<int:channel id="retrieve-product.splitter.in"/>
<int:channel id="retrieve-product.sa.in">
<int:dispatcher task-executor="testExecutor" failover="false"/>
</int:channel>
<int:channel id="retrieve-product.aggregator.in"/>
<int:channel id="retrieve-product.transformer.in"/>
<bean id="retrieveProductsServiceActivatorSupport" class="com.test.eip.serviceactivator.support.RetrieveProductsServiceActivatorSupport" />
<bean id="retrieveProductsServiceActivator" class="com.test.eip.serviceactivator.RetrieveProductsServiceActivator" />
<bean id="retrieveProductsAggregator" class="com.test.eip.aggregator.RetrieveProductsAggregator" />
<bean id="retrieveProductResponseTransformer" class="com.test.eip.transformer.RetrieveProductsResponseTransformer" />
<bean id="productGroupsConfig" class="com.test.support.ProductGroupsConfig" >
<constructor-arg index="0" value="ProductMapping.xml" />
</bean>
<bean id="payloadSizeBasedReleaseStrategy" class="com.test.eip.aggregator.support.PayloadSizeBasedReleaseStrategy" />
<oxm:jaxb2-marshaller id="productConfigJaxbMarshaller" context-path="com.test.product.config.types" />
<int:gateway id="retrieveProductsGateway" service-interface="com.test.eip.gateway.IProductGateway" error-channel ="retrieve-product.err">
<int:method name="getProductsResponse" request-channel="retrieve-product.in" reply-channel="retrieve-product.out" />
</int:gateway>
<int:header-enricher input-channel="retrieve-product.in" output-channel="retrieve-product.splitter.in">
<int:header name="API_NAME" value="retrieve-product" />
</int:header-enricher>
<!-- Input List<String> contain product numbers -->
<int:splitter id="productListSplitter" input-channel="retrieve-product.splitter.in"
output-channel="retrieve-product.sa.in"></int:splitter>
<!-- Retrieve Product Features from USING API -->
<int:service-activator input-channel="retrieve-product.sa.in"
ref="retrieveProductsServiceActivator" method="getProductDetails"
output-channel="retrieve-product.aggregator.in"
id="retrieve-product-service-activator" />
<!-- Aggregate result from Parallel Call -->
<int:aggregator input-channel="retrieve-product.aggregator.in"
output-channel="retrieve-product.transformer.in"
method="aggregateProductDetails" ref="retrieveProductsAggregator"
release-strategy="payloadSizeBasedReleaseStrategy"
release-strategy-method="canRelease">
</int:aggregator>
<!-- Generate Service Response-->
<int:transformer id="retrieve-product-ResponseTransformer" method="transform"
ref="retrieveProductResponseTransformer" input-channel="retrieve-product.transformer.in" output-channel="retrieve-product.out" />
<!--
Listen for error and generate response
-->
<int:service-activator input-channel="retrieve-product.err"
ref="testErrorHandler" method="handleExceptions"
output-channel="retrieve-product.out"
id="retrieve-product-err-handler" />
public interface IProductGateway {
public RetrieveProductsResponse getProductsResponse(Message<?> inputMessage);
}
//Service
@Service
public class ProductsService implements IProductsService {
@Autowired
private IProductGateway retrieveProductsGateway;
@Override
public RetrieveProductsResponse getProductsResponse(
final RetrieveProductsRequest retrieveProductsRequest,
final Map<String, String> httpHeadersMap, final Stats statsType) {
final ProductsReqType products = retrieveProductsRequest.getProducts();
//populate productList From products
final Collection<String> productList = new HashSet<>();
httpHeadersMap.put(SKU, retrieveProductsRequest.getSKU());
httpHeadersMap.put(NO_OF_PRODUCTS,
String.valueOf(productList.size()));
final MessageBuilder<Collection<String>> msgBuilder = MessageBuilder
.withPayload(productList)
.setHeader(PRODUCT_LIST, new HashSet<>(productList));
this.addMessageHeaders(msgBuilder, httpHeadersMap, statsType);
final Message<Collection<String>> inMsg = msgBuilder.build();
LOG.debug("retrieveProductsGateway:: {}", this.retrieveProductsGateway);
final RetrieveProductsResponse retrieveProductsResponse = this.retrieveProductsGateway
.getProductsResponse(inMsg);
return retrieveProductsResponse;
}
}
// Jersey filter (javax.ws.rs.container.ContainerRequestFilter) creates the Stats object and sets it on the requestContext
/* Request Filter Creating and setting statsType
*/
@Override
public void filter(final ContainerRequestContext requestContext) throws IOException {
LOG.debug("filter(requestContext)::Entry");
final Stats statsType = STATS_OBJ_FACTORY.createStats();
final MultivaluedMap<String, String> headers = requestContext.getHeaders();
statsType.setBegin(DT_FACTORY.newXMLGregorianCalendar(new GregorianCalendar()));
requestContext.setProperty(STATS, statsType);
LOG.debug("filter(requestContext)::Exit");
}
//Resource Method taking statsType and passing to ProductsService
/*javax.ws.rs.core.Context*/
@POST
@Path("retrieve-product")
@Produces(MyMediaType.PD_XML_MEDIA_TYPE)
@Consumes(MyMediaType.PD_XML_MEDIA_TYPE)
public Response getProducts(RetrieveProductsRequest retrieveProductsRequest,
@Context final ContainerRequestContext containerRequestContext,
@Context final HttpHeaders headers) {
LOG.debug("getProducts::Entry");
final Stats statsType = (Stats) containerRequestContext
.getProperty(STATS);
final Map<String, String> httpHeadersMap = ResourceHelper
.getHttpHeadersAsMap(headers);
final RetrieveProductsResponse retrieveProductsResponse = this.productsService
.getProductsResponse(retrieveProductsRequest, httpHeadersMap, statsType);
LOG.debug("getProducts::Exit");
return Response.ok(retrieveProductsResponse).build();
}
Thanks,
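required header not available: STATS
Simply put, whatever is upstream creating the message is not populating the header; you need to show all of your configuration (and probably code). Debug logging and following the message through the flow are usually helpful for debugging situations like this.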
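One thing that can be checked from the posted code: addMessageHeaders only sets STATS when statsType is non-null, so a null Stats coming out of the request context would produce a message with no STATS header, and nothing would complain until @Header(STATS) is resolved in the service activator. Whether that is what actually happens under load is not established here. A minimal sketch of a fail-fast check in plain Java follows; RequiredHeaderCheck and missingHeaders are hypothetical names, not Spring Integration API, and the header names in main are placeholders.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spring Integration.
public class RequiredHeaderCheck {

    /** Returns the names from `required` that are absent from, or mapped to null in, `headers`. */
    public static List<String> missingHeaders(Map<String, ?> headers, String... required) {
        List<String> missing = new ArrayList<>();
        for (String name : required) {
            if (headers.get(name) == null) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("SKU", "sku-1");   // present
        headers.put("STATS", null);    // simulates a Stats object that was never created

        // Report which required headers would be unavailable downstream.
        System.out.println(missingHeaders(headers, "STATS", "SKU")); // prints [STATS]
    }
}
```

Since MessageHeaders implements Map<String, Object>, something like missingHeaders(inMsg.getHeaders(), STATS, SKU) could be logged or asserted just before the gateway call, which would show whether the header is already absent on the sending side.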
A common mistake is a lack of thread safety, for example in the splitter.
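To illustrate the thread-safety point: any custom component on the path (a splitter, or the code that populates headers) should avoid keeping per-request state in fields or sharing one mutable map between requests. The sketch below is plain Java and only models the idea; Part and split are hypothetical stand-ins rather than the framework's splitter, and the header name is a placeholder. Each call takes its own read-only snapshot of the headers, so later changes to the caller's map cannot affect parts that have already been handed to executor threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a splitter; not Spring Integration API.
public class SplitSketch {

    /** One split item plus the headers it travels with. */
    public static final class Part {
        public final String payload;
        public final Map<String, Object> headers;

        Part(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /**
     * Stateless split: uses only parameters and locals, and gives every
     * part a read-only snapshot of the headers taken at call time.
     */
    public static List<Part> split(Collection<String> payload, Map<String, Object> headers) {
        Map<String, Object> snapshot = Collections.unmodifiableMap(new HashMap<>(headers));
        List<Part> parts = new ArrayList<>(payload.size());
        for (String item : payload) {
            parts.add(new Part(item, snapshot));
        }
        return parts;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("STATS", "stats-object");
        List<Part> parts = split(Arrays.asList("p1", "p2"), headers);

        // Simulate the caller reusing and clearing its map for another request.
        headers.clear();

        // The parts still see the snapshot, so STATS remains available.
        for (Part part : parts) {
            System.out.println(part.payload + " has STATS: " + part.headers.containsKey("STATS"));
        }
    }
}
```

The same reasoning applies to the objects stored as header values: a snapshot of the map does not make a mutable value such as a shared stats object safe to modify from several threads.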