我有两个独立的应用程序运行在同一服务器上。
我已经实现了从用户管理到钱包管理的事件来源。它运行得很好。
但是,当我将从钱包管理应用程序中的事件处理程序发布新事件时,我将收到以下错误消息日志。
以下是我的日志详细信息
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.peaas.ngapblueprintdemo.wallet.config.AxonConfiguration$1.onMessage(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.axonframework.eventsourcing.eventstore.EventStoreException: An event for aggregate [772c7f69-534e-4ee2-b198-5b4d2edd3497] at sequence [0] could not be persisted
at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.handlePersistenceException(AbstractEventStorageEngine.java:112)
at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:223)
at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.appendEvents(AbstractEventStorageEngine.java:85)
at org.axonframework.eventsourcing.eventstore.AbstractEventStore.prepareCommit(AbstractEventStore.java:64)
at org.axonframework.eventhandling.AbstractEventBus.doWithEvents(AbstractEventBus.java:210)
at org.axonframework.eventhandling.AbstractEventBus.lambda$null$4(AbstractEventBus.java:145)
at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:68)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:131)
at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:891)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:131)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:214)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:92)
at org.axonframework.eventhandling.AbstractEventProcessor.process(AbstractEventProcessor.java:116)
at org.axonframework.eventhandling.SubscribingEventProcessor.process(SubscribingEventProcessor.java:142)
at org.axonframework.eventhandling.DirectEventProcessingStrategy.handle(DirectEventProcessingStrategy.java:32)
at org.axonframework.eventhandling.SubscribingEventProcessor.lambda$start$0(SubscribingEventProcessor.java:135)
at org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource.lambda$onMessage$1(SpringAMQPMessageSource.java:90)
at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:890)
at org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource.onMessage(SpringAMQPMessageSource.java:90)
at com.peaas.ngapblueprintdemo.wallet.config.AxonConfiguration$1.onMessage(AxonConfiguration.java:67)
at sun.reflect.GeneratedMethodAccessor273.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:182)
... 10 common frames omitted
Caused by: javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call
at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:289)
at com.sun.proxy.$Proxy224.persist(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:218)
... 37 common frames omitted
package com.peaas.ngapblueprintdemo.wallet.eventHandlers;
import java.util.UUID;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.peaas.ngapblueprintdemo.events.CreateUserProfileEvent;
import com.peaas.ngapblueprintdemo.events.WalletCreatedEvent;
import com.peaas.ngapblueprintdemo.wallet.commands.WalletCreatedCommand;
import com.peaas.ngapblueprintdemo.wallet.domain.Wallet;
import com.peaas.ngapblueprintdemo.wallet.repository.WalletRepository;
@ProcessingGroup("amqpEvents")
@Component
public class UserEventHandler {
@Autowired
private WalletRepository walletRepository;
@Autowired
private transient CommandGateway commandGatway;
@EventHandler
public void onCreateUserProfile(CreateUserProfileEvent event) {
System.out.println("--- Wallet Event Handler ---");
Wallet wallet = new Wallet();
wallet.setAmount(0d);
wallet.setUserid(event.getUserId());
wallet = walletRepository.save(wallet);
String walletID = UUID.randomUUID().toString();
WalletCreatedCommand command = new WalletCreatedCommand(wallet.getId(),walletID,wallet.getAmount(),wallet.getUserid());
command.setId(wallet.getId());
commandGatway.send(command);
}
@EventHandler
public void onCreateWalletEvent(WalletCreatedEvent event) {
System.out.println("--- Wallet Created Successfully ---");
System.out.println(event);
}
}
package com.peaas.ngapblueprintdemo.wallet.aggregate;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.commandhandling.model.AggregateLifecycle;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.spring.stereotype.Aggregate;
import com.peaas.ngapblueprintdemo.events.WalletCreatedEvent;
import com.peaas.ngapblueprintdemo.wallet.commands.WalletCreatedCommand;
@Aggregate
public class WalletAggregate {
private Long id;
@AggregateIdentifier
@GeneratedValue(strategy = GenerationType.IDENTITY)
private String walletId;
private Double amount;
private Long userId;
@CommandHandler
public WalletAggregate(WalletCreatedCommand command){
System.out.println("--- Command Handler Start : WalletCreatedCommand ---");
AggregateLifecycle.apply(new WalletCreatedEvent(command.getUserId(),command.getWalletId(),command.getAmount(),command.getUserId()));
System.out.println("--- Command Handler End : WalletCreatedCommand ---");
}
@EventSourcingHandler
public void handle(WalletCreatedEvent event) {
System.out.println("--- Event Sourcing Handler Start : WalletCreatedEvent ---");
this.id = event.getId();
this.walletId = event.getWalletId();
this.amount = event.getAmount();
this.userId = event.getUserId();
System.out.println("--- Event Sourcing Handler End : WalletCreatedEvent ---");
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
@Override
public String toString() {
return "WalletAggregate [id=" + id + ", amount=" + amount + ", userId=" + userId + "]";
}
}
package com.peaas.ngapblueprintdemo.wallet.commands;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
public class WalletCreatedCommand {
private Long id;
@TargetAggregateIdentifier
private String walletId;
private Double amount;
private Long userId;
public WalletCreatedCommand() {
}
public WalletCreatedCommand(Long id,String walletId, Double amount, Long userId) {
super();
this.id = id;
this.walletId = walletId;
this.amount = amount;
this.userId = userId;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getWalletId() {
return walletId;
}
public void setWalletId(String walletId) {
this.walletId = walletId;
}
@Override
public String toString() {
return "WalletCreatedCommand [id=" + id + ", walletId=" + walletId + ", amount=" + amount + ", userId=" + userId
+ "]";
}
}
package com.peaas.ngapblueprintdemo.events;
public class WalletCreatedEvent {
private Long id;
private String walletId;
private Double amount;
private Long userId;
public WalletCreatedEvent() {
}
public WalletCreatedEvent(Long id,String walletId, Double amount, Long userId) {
super();
this.id = id;
this.walletId = walletId;
this.amount = amount;
this.userId = userId;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getWalletId() {
return walletId;
}
public void setWalletId(String walletId) {
this.walletId = walletId;
}
@Override
public String toString() {
return "WalletCreatedEvent [id=" + id + ", walletId=" + walletId + ", amount=" + amount + ", userId=" + userId
+ "]";
}
}
我的预感是,CommandBus
中缺少TransactionManagingInterceptor
,这将导致命令处理和事件发布无法在事务中执行。
您可以通过执行以下操作将消息处理程序拦截器如TransactionManagingInterceptor
注册到CommandBus
:
public CommandBus commandBus(TransactionManager transactionManager) {
// While you're at it, you could also instantiate an AsynchronousCommandBus if you'd want.
CommandBus commandBus = new SimpleCommandBus();
commandBus.registerHandlerInterceptor(new TransactionManagingInterceptor<>(transactionManager));
return commandBus;
}
一切都很好,但是假设我使用相同的聚合标识符再次发布相同的命令。我确实看到ServiceCreatedEvent被应用于调试器中的聚合,但是由于已经存在带有该键的域事件记录,所以不会向后端持久化任何内容。同样,一切都很好,但我看到ServiceCreatedEvent被发布给Kafka并被我们的监听器使用,这是意想不到的行为。 我想知道这是否是Axon的预期行为,或者我们的Kafka集成应该确保我们
null 我想要达到的目标 我希望由一个实例发布的事件只由同一个实例处理 如果instance1发布eventX,那么只有instance1应该处理eventX null
事件就是用户或浏览器自身执行的某种动作。诸如click、load 和mouseover,都是事件的名字。而响应某个事件的函数就叫做事件处理程序(或事件侦听器)。事件处理程序的名字以"on"开头,因此click 事件的事件处理程序就是onclick,load 事件的事件处理程序就是onload。为事件指定处理程序的方式有好几种。 13.2.1 HTML事件处理程序某个元素支持的每种事件,都可以使用一
事件 事件,就是文档与浏览器发生特定的 交互瞬间。JavaScript与HTML之间的交互就是通过事件实现的。 事件流 所谓事件流,就是接收页面事件的顺序。 事件流分 事件冒泡流 与 事件捕获流。事件冒泡流是IE开发团队提出的,捕获流是Netscape开发团队提出的。 事件冒泡流 所谓事件冒泡,就是事件开始时由 最具体的元素(文档中嵌套最深的那个 节点)接收,然后逐级向上传播到较为不具体的节点(文
我在主页组件中有这个代码,当按下向下/向上箭头键时,什么也没有发生。我刚刚意识到,当用ctrl键按箭头键时,eventhandler可以正常工作。我到处找,但什么也没找到。有人能帮忙吗?我的浏览器是Chrome v.96。React版本17.0.2,节点v.16.13.1