当前位置: 首页 > 面试题库 >

如何在ForkJoinPool中使用MDC?

席宜修
2023-03-14
问题内容

跟进如何在线程池中使用MDC?如何将MDC与ForkJoinPool?具体来说,我如何ForkJoinTask在执行任务之前包装一个MDC值?


问题答案:

以下内容似乎对我有用:

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.MDC;

/**
 * A {@link ForkJoinPool} that inherits MDC contexts from the thread that queues a task.
 *
 * @author Gili Tzabari
 */
public final class MdcForkJoinPool extends ForkJoinPool
{
    /**
     * Creates a new MdcForkJoinPool.
     *
     * @param parallelism the parallelism level. For default value, use {@link java.lang.Runtime#availableProcessors}.
     * @param factory     the factory for creating new threads. For default value, use
     *                    {@link #defaultForkJoinWorkerThreadFactory}.
     * @param handler     the handler for internal worker threads that terminate due to unrecoverable errors encountered
     *                    while executing tasks. For default value, use {@code null}.
     * @param asyncMode   if true, establishes local first-in-first-out scheduling mode for forked tasks that are never
     *                    joined. This mode may be more appropriate than default locally stack-based mode in applications
     *                    in which worker threads only process event-style asynchronous tasks. For default value, use
     *                    {@code false}.
     * @throws IllegalArgumentException if parallelism less than or equal to zero, or greater than implementation limit
     * @throws NullPointerException     if the factory is null
     * @throws SecurityException        if a security manager exists and the caller is not permitted to modify threads
     *                                  because it does not hold
     *                                  {@link java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public MdcForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler,
        boolean asyncMode)
    {
        super(parallelism, factory, handler, asyncMode);
    }

    @Override
    public void execute(ForkJoinTask<?> task)
    {
        // See http://stackoverflow.com/a/19329668/14731
        super.execute(wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public void execute(Runnable task)
    {
        // See http://stackoverflow.com/a/19329668/14731
        super.execute(wrap(task, MDC.getCopyOfContextMap()));
    }

    private <T> ForkJoinTask<T> wrap(ForkJoinTask<T> task, Map<String, String> newContext)
    {
        return new ForkJoinTask<T>()
        {
            private static final long serialVersionUID = 1L;
            /**
             * If non-null, overrides the value returned by the underlying task.
             */
            private final AtomicReference<T> override = new AtomicReference<>();

            @Override
            public T getRawResult()
            {
                T result = override.get();
                if (result != null)
                    return result;
                return task.getRawResult();
            }

            @Override
            protected void setRawResult(T value)
            {
                override.set(value);
            }

            @Override
            protected boolean exec()
            {
                // According to ForkJoinTask.fork() "it is a usage error to fork a task more than once unless it has completed
                // and been reinitialized". We therefore assume that this method does not have to be thread-safe.
                Map<String, String> oldContext = beforeExecution(newContext);
                try
                {
                    task.invoke();
                    return true;
                }
                finally
                {
                    afterExecution(oldContext);
                }
            }
        };
    }

    private Runnable wrap(Runnable task, Map<String, String> newContext)
    {
        return () ->
        {
            Map<String, String> oldContext = beforeExecution(newContext);
            try
            {
                task.run();
            }
            finally
            {
                afterExecution(oldContext);
            }
        };
    }

    /**
     * Invoked before running a task.
     *
     * @param newValue the new MDC context
     * @return the old MDC context
     */
    private Map<String, String> beforeExecution(Map<String, String> newValue)
    {
        Map<String, String> previous = MDC.getCopyOfContextMap();
        if (newValue == null)
            MDC.clear();
        else
            MDC.setContextMap(newValue);
        return previous;
    }

    /**
     * Invoked after running a task.
     *
     * @param oldValue the old MDC context
     */
    private void afterExecution(Map<String, String> oldValue)
    {
        if (oldValue == null)
            MDC.clear();
        else
            MDC.setContextMap(oldValue);
    }
}

import java.util.Map;
import java.util.concurrent.CountedCompleter;
import org.slf4j.MDC;

/**
 * A {@link CountedCompleter} that inherits MDC contexts from the thread that queues a task.
 *
 * @author Gili Tzabari
 * @param <T> The result type returned by this task's {@code get} method
 */
