当前位置: 首页 > 知识库问答 >
问题:

Vert.x Kafka 不遵守 RxJava 线程分配?

常温文
2023-03-14

给定以下代码:

kafkaConsumer
  .rxSubscription()
  .subscribeOn(Schedulers.io())
  .map(s -> {
    logger.info("Mapping on Thread: " + Thread.currentThread().getName());
    return s;
   })
  .observeOn(Schedulers.computation())
  .subscribe(
     set -> {
       logger.info("Subscribing on Thread: " +Thread.currentThread().getName());
   });

如果kafka消费者是Vert. x Kafka消费者,我希望

.map(s -> {
  logger.info("Mapping on Thread: " + Thread.currentThread().getName());
  return s;
})

会发生在Reactive IO线程上。但是,它在Vert. x事件循环线程上执行。当我运行以下测试类时,相同的场景按照预期在IO线程上运行map方法。

public class ThreadTesting {

public static void main(String args[]) {
  Vertx vertx = Vertx.vertx();
  Observable.fromArray(new String[] {"start"})
    .flatMapSingle(s -> method1())
    .subscribeOn(Schedulers.io())
    .map(
        s -> {
          System.out.println("mapping 2 on Thread: " + Thread.currentThread().getName());
          return s.concat(method2());
        })
    .observeOn(Schedulers.computation())
    .subscribe(
        str -> {
          System.out.println("Subscribing on Thread: " + Thread.currentThread().getName());
        },
        onError -> {
          onError.printStackTrace();
        });
 }

 public static Single<String> method1() {
   System.out.println("Executing method 1 on Thread: " + Thread.currentThread().getName());
   AsyncResultSingle<String> vertxSingle = new AsyncResultSingle<>(
      h -> {
         h.handle(Future.succeededFuture("method 1 string"));
      });
    return vertxSingle;
 }

 public static String method2() {
   System.out.println("Executing method 2 on Thread: " + Thread.currentThread().getName());
   return "method 2 String";
 }
}

是什么导致线程执行中出现这种差异?

共有1个答案

酆君墨
2023-03-14

Vert.x KafkaConsumer 在事件循环线程上异步发出项目,即使您在 io 调度程序上订阅了它也是如此。

在您的片段中,您尝试强制在计算调度程序上发出项目。它有效,但不适用于您期望的可观察对象:它适用于map操作返回的可观察对象。

如果希望< code>map在< code >计算调度程序上运行,则需要在以下操作之前应用< code>observeOn运算符:

kafkaConsumer
  .rxSubscription()
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.computation())
  .map(s -> {
    logger.info("Mapping on Thread: " + Thread.currentThread().getName());
    return s;
   })
  .subscribe(
     set -> {
       logger.info("Subscribing on Thread: " +Thread.currentThread().getName());
   });
 类似资料:
  • 我的Spring boot应用程序(V2.1.1.Release)使用以下插件和布局进行打包: 我正在使用下面的启动脚本: 在我的应用程序具有的一个依赖项中,下面的代码用于读取外部xml配置文件(例如hbase-site.xml): 这就是为什么我试图使用loader.path使这些文件可用,但应用程序仍然不能读取提供的dir中的文件。我还遗漏了什么吗? 前面已经提到:Spring Boot:是否

  • 我正在尝试以一种方式管理日志记录,即最早的存档日志文件一旦达到总累积大小限制或达到最大历史记录限制,就会被删除。在Logback 1.1.7中使用时,滚动文件附加器将继续创建新的存档,尽管超过了设置。 这是日志中的一个bug还是我没有正确配置滚动文件附加器?

  • 问题内容: 结果如下: 2011-09-24 14:10:51 -0400 2011年9月24日星期六20:10:51 为什么当我解析来自format()的日期时,它不遵守时区? 问题答案: 您正在打印调用的结果,该调用 始终 使用默认时区。基本上,除了调试之外,您不应该使用其他任何东西。 不要忘记,一个不 具有 时区-它代表着一个时刻,因为自Unix纪元(午夜1970年1月1日UTC)毫秒。 如

  • 问题内容: 在管道级别,我指定代理和节点(带有标签和自定义工作区)。管道启动时,它将在指定的节点上运行,但是当命中“ build job”时,将选择第一个可用节点。我尝试使用NodeLabel插件,但是那也不起作用。 这是我的: 问题答案: 当您使用Jenkinsfile中的指令时,它告诉Jenkins您想要构建一个完全独立的作业。正是 其他工作 需要指定将要构建的代理。如果这是基于Jenkins

  • 线程分为守护线程和非守护线程(即用户线程)。   只要当前JVM实例中尚存在任何一个非守护线程没有结束,守护线程就全部工作;只有当最后一个非守护线程结束时,守护线程随着JVM一同结束工作。   守护线程最典型的应用就是 GC (垃圾回收器)

  • 我对一个异步函数有点拘泥于此。 我要完成的是创建一个batchProcessing函数(batchGetSubs),它将循环访问一组文件,读取一个ID,然后发出一个API请求,等待一个响应(问题),然后用格式化的数据写入一个新文件。 问题--我尝试了异步和等待,以及推送承诺和尝试使用承诺.所有这些都是为了等待承诺的解决,但没有成功。当前的行为是,在API调用实际返回所有数据之前,我获取Promis