当前位置: 首页 > 工具软件 > github-spray > 使用案例 >

反应灵敏且性能卓越的Spray + Akka解决方案,以“在Java和Node.js中发挥并发性和性能”...

周锐
2023-12-01

在我以前的文章中,我研究了一个虚拟的交易引擎,并将基于Java的阻止解决方案与基于Node.js的非阻止解决方案进行了比较。 在文章的结尾,我写道:

我怀疑在Node.js近期取得成功之后,越来越多的异步Java库将开始出现。

这样的库已经存在,例如: AkkaSpray和此Mysql异步驱动程序


我给自己设定了一个挑战,即要确切地使用这些库来创建基于Java的非阻塞解决方案,以便将其性能与上一篇文章中创建的Node.js解决方案进行比较。 您可能注意到的第一件事是这些都是基于Scala的库,但是我用Java编写了该解决方案,尽管它在语法上不太优雅。 在上一篇文章中,我介绍了一种基于Akka的解决方案,其中交易引擎封装在actor中。 在这里,我放弃了Tomcat作为HTTP服务器,并用Spray代替了它,后者将HTTP服务器直接集成到Akka中。 从理论上讲,这应该不会对性能造成任何影响,因为Spray是NIO,就像Tomcat 8一样。 但是吸引我到此解决方案的是,总体而言,线程的数量大大减少了,因为Spray,Akka和异步Mysql库都使用相同的执行上下文 。 Tomcat在我的Windows开发计算机上运行,​​有30多个线程,而此处构建的解决方案只有10个以上,或者与Websphere或JBoss相比,有数百个线程。 执行上下文基本上是一个线程池,这些线程运行分配给它的任务。 由于此处介绍的解决方案中使用的所有库都是非阻塞的,因此线程数可以保持较低并接近理论最佳值,从而尽可能少地进行上下文切换 ,从而使过程高效运行。

本文编写的代码在GitHub上 。 该程序的第一部分是启动Spray和Akka的main方法:

public static final ActorSystem system = ActorSystem.create("system");

public static void main(String[] args) {
    ...
    ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor"); 
    
    InetSocketAddress endpoint = new InetSocketAddress(3000);
    int backlog = 100;
    List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList();
    Option<ServerSettings> settings = scala.Option.empty();
    ServerSSLEngineProvider sslEngineProvider = null;
    Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider);
    IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender());
    
    system.scheduler().schedule(new FiniteDuration(5, TimeUnit.SECONDS), new FiniteDuration(5, TimeUnit.SECONDS), ()->{
        System.out.println(new Date() + " - numSales=" + numSales.get());
    }, system.dispatcher());
}

第1行创建了一个公共的actor系统,因此我可以从其他地方访问它,因为它用于访问我想在整个程序中使用的单个执行上下文。 (在存在可维护性问题的代码中,我会写一些东西以便将该对象注入程序的相关部分。)然后,第5行使用该系统实例化一个actor,该actor用于处理所有HTTP买卖请求。命令。 第7-11行仅设置了服务器的配置数据。 第12和13行是我们进行配置和actor的地方,并告诉Akka IO使用它们和HTTP模块将所有HTTP请求作为消息从第5行发送给我们的actor。15-17行是我有效地设置计时器任务的地方每5秒触发一次以输出一些统计信息。 这里的重要部分是要注意,我没有使用Java的Timer来调度任务,因为这只会给进程添加更多不必要的线程。 相反,我使用与Akka相同的执行上下文,因此创建了尽可能少的线程。

接下来是处理HTTP请求的参与者:

private static class HttpActor extends AbstractActor {

    private static final HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1();

    public HttpActor() {
        final Router router = partitionAndCreateRouter();
        
        receive(ReceiveBuilder
            .match(HttpRequest.class, r -> {
                int id = Constants.ID.getAndIncrement();
                String path = String.valueOf(r.uri().path());
                if("/sell".equals(path)){
                    String productId = r.uri().query().get("productId").get();
                    ...
                    SalesOrder so = new SalesOrder(price, productId, quantity, id);
                    so.setSeller(new Seller(who));
                    router.route(so, self());
                    replyOK(id);
                }else if("/buy".equals(path)){
                    ...
                }else{
                    handleUnexpected(r);
                }
            }).match(Tcp.Connected.class, r ->{
                sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self()); //tell that connection will be handled here!
            }).build());
    }

