当前位置: 首页 > 面试题库 >

如何在线程池中使用MDC?

濮阳翔
2023-03-14
问题内容

在我们的软件中,我们广泛使用MDC来跟踪Web请求的内容,例如会话ID和用户名。在原始线程中运行时,这可以正常工作。但是,有很多事情需要在后台处理。为此,我们使用java.concurrent.ThreadPoolExecutor和java.util.Timer类以及一些自卷式异步执行服务。所有这些服务都管理自己的线程池。

这是Logback手册关于在这样的环境中使用MDC的内容:

映射的诊断上下文的副本不能始终由工作线程从发起线程继承。当java.util.concurrent.Executors用于线程管理时,就是这种情况。例如,newCachedThreadPool方法创建ThreadPoolExecutor,并且像其他线程池代码一样,它具有复杂的线程创建逻辑。

在这种情况下,建议在将任务提交给执行者之前,在原始(主)线程上调用MDC.getCopyOfContextMap()。当任务运行时,作为第一个动作,它应调用MDC.setContextMapValues()将原始MDC值的存储副本与新的Executor托管线程相关联。

这样做很好,但是忘记添加这些呼叫非常容易,并且直到太晚之前,都没有简便的方法来识别问题。Log4j的唯一标志是您在日志中丢失了MDC信息,而使用Logback则获得了陈旧的MDC信息(因为胎面池中的线程从运行它的第一个任务继承了它的MDC)。两者都是生产系统中的严重问题。

我没有以任何方式看到我们的处境,但在网络上找不到太多有关此问题的信息。显然,这并不是很多人反对的事情,因此必须有一种避免的方法。我们在做什么错呢?


问题答案:

的,这也是我遇到的常见问题。有一些解决方法(如所述手动设置),但理想情况下,您需要一种解决方案

一致地设置MDC;
避免MDC不正确但您不知道的默认错误;和
最小化线程池使用方式的更改(例如Callable,MyCallable随处可见的子类化或类似的丑陋情况)。
这是我使用的满足这三个需求的解决方案。代码应该是不言自明的。


(请注意,MoreExecutors.listeningDecorator()如果您使用Guava的,可以创建此执行程序并将其馈送到Guava的ListanableFuture。)

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;

/**
 * A SLF4J MDC-compatible {@link ThreadPoolExecutor}.
 * <p/>
 * In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate
 * logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a
 * thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately.
 * <p/>
 * Created by jlevy.
 * Date: 6/14/13
 */
public class MdcThreadPoolExecutor extends ThreadPoolExecutor {

    final private boolean useFixedContext;
    final private Map<String, Object> fixedContext;

    /**
     * Pool where task threads take MDC from the submitting thread.
     */
    public static MdcThreadPoolExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Pool where task threads take fixed MDC from the thread that creates the pool.
     */
    @SuppressWarnings("unchecked")
    public static MdcThreadPoolExecutor newWithCurrentMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                          TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(MDC.getCopyOfContextMap(), corePoolSize, maximumPoolSize, keepAliveTime, unit,
                workQueue);
    }

    /**
     * Pool where task threads always have a specified, fixed MDC.
     */
    public static MdcThreadPoolExecutor newWithFixedMdc(Map<String, Object> fixedContext, int corePoolSize,
                                                        int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                                        BlockingQueue<Runnable> workQueue) {
        return new MdcThreadPoolExecutor(fixedContext, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private MdcThreadPoolExecutor(Map<String, Object> fixedContext, int corePoolSize, int maximumPoolSize,
                                  long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.fixedContext = fixedContext;
        useFixedContext = (fixedContext != null);
    }

    @SuppressWarnings("unchecked")
    private Map<String, Object> getContextForTask() {
        return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
    }

    /**
     * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
     * all delegate to this.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(wrap(command, getContextForTask()));
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) {
        return new Runnable() {
            @Override
            public void run() {
                Map previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            }
        };
    }
}


 类似资料:
  • 问题内容: 您能解释一下Netty如何使用线程池工作吗?我是否正确理解,线程池有两种:老板线程和工人线程。老板用于执行I / O,而worker用于调用用户回调(messageReceived)来处理数据? 问题答案: 这是来自NioServerSocketChannelFactory文档 一个ServerSocketChannelFactory,它创建一个基于NIO的服务器端ServerSock

  • 问题内容: 我有一个使用该框架的Java应用程序,并且我的代码看起来像这样 我的理解是,JVM会在内部创建5个线程的池。现在,当我在探查器中检查执行情况时,会得到类似的信息。 , 我需要一种方法来区分由我创建的和由服务器创建的 。 我在想,如果我可以命名线程池,它应该可以解决问题,但是看不到任何允许我执行此操作的API。 提前致谢。 问题答案: 您可以将自己的ThreadFactory传递给Sch

  • 我在很多地方读过,线程池减少了线程创建开销,从而提高了性能。但是一旦线程执行完它的run方法,它就会进入dead/terminated状态,这意味着它可以再次重新启动。 那么,线程池如何处理线程的释放呢?它是否真的以某种方式保存线程以服务下一个任务,或者在每次提交任务时在内部创建新线程?

  • 问题内容: 我在Java中使用A 来管理许多正在运行的线程。我创建了自己的简单名称,以便为线程命名。 问题在于,在首次创建线程池时会在线程中设置名称,并且该名称与线程池实际正在运行的任务无关。我了解这一点…尽管我的Runnable和Callables具有名称,但它们实际上是从的运行线程开始的抽象层。 关于为线程池创建名称,StackOverflow上还有其他一些问题。(请参阅如何为可调用线程命名?

  • 我需要创建一个并行执行多个操作的应用程序。我曾考虑过使用线程或线程池,但我以前从未使用过,所以我发现这相当困难。Thread应按以下方式工作: 所有系统应同时运行。你认为我应该如何实现这一点?

  • 我知道PHP支持处理多个并发连接,并且根据服务器的不同,它可以像这个答案中提到的那样进行配置 服务器是如何管理多个连接的?它是为每个请求派生一个子进程,还是使用线程处理,还是使用线程池处理? 链接的答案说一个进程是分叉的,然后作者在评论中说是线程还是进程,这让人很困惑,如果请求是使用子进程、线程还是线程池提供的?