当前位置: 首页 > 知识库问答 >
问题:

是否有可能获得高吞吐量(4-5 TPS)的IO密集型服务在Java与可完成的未来或NodeJs是更好的选择

宗晟
2023-03-14

我们要求开发一种能够处理大容量(约5TPS)的服务。我们必须做3-4个并行的下游Rest呼叫(下面提到的是有2个Rest呼叫的POC)-

我试图使用Completable future to do async rest调用和join方法来实现它,以聚合如下响应:{ResponseWrapper response=null;

    CompletableFuture<Student> s = CompletableFuture.supplyAsync(() -> {
                return studentService.getStudentDetails(requestContext, id, name, correlationId);
            },pool

    ).exceptionally(ex -> {
                log.error("Something went wrong in devicedm: ", ex);
                Student wrapper = new Student();
                wrapper.setStatus("500");
                Errors err = new Errors();
                err.setDescription(ex.getCause().toString());
                wrapper.setError(err);
                return wrapper;
            }

    );

    CompletableFuture<Employee> s1 = CompletableFuture.supplyAsync(() -> {
                return employeeService.getEmployeeDetails(requestContext, id, name, correlationId);
            },pool

    ).exceptionally(ex -> {
                log.error("Something went wrong in Employee: ", ex);
                Employee wrapper = new Employee();
                wrapper.setStatus(500);
                Errors err = new Errors();
                err.setDescription(ex.getCause().toString());
                wrapper.setError(err);
                return wrapper;
            }

    );

    CompletableFuture<ResponseWrapper> combinedDataCompletionStage = CompletableFuture.allOf(s, s1)
            .thenApply(ignoredVoid -> combine(s.join(), s1.join()));

    try {
        response = combinedDataCompletionStage.get();
    } catch (InterruptedException | ExecutionException e) {
        Errors error = new Errors();
        error.setDescription(e.getCause().toString());
        response.setError(error);
    }

    private ResponseWrapper combine(Student s, Employee s1) {
    ResponseWrapper response = new ResponseWrapper();

    response.setStudentInfo(s);
    response.setEmployeeInfo(s1);
    return response;
}

}

所以,我们需要帮助理解,当我们使用completable future并可用于处理其他请求时,rest调用完成后,两个线程s和s1是否会被释放?

join方法的行为是什么,从java文档来看,它似乎会等待s和s1返回响应,然后执行join操作。这是拦截电话吗?

一旦我们聚合了响应,我们必须得到调用,然后返回给我们如下所示的最终响应:

响应=组合数据完成阶段。get();

我曾尝试使用Jmeter进行性能测试,但未能得出结论,我想知道这种方法是否能够实现所需的吞吐量,或者需要对上述代码进行任何html" target="_blank">修改,或者考虑到这么多IO调用,java是否存在限制?如果是,那么在NodeJs中是否可行?

任何建议都将不胜感激!

共有1个答案

有骏奇
2023-03-14

因此,您希望通过低水平的并行性来最大化吞吐量。这样做的最好方法是使用线程,而不是CompletableFuture。当然,Node.js不是一个更好的变体。

让主线程(处理请求)启动并行线程,进行下游的其余调用。让线程将结果保存在特殊对象中,当所有其余调用完成时,该对象通知主线程。然后主线程进行处理,并将响应发送回消费者。

异步I/O的性能较低,只有当线程数量太多,无法装入可用的核心内存时,才必须使用异步I/O。

 类似资料:
  • null 基本上,具有内存缓存和可以响应命令的服务器的机器的普通香草实现和Redis盒子之间有什么区别?我也明白答案需要非常庞大,并且应该包括非常复杂的细节来完成。但是,我要找的是一些通用的技术,而不是所有的细微差别。

  • 如果一个字段在可完成的未来代码中为空,我必须发送一个异常: 这个想法是,如果孩子的字段为空(在本例中为lastName),我必须抛出一个自定义异常,我不太确定如何实现这一点。 我的想法是使用thenAccept方法发送异常,如下所示: 我必须评估数据库中的lastName是否为空,我必须抛出一个异常。 有什么想法吗?

  • 我得到了以下结果,吞吐量没有变化,即使我增加了线程数。 场景#1: 线程数:10 加速期:60 吞吐量:5.8/s 平均值:4025 场景#2: 线程数:20 加速期:60 吞吐量:7.8/s 平均值:5098 场景#3: 线程数:40 加速期:60 吞吐量:6.8/s 平均: 4098 我的JMeter文件包含一个单一的ThreadGroup,其中包含一个GET。 当我执行对响应时间更快(小于3

  • 更新:这个问题现在无效,因为我认为发生的事件并不像我认为的那样发生(详见下文)。我把这个问题保留原样,因为答案和评论可能对其他人有用。 我通过Azure门户创建了一个集合,最初配置为: 存储容量:无限 初始吞吐能力(Ru/s):2500 分区键: 更新:我想我已经搞清楚发生了什么。我手动创建了一个分区集合,然后忘记了我的代码(我正在使用的导入器/迁移工具)在启动时删除数据库并重新创建数据库和集合。

  • 我正在开发一个具有以下特性的实时应用程序: 数百个客户端将同时插入行/文档,每个客户端每隔几秒钟插入一行。 大部分仅追加;几乎所有的行/文档,一旦插入,永远不会改变。 只有当数据被刷新到磁盘时,客户端才会看到成功,此后读写一致性应该保持不变。 客户端愿意等待几秒钟的确认时间足够多的磁盘查找和写入发生。 RAM中的数据太多(排除像Redis这样的选项)。但是写很久以前的行很少被访问,所以在内存中没有

  • 本文向大家介绍Kafka 是如何实现高吞吐率的?相关面试题,主要包含被问及Kafka 是如何实现高吞吐率的?时的应答技巧和注意事项,需要的朋友参考一下 Kafka是分布式消息系统,需要处理海量的消息,Kafka的设计是把所有的消息都写入速度低容量大的硬盘,以此来换取更强的存储能力,但实际上,使用硬盘并没有带来过多的性能损失。kafka主要使用了以下几个方式实现了超高的吞吐率: 顺序读写; 零拷贝