第3行显示了一个示例,该示例显示如何将Scala集成到Java程序中很丑陋,但是有时您如何通过添加自己的抽象来隐藏那些丑陋的部分。 响应HTTP请求的HTTP actor具有3个作业。 第6行上的第一个工作是在其中创建一个路由器,我将在下面对其进行描述,并将其用于委派工作。 第二项工作是处理24-24行上的所有新连接,这告诉Spray这个参与者也将处理实际的请求,而不仅仅是连接。 该参与者具有的第三项工作在第9-18行中显示,该参与者接受HTTP请求并将一些工作委托(路由)到系统中的另一个参与者。

这个参与者知道HTTP模型,但是HTTP抽象不会泄漏到系统的下一层。 相反,参与者将域对象(或值对象案例类或类似对象)传递给封装了交易引擎的参与者。 使用从HTTP请求中提取的数据(例如在第13行),或者使用请求主体中的JSON对象,可以在第15和16行看到此类域对象的构造。 Spray包含有用的指令 ,可以帮助您从请求中提取数据,如果需要的话,可以从HTTP提取一些内容。 构造哪个域对象取决于我构建并在第9、12和19行处理的类似REST的接口。如果我使用了Scala,则可以在HttpRequest对象上使用模式匹配来编写更精美的代码。 通过从第6行获得路由器以将域对象路由到合适的参与者,将域对象传递到交易引擎,在第17行。最后但并非最不重要的是,第18行是在HTTP响应中确认销售订单请求的位置它将JSON对象以及分配给订单的唯一ID传递回消费者,以便以后可以查询其状态(将其持久化到销售对象中)。

下一个代码片段显示了我们如何划分市场并创建多个参与者来并行处理请求。

private Router partitionAndCreateRouter() {
    Map<String, ActorRef> kids = new HashMap<>();
    java.util.List<Routee> routees = new ArrayList<Routee>();
    int chunk = Constants.PRODUCT_IDS.length / NUM_KIDS;
    for (int i = 0, j = Constants.PRODUCT_IDS.length; i < j; i += chunk) {
        String[] temparray = Arrays.copyOfRange(Constants.PRODUCT_IDS, i, i + chunk);
        LOGGER.info("created engine for products " + temparray);
        ActorRef actor = getContext().actorOf(Props.create(EngineActor.class));
        getContext().watch(actor);
        routees.add(new ActorRefRoutee(actor));

        for (int k = 0; k < temparray.length; k++) {
            LOGGER.debug("mapping productId '" + temparray[k] + "' to engine " + i);
            kids.put(temparray[k], actor);
        }
        LOGGER.info("---started trading");
        actor.tell(EngineActor.RUN, ActorRef.noSender());
    }			
    Router router = new Router(new PartitioningRoutingLogic(kids), routees);
    return router;
}

此代码与上一篇文章中的代码相似。 为了横向扩展并同时使用多个核心,按产品ID对市场进行了划分,并且每个交易引擎针对不同的市场划分同时运行。 在此处提供的解决方案中,在每个分区上创建一个EngineActor并将其包装在第10行的Routee中。第14行还填充了一个由产品ID键控的actor映射。在第19行和第19行使用路由和映射构建了路由器。委派工作时, HttpActor在上一片段中使用的就是这个。 还要注意第17行,它启动了包含在EngineActor的交易引擎,以便启动并运行该引擎,准备在将购买和销售订单传递给这些EngineActor进行交易。

这里没有明确显示EngineActor类,因为它与上一篇文章中使用的actor几乎相同,并且仅封装了一个交易引擎,该引擎处理特定市场分区中的所有产品。 上面的第19行使用RoutingLogic构建路由器,如下所示:

public static class PartitioningRoutingLogic implements RoutingLogic {

    private Map<String, ActorRef> kids;

    public PartitioningRoutingLogic(Map<String, ActorRef> kids) {
        this.kids = kids;
    }

