当前位置: 首页 > 知识库问答 >
问题:

Java中基于可变属性返回Mono的方法的线程安全性

冯阳云
2023-03-14

在我的Spring Boot应用程序中,我有一个组件,用来监视另一个外部系统的运行状况。该组件还提供了一种公共方法,反应链可以订阅该方法,以等待外部系统启动。

@Component
public class ExternalHealthChecker {
  private static final Logger LOG = LoggerFactory.getLogger(ExternalHealthChecker.class);

  private final WebClient externalSystemWebClient = WebClient.builder().build(); // config omitted

  private volatile boolean isUp = true;
  private volatile CompletableFuture<String> completeWhenUp = new CompletableFuture<>();

  @Scheduled(cron = "0/10 * * ? * *")
  private void checkExternalSystemHealth() {
    webClient.get() //
        .uri("/health") //
        .retrieve() //
        .bodyToMono(Void.class) //
        .doOnError(this::handleHealthCheckError) //
        .doOnSuccess(nothing -> this.handleHealthCheckSuccess()) //
        .subscribe(); //
  }

  private void handleHealthCheckError(final Throwable error) {
    if (this.isUp) {
      LOG.error("External System is now DOWN. Health check failed: {}.", error.getMessage());
    }
    this.isUp = false;
  }

  private void handleHealthCheckSuccess() {
  // the status changed from down -> up, which has to complete the future that might be currently waited on  
  if (!this.isUp) {
      LOG.warn("External System is now UP again.");
      this.isUp = true;
      this.completeWhenUp.complete("UP");
      this.completeWhenUp = new CompletableFuture<>();
    }
  }


  public Mono<String> waitForExternalSystemUPStatus() {
    if (this.isUp) {
      LOG.info("External System is already UP!");
      return Mono.empty();
    } else {
      LOG.warn("External System is DOWN. Requesting process can now wait for UP status!");
      return Mono.fromFuture(completeWhenUp);
    }
  }
}

方法waitForExtranalSystemUPState是公共的,可以从许多不同的线程调用。这背后的想法是为应用程序中的一些反应性磁链提供一种暂停其处理的方法,直到外部系统启动。当外部系统关闭时,这些链无法处理其元素

someFlux
  .doOnNext(record -> LOG.info("Next element")
  .delayUntil(record -> externalHealthChecker.waitForExternalSystemUPStatus())
  ... // starting processing

这里的问题是,我无法真正理解这段代码的哪一部分需要同步。我认为多个线程同时调用waitForExternalSystemUPStatus不应该有问题,因为这个方法没有写任何东西。所以我觉得这个方法不需要同步。然而,用@Scheduled注释的方法也将在它自己的线程上运行,并且实际上将写入isUp的值,并且还可能将completeWhenUp的引用更改为一个新的、未完成的未来实例。我已经用volatile标记了这两个可变属性,因为在Java中阅读这个关键字,我觉得它有助于确保读取这两个值的线程看到最新的值。但是,我不确定是否还需要在部分代码中添加synchronized关键字。我也不确定synchronized关键字是否与reactor代码配合得很好,我很难找到这方面的信息。也许还有一种方法可以以更完整、反应式的方式提供ExternalHealthChecker的功能,但我想不出任何方法。

共有1个答案

陈富
2023-03-14

我强烈反对这种做法。像这样的线程代码的问题是,它变得非常难以理解

应该有一个更可靠的方法

healthCheckStream = Flux.interval(Duration.ofMinutes(10))
        .flatMap(i ->
                webClient.get().uri("/health")
                        .retrieve()
                        .bodyToMono(String.class)
                        .map(s -> true)
                        .onErrorResume(e -> Mono.just(false)))
        .cache(1);

...其中healthCheckStream通量类型的字段

这实际上是每10分钟创建一个healthcheck响应值流,总是缓存最新的响应,并将其转化为热源。这意味着“在订阅之前什么都不会发生”在这种情况下不适用——流量将立即开始执行,任何线程上出现的任何新订阅都将始终获得最新结果,无论是通过还是失败handlehalthchecksuccess()handlehalthcheckerror()已启动完成当所有的都是多余的时,它们就可以去了,然后你的waitForExternalSystemUPStatus()就可以变成一行:

return healthCheckStream.filter(x -> x).next();

...然后工作完成,你可以从任何地方调用它,你将有一个Mono,只有在系统启动时才能完成。

 类似资料:
  • 问题内容: BlockingQueue的文档说,批量操作不是线程安全的,尽管它没有明确提及方法方法toitTo()。 BlockingQueue实现是线程安全的。所有排队方法都是使用内部锁或其他形式的并发控制来原子地实现其效果的。但是,除非在实现中另行指定,否则批量Collection操作addAll,containsAll,retainAll和removeAll不一定是原子执行的。因此,例如,仅

  • 问题内容: servlet是线程安全的吗?例如,如果我打开5个不同的浏览器并向容器中的一个servlet发送请求,它是否仍然是线程安全的,我特别指的是方法 问题答案: 您的问题可以归结为: 正在从同一对象线程safe上的多个线程调用方法 。答案是: 取决于 。如果您的对象(让它成为servlet)是无状态的或仅具有字段,则这是完全线程安全的。局部变量和参数是线程局部的(驻留在堆栈上,而不是堆上)。

  • 我正在研究Pytorch的线性回归问题 我在单变量情况下取得了成功,但是当我执行多变量线性回归时,我得到了以下错误。我应该如何执行多变量线性回归? TypeError Traceback(最近调用最后一次)in()9optimizer.zero_grad()#渐变10输出=模型(输入)#输出--- /anaconda/envs/tensorflow/lib/python3.6/site-packa

  • 我得到的是它们是线程安全的。 杰里米·曼森博客的片段- 因为this引用存储在lastconstructed中“,因此转义构造函数 请建议。

  • 关于一次从多个线程访问局部变量的几个问题: > < li> 我有多个线程写入和读取变量值,我是否应该同步访问它? 变量每隔几秒钟从 Thread1 更新一次,每隔几秒钟从 Thread2 读取和更新到数据库。 如果我没有任何逻辑并且没有任何并发问题,会出现哪些问题? 我应该为此使用volatile吗? 编辑: 我想强调的是,我没有任何并发问题。这是我的具体方案: 一个。我的变量名称是 ,它测量 p

  • 问题内容: 好吧,考虑下面给出的不可变类: 现在,我正在一个类中创建一个对象,该对象的对象将由多个线程共享: 看到as 并移入同步块并创建对象。现在,由于 Java内存模型(JMM)允许多个线程在初始化开始之后但尚未结束之前观察对象。 因此,可以将写入操作视为在写入的字段之前发生。因此,因此可以看到部分构造,该构造很可能处于无效状态,并且其状态以后可能会意外更改。 它不是非线程安全的吗? 编辑 好