当前位置: 首页 > 知识库问答 >
问题:

Vertx/RxJava/WebClient/ApiGateway/Reactive

颛孙博易
2023-03-14

我正在使用vert. x和RxJava在Apigateway上工作。我想为2个Apis发送响应请求,从他们两个那里获得响应,并通过HttpServer发送组合JSON。但是onComplete()执行到早期并返回空JSON。我认为问题源于vert. x的异步字符,但我不知道到底出了什么问题。

以下是我的方法:

private void dispatchBoth(RoutingContext routingContext) {

    Observer<String> observer = new Observer<String>() {

        JsonArray jsonArray = new JsonArray();

        @Override
        public void onSubscribe(Disposable disposable) {
            System.out.println("Start");

        }

        @Override
        public void onNext(String s) {
            Thread t = new Thread(() -> {
                if(s=="/api/userApi/selectAllUsers") {

                        WebClient client = WebClient.create(vertx);
                            client
                                .get(8081, "localhost", s)
                                .send(ar->{
                                    if (ar.succeeded()) {
                                        HttpResponse<Buffer> response = ar.result();

                                        jsonArray.addAll(response.bodyAsJsonArray());
                                        System.out.println(jsonArray.encodePrettily());

                                    } else {
                                        System.out.println("Something went wrong " + ar.cause().getMessage());
                                    }
                                });

                }else if(s=="/api/holidayApi/selectAllHolidays") {
                        WebClient client = WebClient.create(vertx);
                        client
                                .get(8080, "localhost", s)
                                .send(ar -> {

                                    if (ar.succeeded()) {

                                        HttpResponse<Buffer> response = ar.result();

                                        jsonArray.addAll(response.bodyAsJsonArray());
                                       //  System.out.println(jsonArray.encodePrettily());

                                    } else {
                                        System.out.println("Something went wrong " + ar.cause().getMessage());
                                    }
                                });
                    }
                });
                t.start();
        }
        @Override
        public void onError(Throwable throwable) {

        }

        @Override
        public void onComplete() {
                System.out.println(jsonArray.encodePrettily());
                routingContext.response().end(jsonArray.encodePrettily());
        }
    };
    Observable.fromArray(com).subscribe(observer);
}

这就是我在控制台上得到的输出:

[ ]
[ {
  "holidayId" : 2,
  "userId" : 3,
  "place" : "Poland",
  "date" : {
    "year" : 2016,
    "month" : "DECEMBER",
    "dayOfMonth" : 29,
    "dayOfWeek" : "THURSDAY",
    "era" : "CE",
    "dayOfYear" : 364,
    "leapYear" : true,
    "monthValue" : 12,
    "chronology" : {
      "id" : "ISO",
      "calendarType" : "iso8601"
    }
  }
}, {
  "holidayId" : 10,
  "userId" : 1,
  "place" : "Netherland",
  "date" : {
    "year" : 2020,
    "month" : "JANUARY",
    "dayOfMonth" : 21,
    "dayOfWeek" : "TUESDAY",
    "era" : "CE",
    "dayOfYear" : 21,
    "leapYear" : true,
    "monthValue" : 1,
    "chronology" : {
      "id" : "ISO",
      "calendarType" : "iso8601"
    }
  }
}, {
  "userId" : 1,
  "name" : "Kacper",
  "phone_number" : "667667202"
}, {
  "userId" : 3,
  "name" : "Kamil",
  "phone_number" : "6734583443"
}, {
  "userId" : 4,
  "name" : "Janek",
  "phone_number" : "231253575"
}, {
  "userId" : 5,
  "name" : "Grzegorz",
  "phone_number" : "123456789"
}, {
  "userId" : 6,
  "name" : "Justin",
  "phone_number" : "111000111"
}, {
  "userId" : 8,
  "name" : "Mike",
  "phone_number" : "997"
}, {
  "userId" : 9,
  "name" : "Gorge",
  "phone_number" : "991"
} ]

共有2个答案

牧飞鹏
2023-03-14

我看到您正在onNext()中启动一个新线程?当您使用Thread.start()启动一个新线程时,它将并行进行其工作,并且当前的Observable.onNext()将认为工作已完成。这将导致调用on完成()

