当前位置: 首页 > 知识库问答 >
问题:

从esb的代理向消息代理的主题发布消息时出错

姬俊能
2023-03-14

每当我试图将消息从esb发布到消息代理的主题时,我都会收到此错误

[2013-04-19 14:51:45,930] ERROR - AMQConnection Throwable Received but no listener set: org.wso2.andes.client.AMQNoRoute
Exception: Error: No Route for message [error code 312: no route]

我的代理代码是

    <proxy name="SendMessageProxy" transports="http" startOnLoad="true">    
<target>       
<endpoint>          
<address uri="jms:/myTopic?&amp;transport.jms.DestinationType=topic"/>       
</endpoint>       
<inSequence>          
<log level="custom">             
<property name="STATE" value="message is sent to queue"/>         
 </log>          
<property name="OUT_ONLY" value="true"/>          
<property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>       
</inSequence>       
<outSequence/>    
</target> 
</proxy>`

我的jndi配置和axis 2配置配置正确。我的MB在端口9444上运行,Publisher_esb在端口9443上运行,subscriber esb在端口9446上运行。当我使我的订阅者保持活动状态时,如果我从我的发布者发布一条消息,该消息会反映到订阅者。

从订户获取消息的代码是

package xml.parser;

import org.w3c.dom.*;
import javax.xml.xpath.*;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.*;

import java.io.IOException;
import java.util.Enumeration;
import java.util.Iterator;

import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Properties;

public class Parser {

    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory";
    private static final String CF_NAME_PREFIX = "connectionfactory.";
    private static final String CF_NAME = "qpidConnectionfactory";
    String userName = "admin";
    String password = "admin";
    private static String CARBON_CLIENT_ID = "carbon";
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon";
    private static String CARBON_DEFAULT_HOSTNAME = "localhost";
    private static String CARBON_BROKER_PORT = "5673";
    String topicName = "myTopic";

    public static void main(String[] args) throws NamingException,
            JMSException, XPathExpressionException,
            ParserConfigurationException, SAXException, IOException {

        Parser queueReceiver = new Parser();
        String message = queueReceiver.subscribe();

    }


    public String subscribe() throws NamingException, JMSException {

        String messageContent = "";
    Properties properties = new Properties();
    properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF);
    properties.put(CF_NAME_PREFIX + CF_NAME,
            getTCPConnectionURL(userName, password));
    properties.put("topic." + topicName, topicName);
    System.out.println("getTCPConnectionURL(userName,password) = "
            + getTCPConnectionURL(userName, password));
    InitialContext ctx = new InitialContext(properties);
    // Lookup connection factory
    TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx
            .lookup(CF_NAME);
    TopicConnection topicConnection = connFactory.createTopicConnection();
    topicConnection.start();
    TopicSession topicSession = topicConnection.createTopicSession(false,
            QueueSession.AUTO_ACKNOWLEDGE);
    // Send message
    //Topic topic = topicSession.createTopic(topicName);
    Topic topic = (Topic) ctx.lookup(topicName);
    javax.jms.TopicSubscriber topicSubscriber = topicSession
            .createDurableSubscriber(topic,"admin");
    Message message = topicSubscriber.receive();
    if (message instanceof TextMessage) {
        TextMessage textMessage = (TextMessage) message;
        System.out.println("textMessage.getText() = "
                + textMessage.getText());
        messageContent = textMessage.getText();
    }
    topicSession.close();
    topicConnection.close();

    return messageContent;      }

    public String getTCPConnectionURL(String username, String password) {
        return new StringBuffer().append("amqp://").append(username)
                .append(":").append(password).append("@")
                .append(CARBON_CLIENT_ID).append("/")
                .append(CARBON_VIRTUAL_HOST_NAME).append("?brokerlist='tcp://")
                .append(CARBON_DEFAULT_HOSTNAME).append(":")
                .append(CARBON_BROKER_PORT).append("'").toString();

    }

}

当我第一次运行订阅者时,它给了我结果,但之后它给出了异常:

[2013-04-19 17:24:26,947] ERROR {org.wso2.andes.transport.network.mina.MinaNetworkHandler} -  Exception caught by Mina
java.io.IOException: An existing connection was forcibly closed by the remote host
        at sun.nio.ch.SocketDispatcher.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:218)
        at sun.nio.ch.IOUtil.read(IOUtil.java:191)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:359)
        at org.apache.mina.transport.socket.nio.SocketIoProcessor.read(SocketIoProcessor.java:218)
        at org.apache.mina.transport.socket.nio.SocketIoProcessor.process(SocketIoProcessor.java:198)
        at org.apache.mina.transport.socket.nio.SocketIoProcessor.access$400(SocketIoProcessor.java:45)
        at org.apache.mina.transport.socket.nio.SocketIoProcessor$Worker.run(SocketIoProcessor.java:485)
        at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:51)
        at java.lang.Thread.run(Thread.java:722)
[2013-04-19 17:24:26,957] ERROR {org.wso2.andes.server.protocol.AMQProtocolEngine} -  IOException caught in/127.0.0.1:16
513(admin), session closed implictly: java.io.IOException: An existing connection was forcibly closed by the remote host

[2013-04-19 17:33:40,283]  INFO {org.wso2.andes.server.protocol.AMQProtocolEngine} -  Closing channel due to: Cannot sub
scribe to queue carbon:admin as it already has an existing exclusive consumer
[2013-04-19 17:33:40,283]  INFO {org.wso2.andes.server.protocol.AMQProtocolEngine} -  Channel[1] awaiting closure - proc
essing close-ok
[2013-04-19 17:33:40,283]  INFO {org.wso2.andes.server.handler.ChannelCloseOkHandler} -  Received channel-close-ok for c
hannel-id 1
[2013-04-19 17:40:48,867]  INFO {org.wso2.andes.server.queue.SimpleAMQQueue} -  Auto-deleteing queue:tmp_127_0_0_1_16587
_1

我不能在不激活订阅者的情况下向主题发布消息吗?如何使我的消息持久或持久?我还有一个问题是:如何获取与我创建的主题关联的队列名称,或者我可以专门为我的主题myTopic创建一个队列吗?期待您的回答。提前感谢。

共有1个答案

蔡鹏程
2023-03-14

MB 2.0.1 中的持久订户存在一些已知问题,这可能是由于这个原因造成的。请从此处尝试 MB 2.1.0 - Alpha 版本,看看是否在此处解决。这些问题将在 MB 2.1.0 中修复,该版本将在几周内发布。

 类似资料:
  • 我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)

  • 我们需要将消息从一个ActiveMQ代理复制到另一个代理。这里消息必须只是复制,并且消息应该存在于两个代理中。 我可以想到一个自定义应用程序,它订阅某个目标并读取该消息并将消息重新发布到多个代理中的目标。 我没有权限在经纪人中进行更改,所以我想不出经纪人网络选项。 是否有任何最佳实践或工具可用于将A-MQ消息从一个代理复制到另一个代理?

  • 我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。

  • 它提供的Swagger UI和rest API规范给了我灵感。 最近,我开始使用消息代理(特别是RabbitMQ),我希望有一些工具来开始开发具有消息和队列规范的面向微服务的系统。 是否有任何工具允许制作这些规范?从代码生成文档还是从模型生成代码? 提前感谢! 更新: 如本文所述。队列,交换,填充,用户和权限可以用RabbitMQ配置文件来描述。 AsyncAPI 看起来是一个很好的解决方案

  • 我有两个具有双工网桥配置的代理。有时会出现以下情况:代理之间存在连接,从代理B到代理a的消息正常转发。但不从代理A转发到代理B。重新启动代理B时,将发送所有消息。日志中没有错误。ActiveMQ版本5.10。 代理 A 的配置: 经纪商B配置: 如何制作可靠的双工桥?

  • 在本公司的最后一个项目中:客户提出身份验证等请求,应用程序第一层得到客户请求并在Kafka上生成消息,核心服务消费该消息后向银行服务提出rest请求,得到响应后在Kafka上生成响应消息,应用程序第一层将消息传递给客户。是真的Kafka用例,还是去掉第一层和Kafka,在客户端和核心之间使用rest服务更好。谢谢