当前位置: 首页 > 知识库问答 >
问题:

在AMQP组件的Apache Camel XML DSL中配置Apache Qpid JMS(jakarta.jms)连接工厂

奚光霁
2023-03-14

尝试使用jakarta.jmsApache Qpid AMQP客户端来处理消息。

我正在尝试使用org.messagehub使用可池连接工厂。

独立java代码工作,请参阅java更改。当我尝试在Camel中使用相同的Spring-XML-DSL时,AMQP组件不支持雅加达。jmx连接工厂。

apache-amqp组件是否支持来自Apache Qpid客户端的JMS 2.0?

package org.example;


import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import org.apache.camel.component.jms.JmsConfiguration;
import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;

import javax.naming.Context;
        import javax.naming.InitialContext;

public class AMQPArtemisClient {
    public static void main(String[] args) throws Exception {
 
        try {
            // The configuration for the Qpid InitialContextFactory has been supplied in
            // a jndi.properties file in the classpath, which results in it being picked
            // up automatically by the InitialContext constructor.
            Context context = new InitialContext();

            ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
            Destination queue = (Destination) context.lookup("myQueueLookup");

            System.setProperty("USER","admin");
            System.setProperty("PASSWORD","admin");

            //added for poolable connection
            JmsPoolConnectionFactory poolConnectionFactory = new JmsPoolConnectionFactory();
            poolConnectionFactory.setMaxConnections(5);
            poolConnectionFactory.setConnectionFactory(factory);

            //  Connection connection = factory.createConnection(System.getProperty("USER"), System.getProperty("PASSWORD"));
            Connection connection = poolConnectionFactory.createConnection(System.getProperty("USER"), System.getProperty("PASSWORD"));
            connection.setExceptionListener(new MyExceptionListener());
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            MessageProducer messageProducer = session.createProducer(queue);
            MessageConsumer messageConsumer = session.createConsumer(queue);

            TextMessage message = session.createTextMessage("Hello world!");
            messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
            TextMessage receivedMessage = (TextMessage) messageConsumer.receive(2000L);

            if (receivedMessage != null) {
                System.out.println(receivedMessage.getText());
            } else {
                System.out.println("No message received within the given timeout!");
            }

            connection.close();

        } catch (Exception exp) {
            System.out.println("Caught exception, exiting.");
            exp.printStackTrace(System.out);
            System.exit(1);
        }
    }

    private static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException exception) {
            System.out.println("Connection ExceptionListener fired, exiting.");
            exception.printStackTrace(System.out);
            System.exit(1);
        }
    }
}
  • 资源/jndi.properties
java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory
connectionfactory.myFactoryLookup = amqp://localhost:5672
queue.myQueueLookup = queue
topic.myTopicLookup = topic
  • pom.xml
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-amqp</artifactId> <!--version 3.17.0 is used-->
    </dependency>

<dependency>
          <groupId>org.messaginghub</groupId>
          <artifactId>pooled-jms</artifactId>
          <version>3.0.0</version>  <!-- supports jakarta.jms connecton factory -->
      </dependency>
      <dependency>
          <groupId>org.apache.qpid</groupId>
          <artifactId>qpid-jms-client</artifactId>
          <version>2.0.0</version>
      </dependency>
  • Camel组件我试图配置AMPQ组件
   <bean id="jmsConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory" >
        <property name="username" value="admin"/>
        <property name="password" value="secret"/>
       <property name="remoteURI" value="amqp://localhost:5672" />
    </bean>

   <bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
        <property name="maxConnections" value="5" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>


    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
        <property name="concurrentConsumers" value="5" />
    </bean>
    <!-- uses the  AMQP component -->
   <bean id="jms" class="org.apache.camel.component.amqp.AMQPComponent">
        <property name="configuration" ref="jmsConfig" />
       <property name="connectionFactory" ref="jmsPooledConnectionFactory"/>
    </bean>

当我尝试下面这样的配置时,我得到异常雅加达。jms期望javax的异常。jms,现在我不得不将消息传递中心的版本降低到2.0.5,并使用qpid jms客户端jar到1.6.0。

<bean id="amqp" class="org.apache.camel.component.amqp.AmqpComponent">
   <property name="connectionFactory">
     <bean class="org.apache.qpid.jms.JmsConnectionFactory" factory-method="createFromURL">
       <property name="remoteURI" value="amqp://localhost:5672" />
       <property name="topicPrefix" value="topic://" />  <!-- only necessary when connecting to ActiveMQ over AMQP 1.0 -->
     </bean>
   </property>
 </bean>

共有1个答案

万知
2023-03-14

您正在使用的Camel AMQP组件不支持Jakarta JMS an,因此您必须恢复到Qpid JMS客户端和池JMS包装器的旧版本,才能使其工作。这是您唯一的选择,直到有一个支持雅加达的驼峰比特版本,然后您继续。

 类似资料:
  • 我为3节点Kafka集群设置了ACL,并能够通过生产者控制台和消费者控制台发送和接收主题。现在我想用ACL配置Kafka连接。我尝试了SASL_PLAINTEXT组合和连接。日志文件,它显示以下错误。它没有从源表同步到主题,请在我缺少任何配置的地方提供帮助。 错误日志 我的配置如下文件所示。我在jaas中提到过用户。conf文件并设置到环境中。 1: zookeeper.properties 2:

  • Hyperledger Composer使用连接配置文件连接到运行时。 创建连接配置文件 1.创建一个名为connection.json的新文件,其中包含Hyperledger Fabric v1.0的以下信息。} 为Hyperledger Fabric v1.0创建连接配置文件,使用以下格式: { "type": "hlfv1", "orderers":

  • 我对配置SJMS2组件的最佳方法感到困惑。我正在一个简单的测试应用程序中使用,并尝试使用SJMS2camel组件从编写到ActiveMQ Artemis。组件文档说它处理连接缓存之类的事情,我通常会在ConnectionFactory bean中配置这些事情,所以我感觉到在配置中应该比不使用Camel时定义的更少。 在使用Camel Spring时,文档似乎缺少如何配置jsms2路由及其Conne

  • 我正在使用RAD9.0,并尝试在WebSphere Application Server8.5中配置队列连接工厂。我在同一台机器(WIN764位)上安装了IBM MQ7.0(32位)。 配置队列连接工厂之后,当我单击Test connection时,会出现一个错误:

  • 低于范围的查询与连接工作正常lap,但不是在Hiveserver2/Hive. CLI。 请建议如何在Hive中使用范围连接查询。 配置单元版本:1.2.1.2.6 HDP版本:2.6.0.3 查询: 下面是在配置单元CLI或配置单元服务器2中运行时引发的错误: 错误:编译语句时出错:失败:SemanticException行0:-1在联接“obsv_stop_ts”(状态=42000,代码=40

  • 问题内容: 我有一组数据是通过将相似的子项目匹配在一起,然后按“类别”将这些相似的项目分组而创建的。 现在,必须以使每个“ group_id”内的相关类别分组在一起的方式匹配结果类别。在下面的示例中,一个匹配项是A-> B-> C-> D-> E-> F-> G,这是通过逐行重复获得的。 我已经发布了当前答案,该答案适用于此简单数据集,但是由于实际数据集最多包含1M行,并且每个“ group_id