当前位置: 首页 > 知识库问答 >
问题:

为可完成的期货触发新的侦探追踪

孙翰墨
2023-03-14

我正在尝试实现Slueth,用于spring boot微服务的分布式跟踪,这些微服务通过消息传递通道相互通信。

其中一个微服务是一个调度器,它接收一天内创建的新消费者。然后,它以异步方式为每个消费者的数据运行分组过程。

现在,我使用traceableExeucutorService将为调度程序线程生成的sleuth跟踪传递给每个使用者的子线程。

跟踪配置

@EnableScheduling
@Configuration
public class TracingConfig implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        scheduledTaskRegistrar.setScheduler(schedulingExecutor());
    }

    @Bean(destroyMethod = "shutdown")
    public Executor schedulingExecutor() {
        return Executors.newScheduledThreadPool(1);
    }

    @Bean
    public Executor traceAbleExecutorService(BeanFactory factory) {
        return new TraceableExecutorService(factory, Executors.newFixedThreadPool(10));
    }

} 

调度程序服务

@Slf4j
@Service
public class ConsumerScheduler {

    @Autowired
    Executor traceAbleExecutorService;

    @Scheduled(cron = "0/5 * * * * *")
    public void testScheduler() {

        log.info("Running scheduler");

        List<String> consumers = new ArrayList<>();
        consumers.add("Consumer1");
        consumers.add("Consumer2");

        consumers.forEach(
                consumer -> CompletableFuture.runAsync(() -> getConsumerData(consumer), traceAbleExecutorService));

        log.info("Completed scheduler");

    }

    private void getConsumerData(String consumer) {

        log.info("Running {}", consumer);
        log.info("Logging Data for {}", consumer);

    }

}

这最终会为每个消费者使用相同的traceId,因此所有下游服务都会为每个消费者记录相同的traceId。

我希望每个使用者线程都有自己的traceId的原因是,这些子线程将依次向下游服务发布消息。由于调度程序应该每天只运行一次,因此一天内每个用户的日志都会具有相同的traceId,从而破坏了跟踪的整体目的。

两个调度程序的原始日志运行时,每个用户使用相同的traceId

2020-02-01 13:22:05.025  INFO [,c4b8535556794e6d,c4b8535556794e6d,false] 6528 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler       : Running scheduler
2020-02-01 13:22:05.036  INFO [,c4b8535556794e6d,c4b8535556794e6d,false] 6528 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler       : Completed scheduler
2020-02-01 13:22:05.036  INFO [,c4b8535556794e6d,3a05952293179b5f,false] 6528 --- [pool-1-thread-2] com.example.demo.ConsumerScheduler       : Running Consumer2
2020-02-01 13:22:05.036  INFO [,c4b8535556794e6d,cba00b8dd7edc99c,false] 6528 --- [pool-1-thread-1] com.example.demo.ConsumerScheduler       : Running Consumer1
2020-02-01 13:22:05.040  INFO [,c4b8535556794e6d,cba00b8dd7edc99c,false] 6528 --- [pool-1-thread-1] com.example.demo.ConsumerScheduler       : Logging Data for Consumer1
2020-02-01 13:22:05.040  INFO [,c4b8535556794e6d,3a05952293179b5f,false] 6528 --- [pool-1-thread-2] com.example.demo.ConsumerScheduler       : Logging Data for Consumer2
2020-02-01 13:22:10.002  INFO [,5ad7e1a6ddc176e4,5ad7e1a6ddc176e4,false] 6528 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler       : Running scheduler
2020-02-01 13:22:10.003  INFO [,5ad7e1a6ddc176e4,5ad7e1a6ddc176e4,false] 6528 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler       : Completed scheduler
2020-02-01 13:22:10.003  INFO [,5ad7e1a6ddc176e4,e2fe5d0c4abc0f4b,false] 6528 --- [pool-1-thread-3] com.example.demo.ConsumerScheduler       : Running Consumer1
2020-02-01 13:22:10.003  INFO [,5ad7e1a6ddc176e4,e2fe5d0c4abc0f4b,false] 6528 --- [pool-1-thread-3] com.example.demo.ConsumerScheduler       : Logging Data for Consumer1
2020-02-01 13:22:10.003  INFO [,5ad7e1a6ddc176e4,d3d0d18d896a2602,false] 6528 --- [pool-1-thread-4] com.example.demo.ConsumerScheduler       : Running Consumer2
2020-02-01 13:22:10.003  INFO [,5ad7e1a6ddc176e4,d3d0d18d896a2602,false] 6528 --- [pool-1-thread-4] com.example.demo.ConsumerScheduler       : Logging Data for Consumer2

所以我在异步操作调用的方法中创建了一个新的跟踪器,但我仍然看到主线程的traceId被记录。所以我必须手动提取traceId和spanId,然后将其添加到ThreadContext中。这是实现这一点的预期方法,还是有更优雅的解决方案,只要启动一个新的跟踪,就会将值添加到ThreadContext中。

更新了get消费者数据

private void getConsumerData(String consumer) {

        Span span = Tracing.currentTracer().newTrace().start();

        try {

            String traceId = span.context().traceIdString();
            String spanId = span.context().spanIdString();
            String parentId = span.context().parentIdString();

            ThreadContext.put("traceId", traceId);
            ThreadContext.put("X-B3-TraceId", traceId);
            ThreadContext.put("spanId", spanId);
            ThreadContext.put("X-B3-SpanId", spanId);
            ThreadContext.put("parentId", parentId);
            ThreadContext.put("X-B3-ParentId", parentId);

            log.info("Running {}", consumer);
            log.info("Logging Data for {}", consumer);

        } finally {
            span.finish();
        }
    }

使用消费者级TraceId更新日志

2020-02-01 13:30:40.022  INFO [,46fb0ea5afb9accc,46fb0ea5afb9accc,false] 14796 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler       : Running scheduler
2020-02-01 13:30:40.029  INFO [,46fb0ea5afb9accc,46fb0ea5afb9accc,false] 14796 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler       : Completed scheduler
2020-02-01 13:30:40.048  INFO [,06eacdb987cafe11,06eacdb987cafe11,false] 14796 --- [pool-1-thread-1] com.example.demo.ConsumerScheduler       : Running Consumer1
2020-02-01 13:30:40.048  INFO [,9273140d294add25,9273140d294add25,false] 14796 --- [pool-1-thread-2] com.example.demo.ConsumerScheduler       : Running Consumer2
2020-02-01 13:30:40.052  INFO [,9273140d294add25,9273140d294add25,false] 14796 --- [pool-1-thread-2] com.example.demo.ConsumerScheduler       : Logging Data for Consumer2
2020-02-01 13:30:40.052  INFO [,06eacdb987cafe11,06eacdb987cafe11,false] 14796 --- [pool-1-thread-1] com.example.demo.ConsumerScheduler       : Logging Data for Consumer1
2020-02-01 13:30:45.002  INFO [,ba63c9342b2bb82d,ba63c9342b2bb82d,false] 14796 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler       : Running scheduler
2020-02-01 13:30:45.003  INFO [,ba63c9342b2bb82d,ba63c9342b2bb82d,false] 14796 --- [pool-2-thread-1] com.example.demo.ConsumerScheduler       : Completed scheduler
2020-02-01 13:30:45.003  INFO [,dac9c7bbc7c4a312,dac9c7bbc7c4a312,false] 14796 --- [pool-1-thread-3] com.example.demo.ConsumerScheduler       : Running Consumer1
2020-02-01 13:30:45.003  INFO [,dac9c7bbc7c4a312,dac9c7bbc7c4a312,false] 14796 --- [pool-1-thread-3] com.example.demo.ConsumerScheduler       : Logging Data for Consumer1
2020-02-01 13:30:45.003  INFO [,4eda4d2c9ec95b50,4eda4d2c9ec95b50,false] 14796 --- [pool-1-thread-4] com.example.demo.ConsumerScheduler       : Running Consumer2
2020-02-01 13:30:45.003  INFO [,4eda4d2c9ec95b50,4eda4d2c9ec95b50,false] 14796 --- [pool-1-thread-4] com.example.demo.ConsumerScheduler       : Logging Data for Consumer2

共有1个答案

刘运浩
2023-03-14

问题是跨度不负责使自己成为当前(范围)。跟踪器是。您需要更新它:

 Span span = Tracing.currentTracer().newTrace().start();

为此:

Span newSpan = Tracing.currentTracer().newTrace().start();
tracer.withSpanInScope( newSpan );

tracer实例可以自动连接:

@Autowired
private Tracer tracer;
 类似资料:
  • 我有一个可完成期货的列表,我想从第一个期货开始,如果有任何完成例外,我想尝试列表中的下一个期货,依此类推,直到我耗尽了我所有的期货。如果任何一个期货成功了,我想就此止步,而不使用列表中的下一个期货。我如何做到这一点?到目前为止,我已经尝试过: 但是当我测试这种方法时,我看到当未来完成失败时,会抛出异常,并且不会尝试下一组期货。 编辑: 这就是样本的样子

  • 我想知道Hazelcast中是否有一些侦探的集成。在我的应用程序中,我有Hazelcast队列,其中配置了用于addEntity事件的事件侦听器,问题是一旦该侦听器触发,跨度似乎就会中断。我知道ExecutorService集成了侦探,但com.hazelcast.core.ItemListener是否有类似的东西?提前谢谢。 UPD:提供更多细节。我有一些使用spring cloud sleth

  • 我有一个关于Java流和链式可完成期货如何执行的问题。 我的问题是:如果我运行下面的代码,调用,列表中有10个项目需要大约11秒才能完成(列表中的项目数加1)。这是因为我有两个线程并行工作:第一个执行操作,一旦完成,第二个执行操作,第一个开始处理列表中的下一个项目。 如果我注释掉第36行(),那么方法需要大约20秒才能完成。Thread不平行运行;对于列表中的每个项目,操作完成,然后在处理列表中的

  • 我对RxJava并不完全陌生,但我被一项看似简单的任务所阻碍。 我有一个数据源,它公开了一个反应式API,我所要做的就是获取一些数据,返回它,并在没有其他消息发出时自动关闭连接。 这是我的代码: conn.query()和conn.close()在不同的调度程序中异步执行。此代码不起作用,因为conn.close()返回一个没有订阅服务器的Completable。此外,如果我手动订阅doOnCom

  • 现在,我有三个函数:updateFieldFromCollection1()、

  • 我从sping-cloud-sleuth-core中找到restTemplateInterceptor和feignRequest estInterceptor,但是我们的项目使用的是hessian连接微服务,我发现sping-cloud-sleuth无法注入到hessian客户端。有人可以分享一下如何在hessian中使用sping-cloud-sleuth的代码吗?谢谢~