没有人在等待线程的作业完成。希望你明白我的意思。

现在,我不熟悉vert. x,但按照RxJava的方式,似乎不需要使用Thread。您可以将其删除并将其余逻辑直接保留在onNext()方法中。

黄永怡
2023-03-14

onComplete按时执行:当字符串的输入流完成时。您需要另一种方法来等待所有I/O操作完成的时刻。这是一个棘手的部分,我不知道Vertx或RxJava是否可以做到这一点,但标准Java API可以,使用CompletableFuture。因此,我们创建了从CompletableFuture到Handler的适配器,它保存一个I/O操作的结果:

class HandelerFuture extends CompletableFuture<JsonArray> 
         implements Handler<AsyncResult<HttpResponse<Buffer>>> {
    @Override
    public void handle(AsyncResult<HttpResponse<Buffer>> ar) {
        if (ar.succeeded()) {
            JsonArray array = ar.result().bodyAsJsonArray();
            super.complete(array);
        } else {
            super.completeExceptionally(ar.cause());
        }
    }
}

此外,您不需要将onNext方法的主体包装在Tread中。其次,您不需要检查传递的url字符串是什么,因为它没有任何区别。第三,您只使用Observer来处理url字符串列表,这是过度复杂。普通回路就足够了。

CompletableFuture<HandelerFuture[]> dispatchBoth(String... urls) {
    ArrayList<HandelerFuture> futures = new ArrayList<>(); // all results
    for (String url : urls) {
        HandelerFuture future = new HandelerFuture();
        futures.add(future);
        WebClient client = WebClient.create(vertx);
        client.get(8081, "localhost", url)
              .send(future);
    }
    CompletableFuture all = new CompletableFuture();
    HandelerFuture[] array = futures.toArray(new HandelerFuture[0]);
    CompletableFuture.allOf(array)
            .thenRunAsync(() -> all.complete(array));
    return all;
}

然后可以按如下方式运行:

    CompletableFuture<HandelerFuture[]> future = dispatchBoth(com);
    HandelerFuture[] results = future.get();
    JsonArray finalArray;
    for (HandelerFuture result:results) {
        try {
            // extract partial json array
            JsonArray partialArray = result.get();
            // combine partialArray with finalArray somehow
        } catch (Exception e) {
            // this is the exception got in handle() method as ar.cause().
            e.printStackTrace();
        }
    }
    routingContext.response().end(finalArray.encodePrettily());

您没有告诉我如何组合json数组,所以我没有实现它。

 类似资料:
  • 尝试使用Vertx WebClient从此url下载文件但不起作用。我在这里错过了什么吗?

  • 我不熟悉vertx和RxJava。我正在尝试实现一个简单的测试程序。然而,我无法理解这个项目的动态。为什么有些请求需要10秒钟以上才能响应? 下面是我的示例测试应用程序 我想知道的是,是什么让我的响应时间变慢了?

  • 我需要以RxJava风格按顺序链接Vertx CompositeFuture,以获得相关CompositeFuture,避免回调地狱。 用例: 每个CompositeFuture.any/all做一些返回期货的异步操作,比如myList1、myList2、myList3,但是我必须等待CompositeFuture.any(myList1)完成并返回成功,然后再做CompositeFuture.a

  • 我正在尝试在AWS Lambda中运行vertx WebClient。AWS Lambda最多有1024个文件描述符,并且不可调整。我很难找到到底是什么用完了我所有的文件描述符。我只使用vertx的WebClient,不运行任何verticle。这是我的共享WebClient: 此客户端在lambda调用之间重用,并且从不关闭。 我如何使用它: 我有45k-65k请求发送。我得到的是: 当我在本地

  • 我使用vert. x作为api网关将调用路由到下游服务。 到目前为止,我使用的是跨多个垂直链接共享的单个web客户端实例(通过GUI注入) 每个verticle都有自己的webclient有意义吗?它是否有助于提高性能?(我的每个网关实例运行64个Vericle,每秒处理大约1000个请求) 每种方法的优缺点是什么? 有人能帮忙找出同样的理想策略吗? 谢谢

  • 你知道我在这里做错了什么吗?我怎么能让它工作。?!