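I am building a sample implementation that routes orders to an order management system. The inbound message of the route is an exchange carrying two identifiers, one for the customer ID and one for the catalog item ID. I then transform the body of the inbound exchange into my Order domain object. From there, my goal is to invoke the Content Enricher pattern twice: once to aggregate data from the customer, and once to aggregate data from the catalog item.
My route is: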
@Override
public void configure() {
// Start by building an instance of RestConfigurationDefinition. Need to
// specify the component we are going to use for enabling REST endpoints,
// specifically CamelServlet in this case. Set the binding mode to JSON.
restConfiguration().
// Leverage the CamelServlet component for the REST DSL
component("servlet").
// Bind using JSON
bindingMode(RestBindingMode.json).
// I like pretty things...
dataFormatProperty("prettyPrint", "true").
// This is the context path to be used for Swagger API documentation
apiContextPath("api-doc").
// Properties for Swagger
// Title of the API
apiProperty("api.title", "Order Management API").
// Version of the API
apiProperty("api.version", "1.0.0").
// CORS (resource sharing) enablement
apiProperty("cors", "true").
// Use localhost for calls
apiProperty("host", "localhost:8083").
// Set base path
apiProperty("base.path", "nvisia-order-router-camel-service/api");
// Definition of the post order endpoint
rest("/orderRouter").
// This is a POST method call for routing an order using the
// order form
post().
// Description of what the method does
description("Routes a new order to the order management service").
// Define the type used for input
type(OrderForm.class).
// Define the type used for output, in this case the order
outType(String.class).
// Next, define where the message is routed to, first transformation
to("bean:orderRouterService?method=transformOrderFormToOrder(${body})")
.to("direct:enrichOrder");
// Definition of the enrich order endpoint
from("direct:enrichOrder").
// Use the Content Enricher EIP to aggregate customer info in the
// order.
enrich(
"http4://localhost:8081/nvisia-customer-camel-service/api/customer/${body.customerId}",
new AggregationStrategy() {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Order originalBody = (Order) oldExchange.getIn().getBody();
Customer resourceResponse = (Customer) newExchange.getIn().getBody();
originalBody.setCustomer(resourceResponse);
if (oldExchange.getPattern().isOutCapable()) {
oldExchange.getOut().setBody(originalBody);
} else {
oldExchange.getIn().setBody(originalBody);
}
return oldExchange;
}
}).
// Use the Content Enricher EIP to aggregate catalog info in the
// order.
enrich(
"http4://localhost:8080/nvisia-catalog-camel-service/api/customer/${body.catalogItemId}",
new AggregationStrategy() {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Order originalBody = (Order) oldExchange.getIn().getBody();
CatalogItem resourceResponse = (CatalogItem) newExchange.getIn()
.getBody();
originalBody.setCatalogItem(resourceResponse);
if (oldExchange.getPattern().isOutCapable()) {
oldExchange.getOut().setBody(originalBody);
} else {
oldExchange.getIn().setBody(originalBody);
}
return oldExchange;
}
}).to("direct:sendOrder");
// Definition of the send order endpoint
from("direct:sendOrder").
// Need to define the content type on the header
setHeader(org.apache.camel.Exchange.CONTENT_TYPE,
constant("application/json"))
.
// Be safe and define this as a post
setHeader(Exchange.HTTP_METHOD,
constant(org.apache.camel.component.http4.HttpMethods.POST))
.
// Finally, send the order to be managed and get back the order ID
to("http4://localhost:8082/nvisia-order-management-camel-service/api/order");
}
The exception I get is:
org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream but has value: OrderForm [customerId=1, catalogItemId=1] of type: com.nvisia.examples.camel.orderrouter.OrderForm on: Message[]. Caused by: No type converter available to convert from type: com.nvisia.examples.camel.orderrouter.OrderForm to the required type: java.io.InputStream with value OrderForm [customerId=1, catalogItemId=1]. Exchange[ID-nvisia-mhoffman-50981-1463522552963-0-8]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: com.nvisia.examples.camel.orderrouter.OrderForm to the required type: java.io.InputStream with value OrderForm [customerId=1, catalogItemId=1]]
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:107)
at org.apache.camel.component.http4.HttpProducer.createRequestEntity(HttpProducer.java:523)
at org.apache.camel.component.http4.HttpProducer.createMethod(HttpProducer.java:422)
at org.apache.camel.component.http4.HttpProducer.process(HttpProducer.java:110)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.processor.Enricher.process(Enricher.java:187)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:468)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:121)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:62)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:468)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:121)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)
at org.apache.camel.http.common.CamelServlet.service(CamelServlet.java:143)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:729)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:291)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.boot.actuate.autoconfigure.EndpointWebMvcAutoConfiguration$ApplicationContextHeaderFilter.doFilterInternal(EndpointWebMvcAutoConfiguration.java:261)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.boot.actuate.trace.WebRequestTraceFilter.doFilterInternal(WebRequestTraceFilter.java:115)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:87)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:121)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.boot.actuate.autoconfigure.MetricsFilter.doFilterInternal(MetricsFilter.java:103)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.springframework.boot.context.web.ErrorPageFilter.doFilter(ErrorPageFilter.java:120)
at org.springframework.boot.context.web.ErrorPageFilter.access$000(ErrorPageFilter.java:61)
at org.springframework.boot.context.web.ErrorPageFilter$1.doFilterInternal(ErrorPageFilter.java:95)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.boot.context.web.ErrorPageFilter.doFilter(ErrorPageFilter.java:113)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:217)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:142)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)
at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:616)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:518)
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1091)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:673)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1500)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: com.nvisia.examples.camel.orderrouter.OrderForm to the required type: java.io.InputStream with value OrderForm [customerId=1, catalogItemId=1]
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:198)
at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:105)
... 79 more
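Note that I am using the latest Camel, 2.17.1. My question is: if I receive an exchange whose body is in JSON format, how do I call two content enrichers to populate my Order bean? Since I don't need to send an input stream to either content enricher call, I considered placing a processor before each exchange, but I think it would be difficult to retain the information from the first aggregation during the second one. If this is not the recommended way to use the content enricher, please let me know.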
The way to use enrich is similar to this integration pattern.
Don't use http4 directly in enrich:
<enrich strategyRef="myAgregator">
    <simple>http4://foo-url/</simple> <!-- InvalidPayloadException -->
</enrich>
Instead, use it indirectly through a direct URI:
<camel:route id="consume-service1">
    <from uri="direct:consume-service1" />
    <camel:removeHeaders pattern="CamelHttp*" />
    <to uri="http4://order-service/" />
    <unmarshal ref="orderJson" />
    <enrich strategyRef="myAgregator">
        <simple>direct:consume-service2</simple>
    </enrich>
    <marshal ref="myJson" />
</camel:route>
<camel:route id="consume-service2">
    <from uri="direct:consume-service2" />
    <camel:setBody><constant></constant></camel:setBody>
    <camel:removeHeaders pattern="CamelHttp*" />
    <to uri="http4://catalog-service" />
    <unmarshal ref="catalogJson" />
</camel:route>
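On the worry about losing the first aggregation's data during the second one: each enrichment step receives the already-enriched message, so the earlier data survives as long as the strategy updates and returns the original body. Below is a minimal plain-Java model of that chaining, with no Camel dependency. `EnrichChainSketch`, its `Order` class, the `enrich` helper, and the stubbed lookups are hypothetical stand-ins for illustration, not Camel APIs.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

final class EnrichChainSketch {

    // Hypothetical stand-in for the Order domain object.
    static final class Order {
        final int customerId;
        final int catalogItemId;
        String customer;     // filled in by the first enrichment
        String catalogItem;  // filled in by the second enrichment

        Order(int customerId, int catalogItemId) {
            this.customerId = customerId;
            this.catalogItemId = catalogItemId;
        }
    }

    // Models one Content Enricher step: fetch a resource based on the current
    // message, then merge it into that message with an aggregation function.
    static <R> Order enrich(Order original,
                            Function<Order, R> resource,
                            BiFunction<Order, R, Order> strategy) {
        R response = resource.apply(original);
        return strategy.apply(original, response);
    }

    // Chains two enrichment steps. The second step receives the Order that
    // the first step already populated, so nothing has to be stored elsewhere.
    static Order enrichTwice(Order order) {
        Order withCustomer = enrich(order,
                o -> "customer-" + o.customerId,            // stubbed customer lookup
                (o, c) -> { o.customer = c; return o; });
        return enrich(withCustomer,
                o -> "item-" + o.catalogItemId,             // stubbed catalog lookup
                (o, i) -> { o.catalogItem = i; return o; });
    }

    public static void main(String[] args) {
        Order order = enrichTwice(new Order(1, 2));
        System.out.println(order.customer + ", " + order.catalogItem);
        // prints "customer-1, item-2"
    }
}
```

In the routes above, the same idea applies: the aggregation strategy gets the original exchange plus the resource response and returns the original exchange with the extra data set on its body.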
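I was able to resolve this by dropping the content enricher and instead using a multicast to two direct endpoints. I also needed to unmarshal and marshal explicitly, which caused some additional issues along the way. Here is the final route I used, in case it is useful to anyone else.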
// Definition of the post order endpoint
rest("/orderRouter")
// This is a POST method call for routing an order using the order
// form
.post()
// Description of what the method does
.description("Routes a new order to the order management service")
// Define the type used for input
.type(OrderForm.class)
// Define the type used for output, in this case the order
.outType(OrderNumber.class)
// Now, process the order
.to("direct:processOrder");
// This is the route that processes the order sent. First, we need to take
// the data from the order form passed, put it in the header and empty out
// the body of our incoming exchange.
from("direct:processOrder")
// Set header for customer ID
.setHeader("customerId", simple("${body.customerId}"))
// Set header for catalog item ID
.setHeader("catalogItemId", simple("${body.catalogItemId}"))
// Empty the body
.setBody(constant(""))
// Now, aggregate the data to an order type
.end()
// Use multicasting to call the customer and catalog item services
// in parallel. Then, use a strategy that groups the exchanges
// returned from the service calls into a single list for
// processing.
.multicast(new GroupedExchangeAggregationStrategy())
// Use parallel processing
.parallelProcessing()
// Send to two direct components to get the data
.to("direct:getCustomerData", "direct:getCatalogItemData")
// End the multicast call
.end()
// Now process the exchange
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
List<Exchange> exchanges = exchange.getIn().getBody(List.class);
Order order = new Order();
for (Exchange exchangeToProcess : exchanges) {
if (exchangeToProcess.getIn().getBody() instanceof Customer) {
order.setCustomer(
exchangeToProcess.getIn().getBody(Customer.class));
} else if (exchangeToProcess.getIn()
.getBody() instanceof CatalogItem) {
order.setCatalogItem(
exchangeToProcess.getIn().getBody(CatalogItem.class));
} else {
// Ignore it for now.
}
}
order.setOrderDate(new Date(System.currentTimeMillis()));
exchange.getIn().setBody(order);
}
})
// End this processor definition
.end()
// Need to marshal the body to JSON
.marshal()
// Need to use JSON for marshalling
.json(JsonLibrary.Jackson)
// Then convert it to a string
.convertBodyTo(String.class)
// We can now send the order to order management. Need to define the
// content type on the header
.setHeader(org.apache.camel.Exchange.CONTENT_TYPE,
constant("application/json"))
// Be safe and define this as a post
.setHeader(Exchange.HTTP_METHOD,
constant(org.apache.camel.component.http4.HttpMethods.POST))
// Set the HTTP uri to be used.
.setHeader("CamelHttpUri", simple(
"http://localhost:8082/nvisia-order-management-camel-service/api/order"))
// Finally, send the order to be managed and get back the order ID
.to("http4://localhost:8082/nvisia-order-management-camel-service/api/order")
// Next, convert the input stream returned to a string
.convertBodyTo(String.class)
// Finally, unmarshal the string to an object
.unmarshal().json(JsonLibrary.Jackson, OrderNumber.class);
// Retrieves the customer data from the REST service for customer.
from("direct:getCustomerData")
// Set the http method as GET
.setHeader("CamelHttpMethod", constant("GET"))
// Set the HTTP uri to be used.
.setHeader("CamelHttpUri", simple(
"http://localhost:8081/nvisia-customer-camel-service/api/customer/${header.customerId}"))
// Define the endpoint; though, url will be ignored in favor of
// header
.to("http4://localhost:8081/nvisia-customer-camel-service/api/customer/${header.customerId}")
// Next, convert the input stream returned to a string
.convertBodyTo(String.class)
// Finally, unmarshal the string to an object
.unmarshal().json(JsonLibrary.Jackson, Customer.class);
// Retrieves the catalog item data from the REST service for catalog
// items.
from("direct:getCatalogItemData")
// Set the http method as GET
.setHeader("CamelHttpMethod", constant("GET"))
// Set the HTTP uri to be used.
.setHeader("CamelHttpUri", simple(
"http://localhost:8080/nvisia-catalog-camel-service/api/catalogItem/${header.catalogItemId}"))
// Define the endpoint; though, url will be ignored in favor of
// header
.to("http4://localhost:8080/nvisia-catalog-camel-service/api/catalogItem/${header.catalogItemId}")
// Next, convert the input stream returned to a string
.convertBodyTo(String.class)
// Finally, unmarshal the string to an object
.unmarshal().json(JsonLibrary.Jackson, CatalogItem.class);
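The core of this route is "call both services in parallel, then assemble the Order by inspecting the type of each result". A rough plain-Java sketch of just that logic, without Camel, might look like the following. `MulticastAssembleSketch`, the simplified domain classes, and the stubbed `fetchCustomer` / `fetchCatalogItem` methods are assumptions made for illustration; in the real route those lookups are the two HTTP calls.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class MulticastAssembleSketch {

    // Simplified, hypothetical versions of the domain objects.
    static final class Customer {
        final int id;
        Customer(int id) { this.id = id; }
    }

    static final class CatalogItem {
        final int id;
        CatalogItem(int id) { this.id = id; }
    }

    static final class Order {
        Customer customer;
        CatalogItem catalogItem;
    }

    // Stubs standing in for the two REST lookups.
    static Customer fetchCustomer(int id) { return new Customer(id); }
    static CatalogItem fetchCatalogItem(int id) { return new CatalogItem(id); }

    // Mirrors the Processor in the route: dispatch on the runtime type of
    // each grouped result and ignore anything unexpected.
    static Order assemble(List<Object> results) {
        Order order = new Order();
        for (Object result : results) {
            if (result instanceof Customer) {
                order.customer = (Customer) result;
            } else if (result instanceof CatalogItem) {
                order.catalogItem = (CatalogItem) result;
            }
        }
        return order;
    }

    // Rough analogue of multicast with parallel processing: start both
    // lookups concurrently, wait for both, then group the results.
    static Order buildOrder(int customerId, int catalogItemId) {
        CompletableFuture<Object> customer =
                CompletableFuture.supplyAsync(() -> fetchCustomer(customerId));
        CompletableFuture<Object> item =
                CompletableFuture.supplyAsync(() -> fetchCatalogItem(catalogItemId));
        return assemble(Arrays.asList(customer.join(), item.join()));
    }

    public static void main(String[] args) {
        Order order = buildOrder(1, 2);
        System.out.println(order.customer.id + " " + order.catalogItem.id);
    }
}
```

Because the assembly step dispatches on type rather than on position, it does not matter which lookup finishes first.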