    @Override
    public Routee select(Object message, IndexedSeq<Routee> routees) {

        //find which product ID is relevant here
        String productId = null;
        if(message instanceof PurchaseOrder){
            productId = ((PurchaseOrder) message).getProductId();
        }else if(message instanceof SalesOrder){
            productId = ((SalesOrder) message).getProductId();
        }
        ActorRef actorHandlingProduct = kids.get(productId);

        //no go find the routee for the relevant actor
        for(Routee r : JavaConversions.asJavaIterable(routees)){
            ActorRef a = ((ActorRefRoutee) r).ref(); //cast ok, since the are by definition in this program all routees to ActorRefs
            if(a.equals(actorHandlingProduct)){
                return r;
            }
        }
        
        return akka.routing.NoRoutee$.MODULE$; //none found, return NoRoutee
    }
}

路由器在接收到必须路由到正确角色的对象时,会调用第10行的select(...)方法。 使用在上一个清单中创建的地图以及从请求中获得的产品ID,很容易找到包含负责相关市场划分的交易引擎的参与者。 通过返回包裹该参与者的路由,Akka会将订单对象传递给正确的EngineActor ,然后在交易引擎处于交易周期之间且EngineActor下次检查时处理该消息时,将数据放入模型中它的收件箱。

好的,这就是要处理的前端。 上一篇文章的解决方案所需要的第二个主要更改是方法的设计,该方法可以在交易后保持销售。 在基于Java的解决方案中,我同步遍历每笔交易并将insert语句发送到数据库,并且仅在数据库回复后才处理下一次交易。 使用此处提供的解决方案,我选择通过向数据库发出insert请求并立即移至下一个销售并执行相同操作来并行处理销售。 使用我提供的回调在执行上下文中异步处理了响应。 我编写了程序,以等待最后一次插入被确认,然后再继续进行新创建的购买和销售订单的交易,该订单自上次交易时段开始以来就已经到来。 在下面的清单中显示:

private void persistSales(List<Sale> sales, final PersistenceComplete f) {
    if (!sales.isEmpty()) {
        LOGGER.info("preparing to persist sales");

        final AtomicInteger count = new AtomicInteger(sales.size());
        sales.forEach(sale -> {
            List values = Arrays.asList(sale.getBuyer().getName(), 
                                        sale.getSeller().getName(),
                                        sale.getProductId(),
                                        sale.getPrice(),
                                        sale.getQuantity(),
                                        sale.getPurchaseOrder().getId(),
                                        sale.getSalesOrder().getId());
            
            Future<QueryResult> sendQuery = POOL.sendPreparedStatement(SQL, JavaConversions.asScalaBuffer(values));
            sendQuery.onComplete(new JFunction1<Try<QueryResult>, Void>() {
                @Override
                public Void apply(Try<QueryResult> t) {
                    if(t.isSuccess()){
                        QueryResult qr = t.get();
                        //the query result doesnt contain auto generated IDs! library seems immature...
                        //sale.setId(???);
                    }
                    
                    if(count.decrementAndGet() == 0){
                        if(t.isSuccess()){
                            f.apply(null);
                        }else{
                            f.apply(t.failed().get());
                        }
                        
                    }
                    return null; //coz of Void
                }
            }, Main.system.dispatcher());
        });
    }else{
        f.apply(null); //nothing to do, so continue immediately
    }
}

交易引擎在每个交易周期后调用persistSales(...)方法,并向该方法传递在该交易周期内完成的销售清单,并在所有持久性完成后调用一个回调函数。 如果未售出任何东西,则第38行立即调用回调。 否则,在第5行上创建一个计数器,并使用要保留的销售数量进行初始化。 每次销售都在第7-15行异步保存。 请注意,如何在第15行返回一个Future以及如何在第16-35行使用另一个回调来处理future的完成–这里没有阻塞,等待future完成! 上面提到的计数器在第25行递减,一旦销售被持久化,并且所有销售都被持久化,则调用传递给persistSales(...)方法的回调。 请注意,在第16行使用的类JFunction1是一个垫片,可以更轻松地集成JFunction1代码在GitHub上的上面给出的链接上。 第21和22行表明,我使用的异步Mysql库存在一些问题。 它仍然是一个测试版,似乎没有办法掌握销售产生的(自动递增)主键。 还要注意第35行,在这里我传入了Akka使用的执行上下文,以便处理插入语句完成的Future在一个现有线程上进行处理,而不是在某些新线程上进行处理–再次,保持该线程的总数线程越低越好。

