当前位置: 首页 > 知识库问答 >
问题:

通过Spring Cloud Stream Binder进行Spring Cloud Sleuth轨迹ID传播,用于AWS运动

何灼光
2023-03-14

根据我在这里读到的内容,我试图确保Spring Cloud Sleuth跟踪ID作为标题发送到Kinesis上。如果我正确阅读了这里提供的Spring Cloud Kinesis binder文档,我认为下面的配置、生产者和消费者代码将允许X-B3-TraceId头从生产者成功发送到消费者。但是,这目前不起作用,因为标头在使用者端不可用。使用AWS Kinesis活页夹,如何将此标题从生产者传递给消费者?下面缺少什么配置和代码?

Spring护套:2.6.7

Spring Cloud:2021.0.1

AWS运动用Spring云流活页夹:2.2.0

配置:

spring:
  cloud:
    stream:
      kinesis:
        binder:
          headers:
            - X-B3-TraceId

制片人:

accountChannels
        .accountRequest()
        .send(
            MessageBuilder.withPayload(accountEvent)
                .setHeader("X-B3-TraceId", getTraceId())
                .build());
...

消费者:

@StreamListener(ApplicationChannels.ACCOUNT_REQUEST)
  public void processAccountRequest(
      final Message<AccountEvent> message,
      @Header(AwsHeaders.CHECKPOINTER) final IRecordProcessorCheckpointer checkpointer,
      @Header("X-B3-TraceId") final String traceId) {
...

在消费者中记录MessageHeaders的结果:

Received message header b3 and value 3e6c8447beeff928-3e6c8447beeff928-0
Received message header nativeHeaders and value {b3=[3e6c8447beeff928-3e6c8447beeff928-0]}
Received message header x-b3-traceid and value 304f6cc831322a5c
Received message header aws_shard and value shardId-000000000000
Received message header aws_checkpointer and value com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer@338f0685
Received message header id and value 7754e5ff-e09d-886c-a425-8e0068a57a89
Received message header sourceData and value UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49625973706469884904499220492564923628215483380266958850, getData()=java.nio.HeapByteBuffer[pos=0 lim=171 cap=171], getPartitionKey()=8162231937]
Received message header contentType and value application/vnd.accountevent.v1+avro
Received message header aws_receivedPartitionKey and value 8162231937
Received message header aws_receivedStream and value account-request
Received message header aws_receivedSequenceNumber and value 49625973706469884904499220492564923628215483380266958850
Received message header timestamp and value 1650915186028

共有1个答案

蒋联
2023-03-14

我有这样一个Spring云应用程序:

应用属性

logging.level.root=warn

spring.sleuth.tracer.mode=brave

spring.cloud.stream.bindings.input.destination=test-stream
spring.cloud.stream.bindings.input.group=my-group
spring.cloud.stream.bindings.output.destination=test-stream
spring.cloud.stream.kinesis.binder.headers=X-B3-TraceId

所以72003034申请

@SpringBootApplication
@EnableBinding({ Source.class, Sink.class })
public class So72003034Application {

