当前位置: 首页 > 知识库问答 >
问题:

如何使用故障转移传输处理Activemq的最大帧大小异常

洪飞扬
2023-03-14

我正在开发一个使用activemq交换消息的应用程序,有些消息太大了,我想取消。

我们使用两个activemq实例(主/从)的activemq故障转移传输。代理本身对消息有100mb的帧大小限制。

问题是:如果我尝试发送大于100mb的消息,ActiveMQ服务器将关闭连接。此时,故障转移传输将尝试重新连接并再次发送消息,从而创建无限循环。

客户端记录以下内容:

2017-01-05 09:19:11.910  WARN 14680 --- [0.1:61616@57025] o.a.a.t.failover.FailoverTransport       : Transport (tcp://localhost:61616) failed , attempting to automatically reconnect: {}

java.io.EOFException: null
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_91]
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267) ~[activemq-client-5.13.4.jar:5.13.4]
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.13.4.jar:5.13.4]
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.13.4.jar:5.13.4]
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.13.4.jar:5.13.4]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]

2017-01-05 09:19:11.921  INFO 14680 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully reconnected to tcp://localhost:61616
2017-01-05 09:19:11.923  WARN 14680 --- [0.1:61616@57026] o.a.a.t.failover.FailoverTransport       : Transport (tcp://localhost:61616) failed , attempting to automatically reconnect: {}

java.io.EOFException: null
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_91]
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267) ~[activemq-client-5.13.4.jar:5.13.4]
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.13.4.jar:5.13.4]
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.13.4.jar:5.13.4]
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.13.4.jar:5.13.4]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]

当activeMQ实例记录时:

2017-01-05 09:19:11,909 | WARN  | Transport Connection to: tcp://127.0.0.1:57025 failed: java.io.IOException: Frame size of 363 MB larger than max allowed 100 MB | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///127.0.0.1:57025@61616
2017-01-05 09:19:11,922 | WARN  | Transport Connection to: tcp://127.0.0.1:57026 failed: java.io.IOException: Frame size of 363 MB larger than max allowed 100 MB | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///127.0.0.1:57026@61616

我试图设置一个TransportListener来验证我是否可以捕获这个案例,但是我只接收到一个transportInterupted事件,没有任何分类器。