该清单还显示了一个有趣的问题,即调用数据库以插入数据的线程不一定是可能需要关闭连接的线程[1]。 在普通的Java EE和Spring中,经常使用线程本地存储(另请参见此处 )。 如果您从处理将来完成的函数中调用Bean,则注入到其中的资源可能不起作用,因为容器无法确定上下文是什么。 Scala使用隐式参数解决了这个问题,这些参数在后台传递给方法。

上面的清单使用PersistenceComplete回调,如下面第14-16行所示。 它还使用使用以下代码创建的连接池。 再一次,Akka使用的执行上下文将传递到下面第10行的异步Mysql库。 下面的第10行还显示了一个非默认的池配置,其中允许的最大队列大小最大为一千。 在负载测试期间,我收到许多错误消息,表明池已饱和,增加该值可以解决问题。

private static final String SQL = "INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) VALUES (?, ?, ?, ?, ?, ?, ?)";

private static final ConnectionPool<MySQLConnection> POOL;
static {
    Duration connectTimeout = Duration.apply(5.0, TimeUnit.SECONDS);
    Duration testTimeout = Duration.apply(5.0, TimeUnit.SECONDS);
    Configuration configuration = new Configuration("root", Main.DB_HOST, 3306, Option.apply("password"), Option.apply("TRADER"), io.netty.util.CharsetUtil.UTF_8, 16777216, PooledByteBufAllocator.DEFAULT, connectTimeout, testTimeout);
    
    MySQLConnectionFactory factory = new MySQLConnectionFactory(configuration);
    POOL = new ConnectionPool<MySQLConnection>(factory, new PoolConfiguration(1000, 4, 1000, 4000), Main.system.dispatcher());
}


private static interface PersistenceComplete {
    void apply(Throwable failure);
}

传递给persistSales(...)的回调在下一个清单中显示。 以下代码与上一篇文章中显示的原始代码几乎没有什么不同,不同之处在于以下代码现在是异步的。 一旦所有销售都持续存在,就会调用该回调,然后回调才会在下面的第14行上(通过其事件侦听器)向参与者发送一条消息。 在加载大量新的购买和销售订单后,该消息通常位于收件箱的后面。 这些消息中的每一个都会被处理,从而导致在重新开始交易之前,使用新订单更新交易引擎模型。

persistSales(sales, t -> {
    if(t != null){
        LOGGER.error("failed to persist sales: " + sales, t);
    }else{
        LOGGER.info("persisting completed, notifying involved parties...");
        sales.stream().forEach(sale -> {
            if (sale.getBuyer().listener != null)
                sale.getBuyer().listener.onEvent(EventType.PURCHASE, sale);
            if (sale.getSeller().listener != null)
                sale.getSeller().listener.onEvent(EventType.SALE, sale);
        });
        ...
    }
    listener.onEvent(EventType.STOPPED, null);
});

最终的代码清单是对Node.js解决方案的修改,该修改使它也可以并行地保持销售,而不是像上一篇文章中那样一个接一个地销售。

function persistSales(sales, callback){
    if(sales.length === 0 || process.env.skipPersistence) {
        callback(); //nothing to do, so continue immediately
    }else{
        resources.dbConnection(function(err, connection) {
            if(err) callback(err); else {
                logger.info('preparing to persist ' + sales.length + ' sales');
                var count = sales.length;
                _.each(sales, function(sale){ //save them in parallel
                    connection.query(
                            'INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) values (?, ?, ?, ?, ?, ?, ?)',
                            [sale.buyer.name, sale.seller.name, sale.productId, sale.price, sale.quantity, sale.po.id, sale.so.id],
                            function(err, rows, fields) {
                                if(err) callback(err); else {
                                    sale.id = rows.insertId;
                                    count--;
                                    if(count == 0){
                                        logger.info('persisted all sales');
                                        connection.release();
                                        callback();
                                    }
                                }
                            }
                    );
                });
            }
        });
    }
}

第5行从池中获取一个连接,并且相同的连接“并行”用于所有销售,并且在最后一次销售持续后,仅在第19行中释放,即返回到池中。

