当前位置: 首页 > 知识库问答 >
问题:

带垂直的QuickFIX/J服务器。十、

欧阳睿范
2023-03-14

我正在尝试创建一个启动QuickFIX/J接受器服务器(TCP FIX服务器)的版本。启动时,接受器线程在单独的线程上运行,Vert. x不知道这一点(不会阻止事件循环)。然而,我可以从接受器线程访问事件总线,并将消息传递给其他顶点。

问题是,这是一个好的做法吗?

package com.millenniumit.fixgateway.service.impl.quickfix;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.*;
import quickfix.fix42.NewOrderSingle;

public class FIXServerVerticle extends AbstractVerticle {

    private DynamicSessionProviderConfigHelper dynamicSessionProviderConfig;
    private ThreadedSocketAcceptor threadedSocketAcceptor;
    private static final Logger LOGGER = LoggerFactory.getLogger(FIXServerVerticle.class);

    /**
     * You can’t block waiting for the tcp server to bind in the start method as that would break the Golden Rule.
     * To prevent this, implement the asynchronous start method. This version of the method takes a Future as a parameter.
     * When the method returns the verticle will not be considered deployed.
     * @param startPromise
     */
    @Override
    public void start(Promise<Void> startPromise) {
        Application serverApplication = new Application() {
            @Override
            public void onCreate(SessionID sessionID) {
                LOGGER.info("Session Created : " + sessionID);
            }

            @Override
            public void onLogon(SessionID sessionID) {

            }

            @Override
            public void onLogout(SessionID sessionID) {

            }

            @Override
            public void toAdmin(Message message, SessionID sessionID) {

            }

            @Override
            public void fromAdmin(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {

            }

            @Override
            public void toApp(Message message, SessionID sessionID) throws DoNotSend {

            }

            @Override
            public void fromApp(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
                LOGGER.info("Processing in worker thread: " + message);
                //Offload processing logic from event loop
                getVertx().executeBlocking(future -> {
                    //blocking code, run on the worker thread
                    LOGGER.info("Processing in worker thread: " + message);
                    //processing logic
                    future.complete(message);
                }, res -> {
                    //non blocking code running on the event loop thread
                    getVertx().eventBus().request("in.message", res.result(), ar -> {
                        if (ar.succeeded()) {
                            Session.lookupSession(sessionID).send((Message) ar.result().body());
                        }
                    });
                });
            }
        };
        //Offload acceptor initialization from event loop
        getVertx().executeBlocking(future -> {
            //blocking code, run on the worker thread
            MessageStoreFactory messageStoreFactory = new NoopStoreFactory();
            MessageFactory messageFactory = new DefaultMessageFactory();
            dynamicSessionProviderConfig = new DynamicSessionProviderConfigHelper();
            try {
                SessionSettings sessionSettings = new SessionSettings("acceptor-config");
                threadedSocketAcceptor = new ThreadedSocketAcceptor(serverApplication, messageStoreFactory, sessionSettings, messageFactory);
                dynamicSessionProviderConfig.configure(threadedSocketAcceptor, serverApplication, messageStoreFactory, sessionSettings, messageFactory);
                threadedSocketAcceptor.start();
                future.complete();
            } catch (ConfigError | FieldConvertError configError) {
                configError.printStackTrace();
                future.fail(configError.getMessage());
            }
        }, res -> {
            //non blocking code running on the event loop thread
            if(res.succeeded()){
                startPromise.complete();
            }else{
                startPromise.fail(res.cause().getMessage());
            }
        });
    }

    public void stop(Promise<Void> stopPromise) {
        //Offload acceptor stop method from event loop
        getVertx().executeBlocking(future -> {
            //blocking code, run on the worker thread
            threadedSocketAcceptor.stop();
            future.complete();
        }, res -> {
            //non blocking code running on the event loop thread
            if(res.succeeded()){
                stopPromise.complete();
            }else{
                stopPromise.fail(res.cause().getMessage());
            }
        });
    }

}

共有2个答案

贺浩壤
2023-03-14

这种方法在我看来很好,我认为与vert. x的工作方式没有冲突。

顺便说一句,你可以稍微简化你的代码。

这个:

getVertx().executeBlocking(
  future -> {},
  res -> {
            if(res.succeeded()){
                startPromise.complete();
            }else{
                startPromise.fail(res.cause().getMessage());
            }
        }
)

可替换为:

getVertx().executeBlocking(
  future -> {},
  startPromise
)
东方涛
2023-03-14

我对vert不熟悉。x、 但是,一般来说,如果需要更高的吞吐量,QuickFIX/J应用程序通常会将传入消息卸载到单独的线程/队列。

 类似资料:
  • QuickFIX/J 是一个实现了金融信息交换协议的Java引擎。 金融信息交换协议(FIX,Financial Information exchange)协议是适用于实时证券、金融电子交易开发的数据通信标准。它是由国际FIX协会组织提供的一个开放式协议,目的是推动国际贸易电子化的 进程,在各类参与者之间,包括投资经理、经纪人,买方、卖方建立起实时的电子化通讯协议。FIX 协议的目标是把各类证券金

  • 我不太明白vert. x是如何应用于网络服务器的。 我所知道的webserver的概念是基于线程的。 启动Web服务器,然后该服务器正在运行 然后,对于每个连接的客户端,您都会得到一个套接字,然后将其传递给自己的线程处理程序 然后线程处理程序处理该特定套接字的任务 因此,可以清楚地定义哪个线程为哪个套接字执行工作。然而,对于每个套接字,您都需要一个新线程,从长远来看,对于许多套接字来说,这是非常昂

  • 问题内容: 我正在编写一个使用QuickFIX / J作为FIX框架的应用程序。我的对方向我发送了FIX版本4.4 的消息,但只有某些字段(组件)的版本为5.0 现在,我正在尝试实现可以​​阅读此组件的功能。 这个 不起作用!消息来自版本4.4,这就是为什么只需要a 而不是版本5.0的原因 如果我尝试这个 我收到以下错误: 这是因为我得到4.4场,但要像对待4.4版本。看这里 也许有人可以帮我…

  • 我对Vert.x非常陌生,就像几天前一样。我来自一个JAX式的,安逸的世界。我可能大错特错,请指正。 我的问题是:如何使顶点公开自己的REST接口(子路由器),以及如何将其子路由器注册到应用程序的主路由器中? 我尝试过类似的东西,但是当我请求/产品/所有:( } }

  • 我正在使用图表.js库来创建折线图,由于需要大量标签,我使用Chartjs折线图上的限制标签数量问题中的方法跳过了一些标签,另见下文。 这是我用来创建这个图表的代码: 我想注意几件事: > < li >如果我使用实际标签(例如< code>[0,1,2,3,4,...])黑线不会出现。这让我相信我使用空字符串(< code>'')时出了问题。 < li> 当我将标签更改为:< code>[0,1,

  • QuickFIX 是一个 C++ 实现的金融信息交换协议FIX的引擎 金融信息交换协议(FIX,Financial Information exchange)协议是适用于实时证券、金融电子交易开发的数据通信标准。它是由国际FIX协会组织提供的一个开放式协议,目的是推动国际贸易电子化的 进程,在各类参与者之间,包括投资经理、经纪人,买方、卖方建立起实时的电子化通讯协议。FIX 协议的目标是把各类证券