当前位置: 首页 > 面试题库 >

使用ReactiveX for Java进行Http调用

商飞航
2023-03-14
问题内容

我是ReactiveX for Java的新手,我有以下代码块可以进行外部http调用,但它不是异步的。我们正在使用rxjava 1.2和Java 1.8

  private ResponseEntity<String> callExternalUrl(String url, String json, HttpMethod method) {

    RestTemplate restTemplate;
    HttpEntity request;

      request = new HttpEntity(jsonContent, httpHeaders);

    return restTemplate.exchange(url, httpMethod, request, String.class);

  }

我有以下在网上找到的代码块,但我无法完全理解它,以及如何将其应用于代码库。

private RxClient<RxObservableInvoker> httpClient;
public <T> Observable<T> fetchResult(String url, Func1<Response, T> mapper) {

    return httpClient.target(url)
        .request()
        .rx()
        .get()
        .subscribeOn(Schedulers.io())
        .map(mapper);
  }

问题答案:

如果我对您的理解正确,则需要使用类似的方法包装现有的内容 callExternalUrl

static Observable<String> callExternalUrlAsync(String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> callExternalUrl(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends String>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

代码的简短说明:

  1. 它计划callExternalUrlSchedulers.io
  2. ResponseEntity<T>成功案例T和错误案例的转换最少。它也发生在io调度程序上,但是并不重要,因为它确实很短。(如果内部存在异常callExternalUrl,则按原样传递。)
  3. 使订阅者执行结果 Schedulers.computation

注意事项

  1. 您可能想同时使用自定义计划程序subscribeOnobserveOn
  2. 您可能希望在传递的第一个lambda中具有更好的逻辑,flatMap以区分成功和错误,并且绝对希望有一些更具体的异常类型。

高阶魔术

如果您愿意使用高阶函数并牺牲一点性能来减少重复代码,则可以执行以下操作:

// Universal wrapper method
static <T> Observable<T> wrapCallExternalAsAsync(Func3<String, String, HttpMethod, ResponseEntity<T>> externalCall, String url, String json, HttpMethod method)
{
    return Observable.fromCallable(() -> externalCall.call(url, json, method))
            .subscribeOn(Schedulers.io())
            .flatMap(re -> {
                         if (re.hasBody())
                             return Observable.just(re.getBody());
                         else
                             return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode()));
                     },
                     e -> Observable.error(e),
                     (Func0<Observable<? extends T>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-(
            .observeOn(Schedulers.computation());
}

static Observable<String> callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method)
{
    return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method);
}

哪里MyClass是你的地方callExternalUrl是。

更新 (仅异步调用)

私有静态RxClient httpClient = Rx.newClient(RxObservableInvoker.class);
//在这里,您可以传递自定义ExecutorService

private <T> Observable<String> executeHttpAsync(String url, String httpMethod, Entity<T> entity) {
    return httpClient.target(url)
            .request()
            .headers(httpHeaders) // assuming httpHeaders is something global as in your example
            .rx()
            .method(httpMethod, entity)
            .map(resp -> {
                if (200 != resp.getStatus()) {
                    throw new RuntimeException("Bad status code " + resp.getStatus());
                } else {
                    if (!resp.hasEntity()) {
                        // return null; // or error?
                        throw new RuntimeException("Empty response"); // or empty?
                    } else {
                        try {
                            return resp.readEntity(String.class);
                        } catch (Exception ex) {
                            throw new RuntimeException(ex); // wrap exception into unchecked
                        }
                    }
                }
            })
            .observeOn(Schedulers.computation());
}

private Observable<String> executeGetAsync(String url) {
    return executeHttpAsync(url, "GET", null);
}

private Observable<String> executePostAsync(String url, String json) {
    return executeHttpAsync(url, "POST", Entity.json(json));
}

同样,类似的 警告 适用:

  1. 您可能想将自定义调度程序用于newClient呼叫和observeOn
  2. 您可能希望有一些更好的逻辑来处理错误,而不仅仅是检查它是否为HTTP 200,并且肯定需要一些更特定的异常类型。但这都是特定于业务逻辑的,因此由您决定。

另外,从您的示例中还不清楚请求HttpEntity)的主体是如何构建的,以及您是否始终String像原始示例中一样总是希望作为响应。我还是照原样复制了您的逻辑。如果您还需要其他内容,则可能应该参考https://jersey.java.net/documentation/2.25/media.html#json上的文档



 类似资料:
  • 问题内容: 有什么方法可以使用AngularJS进行同步调用吗? AngularJS文档不是很明确,也不是为了找出一些基本内容而扩展。 在服务上: 问题答案: 不是现在。如果查看源代码(从2012年10月开始),您会发现对XHR open的调用实际上是硬编码为异步的(第三个参数为true): 您需要编写自己的执行同步调用的服务。通常,由于JavaScript执行的性质,您通常不需要执行此操作,最终

  • 我在DialogFlow的官方网站上使用Node.js找到了这个示例,它运行良好,但我不知道如何将其集成到我的web应用程序中。 我可以将它集成到我的其他javascript jquery代码中吗?这里我需要运行节点index.js,但是如果我与代码集成,我还需要这样做吗? DialogFlow v1使用起来非常简单。我有这样的东西:

  • 问题内容: 我是Go的新手,我正在尝试构建一个简单的HTTP服务器。但是我在JSON响应中遇到了一些问题。我编写了以下代码,然后尝试邮递员发送一些JSON数据。但是,我的邮递员总是得到空响应,而is 。然后,我在http://www.alexedwards.net/blog/golang-response- snippets#json中 检查了一个示例。我复制并粘贴了示例,并且效果很好。但是我看不

  • 我们有一个.NET项目,它检查远程计算机上是否存在一个文件。我们需要对一个部门内的多台远程计算机(数千台)执行此操作,每台计算机每天都在预定义的时间执行。执行时间是在数据库中指定的,它经常变化,每台远程计算机的执行时间都是不同的(有些可能是相同的)。为了实现这一点,我们计划使用Quartz调度器。由于我们是石英的新手,我们想知道如何实现这一点。在高层,我们需要这些- 调度程序应该在每天的特定时间启

  • 问题内容: 我正在尝试使用Go登录网站并存储cookie以供以后使用。 您能否提供示例代码来发布表单,存储cookie以及使用cookie访问另一个页面? 我认为我可能需要通过研究http://gotour.golang.org/src/pkg/net/http/client.go来使客户端存储cookie。 问题答案: Go 1.1引入了一个cookie jar实现。

  • 问题内容: 我需要接收仅包含2个参数的HTTP Post Multipart: JSON字符串 二进制文件 设置身体的正确方法是哪一种?我将使用Chrome REST控制台测试HTTP调用,因此我想知道是否正确的解决方案是为JSON参数和二进制文件设置“标签”键。 在服务器端,我正在使用Resteasy 2.x,我将像这样阅读Multipart主体: 这是要走的路吗?使用标识该特定内容处置的键“