MySQL Q4M 队列操作封装(二)
荣德厚
2023-12-01
/// <summary>
/// Base class for Q4M queue operations on MySQL.
/// </summary>
/// <remarks>
/// Usage cycle: call <see cref="First"/> to open a connection and fetch one
/// queue item, then either <see cref="Remove"/> (issues <c>queue_end()</c>) or
/// <see cref="Rollback"/> (issues <c>queue_abort()</c>). Both release the
/// connection afterwards. The item fetched by <see cref="First"/> is tied to the
/// connection it was read on, so the connection stays open until one of those
/// two calls (or <see cref="Dispose"/>) runs.
/// Author: luyifeng
/// Created: 2013-05-02
/// </remarks>
/// <typeparam name="T">Type that a fetched queue row is converted to.</typeparam>
public abstract class MyQ4M<T> : IQ4M<T>
{
    // True while an item fetched by First() has been neither removed nor rolled back.
    private bool _isGetData = false;

    // Connection holding the currently fetched item; null when nothing is open.
    private MySqlConnection _conn = null;

    // Name of the entry in the <connectionStrings> configuration section.
    private readonly string _connectionKey = null;

    /// <summary>
    /// The item most recently returned by <see cref="First"/>; reset to
    /// <c>default(T)</c> once the item is removed or rolled back.
    /// </summary>
    public T QueueItem { get; set; }

    /// <summary>
    /// Creates the queue wrapper.
    /// </summary>
    /// <param name="connectionKey">
    /// Key of the connection string in the application configuration. It is
    /// validated lazily, on the first call to <see cref="First"/>.
    /// </param>
    protected MyQ4M(string connectionKey)
    {
        _connectionKey = connectionKey;
    }

    /// <summary>
    /// Fetches one item from the queue on a freshly opened connection.
    /// Any item still held from a previous call is rolled back first.
    /// </summary>
    /// <returns>
    /// The fetched item, or <c>default(T)</c> when the query returned no row.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// The connection key or the item SQL is missing or not configured.
    /// </exception>
    public T First()
    {
        // Release whatever the previous cycle left behind.
        Dispose();
        GetDb();

        T item;
        try
        {
            item = GetItem();
        }
        catch
        {
            // Do not leave the connection open when the fetch fails.
            ClearState();
            throw;
        }

        QueueItem = item;
        if (QueueItem != null)
        {
            _isGetData = true;
        }
        else
        {
            // Nothing was fetched, so there is nothing to end or abort later;
            // close the connection now instead of leaking it.
            ClearState();
        }
        return item;
    }

    /// <summary>
    /// Supplies the SQL statement used to fetch one item from the queue.
    /// </summary>
    /// <returns>A non-empty SQL statement.</returns>
    protected abstract string GetItemSql();

    /// <summary>
    /// Runs the SQL from <see cref="GetItemSql"/> and converts the first row.
    /// </summary>
    /// <returns>The converted row, or <c>default(T)</c> when there is no row.</returns>
    private T GetItem()
    {
        string strSql = GetItemSql();
        if (string.IsNullOrEmpty(strSql))
        {
            throw new InvalidOperationException("没有设置获取队列的sql语句");
        }
        using (var cmd = new MySqlCommand(strSql, _conn))
        using (IDataReader reader = cmd.ExecuteReader())
        {
            if (reader.Read())
            {
                // NOTE(review): presumably maps the current row's columns onto T - confirm in EntityHelper.
                return EntityHelper.TransformPoco<T>(reader);
            }
        }
        return default(T);
    }

    /// <summary>
    /// Dequeues the item fetched by <see cref="First"/> and releases the connection.
    /// </summary>
    /// <exception cref="InvalidOperationException">No connection is open.</exception>
    public void Remove()
    {
        try
        {
            // Dequeue the first item.
            ExcuteSql("select queue_end();");
        }
        finally
        {
            // Restore state even when the statement fails.
            ClearState();
        }
    }

    /// <summary>
    /// Rolls back the item fetched by <see cref="First"/> and releases the connection.
    /// </summary>
    /// <exception cref="InvalidOperationException">No connection is open.</exception>
    public void Rollback()
    {
        try
        {
            // Roll the item back.
            ExcuteSql("select queue_abort();");
        }
        finally
        {
            // Restore state even when the statement fails.
            ClearState();
        }
    }

    /// <summary>
    /// Executes a statement on the current connection, ignoring any result set.
    /// </summary>
    /// <param name="sql">Statement to execute.</param>
    private void ExcuteSql(string sql)
    {
        if (_conn == null)
        {
            throw new InvalidOperationException("当前没有已获取的队列数据,请先调用 First()");
        }
        using (var cmd = new MySqlCommand(sql, _conn))
        {
            cmd.ExecuteNonQuery();
        }
    }

    /// <summary>
    /// Opens a new connection using the configured connection string key.
    /// </summary>
    private void GetDb()
    {
        if (string.IsNullOrEmpty(_connectionKey))
        {
            throw new InvalidOperationException("没有配置连接字符串key");
        }
        var settings = System.Configuration.ConfigurationManager.ConnectionStrings[_connectionKey];
        if (settings == null)
        {
            // The indexer returns null for an unknown key; fail with a clear message.
            throw new InvalidOperationException("未找到连接字符串配置: " + _connectionKey);
        }
        var conn = new MySqlConnection(settings.ConnectionString);
        try
        {
            conn.Open();
        }
        catch
        {
            conn.Dispose();
            throw;
        }
        _conn = conn;
    }

    /// <summary>
    /// Resets the fetch state and closes and disposes the current connection, if any.
    /// Safe to call repeatedly.
    /// </summary>
    private void ClearState()
    {
        _isGetData = false;
        QueueItem = default(T);
        if (_conn != null)
        {
            if (_conn.State == ConnectionState.Open)
            {
                _conn.Close();
            }
            _conn.Dispose();
            // Drop the reference so a disposed connection is never reused.
            _conn = null;
        }
    }

    /// <summary>
    /// Rolls back an item that was fetched but not removed, and releases the connection.
    /// </summary>
    /// <remarks>
    /// When an item is pending, the rollback statement is executed and a database
    /// failure there propagates to the caller; the connection is released either way.
    /// </remarks>
    public void Dispose()
    {
        if (_isGetData)
        {
            Rollback();
        }
        else
        {
            // No pending item, but a connection may still be around; release it.
            ClearState();
        }
    }
}