当前位置: 首页 > 知识库问答 >
问题:

如何使用相同的@In的方法并行处理多个AMQP消息

范翰海
2023-03-14

是否可以使用使用夸克和小黑-反应-消息注释的相同方法并行处理多个amqp-消息?

更准确地说,我有以下类:

@ApplicationScoped
public class Receiver {
    @Incoming("test-queue")
    public void process(String input) {
        System.out.println("start processing:" + input);
        try {
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end processing:" + input);
    }
}

使用应用程序属性中的配置:

amqp-host: localhost
amqp-port: 5672
amqp-username: quarkus
amqp-password: quarkus
mp.messaging.incoming.test-queue.connector: smallrye-amqp
mp.messaging.incoming.test-queue.address: test-queue

现在,我想通过配置来定义可以并行处理多少条消息。例如,在4核cpu上,它应该并行运行4个内核。

目前,我只能添加4个具有不同名称的方法副本,以允许这种并行性,但这是不可配置的。

共有2个答案

南宫嘉
2023-03-14

我刚刚遇到了相同的场景,下面是规范打算如何处理并发:来自eclipse Microprofile规范

基本上,不是用这样的方法创建一个类:

@Incoming("test-queue")
public void process(String input) {}

你有2个这样的类:

@ApplicationScoped
public class MessageSubscriberProducer {

    @Incoming("test-queue")
    public Subscriber<String> createSubscriber() {
        return new SubscriberImpl();
    }
}

public class SubsciberImpl implements Subscriber<String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(4);  // this tells how many messages to grab right away
    }

    @Override
    public void onNext(String val) {
        // do processing
        this.subscription.request(1);  // grab 1 more
    }
}

这样做还有一个好处,就是可以将处理代码从vert.x事件循环线程转移到工作线程池中。

于鸿博
2023-03-14

我不确定,但我不认为反应式消息传递支持您的要求。

但是,您可以以另一种方式做您想做的事。我认为这也是使用消息传递的更好整体模式。

http://small rye . io/small rye-reactive-messaging/small rye-reactive-messaging/2.5/amqp/amqp . html # amqp-inbound

查找带有CompletionStage和显式ack()的示例。这种变体是异步的,所以如果您将它与Java现有的并发设施结合起来,您将获得高效的并行处理。

我会将传入的工作发送给执行器,然后在完成时执行任务ack()。

 类似资料:
  • 我刚刚开始使用RabbitMQ和AMQP。 我有一个消息队列 我有多个消费者,我想用相同的消息做不同的事情。 RabbitMQ的大部分文档似乎都集中在循环(round-robin)上,即单个消息由单个消费者使用,负载在每个消费者之间分散。这的确是我目击的行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这里有一个消费者: 如果我启动消费者两次,我可以看到每个消费者都在以循环行为消费交替消息。

  • 问题内容: 我一般只是开始使用RabbitMQ和AMQP。 我有一条消息队列 我有多个消费者,我想用 同一条消息 做不同的事情。 RabbitMQ的大多数文档似乎都集中在循环上,即单个消息由单个使用者使用,而负载则分散在每个使用者之间。我确实是这种行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这是一个消费者: 如果我启动使用者两次,则 可以看到每个使用者都以循环方式使用替代消息。 例如,

  • 想知道Kafka使用者(Java客户端)是否可以并行读取和处理多条消息...我的意思是使用多个线程...我应该使用rxJava吗?? 1)这样做是一个好的方法吗???2)而且根据我的理解,Kafka甚至把每一个线程都当作消费者...如果我错了,请纠正我... 3)并且还想让Java客户端作为守护进程服务在Linux中运行,这样它就可以连续运行,并且轮询Kafka的消息,读取和处理都是一样的...这

  • RabbitMQ的大部分文档似乎都集中在循环(round-robin)上,即单个消息由单个消费者使用。我有一个需求,其中希望从一个队列接收到多个订阅的消费者的相同消息。 下面是我的示例消费者代码。这里有两个侦听器在侦听同一个队列,但是只有一个使用者接收到消息。如何配置它,以便将相同的消息传递给两个消费者?(Consumer1和Consumer2)。任何帮助都将得到高度赞赏。

  • 我是个新手,在尝试触发onClick事件时遇到了问题。我让事件工作,当它被点击时,div出现并重新出现。问题是,如果我按下某个特定项目的按钮,所有div都会出现,而不是我刚才单击按钮的div。如何使我单击的按钮仅在该特定元素上启动。 这是我的代码: 类应用程序扩展了React.Component{

  • 问题内容: 我有多个正在运行的线程,这些线程访问单例对象并调用其方法并在其中传递对象。在该方法中,我仅对接收到的对象进行一些计算。我听说在这种情况下不会有任何问题,因为它是无状态的并且对所有人都是免费的。 我的问题是,它对所有人免费吗?我想知道多个线程如何在自己的线程中调用共享方法而不覆盖其他线程的传递对象吗?请在内存分配方面和堆栈级别进行解释。 问题答案: 我认为您必须区分已经存储在内存和代码执