因此,再次通过一些负载测试比较解决方案的时间到了。 这次,我选择查看以下三个解决方案中的每一个可以达到的最大销售率:

  • 情况1 –此处介绍的解决方案,即Spray + Akka +异步Mysql驱动程序,
  • 情况2 –修改后的Node.js解决方案使用并行持久性,
  • 情况3 –原始的Tomcat非阻塞连接器,但具有同步持久性。

这些案例是使用上一篇文章中的硬件运行的,交易引擎运行在快速硬件上,而数据库运行在慢速硬件上,因为这是显示阻塞I / O如何导致性能问题的最佳设置。 对于每种情况,我可以在调整时调整三个变量。 这些曾经是:

  • 交易引擎(作为参与者或作为子流程)的数量,
  • 客户端调用服务器之间等待的时间,
  • 并发客户端数。

后两个基本上调整了每秒的请求数量,因为连接没有保持打开状态以等待交易结果(请参阅上一篇文章)。 结果如下,最佳性能以粗体显示。

情况1 – Spray + Akka +异步Mysql驱动程序
#交易引擎 两次通话之间的客户等待时间 并发客户 每分钟销量 大约 交易硬件上的CPU
8 100毫秒 60 42,810 25-35%
8 80毫秒 70 62,392 25-35%
8 60毫秒 80 75,600 30-40%
8 40毫秒 90 59,217 30-50%
10 60毫秒 80 太多的数据库连接问题
5 60毫秒 60 67,398 25-35%
6 60毫秒 80 79,536 25-35%

案例2 –具有并行持久性的Node.js
#交易引擎 两次通话之间的客户等待时间 并发客户 每分钟销量 大约 交易硬件上的CPU
8 200毫秒 30 6,684 40-50%
8 100毫秒 60 开始落后
8 100毫秒 40 17,058 25-35%
8 100毫秒 50 开始落后
12 100毫秒 50 20,808 45-60%
16 100毫秒 60 24,960 45-65%
20 100毫秒 80 32,718 45-70%
25 60毫秒 80 51,234 75-85%
30 50毫秒 80 22,026 75-85%
25 10毫秒 70 17,604 75-90%

情况3 – Tomcat 8 NIO,具有同步阻止持久性
#交易引擎 两次通话之间的客户等待时间 并发客户 每分钟销量 大约 交易硬件上的CPU
4 200毫秒 30 9,586 5%
4 150毫秒 30 10,221 5%
8 200毫秒 30 9,510 5%

结果表明,将NIO连接器用螺栓固定在Tomcat上并认为您没有阻塞并且性能很危险,因为与Akka解决方案相比,该解决方案的表现差了近8倍。 结果还表明,通过使用非阻塞库并用Java编写非阻塞解决方案,与Node.js相比,可以创建性能卓越的解决方案。 Java解决方案不仅具有大约50%的吞吐量,而且使用的CPU不到一半。

非常重要:请注意,这是特定于此处使用的算法以及我的体系结构,设计和实现的结果。 它还依赖于使用“非标准” Java库,实际上,我使用的Mysql库缺少功能,例如,从insert结果中读取生成的主键。 在得出Java,Scala和Node.js的相对性能结论之前,请针对您的用例做自己的实验!

比较交易引擎数量变化时的一个值得注意的点:在Node.js中,它直接控制子进程的数量,类似于线程数; 在Akka解决方案中,它对系统中的线程数量没有任何影响–该数量保持不变! 在Akka解决方案中,更改参与者的数量会影响其收件箱中消息的数量。

有关此视频的详细信息,请参见有关使用Akka和Spray的更多信息。 请花时间也快速阅读有关反应式宣言 。 此处介绍的Akka解决方案是反应性的,因为它具有响应能力(这三种情况中的吞吐量都最高),有弹性(Akka提供了处理故障的简便方法,尽管这里没有必要),有弹性(它是自动扩展的,因为Akka管理线程池)它在执行上下文中的大小,并且由于Akka提供了actor的透明位置而扩大了规模,并且它是消息驱动的(由于使用actor模型)。

[1]这里使用的Mysql库不需要关闭连接并返回到池,例如Apache数据库池 。 这样做实际上会引起问题! 我进行的负载测试证明,将其保持打开状态不会造成任何问题。

翻译自: https://www.javacodegeeks.com/2015/01/a-reactive-and-performant-spray-akka-solution-to-playing-with-concurrency-and-performance-in-java-and-node-js.html

 类似资料: