当前位置: 首页 > 知识库问答 >
问题:

基于被动Mongo和Web客户端的非阻塞函数方法

陈胤
2023-03-14

我有一个微服务,它使用reactivemongorepository接口从数据库中读取对象。

目标是获取这些对象中的每一个,并将其推送给AWS Lambda函数(在将其转换为DTO之后)。如果lambda函数的结果在200范围内,则将对象标记为成功,否则忽略。

在以前使用简单的Mongo存储库和RestTemplate的时候,这将是一项琐碎的任务。然而,我试图理解这种被动的交易,并避免阻塞。

@Override
public Flux<Video> index() {
    return videoRepository.findAllByIndexedIsFalse().flatMap(video -> {
        final SearchDTO searchDTO = SearchDTO.builder()
                .name(video.getName())
                .canonicalPath(video.getCanonicalPath())
                .objectID(video.getObjectID())
                .userId(video.getUserId())
                .build();

        // Blocking call
        final HttpStatus httpStatus = webClient.post()
                .uri(URI.create(LAMBDA_ENDPOINT))
                .body(BodyInserters.fromObject(searchDTO)).exchange()
                .block()
                .statusCode();

        if (httpStatus.is2xxSuccessful()) {
            video.setIndexed(true);
        }

        return videoRepository.save(video);
    });
}
@Scheduled(fixedDelay = 60000)
public void indexTask() {
    indexService
            .index()
            .log()
            .subscribe();
}

我读过很多关于这个主题的博客文章,但它们都只是简单的CRUD操作,中间没有发生任何事情,所以我并没有真正给出一个如何实现这些事情的完整画面。

有人帮忙吗?

共有1个答案

司空繁
2023-03-14

你的解决方案其实相当接近。在这些情况下,您应该尝试分步骤分解反应链,并毫不犹豫地将位转换为独立的方法以求清晰。

@Override
public Flux<Video> index() {

    Flux<Video> unindexedVideos = videoRepository.findAllByIndexedIsFalse();
    return unindexedVideos.flatMap(video -> {
        final SearchDTO searchDTO = SearchDTO.builder()
                .name(video.getName())
                .canonicalPath(video.getCanonicalPath())
                .objectID(video.getObjectID())
                .userId(video.getUserId())
                .build();

        Mono<ClientResponse> indexedResponse = webClient.post()
            .uri(URI.create(LAMBDA_ENDPOINT))
            .body(BodyInserters.fromObject(searchDTO)).exchange()
            .filter(res -> res.statusCode().is2xxSuccessful());

        return indexedResponse.flatMap(response -> {
            video.setIndexed(true);
            return videoRepository.save(video);
        });
    });
 类似资料:
  • 问题内容: 我想在Python中运行一个程序,该程序每秒通过Web套接字向Tornado服务器发送一条消息。我一直在websocket-client上使用该示例; 该示例不起作用,因为它将停止while循环的执行。 有人可以给我一个例子,说明如何正确地将其实现为线程类,我既可以调用它的send方法,又可以接收消息? 问题答案: 在他们的github页面上有一个例子可以做到这一点。好像您是从该示例开

  • 我希望客户不会等待4秒然后得到实际的结果。如您所见,服务器在22:44:21.126上开始发出onNext(),客户端在22:44:24.159上获得结果。所以我不明白如果webclient有这种行为,为什么它被称为非阻塞客户端。

  • 我正在尝试了解如何为我的方法编写 WebClient api 调用。下面是一个简单的示例: 我知道当我成功完成请求时,它将执行 /,这是所需的行为。 但是,我拥有的方法的常见做法是什么?我习惯于将一些字符串值返回给调用方,可能是成功/失败消息,或者对于其他情况,我返回 body 以便进一步处理。 对于我目前正在处理的更实际的示例,我调用 WebClient 来获取访问令牌,并在另一个 Web 客户

  • 我有一个顶点,它有一个处理程序,可以在事件循环线程中调用Vertx的Web客户端。实际的底层API调用是同步的还是异步的?它会阻塞我的事件循环线程吗?假设我的API调用需要30秒才能返回。 我是否需要用Vertx.execute阻塞(p-

  • 我想知道,如果可能的话,如何执行在 C 中创建/模拟 java 服务器套接字的任务?我是C的新手,但我相当精通Java。我的服务器(用java编写)需要从所有Java / C客户端接收数据(数据使用JSON Strings传输),但我不确定如何在C中与NIO服务器建立连接。 提前感谢任何帮助!

  • 我完全混淆了,,。 哪个是阻塞,哪个不是? 我的意思是如果我使用父进程是否等待子进程返回/才继续执行。 如何影响这些调用?