我通过ThreadPoolExecutor
执行一些可调用项。如果线程列表只包含1个可调用的线程,那么我直接调用我的CallableService
的call
方法。若列表包含超过1个可调用项,那个么我将通过线程池执行器并行执行所有这些线程。
我如何在Java8 CompletableFuture中实现这一点?如果增强了future.get()
以避免阻塞,这将是一个加号。
private static ThreadPoolExecutor myThreadPoolExecutor = new ThreadPoolExecutor(0, 100, 5L, TimeUnit.SECONDS, new SynchronousQueue<>());
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
List<Future<Boolean>> futureList = null;
CallableService singleService = (CallableService) threadList.get(0);
if (1 == threadList.size()) {
singleService.call();
}
else {
try {
futureList = myThreadPoolExecutor.invokeAll(threadList);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
if (null != futureList) {
for (Future<Boolean> future : futureList) {
try {
future.get();
}
catch (Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
}
我通过线程池执行几个可调用的。如果线程列表只包含1个可调用的,那么我直接调用我的CallableService的调用方法。如果列表包含超过1个可调用的,那么我通过线程池执行器并行执行所有这些线程。
我想你已经实现了这个部分。(如果作业繁重,并且有100个线程按配置运行,则可能会遇到内存使用问题。但这是另一个问题。)
如果future.get()被增强以避免阻塞,那将是一个优点。
为此,您可以采取以下方法:
>
ExecutorService
,其工作只是运行Future.get()
调用。Future.get()
到该服务,如下所示。 关闭它,等待终止。
if (null != futureList) {
ExecutorService waitSvc = Executors.newCachedThreadPool();
for (Future<Boolean> future : futureList) {
try {
waitSvc.submit( () -> future.get() );
}
catch (Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
waitSvc.shutdown(); //This may take some time. You may want to call awaitTermination() after this.
}
然而,我觉得你应该重新设计使用这么多线程的整体方法,除非这只是一个用于学习的应用程序。
Future.isDone()告诉我们执行器是否完成了任务的处理。如果任务完成,则返回true,否则返回false。
for (Future<Boolean> future : futureList) {
while(!future.isDone())
{
doSOmethingElse();
Thread.sleep(300);//Optional
}
try {
future.get();
}
catch (Exception e)
{
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
不需要CompletableFuture
,因为您使用ExecutorService
的方式就足够了,但是,代码流的某些方面可以改进。即使不需要,也可以获取第一个元素,并将其无缘无故地转换为CallableService
,因为您已经可以通过Callable
接口调用该方法。在另一个分支中,您正在捕获InterruptedExc0019
并继续执行,因此调用方永远不会知道并非所有作业都已执行。在直接的代码流中,您不需要检查列表中的null
:
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
if(1 == threadList.size()) {
Callable<Boolean> singleService = threadList.get(0);
singleService.call();
}
else {
List<Future<Boolean>> futureList = myThreadPoolExecutor.invokeAll(threadList);
for(Future<Boolean> future : futureList) {
try {
future.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
}
你可以把它进一步缩短到
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
if(1 == threadList.size()) {
threadList.get(0).call();
}
else {
for(Future<Boolean> future : myThreadPoolExecutor.invokeAll(threadList)) {
try {
future.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
}
但这是一个首选编码风格的问题。但请注意,我注意到,在单元素情况下,您没有执行相同的异常处理。
要使用CompletableFuture
,我们需要一个适配器方法,因为便利方法supplyAsync
需要一个Supplier
而不是可调用的
。使用这个答案的一个修改变量,我们得到
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
if(1 == threadList.size()) {
threadList.get(0).call();
}
else {
CompletableFuture<?> all = CompletableFuture.allOf(
threadList.stream()
.map(c -> callAsync(c, myThreadPoolExecutor))
.toArray(CompletableFuture<?>[]::new));
try {
all.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
}
public static <R> CompletableFuture<R> callAsync(Callable<R> callable, Executor e) {
CompletableFuture<R> cf = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
try { cf.complete(callable.call()); }
catch(Throwable ex) { cf.completeExceptionally(ex); }
}, e);
return cf;
}
所以我们没有处理提交所有作业的调用。我们必须手动执行此操作,可以使用循环操作,也可以使用流操作。另一方面,我们通过表示完成状态的allOf
获得单个未来,如果至少有一个作业失败,则为例外。
与等待完成的invokeAll
不同,allOf
只返回未来,因此等待完成的是all.get()
调用。在此之前,我们可以执行其他操作,甚至可以使用此属性始终执行调用线程中的第一个作业:
public static void execute(List<Callable<Boolean>> threadList) throws Exception {
CompletableFuture<?> tail = CompletableFuture.allOf(
threadList.stream().skip(1)
.map(c -> callAsync(c, myThreadPoolExecutor))
.toArray(CompletableFuture<?>[]::new)),
head = callAsync(threadList.get(0), Runnable::run);
try {
head.get();
tail.get();
}
catch(Exception e) {
//do some calculations here and then throw exception
throw new Exception(e.getMessage(), e);
}
}
这将始终调用当前线程中的第一个可调用的,因为Runnable::run
将在调用线程中立即执行该操作。但是它在所有其他方面都被统一处理,尤其是异常处理。当只有一个作业时,带有空数组的allOf
调用将什么都不做,并返回一个已经完成的未来,这将具有所需的效果。
我将以下响应返回给用户 到目前为止,我正在进行三次连续调用来计算这个,每个调用都可以独立于其他调用运行。我尝试制作三种不同的作为: 如果我做了
我试图从父组件按钮单击调用每个子组件函数。尝试使用道具并在componentDidMount函数中将按钮单击指定给新函数,但结果是只有最后一个函数警报(“child2保存”);在按钮点击时被分配和执行,我期望警报(“child1保存”);也将被执行。 如何在父组件上单击要调用的每个子函数按钮,注意子组件在父组件内部有相当大的嵌套级别。
我需要执行一些任务。有些任务是独立的,有些任务依赖于其他任务的成功执行。独立任务可以并行运行以获得更好的性能。我把这些任务称为服务。列说明哪些服务将以串联方式执行,哪些服务将以并联方式执行。列描述了一组定义的服务所遵循的执行顺序。例如,服务A和B应该并行运行。如果它们已成功执行,则将执行服务C。请注意,服务C并不直接依赖于其先前服务的输出,但它必须在成功执行其先前服务后运行,因为服务C在执行期间需
问题内容: 我有多个包含要发送的消息的BlockingQueues。消费者数量可能少于队列数量吗?我不想遍历队列并继续轮询它们(忙于等待),并且我不想为每个队列都分配一个线程。相反,当任何队列中都有消息可用时,我想唤醒一个线程。 问题答案: 您可以做的一个技巧是让队列排队。因此,您要做的是只有一个阻塞队列,所有线程都订阅该队列。然后,当您将某些内容排队到其中一个BlockingQueues中时,您
我正在使用 Kubernetes 作为容器编排器构建一个微服务应用程序。该应用程序现已启动并运行,但我有其他问题。那是在我的服务中,我每天都有一个计划任务运行,当服务部署时,将运行多个服务实例(通过设置副本编号),创建多个同时运行的任务。我期望的是只有一个服务任务实例将运行,而不是多个实例。有什么技术可以处理这种情况吗? 库伯内特斯 Asp.net核心构建微服务 CI/CD的基岩实现 Fabrik
我正在使用Python(IPython 我有一个3000个唯一ID的数组,可以从API中提取数据,一次只能使用一个ID调用API。 我希望以某种方式同时拨打3组1000个电话,以加快速度。 最好的方法是什么? 提前感谢任何帮助!