    public static void main(String[] args) {
        SpringApplication.run(So72003034Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void processAccountRequest(
            Message<String> message,
            @Header("X-B3-TraceId") final String traceId) {

        System.out.println("Consumer trace: " + traceId);
    }

}

So 72003034应用测试

@SpringBootTest(properties = "cloud.aws.region.static=us-east-1")
@Testcontainers(disabledWithoutDocker = true)
@DirtiesContext
class So72003034ApplicationTests {

    @Container
    public static LocalStackContainer localStack =
            new LocalStackContainer(DockerImageName.parse("localstack/localstack"))
                    .withServices(LocalStackContainer.Service.KINESIS);


    @Autowired
    Source source;

    @Autowired
    Tracer tracer;

    @Test
    void testTracingPropagation() throws InterruptedException {
        String traceId = this.tracer.newTrace().context().traceIdString();
        System.out.println("Producer trace: " + traceId);
        this.source.output()
                .send(MessageBuilder.withPayload("tet")
                        .setHeader("X-B3-TraceId", traceId)
                        .build());

        Thread.sleep(10_000);
    }

    @TestConfiguration
    public static class LocalStackConfiguration {

        @Bean
        public AmazonKinesisAsync amazonKinesis() {
            return AmazonKinesisAsyncClientBuilder.standard()
                    .withEndpointConfiguration(localStack.getEndpointConfiguration(LocalStackContainer.Service.KINESIS))
                    .withCredentials(localStack.getDefaultCredentialsProvider())
                    .build();
        }

        @Bean
        public LockRegistry lockRegistry() {
            return new DefaultLockRegistry();
        }

        @Bean
        public ConcurrentMetadataStore checkpointStore() {
            return new SimpleMetadataStore();
        }

    }

}

我看到,X-B3-TraceId头通过运动流从生产者传播到消费者。

我无法确认是否启用了KCL,因为它的producer库在Windows上不再工作。

我还在调试模式下看到接收到消息容器的b3标头。也许是Spring Cloud Sleuth自动填充的那个。

更新

工作策略是传播整个b3标头:

spring.cloud.stream.kinesis.binder.headers=b3

制片人对我来说是这样的,只是因为目前还没有任何迹象:

    ScopedSpan scopedSpan = this.tracer.startScopedSpan("test");
    String traceId = scopedSpan.context().traceId();
    System.out.println("Producer trace: " + traceId);
    this.source.output().send(new GenericMessage<>("test"));
    scopedSpan.end();

通过这种方式,TracingChannelInterceptor能够从Threadlocal获取跟踪,启动一个新的范围并将其设置到b3标头中。

在消费者方面,我有:

@StreamListener(Sink.INPUT)
public void processAccountRequest(Message<String> message,
        @Header("b3") final String trace) {

    System.out.println("Consumer b3: " + trace);
}

这确实显示了我发送给制作人的相同轨迹:

Producer trace: 957fd8316345f39f
Consumer b3: 957fd8316345f39f-44810ee5bf7cb0b7-0
 类似资料:
  • 本文向大家介绍js实现回放拖拽轨迹从过程上进行分析,包括了js实现回放拖拽轨迹从过程上进行分析的使用技巧和注意事项,需要的朋友参考一下 今天有点小高兴,csdn博客浏览量过万了,在过去还从来没有过这么高的浏览量呢,不得不说,太多时候还是有些矫情,可看到这些鼓励还是忍不住高兴啊,至少,这样让我有一种行内人员的感觉,吾道不孤啊。 闲话不多说,继续今天的记录,记录回放拖拽痕迹,先从过程上进行分析: 1、

  • 轨迹信息为用户的浏览信息(比如首页、商品页、购物车、支付页、支付成功页等),只有调用轨迹方法,客服端的客服人员才能看到用户的浏览内容,提高服务质量。 > 参数说明: 一.标准集成方式 基本集成方式适用于在需要上传的轨迹的界面分别调用以下接口实现轨迹上传功能。 1.首页轨迹 /** 上报轨迹 @param pageName 当前页面名称 @param model 轨迹参数模型 */ NtalkerT

  • 轨迹的集成 轨迹信息为用户的浏览信息(比如首页、商品页、购物车、支付页、支付成功页等),用户可以在以上页面调用轨迹方法,调用成功后,客服人员可以在客服端看到用户的浏览内容,同时可以做客户下单统计,有助提高服务质量。如果客户不需要做统计可以不传轨迹。 参数说明: 参数 类型 是否必传 说明 siteid String 是 企业id title String 是 用户浏览当前页的标题名称 pagele

  •        点击轨迹记录面板的结束记录按钮:        在弹出来的名称输入框中,输入轨迹名称(默认名称为时间),点击确定,即可完成轨迹记录:        保存完成后,主界面中会展示此轨迹线,也可通过数据管理进行其他操作。

  •        轨迹记录开始后,地图中会实时更新位置信息并在轨迹面板中更新当前的轨迹信息。        轨迹记录面板包括一些轨迹记录中的统计信息:        当前速度,来自GPS的实时速度;        平均速度,本次轨迹记录的总平均速度;        最高速度,本次轨迹记录的最快速度;        总里程,本次轨迹记录的总距离;        总时间,本次轨迹记录的总时间;      

  •        确保定位功能完善情况下(可参考文档内 2.3权限设置,2.4电源管理),点击主界面右下方的轨迹图标打开轨迹记录面板。        轨迹记录面板主要包含:当前速度、平均速度、最高速度、总里程、总时间、最高海拔、最低海拔、累计爬升、累计下降、航向、图表等。        点击下方开始按钮即可开启轨迹记录