我使用ActiveMQ Artemis 2.16.0作为我的代理和artemis-jms-client-2.16.0.jar
作为我的JMS客户端。感觉我随机丢失了一些信息,原因我不知道。我调查了我的Java代码,没有发现任何异常。
我有个方法
@JmsListener(destination = "${myQueue}", containerFactory = "jmsListenerContainerFactory")
@Override
public void process(Message message) {
try {
processMessage(Message message);
} catch (Exception ex) {
LOG.error("Error[...]", ex);
responseSender.send(otherQueue, message, ex);
}
}
ProcMessage(消息消息)
方法如下所示:
public void processMessage(Message message) {
try {
byte[] request = message.getBody(byte[].class);
[...]
if (!condition) {
throw new MyBusinessError("error happened");
}
[...]
} finally {
MDC.remove(ID);
}
}
@Bean(name = "jmsListenerContainerFactoryTest")
@Primary
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setSessionTransacted(true);
factory.setConnectionFactory(cachingConnectionFactory());
return factory;
}
public class MyBusinessException extends Exception {
private int code;
[...]
}
broker.xml
:
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<journal-type>NIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<!--
This value was determined through a calculation.
Your system could perform 2,17 writes per millisecond
on the current journal configuration.
That translates as a sync write every 490000 nanoseconds.
Note: If you specify 0 the system will perform writes directly to the disk.
We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
-->
<journal-buffer-timeout>490000</journal-buffer-timeout>
<!--
When using ASYNCIO, this will determine the writing queue depth for libaio.
-->
<journal-max-io>1</journal-max-io>
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>460000</page-sync-timeout>
<acceptors>
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq, admin"/>
</security-setting>
</security-settings>
<connection-ttl-override>60000</connection-ttl-override>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<!-- <config-delete-queues>FORCE</config-delete-queues>
<config-delete-addresses>FORCE</config-delete-addresses>-->
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
<auto-delete-queues>false</auto-delete-queues>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
<auto-delete-queues>false</auto-delete-queues>
</address-setting>
</address-settings>
<addresses>
<address name="MyQueue">
<anycast>
<queue name="MyQueue">
</queue>
</anycast>
</address>
<address name="MyOtherQueue">
<anycast>
<queue name="MyOtherQueue" />
</anycast>
</address>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>
</core>
</configuration>
如果MyBusinessError(…)
的想法是捕获异常并将相同的消息发送到myOtherQueue
。如果发送该消息失败(即发生异常),则会再次发送该消息,以此类推最多10次,然后再发送到DLQ。本质上,这是我大部分时间看到的,但在我的日志中的随机时刻,我只看到一个尝试重新传递消息,而在DLQ
中没有消息,接收方抱怨没有消息。它感觉信息丢失了。我用放大镜查看了myOtherQueue,可以说是使用了Artemis控制台和JmsToolbox,但我只看到了一个空队列。我在这个队列中没有消费者。
这样做的目的不是将失败消息发送到DLQ,而是发送到另一个队列(myOtherQueue
)以供以后调查。如果消息无法传递到该队列,它将被放置在DLQ上。我就是这么想的。
一天结束时,很少有信息丢失,这就是我试图理解的。我应该如何调查Artemis并查看是否发生了任何消息丢失?从哪里开始?使用什么工具?
首先,我会在每条消息中放入一个属性,允许对其进行唯一标识,然后记录该值,以便以后可以关联客户端和代理日志。如果您使用的是JMS,那么您可以使用如下内容:
String uuid = java.util.UUID.randomUUID().toString();
message.setStringProperty("UUID", uuid);
logger.info("Sending message with UUID: " + uuid);
当然,您还需要在消费者上登录,例如:
Message message = consumer.receive();
String uuid = message.getStringProperty("UUID");
logger.info("Received message with UUID: " + uuid);
在代理上,您应该激活审计日志记录,或者使用LoggingActiveMQServerPlugin
。
一旦你有了所有的日志记录,你只需等待,直到你认为你已经丢失了一条消息,然后再查看日志,找到已发送但未收到的消息的ID。一旦你知道了这一点,你就可以查看代理日志,看看它是否被代理正确地接收、发送给消费者等等,这将帮助你缩小问题所在的范围。
本文向大家介绍Kafka中的消息是否会丢失和重复消费?相关面试题,主要包含被问及Kafka中的消息是否会丢失和重复消费?时的应答技巧和注意事项,需要的朋友参考一下 要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。 1、消息发送 0---表示不进行消息接收是否成功的确认; 1---表示当Leader接收成功时确认; -1---表示Leader和Follower都接收成功
所以我和我的Kafka消费者之间有了一些恼人的矛盾。我使用“Kafka节点”为我的项目。我创造了一个话题。在一个使用者组中通过2台服务器创建了2个使用者。自动提交设置为false。对于我的消费者获得的每一个mesaage,他们会启动一个异步进程,该进程可能需要1~20秒,当进程完成时,消费者会提交偏移量。我的问题是:在一个senarios中,消费者1得到一个消息,需要20秒来处理。在过程中间,他得
本文向大家介绍kafka如何保证不丢失消息?相关面试题,主要包含被问及kafka如何保证不丢失消息?时的应答技巧和注意事项,需要的朋友参考一下 复制因子:创建topic的时候指定复制因子大于1时,一个分区被分配到一个broker上,同时会在其他broker上维护一个分区副本; isr列表:分区及其副本分别为leader和follower,leader对外提供读写服务,follower会向leade
主要内容:文章目录,1.RocketMQ,2.Kafka,3.如何保证RabbitMQ全链路数据100%不丢失mq: rabbitmq, rocketmq, kafka 文章目录 1.RocketMQ 2.Kafka 2.1 消息传递语义剖析 2.2 Producer 端丢失场景剖析 2.3 Broker 端丢失场景剖析 2.4 Consumer 端丢失场景剖析 3.如何保证RabbitMQ全链路数据100%不丢失 3.1 生产端可靠性投递 3.2 消费端消息不丢失 1.RocketMQ Roc
主要内容:一、业务场景,二、意外宕机,问题凸现,三、总结一、业务场景 这篇文章,我们来看看订单服务和消息服务是如何基于MQ来收发消息的。 我们稍微把这个图细化一点,简单来说就是多个订单服务实例给queue推送消息,多个仓储服务每个消费一部分消息。如下图所示: 二、意外宕机,问题凸现 假如你线上对MQ技术的使用就到此为止了,那么基本可以跟offer说拜拜了。。。 因为如果是我的话,作为一个面试官就没法继续往下问了。你这个MQ的使用以及理解的深度仅此而已的
JavaSelenium程序登录到Gmail帐户在搜索框检查文本这是工作正常。如何检查消息是成功还是失败如果我写一个单独的类IM得到会话ID空异常。 错误: