当前位置: 首页 > 知识库问答 >
问题:

JavaCamel框架:在处理器中丢失消息体

章增
2023-03-14

全部的

这里有一条简单的路线:

 <route>
    <from uri="jetty://http://0.0.0.0:9090/myproject" />
    <setExchangePattern pattern="InOnly" />
    <process ref="JsonValidator"/> 
    <unmarshal>
       <json library="Jackson" unmarshalTypeName="com.myproject.JsonPojo"/>
    </unmarshal>
    ...
 </route>

JsonValidator是一个简单的Javabean,我在其中扩展了处理器。在这里,我想确保在我继续使用Jackson散集调用以将JSON散集到我的POJO之前,所有必需的字段都被传入。

我现在在豆子里做的只是一行:

  public void process(Exchange exchange) throws Exception {
      String input = exchange.getIn().getBody(String.class);
  }

只需调用exchange.getIn().getBody(String.class),就会导致路由中的下一个(解组)步骤抛出一个错误,表示没有要解组的内容。事实上,我通过在JsonValidator之后添加另一个处理器来测试这一点——在那里,交换体不是空的,而是空的。

错误如下:

com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
         at [Source: org.apache.camel.converter.stream.InputStreamCache@78f0a00a; line: 1, column: 1]
            at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:164)
            at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:2931)
            at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2873)
            at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2080)
            at org.apache.camel.component.jackson.JacksonDataFormat.unmarshal(JacksonDataFormat.java:105)
            at org.apache.camel.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:65)
            at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
            at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)
            at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
            at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
            at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
            at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
            at org.apache.camel.component.jetty.CamelContinuationServlet.service(CamelContinuationServlet.java:151)
            at javax.servlet.http.HttpServlet.service(HttpServlet.java:668)
            at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1496)
            at org.eclipse.jetty.servlets.MultiPartFilter.doFilter(MultiPartFilter.java:136)
            at org.apache.camel.component.jetty.CamelFilterWrapper.doFilter(CamelFilterWrapper.java:44)
            at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1467)
            at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:499)
            at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
            at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
            at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
            at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
            at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
            at org.eclipse.jetty.server.Server.handle(Server.java:370)
            at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
            at org.eclipse.jetty.server.AbstractHttpConnection.content(AbstractHttpConnection.java:982)
            at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.content(AbstractHttpConnection.java:1043)
            at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:865)
            at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:240)
            at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
            at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
            at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
            at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
            at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
            at java.lang.Thread.run(Thread.java:744)

我错过了什么?

共有1个答案

祝高超
2023-03-14

流(通常)只能读取一次。读取后,您必须将结果存储在交易所中:

public void process(Exchange exchange) throws Exception {
    String input = exchange.getIn().getBody(String.class);
    exchange.getIn().setBody(input);
}

或者,您可以让Camel按照Camel留档中的说明进行缓存。

 类似资料:
  • 主要内容:一、业务场景,二、意外宕机,问题凸现,三、总结一、业务场景 这篇文章,我们来看看订单服务和消息服务是如何基于MQ来收发消息的。 我们稍微把这个图细化一点,简单来说就是多个订单服务实例给queue推送消息,多个仓储服务每个消费一部分消息。如下图所示: 二、意外宕机,问题凸现 假如你线上对MQ技术的使用就到此为止了,那么基本可以跟offer说拜拜了。。。 因为如果是我的话,作为一个面试官就没法继续往下问了。你这个MQ的使用以及理解的深度仅此而已的

  • 我正在用SQS和JavaSDK发送和接收消息。几乎所有的消息都工作正常,但是其中一些丢失了,我不明白为什么。这是发送消息的代码: 以及接收代码(在循环中运行): 问题是,我能够接收到一些消息,但有些消息不是(总是相同类型的数据)。发送和接收的代码对于所有消息都是相同的。应用程序日志: 正在发送消息:{QueueUrl:https://sqs.us-east-1.amazonaws.com/0000

  • 在网页应用中,你经常需要在处理完表单或其它类型的用户输入后,显示一个通知消息(也叫做“flash message”)给用户。 对于这个功能,Django 提供基于Cookie 和会话的消息,无论是匿名用户还是认证的用户。其消息框架允许你临时将消息存储在请求中,并在接下来的请求(通常就是下一个请求)中提取它们并显示。每个消息都带有一个特定level 标签,表示其优先级(例如info、warning

  • 寻找设计我的Kafka消费者的最佳方法。基本上,我想看看什么是避免数据丢失的最佳方法,以防在处理消息期间出现任何异常/错误。 我的用例如下。 a)我使用SERVICE来处理消息的原因是 - 将来我计划编写一个ERROR处理器应用程序,该应用程序将在一天结束时运行,它将尝试再次处理失败的消息(不是所有消息,而是由于任何依赖项(如父级缺失)而失败的消息)。 b)我想确保没有消息丢失,所以我会将消息保存

  • 鉴于以下情况: 我在本地启动zookeeper和单个kafka代理,并创建“测试”主题,如kafka快速入门中所述:https://kafka.apache.org/quickstart 然后,我运行一个简单的java程序,该程序每秒向“测试”主题生成一条消息。一段时间后,我关闭了本地的kafka代理,看到制作人继续生成消息,它没有抛出任何异常。最后,我再次启动kafka broker,produ

  • 我试图在我的电脑(Windows 8 Pro 64x和XAMPP 3.2.1)上安装composer,因此我下载了Windows Installer并运行它,但出现以下错误: 缺少openssl扩展,这意味着不可能进行安全的HTTPS传输。 我搜索了这个问题并尝试了一些解决方法 extension=php\u openssl。动态链接库 在php中没有注释。C:\xampp\php\php中的in