using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
namespace RabbitMQ_Consumer
{
/// <summary>
/// Helper that owns one RabbitMQ connection and channel. It publishes objects as
/// UTF-8 JSON (<see cref="SendMs{T}"/>) and consumes JSON arrays from a queue,
/// inserting the rows through FreeSql (<see cref="Receive"/>).
/// Dispose the helper to release the channel and the connection.
/// </summary>
public class RabbitMQHelper : IDisposable
{
    readonly ConnectionFactory connectionFactory;
    readonly IConnection connection;
    readonly IModel channel;
    readonly string exchangName;
    bool disposed;

    /// <summary>
    /// Opens a connection and a channel to the broker.
    /// </summary>
    /// <param name="changeName">
    /// Exchange used for binding and publishing. An empty string (the default)
    /// selects the broker's default exchange; null is treated as empty.
    /// </param>
    /// <param name="hostName">Broker host name or IP address.</param>
    /// <param name="userName">Broker user name.</param>
    /// <param name="password">Broker password.</param>
    public RabbitMQHelper(string changeName = "", string hostName = "localhost", string userName = "guest", string password = "guest")
    {
        this.exchangName = changeName ?? string.Empty;

        // Create the connection factory.
        connectionFactory = new ConnectionFactory
        {
            UserName = userName,
            Password = password,
            HostName = hostName
        };

        // Create the connection, then the channel. If the channel cannot be
        // created the connection must not be left open.
        connection = connectionFactory.CreateConnection();
        try
        {
            channel = connection.CreateModel();
        }
        catch
        {
            connection.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Declares the durable queue <paramref name="queName"/>, serializes
    /// <paramref name="msg"/> to JSON and publishes it as UTF-8 bytes with
    /// delivery mode 2, using the queue name as routing key.
    /// </summary>
    /// <typeparam name="T">Type of the message object.</typeparam>
    /// <param name="queName">Queue name; also used as the routing key.</param>
    /// <param name="msg">Object to serialize and send.</param>
    public void SendMs<T>(string queName, T msg) where T : class
    {
        // Declare the queue: durable, not exclusive, not auto-deleted.
        channel.QueueDeclare(queName, true, false, false, null);

        // The default exchange ("") already routes by queue name and the broker
        // refuses explicit bindings on it, so only bind to a named exchange.
        if (exchangName.Length != 0)
        {
            channel.QueueBind(queName, exchangName, queName);
        }

        var basicProperties = channel.CreateBasicProperties();
        basicProperties.DeliveryMode = 2;

        string msgJson = JsonConvert.SerializeObject(msg);
        var body = Encoding.UTF8.GetBytes(msgJson);
        var address = new PublicationAddress(ExchangeType.Direct, exchangName, queName);
        channel.BasicPublish(address, basicProperties, body);
    }

    /// <summary>
    /// Starts a manually-acknowledged consumer on <paramref name="queName"/>.
    /// Each message body is decoded as UTF-8 JSON, deserialized into a list of
    /// the entity type <c>Models.{queName}</c> and inserted through
    /// <paramref name="newfreesql"/>; the message is acknowledged after the
    /// insert has run. Blank messages are rejected without requeue. If the
    /// insert throws, the exception leaves the handler and the message is not
    /// acknowledged.
    /// </summary>
    /// <param name="queName">
    /// Queue to consume. "DYHTXX" is mapped to "DYHT" and "JGZHXX" to "JGZHXY"
    /// before it is used as both queue name and entity type name.
    /// </param>
    /// <param name="newfreesql">FreeSql instance used to insert the rows.</param>
    /// <exception cref="ArgumentNullException"><paramref name="newfreesql"/> is null.</exception>
    public void Receive(string queName, IFreeSql newfreesql)
    {
        if (newfreesql == null)
        {
            throw new ArgumentNullException(nameof(newfreesql));
        }

        if (queName == "DYHTXX")
        {
            queName = "DYHT";
        }
        if (queName == "JGZHXX")
        {
            queName = "JGZHXY";
        }

        // Event-based consumer.
        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

        // Message-received handler.
        consumer.Received += (ch, ea) =>
        {
            // SendMs publishes UTF-8, so decode as UTF-8. Encoding.Default is the
            // ANSI code page on .NET Framework and would garble non-ASCII text.
            string message = Encoding.UTF8.GetString(ea.Body.ToArray());

            if (string.IsNullOrWhiteSpace(message))
            {
                // Nothing to insert; settle the delivery so it does not stay
                // unacknowledged on the channel.
                channel.BasicReject(ea.DeliveryTag, false);
                return;
            }

            InsertRows(queName, message, newfreesql);
            channel.BasicAck(ea.DeliveryTag, false);
        };

        // Start the consumer with manual acknowledgement.
        channel.BasicConsume(queName, false, consumer);
    }

    /// <summary>
    /// Disposes the channel and then the connection. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (disposed)
        {
            return;
        }
        disposed = true;

        if (channel != null)
        {
            channel.Dispose();
        }
        if (connection != null)
        {
            connection.Dispose();
        }
    }

    /// <summary>
    /// Deserializes <paramref name="json"/> into a <c>List</c> of the entity type
    /// <c>Models.{entityName}</c> and runs <c>Insert(list).ExecuteAffrows()</c>
    /// on <paramref name="freeSql"/>. Reflection is used because the entity type
    /// is only known at run time.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The entity type or one of the expected FreeSql methods cannot be found.
    /// </exception>
    static void InsertRows(string entityName, string json, IFreeSql freeSql)
    {
        // Resolve the entity type from the separate models assembly.
        Assembly assembly = Assembly.LoadFrom("Models");
        Type instanceType = assembly.GetType($"Models.{entityName}");
        if (instanceType == null)
        {
            throw new InvalidOperationException($"Entity type 'Models.{entityName}' was not found in assembly '{assembly.FullName}'.");
        }

        Type listType = typeof(List<>).MakeGenericType(instanceType);
        object instanceDatas = JsonConvert.DeserializeObject(json, listType);

        // Find the generic Insert overload that takes exactly one List<T> parameter.
        MethodInfo insertMethod = freeSql.GetType()
            .GetMethods()
            .FirstOrDefault(o => o.Name == "Insert"
                && o.GetGenericArguments().Length == 1
                && o.GetParameters().Count(p => p.ParameterType.Name == "List`1") == 1);
        if (insertMethod == null)
        {
            throw new InvalidOperationException($"No generic 'Insert' method taking a List was found on '{freeSql.GetType().FullName}'.");
        }

        object iInsert = insertMethod
            .MakeGenericMethod(instanceType)
            .Invoke(freeSql, new object[] { instanceDatas });
        if (iInsert == null)
        {
            throw new InvalidOperationException("The 'Insert' call returned null.");
        }

        MethodInfo executeAffrowsMethod = iInsert.GetType()
            .GetMethods()
            .FirstOrDefault(o => o.Name == "ExecuteAffrows" && o.GetGenericArguments().Length == 0);
        if (executeAffrowsMethod == null)
        {
            throw new InvalidOperationException($"No 'ExecuteAffrows' method was found on '{iInsert.GetType().FullName}'.");
        }

        executeAffrowsMethod.Invoke(iInsert, null);
    }
}
}