当前位置: 首页 > 知识库问答 >
问题:

ActiveMQ重复消息检测(Wildfly,Java EE)

叶坚
2023-03-14

首先,对不起我的英语!我有一个使用JMS和ActiveMQ的项目

@Stateless
@LocalBean
public class Producer {

public void produceMessage(List<String> entityIds) {
  try {
    InitialContext initialContext = new InitialContext();
    ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("java:/JmsXA");
    Destination destination = (Destination) initialContext.lookup("jms/queue/cachedAttrs");

    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    entityIds.forEach(entityId -> {                 
                TextMessage message = session.createTextMessage(entityId);
                message.setStringProperty("_AMQ_DUPL_ID", entityId);
                producer.send(message);                     
        }
    );

    connection.close();
    session.close();
    initialContext.close();
  } catch (Exception e) {
    throw new IllegalStateException("..." + e.getMessage());
  }
 }
}

@MessageDriven(
  name = "Consumer",
  activationConfig = {
    @ActivationConfigProperty(propertyName = "destination", propertyValue = 
"jms/queue/cachedAttrs"),
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = 
  "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "useDLQ", propertyValue = "false")
  }
)
public class Consumer implements MessageListener {  

@PersistenceContext
private EntityManager entityManager;

@Override
public void onMessage(Message message) {
  TextMessage id = (TextMessage) message;
}
}

producer.produceMessage()可以同时在项目的许多地方调用。我需要检查重复的ID和不调用消费者的ID,如果这个ID包含在队列中。

我阅读https://activemq.apache.org/artemis/docs/1.0.0/duplicate-detection.html并执行以下操作

message.setStringProperty("_AMQ_DUPL_ID", entityId);

呼叫制片人:

producer.produceMessage(Arrays.asList("1", "2", "1", "3", "2"));

和获取异常:

原因:ActiveMqDuplicateIdException[errorType=Duplicate_ID_Rejected message=Duplicate message=True,UserID=0C03AADC-C07A-11E8-9FB7-775C9C2BDFEB,Priority=4,Bodysize=225,TUE Sep 25 11:18:25 GMT+07:002018,Expiration=0,Durable=True,Address=JMS.Queue.DeviceCachedTRS,Properties=TypedProperties[__AMQ_CID]=F627A880-C079-11E8-9FB7-775C9C2BDFEB,_AMQ_DUPL_ID=1]]@1895765804]在org.apache.activemq.artemis.core.protocol.core.impl.channelimpl.sendblock(channelimpl.java:406)在org.apache.activemq.artemis.core.protocol.core.impl.channelimpl.sendblock(channelimpl.java:304)在)在org.apache.activemq.artemis.core.client.impl.clientsessionimpl.prepare(clientsessionimpl.java:1241)

我做错了什么?我可以在服务中注入我的生产者吗?或者我需要创建新的对象吗?THNX!

共有1个答案

慕容宏邈
2023-03-14

据我所知,你没有做错什么。根据您所说的,您需要代理来拒绝重复的消息。您已经在消息中设置了重复的ID,代理检测到了一个重复的消息,并拒绝了该消息。注意,异常说,“检测到重复的消息-消息将不会被路由。”

 类似资料:
  • ActiveMQ是否支持幂等生产者?我知道Camel有一个幂等消费者模式来检测和处理重复消息,但我想知道是否可以从源头(生产者)防止这种情况。 这里有一点背景。我有水平扩展的应用程序访问同一个数据库。有一个特定的表维护特定进程的状态。这些水平应用程序应该能够读取状态并调用另一个进程,但是只有一个应用程序能够调用它。一旦满足所需条件,该应用程序会定期轮询数据库并将消息发布到消息代理。但我希望其中一个

  • 问题内容: 有没有一种方法可以抑制ActiveMQ服务器上定义的队列上的重复消息? 我尝试手动定义JMSMessageID((message.setJMSMessageID(“ uniqueid”)),但是服务器忽略此修改并使用内置的JMSMessageID传递消息。 根据规范,我没有找到有关如何删除邮件重复数据的参考。 在HornetQ中,要解决此问题,我们需要在消息定义中声明HQ特定的属性or

  • 我试图将一个Wildfly13实例连接到一个独立的Artemis ActiveMQ,从它生成和使用消息。因此,我遵循了Wildfly13文档和Artemis ActiveMQ文档的相关部分。此外,我尝试使用本教程,并使用JMS2.0Simplified-API对其进行了一些定制。 当我向/Produce发送有效载荷时,不会引发异常。所以我相信JNDI有一个问题。我已经用Wireshark监视了正在

  • 我需要在我的应用程序中进行单元测试JMSXGroupID。有人能告诉我是否有示例文章或参考文献来实现这一点吗?我试图在ActiveMQ门户中查看参考文献,但看起来下面的Hudson链接坏了。我正在使用Spring JMS进行编程。 http://activemq.apache.org/junit-reports.html

  • 我们在JBoss EAP7.1上使用ActiveMQ Artemis。 我们注意到,一旦具有特定值的消息通过队列传递,如果消息生成器试图再次将具有相同值的消息发送到同一队列,则代理将丢弃该消息。然而,我们的需要是仅当重复消息仍在队列中时才丢弃它们。 有没有办法达到这个目的? 为了解决这个问题,我们认为像Artemis这样的重复消息检测会有所帮助。然而,当外部应用程序挂起我们的消费者时,我们需要定时

  • 我对ActiveMQ有一个类似的问题:http://activemq.2283324.n4.nabble.com/Messages-stuck-in-pending-td4617979.html已经尝试了这里发布的解决方案。 有些消息似乎卡在队列上,可以在那里坐几天而不被消费。我有足够多的消费者大部分时间都是免费的,所以这不是消费者“饱和”的问题。 重新启动ActiveMQ后,一些待处理的消息会立