当前位置: 首页 > 知识库问答 >
问题:

同时使用ActiveMQ Artemis和5.x侦听器-NullPointerException

弘浩瀚
2023-03-14

我有一个遗留的Spring4.2.1.Release应用程序,它作为侦听器连接到ActiveMQ5.x,现在我们将添加到ActiveMQ Artemis的连接。对于Artemis,我们使用持久订阅,因为我们不希望在订阅服务器关闭时某个主题上的消息丢失,而共享订阅,因为我们希望选择集群或使用并发来异步处理订阅中的消息。我有单独的ConnectionFactoryListenerContainer,但是从这个不断重复的Warn日志来看,Artemis DMLC由于以下NPE无法启动:

java.lang.NullPointerException
    at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:856)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:213)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1173)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1149)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039)
    at java.lang.Thread.run(Unknown Source)

从表面上看,它似乎找不到方法createSharedDurableConsumer。查看我所拥有的AbstractMessageListenerContainer,第856行调用method.invoke

/** The JMS 2.0 Session.createSharedDurableConsumer method, if available */
private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
        Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);

...

Method method = (isSubscriptionDurable() ?
                        createSharedDurableConsumerMethod : createSharedConsumerMethod);
try {
    return (MessageConsumer) method.invoke(session, destination, getSubscriptionName(), getMessageSelector());
}
@Configuration
public class ArtemisConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory artemisConnectionFactory() {
        ActiveMQConnectionFactory artemisConnectionFactory = ActiveMQJMSClient
                .createConnectionFactoryWithHA(JMSFactoryType.CF, createTransportConfigurations());

        artemisConnectionFactory.setUser(env.getRequiredProperty("artemis.username"));
        artemisConnectionFactory.setPassword(env.getRequiredProperty("artemis.password"));
        artemisConnectionFactory.setCallTimeout(env.getRequiredProperty("artemis.call.timeout.millis", Long.class));
        artemisConnectionFactory.setConnectionTTL(env.getRequiredProperty("artemis.connection.ttl.millis", Long.class));
        artemisConnectionFactory
                .setCallFailoverTimeout(env.getRequiredProperty("artemis.call.failover.timeout.millis", Long.class));
        artemisConnectionFactory.setInitialConnectAttempts(
                env.getRequiredProperty("artemis.connection.attempts.initial", Integer.class));
        artemisConnectionFactory
                .setReconnectAttempts(env.getRequiredProperty("artemis.connection.attempts.reconnect", Integer.class));
        artemisConnectionFactory.setRetryInterval(env.getRequiredProperty("artemis.retry.interval.millis", Long.class));
        artemisConnectionFactory
                .setRetryIntervalMultiplier(env.getRequiredProperty("artemis.retry.interval.multiplier", Double.class));
        artemisConnectionFactory.setBlockOnAcknowledge(true);
        artemisConnectionFactory.setBlockOnDurableSend(true);
        artemisConnectionFactory.setCacheDestinations(true);
        artemisConnectionFactory.setConsumerWindowSize(0);
        artemisConnectionFactory.setMinLargeMessageSize(1024 * 1024);

        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(artemisConnectionFactory);

        cachingConnectionFactory
        .setSessionCacheSize(env.getRequiredProperty("artemis.session.cache.size", Integer.class));
        cachingConnectionFactory.setReconnectOnException(true);

        return cachingConnectionFactory;
    }

    @Bean
    public DefaultJmsListenerContainerFactory artemisContainerFactory(ConnectionFactory artemisConnectionFactory,
            JmsTransactionManager artemisJmsTransactionManager,
            MappingJackson2MessageConverter mappingJackson2MessageConverter) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
        factory.setConnectionFactory(artemisConnectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setMessageConverter(mappingJackson2MessageConverter);
        factory.setSubscriptionDurable(Boolean.TRUE);
        factory.setSubscriptionShared(Boolean.TRUE);
        factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        factory.setSessionTransacted(Boolean.TRUE);
        factory.setTransactionManager(artemisJmsTransactionManager);

        return factory;
    }

    private TransportConfiguration[] createTransportConfigurations() {
        String connectorFactoryFqcn = NettyConnectorFactory.class.getName();
        Map<String, Object> primaryTransportParameters = new HashMap<>(2, 1F);
        String primaryHostname = env.getRequiredProperty("artemis.primary.hostname");
        Integer primaryPort = env.getRequiredProperty("artemis.primary.port", Integer.class);

        primaryTransportParameters.put("host", primaryHostname);
        primaryTransportParameters.put("port", primaryPort);

        return new TransportConfiguration[] {
                new TransportConfiguration(connectorFactoryFqcn, primaryTransportParameters),
                new TransportConfiguration(connectorFactoryFqcn, backupTransportParameters) };
    }
}

我的pom使用Artemis的2.10.0版本。

我怎么解决这个?

共有1个答案

严宇
2023-03-14

JMS2.0规范与JMS1.1是向后兼容的,因此请确保类路径上只有JMS2规范。我的预感是Spring代码中的反射调用变得混乱,因为它们碰到的是JMS1.1规范类,而不是正确的JMS2规范类。

 类似资料:
  • 我正在尝试创建一个应用程序,使用新的Android服务来读取所有通知,也可以删除它们,但它并不起作用。在我的舱单上: 我有一个类NotificationListener扩展了NotificationListenerService我重写了两个方法OnNotificationPost(StatusBarNotification sbn)、onNotificationRemoved(StatusBarN

  • 问题内容: 以下两段代码之间的区别是什么? 和 问题答案: 的形式被提交时将只被调用 和 提交的值是从初始值不同。因此, 仅触发HTML DOM 事件时不会调用它。如果您想在HTML DOM 事件期间提交表单,则需要向输入组件添加另一个没有listener(!)的表单。它将导致仅处理当前组件的表单提交(如中所述)。 当使用代替时,默认情况下它将在HTML DOM 事件期间执行。在表示复选框或单选按

  • 我试图让队列在laravel 5中工作,队列侦听器正在输出: 未定义索引:表 存在"作业"和"failed_jobs"表,config.php设置为"数据库"。 搜索laravel论坛和google都没有找到解决办法,艾米的想法去哪里找?

  • 我们有web服务(jaxws),它正在调用另一个web服务(aslo jaxws)。Jaxws客户端配置如下所示-- 正如您所看到的,我们有两个拦截器和一个故障侦听器。我们希望在这些拦截器、故障侦听器和web服务代码之间进行通信。正如SO线程中所述,我们使用cxf交换对象在web服务和拦截器之间进行通信。 我们的inFaultInterceptor代码如下所示-- } Web服务代码如下所示- 但

  • 问题说明:

  • 我对处理SpringAMQP监听器超时能力有一个要求,即我们从生产者那里发送一条消息,Spring AMQP的消费者监听器线程收到这条消息,但是说需要很多时间来执行自己并被挂起,这最终会导致监听器线程被呈现无法使用。 那么,有没有办法让Spring AMQP提供任何使用者超时设置,以便在给定超时时间后再次释放侦听器线程