在我的Spring Boot应用程序中,我有一个组件,用来监视另一个外部系统的运行状况。该组件还提供了一种公共方法,反应链可以订阅该方法,以等待外部系统启动。
@Component
public class ExternalHealthChecker {
private static final Logger LOG = LoggerFactory.getLogger(ExternalHealthChecker.class);
private final WebClient externalSystemWebClient = WebClient.builder().build(); // config omitted
private volatile boolean isUp = true;
private volatile CompletableFuture<String> completeWhenUp = new CompletableFuture<>();
@Scheduled(cron = "0/10 * * ? * *")
private void checkExternalSystemHealth() {
webClient.get() //
.uri("/health") //
.retrieve() //
.bodyToMono(Void.class) //
.doOnError(this::handleHealthCheckError) //
.doOnSuccess(nothing -> this.handleHealthCheckSuccess()) //
.subscribe(); //
}
private void handleHealthCheckError(final Throwable error) {
if (this.isUp) {
LOG.error("External System is now DOWN. Health check failed: {}.", error.getMessage());
}
this.isUp = false;
}
private void handleHealthCheckSuccess() {
// the status changed from down -> up, which has to complete the future that might be currently waited on
if (!this.isUp) {
LOG.warn("External System is now UP again.");
this.isUp = true;
this.completeWhenUp.complete("UP");
this.completeWhenUp = new CompletableFuture<>();
}
}
public Mono<String> waitForExternalSystemUPStatus() {
if (this.isUp) {
LOG.info("External System is already UP!");
return Mono.empty();
} else {
LOG.warn("External System is DOWN. Requesting process can now wait for UP status!");
return Mono.fromFuture(completeWhenUp);
}
}
}
方法waitForExtranalSystemUPState
是公共的,可以从许多不同的线程调用。这背后的想法是为应用程序中的一些反应性磁链提供一种暂停其处理的方法,直到外部系统启动。当外部系统关闭时,这些链无法处理其元素。
someFlux
.doOnNext(record -> LOG.info("Next element")
.delayUntil(record -> externalHealthChecker.waitForExternalSystemUPStatus())
... // starting processing
这里的问题是,我无法真正理解这段代码的哪一部分需要同步。我认为多个线程同时调用waitForExternalSystemUPStatus
不应该有问题,因为这个方法没有写任何东西。所以我觉得这个方法不需要同步。然而,用@Scheduled
注释的方法也将在它自己的线程上运行,并且实际上将写入isUp
的值,并且还可能将completeWhenUp
的引用更改为一个新的、未完成的未来实例。我已经用volatile
标记了这两个可变属性,因为在Java中阅读这个关键字,我觉得它有助于确保读取这两个值的线程看到最新的值。但是,我不确定是否还需要在部分代码中添加synchronized
关键字。我也不确定synchronized关键字是否与reactor代码配合得很好,我很难找到这方面的信息。也许还有一种方法可以以更完整、反应式的方式提供ExternalHealthChecker
的功能,但我想不出任何方法。
我强烈反对这种做法。像这样的线程代码的问题是,它变得非常难以理解
应该有一个更可靠的方法
healthCheckStream = Flux.interval(Duration.ofMinutes(10))
.flatMap(i ->
webClient.get().uri("/health")
.retrieve()
.bodyToMono(String.class)
.map(s -> true)
.onErrorResume(e -> Mono.just(false)))
.cache(1);
...其中healthCheckStream
是通量类型的字段
这实际上是每10分钟创建一个healthcheck响应值流,总是缓存最新的响应,并将其转化为热源。这意味着“在订阅之前什么都不会发生”在这种情况下不适用——流量将立即开始执行,任何线程上出现的任何新订阅都将始终获得最新结果,无论是通过还是失败
handlehalthchecksuccess()
和handlehalthcheckerror()
,已启动
,完成当所有的
都是多余的时,它们就可以去了,然后你的
waitForExternalSystemUPStatus()
就可以变成一行:
return healthCheckStream.filter(x -> x).next();
...然后工作完成,你可以从任何地方调用它,你将有一个
Mono
,只有在系统启动时才能完成。
问题内容: BlockingQueue的文档说,批量操作不是线程安全的,尽管它没有明确提及方法方法toitTo()。 BlockingQueue实现是线程安全的。所有排队方法都是使用内部锁或其他形式的并发控制来原子地实现其效果的。但是,除非在实现中另行指定,否则批量Collection操作addAll,containsAll,retainAll和removeAll不一定是原子执行的。因此,例如,仅
问题内容: servlet是线程安全的吗?例如,如果我打开5个不同的浏览器并向容器中的一个servlet发送请求,它是否仍然是线程安全的,我特别指的是方法 问题答案: 您的问题可以归结为: 正在从同一对象线程safe上的多个线程调用方法 。答案是: 取决于 。如果您的对象(让它成为servlet)是无状态的或仅具有字段,则这是完全线程安全的。局部变量和参数是线程局部的(驻留在堆栈上,而不是堆上)。
我正在研究Pytorch的线性回归问题 我在单变量情况下取得了成功,但是当我执行多变量线性回归时,我得到了以下错误。我应该如何执行多变量线性回归? TypeError Traceback(最近调用最后一次)in()9optimizer.zero_grad()#渐变10输出=模型(输入)#输出--- /anaconda/envs/tensorflow/lib/python3.6/site-packa
我得到的是它们是线程安全的。 杰里米·曼森博客的片段- 因为this引用存储在lastconstructed中“,因此转义构造函数 请建议。
关于一次从多个线程访问局部变量的几个问题: > < li> 我有多个线程写入和读取变量值,我是否应该同步访问它? 变量每隔几秒钟从 Thread1 更新一次,每隔几秒钟从 Thread2 读取和更新到数据库。 如果我没有任何逻辑并且没有任何并发问题,会出现哪些问题? 我应该为此使用volatile吗? 编辑: 我想强调的是,我没有任何并发问题。这是我的具体方案: 一个。我的变量名称是 ,它测量 p
问题内容: 通常,当使用嘲笑我会做类似 是否可以按照以下方式做点什么? 因此,而不是仅使用参数来确定结果时。它使用参数内的属性值来确定结果。 因此,在执行代码时,其行为如下 当前的解决方案,希望可以提出更好的建议。 问题答案: 这是一种方法。这使用一个对象来检查属性的值。 我实际上更喜欢另一种语法,它可以实现完全相同的效果。由您选择其中之一。这只是方法-测试类的其余部分应与上述相同。