当前位置: 首页 > 知识库问答 >
问题:

AMQPNETLITE-ActiveMQ Artemis(Red Hat AMQ)-自动创建多消费者多播队列

龚联
2023-03-14

此问题是关于在中使用AMQP消费消息。网文档建议使用amqpnetlite:https://access.redhat.com/documentation/en-us/red_hat_amq/7.0/html-single/using_the_amq_.net_client/index

使用AMQPNetLite订阅地址时,地址和队列将自动创建。不过,自动创建的队列总是“单播”。我无法自动创建

  1. 多播队列
  2. 这允许任何数量的消费者

代码

private async Task RenewSession()
{
    Connect = await Connection.Factory.CreateAsync(new Address("amqp://admin:admin@localhost:5672"), new Open() {ContainerId = "client-1"});
    MqSession = new Session(Connect);
    var receiver = new ReceiverLink(MqSession, DEFAULT_SUBSCRIPTION_NAME, GetSource("test-topic"), null);
    receiver.Start(100, OnMessage);
}

private Source GetSource(string address)
{
    var source = new Source
    {
        Address = address,
        ExpiryPolicy = new Symbol("never"),
        Durable = 2,
        DefaultOutcome = new Modified
        {
            DeliveryFailed = true,
            UndeliverableHere = false
        }
    };
    return source;
}

也许我错过了一些旗帜?

共有1个答案

红砚文
2023-03-14

在AMQP中,您可以通过设置功能在自动创建队列(选播路由)或主题(多播路由)之间进行选择。

该功能应该是new Symbol(“队列”)new Symbol(“主题”)

public class SimpleAmqpTest
{
    [Fact]
    public async Task TestHelloWorld()
    {
        Address address = new Address("amqp://guest:guest@localhost:5672");
        Connection connection = await Connection.Factory.CreateAsync(address);
        Session session = new Session(connection);

        Message message = new Message("Hello AMQP");

        Target target = new Target
        {
            Address = "q1",
            Capabilities = new Symbol[] { new Symbol("queue") }
        };

        SenderLink sender = new SenderLink(session, "sender-link", target, null);
        await sender.SendAsync(message);

        Source source = new Source
        {
            Address = "q1",
            Capabilities = new Symbol[] { new Symbol("queue") }
        };

        ReceiverLink receiver = new ReceiverLink(session, "receiver-link", source, null);
        message = await receiver.ReceiveAsync();
        receiver.Accept(message);

        await sender.CloseAsync();
        await receiver.CloseAsync();
        await session.CloseAsync();
        await connection.CloseAsync();
    }
}

看看https://github.com/Azure/amqpnetlite/issues/286,代码的来源。

您可以通过在broker.xml中设置默认路由是多播还是任意广播来选择默认路由,所有内容都记录在https://activemq.apache.org/artemis/docs/2.6.0/address-model.html

代理的multicastPrefixanycastPrefix功能没有为AMQP实现。https://issues.jboss.org/browse/ENTMQBR-795

 类似资料:
  • 我知道可以使用多个线程使用SQS队列。我想保证每封邮件都会被消费一次。我知道可以更改消息的可见性超时,例如,等于我的处理时间。如果我的进程花费的时间超过可见性超时时间(例如连接速度慢),则其他线程可以使用相同的消息。 保证消息只处理一次的最佳方法是什么?

  • 我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?

  • 问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在

  • 我刚刚开始玩弄《Spring-Cloud-Stream》中的Kafka活页夹。 我配置了一个简单的消费者: 但当我启动应用程序时,我看到在启动日志中创建了三个独立的消费者配置: 我发现这些配置之间唯一不同的是客户机。id。 除此之外,我不知道为什么只有一个消费者有三种配置。 是因为我也在运行吗? 这是我的:

  • 我已经读过一些关于动画片和多个消费者的问题,但我仍然不明白它是如何工作的。 我的用例:我有一个只有一个分片的运动流。我想使用不同的lambda函数使用这个分片,每个函数都独立。就像每个lambda函数都有自己的分片迭代器一样。 有可能吗?是否设置多个lambda使用者(基于流)从同一流/碎片读取?

  • 问题是Spring Kafka侦听器只配置了主题名。 我似乎可以让Kafka产生100个消费者来处理来自“队列”(日志)的消息。怎么能做到呢?