Question:

Receiving events from AMQP with Axon 4

齐昊
2023-03-14

Here is the relevant part of my application.yml:

axon:
    amqp:
        exchange: axon.fanout
        transaction-mode: publisher_ack
    # adding the following lines changed nothing
    eventhandling:
        processors:
            amqpEvents:
                source: in.queue
                mode: subscribing
spring:
    rabbitmq:
        username: rabbit
        password: rabbit

From the documentation, I gathered that I should create a SpringAMQPMessageSource bean:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class AxonConfig {

    @Bean
    SpringAMQPMessageSource inputMessageSource(final AMQPMessageConverter messageConverter) {
        return new SpringAMQPMessageSource(messageConverter) {
            @RabbitListener(queues = "in.queue")
            @Override
            public void onMessage(final Message message, final Channel channel) {
                log.debug("received external message: {}, channel: {}", message, channel);
                super.onMessage(message, channel);
            }
        };
    }

}

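If I send a message to the queue from the RabbitMQ management panel, I see the log entry, but the event never reaches my aggregate: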

import lombok.extern.slf4j.Slf4j;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
import pm.mbo.easyway.api.app.order.commands.ConfirmOrderCommand;
import pm.mbo.easyway.api.app.order.commands.PlaceOrderCommand;
import pm.mbo.easyway.api.app.order.commands.ShipOrderCommand;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;

import static org.axonframework.modelling.command.AggregateLifecycle.apply;

@ProcessingGroup("amqpEvents")
@Slf4j
@Aggregate
public class OrderAggregate {

    @AggregateIdentifier
    private String orderId;
    private boolean orderConfirmed;

    @CommandHandler
    public OrderAggregate(final PlaceOrderCommand command) {
        log.debug("command: {}", command);
        apply(new OrderPlacedEvent(command.getOrderId(), command.getProduct()));
    }

    @CommandHandler
    public void handle(final ConfirmOrderCommand command) {
        log.debug("command: {}", command);
        apply(new OrderConfirmedEvent(orderId));
    }

    @CommandHandler
    public void handle(final ShipOrderCommand command) {
        log.debug("command: {}", command);
        if (!orderConfirmed) {
            throw new IllegalStateException("Cannot ship an order which has not been confirmed yet.");
        }
        apply(new OrderShippedEvent(orderId));
    }

    @EventSourcingHandler
    public void on(final OrderPlacedEvent event) {
        log.debug("event: {}", event);
        this.orderId = event.getOrderId();
        orderConfirmed = false;
    }

    @EventSourcingHandler
    public void on(final OrderConfirmedEvent event) {
        log.debug("event: {}", event);
        orderConfirmed = true;
    }

    @EventSourcingHandler
    public void on(final OrderShippedEvent event) {
        log.debug("event: {}", event);
        orderConfirmed = true;
    }

    protected OrderAggregate() {
    }

}

The JavaDoc of SpringAMQPMessageSource says:

/**
 * MessageListener implementation that deserializes incoming messages and forwards them to one or more event processors.
 * <p>
 * The SpringAMQPMessageSource must be registered with a Spring MessageListenerContainer and forwards each message
 * to all subscribed processors.
 * <p>
 * Note that the Processors must be subscribed before the MessageListenerContainer is started. Otherwise, messages will
 * be consumed from the AMQP Queue without any processor processing them.
 *
 * @author Allard Buijze
 * @since 3.0
 */

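But so far I have not been able to find where or how to register it.

The axon.eventhandling entries in the configuration and the @ProcessingGroup("amqpEvents") on the aggregate are left over from my experiments. Having them or not makes no difference. I also tried without mode=subscribing.

I tried writing my own class like this: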

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.messaging.SubscribableMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

@Slf4j
@Component
public class RabbitMQSpringAMQPMessageSource implements ChannelAwareMessageListener, SubscribableMessageSource<EventMessage<?>> {

    private final List<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArrayList<>();
    private final AMQPMessageConverter messageConverter;

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    @Override
    public Registration subscribe(final Consumer<List<? extends EventMessage<?>>> messageProcessor) {
        eventProcessors.add(messageProcessor);
        log.debug("subscribe to: {}", messageProcessor);
        return () -> eventProcessors.remove(messageProcessor);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {
        log.debug("received external message: {}, channel: {}", message, channel);
        log.debug("eventProcessors: {}", eventProcessors);
        if (!eventProcessors.isEmpty()) {
            messageConverter.readAMQPMessage(message.getBody(), message.getMessageProperties().getHeaders())
                            .ifPresent(event -> eventProcessors.forEach(
                                ep -> ep.accept(Collections.singletonList(event))
                            ));
        }
    }

}

The result is the same; the log now shows that eventProcessors is simply empty.

eventProcessors: []

Update 2:

@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        super(messageConverter);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {

        try {
            final var eventProcessorsField = this.getClass().getSuperclass().getDeclaredField("eventProcessors");
            eventProcessorsField.setAccessible(true);
            final var eventProcessors = (List<Consumer<List<? extends EventMessage<?>>>>) eventProcessorsField.get(this);
            log.debug("eventProcessors: {}", eventProcessors);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }

        log.debug("received message: message={}, channel={}", message, channel);
        super.onMessage(message, channel);
    }

}

axon:
    eventhandling:
        processors:
            amqpEvents:
                source: rabbitMQSpringAMQPMessageSource
                mode: SUBSCRIBING

    @Autowired
    void configure(EventProcessingModule epm,
                   RabbitMQSpringAMQPMessageSource rabbitMessageSource) {
        epm.registerSubscribingEventProcessor("rabbitMQSpringAMQPMessageSource", c -> rabbitMessageSource);
        epm.assignProcessingGroup("amqpEvents", "rabbitMQSpringAMQPMessageSource");// this line also made no difference
    }

axon:
    eventhandling:
        processors:
            amqpEvents:
                source: rabbitMQSpringAMQPMessageSource
                mode: SUBSCRIBING

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        super(messageConverter);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {
        log.debug("received message: message={}, channel={}", message, channel);
        super.onMessage(message, channel);
    }

}

import lombok.extern.slf4j.Slf4j;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Service;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@ProcessingGroup("amqpEvents")
@Service
public class OrderedProductsEventHandler {

    private final Map<String, OrderedProduct> orderedProducts = new HashMap<>();

    @EventHandler
    public void on(OrderPlacedEvent event) {
        log.debug("event: {}", event);
        String orderId = event.getOrderId();
        orderedProducts.put(orderId, new OrderedProduct(orderId, event.getProduct()));
    }

    @EventHandler
    public void on(OrderConfirmedEvent event) {
        log.debug("event: {}", event);
        orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
            orderedProduct.setOrderConfirmed();
            return orderedProduct;
        });
    }

    @EventHandler
    public void on(OrderShippedEvent event) {
        log.debug("event: {}", event);
        orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
            orderedProduct.setOrderShipped();
            return orderedProduct;
        });
    }

    @QueryHandler
    public List<OrderedProduct> handle(FindAllOrderedProductsQuery query) {
        log.debug("query: {}", query);
        return new ArrayList<>(orderedProducts.values());
    }

}

Of course, I removed @ProcessingGroup from the aggregate.

My logs:

RabbitMQSpringAMQPMessageSource : received message: ... 
OrderedProductsEventHandler : event: OrderShippedEvent...

1 answer

濮阳茂材
2023-03-14

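In Axon, aggregates do not receive events from "outside". The event handlers in an aggregate (more specifically, EventSourcingHandlers) only process events published by that same aggregate instance, so that it can rebuild its previous state.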
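
To make the point about rebuilding state concrete, here is a minimal plain-Java sketch of event sourcing. It is a simplified model, not Axon code: the names EventSourcingSketch, OrderEvent, Order and replay are hypothetical and only loosely mirror the order events from the question.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of event sourcing; hypothetical names, not Axon code. */
public class EventSourcingSketch {

    enum Type { PLACED, CONFIRMED }

    /** Hypothetical event carrying the identifier of the aggregate that published it. */
    static final class OrderEvent {
        final String orderId;
        final Type type;

        OrderEvent(String orderId, Type type) {
            this.orderId = orderId;
            this.type = type;
        }
    }

    /** Aggregate whose state is derived only from its own events. */
    static final class Order {
        private String orderId;
        private boolean confirmed;

        // Plays the role of an event sourcing handler: it only updates state.
        void on(OrderEvent event) {
            switch (event.type) {
                case PLACED:
                    orderId = event.orderId;
                    confirmed = false;
                    break;
                case CONFIRMED:
                    confirmed = true;
                    break;
            }
        }

        String getOrderId() { return orderId; }
        boolean isConfirmed() { return confirmed; }
    }

    /** Rebuilds one aggregate by replaying only the events stored under its identifier. */
    static Order replay(String orderId, List<OrderEvent> eventStore) {
        Order order = new Order();
        for (OrderEvent event : eventStore) {
            if (event.orderId.equals(orderId)) {
                order.on(event);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        List<OrderEvent> store = Arrays.asList(
                new OrderEvent("A", Type.PLACED),
                new OrderEvent("B", Type.PLACED),
                new OrderEvent("A", Type.CONFIRMED));

        System.out.println(replay("A", store).isConfirmed()); // true
        System.out.println(replay("B", store).isConfirmed()); // false
    }
}
```

In this model an event for order "A" never touches order "B", and nothing arriving from a queue is involved at all; the aggregate only sees what was recorded for its own identifier.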

Only external event handlers (for example, handlers that update projections) receive events from external sources.

For this to work, application.yml should name the bean, not the queue, as the processor's source. So in the first example:

    eventhandling:
        processors:
            amqpEvents:
                source: in.queue
                mode: subscribing

should become:

    eventhandling:
        processors:
            amqpEvents:
                source: inputMessageSource
                mode: subscribing
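
The empty eventProcessors list in the question, and the JavaDoc warning that messages are consumed without being processed when no processor is subscribed, can be pictured with a small plain-Java model. This is only a sketch of the subscribe-and-forward idea, not the Axon implementation; SubscribableSourceSketch and MessageSource are hypothetical names, and messages are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Simplified model of a subscribable message source; not the Axon implementation. */
public class SubscribableSourceSketch {

    static final class MessageSource {
        private final List<Consumer<String>> processors = new CopyOnWriteArrayList<>();

        /** Registers a processor and returns a handle that cancels the subscription. */
        Runnable subscribe(Consumer<String> processor) {
            processors.add(processor);
            return () -> processors.remove(processor);
        }

        /** Forwards the message to every subscriber and returns how many received it. */
        int onMessage(String message) {
            processors.forEach(p -> p.accept(message));
            return processors.size();
        }
    }

    public static void main(String[] args) {
        MessageSource source = new MessageSource();
        List<String> handled = new ArrayList<>();

        int before = source.onMessage("event-1"); // nobody subscribed yet: message is lost
        Runnable cancel = source.subscribe(handled::add);
        int after = source.onMessage("event-2");  // delivered to the single subscriber
        cancel.run();

        System.out.println(before + " " + after + " " + handled); // 0 1 [event-2]
    }
}
```

In the real application it is the event processor that subscribes to the source, which is why the processor configuration has to point at the message source bean that the listener actually delivers to.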