Question:

ActiveMQ Artemis: connections accumulating

娄鹤轩
2023-03-14
2021-01-28 01:20:39,492 WARN  [io.netty.channel.DefaultChannelPipeline] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.: io.netty.channel.unix.Errors$NativeIoException: accept(..) failed: Too many open files

2021-01-28 01:20:39,656 WARN  [io.netty.channel.DefaultChannelPipeline] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.: io.netty.channel.unix.Errors$NativeIoException: accept(..) failed: Too many open files

2021-01-28 01:20:39,937 ERROR [org.apache.activemq.artemis.core.client] AMQ214016: Failed to create netty connection: io.netty.channel.ChannelException: Unable to create Channel from class class io.netty.channel.epoll.EpollSocketChannel
    at io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:46) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:310) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector.createConnection(NettyConnector.java:818) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector.createConnection(NettyConnector.java:785) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.openTransportConnection(ClientSessionFactoryImpl.java:1076) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.createTransportConnection(ClientSessionFactoryImpl.java:1125) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.establishNewConnection(ClientSessionFactoryImpl.java:1336) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.getConnection(ClientSessionFactoryImpl.java:931) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.getConnectionWithRetry(ClientSessionFactoryImpl.java:820) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.connect(ClientSessionFactoryImpl.java:252) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.connect(ClientSessionFactoryImpl.java:268) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl$StaticConnector$Connector.tryConnect(ServerLocatorImpl.java:1813) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl$StaticConnector.connect(ServerLocatorImpl.java:1682) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl.connect(ServerLocatorImpl.java:536) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl.connect(ServerLocatorImpl.java:524) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl$4.run(ServerLocatorImpl.java:482) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65) [artemis-commons-2.14.0.jar:2.14.0]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [java.base:]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [java.base:]
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [artemis-commons-2.14.0.jar:2.14.0]
Caused by: java.lang.reflect.InvocationTargetException
    at jdk.internal.reflect.GeneratedConstructorAccessor17.newInstance(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) [java.base:]
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) [java.base:]
    at io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:44) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    ... 23 more
Caused by: io.netty.channel.ChannelException: io.netty.channel.unix.Errors$NativeIoException: newSocketStream(..) failed: Too many open files
    at io.netty.channel.unix.Socket.newSocketStream0(Socket.java:421) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.epoll.LinuxSocket.newSocketStream(LinuxSocket.java:319) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.epoll.LinuxSocket.newSocketStream(LinuxSocket.java:323) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.epoll.EpollSocketChannel.<init>(EpollSocketChannel.java:45) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    ... 27 more
Caused by: io.netty.channel.unix.Errors$NativeIoException: newSocketStream(..) failed: Too many open files

This issue looks similar to: SocketException: Too many open files

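As for my use case: I receive orders from a website, process them into an ERP, and then pass statuses back to the website and other systems. Sending messages back to the website API is somewhat slow, and at the time of the incident there were probably 700 messages queued.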

The website uses AMQP, and my message routing uses JMS.

core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 63805
max locked memory       (kbytes, -l) 16384
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 63805
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

Here is my broker.xml:

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>
      <persistence-enabled>true</persistence-enabled>
      <journal-type>NIO</journal-type>
      <paging-directory>/nfs/amqprod/data/paging</paging-directory>
      <bindings-directory>/nfs/amqprod/data/bindings</bindings-directory>
      <journal-directory>/nfs/amqprod/data/journal</journal-directory>
      <large-messages-directory>/nfs/amqprod/data/large-messages</large-messages-directory>
      <journal-datasync>true</journal-datasync>
      <journal-min-files>2</journal-min-files>
      <journal-pool-files>10</journal-pool-files>
      <journal-device-block-size>4096</journal-device-block-size>
      <journal-file-size>10M</journal-file-size>
      <journal-buffer-timeout>2628000</journal-buffer-timeout>
      <journal-max-io>1</journal-max-io>
      <disk-scan-period>5000</disk-scan-period>
      <max-disk-usage>90</max-disk-usage>
      <critical-analyzer>true</critical-analyzer>
      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
      <critical-analyzer-policy>HALT</critical-analyzer-policy>
      <page-sync-timeout>2628000</page-sync-timeout>
      <jmx-management-enabled>true</jmx-management-enabled>
      <global-max-size>2G</global-max-size>

      <acceptors>

<!-- keystores will be found automatically if they are on the classpath -->
         <acceptor name="netty-ssl-acceptor">tcp://0.0.0.0:5500?sslEnabled=true;keyStorePath={path}/keystore.ks;keyStorePassword={pasword};protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE</acceptor>

         <!-- Acceptor for every supported protocol -->
         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>


      </acceptors>

      <!-- HA -->
      <connectors>
        <connector name="artemis">tcp://{Primary IP}:61616</connector>
        <connector name="artemis-backup">tcp://{Secondary IP}:61616</connector>
      </connectors>

      <cluster-user>activemq</cluster-user>
      <cluster-password>{cluster password}</cluster-password>

      <ha-policy>
        <shared-store>
          <master>
            <failover-on-shutdown>true</failover-on-shutdown>
          </master>
        </shared-store>
      </ha-policy>

      <cluster-connections>
        <cluster-connection name="cluster-1">
          <connector-ref>artemis</connector-ref>
          <!--<discovery-group-ref discovery-group-name="discovery-group-1"/>-->
          <static-connectors>
            <connector-ref>artemis-backup</connector-ref>
          </static-connectors>
        </cluster-connection>
       </cluster-connections>
      <!-- HA -->

      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq"/>
          </security-setting>
          <security-setting match="SiteCore.#">
            <!--<permission type="createDurableQueue" roles="ecom"/>
            <permission type="deleteDurableQueue" roles="ecom"/>
            <permission type="createAddress" roles="ecom"/>-->
            <permission type="consume" roles="ecom,amq"/>
            <permission type="browse" roles="ecom,amq"/>
            <permission type="send" roles="ecom,amq"/>
         </security-setting>
         <security-setting match="eCommerce.#">
            <!--<permission type="createDurableQueue" roles="ecom"/>
            <permission type="deleteDurableQueue" roles="ecom"/>
            <permission type="createAddress" roles="ecom"/>-->
            <permission type="consume" roles="ecom,amq"/>
            <permission type="browse" roles="ecom,amq"/>
            <permission type="send" roles="ecom,amq"/>
        </security-setting>
    
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
      </address-settings>

<addresses>
  <address name="DLQ">
    <anycast>
      <queue name="DLQ" />
    </anycast>
  </address>
  <address name="ExpiryQueue">
    <anycast>
      <queue name="ExpiryQueue" />
    </anycast>
  </address>
  <address name="test1.test">
    <multicast>
      <queue name="test1.test.A">
        <filter string="JMSType='A'" />
      </queue>
      <queue name="test1.test.B" />
    </multicast>
  </address>
  <address name="SiteCore.test.address">
    <multicast>
      <queue name="SiteCore.test.queue" />
    </multicast>
  </address>
  <address name="AX.User.Import.Topic">
    <multicast>
      <queue name="AX.User.Import.Queue" />
    </multicast>
  </address>
  <address name="Boomi.API.eCommerce.SOF.Order.Audit.Queue">
    <anycast>
      <queue name="Boomi.API.eCommerce.SOF.Order.Audit.Queue" />
    </anycast>
  </address>
  <address name="eCommerce.Customer.Information.Topic">
    <anycast>
      <queue name="eCommerce.Customer.Information.Queue" />
    </anycast>
  </address>
  <address name="MAO.SOF.Item.Delete.Topic">
    <multicast>
      <queue name="MAO.SOF.Item.Delete.Queue" />
    </multicast>
  </address>
  <address name="MAO.SOF.Item.Import.Topic">
    <multicast>
      <queue name="MAO.SOF.Item.Import.Queue" />
    </multicast>
  </address>
  <address name="MAO.SOF.Location.Import.Topic">
    <multicast>
      <queue name="MAO.SOF.Location.Import.Queue" />
    </multicast>
  </address>
  <address name="MAO.SOF.Order.Fulfillment.Status.Topic">
    <multicast>
      <queue name="MAO.SOF.Order.Fulfillment.Status.Inventory.Reserve.Queue" />
      <queue name="MAO.SOF.Order.Fulfillment.Status.Sitecore.Queue" />
    </multicast>
  </address>
  <address name="MAO.SOF.User.Import.Topic">
    <multicast>
      <queue name="MAO.SOF.User.Import.Queue" />
    </multicast>
  </address>
  <address name="Marketing.NRM.New.Neighbor.Topic">
    <multicast>
      <queue name="Marketing.NRM.New.Neighbor.Responsys.Queue" />
    </multicast>
  </address>  
  <address name="Pet.Services.Event.Topic">
    <multicast>
      
      <queue name="Pet.Services.Event.Appointment.Booked.Queue">
        <filter string="JMSType='appointment-booked'" />
      </queue>
      <queue name="Pet.Services.Event.Appointment.Canceled.Queue">
        <filter string="JMSType='appointment-canceled'" />
      </queue>
      <queue name="Pet.Services.Event.Appointment.Rescheduled.Queue">
        <filter string="JMSType='appointment-rescheduled'" />
      </queue>
      <queue name="Pet.Services.Event.Client.Created.Queue">
        <filter string="JMSType='client-created'" /> 
      </queue>
      <queue name="Pet.Services.Event.Client.Deleted.Queue">
        <filter string="JMSType='client-deleted'" />
      </queue>
      <queue name="Pet.Services.Event.Client.Updated.Queue">
        <filter string="JMSType='client-updated'" />
      </queue>
    </multicast>
  </address>
  <address name="Process.Tracking.General.Topic">
    <multicast>
      <queue name="Process.Tracking.General.DB.Writer.Queue" />
    </multicast>
  </address>
  <address name="PSP.Utilities.Email.Send.Queue">
    <multicast>
      <queue name="PSP.Utilities.Email.Send.Queue" />
    </multicast>
  </address>
  <address name="SiteCore.Sales.Order.Submission.Topic">
    <multicast>
      <queue name="SiteCore.Sales.Order.Submission.Queue" />
    </multicast>
  </address>
  <!--<address name="SiteCore.SOF.Order.Fulfillment.Submission.Error.Queue">
    <anycast>
      <queue name="SiteCore.SOF.Order.Fulfillment.Submission.Error.Queue" />
    </anycast>
  </address>-->
  <address name="SiteCore.SOF.Order.Fulfillment.Submission.Topic">
    <multicast>
      <queue name="SiteCore.SOF.Order.Fulfillment.Submission.ActiveOmni.Queue" />
      <queue name="SiteCore.SOF.Order.Fulfillment.Submission.Inventory.Reserve.Queue" />
    </multicast>
  </address>
</addresses>

      <!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin pluging to log in events
      <broker-plugins>
         <broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
            <property key="LOG_ALL_EVENTS" value="true"/>
            <property key="LOG_CONNECTION_EVENTS" value="true"/>
            <property key="LOG_SESSION_EVENTS" value="true"/>
            <property key="LOG_CONSUMER_EVENTS" value="true"/>
            <property key="LOG_DELIVERING_EVENTS" value="true"/>
            <property key="LOG_SENDING_EVENTS" value="true"/>
            <property key="LOG_INTERNAL_EVENTS" value="true"/>
         </broker-plugin>
      </broker-plugins>
      -->

   </core>
</configuration>

This is the client that I believe is causing the trouble.

using (new TimeMeasure("PSP.Commerce.Foundation.Common.Services.BoomiUserService:SendMessage"))
{
    _connection = await GetOrSetQueueConnection();
    var session = new Session(_connection);
    var sender = new SenderLink(session, typeof(T).Name, topicName);
    var serializedData = JsonConvert.SerializeObject(message, Formatting.None,
        new JsonSerializerSettings {NullValueHandling = NullValueHandling.Ignore});
    var serializedMessage = new Message(serializedData)
    {
        Properties = new Properties
        {
            CreationTime = DateTime.Now
        }
    };
    Log.Info($"Message with body {serializedData} sent to {topicName} during attempt {currentAttempt}/{maxNumberOfAttempts}", this);
    await sender.SendAsync(serializedMessage);
    await session.CloseAsync();
    await _connection.CloseAsync();  // This line was missing
    return true;
}

1 answer

易刚捷
2023-03-14

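ActiveMQ Artemis already enforces a default connection timeout of 60 seconds for any AMQP client using an acceptor where amqpIdleTimeout is not set. See the documentation for more details. Any "stale" connection should therefore be dropped within 60 seconds, and you will see log messages indicating that the connection has been cleaned up.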
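As a sketch of how that timeout could be set explicitly instead of relying on the default: the `amqpIdleTimeout` URL parameter goes on the acceptor that the AMQP clients connect to. The value is in milliseconds as far as I understand the documentation. The number 30000 is only an illustrative assumption, and the acceptor URL below is shortened from the `artemis` acceptor in the question.

```xml
<!-- Sketch only: 30000 ms is an assumed example value, not a recommendation -->
<acceptor name="artemis">tcp://0.0.0.0:61616?protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpIdleTimeout=30000</acceptor>
```

A shorter timeout only cleans up connections that have gone idle or dead. It does not help if the client keeps opening live connections faster than it closes them.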

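It is worth noting that the most common cause of stale connections, as opposed to network problems that break connections, is poorly written clients that do not manage their resources properly.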
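To illustrate what managing resources properly could look like on the client side, here is a minimal sketch. It assumes the client uses AMQP.Net Lite, which the `Session` and `SenderLink` types in the question suggest. The class name `AmqpSendSketch`, the method `SendOnceAsync`, and its parameters are hypothetical and not taken from the original code. The point is only that the connection is closed in a `finally` block, so it is released even when the send fails.

```csharp
using System;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;

// Hypothetical helper, not the asker's actual class.
public static class AmqpSendSketch
{
    // Opens one connection per call and always releases it.
    public static async Task SendOnceAsync(string brokerUrl, string topicName, string body)
    {
        Connection connection = await Connection.Factory.CreateAsync(new Address(brokerUrl));
        try
        {
            var session = new Session(connection);
            // The link name only has to be unique per connection; a GUID is an assumption.
            var sender = new SenderLink(session, "sender-" + Guid.NewGuid(), topicName);

            var message = new Message(body)
            {
                Properties = new Properties { CreationTime = DateTime.UtcNow }
            };

            await sender.SendAsync(message);
            await sender.CloseAsync();
            await session.CloseAsync();
        }
        finally
        {
            // Runs even if SendAsync throws, so the socket on the broker is freed.
            await connection.CloseAsync();
        }
    }
}
```

An alternative design is to keep a single long-lived connection for the whole application and only create and close sessions or links per send. Either approach should work, as long as every connection that gets opened is eventually closed.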

Generally speaking, I think an open-files ulimit of 1024 is rather low for a modern system. I recommend raising it substantially.
