Initiator实现
客户端配置文件
[default]
FileStorePath=data/initiator
SocketConnectHost=127.0.0.1
SocketConnectPort=9999
ConnectionType=initiator
TargetCompID=QFJ_ACCEPTOR
StartTime=00:00:00
EndTime=23:00:00
HeartBtInt=30
FileLogPath=logs-initiator
# FIX50SP1.modified.xml 的存放位置(此处为绝对路径,请按本机实际位置修改)
AppDataDictionary=D:\Code\myqfjdemo\qfjacceptor\src\main\resources\FIX50SP1.modified.xml
UseDataDictionary=Y
ResetOnLogon=Y
ResetSeqNumFlag=Y
EnableNextExpectedMsgSeqNum=Y
BeginString=FIXT.1.1
DefaultApplVerID=FIX.5.0SP1
[session]
SenderCompID=QFJ_INITIATOR1
先定义一个 Initiator 工具类,负责创建、启动和停止 Initiator
package org.demo.myqfjdemo.initiator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.*;
/**
 * Static helper that creates, starts and stops the single {@link SocketInitiator}
 * used by this demo.
 *
 * <p>Session settings are read from the classpath resource {@code /quickfix.properties}.
 * Configuration failures are logged instead of being thrown, so {@link #startServer()}
 * does not propagate {@link ConfigError} to its caller.
 */
public class QfjInitiator {
    private static final Logger logger = LoggerFactory.getLogger(QfjInitiator.class);
    private static final String SESSION_SETTINGS_PATH = "/quickfix.properties";
    private static SocketInitiator initiator;
    private static SessionSettings sessionSettings;
    private static QfjInitiatorApplication application;

    /** Utility class; not meant to be instantiated. */
    private QfjInitiator() {
    }

    /**
     * Loads the session settings, builds the initiator and starts it.
     *
     * <p>If the settings cannot be loaded the method logs the problem and returns
     * without creating an initiator, instead of carrying on with {@code null} settings.
     */
    public static void startServer() {
        sessionSettings = loadSessionSettings();
        if (sessionSettings == null) {
            // Nothing sensible can be built without settings; the cause was already logged.
            return;
        }
        application = new QfjInitiatorApplication();
        MessageStoreFactory msgStoreFactory = new FileStoreFactory(sessionSettings);
        LogFactory logFactory = new FileLogFactory(sessionSettings);
        MessageFactory msgFactory = new DefaultMessageFactory();
        try {
            initiator = new SocketInitiator(application,
                    msgStoreFactory,
                    sessionSettings,
                    logFactory,
                    msgFactory);
            initiator.start();
        } catch (ConfigError configError) {
            // Pass the exception itself so the logger records the full stack trace.
            logger.error("New initiator error", configError);
        }
    }

    /**
     * Stops the initiator if one was created; does nothing (apart from a warning)
     * when {@link #startServer()} was never called or failed before creating it.
     */
    public static void stopServer() {
        if (initiator == null) {
            logger.warn("stopServer called but no initiator has been created");
            return;
        }
        initiator.stop();
    }

    /**
     * Reads the session settings from the classpath and closes the stream afterwards.
     *
     * @return the parsed settings, or {@code null} if the resource is missing or invalid
     */
    private static SessionSettings loadSessionSettings() {
        // A null resource is legal in try-with-resources; it is simply not closed.
        try (java.io.InputStream in = QfjInitiator.class.getResourceAsStream(SESSION_SETTINGS_PATH)) {
            if (in == null) {
                logger.error("SessionSettings resource not found on classpath: {}", SESSION_SETTINGS_PATH);
                return null;
            }
            return new SessionSettings(in);
        } catch (ConfigError | java.io.IOException e) {
            logger.error("SessionSettings config error", e);
            return null;
        }
    }
}
实现Application
package org.demo.myqfjdemo.initiator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.*;
import quickfix.field.*;
import quickfix.fix50sp1.DemoMsg;
import quickfix.fix50sp1.MessageCracker;
import quickfix.fix50sp1.NewOrderSingle;
import quickfix.fix50sp1.component.OrderQtyData;
import java.time.LocalDateTime;
/**
 * Initiator-side {@link Application} for the demo.
 *
 * <p>Every callback just logs that it was invoked; {@link #fromApp} additionally hands the
 * message to {@code crack(...)} inherited from {@code MessageCracker} so that the typed
 * {@code onMessage} overload in this class can handle it.
 */
public class QfjInitiatorApplication extends MessageCracker implements Application {
    private static final Logger logger = LoggerFactory.getLogger(QfjInitiatorApplication.class);

    /** Pause between two consecutive order sends, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 2000L;

    @Override
    public void onCreate(SessionID sessionId) {
        logger.info("onCreate is called {}", sessionId);
    }

    @Override
    public void onLogon(SessionID sessionId) {
        logger.info("onLogon is called");
    }

    @Override
    public void onLogout(SessionID sessionId) {
        logger.info("onLogout is called");
    }

    @Override
    public void toAdmin(Message message, SessionID sessionId) {
        logger.info("toAdmin is called");
    }

    @Override
    public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
        logger.info("fromAdmin is called");
    }

    @Override
    public void toApp(Message message, SessionID sessionId) throws DoNotSend {
        logger.info("toApp is called");
    }

    @Override
    public void fromApp(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
        logger.info("fromApp is called");
        // Dispatch to the typed onMessage(...) handler for this message.
        crack(message, sessionId);
    }

    /**
     * Callback invoked when a NewOrderSingle message is received; it only logs the event.
     */
    @Override
    public void onMessage(NewOrderSingle message, SessionID sessionID) throws FieldNotFound {
        logger.info("Received NewOrderSingle Message");
    }

    /**
     * Builds one mocked NewOrderSingle and sends it to the given session every
     * {@value #SEND_INTERVAL_MILLIS} ms.
     *
     * <p>The method blocks the calling thread. It returns only when that thread is
     * interrupted (the interrupt flag is restored before returning) or when
     * {@link Session#sendToTarget(Message, SessionID)} throws.
     *
     * @param sessionID the session to send the order to
     * @throws SessionNotFound propagated from {@code Session.sendToTarget}
     */
    public static void sendNewOrderRequest(SessionID sessionID) throws SessionNotFound {
        NewOrderSingle order = new NewOrderSingle(
                new ClOrdID("mockedClOrdID"), new Side(Side.BUY),
                new TransactTime(), new OrdType(OrdType.LIMIT));
        order.set(new OrderQty(45.00));
        order.set(new Symbol("mockedSymbol"));
        order.set(new HandlInst('1'));
        logger.info("send order {}", order);
        while (true) {
            Session.sendToTarget(order, sessionID);
            try {
                Thread.sleep(SEND_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and leave the loop so the sender can actually be stopped;
                // swallowing the interrupt here would make the loop impossible to cancel.
                Thread.currentThread().interrupt();
                logger.info("Order sending loop interrupted, stopping");
                return;
            }
        }
    }
}
启动类
package org.demo.myqfjdemo.main;
import org.demo.myqfjdemo.initiator.QfjInitiator;
import org.demo.myqfjdemo.initiator.QfjInitiatorApplication;
import quickfix.SessionID;
import quickfix.SessionNotFound;
/**
 * Entry point of the initiator demo: starts the initiator, waits a fixed delay and then
 * begins sending orders on one hard-coded session.
 */
public class InitiatorMain {
    /** Delay between starting the initiator and sending the first order, in milliseconds. */
    private static final long STARTUP_DELAY_MILLIS = 5000L;

    public static void main(String[] args) {
        QfjInitiator.startServer();
        try {
            // NOTE(review): presumably this gives the initiator time to connect and log on
            // before the first send; a fixed sleep is not a guarantee of that.
            Thread.sleep(STARTUP_DELAY_MILLIS);
            // Identifies the session to send on; the three values must match
            // BeginString / SenderCompID / TargetCompID in quickfix.properties.
            SessionID sessionID1 = new SessionID("FIXT.1.1", "QFJ_INITIATOR1", "QFJ_ACCEPTOR");
            QfjInitiatorApplication.sendNewOrderRequest(sessionID1);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs this thread instead of swallowing it.
            Thread.currentThread().interrupt();
        } catch (SessionNotFound e) {
            // No logger is available in this class; report the failure on stderr.
            e.printStackTrace();
        }
    }
}
Acceptor实现
配置文件
[default]
FileStorePath=data/acceptor
# 注意:SocketConnectHost 是 initiator 的配置项,acceptor 使用 SocketAcceptPort(以及可选的 SocketAcceptAddress)
SocketConnectHost=127.0.0.1
SocketAcceptPort=9999
ConnectionType=acceptor
SenderCompID=QFJ_ACCEPTOR
StartTime=00:00:00
EndTime=23:00:00
HeartBtInt=30
FileLogPath=logs-acceptor
AppDataDictionary=D:\Code\myqfjdemo\qfjacceptor\src\main\resources\FIX50SP1.modified.xml
UseDataDictionary=Y
ResetOnLogon=Y
ResetSeqNumFlag=Y
EnableNextExpectedMsgSeqNum=Y
BeginString=FIXT.1.1
DefaultApplVerID=FIX.5.0SP1
[session]
TargetCompID=QFJ_INITIATOR1
工具类
package org.demo.myqfjdemo.acceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.*;
/**
 * Static helper that creates, starts and stops the single {@link ThreadedSocketAcceptor}
 * used by this demo.
 *
 * <p>Session settings are read from the classpath resource {@code /quickfix.properties}.
 * Configuration failures are logged instead of being thrown, so {@link #startServer()}
 * does not propagate {@link ConfigError} to its caller.
 */
public class QfjAcceptor {
    private static final Logger logger = LoggerFactory.getLogger(QfjAcceptor.class);
    private static final String SESSION_SETTINGS_PATH = "/quickfix.properties";
    private static ThreadedSocketAcceptor acceptor;
    private static SessionSettings sessionSettings;
    private static QfjAcceptorApplication application;

    /** Utility class; not meant to be instantiated. */
    private QfjAcceptor() {
    }

    /**
     * Loads the session settings, builds the acceptor and starts it.
     *
     * <p>If the settings cannot be loaded the method logs the problem and returns
     * without creating an acceptor, instead of carrying on with {@code null} settings.
     */
    public static void startServer() {
        sessionSettings = loadSessionSettings();
        if (sessionSettings == null) {
            // Nothing sensible can be built without settings; the cause was already logged.
            return;
        }
        application = new QfjAcceptorApplication();
        MessageStoreFactory msgStoreFactory = new FileStoreFactory(sessionSettings);
        LogFactory logFactory = new FileLogFactory(sessionSettings);
        MessageFactory msgFactory = new DefaultMessageFactory();
        try {
            acceptor = new ThreadedSocketAcceptor(application,
                    msgStoreFactory,
                    sessionSettings,
                    logFactory,
                    msgFactory);
            acceptor.start();
        } catch (ConfigError configError) {
            // Log through SLF4J (with stack trace) rather than printing to stderr.
            logger.error("New acceptor error", configError);
        }
    }

    /**
     * Returns the acceptor's current queue size as reported by
     * {@code ThreadedSocketAcceptor.getQueueSize()}.
     *
     * @return the queue size, or {@code 0} when no acceptor has been created
     */
    public static int getQueueSize() {
        if (acceptor == null) {
            return 0;
        }
        return acceptor.getQueueSize();
    }

    /**
     * Stops the acceptor if one was created; does nothing (apart from a warning)
     * when {@link #startServer()} was never called or failed before creating it.
     */
    public static void stopServer() {
        if (acceptor == null) {
            logger.warn("stopServer called but no acceptor has been created");
            return;
        }
        acceptor.stop();
    }

    /**
     * Reads the session settings from the classpath and closes the stream afterwards.
     *
     * @return the parsed settings, or {@code null} if the resource is missing or invalid
     */
    private static SessionSettings loadSessionSettings() {
        // A null resource is legal in try-with-resources; it is simply not closed.
        try (java.io.InputStream in = QfjAcceptor.class.getResourceAsStream(SESSION_SETTINGS_PATH)) {
            if (in == null) {
                logger.error("SessionSettings resource not found on classpath: {}", SESSION_SETTINGS_PATH);
                return null;
            }
            return new SessionSettings(in);
        } catch (ConfigError | java.io.IOException e) {
            logger.error("SessionSettings config error", e);
            return null;
        }
    }
}
实现application
package org.demo.myqfjdemo.acceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.*;
import quickfix.field.*;
import quickfix.fix50sp1.DemoMsg;
import quickfix.fix50sp1.NewOrderSingle;
public class QfjAcceptorApplication extends MessageCracker implements Application {
private static final Logger logger = LoggerFactory.getLogger(QfjAcceptorApplication.class);
public void onCreate(SessionID sessionId) {
logger.info("onCreate is called " + sessionId);
}
public void onLogon(SessionID sessionId) {
logger.info("onLogon is called");
}
public void onLogout(SessionID sessionId) {
logger.info("onLogout is called");
}
public void toAdmin(Message message, SessionID sessionId) {
logger.info("toAdmin is called");
}
public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
logger.info("fromAdmin is called");
}
public void toApp(Message message, SessionID sessionId) throws DoNotSend {
logger.info("toApp is called");
}
public void fromApp(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
logger.info("fromApp is called");
//调用相应的onMessage发送消息给initiator
crack(message, sessionId);
}
public void onMessage(NewOrderSingle message, SessionID sessionID) throws FieldNotFound {
// 返还一个订单回复
NewOrderSingle order = new NewOrderSingle(
new ClOrdID("mockedClOrdID"), new Side(Side.BUY),
new TransactTime(), new OrdType(OrdType.LIMIT));
order.set(new OrderQty(45.00));
order.set(new Symbol("mockedSymbol"));
order.set(new HandlInst('1'));
try {
Session.sendToTarget(order, sessionID);
logger.info("acceptor send msessage " + order);
} catch (SessionNotFound sessionNotFound) {
sessionNotFound.printStackTrace();
}
}
}
启动类
package org.demo.myqfjdemo.main;
import org.demo.myqfjdemo.acceptor.QfjAcceptor;
import quickfix.SessionID;
/**
 * Entry point of the acceptor demo: starts the acceptor and then parks the main thread
 * so the process stays up waiting for messages.
 */
public class AcceptorMain {
    public static void main(String[] args) {
        QfjAcceptor.startServer();
        try {
            // Wait for messages. A thread joining itself blocks until it is interrupted,
            // which keeps main alive without the CPU-burning empty spin loop.
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            // Restore the flag and let main return.
            Thread.currentThread().interrupt();
        }
    }
}