我这里需要通过多线程去处理数据,然后在所有数据都处理完成后再往下执行。这里就用到了CountDownLatch。把countdownlatch作为参数传入到每个线程类里,在线程中处理完数据后执行countdown方法。在所有countdownlatch归零后,其await方法结束阻塞状态而往下执行。
具体代码如下:
将多线程任务提交线程池
@Bean(name = "ggnews_executor") public Executor postExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(1); executor.setMaxPoolSize(1); executor.setQueueCapacity(1); executor.setKeepAliveSeconds(120); executor.setThreadNamePrefix("executor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); return executor; } //通过定时任务调用的fetch方法,为了避免定时任务在多次执行中失效,通异步指定线程池的方式进行调用 @Async("ggnews_executor") public void fetch() { if(fetchFlag.getAndSet(false)) { List<FetchTag> tags = fetchTagService.selectFetchTagList(fetchTag); CountDownLatch downLatch = new CountDownLatch(tags.size()); for (FetchTag tag : tags) { FetchTag tagNew; try { tagNew =(FetchTag) tag.clone(); } catch (Throwable e) { log.error("",e); continue; } //作为参数将CountDownLatch传入 InnerRunner innerRunner = new InnerRunner(downLatch, tagNew); executor.execute(innerRunner); } try { //等待线程执行完毕,如果十分钟后还没结束也会停止阻塞状态 downLatch.await(10,TimeUnit.MINUTES); fetchFlag.getAndSet(true); } catch (Throwable e) { log.error("fetch()方法发生错误:{}", e); fetchFlag.getAndSet(true); //e.printStackTrace(); } finally { fetchFlag.getAndSet(true); } } else { log.info("=======上次抓取尚未结束========="); } }
InnerRunner为要执行具体任务的线程类
private class InnerRunner implements Runnable { private CountDownLatch downLatch; private FetchTag tag; private InnerRunner(CountDownLatch downLatch, FetchTag tag) { this.downLatch = downLatch; this.tag = tag; } @Override public void run() { //将countDown方法移入到具体方法中的finally块中,以保证即使在抛出异常的情况下也算执行了此次任务,countdown会被执行 fetchGG(tag.getTag(), downLatch); //downLatch.countDown(); this.tag = null; } }
private static final String GOOGLE_URL_IN = "https://news.google.com/rss/search?hl=hi&gl=IN&ceid=IN:hi&q="; public void fetchGG(String tag, CountDownLatch downLatch) { try { Document document = Jsoup.parse(new URL(GOOGLE_URL_IN + URLEncoder.encode("\"" + tag + "\"", "utf-8")), 30000); Elements elements = document.getElementsByTag("item"); int rank = 1; for (Element element : elements) { String sourceTitle = element.getElementsByTag("title").get(0).text(); log.info("source title:" + sourceTitle); } } catch (Throwable e) { log.info("fetch google url error", e); } finally { //肯定会被执行 downLatch.countDown(); } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。
我有4-5个工作线程处理大型消息队列。我还有另一段代码,它使用2-3个worker运行。我想在处理大型消息队列时阻止所有其他工作者。 我正在使用JDK6和Jms 编辑: 队列进程工作者从未终止。当没有消息时,它们阻塞队列。这些工作者由执行器线程池管理,如果我使用读写锁,其中一个工作者也会被阻塞。此外,如果使用循环屏障,那么我必须终止线程,以便重新传递阻塞的第二个进程。由于工作者是由线程池管理的,所
那么,我是否理解正确:await task.delay()不阻塞调用线程,而是阻塞等待的任务转移到的某个线程? 如果这句话是真的,那么你能给我一个方法,让任务等待一段时间而不阻塞任何线程在等待期间吗?
我们在应用程序中使用了Stomp、SpringBoot和WebSockets。服务器应用程序执行以下操作:1)生成要推送给用户的消息;2)接受WebSocket连接;3)将消息推送给ActiveMQ stomp Broker。线程转储显示了大量与simpMessagingTemplate convertAndSendToUser API调用相关联的等待线程。 应用程序的两个实例在云中运行。该应用程
问题内容: 我有两个分开的阻塞队列。客户端通常使用第二个阻塞队列中的第一个来检索要处理的元素。 在某些情况下,客户端对两个阻塞队列中的元素感兴趣,无论哪个队列首先提供数据。 客户端如何并行等待两个队列? 问题答案: 您可以尝试在某种循环中使用该方法,以仅在指定时间量内等待一个队列,然后再轮询另一个队列。 除此之外,我会说在另一个线程上为每个队列运行阻塞操作并为您的主应用程序提供回调接口是另一个稍微
我正在使用java Callable和ExecutorService执行计算: 我想让任务运行最多2分钟。但如果我打电话: 然后它就会立刻封锁!不要让我为其他任务分配超时,直到超时结束。 我可以用 但它返回的是未来列表,我不知道什么任务属于哪个未来。
摘要 本文描述了使用 QEMU 运行 RT-Thread 提供的基于多线程的非阻塞 socket 编程示例。 简介 随着物联网的发展,越来越多产品需要基于网络进行数据传输。在实际开发中,往往要求网络传输时不能阻塞当前线程,以致无法及时处理其他消息。在用户无法直接套用简单的 socket demo 时,RT-Thread 提供基于多线程的非阻塞 socket 编程示例,方便用户进行应用程序开发。 在