此问题是关于在中使用AMQP消费消息。网文档建议使用amqpnetlite:https://access.redhat.com/documentation/en-us/red_hat_amq/7.0/html-single/using_the_amq_.net_client/index
使用AMQPNetLite订阅地址时,地址和队列将自动创建。不过,自动创建的队列总是“单播”。我无法自动创建
代码:
private async Task RenewSession()
{
Connect = await Connection.Factory.CreateAsync(new Address("amqp://admin:admin@localhost:5672"), new Open() {ContainerId = "client-1"});
MqSession = new Session(Connect);
var receiver = new ReceiverLink(MqSession, DEFAULT_SUBSCRIPTION_NAME, GetSource("test-topic"), null);
receiver.Start(100, OnMessage);
}
private Source GetSource(string address)
{
var source = new Source
{
Address = address,
ExpiryPolicy = new Symbol("never"),
Durable = 2,
DefaultOutcome = new Modified
{
DeliveryFailed = true,
UndeliverableHere = false
}
};
return source;
}
也许我错过了一些旗帜?
在AMQP中,您可以通过设置功能在自动创建队列(选播路由)或主题(多播路由)之间进行选择。
该功能应该是new Symbol(“队列”)
或new Symbol(“主题”)
。
public class SimpleAmqpTest
{
[Fact]
public async Task TestHelloWorld()
{
Address address = new Address("amqp://guest:guest@localhost:5672");
Connection connection = await Connection.Factory.CreateAsync(address);
Session session = new Session(connection);
Message message = new Message("Hello AMQP");
Target target = new Target
{
Address = "q1",
Capabilities = new Symbol[] { new Symbol("queue") }
};
SenderLink sender = new SenderLink(session, "sender-link", target, null);
await sender.SendAsync(message);
Source source = new Source
{
Address = "q1",
Capabilities = new Symbol[] { new Symbol("queue") }
};
ReceiverLink receiver = new ReceiverLink(session, "receiver-link", source, null);
message = await receiver.ReceiveAsync();
receiver.Accept(message);
await sender.CloseAsync();
await receiver.CloseAsync();
await session.CloseAsync();
await connection.CloseAsync();
}
}
看看https://github.com/Azure/amqpnetlite/issues/286,代码的来源。
您可以通过在broker.xml中设置默认路由是多播还是任意广播来选择默认路由,所有内容都记录在https://activemq.apache.org/artemis/docs/2.6.0/address-model.html
代理的multicastPrefix
和anycastPrefix
功能没有为AMQP实现。https://issues.jboss.org/browse/ENTMQBR-795
我知道可以使用多个线程使用SQS队列。我想保证每封邮件都会被消费一次。我知道可以更改消息的可见性超时,例如,等于我的处理时间。如果我的进程花费的时间超过可见性超时时间(例如连接速度慢),则其他线程可以使用相同的消息。 保证消息只处理一次的最佳方法是什么?
我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?
问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在
我刚刚开始玩弄《Spring-Cloud-Stream》中的Kafka活页夹。 我配置了一个简单的消费者: 但当我启动应用程序时,我看到在启动日志中创建了三个独立的消费者配置: 我发现这些配置之间唯一不同的是客户机。id。 除此之外,我不知道为什么只有一个消费者有三种配置。 是因为我也在运行吗? 这是我的:
我已经读过一些关于动画片和多个消费者的问题,但我仍然不明白它是如何工作的。 我的用例:我有一个只有一个分片的运动流。我想使用不同的lambda函数使用这个分片,每个函数都独立。就像每个lambda函数都有自己的分片迭代器一样。 有可能吗?是否设置多个lambda使用者(基于流)从同一流/碎片读取?
问题是Spring Kafka侦听器只配置了主题名。 我似乎可以让Kafka产生100个消费者来处理来自“队列”(日志)的消息。怎么能做到呢?