在Nuget中搜索DotNetCore.CAP.RabbitMQ并安装
context.Services.AddCap(x =>
{
// 配置数据库链接
//x.UseEntityFramework<OrderDbContext>();
//配置数据库连接
string connectionString = configuration["ConnectionStrings:Order"];
x.UseMySql(connectionString);
// 配置消息队列
x.UseRabbitMQ(option =>
{
option.HostName = configuration["CAPRabbitMQ:HostName"];
string portStr = configuration["CAPRabbitMQ:Port"];
if (!string.IsNullOrEmpty(portStr))
{
option.Port = Convert.ToInt32(portStr);
}
option.VirtualHost = configuration["CAPRabbitMQ:VirtualHost"];
option.UserName = configuration["CAPRabbitMQ:UserName"];
option.Password = configuration["CAPRabbitMQ:Password"];
if (!string.IsNullOrEmpty(configuration["CAPRabbitMQ:ExchangeName"]))
{
option.ExchangeName = configuration["CAPRabbitMQ:ExchangeName"];
}
option.ConnectionFactoryOptions = factory => factory.AutomaticRecoveryEnabled = true;
});
// 设置处理成功的数据在数据库中保存的时间(秒),为了保证系统性能,数据会定期清理
x.SucceedMessageExpiredAfter = Convert.ToInt32(configuration["CAPRabbitMQ:SucceedMessageExpiredAfter"]);
// 重试次数
x.FailedRetryCount = Convert.ToInt32(configuration["CAPRabbitMQ:FailedRetryCount"]);
// 间隔时间 单位:秒
x.FailedRetryInterval = Convert.ToInt32(configuration["CAPRabbitMQ:FailedRetryInterval"]);
//发送消息失败后的回调
x.FailedThresholdCallback = (failedInfo) =>
{
//通知管理人员或其它逻辑 wms
};
x.Version = configuration["CAPRabbitMQ:Version"];
}).AddSubscribeFilter<MyCapFilter>();
可以看到在上面配置Rabbitmq服务的时候为了方便修改配置,读取的是Appsetting中的CAPRabbitMQ节点。所以在配置文件中添加配置节点Appsetting。如下、
"CAPRabbitMQ": {
"HostName": "172.16.7.39", // 多个用逗号隔开(主从)
"Port": 5672,
"UserName": "dev",
"Password": "dev_DKYao2021",
"VirtualHost": "vH_bcs_dev",
"ExchangeName": "e.bcs.dev.exchange", // 交换机.v3
"FailedRetryCount": 5, //失败重试次数
"FailedRetryInterval": 3, // 失败重试间隔时间 单位:秒
"Version": "v1",
"SucceedMessageExpiredAfter": 1440 // 定期清理成功的数据,单位分钟
},
像平时定义接口一样 定义在MQ中要响应的接口
public interface IMqAppService
{
void GetSubscribe(TodoDto<MqOrderInfoDto> orderObj);
}
这里以消费为例
/// <summary>
/// 获取订单信息
/// </summary>
/// <param name="orderObj"></param>
[CapSubscribe(EventConstants.EVENT_ORDER_WAYBILL, Group = "q.topic.order.reviewed")]
public void GetOrderSubscribe(TodoDto<MqOrderInfoDto> orderObj)
{
if (orderObj is {data: { }} && !string.IsNullOrEmpty(orderObj.data.OrderNo))
{
// 新版本拆单
var stressful = _pendingOrderAppService.OrderTurnInvoiceNew(orderObj.data.OrderNo).Result; _logger.LogInformation(stressful.message);
}
else
{
_logger.LogError("订单推送数据为空");
}
}
这里需要注意 Group 的值即为MQ服务器中Exchange的队列名称,
而 EventConstants.EVENT_ORDER_WAYBILL 是已经定义好的消息名称,需要注意应该与推送的消息名称一致