我正在尝试实现Slueth,用于spring boot微服务的分布式跟踪,这些微服务通过消息传递通道相互通信。
其中一个微服务是一个调度器,它接收一天内创建的新消费者。然后,它以异步方式为每个消费者的数据运行分组过程。
现在,我使用traceableExeucutorService将为调度程序线程生成的sleuth跟踪传递给每个使用者的子线程。
跟踪配置
@EnableScheduling
@Configuration
public class TracingConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
scheduledTaskRegistrar.setScheduler(schedulingExecutor());
}
@Bean(destroyMethod = "shutdown")
public Executor schedulingExecutor() {
return Executors.newScheduledThreadPool(1);
}
@Bean
public Executor traceAbleExecutorService(BeanFactory factory) {
return new TraceableExecutorService(factory, Executors.newFixedThreadPool(10));
}
}
调度程序服务
@Slf4j
@Service
public class ConsumerScheduler {
@Autowired
Executor traceAbleExecutorService;
@Scheduled(cron = "0/5 * * * * *")
public void testScheduler() {
log.info("Running scheduler");
List<String> consumers = new ArrayList<>();
consumers.add("Consumer1");
consumers.add("Consumer2");
consumers.forEach(
consumer -> CompletableFuture.runAsync(() -> getConsumerData(consumer), traceAbleExecutorService));
log.info("Completed scheduler");
}
private void getConsumerData(String consumer) {
log.info("Running {}", consumer);
log.info("Logging Data for {}", consumer);
}
}
这最终会为每个消费者使用相同的traceId,因此所有下游服务都会为每个消费者记录相同的traceId。
我希望每个使用者线程都有自己的traceId的原因是,这些子线程将依次向下游服务发布消息。由于调度程序应该每天只运行一次,因此一天内每个用户的日志都会具有相同的traceId,从而破坏了跟踪的整体目的。
两个调度程序的原始日志运行时,每个用户使用相同的traceId
2020-02-01 13:22:05.025 INFO [,c4b8535556794e6d,c4b8535556794e6d,false] 6528 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler : Running scheduler
2020-02-01 13:22:05.036 INFO [,c4b8535556794e6d,c4b8535556794e6d,false] 6528 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler : Completed scheduler
2020-02-01 13:22:05.036 INFO [,c4b8535556794e6d,3a05952293179b5f,false] 6528 --- [pool-1-thread-2] com.example.demo.ConsumerScheduler : Running Consumer2
2020-02-01 13:22:05.036 INFO [,c4b8535556794e6d,cba00b8dd7edc99c,false] 6528 --- [pool-1-thread-1] com.example.demo.ConsumerScheduler : Running Consumer1
2020-02-01 13:22:05.040 INFO [,c4b8535556794e6d,cba00b8dd7edc99c,false] 6528 --- [pool-1-thread-1] com.example.demo.ConsumerScheduler : Logging Data for Consumer1
2020-02-01 13:22:05.040 INFO [,c4b8535556794e6d,3a05952293179b5f,false] 6528 --- [pool-1-thread-2] com.example.demo.ConsumerScheduler : Logging Data for Consumer2
2020-02-01 13:22:10.002 INFO [,5ad7e1a6ddc176e4,5ad7e1a6ddc176e4,false] 6528 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler : Running scheduler
2020-02-01 13:22:10.003 INFO [,5ad7e1a6ddc176e4,5ad7e1a6ddc176e4,false] 6528 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler : Completed scheduler
2020-02-01 13:22:10.003 INFO [,5ad7e1a6ddc176e4,e2fe5d0c4abc0f4b,false] 6528 --- [pool-1-thread-3] com.example.demo.ConsumerScheduler : Running Consumer1
2020-02-01 13:22:10.003 INFO [,5ad7e1a6ddc176e4,e2fe5d0c4abc0f4b,false] 6528 --- [pool-1-thread-3] com.example.demo.ConsumerScheduler : Logging Data for Consumer1
2020-02-01 13:22:10.003 INFO [,5ad7e1a6ddc176e4,d3d0d18d896a2602,false] 6528 --- [pool-1-thread-4] com.example.demo.ConsumerScheduler : Running Consumer2
2020-02-01 13:22:10.003 INFO [,5ad7e1a6ddc176e4,d3d0d18d896a2602,false] 6528 --- [pool-1-thread-4] com.example.demo.ConsumerScheduler : Logging Data for Consumer2
所以我在异步操作调用的方法中创建了一个新的跟踪器,但我仍然看到主线程的traceId被记录。所以我必须手动提取traceId和spanId,然后将其添加到ThreadContext中。这是实现这一点的预期方法,还是有更优雅的解决方案,只要启动一个新的跟踪,就会将值添加到ThreadContext中。
更新了get消费者数据
private void getConsumerData(String consumer) {
Span span = Tracing.currentTracer().newTrace().start();
try {
String traceId = span.context().traceIdString();
String spanId = span.context().spanIdString();
String parentId = span.context().parentIdString();
ThreadContext.put("traceId", traceId);
ThreadContext.put("X-B3-TraceId", traceId);
ThreadContext.put("spanId", spanId);
ThreadContext.put("X-B3-SpanId", spanId);
ThreadContext.put("parentId", parentId);
ThreadContext.put("X-B3-ParentId", parentId);
log.info("Running {}", consumer);
log.info("Logging Data for {}", consumer);
} finally {
span.finish();
}
}
使用消费者级TraceId更新日志
2020-02-01 13:30:40.022 INFO [,46fb0ea5afb9accc,46fb0ea5afb9accc,false] 14796 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler : Running scheduler
2020-02-01 13:30:40.029 INFO [,46fb0ea5afb9accc,46fb0ea5afb9accc,false] 14796 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler : Completed scheduler
2020-02-01 13:30:40.048 INFO [,06eacdb987cafe11,06eacdb987cafe11,false] 14796 --- [pool-1-thread-1] com.example.demo.ConsumerScheduler : Running Consumer1
2020-02-01 13:30:40.048 INFO [,9273140d294add25,9273140d294add25,false] 14796 --- [pool-1-thread-2] com.example.demo.ConsumerScheduler : Running Consumer2
2020-02-01 13:30:40.052 INFO [,9273140d294add25,9273140d294add25,false] 14796 --- [pool-1-thread-2] com.example.demo.ConsumerScheduler : Logging Data for Consumer2
2020-02-01 13:30:40.052 INFO [,06eacdb987cafe11,06eacdb987cafe11,false] 14796 --- [pool-1-thread-1] com.example.demo.ConsumerScheduler : Logging Data for Consumer1
2020-02-01 13:30:45.002 INFO [,ba63c9342b2bb82d,ba63c9342b2bb82d,false] 14796 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler : Running scheduler
2020-02-01 13:30:45.003 INFO [,ba63c9342b2bb82d,ba63c9342b2bb82d,false] 14796 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler : Completed scheduler
2020-02-01 13:30:45.003 INFO [,dac9c7bbc7c4a312,dac9c7bbc7c4a312,false] 14796 --- [pool-1-thread-3] com.example.demo.ConsumerScheduler : Running Consumer1
2020-02-01 13:30:45.003 INFO [,dac9c7bbc7c4a312,dac9c7bbc7c4a312,false] 14796 --- [pool-1-thread-3] com.example.demo.ConsumerScheduler : Logging Data for Consumer1
2020-02-01 13:30:45.003 INFO [,4eda4d2c9ec95b50,4eda4d2c9ec95b50,false] 14796 --- [pool-1-thread-4] com.example.demo.ConsumerScheduler : Running Consumer2
2020-02-01 13:30:45.003 INFO [,4eda4d2c9ec95b50,4eda4d2c9ec95b50,false] 14796 --- [pool-1-thread-4] com.example.demo.ConsumerScheduler : Logging Data for Consumer2
问题是跨度不负责使自己成为当前(范围)。跟踪器是。您需要更新它:
Span span = Tracing.currentTracer().newTrace().start();
为此:
Span newSpan = Tracing.currentTracer().newTrace().start();
tracer.withSpanInScope( newSpan );
tracer实例可以自动连接:
@Autowired
private Tracer tracer;
我有一个可完成期货的列表,我想从第一个期货开始,如果有任何完成例外,我想尝试列表中的下一个期货,依此类推,直到我耗尽了我所有的期货。如果任何一个期货成功了,我想就此止步,而不使用列表中的下一个期货。我如何做到这一点?到目前为止,我已经尝试过: 但是当我测试这种方法时,我看到当未来完成失败时,会抛出异常,并且不会尝试下一组期货。 编辑: 这就是样本的样子
我想知道Hazelcast中是否有一些侦探的集成。在我的应用程序中,我有Hazelcast队列,其中配置了用于addEntity事件的事件侦听器,问题是一旦该侦听器触发,跨度似乎就会中断。我知道ExecutorService集成了侦探,但com.hazelcast.core.ItemListener是否有类似的东西?提前谢谢。 UPD:提供更多细节。我有一些使用spring cloud sleth
我有一个关于Java流和链式可完成期货如何执行的问题。 我的问题是:如果我运行下面的代码,调用,列表中有10个项目需要大约11秒才能完成(列表中的项目数加1)。这是因为我有两个线程并行工作:第一个执行操作,一旦完成,第二个执行操作,第一个开始处理列表中的下一个项目。 如果我注释掉第36行(),那么方法需要大约20秒才能完成。Thread不平行运行;对于列表中的每个项目,操作完成,然后在处理列表中的
我对RxJava并不完全陌生,但我被一项看似简单的任务所阻碍。 我有一个数据源,它公开了一个反应式API,我所要做的就是获取一些数据,返回它,并在没有其他消息发出时自动关闭连接。 这是我的代码: conn.query()和conn.close()在不同的调度程序中异步执行。此代码不起作用,因为conn.close()返回一个没有订阅服务器的Completable。此外,如果我手动订阅doOnCom
现在,我有三个函数:updateFieldFromCollection1()、
我从sping-cloud-sleuth-core中找到restTemplateInterceptor和feignRequest estInterceptor,但是我们的项目使用的是hessian连接微服务,我发现sping-cloud-sleuth无法注入到hessian客户端。有人可以分享一下如何在hessian中使用sping-cloud-sleuth的代码吗?谢谢~