然后添加一个setTimeout一个getTimeout,然后为Session添加响应的setter和getter。最后在收到了Logon消息的响应时调用Session的setTimeout设置超时时间。
quickfix.Session.next(message):
首先检查SessionTime,如果超过1秒未刷新则刷新时间戳;如果发现Session不存在,则执行reset,重置Session,并返回。
然后检查消息的header中的beginString,如果和SessionID中记录的beginString不匹配则抛异常,返回。
然后检查msgType,如果是Logon消息,根据版本的不同,设置Session中的targetDefaultApplVerID。targetDefaultApplVerID有什么用处呢?因为FIXT协议和之前版本的FIX协议的ApplVerID有不同的组成格式,这个targetDefaultApplVerID主要是给quickfix.MessageCracker用来解析FIXT消息的。
然后检查dataDictionaryProvider, 根据customApplVerID和applVerID取到数据字典(applicationDataDictionary及sessionDataDictionary,细节请参考16)。
然后根据取到的字典验证message是否合法,不合法的根据情况打印警告日志或者直接抛出异常退出。
然后根据消息类型,分别做相应的状态设置(通过nextLogon,nextHeartBeat,nextTestRequest,nextSequenceReset,nextLogout),然后调用用户call back API(在verify中)。
处理完毕该消息之后,调用nextQueued()递归处理下一个队列中的消息。
请注意verify(message)函数,所有的普通消息通过这个函数去回调application的fromApp(message, sessionID)的。verify -> veriry -> fromCallback -> fromAdmin/fromApp。关于消息的解析,其中普通Message是通过quickfix.MessageUtils.parse将String类型的消息解析成Message。
next(message)被SingleThreadedEventHandlingStrategy或者ThreadPerSessionEventHandlingStrategy处理每一条收到的消息时调用,是处理收到消息的源头。
quickfix.Session.next(string):就是next(message)的重载函数,将输入的字符串解析成Message,然后调用next(message)处理。
quickfix.Session.verify(msg, checkTooHigh, checkTooLow):收到的消息的MsgSeqNum与预期的不一样时所做的状态检查和操作。操作比如,如果发现丢包,则自动重发ResendRequest请求或者仅仅记录报警日志;如果发现重复MsgSeqNum,如果消息中设置了可能重复标志,则正常返回,否则说明消息有误,则Logout,disconnect。
quickfix.Session.verify(msg):verify(msg, checkTooHigh, checkTooLow)的重载函数。根据是否是Admin类型的消息及从配置中读取的checkGapFieldOnAdminMessage确定checkTooHigh和checkTooLow这两个标志。如果不是Admin消息则checkTooXX,如果checkGapFieldOnAdminMessage为真则checkTooXX。
quickfix.Session.doTargetTooHigh(message):收到的消息MsgSeqNum比预期的大,即丢失了消息,先检查是否需要reset或者disconnect(至于是否需要reset或者disconnect由创建Session时通过构造函数的参数指定,默认均是不需要,也可以通过指定配置ResetOnError和DisconnectOnError),如果需要则reset或者disconnect后返回。如果不需要,则将这条消息入队(enqueue)存入state中,然后生成ResendRequest并发送出去,请求从expectedTargetNum开始到此消息前的所有消息。由此可见,在state的messageQueue中存储的消息都是因为发现消息序号跳跃缺失消息后,临时存储的跳跃之后新消息。而这些临时存储在state中的消息,会在收到并处理完每种消息之后都会去检查并处理,比如next(msg),nextHeartBeat(msg),nextLogon(msg),nextReject(msg),nextTestRequest(msg)这些处理过程的最后都会检查并处理state临时存储的消息。
quickfix.Session.doTargetTooLow(message):收到的消息MsgSeqNum比预期的小,即收到了重复消息,应该做的事情。
quickfix.Session.parseMessage(stringData):将输入的字符串数据解析成Message。实际是调用quickfix.parse(session, stringData)解析的。
quickfix.Session.send(String messageString):真正通过底层Mina发送消息给对方的API。在QuickFix/J中,send(msgString)通过responder发送消息,responder最终调用了Mina的IoSession.write(object)将数据写入网络中。
responder是通过quickfix.Session.setResponder(Responder responder)在quickfix.mina.initiator.InitiatorIoHandler中创建Session时设置的。关于responder的细节请参考15。
quickfix.Session.sendRaw(message, int num):
Session内部的helper方法,首先将传入的消息和该消息的MsgSeqNum拼装好,把消息头中与系统相关的参数设置好,参数比如BeginString,SenderCompID,TargetCompID,SendingTime。
然后根据消息类型回调响应的用户call back API,如果是Admin类型的消息,回调用户的toAdmin,非Admin类型的消息,回调用户的toApp。如果是LOGON消息,还要对是否重置SeqNum做相应的处理(具体处理过程是:如果消息中有ResetSeqNumFlag这个字段,并且这个字段被设置true则重置。如果重置,则首先resetState,取到期望的seqNum设置到消息头的MsgSeqNum字段中。无论是否重置,都将resetSeqNumFlag写入state中)。
最后把拼装好的message调用toString转换成字符串,然后调用send(string)方法真正的将消息发送到网络上去。
整个处理过程需要锁定SenderMsgSeqNum,直到全部操作完成返回。因为如果call back调用失败,则roll back会更高效。
sendRaw被Session中generateXXX调用,比如generateHeartbeat,generateLogon,generateLogout,generateReject,generateBusinessReject,generateResendRequest,generateSequenceReset,generateTestRequest。这些generateXXX中生成了相应消息之后,最后调用sendRaw把消息通过网络发送出去。
quickfix.Session.send(Message message):做了两件事,首先去掉消息头中的PossDupFlag和OrigSendingTime,然后调用sendRaw把消息发送到网络上去(设置seqNum为0)。主意send(message)返回的boolean并不能说明消息是否成功发送出去,仅仅是说明消息已经成功放入发送队列了,因为默认情况下QuickFix/J使用异步IO发送网络数据。
6.quickfix.mina.initiator.IoSessionInitiator:使用MINA提供的传输层的API,建立、维护同服务器之间的传输层的网络连接,而不是应用层的网络连接。这些网络功能都在一个叫做quickfix.mina.initiator.IoSessionInitiator.ConnectTask的一个私有的TimerTask中实现。具体实现功能有连接(包括普通连接和加密SSL连接)、重连、判断是否应该重连、处理连接异常、启动和停止ConnectTask。
7.quickfix.SocketInitiator:使用单独的线程去为所有的Session处理消息。SocketInitiator提供的功能有:
a) 初始化,即用eventHandlingStrategy创建Initiator,然后注册此SocketInitiator所管理的全部Session,然后启动Initiator,最后调用eventHandlingStrategy.blockInThread()在另外的后台线程中去处理SessionTimer收到的插入队列的消息。启动Initiator做的事情依次是:先启动SessionTimer去监听从传输层过来的消息,如果没有Logon则先Logon,然后在收到消息后回调用户代码处理消息;启动reconnectTask去建立和维护传输层的网络连接。
b) 启动Initiator,在另外的线程中后台(Daemon)处理消息。
c) 阻塞Initiator,在同一线程中处理消息。
d) 停止Initiator。分为强制停止和非强制停止。强制或者非强制Logout所有FixSession,停止连接层的Initiator,取消注册所有此SocketInitiator所管理的全部Session。
e) 关于a) b)如何处理来自底层的消息的逻辑,请参考11和12。因为这里所谓的处理消息实际上是直接或者间接调用了SingleThreadedEventHandlingStrategy的block处理消息。
8.quickfix.ThreadedSocketInitiator:为每一个Session使用一个单独的线程去处理消息。功能参考7。除了线程工作模式不一样,功能和7完全一样。
9.quickfix.SessionState: Session和对方通信过程中使用的helper类。主要功能就是存储了Session的所有状态,并且提供了响应API访问这些状态。状态包括heartBeatInterval,heartBeatMillis,是否需要heartBeat,判断Session所在应用程序的角色(客户端Initiator or 服务器端Acceptor),lastReceivedTime,lastSentTime,获取logger,判断作为客户端的Initiator登陆消息是否已经发出,判断作为服务器端的Acceptor登陆消息是否收到,判断是否需要登陆,判断登陆是否TimeOut,messageStore,testRequestCounter,判断是否需要TestRequest,判断是否处于TimeOut状态,将收到的Message入队(enqueue),出队(dequeue),锁定和解锁发送/接受的Sequence Number,获取、设置自增的下一个Sequence Number,重置(即将Sequence清空,重新从1开始计数),设置、获取Logout的原因。
10.quickfix.mina.EventHandlingStrategy:用于不同版本FIX协议处理事件的策略的接口,是应用级处理消息回调接口的根源。当传输层消息到达时调用此接口onMessage,可以这么理解,onMessage是EventHandlingStrategy的输入,这个输入来自底层。getSessionConnector获取和这个策略相关的SessionConnector,即获取和这个Session相关的Initiator/Acceptor去处理响应的输入消息,一般情况下是逐层将消息向上层传出,回调用户的函数处理该消息。getQueueSize获取当前被处理消息队列的长度。EventHandlingStrategy一般作为AbstractSocketInitiator的成员,建立IoSessionInitiator时传给IoSessionInitiator,请参考3 b)。目前在QuickFix/J中有两个具体实现,分别是quickfix.mina.SingleThreadedEventHandlingStrategy和quickfix.mina.ThreadPerSessionEventHandlingStrategy。这两个具体实现类的说明请参考11,12。
11.quickfix.mina.SingleThreadedEventHandlingStrategy:是QuickFix/J处理消息的核心类。处理消息时即便有多个Session也使用单线程模式。
a) 为了不阻塞输入,那么就需要一个eventQueue来临时快速的存储收到的所有消息。
b) onMessage接到底层传入的消息包装成SessionMessageEvent,首先将其存入eventQueue。
c) 那么SessionMessageEvent里面有什么?SessionEvent仅仅是把fixSession和Message包装到一起,并且提供了处理Message的方法processMessage。可以这样理解,onMessage是事件处理策略底层Message的输入,SessionEvent中的processMessage是Message的输出,未来被应用程序级别的函数处理。
d) getSessionConnector获取需要处理应用级的connector以便处理eventQueue中的消息。
e) getMessage从eventQueue中取出SessionMessageEvent待处理。
f) block就是应用程序级别处理消息的入口。block判断HandlingMessage是否应该继续运新,如果是则从消息队列中取出SessionMessageEvent,调用其中的processMessage去处理该Message。
g) processMessage如何处理了收到的消息呢?它会调用fixSession的next方法,将消息传给Session,由fixSession再接力将消息回调到用户手中。请参考5。
h) 也许你会注意到处理消息的block在run中始终被调用,而且没有任何sleep时间,难道它在没有消息的时候始终不停的死循环运行且丝毫不休息?CPU会保持100%?实际上效果不是这样的,其中的秘密在于它使用了BlockingQueue做到了和sleep相同效果的事情。在没有消息的时候,这个循环会每休息一秒再执行下一次循环。如何做到这样的效果呢?原因是如果eventQueue中如果没有消息,而该eventQueue设置了阻塞超时1000毫秒,则取消息的操作会等待最多1000毫秒,如果没有等到消息则超时退出不再等待,执行完毕本次循环,如果等到了则按照正常流程处理消息。这样做最大的好处就是,如果eventQueue中有事件,那么就会连续不断的处理,如果没有消息,就会休息timeout毫秒再查看。
i) blockInThread,在新启动的后台线程中处理SessionMessageEvent
12.quickfix.mina.ThreadPerSessionEventHandlingStrategy:同样是QuickFix/J处理消息的核心类。和单线程模式不同的是该策略会为每个session启动一个新的线程去处理消息。
a) 由于是每个Session对应一个线程,因此该策略内部需要一个称之为dispatchersMap作为缓存为每个Session保存响应的处理线程(MessageDispatchingThread)引用。
b) 当onMessage收到来自底层的输入消息时,根据输入的fixSession从dispatchers中取到相应的处理线程,并将该消息加入(enqueue)到该线程内部的消息队列中待处理。
c) 每个dispatcher(MessageDispatchingThread类型)内部均维护了自己的消息队列,和单线程模式不同在于,消息队列中的消息仅仅是Message,不是SessionMessageEvent。处理Message的逻辑从单线程中的SessionMessageEvent中移出到dispatcher中。
13.quickfix.DataDictionaryProvider:是一个接口,为指定的session protocol或者application version提供数据字典。getSessionDataDictionary根据提供的beginString 即协议版本获取相应的数据字典。getApplicationDataDictionary根据提供的application version ID和custom application ID获取数据字典。application version ID在FIXT.1.1之前由BeginString字段确定。custom application ID是可选值,不是必须的。
14.quickfix.DefaultDataDictionaryProvider:是QuickFix/J提供的DataDictionaryProvider的默认实现。在DefaultDataDictionaryProvider中,有两种数据字典,一种是传输用的数据字典,一种是应用程序用的数据字典,分别缓存在两个Map中。这个DefaultDataDictionaryProvider是在创建Session时由默认的DefaultSessionFactory根据beginString创建的。addApplicationDictionary和addTransportDictionary分别用于向DefaultDataDictionaryProvider添加新的数据字典。目前QuickFix/J的实现中,在DefaultSessionFactory中初始化Session时添加字典。对于fixt之前版本的数据字典,每个数据字典会被同时添加进入到传输数据字典和应用程序数据字典中。
15.quickfix.Responder:这是个接口,供Session使用,用于发送原始FIX消息(raw FIX message)或者切断和对方的I/O连接。send(stringData)发送原始的FIX消息,disconnect()切断底层的连接,getRemoteIPAddress()提供对方的IP。目前的QuickFix/J中quickfix.mina.IoSessionResponder实现了Responder接口。IoSessionResponder.send(string)调用了Mina的IoSession.write(object)将数据写入网络中。默认是异步写入。(如果同步写入在创建responder时显示传入synchronousWrites为true。同步写入实际是write(object)之后,取到ioSession的WriteFuture,然后向WriteFuture发送join指令。)
16.quickfix.DataDictionary:为不同版本的FIX协议提供消息的元数据(metadata),提供helper方法帮助判断field类型,判断消息的类型,判断组、组件的类型,检查message、field,field等。
总共有3种方式生成DataDictionary,首先可以通过一个系统路径读入配置文件生成,或者载入inputStream生成,或者从已有的DataDictionary拷贝生成。
DataDictionary中最复杂的方法就是load,思想是根据fixXXX.xml中的定义,判断xml文件是否合法,然后根据DTD取到各个section中的信息并存入预先定义好的Map、List、Set中,便于之后的Helper方法能够快速准确的搜索和判断相关信息。
17.quickfix.CompositeLog:将日志信息输出到多个logger中。多个logger在new CompositeLog(Log[] logs)时传入。但是多个logger没有优先级,也不能控制多个logger,仅仅是日志来了,同时写入这多个logger中。CompositeLog由CompositeLogFactory创建。
QuickFix/J中其他的logger还有:
quickfix.ScreenLog,将日志通过System.out打印到屏幕,相应的有quickfix.ScreenLogFactory。
quickfix.FileLog,将日志写入本地文件中。相应的有quickfix.FileLogFactory。
quickfix.JdbcLog,将日志写入数据库中。相应的有quickfix.JdbcLogFactory。
quickfix.SLF4JLog,使用SLF4J wrapper写日志,SLF4J支持JDK logging,log4j等。
另外多数Log的实现都extends了quickfix.AbstractLog。AbstractLog的作用是添加了对HeartBeat是否记录日志的开关。
B).网络数据在QuickFix/J中的流向
ConnectTask -> ioConnector.connect(sockAddress, ioHandler) -> MINA建立和服务器端的通信。收到网络数据,ioConnector触发相应事件,并把事件交给ioHandler(InitiatorIoHandler)的processMessage -> processMessage中调用eventHandlingStrategy.onMessage(quickfixSession, message) ,将消息向外回调 -> SingleThreadedEventHandlingStrategy.onMessage(quickfixSession, message) 将收到的消息入队(enQueue)到eventQueue -> SingleThreadedEventHandlingStrategy.blockInThread中启动单独的后台线程,依次从eventQueue取出消息处理,向session回调 -> SessionMessageEvent.quickfixSession.next(message) -> quickFixSession.next根据msgType判断回调 ->逐层回调 (verify -> veriry -> fromCallback -> fromAdmin/fromApp),从fromAdmin/fromApp(msg, sessionID)回调用户处理逻辑。