public abstract class MdcCountedCompleter<T> extends CountedCompleter<T>
{
    private static final long serialVersionUID = 1L;
    private final Map<String, String> newContext;

    /**
     * Creates a new MdcCountedCompleter instance using the MDC context of the current thread.
     */
    protected MdcCountedCompleter()
    {
        this(null);
    }

    /**
     * Creates a new MdcCountedCompleter instance using the MDC context of the current thread.
     *
     * @param completer this task's completer; {@code null} if none
     */
    protected MdcCountedCompleter(CountedCompleter<?> completer)
    {
        super(completer);
        this.newContext = MDC.getCopyOfContextMap();
    }

    /**
     * The main computation performed by this task.
     */
    protected abstract void computeWithContext();

    @Override
    public final void compute()
    {
        Map<String, String> oldContext = beforeExecution(newContext);
        try
        {
            computeWithContext();
        }
        finally
        {
            afterExecution(oldContext);
        }
    }

    /**
     * Invoked before running a task.
     *
     * @param newValue the new MDC context
     * @return the old MDC context
     */
    private Map<String, String> beforeExecution(Map<String, String> newValue)
    {
        Map<String, String> previous = MDC.getCopyOfContextMap();
        if (newValue == null)
            MDC.clear();
        else
            MDC.setContextMap(newValue);
        return previous;
    }

    /**
     * Invoked after running a task.
     *
     * @param oldValue the old MDC context
     */
    private void afterExecution(Map<String, String> oldValue)
    {
        if (oldValue == null)
            MDC.clear();
        else
            MDC.setContextMap(oldValue);
    }
}
  1. 针对您的任务MdcForkJoinPool而不是普通的ForkJoinPool 运行任务。
  2. MdcCountedCompleter代替扩展CountedCompleter


 类似资料:
  • Akka docs声明默认调度程序是一个。 我想知道为什么? 从 ForkJoinPool ForkJoinPool与其他类型的ExecutorService的区别主要在于采用了工作窃取:池中的所有线程都试图查找和执行提交给池和/或由其他活动任务创建的任务(如果不存在,则最终阻止等待工作)。这使得(1)当大多数任务产生其他子任务时(就像大多数ForkJoinTasks一样),以及(2)当许多小任务

  • 代码: 我有上面的代码来并行执行一些任务。考虑到已经让调用线程等待完成,不知道它是否应该是而不是块中的。 注意:仅从输入列表中读取。

  • 在Java8中,可以设置一个自定义forkJoinpool,由并行流而不是公共池使用。 我的问题是技术上是如何发生的 流在任何方面都不知道它已提交到自定义forkJoinpool,并且无法直接访问它。那么,最终如何使用正确的线程来处理流的任务呢? 我试着看源代码,但没有用。我的最佳猜测是在提交时的某个点设置了一些threadLocal变量,然后流在稍后使用。如果是这样,为什么语言开发人员会选择这样

  • 在Java8中,可以设置一个定制的forkJoinPool供并行流使用,而不是公共池。 我的问题是它在技术上是如何发生的? 流以任何方式都不知道它被提交给了自定义的forkJoinpool并且没有直接访问它的权限。那么最终如何使用正确的线程来处理流的任务呢? 我试着看源代码,但没有用。我的最佳猜测是在提交时的某个点设置了某个threadLocal变量,然后在稍后由流使用。如果是这样的话,为什么语言

  • 我比较了CompletableFuture.SupplyAsync()在以下两种情况下的行为:我设置了一个自定义ExecutorService,或者我希望我的供应商由默认的executor(如果没有指定)执行,这两种情况是forkJoinPool.commonpool() 完成了!! 所以“done”会在主执行结束后打印出来。 但如果我用:

  • 是否可以配置使用1个执行线程? 我正在执行在中调用的代码。每次它运行时,我都会遇到不同的运行时行为,这使得研究回归变得很困难。 我希望代码库提供“调试”和“发布”模式。“调试”模式将使用固定种子配置,并使用单个执行线程配置。“释放”模式将使用系统提供的种子,并使用默认数量的线程。 我尝试配置的并行性为1,但它使用2个线程(和第二个工作线程)。有什么想法吗?