我阅读了关于故障转移传输的文档(http://ActiveMQ . Apache . org/failover-transport-reference . html ),也许我可以使用maxReconnectAttempts,但我知道在更常见的情况下我会有几个缺点(比如服务器暂时不可用)。

如何检测到这种情况,避免客户端和服务器之间的无限连接循环?

共有2个答案

广瑞
2023-03-14

我不相信这是可能的。您正在尝试对故障转移后不会冒泡的异常进行错误处理分类:传输。如果超过最大客户端数,可能会发生相同类型的异常。

在发送之前检查消息大小听起来是一个可行的选项。

尺寸检查不符合您的要求有什么原因吗?

public String mySendMessage(String body) {
....
if(body.length > MAX_ALLOWED) .. 
   throw new Exception.. or log.. or other
else
   producer.send(session.createTextMessage(body));
吴西岭
2023-03-14

正如你这样说的

最大连接尝试 -1 |来自 ActiveMQ 5.6 的 0:默认值为 -1,请永久重试。0 表示禁用重新连接,例如:只需尝试连接一次。在 ActiveMQ 5.6 之前:默认值为 0,请永久重试。所有活动 MQ 版本:一个值

因此,如果您希望传输侦听器在由于消息大小而重试失败后收到传输失败的通知,则需要将maxReconnectAttempts设置为一个值

如果您希望在发送前检查消息大小,您可以在运行时通过jmx访问在代理端的uri中配置maxFrameSize,并获取BrokerViewMBean实例,然后调用getTransportConnectorByType方法http://ActiveMQ . Apache . org/maven/API docs/src-html/org/Apache/ActiveMQ/broker/JMX/BrokerViewMBean . html # line . 304这将返回在activemq.xml中配置的uri,您可以解析该uri以检索maxFrameSize。

JMXServiceURL url = new     JMXServiceURL("service:jmx:rmi:///jndi/rmi://hist:1099/jmxrmi");
JMXConnector jmxc = JMXConnectorFactory.connect(url);
MBeanServerConnection conn = jmxc.getMBeanServerConnection(); 

ObjectName activeMq = new ObjectName("org.apache.activemq:Type=Broker,BrokerName=localhost");

BrokerViewMBean mbean = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class, true);
String uri = mbean.getTransportConnectorByType("tcp");// or ("ssl") 
String[] pairs = uri.split("&");
for (String pair : pairs) {
    if (pair.startsWith("wireFormat.maxFrameSize")) {
        int idx = pair.indexOf("=");
        System.out.println(URLDecoder.decode(pair.substring(idx + 1), "UTF-8"));
    }
}

http://activemq.apache.org/maven/apidocs/org/apache/activemq/broker/jmx/BrokerViewMBean.html#getTransportConnectors - 将返回作为键的传输名称和作为值的 uri 的映射

要获得更好大小的消息,您可以这样做:

        OpenWireFormat opf = new OpenWireFormat();
        opf.setTightEncodingEnabled(true);
        ByteSequence tab = opf.marshal(message);
        System.out.println(tab.length);

你的生意一定是这样的:

import java.io.IOException;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.util.ByteSequence;

public class SimpleSenderMaxSizeManager {

    private static Connection conn = null;
    private static boolean transportChanged;
    private static Long MAX_FRAME_SIZE;

    public static void main(String[] args) throws JMSException {
        try {
            SimpleSenderMaxSizeManager.updateMaxSize("host1");
            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
                    "failover:(tcp://host1:5670,tcp://host2:5671)?randomize=false");
            cf.setTransportListener(new TransportListener() {

                @Override
                public void transportResumed() {
                    if (transportChanged) {
                        transportChanged = false;
                        try {
                            SimpleSenderMaxSizeManager.updateMaxSize(null);
                        } catch (Exception e) {
                        }
                    }
                }

                @Override
                public void transportInterupted() {
                    transportChanged = true;
                }

                @Override
                public void onException(IOException error) {
                }

                @Override
                public void onCommand(Object command) {
                }
            });
            conn = cf.createConnection();
            ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                    ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue("TEST"));
            conn.start();
            ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage("test");
            OpenWireFormat opf = new OpenWireFormat();
            opf.setTightEncodingEnabled(true);
            ByteSequence tab = opf.marshal(msg);
            System.out.println(tab.length);
            if (tab.length >= MAX_FRAME_SIZE) {
                throw new RuntimeException(tab.length + ">=" + MAX_FRAME_SIZE);
            }
            producer.send(msg);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e) {
                }
            }
        }
    }

    protected static void updateMaxSize(String host) throws Exception {
        JMXConnector jmxc = null;
        try {
            String jmxHost = host;
            String scheme = null;
            if (conn == null) {
                scheme = "tcp";
            } else {
                org.apache.activemq.transport.TransportFilter responseCorrelator = (TransportFilter) ((ActiveMQConnection) conn)
                        .getTransport();
                TransportFilter mutexTransport = (TransportFilter) responseCorrelator.getNext();
                FailoverTransport failoverTransport = (FailoverTransport) mutexTransport.getNext();
                while (failoverTransport.getConnectedTransportURI() == null) {
                    try {
                        Thread.sleep(100);
                    } catch (Exception e) {
                    }
                }
                scheme = failoverTransport.getConnectedTransportURI().getScheme();
                jmxHost = failoverTransport.getConnectedTransportURI().getHost();
            }
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + jmxHost + ":1099/jmxrmi");
            Map<String, String[]> env = new HashMap<>();
            String[] creds = { "admin", "admin" };
            env.put(JMXConnector.CREDENTIALS, creds);
            jmxc = JMXConnectorFactory.connect(url, env);
            MBeanServerConnection conn = jmxc.getMBeanServerConnection();
            ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
            BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class,
                    true);
            String value = mbean.getTransportConnectorByType(scheme);
            String[] pairs = value.split("&");
            for (String pair : pairs) {
                if (pair.contains("wireFormat.maxFrameSize")) {
                    int idx = pair.indexOf("=");
                    System.out.println(URLDecoder.decode(pair.substring(idx + 1), "UTF-8"));
                    MAX_FRAME_SIZE = Long.valueOf(URLDecoder.decode(pair.substring(idx + 1), "UTF-8"));
                    MAX_FRAME_SIZE -= 1000;// security for JMS headers added by
                                            // session on sending
                }
            }
        } finally {
            if (jmxc != null) {
                try {
                    jmxc.close();
                } catch (Exception e) {
                }
            }
        }
    }
}
 类似资料:
  • 问题内容: 在我的JMS应用程序上,我们在生产者上使用临时队列,以便能够接收来自消费者应用程序的回复。 我在这个线程上遇到了与我完全相同的问题:http : //activemq.2283324.n4.nabble.com/jira- Created-AMQ-3336-Temporary-Destination-errors-on-HA-failover-in -broker- network-w

  • 我们使用MQ作为传递消息的主要路径。这是我们的制度运作不可或缺的一部分。消息代理有时会失败,所有相关的队列也会随之失败。在camel中,有没有一种方法可以启动故障切换,并在其启动时恢复到主故障切换?

  • 我对ActiveMQ故障转移传输有问题。我使用Spring(3.0.5)和ActiveMQ(5.2.0)。我想使用ActiveMQ主题来广播一些消息。我的配置如下: 我正在创建简单的ActiveMQConnectionFactory,它使用异步发送并用PooledConnetionFactory包装它,以便以后的JMSTemboard可以重用池连接。 在java配置中,我使用池连接工厂定义JmsT

  • 因此,如果我理解正确的话,在检测并重新启动失败代理的环境中运行Artemis代理集群将提供与运行每个活动服务器都与备份配对的集群相同的语义(以及类似的可用性)。对吗?

  • 我注意到,当连接的Artemis节点宕机时,连接到节点2-4的客户机不会故障转移到其他3个可用的主节点,基本上不会发现其他节点。即使在原始节点恢复之后,客户端仍然无法建立连接。我从一个单独的堆栈溢出帖子中看到,不支持主到主故障转移。这是否意味着对于每个主节点,我也需要创建一个从节点来处理故障转移?这是否会导致两个实例点失败,而不是集群中有许多节点? 在一个单独的基本测试中,使用一个主从两个节点的集

  • 目前,我正在使用ActiveMQ,并计划将系统迁移到ActiveMQ Artemis。目前,我有3个生产者和3个消费者,只有一个ActiveMQ服务器/代理。