我有一个会话列表,我必须调用一个网络服务来设置每个会话的一些属性。
我尝试使用异步进程调用webservice,并使用completablefuture,以便在完成所有操作后,可以将它们全部保存在db中。
我该怎么做呢?到目前为止,我的代码如下,它不起作用。
sessions.stream()
.forEach(s -> CompletableFuture.runAsync(() -> webServiceCall(s), executor));
sessionService.saveAll(sessions);
编辑:
我提出了这个解决方案,不确定这是否是正确的方法。
List<CompletableFuture<Void>> futures = sessions.stream()
.map(s -> CompletableFuture.runAsync(() -> webServiceCall(s), executor))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.join();
sessionService.saveAll(sessions);
我正在使用join来确保它在保存会话之前等待响应返回
简而言之-你只需要这样的东西-
CompletableFuture.supplyAsync(this::supplySomething, ex).thenAccept(this::consumer);
您需要一个调用执行器(线程池)的方法。在我的情况下,我的游泳池大小是100。接下来,你需要打电话给你的供应商,只要你想多次。
每次调用“供应商”都将创建一个任务。我正在创建10000个任务。它们中的每一个都将并行运行,完成后,它们中的每一个都将调用我的“消费者”。
您的供应商应该返回某种保存webservice响应的对象。然后,此对象将成为“使用者”方法的参数。
你可能想在一切完成后(或中间)杀死游泳池。
请参见下面的示例-
package com.sanjeev.java8.thread;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Caller {
public static ExecutorService ex = Executors.newFixedThreadPool(100);
public static void main(String[] args) throws InterruptedException {
Caller caller = new Caller();
caller.start();
ex.shutdown();
ex.awaitTermination(10, TimeUnit.MINUTES);
}
private void start() {
for (int i = 0; i < 10000; i++) {
CompletableFuture.supplyAsync(this::supplySomething, ex).thenAccept(this::consumer);
}
}
private int supplySomething() {
try {
URL url = new URL("http://www.mywebservice.com");
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setDoOutput(true);
connection.setDoInput(true);
connection.connect();
try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) {
wr.write("supply-some-data".getBytes());
}
Reader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
for (int c; (c = in.read()) >= 0;) {
System.out.print((char) c);
}
in.close();
// return the response code. I'm return 'int', you should return some sort of object.
return 200;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public void consumer(Integer i) {
// This parameter should be of type 'your object' that supplier returned.
// I got the response; add it in the list or whatever....
}
}
另一个可能更适合您需要的示例-
public class Caller2 {
public static ExecutorService ex = Executors.newFixedThreadPool(2);
private static Iterator<String> addresses = Stream.of("www.google.com", "www.yahoo.com", "www.abc.com").collect(Collectors.toList()).iterator();
private static ArrayList<String> results = new ArrayList<>();
public static void main(String[] args) throws InterruptedException {
Caller2 caller = new Caller2();
caller.start();
ex.shutdown();
ex.awaitTermination(1, TimeUnit.HOURS);
System.out.println(results);
}
private void start() {
while (addresses.hasNext()) {
CompletableFuture.supplyAsync(this::supplyURL, ex).thenAccept(this::consumer);
}
}
private String supplyURL() {
String url = addresses.next();
// call this URL and return response;
return "Success";
}
public void consumer(String result) {
results.add(result);
}
我对RxJava并不完全陌生,但我被一项看似简单的任务所阻碍。 我有一个数据源,它公开了一个反应式API,我所要做的就是获取一些数据,返回它,并在没有其他消息发出时自动关闭连接。 这是我的代码: conn.query()和conn.close()在不同的调度程序中异步执行。此代码不起作用,因为conn.close()返回一个没有订阅服务器的Completable。此外,如果我手动订阅doOnCom
两个文件antmodule1.gradle和antmodule2.gradle简单如: 我可以在日志中看到,作为构建gradle-module的一部分,ant-module1目标被执行,然后转移到构建Ant-Module2。但是,这是崩溃的,因为在构建ant-module1时创建的临时文件无法删除。查看java进程,我可以看到第一个ant调用打开的java进程,它仍然保存着我的临时文件(加载一些自
我想有一个清单的comletablefutures我想等待。带有以下代码。 问题是,在所有的期货,其中一些可以是缓慢的,我想在到期时间后,get方法返回与所有完成的结果。有办法做到这一点吗? 多谢.
问题内容: 由于某种原因,我完成后没有被调用。 我的班级干事: 我的onPostExecute(): 一切正常,我成功完成并返回一个布尔值,但随后就结束了。 谢谢 问题答案: 您是否在UI线程上创建了AsyncTask?还要在onPostExecute()方法上添加一个@Override注释,以确保正确声明了它。
我是RxJava新手。我想从给定集合中下载每个TempoAccount实体的一些数据,并将其存储在map accountsWithProjects中。当上一个onNext(TempoAccount TempoAccount)的代码完成时,我想调用filterAccountsWithProjects(accountsWithProjects)方法。有什么简单的方法可以实现吗? 问题:在上面的代码中,