Span Data作为消息

优质
小牛编辑
131浏览
2023-12-01

您可以通过将spring-cloud-sleuth-stream jar作为依赖关系来累加并发送跨越Spring Cloud Stream的数据,并为RabbitMQ或spring-cloud-starter-stream-kafka添加通道Binder实现(例如spring-cloud-starter-stream-rabbit)为Kafka)。通过将spring-cloud-sleuth-stream jar作为依赖关系,并添加RabbitMQ或spring-cloud-starter-stream-kafka的Binder通道spring-cloud-starter-stream-rabbit来实现{ 22 /} Stream的累积和发送范围数据。 Kafka)。这将自动将您的应用程序转换为有效载荷类型为Spans的邮件的制作者。

Zipkin消费者

有一个特殊的便利注释,用于为Span数据设置消息使用者,并将其推入Zipkin SpanStore。这个应用程序

@SpringBootApplication
@EnableZipkinStreamServer
public class Consumer {
	public static void main(String[] args) {
		SpringApplication.run(Consumer.class, args);
	}
}

将通过Spring Cloud StreamBinder(例如RabbitMQ包含spring-cloud-starter-stream-rabbit)来收听您提供的任何运输的Span数据,Redis和Kafka的类似起始者) 。如果添加以下UI依赖关系

<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-autoconfigure-ui</artifactId>

然后,您将有一个Zipkin服务器,您的应用程序 在端口9411上承载UI和API。

默认SpanStore是内存中的(适合演示,快速入门)。对于更强大的解决方案,您可以将MySQL和spring-boot-starter-jdbc添加到类路径中,并通过配置启用JDBC SpanStore,例如:

spring:
  rabbitmq:
  host: ${RABBIT_HOST:localhost}
  datasource:
  schema: classpath:/mysql.sql
  url: jdbc:mysql://${MYSQL_HOST:localhost}/test
  username: root
  password: root
# Switch this on to create the schema on startup:
  initialize: true
  continueOnError: true
  sleuth:
  enabled: false
zipkin:
  storage:
  type: mysql
注意@EnableZipkinStreamServer还用@EnableZipkinServer注释,因此该过程还将公开标准的Zipkin服务器端点,以通过HTTP收集spans,并在Zipkin Web UI中进行查询。

定制消费者

也可以使用spring-cloud-sleuth-stream并绑定到SleuthSink来轻松实现自定义消费者。例:

@EnableBinding(SleuthSink.class)
@SpringBootApplication(exclude = SleuthStreamAutoConfiguration.class)
@MessageEndpoint
public class Consumer {
  @ServiceActivator(inputChannel = SleuthSink.INPUT)
  public void sink(Spans input) throws Exception {
    // ... process spans
  }
}
注意上面的示例消费者应用程序明确排除SleuthStreamAutoConfiguration,因此它不会向其自己发送消息,但这是可选的(您可能实际上想要将消息跟踪到消费者应用程序中)。

为了自定义轮询机制,您可以创建名称等于StreamSpanReporter.POLLERPollerMetadata类型的bean。在这里可以找到这样一个配置的例子。

@Configuration
public static class CustomPollerConfiguration {
	@Bean(name = StreamSpanReporter.POLLER)
	PollerMetadata customPoller() {
		PollerMetadata poller = new PollerMetadata();
		poller.setMaxMessagesPerPoll(500);
		poller.setTrigger(new PeriodicTrigger(5000L));
		return poller;
	}
}