当前位置: 首页 > 知识库问答 >
问题:

ExecutorService线程安全

宰父智敏
2023-03-14

假设我有一个Executors静态工厂方法的ExecutorService实例。

如果我从某个线程提交了一个调用,其中RetVal不是线程安全的本地实例化对象,那么当我从同一个线程获得()它时,我需要担心retvals的完整性吗?人们说局部变量是线程安全的,但我不确定当您返回一个本地实例化的对象并从其他线程接收它时,它是否适用。

ExecutorService executor = Executors.newFixedThreadPool(5);
Future<List<String>> fut = executor.submit(() -> {
    List<String> ret = new ArrayList<>();
    ret.add("aasdf");
    ret.add("dfls");
    return ret;
});

List<String> myList = fut.get();

下面是我的定制实现,我只是为了测试。您可以忽略EType枚举。

class MyExecutor {

    enum EType {
        NoHolder, Holder1, Holder2
    }

    private ConcurrentLinkedQueue<MyFutureTask<?>> tasksQ;
    private final Thread thread;

    private final EType eType;

    public MyExecutor(EType eType) {
        eType = Objects.requireNonNull(eType);

        tasksQ = new ConcurrentLinkedQueue<>();
        thread = new Thread(new MyRunnable());
        thread.start();
    }

    public <T> Future<T> submit(Callable<T> c) {
        MyFutureTask<T> task = new MyFutureTask<T>(c, eType);
        tasksQ.add(task);
        return task;
    }

    class MyRunnable implements Runnable {
        @Override
        public void run() {
            while (true) {
                if (tasksQ.isEmpty()) {
                    try {
                        Thread.sleep(1);
                        continue;
                    } catch (InterruptedException ite) {
                        Thread.interrupted();
                        break;
                    }
                }

                MyFutureTask<?> task = tasksQ.poll();
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class MyFutureTask<T> implements RunnableFuture<T> {

        final Callable<?> cb;
        volatile Object outcome;

        static final int STATE_PENDING = 1;
        static final int STATE_EXECUTING = 2;
        static final int STATE_DONE = 3;

        final AtomicInteger atomicState = new AtomicInteger(STATE_PENDING);

        final EType eType;

        public MyFutureTask(Callable<?> cb, EType eType) {
            cb = Objects.requireNonNull(cb);
            eType = Objects.requireNonNull(eType);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            throw new NotImplementedException();
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return atomicState.get() == STATE_DONE;
        }

        @SuppressWarnings("unchecked")
        @Override
        public T get() throws InterruptedException, ExecutionException {
            while (true) {
                switch (atomicState.get()) {
                case STATE_PENDING:
                case STATE_EXECUTING:
//                      Thread.sleep(1);
                    break;
                case STATE_DONE:
                    return (T)outcome;
                default:
                    throw new IllegalStateException();
                }
            }
        }

        @Override
        public T get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            throw new NotImplementedException();
        }

        void set(T t) {
            outcome = t;
        }

        @Override
        public void run() {
            if (atomicState.compareAndSet(STATE_PENDING, STATE_EXECUTING)) {
                Object result;
                try {
                    switch (eType) {
                    case NoHolder:
                        result = cb.call();
                        break;
                    case Holder1:
                        throw new NotImplementedException();
                    case Holder2:
                        throw new NotImplementedException();
                    default:
                        throw new IllegalStateException();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    result = null;
                }

                outcome = result;
                atomicState.set(STATE_DONE);
            }
        }
    }
}

class MyTask implements Callable<List<Integer>> {
    @Override
    public List<Integer> call() throws Exception {
        List<Integer> ret = new ArrayList<>(100);
        IntStream.range(0, 100).boxed().forEach(ret::add);
        return ret;
    }
}

共有1个答案

益楷
2023-03-14

重要的是发生之前的关系。来自ExecutorServiceAPI文档:

内存一致性影响:在将可运行可调用任务提交给ExecutorService之前,线程中的操作发生--在该任务采取的任何操作之前发生--在通过future.get()检索结果之前发生。

所以像这样传输可变对象是安全的。ExecutorService实现通过某种形式的安全发布来传输对象。

如果通过存储在共享的非volatile字段中来在线程之间进行通信,那么这将是不安全的。

 类似资料:
  • 问题内容: 是否保证线程安全? 我将把来自不同线程的作业提交到同一个ThreadPoolExecutor,在交互/提交任务之前,我是否必须同步对执行者的访问? 问题答案: 的确,有问题的JDK类似乎并未明确保证线程安全的任务提交。但是,实际上,该库中的所有ExecutorService实现实际上都是以这种方式线程安全的。我认为依靠这一点是合理的。由于实现这些功能的所有代码都放置在公共领域中,因此任

  • 问题内容: 我试图基于初始化按需持有人习惯用法创建一个线程安全的单例类。这是我的代码 我的期望是以线程安全的方式初始化ExecutorService,并且那里只有一个实例(静态)。 这段代码是实现了这一目标-还是需要任何更改? 问题答案: 根据SEI 指南,您的方法很好。 但是,由于我们有枚举,因此可以使用枚举的简单方法: 而且,如果您想变得真正聪明,还可以定义枚举实现的接口。因为这样您以后可以

  • 我从主线程调用了下面的代码,使用ExecutorService池并启动一个线程来处理找到的每个文件。我正在尝试了解当主线程被kill命令终止时ExecutorService的行为。生成的线程会发生什么?一旦完成工作,它们会立即被杀还是终止? 还有没有更好/更安全的方法来编写下面的代码段,特别是如果我在无限循环中运行这部分,例如等待文件被放到输入目录并分配线程来处理它们?在这种情况下,我应该创建一个

  • 即;每个可调用方调用progressBarUpdate(): 每个doSomeStuff()都有自己的异常处理,如果发生错误或抛出异常,则返回一个空值。这就是为什么返回类型是List,并且在这种情况下返回null的原因。调用项和它们返回的文件列表之间没有交叉,它们都维护自己的文件列表。 我发现它工作得很好,但偶尔会抛出窗体的InterruptedException: 我修改了代码,使条件nv>=m

  • 我有一些线程的问题。 我的剧本 1-从文本文件将1000多万行加载到数组中 2-创建5个固定线程的执行池 3-然后它正在迭代该列表并将一些线程添加到队列中 现在活动线程永远不会绕过5个固定线程,这很好,但我发现我的处理器进入100%负载,我已经调试了一点,我看到正在调用构造函数,女巫意味着无论我声明5个固定线程,仍将尝试创建10万个对象。 主要问题是:我如何防止这种情况?我只是想让线程在没有空间的

  • 问题内容: 假设我有一个利用该框架的应用程序 当我在调试器中运行此应用程序时,将使用以下(默认)名称创建一个线程:。如你所见,这并不是非常有用,而且据我所知,该框架没有提供一种简便的方法来命名已创建的线程或线程池。 那么,如何为线程/线程池提供名称呢?例如,。 问题答案: 你可以提供一个到。工厂将负责创建线程,并将能够为其命名。 引用Javadoc: 创建新线程 使用创建新线程。如果没有另外指定,