跟进如何在线程池中使用MDC?如何将MDC与ForkJoinPool
?具体来说,我如何ForkJoinTask
在执行任务之前包装一个MDC值?
以下内容似乎对我有用:
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.MDC;
/**
* A {@link ForkJoinPool} that inherits MDC contexts from the thread that queues a task.
*
* @author Gili Tzabari
*/
public final class MdcForkJoinPool extends ForkJoinPool
{
/**
* Creates a new MdcForkJoinPool.
*
* @param parallelism the parallelism level. For default value, use {@link java.lang.Runtime#availableProcessors}.
* @param factory the factory for creating new threads. For default value, use
* {@link #defaultForkJoinWorkerThreadFactory}.
* @param handler the handler for internal worker threads that terminate due to unrecoverable errors encountered
* while executing tasks. For default value, use {@code null}.
* @param asyncMode if true, establishes local first-in-first-out scheduling mode for forked tasks that are never
* joined. This mode may be more appropriate than default locally stack-based mode in applications
* in which worker threads only process event-style asynchronous tasks. For default value, use
* {@code false}.
* @throws IllegalArgumentException if parallelism less than or equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
* @throws SecurityException if a security manager exists and the caller is not permitted to modify threads
* because it does not hold
* {@link java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public MdcForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler,
boolean asyncMode)
{
super(parallelism, factory, handler, asyncMode);
}
@Override
public void execute(ForkJoinTask<?> task)
{
// See http://stackoverflow.com/a/19329668/14731
super.execute(wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public void execute(Runnable task)
{
// See http://stackoverflow.com/a/19329668/14731
super.execute(wrap(task, MDC.getCopyOfContextMap()));
}
private <T> ForkJoinTask<T> wrap(ForkJoinTask<T> task, Map<String, String> newContext)
{
return new ForkJoinTask<T>()
{
private static final long serialVersionUID = 1L;
/**
* If non-null, overrides the value returned by the underlying task.
*/
private final AtomicReference<T> override = new AtomicReference<>();
@Override
public T getRawResult()
{
T result = override.get();
if (result != null)
return result;
return task.getRawResult();
}
@Override
protected void setRawResult(T value)
{
override.set(value);
}
@Override
protected boolean exec()
{
// According to ForkJoinTask.fork() "it is a usage error to fork a task more than once unless it has completed
// and been reinitialized". We therefore assume that this method does not have to be thread-safe.
Map<String, String> oldContext = beforeExecution(newContext);
try
{
task.invoke();
return true;
}
finally
{
afterExecution(oldContext);
}
}
};
}
private Runnable wrap(Runnable task, Map<String, String> newContext)
{
return () ->
{
Map<String, String> oldContext = beforeExecution(newContext);
try
{
task.run();
}
finally
{
afterExecution(oldContext);
}
};
}
/**
* Invoked before running a task.
*
* @param newValue the new MDC context
* @return the old MDC context
*/
private Map<String, String> beforeExecution(Map<String, String> newValue)
{
Map<String, String> previous = MDC.getCopyOfContextMap();
if (newValue == null)
MDC.clear();
else
MDC.setContextMap(newValue);
return previous;
}
/**
* Invoked after running a task.
*
* @param oldValue the old MDC context
*/
private void afterExecution(Map<String, String> oldValue)
{
if (oldValue == null)
MDC.clear();
else
MDC.setContextMap(oldValue);
}
}
和
import java.util.Map;
import java.util.concurrent.CountedCompleter;
import org.slf4j.MDC;
/**
* A {@link CountedCompleter} that inherits MDC contexts from the thread that queues a task.
*
* @author Gili Tzabari
* @param <T> The result type returned by this task's {@code get} method
*/
public abstract class MdcCountedCompleter<T> extends CountedCompleter<T>
{
private static final long serialVersionUID = 1L;
private final Map<String, String> newContext;
/**
* Creates a new MdcCountedCompleter instance using the MDC context of the current thread.
*/
protected MdcCountedCompleter()
{
this(null);
}
/**
* Creates a new MdcCountedCompleter instance using the MDC context of the current thread.
*
* @param completer this task's completer; {@code null} if none
*/
protected MdcCountedCompleter(CountedCompleter<?> completer)
{
super(completer);
this.newContext = MDC.getCopyOfContextMap();
}
/**
* The main computation performed by this task.
*/
protected abstract void computeWithContext();
@Override
public final void compute()
{
Map<String, String> oldContext = beforeExecution(newContext);
try
{
computeWithContext();
}
finally
{
afterExecution(oldContext);
}
}
/**
* Invoked before running a task.
*
* @param newValue the new MDC context
* @return the old MDC context
*/
private Map<String, String> beforeExecution(Map<String, String> newValue)
{
Map<String, String> previous = MDC.getCopyOfContextMap();
if (newValue == null)
MDC.clear();
else
MDC.setContextMap(newValue);
return previous;
}
/**
* Invoked after running a task.
*
* @param oldValue the old MDC context
*/
private void afterExecution(Map<String, String> oldValue)
{
if (oldValue == null)
MDC.clear();
else
MDC.setContextMap(oldValue);
}
}
MdcForkJoinPool
而不是普通的ForkJoinPool 运行任务。MdcCountedCompleter
代替扩展CountedCompleter
。Akka docs声明默认调度程序是一个。 我想知道为什么? 从 ForkJoinPool ForkJoinPool与其他类型的ExecutorService的区别主要在于采用了工作窃取:池中的所有线程都试图查找和执行提交给池和/或由其他活动任务创建的任务(如果不存在,则最终阻止等待工作)。这使得(1)当大多数任务产生其他子任务时(就像大多数ForkJoinTasks一样),以及(2)当许多小任务
代码: 我有上面的代码来并行执行一些任务。考虑到已经让调用线程等待完成,不知道它是否应该是而不是块中的。 注意:仅从输入列表中读取。
在Java8中,可以设置一个自定义forkJoinpool,由并行流而不是公共池使用。 我的问题是技术上是如何发生的 流在任何方面都不知道它已提交到自定义forkJoinpool,并且无法直接访问它。那么,最终如何使用正确的线程来处理流的任务呢? 我试着看源代码,但没有用。我的最佳猜测是在提交时的某个点设置了一些threadLocal变量,然后流在稍后使用。如果是这样,为什么语言开发人员会选择这样
在Java8中,可以设置一个定制的forkJoinPool供并行流使用,而不是公共池。 我的问题是它在技术上是如何发生的? 流以任何方式都不知道它被提交给了自定义的forkJoinpool并且没有直接访问它的权限。那么最终如何使用正确的线程来处理流的任务呢? 我试着看源代码,但没有用。我的最佳猜测是在提交时的某个点设置了某个threadLocal变量,然后在稍后由流使用。如果是这样的话,为什么语言
我比较了CompletableFuture.SupplyAsync()在以下两种情况下的行为:我设置了一个自定义ExecutorService,或者我希望我的供应商由默认的executor(如果没有指定)执行,这两种情况是forkJoinPool.commonpool() 完成了!! 所以“done”会在主执行结束后打印出来。 但如果我用:
是否可以配置使用1个执行线程? 我正在执行在中调用的代码。每次它运行时,我都会遇到不同的运行时行为,这使得研究回归变得很困难。 我希望代码库提供“调试”和“发布”模式。“调试”模式将使用固定种子配置,并使用单个执行线程配置。“释放”模式将使用系统提供的种子,并使用默认数量的线程。 我尝试配置的并行性为1,但它使用2个线程(和第二个工作线程)。有什么想法吗?