当前位置: 首页 > 知识库问答 >
问题:

为什么是垂直。从多个线程并发调用x worker verticle?

林俊晖
2023-03-14

我的vertx(4.0.2)应用程序是用Java(11)编写的,它使用了一些数据量大的垂直项,这些垂直项会导致延迟峰值,因为eventloop会被它们阻塞一段时间。出于这个原因,我想将这些垂直链接部署为工作垂直链接,这样eventloop和其他垂直链接就不再被阻塞。

不幸的是,我的应用程序现在崩溃了,因为顶点内的事件处理是由多个线程并发执行的;(

如果我正确理解vertx文档,则不应发生这种情况:

Vert从不并发执行Worker verticle实例。x由多个线程执行,但可以在不同的时间由不同的线程执行。

我用一个简单的例子再现了这个问题:

@Slf4j
public class WorkerTest extends AbstractVerticle {
  private static final String ADDRESS = "address";
  private volatile String currentThread = null;
  private long counter = 0;

  @Override
  public void start(final Promise<Void> startPromise) {
    vertx.eventBus().consumer(ADDRESS, this::handleMessage);
    startPromise.complete();
  }

  private void handleMessage(Message<Object> message) {
    final var _currentThread = this.currentThread;
    final var thisThread = Thread.currentThread().getName();

    if (_currentThread != null) {
      log.error(
          "concurrent callback: current thread={}, this thread={}", _currentThread, thisThread);
      return;
    }

    try {
      this.currentThread = thisThread;
      Thread.sleep(2);
      if (++counter % 100L == 0) {
        log.info("received {} messages (current thread: {})", counter, thisThread);
      }
    } catch (Exception e) {
    } finally {
      this.currentThread = null;
    }
  }

  public static void main(String[] args) {
    final Vertx vertx = Vertx.vertx();

    vertx.deployVerticle(
        new WorkerTest(),
        new DeploymentOptions().setWorker(true),
        result -> {
          if (result.failed()) {
            System.exit(1);
            return;
          }

          for (int i = 0; i < 1000; ++i) {
            vertx.eventBus().send(ADDRESS, "test");
          }
        });
  }
}

执行此操作会导致许多日志错误,因为同时从多个线程调用handleMessage。如果我以非工作方式部署垂直体,这将按预期工作。

我做错了什么?

共有1个答案

邬英武
2023-03-14

vertx 4.0.2似乎是你的情况下的问题。使用vertx 4.0.3和以下代码:


public class WorkerTest extends AbstractVerticle {
    private static final String ADDRESS = "address";

    private volatile boolean handleMessageInExecution = false;

    public static void main(String[] args) {
        final Vertx vertx = Vertx.vertx();

        vertx.deployVerticle(
                WorkerTest::new,
                new DeploymentOptions()
                        .setInstances(2)
                        .setWorkerPoolSize(10)
                        .setWorker(true)
                ,
                result -> {
                    for (int i = 0; i < 100; ++i) {
                        vertx.eventBus().send(ADDRESS, "test " + i);
                    }
                });
    }

    @Override
    public void start(final Promise<Void> startPromise) {
        vertx.eventBus().localConsumer(ADDRESS, this::handleMessage);
        startPromise.complete();
    }

    private void handleMessage(Message<String> message) {
        if (handleMessageInExecution) {
            // this should never happen, since each thread that sets this to true, will also set it to
            // false on exit.
            System.out.println(message.body() + " ERROR");
            return;
        }

        handleMessageInExecution = true; // this thread is now executing handleMessage
        System.out.println(message.body() + " START   " + Thread.currentThread());

        try {
            Thread.sleep(1); // block thread for a moment to simulate heavy load
        } catch (Exception e) {
            // ignore interruption
            e.printStackTrace();
        } finally {
            handleMessageInExecution = false; // we are done executing
            System.out.println(message.body() + " END     " + Thread.currentThread());
        }
    }
}

我们看到这个输出,这是预期的(每个消息由一个线程处理,它从头到尾运行,没有并发,最多2条消息,同时我们有2个实例):

test 1 START   Thread[vert.x-worker-thread-2,5,main]
test 0 START   Thread[vert.x-worker-thread-3,5,main]
test 0 END     Thread[vert.x-worker-thread-3,5,main]
test 1 END     Thread[vert.x-worker-thread-2,5,main]
test 2 START   Thread[vert.x-worker-thread-3,5,main]
test 3 START   Thread[vert.x-worker-thread-2,5,main]
test 3 END     Thread[vert.x-worker-thread-2,5,main]
test 2 END     Thread[vert.x-worker-thread-3,5,main]
test 5 START   Thread[vert.x-worker-thread-2,5,main]
test 4 START   Thread[vert.x-worker-thread-3,5,main]
test 4 END     Thread[vert.x-worker-thread-3,5,main]
test 6 START   Thread[vert.x-worker-thread-3,5,main]
test 5 END     Thread[vert.x-worker-thread-2,5,main]
test 7 START   Thread[vert.x-worker-thread-2,5,main]
test 6 END     Thread[vert.x-worker-thread-3,5,main]
test 8 START   Thread[vert.x-worker-thread-3,5,main]
test 7 END     Thread[vert.x-worker-thread-2,5,main]
test 9 START   Thread[vert.x-worker-thread-2,5,main]
test 8 END     Thread[vert.x-worker-thread-3,5,main]
test 10 START   Thread[vert.x-worker-thread-3,5,main]
test 9 END     Thread[vert.x-worker-thread-2,5,main]
test 11 START   Thread[vert.x-worker-thread-2,5,main]
test 10 END     Thread[vert.x-worker-thread-3,5,main]
...
 类似资料:
  • 在浏览vert.x Java devs guide时,我发现vert.x在每个CPU线程上附加了2个事件循环。与每个CPU线程1个事件循环相比,这对性能有何影响?

  • 我知道,在vert. x中,默认的版本将在每次我们不需要在处理程序中写入线程安全时运行在相同的事件循环中。 例如,如果我有一个运行HttpServer的版本- } 可以保证,在任何时候,我的请求处理程序都会在两个事件循环上被调用两次(针对2个不同的请求)。因此,我不必在请求处理程序中处理线程安全问题。 如果我运行多个HttpServer顶点实例- 我需要注意线程安全吗?多个请求处理程序(最大值=1

  • 问题内容: 我一直在寻找一些方法来轻松地对我的一些简单分析代码进行多线程处理,因为我注意到numpy仅使用一个内核,尽管事实上它应该是多线程的。 我知道numpy是为多个内核配置的,因为我可以看到使用numpy.dot的测试使用了我的所有内核,因此我只是将Mean重新实现为点积,并且运行速度更快。是否有某些原因意味着不能自己快速运行?我发现较大的数组具有类似的行为,尽管该比率比示例中显示的3接近2

  • 主要内容:1 什么是Java多线程,2 Java多线程的优点,3 进程与线程区别,4 Java Thread类,5 Java Thread类的方法1 什么是Java多线程 Java 多线程指的是同时执行多个线程的处理。 线程是轻量级子进程,是最小的处理单元。多处理和多线程都用于实现多任务。 但是,我们使用多线程而不是多进程,因为线程使用共享内存区域。它们不分配单独的内存区域,因此节省了内存,并且线程之间的上下文切换比进程花费的时间更少。 线程是轻量级子进程,是最小的处理单元。这是一条单独的执行路

  • 本文向大家介绍为什么要使用多线程?相关面试题,主要包含被问及为什么要使用多线程?时的应答技巧和注意事项,需要的朋友参考一下 先从总体上来说: 从计算机底层来说:线程可以比作是轻量级的进程,是程序执行的最小单位,线程间的切换和调度的成本远远小于进程。另外,多核 CPU 时代意味着多个线程可以同时运行,这减少了线程上下文切换的开销。 从当代互联网发展趋势来说:现在的系统动不动就要求百万级甚至千万级的并

  • 并发是什么?引用Rob Pike的经典描述: 并发是同一时间应对多件事情的能力 其实在我们身边就有很多并发的事情,比如一边上课,一边发短信;一边给小孩喂奶,一边看电视,只要你细心留意,就会发现许多类似的事。相应地,在软件的世界里,我们也会发现这样的事,比如一边写博客,一边听音乐;一边看网页,一边下载软件等等。显而易见这样会节约不少时间,干更多的事。然而一开始计算机系统并不能同时处理两件事,这明显满