我有一个微服务,它使用reactivemongorepository
接口从数据库中读取对象。
目标是获取这些对象中的每一个,并将其推送给AWS Lambda函数(在将其转换为DTO之后)。如果lambda函数的结果在200范围内,则将对象标记为成功,否则忽略。
在以前使用简单的Mongo存储库和RestTemplate的时候,这将是一项琐碎的任务。然而,我试图理解这种被动的交易,并避免阻塞。
@Override
public Flux<Video> index() {
return videoRepository.findAllByIndexedIsFalse().flatMap(video -> {
final SearchDTO searchDTO = SearchDTO.builder()
.name(video.getName())
.canonicalPath(video.getCanonicalPath())
.objectID(video.getObjectID())
.userId(video.getUserId())
.build();
// Blocking call
final HttpStatus httpStatus = webClient.post()
.uri(URI.create(LAMBDA_ENDPOINT))
.body(BodyInserters.fromObject(searchDTO)).exchange()
.block()
.statusCode();
if (httpStatus.is2xxSuccessful()) {
video.setIndexed(true);
}
return videoRepository.save(video);
});
}
@Scheduled(fixedDelay = 60000)
public void indexTask() {
indexService
.index()
.log()
.subscribe();
}
我读过很多关于这个主题的博客文章,但它们都只是简单的CRUD操作,中间没有发生任何事情,所以我并没有真正给出一个如何实现这些事情的完整画面。
有人帮忙吗?
你的解决方案其实相当接近。在这些情况下,您应该尝试分步骤分解反应链,并毫不犹豫地将位转换为独立的方法以求清晰。
@Override
public Flux<Video> index() {
Flux<Video> unindexedVideos = videoRepository.findAllByIndexedIsFalse();
return unindexedVideos.flatMap(video -> {
final SearchDTO searchDTO = SearchDTO.builder()
.name(video.getName())
.canonicalPath(video.getCanonicalPath())
.objectID(video.getObjectID())
.userId(video.getUserId())
.build();
Mono<ClientResponse> indexedResponse = webClient.post()
.uri(URI.create(LAMBDA_ENDPOINT))
.body(BodyInserters.fromObject(searchDTO)).exchange()
.filter(res -> res.statusCode().is2xxSuccessful());
return indexedResponse.flatMap(response -> {
video.setIndexed(true);
return videoRepository.save(video);
});
});
问题内容: 我想在Python中运行一个程序,该程序每秒通过Web套接字向Tornado服务器发送一条消息。我一直在websocket-client上使用该示例; 该示例不起作用,因为它将停止while循环的执行。 有人可以给我一个例子,说明如何正确地将其实现为线程类,我既可以调用它的send方法,又可以接收消息? 问题答案: 在他们的github页面上有一个例子可以做到这一点。好像您是从该示例开
我希望客户不会等待4秒然后得到实际的结果。如您所见,服务器在22:44:21.126上开始发出onNext(),客户端在22:44:24.159上获得结果。所以我不明白如果webclient有这种行为,为什么它被称为非阻塞客户端。
我正在尝试了解如何为我的方法编写 WebClient api 调用。下面是一个简单的示例: 我知道当我成功完成请求时,它将执行 /,这是所需的行为。 但是,我拥有的方法的常见做法是什么?我习惯于将一些字符串值返回给调用方,可能是成功/失败消息,或者对于其他情况,我返回 body 以便进一步处理。 对于我目前正在处理的更实际的示例,我调用 WebClient 来获取访问令牌,并在另一个 Web 客户
我有一个顶点,它有一个处理程序,可以在事件循环线程中调用Vertx的Web客户端。实际的底层API调用是同步的还是异步的?它会阻塞我的事件循环线程吗?假设我的API调用需要30秒才能返回。 我是否需要用Vertx.execute阻塞(p-
我想知道,如果可能的话,如何执行在 C 中创建/模拟 java 服务器套接字的任务?我是C的新手,但我相当精通Java。我的服务器(用java编写)需要从所有Java / C客户端接收数据(数据使用JSON Strings传输),但我不确定如何在C中与NIO服务器建立连接。 提前感谢任何帮助!
我完全混淆了,,。 哪个是阻塞,哪个不是? 我的意思是如果我使用父进程是否等待子进程返回/才继续执行。 如何影响这些调用?