背景
事件发布/订阅是一种非常强大的模式,它可以帮助业务组件间实现完全解耦,不同的业务组件只依赖事件,只关注哪些事件是需要自己处理的,而不用关注谁来处理自己发布事件,事件追溯(Event Sourcing)也是基于事件发布/订阅的。在微服务架构中,事件发布/订阅有非常多的应用场景。今天我给大家分享一个基于ASP.NET Core的单体程序使用事件发布/订阅的例子,针对分布式项目的事件发布/订阅比较复杂,难点是事务处理,后续我会另写一篇博文来演示。
案例说明
当前我们有一个基于ASP.NET Core的电子商务系统,在项目的初期,业务非常简单,只有一个购物车模块和一个订单模块,所有的代码都放在一个项目中。
整个项目使用了一个简单的三层架构。
这里当用户提交购物车的时候,程序会在ShoppingCartManager类的SubmitShoppingCart方法中执行3个操作
代码如下:
public void SubmitShoppingCart(string shoppingCartId) { var shoppingCart = _unitOfWork.ShoppingCartRepository .GetShoppingCart(shoppingCartId); _unitOfWork.ShoppingCartRepository .SubmitShoppingCart(shoppingCartId); _unitOfWork.OrderRepository .CreatOrder(new CreateOrderDTO { Items = shoppingCart.Items .Select(p => new NewOrderItemDTO { ItemId = p.ItemId, Name = p.Name, Price = p.Price }).ToList() }); //这里为了简化代码,我用命令行表示发送邮件的逻辑 Console.WriteLine("Confirm Email Sent."); _unitOfWork.Save(); }
根据SOLID设计原则中的单一责任原则,如果一个类承担的职责过多,就等于把这些职责耦合在一起了。这里生成订单和发送邮件都不应该是当前SubmitShoppingCart需要负责的,所以我们需要它们从这个方法中移出去,使用的方法就是事件订阅/发布。
新的架构图
以下是使用事件发布/订阅之后的系统架构图。
好的,下面让我们来一步一步实现以上描述的代码。
添加事件基类
这里我们首先定义一个事件基类,其中暂时只添加了一个属性OccuredOn,它表示了当前事件的触发时间。
public class EventBase { public EventBase() { OccuredOn = DateTime.Now; } protected DateTime OccuredOn { get; set; } }
定义购物车提交事件
接下来我们就需要创建购物车提交事件类ShoppingCartSubmittedEvent, 它继承自EventBase, 并提供了一个购物项集合
public class ShoppingCartSubmittedEvent : EventBase { public ShoppingCartSubmittedEvent() { Items = new List<ShoppingCartSubmittedItem>(); } public List<ShoppingCartSubmittedItem> Items { get; set; } } public class ShoppingCartSubmittedItem { public string ItemId { get; set; } public string Name { get; set; } public decimal Price { get; set; } }
定义事件处理器接口
为了添加事件处理器,我们首先需要定义一个泛型接口类IEventHandler
public interface IEventHandler<T> where T : EventBase { void Run(T obj); Task RunAsync(T obj); }
这个泛型接口类的是泛型类型必须继承自EventBase类。接口提供了2个方法Run和RunAsync。 它们定义了该接口的实现类必须实现同一个处理逻辑的同步和异步方法。
为购物车提交事件编写事件处理器
有了事件处理器接口,接下来我们就可以开始为购物车提交事件添加事件处理器了。这里我们为了实现前面定义的逻辑,我们需要创建2个处理器CreateOrderHandler和ConfirmEmailSentHandler
CreateOrderHandler.cs
public class CreateOrderHandler : IEventHandler<ShoppingCartSubmittedEvent> { private IOrderManager _orderManager = null; public CreateOrderHandler(IOrderManager orderManager) { _orderManager = orderManager; } public void Run(ShoppingCartSubmittedEvent obj) { _orderManager.CreateNewOrder(new Models.DTOs.CreateOrderDTO { Items = obj.Items.Select(p => new Models.DTOs.NewOrderItemDTO { ItemId = p.ItemId, Name = p.Name, Price = p.Price }).ToList() }); } public Task RunAsync(ShoppingCartSubmittedEvent obj) { return Task.Run(() => { Run(obj); }); } }
代码解释:
ConfirmEmailSentHandler.cs
public class ConfirmEmailSentHandler : IEventHandler<ShoppingCartSubmittedEvent> { public void Run(ShoppingCartSubmittedEvent obj) { Console.WriteLine("Confirm Email Sent."); } public Task RunAsync(ShoppingCartSubmittedEvent obj) { return Task.Run(() => { Console.WriteLine("Confirm Email Sent."); }); } }
代码解释:
为OrderManager类添加创建订单方法
IOrderManager.cs
public interface IOrderManager { string CreateNewOrder(CreateOrderDTO dto); }
OrderManager.cs
public class OrderManager : IOrderManager { private IOrderRepository _orderRepository; public OrderManager(IOrderRepository orderRepository) { _orderRepository = orderRepository; } public string CreateNewOrder(CreateOrderDTO dto) { var orderId = _orderRepository.CreatOrder(dto); Console.WriteLine($"One order created: {JsonConvert.SerializeObject(dto)}"); return orderId; } }
创建EventHandlerContainer
下面我们来编写最核心的事件处理器容器。在这里我们的事件处理器容器完成了3个功能
public class EventHandlerContainer { private IServiceProvider _serviceProvider = null; private static Dictionary<string, List<Type>> _mappings = new Dictionary<string, List<Type>>(); public EventHandlerContainer(IServiceProvider serviceProvider) { _serviceProvider = serviceProvider; } public static void Subscribe<T, THandler>() where T : EventBase where THandler : IEventHandler<T> { var name = typeof(T).Name; if (!_mappings.ContainsKey(name)) { _mappings.Add(name, new List<Type> { }); } _mappings[name].Add(typeof(THandler)); } public static void Unsubscribe<T, THandler>() where T : EventBase where THandler : IEventHandler<T> { var name = typeof(T).Name; _mappings[name].Remove(typeof(THandler)); if (_mappings[name].Count == 0) { _mappings.Remove(name); } } public void Publish<T>(T o) where T : EventBase { var name = typeof(T).Name; if (_mappings.ContainsKey(name)) { foreach (var handler in _mappings[name]) { var service = (IEventHandler<T>)_serviceProvider.GetService(handler); service.Run(o); } } } public async Task PublishAsync<T>(T o) where T : EventBase { var name = typeof(T).Name; if (_mappings.ContainsKey(name)) { foreach (var handler in _mappings[name]) { var service = (IEventHandler<T>)_serviceProvider.GetService(handler); await service.RunAsync(o); } } } }
代码解释:
在程序启动时,注册事件订阅
现在我们来Startup.cs的ConfigureServices方法,这里我们需要进行服务注册,并完成事件订阅。
public void ConfigureServices(IServiceCollection services) { services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2); services.AddScoped<IOrderManager, OrderManager>(); services.AddScoped<IShoppingCartManager, ShoppingCartManager>(); services.AddScoped<IShoppingCartRepository, ShoppingCartRepository>(); services.AddScoped<IOrderRepository, OrderRepository>(); services.AddScoped<IUnitOfWork, UnitOfWork>(); services.AddScoped<CreateOrderHandler>(); services.AddScoped<ConfirmEmailSentHandler>(); services.AddScoped<EventHandlerContainer>(); EventHandlerContainer.Subscribe<ShoppingCartSubmittedEvent, CreateOrderHandler>(); EventHandlerContainer.Subscribe<ShoppingCartSubmittedEvent, ConfirmEmailSentHandler>(); }
注意:这里保证一个Api请求中的所有数据库操作在一个事务里,这里我们使用Scoped作用域。这样我们就可以在调用工作单元IUnitOfWork接口的Save代码中启用事务。
修改ShoppingCartManager
最后我们来修改ShoppingCartManager, 改用发布事件的方式来完成后续创建订单和发送邮件的功能。
public void SubmitShoppingCart(string shoppingCartId) { var shoppingCart = _unitOfWork.ShoppingCartRepository .GetShoppingCart(shoppingCartId); _unitOfWork.ShoppingCartRepository .SubmitShoppingCart(shoppingCartId); _container.Publish(new ShoppingCartSubmittedEvent() { Items = shoppingCart .Items .Select(p => new ShoppingCartSubmittedItem { ItemId = p.ItemId, Name = p.Name, Price = p.Price }) .ToList() }); _unitOfWork.Save(); }
这样ShoppingCartManager就只需要关注购物车状态的变更,而不需要关注发送确认邮件和创建订单了。
最终效果
现在让我们启动项目,
首先我们使用[POST] /api/shoppingCarts来添加一个新的购物车, 这个API会返回当前购物车的Id
然后我们使用[PUT] /api/shoppingCarts/ShoppingCart_636872897140555966来模拟提交购物车,程序返回操作成功
最后我们查看一下控制台的输出日志
2个事件处理器都被正确触发了。
总结
至此我们的代码重构完成。 最终的代码中,SubmitShoppingCart方法,仅负责修改购物车状态并发布一个购物车提交的事件。生成订单和发送邮件的功能代码都被移动到了独立的处理类中。
这样的方式的好处不仅仅是完成了代码的解耦,针对后续的扩展也非常有利,想想一下,如果在未来当前项目需求追加这样一个功能,当提交购物车的时候,除了要发送确认邮件,还要发送手机短信。这时候你根本不需要去修改ShoppingCartManager类,你只需要针对ShoppingCartSubmittedEvent在再添加一个新的事件处理器即可,这也满足的SOLID的开闭原则。
项目源代码:https://github.com/lamondlu/EventHandlerInSingleApplication
好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对小牛知识库的支持。
本文向大家介绍Jquery 自定义事件实现发布/订阅的简单实例,包括了Jquery 自定义事件实现发布/订阅的简单实例的使用技巧和注意事项,需要的朋友参考一下 Jquery 自定义事件实现发布/订阅的简单实例 以上这篇Jquery 自定义事件实现发布/订阅的简单实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持呐喊教程。
问题内容: 我一直在研究不同的nodeJS发布/订阅实现,并想知道哪种方法最适合特定的应用程序。该应用程序的要求涉及多通道,多用户3D环境中对象的实时同步。 我从使用socket.io开始,创建了一个基本的通道数组,当用户发送消息时,它遍历该通道中的用户并将消息发送到用户的客户端。这很好用,我对此没有任何问题。 为了保持对象的持久性,我使用node_redis添加了Redis支持。然后,我将通道数
我在WPF中编写代码,其中有两个视图模型。在一个视图模型中,我正在关闭一个prism弹出窗口,在关闭时,我将调用我的发布方法,如: 而我的subscribe事件如下所示: 以下是这两个类接收事件聚合器的方式: 这是我的第二堂课: 每次取消交互时,发布都会被发布,但不会被调用,因为订阅。 在具有subscribe方法的其他类之前被调用。会有问题吗?我只想在交互完成时初始化我的集合视图,而不破坏MVV
当使用 发布/订阅 API 时,需要决定使用同一连接的消息应该是顺序处理 还是并行处理 。 顺序处理意味着你(很大程度上)不需要担心线程安全问题,并且这意味着你保持了事件的顺序。它们会完全按照(通过队列)接受的顺序来处理,但是结果这也意味着消息会延迟彼此。 另一种选择是 concurrent(并行) 处理。这使得工作的处理顺序 没有特定的保证 并且你的代码完全负责确保并发的消息不应该破坏内部的状态
本文向大家介绍Redis发布订阅和实现.NET客户端详解,包括了Redis发布订阅和实现.NET客户端详解的使用技巧和注意事项,需要的朋友参考一下 前言 发布订阅在设计模式中也可以说是观察者模式,针对这个模式是处理对象间一对多的依赖关系的,当一个对象发生变化,其它依赖他的对象都要得到通知并更新。 然而它也有自己的缺点,就是当主题发生一系列的变化时,观察者都要做批量的更新,如果这样的更新成本很高,那
本文向大家介绍Vue发布订阅模式实现过程图解,包括了Vue发布订阅模式实现过程图解的使用技巧和注意事项,需要的朋友参考一下 vue项目中不同组件间通信一般使用vuex,通常情况下vuex和EventBus不应该混用,不过某些场景下不同组件间只有消息的交互,这时使用EventBus消息通知的方式就更合适一些。 图解 html Dvue.js 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大