CAP是一个处理分布式事务的解决方案,遵循最终一致性并基于消息队列实现的分布式事务中间件。
PM> Install-Package DotNetCore.CAP
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.Kafka
在 ASP.NET Core 程序中,你可以在 Startup.cs 文件 ConfigureServices() 中配置 CAP 使用到的服务:
public void ConfigureServices(IServiceCollection services)
{
services.AddCap(x =>
{
//配置Cap的本地消息记录库,用于服务端保存Published消息记录表;客户端保存Received消息记录表
// 此方法默认使用的数据库Schema为Cap;2,要求最低sql server2012(因为使用了Dashboard的sql查询语句使用了Format新函数)
//x.UseSqlServer("Integrated Security=False;server=服务器;database=cap;User ID=sa;Password=密码;Connect Timeout=30");
// 配置Cap的本地消息记录库,用于服务端保存Published消息记录表;客户端保存Received消息记录表
// 此方法可以指定是否使用sql server2008,数据库Schema,链接字符串
x.UseSqlServer((options) =>
{
//数据库连接字符串
options.ConnectionString = "数据库连接字符串";
//标记使用的是SqlServer2008引擎(此处设置的是2008,因为192.168.1.109数据库是2008)
options.UseSqlServer2008();
//Cap默认使用的数据库Schema为Cap;此处可以指定使用自己的数据库Schema
options.Schema = "CAPTest";
});
//使用Kafka作为底层之间的消息发送
x.UseKafka("192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092");
//x.UseKafka(options =>
//{
// options.Servers = "192.168.1.230:9092,192.168.1.231:9092,192.168.1.232:9092";
//});
//使用Dashboard,这是一个Cap的可视化管理界面;默认地址:http://localhost:端口/cap
x.UseDashboard();
//默认分组名,此值不配置时,默认值为当前程序集的名称
x.DefaultGroup = "DefaultGroup";
//失败后的重试次数,默认50次;在FailedRetryInterval默认60秒的情况下,即默认重试50*60秒(50分钟)之后放弃失败重试
//x.FailedRetryCount = 10;
//失败后的重拾间隔,默认60秒
//x.FailedRetryInterval = 30;
//设置成功信息的删除时间默认24*3600秒
//x.SucceedMessageExpiredAfter = 60 * 60;
//失败之后的回调函数:达到失败重试的上线时才触发此事件
x.FailedThresholdCallback = FailCallBack;
});
}
public class PublishController : Controller{
private readonly ICapPublisher _capBus;
public PublishController(ICapPublisher capPublisher)
{
_capBus = capPublisher;
}
//不使用事务 [Route("~/without/transaction")]
public IActionResult WithoutTransaction()
{
_capBus.Publish("xxx.services.show.time", DateTime.Now);
return Ok();
}
//Ado.Net 中使用事务,自动提交 [Route("~/adonet/transaction")]
public IActionResult AdonetWithTransaction()
{
using (var connection = new MySqlConnection(ConnectionString))
{
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
{
//业务代码
_capBus.Publish("xxx.services.show.time", DateTime.Now);
}
}
return Ok();
}
//EntityFramework 中使用事务,自动提交 [Route("~/ef/transaction")]
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
{
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
{
//业务代码
_capBus.Publish("xxx.services.show.time", DateTime.Now);
}
return Ok();
}}
(6)消息补偿
有时候当发送一条消息出去之后,希望有一个回调可以获得消费方法的通知,用来补偿发送方做的业务操作,那么可以使用下面这个重载。
PublishAsync<T>(string name,T object, string callBackName)
这个重载中 callbackName 是一个回调的订阅方法名称,当消费端处理完成消息之后CAP会把消费者的处理结果返回并且调用指定的订阅方法。
在一些需要业务补偿的场景中,我们可以利用此特性进行一些还原的补偿操作。例如:电商系统中的付款操作,订单在进行支付调用支付服务的过程中如果发生异常,那么支付服务可以通过返回一个结果来告诉调用方此次业务失败,调用方将支付状态标记为失败。 调用方通过订阅 callbackName(订阅参数为消费方方法的返回值) 即可接收到支付服务消费者方法的返回结果,从而进行补偿的业务处理。
下面是使用方法:
// 发送方_capBus.Publish("xxx.services.show.time",DaateTime.Now,"callback-show-execute-time");
[CapSubscribe("callback-show-execute-time")] //对应发送的 callbackName
public void ShowPublishTimeAndReturnExecuteTime(DateTime time)
{
Console.WriteLine(time); // 这是订阅方返回的时间
}
//------------------------------------------------------------
//订阅方[CapSubscribe("xxx.services.show.time")]
public DateTime ShowPublishTimeAndReturnExecuteTime(DateTime time)
{
Console.WriteLine(time); // 这是发送的时间
return DateTime.Now; // 这是消费者返回的时间,CAP会取该方法的返回值用来传递到发送方的回调订阅里面
}
注意:框架无法做到100%确保消息只执行一次,所以在一些关键场景消息端在方法实现的过程中自己保证幂等性。
使用 CapSubscribeAttribute 来订阅 CAP 发布出去的消息。
1 2 3 4 5 | [CapSubscribe("xxx.services.bar")]public void BarMessageProcessor(){ } |
这里,你也可以使用多个 CapSubscribe[""] 来同时订阅多个不同的消息 :
1 2 3 4 5 6 | [CapSubscribe("xxx.services.bar")][CapSubscribe("xxx.services.foo")]public void BarAndFooMessageProcessor(){ } |
其中,xxx.services.bar 为订阅的消息名称,内部实现上,这个名称在不同的消息队列具有不同的代表。 在 Kafka 中,这个名称即为 Topic Name。 在RabbitMQ 中,为 RouteKey。
例外情况¶
这里有几种情况可能需要知道:
① 消息发布的时候订阅方还未启动
Kafka:
当 Kafka 中,发布的消息存储于持久化的日志文件中,所以消息不会丢失,当订阅者所在的程序启动的时候会消费掉这些消息。
RabbitMQ:
在 RabbitMQ 中,应用程序**首次启动**会创建具有持久化的 Exchange 和 Queue,CAP 会针对每一个订阅者Group会新建一个消费者队列,由于首次启动时候订阅者未启动的所以是没有队列的,消息无法进行持久化,这个时候生产者发的消息会丢失。
针对RabbitMQ的消息丢失的问题,有两种解决方式:
i. 部署应用程序之前,在RabbitMQ中手动创建具有durable特性的Exchange和Queue,默认情况他们的名字分别是(cap.default.topic, cap.default.group)。
ii. 提前运行一遍所有实例,让Exchange和Queue初始化。
我们建议采用第 ii 种方案,因为很容易做到。
文档:
http://cap.dotnetcore.xyz/user-guide/zh/getting-started/quick-start/
源码:
https://github.com/dotnetcore/CAP/tree/master/samples