当前位置: 首页 > 知识库问答 >
问题:

通过侦探进行Hazelcast追踪

韩弘阔
2023-03-14

我想知道Hazelcast中是否有一些侦探的集成。在我的应用程序中,我有Hazelcast队列,其中配置了用于addEntity事件的事件侦听器,问题是一旦该侦听器触发,跨度似乎就会中断。我知道ExecutorService集成了侦探,但com.hazelcast.core.ItemListener是否有类似的东西?提前谢谢。

UPD:提供更多细节。我有一些使用spring cloud sleth和hazelcast队列的示例服务

package com.myapp;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ItemEvent;
import com.hazelcast.core.ItemListener;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.html" target="_blank">beans.factory.annotation.Autowired;
import org.springframework.cloud.sleuth.DefaultSpanNamer;
import org.springframework.cloud.sleuth.TraceRunnable;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class SomeService {

private HazelcastInstance hazelcastInstance =
    Hazelcast.newHazelcastInstance();
private IQueue<String> queue = hazelcastInstance.getQueue("someQueue");

private Tracer tracing;


@Autowired(required = false)
public void setTracer(Tracer tracer) {
    this.tracing = tracer;
}


{
    queue.addItemListener(new ItemListener<String>() {
    @Override
    public void itemAdded(ItemEvent<String> item) {
        log.info("This is span");
        log.info("This is item " + item);
    }

    @Override
    public void itemRemoved(ItemEvent<String> item) {
    }
    }, true);
}

@Async
public void processRequestAsync() {
    log.info("Processing async");
    log.info("This is span");
    Executors.newSingleThreadExecutor().execute(
        new TraceRunnable(tracing, new DefaultSpanNamer(), () -> log.info("Some Weird stuff")));
    queue.add("some stuff");
}

}

调用processRequestAsync后,我在控制台中收到以下输出:

INFO [-,792a6c3ad3e91280,792a6c3ad3e91280,false] 9996 --- [nio-8080-exec-2] com.myapp.SomeController            : Incoming request!
INFO [-,792a6c3ad3e91280,792a6c3ad3e91280,false] 9996 --- [nio-8080-exec-2] com.myapp.SomeController            : This is current span [Trace: 792a6c3ad3e91280, Span: 792a6c3ad3e91280, Parent: null, exportable:false]
INFO [-,792a6c3ad3e91280,7d0c06d3e24a7ba1,false] 9996 --- [cTaskExecutor-1] com.myapp.SomeService               : Processing async
INFO [-,792a6c3ad3e91280,7d0c06d3e24a7ba1,false] 9996 --- [cTaskExecutor-1] com.myapp.SomeService               : This is span
INFO [-,792a6c3ad3e91280,8a2f0a9028f44979,false] 9996 --- [pool-1-thread-1] com.myapp.SomeService               : Some Weird stuff
INFO [-,792a6c3ad3e91280,7d0c06d3e24a7ba1,false] 9996 --- [cTaskExecutor-1] c.h.i.p.impl.PartitionStateManager       : [10.236.31.22]:5701 [dev] [3.8.3] Initializing cluster partition table arrangement...
INFO [-,,,] 9996 --- [e_1_dev.event-4] com.myapp.SomeService               : This is span
INFO [-,,,] 9996 --- [e_1_dev.event-4] com.myapp.SomeService               : This is item ItemEvent{event=ADDED, item=some stuff, member=Member [10.236.31.22]:5701 - b830dbf0-0977-42a3-a15d-800872221c84 this} 

所以,一旦我们转到eventListener代码,span似乎被中断了,我想知道如何在hazelcast队列中传播或创建新的span

共有2个答案

蓟俊杰
2023-03-14

我无法让它为ItemListeners工作。我认为我们需要能够将Hazelcast的StripedExecutor封装在类似LazyTraceThreadPoolTaskExecutor的东西中(但它接受普通的Executor委托,而不是ThreadPoolTaskExecutor)。

对于EntryProcessors,我已经将其整合在一起。用于创建EntryProcessors的工厂,从创建处理器的线程传入当前范围。当处理器运行时,它将该范围用作执行器线程中的父范围。

@Component
public class SleuthedEntryProcessorFactory {

    private final Tracer tracer;

    public SleuthedEntryProcessorFactory(Tracer tracer) {
        this.tracer = tracer;
    }

    /**
     * Create an entry processor that will continue the Sleuth span of the thread 
     * that invokes this method.
     * Mutate the given value as required. It will then be set on the entry.
     *
     * @param name name of the span
     * @param task task to perform on the map entry
     */
    public <K, V, R> SleuthedEntryProcessor<K, V, R> create(String name, Function<V, R> task) {
        return new SleuthedEntryProcessor<>(name, tracer.getCurrentSpan(), task);
    }
}

/**
 * Copies the MDC context (which contains Sleuth's trace ID, etc.) and the current span
 * from the thread that constructs this into the thread that runs this.

 * @param <K> key type
 * @param <V> value type
 * @param <R> return type
 */
@SpringAware
public class SleuthedEntryProcessor<K, V, R> extends AbstractEntryProcessor<K, V> {

    private final Map<String, String> copyOfContextMap;
    private final String name;
    private final Span parentSpan;
    private final Function<V, R> task;
    private transient Tracer tracer;

    public SleuthedEntryProcessor(String name, Span parentSpan, Function<V, R> task) {
        this(name, parentSpan, task, true);
    }

    public SleuthedEntryProcessor(
            String name, Span parentSpan, Function<V, R> task, boolean applyOnBackup) {
        super(applyOnBackup);
        this.name = name + "Hz";
        this.parentSpan = parentSpan;
        this.task = task;
        copyOfContextMap = MDC.getCopyOfContextMap();
    }

    @Override
    public final R process(Map.Entry<K, V> entry) {
        if (nonNull(copyOfContextMap)) {
            MDC.setContextMap(copyOfContextMap);
        }

        Span span = tracer.createSpan(toLowerHyphen(name), parentSpan);
        try {
            V value = entry.getValue();
            // The task mutates the value.
            R result = task.apply(value);
            // Set the mutated value back onto the entry.
            entry.setValue(value);
            return result;

        } finally {
            MDC.clear();
            tracer.close(span);
        }
    }

    @Autowired
    public void setTracer(Tracer tracer) {
        this.tracer = tracer;
    }
}

然后将EntryProcessor传递给IMap,如下所示:

Function<V, R> process = ...;
SleuthedEntryProcessor<K, V, R> entryProcessor = sleuthedEntryProcessorFactory.create(label, process);
Map<K, R> results = iMap.executeOnEntries(entryProcessor);
米迪
2023-03-14

Sleuth(撰写本文时)不支持Hazelcast。

这个解决方案比Hazelcast更通用——你需要通过Zipkin的《勇敢》。在客户端和服务器之间跨越,但要勇敢。Span不可序列化。

Zipkin提供了一种解决这个问题的方法。

给定一个勇敢的。客户端上的Span,您可以将其转换为java.util.Map

Span span = ...
Map<String, String> map = new HashMap<>();

tracing.propagation().injector(Map<String, String>::put).inject(span.context(), map);

在服务器上,您可以转换java。util。映射回勇敢者。Span:

Span span = tracer.toSpan(tracing.propagation().extractor(Map<String, String>::get).extract(map).context())

java的使用。util。Map显然可以根据需要更换,但原理是一样的。

 类似资料:
  • 我从sping-cloud-sleuth-core中找到restTemplateInterceptor和feignRequest estInterceptor,但是我们的项目使用的是hessian连接微服务,我发现sping-cloud-sleuth无法注入到hessian客户端。有人可以分享一下如何在hessian中使用sping-cloud-sleuth的代码吗?谢谢~

  • 当应用程序使用maven时,我有一个Spring Cloud Sleuth Stream应用程序正在工作并发送到我的本地OpenZipkin(docker),但是当我尝试运行gradle Spring boot应用程序时,Zipkin不会显示跟踪。有趣的是,Spring日志似乎显示了正确的跟踪信息。。因此,应用程序本身正在识别侦探,但出于某种原因,Zipkin要么没有收到信息,要么没有显示。我进入

  • 我正在尝试实现Slueth,用于spring boot微服务的分布式跟踪,这些微服务通过消息传递通道相互通信。 其中一个微服务是一个调度器,它接收一天内创建的新消费者。然后,它以异步方式为每个消费者的数据运行分组过程。 现在,我使用traceableExeucutorService将为调度程序线程生成的sleuth跟踪传递给每个使用者的子线程。 跟踪配置 调度程序服务 这最终会为每个消费者使用相同

  • 如果该成员被弹出,我会看到MapLoader从数据库重新加载数据时,条目添加的侦听器被激发。 然而,这向客户机提示已经添加了新条目,而实际上,它们只是由于节点引导而被“添加”的。 基本上,我不希望这些侦听器由于MapLoader引导映射而被解雇--它们应该只在之后被解雇。 如何阻止这些MapLoader事件从EntryAdded侦听器中触发?

  • 我的一些微服务使用log4j2作为记录器。Spring cloud Sleuth支持logback。在这个场景中,我如何使用Sleuth来获得分布式跟踪。我明白用log4j2使用sleuth,我必须实现某些类。我试过了,但没有运气。请帮忙

  • 当你运行一个公开站点时,你应该始终关闭DEBUG 设置。这会使你的服务器运行得更快,也会防止恶意用户看到由错误页面展示的一些应用细节。 但是,运行在 DEBUG为False的情况下,你不会看到你的站点所生成的错误 -- 每个人都只能看到公开的错误页面。你需要跟踪部署的站点上的错误,所以可以配置Django来生成带有错误细节的报告。 报告邮件 服务器错误 DEBUG 为 False的时候,无论什么时