当前位置: 首页 > 工具软件 > Cap Framework > 使用案例 >

CAP

井高峯
2023-12-01
  1. CAP是什么

CAP是一个处理分布式事务的解决方案,遵循最终一致性并基于消息队列实现的分布式事务中间件。

 

  1. CAP原理
  1. 消息持久化
  1. CAP使用本地数据库“Cap.Published”表持久化发布的消息。
  2. 消息成功发布到消息队列后“Cap.Received”表会更改状态为“Successed”,CAP启动消息队列持久化功能。

 

  1. 消息向MQ发送
  2. 消息从MQ消费
  3. 订阅者方法执行

 

  1. 应用场景
  1. 分布式事务中的最终一致性(异步确保)的方案
  2. 具有高可用性的 EventBus

 

  1. 如何使用CAP
  1. 引用CAP的NuGet包

PM> Install-Package DotNetCore.CAP

  1. 根据使用的不同类型的数据库,来引入不同的扩展包:

PM> Install-Package DotNetCore.CAP.SqlServer

PM> Install-Package DotNetCore.CAP.MySql

PM> Install-Package DotNetCore.CAP.PostgreSql

PM> Install-Package DotNetCore.CAP.MongoDB

  1. 根据使用的不同类型的消息队列,来引入不同的扩展包:

PM> Install-Package DotNetCore.CAP.RabbitMQ

PM> Install-Package DotNetCore.CAP.Kafka

  1. 启动配置

在 ASP.NET Core 程序中,你可以在 Startup.cs 文件 ConfigureServices() 中配置 CAP 使用到的服务:

public void ConfigureServices(IServiceCollection services)

{

       services.AddCap(x =>

            {

                //配置Cap的本地消息记录库,用于服务端保存Published消息记录表;客户端保存Received消息记录表

 

                // 此方法默认使用的数据库Schema为Cap;2,要求最低sql server2012(因为使用了Dashboard的sql查询语句使用了Format新函数)

                //x.UseSqlServer("Integrated Security=False;server=服务器;database=cap;User ID=sa;Password=密码;Connect Timeout=30");

 

                // 配置Cap的本地消息记录库,用于服务端保存Published消息记录表;客户端保存Received消息记录表

                // 此方法可以指定是否使用sql server2008,数据库Schema,链接字符串

                x.UseSqlServer((options) =>

                {

                    //数据库连接字符串

                    options.ConnectionString = "数据库连接字符串";

                    //标记使用的是SqlServer2008引擎(此处设置的是2008,因为192.168.1.109数据库是2008)

                    options.UseSqlServer2008();

                    //Cap默认使用的数据库Schema为Cap;此处可以指定使用自己的数据库Schema

                    options.Schema = "CAPTest";

                });

 

                //使用Kafka作为底层之间的消息发送

                x.UseKafka("192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092");

                //x.UseKafka(options =>

                //{

                //    options.Servers = "192.168.1.230:9092,192.168.1.231:9092,192.168.1.232:9092";

                //});

 

                //使用Dashboard,这是一个Cap的可视化管理界面;默认地址:http://localhost:端口/cap

                x.UseDashboard();

 

                //默认分组名,此值不配置时,默认值为当前程序集的名称

                x.DefaultGroup = "DefaultGroup";

 

                //失败后的重试次数,默认50次;在FailedRetryInterval默认60秒的情况下,即默认重试50*60秒(50分钟)之后放弃失败重试

                //x.FailedRetryCount = 10;

 

                //失败后的重拾间隔,默认60秒

                //x.FailedRetryInterval = 30;

 

                //设置成功信息的删除时间默认24*3600秒

                //x.SucceedMessageExpiredAfter = 60 * 60;

 

                //失败之后的回调函数:达到失败重试的上线时才触发此事件

                x.FailedThresholdCallback = FailCallBack;

            });

}

 

  1. 发布消息,使用ICapPublisher接口中的Publish<T>或者PublishAsync<T>方法来发送消息:

public class PublishController : Controller{

    private readonly ICapPublisher _capBus;

 

    public PublishController(ICapPublisher capPublisher)

    {

        _capBus = capPublisher;

    }

 

    //不使用事务    [Route("~/without/transaction")]

    public IActionResult WithoutTransaction()

    {

        _capBus.Publish("xxx.services.show.time", DateTime.Now);

 

        return Ok();

    }

 

    //Ado.Net 中使用事务,自动提交    [Route("~/adonet/transaction")]

    public IActionResult AdonetWithTransaction()

    {

        using (var connection = new MySqlConnection(ConnectionString))

        {

            using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))

            {

                //业务代码

 

                _capBus.Publish("xxx.services.show.time", DateTime.Now);

            }

        }

        return Ok();

    }

 

    //EntityFramework 中使用事务,自动提交    [Route("~/ef/transaction")]

    public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)

    {

        using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))

        {

            //业务代码

 

            _capBus.Publish("xxx.services.show.time", DateTime.Now);

        }

        return Ok();

    }}

 

(6)消息补偿

有时候当发送一条消息出去之后,希望有一个回调可以获得消费方法的通知,用来补偿发送方做的业务操作,那么可以使用下面这个重载。

PublishAsync<T>(string name,T object, string callBackName)

这个重载中 callbackName 是一个回调的订阅方法名称,当消费端处理完成消息之后CAP会把消费者的处理结果返回并且调用指定的订阅方法。

在一些需要业务补偿的场景中,我们可以利用此特性进行一些还原的补偿操作。例如:电商系统中的付款操作,订单在进行支付调用支付服务的过程中如果发生异常,那么支付服务可以通过返回一个结果来告诉调用方此次业务失败,调用方将支付状态标记为失败。 调用方通过订阅 callbackName(订阅参数为消费方方法的返回值) 即可接收到支付服务消费者方法的返回结果,从而进行补偿的业务处理。

下面是使用方法:

// 发送方_capBus.Publish("xxx.services.show.time",DaateTime.Now,"callback-show-execute-time");

[CapSubscribe("callback-show-execute-time")]   //对应发送的 callbackName

public void ShowPublishTimeAndReturnExecuteTime(DateTime time)

{

Console.WriteLine(time); // 这是订阅方返回的时间

}

//------------------------------------------------------------

//订阅方[CapSubscribe("xxx.services.show.time")]

public DateTime ShowPublishTimeAndReturnExecuteTime(DateTime time)

{

    Console.WriteLine(time); // 这是发送的时间

return DateTime.Now; // 这是消费者返回的时间,CAP会取该方法的返回值用来传递到发送方的回调订阅里面

}

  1. 订阅/消费

注意:框架无法做到100%确保消息只执行一次,所以在一些关键场景消息端在方法实现的过程中自己保证幂等性。

使用 CapSubscribeAttribute 来订阅 CAP 发布出去的消息。

1

2

3

4

5

[CapSubscribe("xxx.services.bar")]public void BarMessageProcessor(){

}

这里,你也可以使用多个 CapSubscribe[""] 来同时订阅多个不同的消息 :

1

2

3

4

5

6

[CapSubscribe("xxx.services.bar")][CapSubscribe("xxx.services.foo")]public void BarAndFooMessageProcessor(){

}

其中,xxx.services.bar 为订阅的消息名称,内部实现上,这个名称在不同的消息队列具有不同的代表。 在 Kafka 中,这个名称即为 Topic Name。 在RabbitMQ 中,为 RouteKey。

例外情况

这里有几种情况可能需要知道:

① 消息发布的时候订阅方还未启动

Kafka:

当 Kafka 中,发布的消息存储于持久化的日志文件中,所以消息不会丢失,当订阅者所在的程序启动的时候会消费掉这些消息。

RabbitMQ:

在 RabbitMQ 中,应用程序**首次启动**会创建具有持久化的 Exchange 和 Queue,CAP 会针对每一个订阅者Group会新建一个消费者队列,由于首次启动时候订阅者未启动的所以是没有队列的,消息无法进行持久化,这个时候生产者发的消息会丢失

针对RabbitMQ的消息丢失的问题,有两种解决方式:

i. 部署应用程序之前,在RabbitMQ中手动创建具有durable特性的Exchange和Queue,默认情况他们的名字分别是(cap.default.topic, cap.default.group)。

ii. 提前运行一遍所有实例,让Exchange和Queue初始化。

我们建议采用第 ii 种方案,因为很容易做到。

文档:

http://cap.dotnetcore.xyz/user-guide/zh/getting-started/quick-start/

源码:

https://github.com/dotnetcore/CAP/tree/master/samples

 

 

 

 类似资料: