我正在尝试创建一个启动QuickFIX/J接受器服务器(TCP FIX服务器)的版本。启动时,接受器线程在单独的线程上运行,Vert. x不知道这一点(不会阻止事件循环)。然而,我可以从接受器线程访问事件总线,并将消息传递给其他顶点。
问题是,这是一个好的做法吗?
package com.millenniumit.fixgateway.service.impl.quickfix;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.*;
import quickfix.fix42.NewOrderSingle;
public class FIXServerVerticle extends AbstractVerticle {
private DynamicSessionProviderConfigHelper dynamicSessionProviderConfig;
private ThreadedSocketAcceptor threadedSocketAcceptor;
private static final Logger LOGGER = LoggerFactory.getLogger(FIXServerVerticle.class);
/**
* You can’t block waiting for the tcp server to bind in the start method as that would break the Golden Rule.
* To prevent this, implement the asynchronous start method. This version of the method takes a Future as a parameter.
* When the method returns the verticle will not be considered deployed.
* @param startPromise
*/
@Override
public void start(Promise<Void> startPromise) {
Application serverApplication = new Application() {
@Override
public void onCreate(SessionID sessionID) {
LOGGER.info("Session Created : " + sessionID);
}
@Override
public void onLogon(SessionID sessionID) {
}
@Override
public void onLogout(SessionID sessionID) {
}
@Override
public void toAdmin(Message message, SessionID sessionID) {
}
@Override
public void fromAdmin(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
}
@Override
public void toApp(Message message, SessionID sessionID) throws DoNotSend {
}
@Override
public void fromApp(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
LOGGER.info("Processing in worker thread: " + message);
//Offload processing logic from event loop
getVertx().executeBlocking(future -> {
//blocking code, run on the worker thread
LOGGER.info("Processing in worker thread: " + message);
//processing logic
future.complete(message);
}, res -> {
//non blocking code running on the event loop thread
getVertx().eventBus().request("in.message", res.result(), ar -> {
if (ar.succeeded()) {
Session.lookupSession(sessionID).send((Message) ar.result().body());
}
});
});
}
};
//Offload acceptor initialization from event loop
getVertx().executeBlocking(future -> {
//blocking code, run on the worker thread
MessageStoreFactory messageStoreFactory = new NoopStoreFactory();
MessageFactory messageFactory = new DefaultMessageFactory();
dynamicSessionProviderConfig = new DynamicSessionProviderConfigHelper();
try {
SessionSettings sessionSettings = new SessionSettings("acceptor-config");
threadedSocketAcceptor = new ThreadedSocketAcceptor(serverApplication, messageStoreFactory, sessionSettings, messageFactory);
dynamicSessionProviderConfig.configure(threadedSocketAcceptor, serverApplication, messageStoreFactory, sessionSettings, messageFactory);
threadedSocketAcceptor.start();
future.complete();
} catch (ConfigError | FieldConvertError configError) {
configError.printStackTrace();
future.fail(configError.getMessage());
}
}, res -> {
//non blocking code running on the event loop thread
if(res.succeeded()){
startPromise.complete();
}else{
startPromise.fail(res.cause().getMessage());
}
});
}
public void stop(Promise<Void> stopPromise) {
//Offload acceptor stop method from event loop
getVertx().executeBlocking(future -> {
//blocking code, run on the worker thread
threadedSocketAcceptor.stop();
future.complete();
}, res -> {
//non blocking code running on the event loop thread
if(res.succeeded()){
stopPromise.complete();
}else{
stopPromise.fail(res.cause().getMessage());
}
});
}
}
这种方法在我看来很好,我认为与vert. x的工作方式没有冲突。
顺便说一句,你可以稍微简化你的代码。
这个:
getVertx().executeBlocking(
future -> {},
res -> {
if(res.succeeded()){
startPromise.complete();
}else{
startPromise.fail(res.cause().getMessage());
}
}
)
可替换为:
getVertx().executeBlocking(
future -> {},
startPromise
)
我对vert不熟悉。x、 但是,一般来说,如果需要更高的吞吐量,QuickFIX/J应用程序通常会将传入消息卸载到单独的线程/队列。
QuickFIX/J 是一个实现了金融信息交换协议的Java引擎。 金融信息交换协议(FIX,Financial Information exchange)协议是适用于实时证券、金融电子交易开发的数据通信标准。它是由国际FIX协会组织提供的一个开放式协议,目的是推动国际贸易电子化的 进程,在各类参与者之间,包括投资经理、经纪人,买方、卖方建立起实时的电子化通讯协议。FIX 协议的目标是把各类证券金
我不太明白vert. x是如何应用于网络服务器的。 我所知道的webserver的概念是基于线程的。 启动Web服务器,然后该服务器正在运行 然后,对于每个连接的客户端,您都会得到一个套接字,然后将其传递给自己的线程处理程序 然后线程处理程序处理该特定套接字的任务 因此,可以清楚地定义哪个线程为哪个套接字执行工作。然而,对于每个套接字,您都需要一个新线程,从长远来看,对于许多套接字来说,这是非常昂
问题内容: 我正在编写一个使用QuickFIX / J作为FIX框架的应用程序。我的对方向我发送了FIX版本4.4 的消息,但只有某些字段(组件)的版本为5.0 现在,我正在尝试实现可以阅读此组件的功能。 这个 不起作用!消息来自版本4.4,这就是为什么只需要a 而不是版本5.0的原因 如果我尝试这个 我收到以下错误: 这是因为我得到4.4场,但要像对待4.4版本。看这里 也许有人可以帮我…
我对Vert.x非常陌生,就像几天前一样。我来自一个JAX式的,安逸的世界。我可能大错特错,请指正。 我的问题是:如何使顶点公开自己的REST接口(子路由器),以及如何将其子路由器注册到应用程序的主路由器中? 我尝试过类似的东西,但是当我请求/产品/所有:( } }
我正在使用图表.js库来创建折线图,由于需要大量标签,我使用Chartjs折线图上的限制标签数量问题中的方法跳过了一些标签,另见下文。 这是我用来创建这个图表的代码: 我想注意几件事: > < li >如果我使用实际标签(例如< code>[0,1,2,3,4,...])黑线不会出现。这让我相信我使用空字符串(< code>'')时出了问题。 < li> 当我将标签更改为:< code>[0,1,
QuickFIX 是一个 C++ 实现的金融信息交换协议FIX的引擎 金融信息交换协议(FIX,Financial Information exchange)协议是适用于实时证券、金融电子交易开发的数据通信标准。它是由国际FIX协会组织提供的一个开放式协议,目的是推动国际贸易电子化的 进程,在各类参与者之间,包括投资经理、经纪人,买方、卖方建立起实时的电子化通讯协议。FIX 协议的目标是把各类证券