当前位置: 首页 > 知识库问答 >
问题:

使用Artemis质子/Qpid客户端远程处理骆驼泉-挂起发送消息

史懿轩
2023-03-14
  • 当我使用Camel Spring远程配置发送一些消息时,生产者和消费者都在不同的JVM中运行。
    • 使用Apache artemis 2.14.0版本
    • camel(2.20.0)、qpid(0.54.0)、pooled-jms(1.1.1)的版本

    我使用LoadMessageSupport类来推送消息,我看到骆驼路由被调用,并且在调试日志消息下面。

    我注意到在Artemis控制台中启用了生产者会话。

    任何线索,如何调试此问题或是什么可能导致此问题。

    我安全地忽略了一些与调试相关的错误。

    ...
    DEBUG [main] (DefaultManagementAgent.java:470) - Registered MBean with ObjectName: org.apache.camel:context=camel,type=components,name="bean"
    DEBUG [main] (DefaultComponent.java:266) - Cannot resolve property placeholders on component: org.apache.camel.component.bean.BeanComponent@cda0432 as PropertiesComponent is not in use
    DEBUG [main] (AbstractAutowireCapableBeanFactory.java:448) - Creating instance of bean 'org.apache.camel.component.jackson.converter.JacksonTypeConverters'
    DEBUG [main] (AbstractAutowireCapableBeanFactory.java:484) - Finished creating instance of bean 'org.apache.camel.component.jackson.converter.JacksonTypeConverters'
     INFO [main] (CamelLogger.java:159) - ID-local-vm-1624040900482-0-1 >>> (processMessage) from(direct://proxy-msg-handler) --> log[Log message on incoming message with body] <<< Pattern:InOnly, Headers:{breadcrumbId=ID-local-vm-1624040900482-0-1}, BodyType:org.apache.camel.component.bean.BeanInvocation, Body:BeanInvocation public abstract void com.myexample.MessageHandler.processMessage(com.myexample.MessageType,java.lang.String) with [ITEM_DESCRIPTION, {"info": " my name"}]]
    DEBUG [main] (CamelLogger.java:153) - Log message on incoming message with body
     INFO [main] (CamelLogger.java:159) - ID-local-vm-1624040900482-0-1 >>> (SubmitNotificationEvent) log[Log message on incoming message with body] --> amqpcomponent://queue:message.queue <<< Pattern:InOnly, Headers:{breadcrumbId=ID-local-vm-1624040900482-0-1}, BodyType:org.apache.camel.component.bean.BeanInvocation, Body:BeanInvocation public abstract void com.myexample.MessageHandler.processMessage(com.myexample.MessageType,java.lang.String) with [ITEM_DESCRIPTION, {"info": " my name"}]]
    DEBUG [main] (SendProcessor.java:147) - >>>> service-event-queue://queue:message.queue Exchange[ID-local-vm-1624040900482-0-1]
    DEBUG [main] (InternalLoggerFactory.java:45) - Using SLF4J as the default logging framework
    ...
    DEBUG [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpConnectionBuilder.java:84) - AmqpConnection { ID:6d0c8673-6a92-401d-a239-12ec696fc9d3:1 } is now open: 
     INFO [AmqpProvider :(1):[amqp://localhost:5672]] (JmsConnection.java:1339) - Connection ID:6d0c8673-6a92-401d-a239-12ec696fc9d3:1 connected to server: amqp://localhost:5672
    DEBUG [main] (JmsTemplate.java:492) - Executing callback on JMS Session: JmsPoolSession { org.apache.qpid.jms.JmsSession@7fd26ad8 }
    DEBUG [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProducerBuilder.java:68) - Creating AmqpFixedProducer for: null
    DEBUG [main] (JmsConfiguration.java:622) - Sending JMS message to: message.queue with message: JmsObjectMessageFacade 
    

    启用跟踪级别日志后,请注意下面的消息

    DEBUG [main] (JmsConfiguration.java:622) - Sending JMS message to: message.queue with message: JmsObjectMessageFacade { org.apache.qpid.jms.provider.amqp.message.AmqpJmsObjectMessageFacade@36cc9385 }
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpFixedProducer.java:100) - Holding Message send until credit is available.
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProvider.java:1625) - IdleTimeoutCheck rescheduling with delay: 15000
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:560) - New incoming data read: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 65536)
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:49) - [1673389762:0] RECV: Empty Frame
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:54) - [1673389762:0] SENT: Empty Frame
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:259) - Attempted write of buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 8/8)
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:273) - Attempted flush of pending writes
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProvider.java:1625) - IdleTimeoutCheck rescheduling with delay: 15000
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:560) - New incoming data read: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 65536)
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:49) - [1673389762:0] RECV: Empty Frame
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:54) - [1673389762:0] SENT: Empty Frame
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:259) - Attempted write of buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 8/8)
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:273) - Attempted flush of pending writes
    TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProvider.java:1625) - IdleTimeoutCheck rescheduling with delay: 15000
    
    • 下面是我用来从java类发送消息的上下文xml
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
        xmlns:util="http://www.springframework.org/schema/util"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
             http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
             http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
        http://www.springframework.org/schema/util  ttp://www.springframework.org/schema/util/spring-util.xsd">
    
      <bean id="jmsConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
         <property name="remoteURI" value="amqp://localhost:5672?amqp.traceFrames=true"/>
      </bean>
    
      <bean id="jpcf" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop" >
        <property name="maxConnections" value="3" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
      </bean>
    
      <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="jpcf" />
        <property name="concurrentConsumers" value="3" />
      </bean>
    
      <bean id="amqpcomponent" class="org.apache.camel.component.amqp.AMQPComponent">
        <property name="configuration" ref="jmsConfig" />
      </bean>
      
      
        <!-- Camel Spring Remoting Interface -->
        <camel:proxy id="proxyObject" binding="false" serviceUrl="direct:proxy-msg-handler" serviceInterface="com.myexample.MessageHandler"/>       
    
        <!-- Bean that initialize the Spring Remoting for handling message -->
        <bean id="BeanProxy" class="com.myexample.MessageProducer">
            <property name="messageHandler" ref="proxyObject"/>
        </bean>
        
        <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring" autoStartup="true" trace="true">
            
            <camel:route autoStartup="true" id="processMessage">
                <camel:from uri="direct:proxy-msg-handler"/>
                <camel:log message="Log incoming message" logName="Incoming" loggingLevel="DEBUG"/>
                 <camel:inOnly uri="amqpcomponent:queue:message.queue"/>
            </camel:route>
        </camelContext>
    </beans>
    
    • 运行上下文的java类,用于调用远程Springbean方法
      • 使用below java类将消息推送到Artemis队列
      package com.myexample;
      
      public class LoadMessageSupport {
      
          public static void main(String ...strings) {
              ApplicationContext appContext =null;
              try {
                  appContext = new ClassPathXmlApplicationContext("file:/paht/to/context/message-handler-context.xml");
                  MessageProducer messageProducer = appContext.getBean(MessageProducer.class);
                   message = "{ \"itemDesc\" : \"test description\" }" ;
                  System.out.println(message);
                  messageProducer.sendMessage(MessageType.ITEM_DESC, message); 
                 // enum messagetype already defined within project
      
                  //System.exit(0);
      
              }catch(Exception exe) {
                  System.out.println("Something wrong... ");
                  exe.printStackTrace();
              }finally {
                  if(camelContext!=null) {
                      System.out.println("camel context stopped...");
                      camelContext.stop();
                  }
              }
          }
      }
      
      • 消息接收器类
      
      @InOnly
      public interface MessageHandler{
         public processMessage(MessageType type, Order order);
         public processMessage(MessageType type, String message); // trying to invoke this message
      }
      
      • 生产者阶级
      
      public class MessageProducer{
      
        ​// using the proxy object within the producer object 
        ​// this will invoke the spring bean using remote (rmi)
        ​private MessageHandler messageHandler;
      
        protected MessageHandler getMessageHandler() {
          return this.messageHandler;
         }
      
         public void setMessageHandler(MessageHandler messageHandler) {
          this.messageHandler = messageHandler;
          }
      
        //constructor 
        public ​MessageProducer() {}
        ​
        public void sendMessage(MessageType type, Order order ){
          ​getMessageHandler().processMessage(type,order);
        ​}
      
        ​public void sendMessage(MessageType type, String message ){
          ​getMessageHandler().processMessage(type,message);
        ​}
      
      • 消息接收器
      
      public class MessageReceiver implements MessageHandler {
       
        @Handler 
        public void processMessage(MessageType type, Order order){
         System.out.println(" received type and ORDER info ...");
         // invoke methods for logical processing
        }
      
        @Handler
        public void processMessage(MessageType type, String message){
         System.out.println(" received type and MESSAGE info for procesing...");
         // invoke methods for logical processing
        }  
      }
      

共有1个答案

闻飞跃
2023-03-14

当我之前尝试时,似乎我的VM没有足够的内存。

free-h表示只剩下500MB。

重新启动VM后,现在将消息发送到Artemis borker。

 类似资料:
  • 我试图将一个Wildfly13实例连接到一个独立的Artemis ActiveMQ,从它生成和使用消息。因此,我遵循了Wildfly13文档和Artemis ActiveMQ文档的相关部分。此外,我尝试使用本教程,并使用JMS2.0Simplified-API对其进行了一些定制。 当我向/Produce发送有效载荷时,不会引发异常。所以我相信JNDI有一个问题。我已经用Wireshark监视了正在

  • 我正在尝试从目录中选取一个文件,拆分一个文件,并将每一个拆分行添加到ActiveMQ中。我在这个过程中遇到了异常处理的问题。假设目录中的文件是一个二进制文件(可执行文件),那么splitter会显示org.apache.camel.runtimeCamelException和java.nio.charset.MalFormedInputException异常。如果出现这种情况,那么我需要捕获这些异

  • 使用Qpid客户端连接Apache Artemis代理以实现高可用性。 代理实例在两个节点中运行,并在broker.xml中列出复制配置 Brokers实例在node1(主)和node2(从)上启动,并且运行时没有任何问题。 驼峰qpid jms客户端使用URL配置为故障切换:(amqp://localhost:5672,amqp://localhost:5673),在执行camel客户端时,上下

  • 我有一个队列系统,骆驼只是其中的一小部分。在此队列系统中,对于某些队列,代理在队列已满时返回 FAIL。为了解决这个问题,我查看我得到的 JMS 异常,从消息中我可以看到原因是否是队列已满。 我想在Camel中实现的是,对于满队列的特定情况,我希望重试传递,而对于任何其他JMS异常(或任何其他异常),我希望将其发送到DLQ。 我假设我必须使用onException(JMSException.cla

  • 我想使用Apache Qpid订阅Java消息服务(JMS)发布订阅服务。然而,我不想使用Java,而是想使用C。我的客户告诉我这是可能的(甚至说是微不足道的)。它们是否正确?有人能给我举个例子吗?我所看到的每一处都表明,要使用JMS,我必须使用Java。这里的要点是,该服务是第三方服务(因此我不能将其更改为使用AMQP或JMS以外的任何其他协议)。

  • 我有一个ASP.NET Core1.0Web API应用程序,并试图弄清楚如果我的控制器调用的函数出错,如何将异常消息传递给客户端。 我确实看到了一些使用的文档,但是为了使用它,我必须安装compat shim。在Core1.0中有没有一种新的方法来做这些事情? 这是我一直在尝试的垫片,但它不起作用: 当抛出时,我查看客户端,在内容中找不到我正在发送的消息。