当前位置: 首页 > 知识库问答 >
问题:

CompletableFuture可在一切完成后进行webservice调用并保存

李谦
2023-03-14

我有一个会话列表,我必须调用一个网络服务来设置每个会话的一些属性。

我尝试使用异步进程调用webservice,并使用completablefuture,以便在完成所有操作后,可以将它们全部保存在db中。

我该怎么做呢?到目前为止,我的代码如下,它不起作用。

sessions.stream()
        .forEach(s -> CompletableFuture.runAsync(() -> webServiceCall(s), executor));
sessionService.saveAll(sessions);

编辑:

我提出了这个解决方案,不确定这是否是正确的方法。

List<CompletableFuture<Void>> futures = sessions.stream()
            .map(s -> CompletableFuture.runAsync(() -> webServiceCall(s), executor))
            .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                        .join();
        sessionService.saveAll(sessions);

我正在使用join来确保它在保存会话之前等待响应返回

共有1个答案

夏飞掣
2023-03-14

简而言之-你只需要这样的东西-

CompletableFuture.supplyAsync(this::supplySomething, ex).thenAccept(this::consumer);

您需要一个调用执行器(线程池)的方法。在我的情况下,我的游泳池大小是100。接下来,你需要打电话给你的供应商,只要你想多次。

每次调用“供应商”都将创建一个任务。我正在创建10000个任务。它们中的每一个都将并行运行,完成后,它们中的每一个都将调用我的“消费者”。

您的供应商应该返回某种保存webservice响应的对象。然后,此对象将成为“使用者”方法的参数。

你可能想在一切完成后(或中间)杀死游泳池。

请参见下面的示例-

package com.sanjeev.java8.thread;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Caller {

public static ExecutorService ex = Executors.newFixedThreadPool(100);

public static void main(String[] args) throws InterruptedException {
    Caller caller = new Caller();

    caller.start();

    ex.shutdown();
    ex.awaitTermination(10, TimeUnit.MINUTES);
}

private void start() {
    for (int i = 0; i < 10000; i++) {
        CompletableFuture.supplyAsync(this::supplySomething, ex).thenAccept(this::consumer);
    }
}

private int supplySomething() {
    try {
        URL url = new URL("http://www.mywebservice.com");

        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        connection.setDoInput(true);

        connection.connect();

        try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) {
            wr.write("supply-some-data".getBytes());
        }

        Reader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));

        for (int c; (c = in.read()) >= 0;) {
            System.out.print((char) c);
        }

        in.close();

        // return the response code. I'm return 'int', you should return some sort of object.
        return 200;

    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    }
}

public void consumer(Integer i) {
    // This parameter should be of type 'your object' that supplier returned.
    // I got the response; add it in the list or whatever....
}

}

另一个可能更适合您需要的示例-

public class Caller2 {

public static ExecutorService ex = Executors.newFixedThreadPool(2);
private static Iterator<String> addresses = Stream.of("www.google.com", "www.yahoo.com", "www.abc.com").collect(Collectors.toList()).iterator();
private static ArrayList<String> results = new ArrayList<>();

public static void main(String[] args) throws InterruptedException {
    Caller2 caller = new Caller2();

    caller.start();

    ex.shutdown();
    ex.awaitTermination(1, TimeUnit.HOURS);

    System.out.println(results);
}

private void start() {
    while (addresses.hasNext()) {
        CompletableFuture.supplyAsync(this::supplyURL, ex).thenAccept(this::consumer);
    }
}

private String supplyURL() {
    String url = addresses.next();
    // call this URL and return response;
    return "Success";
}

public void consumer(String result) {
    results.add(result);
}
 类似资料:
  • 我对RxJava并不完全陌生,但我被一项看似简单的任务所阻碍。 我有一个数据源,它公开了一个反应式API,我所要做的就是获取一些数据,返回它,并在没有其他消息发出时自动关闭连接。 这是我的代码: conn.query()和conn.close()在不同的调度程序中异步执行。此代码不起作用,因为conn.close()返回一个没有订阅服务器的Completable。此外,如果我手动订阅doOnCom

  • 两个文件antmodule1.gradle和antmodule2.gradle简单如: 我可以在日志中看到,作为构建gradle-module的一部分,ant-module1目标被执行,然后转移到构建Ant-Module2。但是,这是崩溃的,因为在构建ant-module1时创建的临时文件无法删除。查看java进程,我可以看到第一个ant调用打开的java进程,它仍然保存着我的临时文件(加载一些自

  • 我想有一个清单的comletablefutures我想等待。带有以下代码。 问题是,在所有的期货,其中一些可以是缓慢的,我想在到期时间后,get方法返回与所有完成的结果。有办法做到这一点吗? 多谢.

  • 问题内容: 由于某种原因,我完成后没有被调用。 我的班级干事: 我的onPostExecute(): 一切正常,我成功完成并返回一个布尔值,但随后就结束了。 谢谢 问题答案: 您是否在UI线程上创建了AsyncTask?还要在onPostExecute()方法上添加一个@Override注释,以确保正确声明了它。

  • 我是RxJava新手。我想从给定集合中下载每个TempoAccount实体的一些数据,并将其存储在map accountsWithProjects中。当上一个onNext(TempoAccount TempoAccount)的代码完成时,我想调用filterAccountsWithProjects(accountsWithProjects)方法。有什么简单的方法可以实现吗? 问题:在上面的代码中,