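Preface
The code itself matters less than the ideas behind it. Suggestions are welcome.
Main text
Microsoft has a product called SSIS that introduced the data flow concept. What makes it more powerful is that it is built on SQL Server, so it can also be used for data analysis and for building data warehouses.
Data mining is admittedly still a distant goal, but I have developed a data flow engine of my own.
Let's start with a demo.
Requirement:
I have an order table, POS_SALESORDER,
and need to generate a receipt for each order in POS_SALESORDERRECEIPT.
Some of the receipt data comes from my customer table, USR_PROFILE.
Traditional C# code:
(Get the POS_SALESORDERRECEIPT table, query the customer data in USR_PROFILE, then combine it with POS_SALESORDER to generate the receipt.)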
INoebeCommand command = NoebeManager.Instance.NoebeCommand;
command.SQL = "SELECT * FROM USR_PROFILE WHERE USERCODE = :USERCODE";
command.Parameters.Add("USERCODE", row["USERCODE"].ToString());
DataTable usrtb = command.ExecuteReader();

// Use the first matching customer row, or null when the customer is unknown.
DataRow userrow = usrtb.Rows.Count == 0 ? null : usrtb.Rows[0];

double staffcommission = 0;
double commission = 0;
if (userrow == null)
{
    // No customer profile: fall back to the default staff commission.
    staffcommission = CitiboxGlobalStringHelper.default_staffcommission;
}
else
{
    staffcommission = double.Parse(userrow["STAFFCOMMISSION"].ToString());
    commission = double.Parse(userrow["COMMISSION"].ToString());
}

// Copy the order fields into a new receipt row, one assignment per field.
DataRow receiptrow = receipt.NewRow();
// receiptrow["ID"] =
receiptrow["ORDERBILLCODE"] = row["BILLCODE"];
receiptrow["RECEIPTCODE"] = CitiboxGlobalPkHelper.Instance.GetBillPosOrderReceiptPk();
receiptrow["SHOPCODE"] = row["SHOPCODE"];
receiptrow["SHOPNAME"] = row["SHOPNAME"];
receiptrow["MERCHANTCODE"] = row["USERCODE"];
receiptrow["MERCHANTNAME"] = row["USERNAME"];
receiptrow["CREATEDATE"] = Pixysoft.Tools.GlobalTimer.Instance.GetGlobalTime();
receiptrow["MODIDATE"] = Pixysoft.Tools.GlobalTimer.Instance.GetGlobalTime();
receiptrow["ORDERTEMPLATECODE"] = row["TEMPLATECODE"];
receiptrow["ORDERTEMPLATENAME"] = row["TEMPLATENAME"];
receiptrow["DEPOSITPRICE"] = row["DEPOSITPRICE"];
receiptrow["ITEMPRICE"] = row["ITEMPRICE"];
receiptrow["REALPRICE"] = row["ITEMPRICE"];
receiptrow["STATUS"] = (int)BillIntStatus.New;
receiptrow["REMARK"] = "订单成功"; // "order succeeded"
receiptrow["COMMISSION"] = commission;
receiptrow["STAFFCOMMISSION"] = staffcommission;
receipt.Rows.Add(receiptrow);

CstNoebeManager.Instance.ClientManager.Session.AutoInsert(receipt);
With the data flow engine:
// Feed the order row and the placeholder values into the flow.
IInput input = dataflow.GetInput();
input.Add(row);
input.Add("@DEFAULTSTAFFCOMMISSION", CitiboxGlobalStringHelper.default_staffcommission);
input.Add("@STATUS", (int)BillIntStatus.New);
input.Add("@RECEIPTCODE", CitiboxGlobalPkHelper.Instance.GetBillPosOrderReceiptPk());
dataflow.Initialize(input);

// Map order fields onto the receipt table with one script per field.
IExchanger exchanger = dataflow.GetExchanger("POS_SALESORDERRECEIPT");
exchanger.AddScript("ORDERBILLCODE = POS_SALESORDER.BILLCODE");
exchanger.AddScript("RECEIPTCODE = @RECEIPTCODE");
exchanger.AddScript("SHOPCODE = POS_SALESORDER.SHOPCODE");
exchanger.AddScript("SHOPNAME = POS_SALESORDER.SHOPNAME");
exchanger.AddScript("MERCHANTCODE = POS_SALESORDER.USERCODE");
exchanger.AddScript("MERCHANTNAME = POS_SALESORDER.USERNAME");
exchanger.AddScript("CREATEDATE = SYS.DATETIME");
exchanger.AddScript("MODIDATE = SYS.DATETIME");
exchanger.AddScript("ORDERTEMPLATECODE = POS_SALESORDER.TEMPLATECODE");
exchanger.AddScript("ORDERTEMPLATENAME = POS_SALESORDER.TEMPLATENAME");
exchanger.AddScript("DEPOSITPRICE = POS_SALESORDER.DEPOSITPRICE");
exchanger.AddScript("ITEMPRICE = POS_SALESORDER.ITEMPRICE");
exchanger.AddScript("REALPRICE = POS_SALESORDER.ITEMPRICE");
exchanger.AddScript("STATUS = @STATUS");
exchanger.AddScript("REMARK = '订单成功'"); // "order succeeded"
dataflow.Runflow(exchanger);

// Load the customer's commission rates.
ILoader loader = dataflow.GetLoader("USR_PROFILE");
loader.Sql = "SELECT STAFFCOMMISSION,COMMISSION FROM USR_PROFILE WHERE USERCODE = :USERCODE";
loader.AddScript("USERCODE = POS_SALESORDER.USERCODE");
dataflow.Runflow(loader);

if (loader.Succeed.IsAlive)
{
    // Customer found: take both rates from USR_PROFILE.
    IExchanger subexchanger = dataflow.GetExchanger("POS_SALESORDERRECEIPT");
    subexchanger.AddScript("COMMISSION = USR_PROFILE.COMMISSION");
    subexchanger.AddScript("STAFFCOMMISSION = USR_PROFILE.STAFFCOMMISSION");
    dataflow.Runflow(subexchanger);
}
else
{
    // Customer not found: zero commission and the default staff commission.
    IExchanger subexchanger = dataflow.GetExchanger("POS_SALESORDERRECEIPT");
    subexchanger.AddScript("COMMISSION = 0");
    subexchanger.AddScript(ScriptType.Number, "STAFFCOMMISSION = @DEFAULTSTAFFCOMMISSION");
    dataflow.Runflow(subexchanger);
}

IOutput output = dataflow.GetOutput();
DataTable receipttb = output.GetInsertTable("POS_SALESORDERRECEIPT");
CstNoebeManager.Instance.ClientManager.Session.AutoInsert(receipttb);
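The code does not seem any shorter. However, if the generated table data is very complex, for example arithmetic and function calls spanning several tables, the traditional approach requires writing a pile of small helper methods, computing each value, and then assigning it to a field.
This is where the data flow engine pays off: every computation only needs to be written as an expression, and the engine evaluates it automatically.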
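To make "write the expression and let the engine evaluate it" a little more concrete, here is a minimal sketch of how a script such as "ORDERBILLCODE = POS_SALESORDER.BILLCODE" could be resolved. This is not samsara's actual implementation: the class AssignmentScriptSketch, its Resolve method, and the decision to read the first row of the named table are all assumptions made for illustration. It handles only @placeholders, quoted literals, and TABLE.COLUMN references, with no arithmetic or functions.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Hypothetical illustration only; samsara's real parser is not shown in this post.
public static class AssignmentScriptSketch
{
    // Splits "TARGET = SOURCE" and resolves SOURCE to a value.
    // Supported sources: @PLACEHOLDER, 'literal', TABLE.COLUMN (first row of TABLE).
    public static KeyValuePair<string, object> Resolve(
        string script,
        IDictionary<string, DataTable> tables,
        IDictionary<string, object> placeholders)
    {
        int eq = script.IndexOf('=');
        if (eq < 0)
            throw new FormatException("Expected 'TARGET = SOURCE': " + script);

        string target = script.Substring(0, eq).Trim();
        string source = script.Substring(eq + 1).Trim();
        if (source.Length == 0)
            throw new FormatException("Missing source: " + script);

        // Placeholder registered on the input, e.g. @STATUS.
        if (source[0] == '@')
            return new KeyValuePair<string, object>(target, placeholders[source]);

        // Quoted string literal, e.g. 'some text'.
        if (source.Length >= 2 && source[0] == '\'' && source[source.Length - 1] == '\'')
            return new KeyValuePair<string, object>(
                target, source.Substring(1, source.Length - 2));

        // TABLE.COLUMN reference, read from the first row of the table.
        int dot = source.IndexOf('.');
        if (dot < 0)
            throw new FormatException("Unsupported source: " + source);

        string table = source.Substring(0, dot);
        string column = source.Substring(dot + 1);
        return new KeyValuePair<string, object>(target, tables[table].Rows[0][column]);
    }
}
```

A real engine would add a proper expression grammar on top of this lookup step; the point here is only that each script reduces to "find the named value, assign it to the target field".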
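Data flow modules
IExchanger: the data exchange shown above.
ILoader: reads from the database and loads data into the flow.
Ifer: conditional branching, for example doing xxx when the order price ITEMPRICE > 30.
ISwitcher: branching on a value, for example handling an order differently depending on the customer type MERCHANTTYPECODE.
IMapper: field value mapping, for example mapping a placeholder to a concrete value, @STATUS = 1.
Injector: mid-flow data injection. Besides loading from the database, new data can be injected partway through and the computation run again.
Isorter: flow sorting. If newly loaded data does not line up with the existing data, sorting can reconnect them (for example, tables A and B are loaded one after the other but their rows do not line up; sorting on the condition A.Merchantcode = B.merchantcode aligns them).
Finally, there is a Foreach feature, together with MergeForeach, which splits the data flow for separate processing and then merges the results.
A complex data flow example (samsara can do much more!):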
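The sorter idea (re-aligning two loaded tables on a shared key) can be sketched in plain ADO.NET as follows. This is only an illustration of the concept, not the Isorter implementation: the class RowAlignmentSketch, the AlignByKey method, and the rule that the first row wins when a key repeats are assumptions of this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Hypothetical illustration of key-based row alignment.
public static class RowAlignmentSketch
{
    // Returns rows of 'right' reordered so that result[i] carries the same key
    // as left.Rows[i]. The entry is null when 'right' has no matching row.
    public static List<DataRow> AlignByKey(
        DataTable left, DataTable right, string leftKey, string rightKey)
    {
        // Index the right-hand rows by key; the first row wins on duplicates.
        var index = new Dictionary<string, DataRow>(StringComparer.OrdinalIgnoreCase);
        foreach (DataRow row in right.Rows)
        {
            string key = Convert.ToString(row[rightKey]);
            if (!index.ContainsKey(key))
                index.Add(key, row);
        }

        // Walk the left-hand rows in order and pick the matching right-hand row.
        var aligned = new List<DataRow>();
        foreach (DataRow row in left.Rows)
        {
            DataRow match;
            index.TryGetValue(Convert.ToString(row[leftKey]), out match);
            aligned.Add(match);
        }
        return aligned;
    }
}
```

After alignment, row i of table A and entry i of the result describe the same merchant, so scripts that read both tables position by position see matching data.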
closingreceipt.Merchantcode = webClosingRow["MERCHANTCODE"].ToString();
closingreceipt.Merchantname = webClosingRow["MERCHANTNAME"].ToString();

// Get the primary key for the local closing bill.
string pk = CitiboxGlobalPkHelper.Instance.GetBillSaleClosingPk();
Info("get primary key for balance bill. pk = " + pk);

IDataflow dataflow = SamsaraManager.Instance.Dataflow;
IInput input = dataflow.GetInput();
input.Add(webClosingRow);
input.Add("@BILL_PRIMARYKEY", pk);
input.Add("@DEFAULT_USRBOXCODE", CitiboxGlobalStringHelper.default_usrboxcode);
dataflow.Initialize(input);

// Load the detail rows of the website closing bill.
Info("get web_salesclosing detail.");
ILoader loader = dataflow.GetLoader("WEB_SALESCLOSINGDETAIL");
loader.Sql = "SELECT * FROM WEB_SALESCLOSINGDETAIL WHERE BILLCODE = :BILLCODE";
loader.AddScript("BILLCODE = WEB_SALESCLOSING.BILLCODE");
dataflow.Runflow(loader);
// Generate the local closing bill, one sub-flow per website detail row.
foreach (IDataflow subflow in dataflow.Foreach("WEB_SALESCLOSINGDETAIL"))
{
    Ifer ifflow = subflow.If("WEB_SALESCLOSINGDETAIL.USRBOXCODE == @DEFAULT_USRBOXCODE");

    IDataflow iftrueflow = ifflow.True;
    bool hasreceipt = false;
    if (iftrueflow.IsAlive)
    {
        hasreceipt = GetCurrentNonReceiptTable(iftrueflow).Succeed.IsAlive;
    }

    IDataflow iffalseflow = ifflow.False;
    if (iffalseflow.IsAlive)
    {
        ILoader usrboxloader = UsrboxIsUnavailable(subflow);
        if (!usrboxloader.Succeed.IsAlive)
        {
            continue;
        }
        else
        {
            hasreceipt = GetCurrentReceiptTable(iffalseflow).Succeed.IsAlive;
        }
    }

    Info("create BIL_SALESCLOSINGDETAIL");
    if (hasreceipt)
    {
        // Receipts exist: aggregate price and commissions over POS_ITEMRECEIPT.
        IExchanger exchangerflow = subflow.GetExchanger("BIL_SALESCLOSINGDETAIL");
        exchangerflow.AddScript("BILLCODE = @BILL_PRIMARYKEY");
        exchangerflow.AddScript(ScriptType.Number, "CLOSINGPRICE = SUM( POS_ITEMRECEIPT.SALEPRICE * POS_ITEMRECEIPT.SALEQTY )");
        exchangerflow.AddScript(ScriptType.Number, "CLOSINGCOMMISSION = SUM( POS_ITEMRECEIPT.SALEPRICE * POS_ITEMRECEIPT.SALEQTY * POS_ITEMRECEIPT.COMMISSION )");
        exchangerflow.AddScript(ScriptType.Number, "CLOSINGSTAFFCOMMISSION = SUM( POS_ITEMRECEIPT.SALEPRICE * POS_ITEMRECEIPT.SALEQTY * POS_ITEMRECEIPT.STAFFCOMMISSION )");
        exchangerflow.AddScript("CLOSINGDATEFROM = WEB_SALESCLOSINGDETAIL.CLOSINGDATEFROM");
        exchangerflow.AddScript("CLOSINGDATETO = WEB_SALESCLOSINGDETAIL.CLOSINGDATETO");
        exchangerflow.AddScript("CLOSINGDATE = WEB_SALESCLOSINGDETAIL.CLOSINGDATE");
        exchangerflow.AddScript("USRBOXCODE = WEB_SALESCLOSINGDETAIL.USRBOXCODE");
        subflow.Runflow(exchangerflow);
    }
    else
    {
        // No receipts: write a detail row with zero amounts.
        IExchanger exchangerflow = subflow.GetExchanger("BIL_SALESCLOSINGDETAIL");
        exchangerflow.AddScript("BILLCODE = @BILL_PRIMARYKEY");
        exchangerflow.AddScript(ScriptType.Number, "CLOSINGPRICE = 0");
        exchangerflow.AddScript(ScriptType.Number, "CLOSINGCOMMISSION = 0");
        exchangerflow.AddScript(ScriptType.Number, "CLOSINGSTAFFCOMMISSION = 0");
        exchangerflow.AddScript("CLOSINGDATEFROM = WEB_SALESCLOSINGDETAIL.CLOSINGDATEFROM");
        exchangerflow.AddScript("CLOSINGDATETO = WEB_SALESCLOSINGDETAIL.CLOSINGDATETO");
        exchangerflow.AddScript("CLOSINGDATE = WEB_SALESCLOSINGDETAIL.CLOSINGDATE");
        exchangerflow.AddScript("USRBOXCODE = WEB_SALESCLOSINGDETAIL.USRBOXCODE");
        subflow.Runflow(exchangerflow);
    }
    ILoader blsloader = GetBalanceControlTable(subflow);

    IDataflow blstrueflow = blsloader.Succeed;
    if (blstrueflow.IsAlive)
    {
        // Record the latest closing figures on the account control table.
        IExchanger blsexchanger = blstrueflow.GetExchanger("BLS_COMMODITYACCOUNTCONTROL");
        blsexchanger.AddScript("CONTROLDATE = WEB_SALESCLOSINGDETAIL.CLOSINGDATE");
        blsexchanger.AddScript("MODIDATE = SYS.DATETIME");
        blsexchanger.AddScript("LASTCLOSINGPRICE = BIL_SALESCLOSINGDETAIL.CLOSINGPRICE");
        blsexchanger.AddScript("LASTCLOSINGCOMMISSION = BIL_SALESCLOSINGDETAIL.CLOSINGCOMMISSION");
        blsexchanger.AddScript("LASTCLOSINGSTAFFCOMMISSION = BIL_SALESCLOSINGDETAIL.CLOSINGSTAFFCOMMISSION");
        blstrueflow.Runflow(blsexchanger);

        // Write the real closing figures back to the website detail row.
        IExchanger webexchanger = blstrueflow.GetExchanger("WEB_SALESCLOSINGDETAIL");
        webexchanger.AddScript("REALCLOSINGPRICE = BIL_SALESCLOSINGDETAIL.CLOSINGPRICE");
        webexchanger.AddScript("REALCLOSINGCOMMISSION = BIL_SALESCLOSINGDETAIL.CLOSINGCOMMISSION");
        webexchanger.AddScript("REALCLOSINGSTAFFCOMMISSION = BIL_SALESCLOSINGDETAIL.CLOSINGSTAFFCOMMISSION");
        blstrueflow.Runflow(webexchanger);
    }

    IDataflow blsfalseflow = blsloader.Failed;
    if (blsfalseflow.IsAlive)
    {
        Error(string.Format("missing bls_commodityaccountcontrol. user validation fail. merchantcode = {0}",
            webClosingRow["MERCHANTCODE"].ToString()));
        return null;
    }

    // Reset abnormal receipts back to the New status.
    IDataflow absreceiptflow = subflow.If("POS_ITEMRECEIPT.STATUS == " + BillStringStatus.Abnomity).True;
    if (absreceiptflow.IsAlive)
    {
        IExchanger absexchanger = absreceiptflow.GetExchanger("POS_ITEMRECEIPT");
        absexchanger.AddScript("STATUS = " + BillStringStatus.New);
        absexchanger.AddScript("CREATEDATE = SYS.DATETIME");
        absreceiptflow.Runflow(absexchanger);
    }

    // Read the results of this sub-flow and build the closing item.
    DataTable closingdetailtb = subflow.Peekflow("WEB_SALESCLOSINGDETAIL");
    DataTable receipttb = subflow.Peekflow("POS_ITEMRECEIPT");
    if (closingdetailtb.Rows.Count == 0)
        continue;

    DataRow closingdetailrow = closingdetailtb.Rows[0];
    SalesClosingItem closingitem = new SalesClosingItem();
    closingitem.Boxlocationcode = closingdetailrow["BOXLOCATIONCODE"].ToString();
    closingitem.Datefrom = closingdetailrow["CLOSINGDATEFROM"].ToString();
    closingitem.Dateto = closingdetailrow["CLOSINGDATETO"].ToString();
    closingitem.Price = (double)closingdetailrow["REALCLOSINGPRICE"];
    closingitem.Commission = (double)closingdetailrow["REALCLOSINGCOMMISSION"];
    closingitem.Receipttb = receipttb;
    closingreceipt.Items.Add(closingitem);
}
dataflow.MergeForeach();

Info("begin change BIL_SALESCLOSING");
IExchanger mainbillexchanger = dataflow.GetExchanger("BIL_SALESCLOSING");
mainbillexchanger.AddScript("BILLCODE = @BILL_PRIMARYKEY");
mainbillexchanger.AddScript("MERCHANTCODE = WEB_SALESCLOSING.MERCHANTCODE");
mainbillexchanger.AddScript("CREATEDATE = SYS.DATETIME");
mainbillexchanger.AddScript("MODIDATE = SYS.DATETIME");
mainbillexchanger.AddScript(ScriptType.Number, "CLOSINTTOTALPRICE = SUM (BIL_SALESCLOSINGDETAIL.CLOSINGPRICE)");
mainbillexchanger.AddScript(ScriptType.Number, "CLOSINTTOTALCOMMISSION = SUM (BIL_SALESCLOSINGDETAIL.CLOSINGCOMMISSION)");
mainbillexchanger.AddScript(ScriptType.Number, "CLOSINGTOTALSTAFFCOMMISSION = SUM (BIL_SALESCLOSINGDETAIL.CLOSINGSTAFFCOMMISSION)");
dataflow.Runflow(mainbillexchanger);

Info("begin change web_salesclosing status to pass.");
IExchanger mainwebexchanger = dataflow.GetExchanger("WEB_SALESCLOSING");
mainwebexchanger.AddScript(ScriptType.Number, "STATUS = " + (int)BillIntStatus.Pass);
dataflow.Runflow(mainwebexchanger);

IOutput output = dataflow.GetOutput();
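For comparison, this is roughly the hand-written arithmetic that a script like "CLOSINGCOMMISSION = SUM( POS_ITEMRECEIPT.SALEPRICE * POS_ITEMRECEIPT.SALEQTY * POS_ITEMRECEIPT.COMMISSION )" stands in for. It is a sketch under assumptions, not samsara code: the class ClosingSumSketch and the SumProduct helper are hypothetical names, and the columns are assumed to hold values that Convert.ToDouble accepts.

```csharp
using System;
using System.Data;

// Hypothetical helper showing the per-row multiply-then-sum a SUM(...) script implies.
public static class ClosingSumSketch
{
    // For each row, multiplies the named columns together, then sums over all rows.
    public static double SumProduct(DataTable table, params string[] columns)
    {
        double total = 0;
        foreach (DataRow row in table.Rows)
        {
            double product = 1;
            foreach (string column in columns)
                product *= Convert.ToDouble(row[column]);
            total += product;
        }
        return total;
    }
}
```

With a receipt table loaded, the closing price would be SumProduct(receipts, "SALEPRICE", "SALEQTY") and the closing commission SumProduct(receipts, "SALEPRICE", "SALEQTY", "COMMISSION"). In the traditional style, each such figure needs its own loop or helper call; in the scripted style it is one line of expression text.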
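Postscript
Apologies if the code came out messy.
A word on the name: samsara is the Buddhist term for the cycle of rebirth.
Stage one:
That was five years ago. I was building an information system, and the customer wore me out by constantly asking for changes to the table structure, so I came up with the idea of using scripts to compute tables. That became the first generation of samsara.
I had only just started with C#, XML, and the like, so all the configuration was in XML; samsara read the XML and ran the computation directly.
In practice I found that XML is simply not meant for humans to read, and maintaining it was too much trouble. Worse, with the business logic tied up in XML, it was hard to tell during debugging why an exception had occurred.
Stage two:
So when I graduated, I developed the second generation of samsara, simplifying the scripts into a human-readable language instead of XML.
This reduced the development difficulty somewhat, but the business logic was still bound to XML, and maintenance remained very inconvenient.
Stage three:
After I started working I never had time to use samsara, and I had lost confidence in it, so after calling it briefly in a later system I let it lapse.
Now that I happen to be out of work, I have some free time to rework it properly based on what I have learned over these years.
So I came up with combining scripts with code, which became the current third generation of samsara.
Its defining feature is that business logic is written in code, with samsara staying as close as possible to C#'s own logic constructs, while the complex data computations are handed off to scripts.
I personally think the third generation of samsara is ready for commercial use. Next, a fourth generation of samsara could well be used to build a data warehouse.
Alternatively, I could add object computation and turn it into an object flow engine. I would call that
samsara, generation four!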