当前位置: 首页 > 知识库问答 >
问题:

以很小的内存占用执行数百万个可运行的

陆翰学
2023-03-14

我有N个愿望是ID。对于每一个ID,我都需要执行一个Runnable(即,我不关心返回值),并等待它们全部完成。每个Runnable的运行时间从几秒到几分钟不等,并行运行大约100个线程是安全的。

在我们当前的解决方案中,我们使用Executors.NewFixedThreadPool(),对每个ID调用submit(),然后对每个返回的Future调用get()。

代码工作得很好,而且非常简单,因为我不必处理线程、复杂的等待逻辑等。它有一个缺点:内存占用。

所有仍然排队的Runnable都消耗内存(比long所需的多8个字节:这些是我的Java类,有一些内部状态),所有的N个未来实例也消耗内存(这些也是有状态的Java类,我只用于等待,但不需要实际结果)。我查看了一个堆转储,估计N=1000万占用的内存略高于1GiB。一个数组中的1000万个长操作只会消耗76个MIB。

有没有一种方法只在内存中保存ID,最好不诉诸低级并发编程来解决这个问题?

共有1个答案

陆俭
2023-03-14

这是我通常使用生产者/消费者模式和协调两者的BlockingQueue所做的事情,或者使用Akka Actor,如果我手头有项目的话。

但是我想我应该建议一些安静的,有点不同的东西,依赖于Java的流行为。

直觉是,流的惰性执行将被用来抑制工作单元、未来及其结果的创建。

public static void main(String[] args) {
    // So we have a list of ids, I stream it
    // (note : if we have an iterator, you could group it by a batch of, say 100,
    // and then flat map each batch)
    LongStream ids = LongStream.range(0, 10_000_000L);
    // This is were the actual tasks will be dispatched
    ExecutorService executor = Executors.newFixedThreadPool(4);

    // For each id to compute, create a runnable, which I call "WorkUnit"
    Optional<Exception> error = ids.mapToObj(WorkUnit::new)
             // create a parralel stream
             // this allows the stream engine to launch the next instructions concurrently
            .parallel()
            // We dispatch ("parallely") the work units to a thread and have them execute
            .map(workUnit -> CompletableFuture.runAsync(workUnit, executor))
            // And then we wait for the unit of work to complete
            .map(future -> {
                try {
                    future.get();
                } catch (Exception e) {
                    // we do care about exceptions
                    return e;
                } finally {
                    System.out.println("Done with a work unit ");
                }
                // we do not care for the result
                return null;
            })
            // Keep exceptions on the stream
            .filter(Objects::nonNull)
            // Stop as soon as one is found
            .findFirst();


    executor.shutdown();
    System.out.println(error.isPresent());
}

老实说,我不是很确定这种行为是由规范保证的,但从我的经验来看,它是有效的。每个并行“chunck”抓取几个ID,将其馈送到管道(映射到工作单元、分派到线程池、等待结果、过滤异常),这意味着活动工作单元的数量与执行器的数量相当快地达到了平衡。

如果要对并行“chunck”的数量进行微调,那么应该在下面进行跟踪:Java 8并行流中的自定义线程池(Custom thread pool in Java 8 parallel stream

 类似资料:
  • 我在表里添加了500W的测试数据,表中数据如下 一次性读取 500w 数据到 JVM 内存中 必然会造成OOM现象,所以我分别试验了2个读取百万数据的方式,并用Junit分析内存占用 分页多次查询,并进行深度分页优化 Mybatis的流式查询 我从网上看了许多博客,说流式查询可以很好避免OOM问题。 但是为什么在分析堆内存占用中,反而是 多次分页查询的内存占用更小,平均只有400MB 而流式查询却

  • 开启十个线程,每个线程都会去查询500W的数据。 单独一个线程,堆内存占用500M。 十个线程,堆内存占用最高也不过1400MB,为什么会这样呢?这些内存占用居然不会叠加的吗?

  • 问题内容: 如何衡量我在Golang中通过程序包运行的可执行文件使用的内存量?最好通过操作系统本身来做到这一点吗? 问题答案: 您需要通过操作系统本身来执行此操作。如果您在plan9或posix上,则Go将以所返回的结构从操作系统返回使用值。 注意:不同的平台可能以字节或千字节为单位返回此值。检查详细信息。

  • 我们有数百万个活动要运行,或者说创建了数百万个工作流实例。我们可以创建多个Worker实例或使用多个线程运行Worker吗? 基本上,我想知道,我们是否有数百万个活动要执行,或者是否有数百万个工作流实例被创建。我们如何自动缩放。

  • 我正在尝试并行运行两个exe程序。我想启动它们,等待它们完成后再继续。以下是一个例子: 第一个进程睡眠5秒,第二个进程睡眠10秒。我希望看到进程启动的两条消息和“等待任务”消息。10秒后,请参见“过程完成”。但是,我立即看到“Procs complete”。 任务管理器显示正在运行的进程,同时显示“进程完成”。 那么,我如何从一个程序中启动两个独立的可执行程序,并行运行它们,然后等待它们完成,然后

  • 我在Eclipse中有一个maven项目。在src/main/resources下,我有一个名为“directoryToCopy”的目录,其中包含文件。一旦我运行了我的项目,我想将“directoryToCopy”复制到桌面上的本地目录“localDirec”下。 我用了这个: 这在本地工作正常,但是当我想将其作为可执行jar文件运行时,我会得到NullPointerException。 请问有什