我是Kafka侦探的新手。目前我遇到了一个问题,span在MessageListenerMethodInterceptor中创建了一个在KafkaListener上触发的拦截器,但如果出现异常,我们将在ErrorHandler中丢失spanId和traceId。是否可以将sleuth配置为在KafkaMessageListenerContainer中启动span?
我已经处理好了。在我的本地pc中,它工作正常,但我有一些疑问。。。
>
我通过在application.properties中设置spring.sleuth.messaging.kafka.enabled=false
来禁用MessageListenermetodInterceptor
我创建了一个记录拦截器,实现了org。springframework。Kafka。侦听器。RecordInterceptor如下所示:
private final Tracer tracer;
private final KafkaTracing kafkaTracing;
private static final ThreadLocal<Span> CURRENT_SPAN = new ThreadLocal<>();
private static final ThreadLocal<Tracer.SpanInScope> CURRENT_SPAN_IN_SCOPE = new ThreadLocal<>();
@Override
public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
openSpan(record);
return RecordInterceptor.super.intercept(record, consumer);
}
@Override
public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {
Span span = CURRENT_SPAN.get();
if (span != null) {
String message = Optional.ofNullable(exception.getMessage()).orElseGet(() -> exception.getClass().getSimpleName());
span.tag("error", message);
}
}
@Override
public void clearThreadState(Consumer<?, ?> consumer) {
closeSpan();
}
private void openSpan(ConsumerRecord<K, V> record) {
Span span = kafkaTracing.nextSpan(record).name("kafka-tracing").start();
Tracer.SpanInScope spanInScope = tracer.withSpanInScope(span);
CURRENT_SPAN.set(span);
CURRENT_SPAN_IN_SCOPE.set(spanInScope);
if (LOG.isDebugEnabled()) {
LOG.debug("created span {}", span);
}
}
private void closeSpan() {
Span span = CURRENT_SPAN.get();
if (span != null) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("finishing span {}", span);
}
span.finish();
} catch (Exception ex) {
LOG.warn("on finishing Span {} got error {}", span, ex.getMessage());
} finally {
CURRENT_SPAN.set(null);
}
}
Tracer.SpanInScope spanInScope = CURRENT_SPAN_IN_SCOPE.get();
if (spanInScope != null) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("closing SpanInScope {}", spanInScope);
}
spanInScope.close();
} catch (Exception ex) {
LOG.warn("on closing SpanInScope {} got error {}", spanInScope, ex.getMessage());
} finally {
CURRENT_SPAN_IN_SCOPE.set(null);
}
}
}
其他方法为空。
此拦截器的bean创建如下:
@Bean
public KafkaTracingRecordInterceptor<Object, Object> kafkaRecordInterceptor(Tracer tracer, Tracing tracing) {
return new KafkaTracingRecordInterceptor<>(tracer, KafkaTracing.create(tracing));
}
附言:如果我遗漏了什么或以错误的方式实现了它,请添加评论。
记录拦截器是仪器的最佳位置;最近的改进扩展了它的范围,包括错误处理程序。
https://github.com/spring-projects/spring-kafka/pull/1946
我有一个关于正确配置kafka侦听器属性的问题-侦听器和advertised.listers。 在我的配置中,我设置了以下道具: 客户端使用 进行连接。我是否需要在侦听器和广告侦听器中具有相同的值。这里 是指向运行 kafka 代理的主机的 dns 记录。 在什么情况下,我希望它们保持不变和不同? 谢谢!
我正在Wildfly 9.0.1的KeyClope中配置一个事件侦听器。我创造了一个。jar有两个类,实现了一个提供者,如KeyClope在他的github示例中解释的那样。 在本例中,keydape人员解释说,有必要注册提供者编辑“standalone/configuration/standalone.xml”并将模块添加到providers元素中。我在标签“subsystem”中对这个定义进行
在Spring应用程序启动时,我想在Redis中查找一个值,并根据该值关闭或保留消息侦听器。 完全不初始化这些bean也是可以的,但是我也找不到方法。 目前,我正在尝试使用Spring的关闭容器: 容器 消息侦听器: 问题是,如果我用已经在队列中的消息启动应用程序,消息侦听器将在执行之前拾取消息。 有办法实现我的目标吗?即使采用不同的方法
要运行Kafka,需要在文件。有两种设置我不理解。 有人可以解释侦听器和广告侦听器属性之间的区别吗? 留档说: 侦听器:套接字服务器侦听的地址。 和 advertised.listeners:主机名和端口代理将向生产者和消费者做广告。 我什么时候必须使用哪个设置?
我正在使用Spring靴和活动MQ设置一个持久的JMS主题使用者。我能够使用Spring靴@JmsListener注释来使一切正常工作(作为耐用消费者成功运行)。但是,因为我想动态创建侦听器,所以我尝试使用 JmsListener 配置接口来创建它们。 使用主题“消费者”下面的代码可以成功地创建和使用消息。但是,问题是它创造的消费者并不持久。我在工厂中将clientId、setSubscripti
我刚下载了这个项目: 但当我尝试使用推荐的命令运行服务器时: 我得到以下输出: 如你所见,有一行是这样写的: