当前位置: 首页 > 知识库问答 >
问题:

在Vert. x应用程序中使用项目反应器

公良天逸
2023-03-14

我在用一个垂直的图书馆。返回项目Reactor类型Mono的x应用程序

我有一个verticle,它接收这种反应类型,并打算通过事件总线将内容发送到另一个verticle:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class HelperVerticle extends AbstractVerticle
{
    public static final String ADDRESS = "address_1";

    @Override
    public void start() throws Exception
    {
        vertx.eventBus().consumer(ADDRESS, this::consume);
    }

    private void consume(Message<Object> message)
    {
        Mono.delay(Duration.ofMillis(3000)) 
            .thenReturn("Content of Mono.") // this would come from external library
            .publishOn(Schedulers.fromExecutor(vertx.nettyEventLoopGroup())) // is this needed?
            .subscribe(output ->
            {
                System.out.println("My verticle: " + Thread.currentThread().getName());
                message.reply(output + " " + message.body());
            }, error -> message.fail(1, error.getMessage()));
    }
}

这是正确的方法吗?我应该换成垂直吗。在将消息发送到事件总线之前,x事件循环线程池?在一起使用这些库时,有什么我应该注意的吗?

共有1个答案

荆钱明
2023-03-14

代码在我看来很好,只是不应该使用Netty事件循环组作为执行器,而应该使用verticle上下文:

public class HelperVerticle extends AbstractVerticle
{
    public static final String ADDRESS = "address_1";

    private Scheduler scheduler;

    @Override
    public void start() throws Exception
    {
        scheduler = Schedulers.fromExecutor(command -> context.runOnContext(v -> command.run()));
        vertx.eventBus().consumer(ADDRESS, this::consume);
    }

    private void consume(Message<Object> message)
    {
        Mono.delay(Duration.ofMillis(3000)) 
            .thenReturn("Content of Mono.") // this would come from external library
            .publishOn(scheduler)
            .subscribe(output ->
            {
                System.out.println("My verticle: " + Thread.currentThread().getName());
                message.reply(output + " " + message.body());
            }, error -> message.fail(1, error.getMessage()));
    }
}

使用这样的调度程序,您可以确保顶点状态不会被分配给它的事件循环以外的线程修改。

 类似资料:
  • 当我键入npx create react app project时,突然出现了这个错误 错误是:包config/home/swaraj/中定义的“exports”主目标“index.js”无效。npm/_npx/8451/lib/node_modules/create react app/node_modules/is promise/package。json 谁能告诉我这个错误是什么以及如何解决

  • 创建一个基于Django3的全栈项目似乎是最佳实践(或至少一种常用方法),该项目使用特定于项目的Django应用程序(而不是独立的Django应用程序),其结构如下(参见此处): 应用程序(此处:)通过以最基本的形式(没有规则、视图等)集成 和 在另一个应用程序(例如)中使用一个应用程序的逻辑(例如)可以很好地工作(例如通过中的)。 但是,如果我试图在app2的逻辑中访问app1的模型,我会得到。

  • 我想知道IDE中的Jave EE项目和Java Web项目在方面的主要区别。事实上,如果选择Java web类别,您可以在IDE中基于、和JavaServer Faces创建web应用程序。 另一方面,选择项目类别为Jave EE项目,NetBeans IDE将创建3个子项目,例如:StoreApp(企业应用项目)、StoreApp-ejb(EJB项目)和StoreApp-war(Web项目)。

  • 在我将依赖项添加到JavaFX程序中并创建了一个名为Algorithm wth two classes的新包后,我的应用程序无法启动。 会引发以下异常: 原因:java.lang.RuntimeException:应用程序启动方法中的异常原因:java.lang.NullPointerException:需要位置。 我尝试了: JavaFX“location is required”。即使它在应用

  • 我试图理解我们什么时候需要使用这个应用程序。在我们的node Express中使用 当我在网上搜索时,我在reddit上偶然发现了这个答案,它说明了应用程序之间的区别。获取和应用程序。使用 在此基础上,我总结了以下几点。 充当超级路由或中间件?这意味着它在? 此外,如果有人能添加更多关于app.use.的信息/练习,我将不胜感激