当前位置: 首页 > 编程笔记 >

浅谈java.util.concurrent包中的线程池和消息队列

邹英光
2023-03-14
本文向大家介绍浅谈java.util.concurrent包中的线程池和消息队列,包括了浅谈java.util.concurrent包中的线程池和消息队列的使用技巧和注意事项,需要的朋友参考一下

1.java并发包介绍

JDK5.0(JDK1.5更名后)以后的版本引入高级并发特性,大多数的特性在java.util.concurrent包中,是专门用于多线程编程的,充分利用了现代多处理器和多核心系统的功能以编写大规模并发应用程序。主要包括原子量、并发集合、同步器、可重入锁,并对线程池的构造提供了强力的支持

2.线程池

java.util.concurrent.Executors提供了一个 java.util.concurrent.Executor接口的实现用于创建线程池
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

假设服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。如果T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能,减少创建和销毁线程所需消耗的时间。

一个线程池由以下四个基本部分组成:

  1. 线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
  2. 工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
  3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
  4. 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

线程池技术正是关心如何缩短或调整T1,T3时间从而提高服务器程序性能的技术。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,免去了线程创建和销毁的开销。

线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:

假设一个服务器一天要处理100000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,
一般线程池大小是远小于100000。所以利用线程池的服务器程序不会为了创建100000而在处理请求时浪费时间,从而提高效率。

线程池的五种创建方式

  1. Single Thread Executor:只有一个线程的线程池,因此所提交的任务是顺序执行,Executors.newSingleThreadExecutor();
  2. Cached Thread Pool:线程池里有很多线程需同时进行,旧的可用线程将被新的任务触发从而重新执行,如果线程超过60秒内没有执行,那么将被终止并从池中删除Executors.newCachedThreadPool();
  3. Fixed Thread Pool:拥有固定线程数的线程池,如果没有任务执行,那么线程会一直等待,Executors.newFixedThreadPool(10);在构造函数中的参数10是线程池的大小,你可以随意设置,也可以和cpu的数量保持一致,获取cpu的数量int cpuNums = Runtime.getRuntime().availableProcessors();
  4. Scheduled Thread Pool:用来调度即将执行的任务的线程池Executors.newScheduledThreadPool();
  5. Sing Thread Scheduled Pool:只有一个线程,用来调度任务在指定时间执行Executors.newSingleThreadScheduledExecutor();

3.线程池的使用

以下用Fixed Thread Pool作为示范,提供一个使用参考

LogNumVo

package com.ithzk.threadpool;

/**
 * 用作返回 执行的数量的
 * @author hzk
 * @date 2018/3/29
 */
public class LogNumVo {
  private static final long serialVersionUID = -5541722936350755569L;
  private Integer dataNum;
  private Integer successNum;
  private Integer waitNum;

  public Integer getDataNum() {
    return dataNum;
  }
  public void setDataNum(Integer dataNum) {
    this.dataNum = dataNum;
  }
  public Integer getSuccessNum() {
    return successNum;
  }
  public void setSuccessNum(Integer successNum) {
    this.successNum = successNum;
  }
  public Integer getWaitNum() {
    return waitNum;
  }
  public void setWaitNum(Integer waitNum) {
    this.waitNum = waitNum;
  }
}
 

DealObject

package com.ithzk.threadpool;

/**
 * @author hzk
 * @date 2018/3/29
 */
public class DealObject {

  private Integer identifyId;

  private String data;

  public DealObject(Integer identifyId, String data) {
    this.identifyId = identifyId;
    this.data = data;
  }

  public DealObject() {
  }

  public Integer getIdentifyId() {
    return identifyId;
  }

  public void setIdentifyId(Integer identifyId) {
    this.identifyId = identifyId;
  }

  public String getData() {
    return data;
  }

  public void setData(String data) {
    this.data = data;
  }
}

AbstractCalculateThread

package com.ithzk.threadpool;

import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

/**
 * @author hzk
 * @date 2018/3/29
 */
public class AbstractCalculateThread<T> implements Callable<String> {

  protected Collection<T> insertList;

  protected CountDownLatch countd;

  protected String threadCode;

  protected String batchNumber;

  public Collection<T> getInsertList() {
    return insertList;
  }

  public void setInsertList(Collection<T> insertList) {
    this.insertList = insertList;
  }

  public CountDownLatch getCountd() {
    return countd;
  }

  public void setCountd(CountDownLatch countd) {
    this.countd = countd;
  }

  public String getThreadCode() {
    return threadCode;
  }

  public void setThreadCode(String threadCode) {
    this.threadCode = threadCode;
  }

  public String getBatchNumber() {
    return batchNumber;
  }

  public void setBatchNumber(String batchNumber) {
    this.batchNumber = batchNumber;
  }


  public AbstractCalculateThread() {
    super();
  }

  public AbstractCalculateThread(Collection<T> insertList, CountDownLatch countd, String threadCode,String batchNumber) {
    super();
    this.insertList = insertList;
    this.countd = countd;
    this.threadCode = threadCode;
    this.batchNumber = batchNumber;
  }

  public String call() throws Exception {
    return null;
  }
}

CalculateDealThread

package com.ithzk.threadpool;

import java.util.Collection;
import java.util.concurrent.CountDownLatch;

/**
 * @author hzk
 * @date 2018/3/29
 */
public class CalculateDealThread extends AbstractCalculateThread<DealObject> {

  private ExecutorPool executorPool = SpringContextUtil.getBean(ExecutorPool.class);

  @Override
  public String call() throws Exception {
    try {
      System.out.println("========开始跑线程【"+threadCode+"】");
      return executorPool.syncBatchDealObject(insertList,batchNumber);
    } catch (Exception e) {
      e.printStackTrace();
      System.out.println("========开始跑线程【"+threadCode+"】:"+e.getMessage());
    }finally {
      countd.countDown();
    }
    return null;
  }

  public CalculateDealThread() {
    super();
  }

  public CalculateDealThread(Collection<DealObject> insertList, CountDownLatch countd, String threadCode,String batchNumber) {
    super(insertList, countd, threadCode, batchNumber);
  }

}

ExecutorPool

package com.ithzk.threadpool;

import java.util.*;
import java.util.concurrent.*;

/**
 * @author hzk
 * @date 2018/3/29
 */
public class ExecutorPool {

  /**
   * 模拟需要处理数据的大小
   */
  private static final int ARRAY_COUNT = 50000;
  /**
   * 开启多线程处理的条件
   */
  private static final int MULTI_THREAD_STARTCOUNT = 10000;
  /**
   * 批量处理的大小
   */
  private static final int BATCH_DEAL_SIZE = 100;
  /**
   * 每次开启线程数量
   */
  public static final int THREAD_POOL_NUM=10;

  public static void main(String[] args){
    testExecutorPool();
  }

  public static void testExecutorPool(){
    ArrayList<DealObject> dealObjects = new ArrayList<DealObject>();
    for (int i = 0;i<ARRAY_COUNT;i++){
      DealObject dealObject = new DealObject(i,"data_"+i);
      dealObjects.add(dealObject);
      System.out.println("Data add success current:"+i);
    }
    int size = dealObjects.size();
    int successNum = 0;
    int waitNum = 0;
    System.out.println("需要处理的数据数据量为:"+size);
    // 判断数据是否大于10000 如果大于则开启线程池 跑数据
    if (size > MULTI_THREAD_STARTCOUNT) {
      try {
        System.out.println("===================dataNum > 1000 | Multiple Thread Run=======================");
        // 每次新增处理的条数
        int batchInsertSize = BATCH_DEAL_SIZE;
        // 定义保存的线程池
        ExecutorService executorInsert = Executors.newFixedThreadPool(THREAD_POOL_NUM);
        // 定义保存过程中返回的线程执行返回参数
        List<Future<String>> futureListIsert = new ArrayList<Future<String>>();
        // 线程 修改list
        List<Map<Integer, DealObject>> listDealObjects = new ArrayList<Map<Integer, DealObject>>();
        List<Map<Integer, DealObject>> listLiveSyncLogInsert = pointDateClassify(dealObjects, batchInsertSize, listDealObjects);
        if (null != listLiveSyncLogInsert && !listDealObjects.isEmpty()) {
          System.out.println("===================切割后的大小:"+listLiveSyncLogInsert.size()+"=======================");
          //配合使用CountDownLatch为了保证在执行完所有子程序之后再执行主程序
          CountDownLatch countd = new CountDownLatch(listLiveSyncLogInsert.size());
          for (int j = 0; j < listLiveSyncLogInsert.size(); j++) {
            Map<Integer, DealObject> insert = listLiveSyncLogInsert.get(j);
            Future<String> future = executorInsert.submit(new CalculateDealThread(insert.values(), countd,"executor_pool_test_thread", null));
            futureListIsert.add(future);
          }
        }
        // 等待线程执行完成
        executorInsert.shutdown();
        for (Future<String> future : futureListIsert) {
          String json = future.get();
          if (null != json && !"".equals(json)) {
            将返回的json格式数据转换为实体类 进行业务记录
            LogNumVo logNumVo = JSON.toJavaObject(JSON.parseObject(json),LogNumVo.class);
            successNum += logNumVo.getSuccessNum();
            waitNum += logNumVo.getWaitNum();
          }
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (ExecutionException e) {
        e.printStackTrace();
      }

    }
  }

  /**
   * 拆分线程数
   * 假设集合中有50000个元素 则按照100个一组切分 可切分为500组
   * 即每个线程一次处理一组(100个元素)
   *
   * @author
   * @param lPostUploadIntegralList
   * @param batchInsertSize
   * @param listPostUploadIsert
   */
  @SuppressWarnings("all")
  public static List<Map<Integer, DealObject>> pointDateClassify(List<DealObject> lPostUploadIntegralList,int batchInsertSize, List<Map<Integer, DealObject>> listJSONObjectUpdate) {
    List<Map<Integer, DealObject>> listLiveSyncLogInsert = new Vector<Map<Integer, DealObject>>();
    // 新增数据list
    List<DealObject> integralListInsert = lPostUploadIntegralList;

    System.out.println("============integralListInsert.size()=====:" + integralListInsert.size());
    // 拆分数据(拆成多个List)
    int inserti = 0;
    if (integralListInsert != null && integralListInsert.size() > 0) {
      ConcurrentHashMap<Integer, DealObject> integralListIns = null;
      for (int l = 0; l < integralListInsert.size(); l++) {
        if (integralListIns == null) {
          integralListIns = new ConcurrentHashMap<Integer, DealObject>();
        }
        integralListIns.put(integralListInsert.get(l).getIdentifyId(), integralListInsert.get(l));
        inserti++;
        if ((inserti % batchInsertSize) == 0) {
          listLiveSyncLogInsert.add(integralListIns);
          integralListIns = null;
        } else {
          // 最后100条或不足100条数据
          if ((l + 1) == integralListInsert.size()) {
            listLiveSyncLogInsert.add(integralListIns);
          }
        }
      }
    }
    System.out.println("=============listPostUploadInsert.size()====:" + listLiveSyncLogInsert.size());
    return listLiveSyncLogInsert;
  }

  /**
   * 多线程保存数据至数据库
   */
  public String syncBatchDealObject(Collection<DealObject> insertList,String batchNumber) {
    int successNum = 0, waitNum = 0;
    Date currentDate = new Date(System.currentTimeMillis());
    for (DealObject dealObject : insertList) {
      try {
        int icount = syncDealObject(dealObject,currentDate);
        if(icount > 0){
          successNum ++;
        }else {
          waitNum ++;
        }
      } catch (Exception e) {
        e.printStackTrace();
        ++waitNum;
      }
    }
    LogNumVo logNum = new LogNumVo();
    logNum.setDataNum(0);
    logNum.setSuccessNum(successNum);
    logNum.setWaitNum(waitNum);
    // 将记录实体类转为json格式反馈给线程池
    return JSON.toJSONString(logNum);
  }

  /**
   * 处理数据业务
   * @param dealObject
   * @param currentDate
   * @return
   */
  private int syncDealObject(DealObject dealObject,Date currentDate){
    int successNum = 0;
    //业务处理逻辑
    if(null != dealObject.getData()){
      successNum++;
    }
    return successNum;
  }
}

4.BlockingQueue

BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。主要的方法是:put、take一对阻塞存取;add、poll一对非阻塞存取。

插入:

add(anObject)
把anObject加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则抛出异常

offer(anObject)
把anObject加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则返回false.

put(anObject)
把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续.

读取:

poll(time)
取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

take()
取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止

其他:

int remainingCapacity()
返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。
该数量总是等于此队列的初始容量,小于队列的当前 size(返回队列剩余的容量)。
注意,不能总是通过检查 remainingcapacity 来断定试图插入一个元素是否成功,因为可能是另一个线程将插入或移除某个元

素。
boolean remove(Object o)
从队列移除元素,如果存在,即移除一个或者更多,队列改变了返回true

public boolean contains(Object o)
查看队列是否存在这个元素,存在返回true

int drainTo(Collection<? super E> c)
传入的集合中的元素,如果在队列中存在,那么将队列中的元素移动到集合中

int drainTo(Collection<? super E> c, int maxElements)
和上面方法的区别在于,制定了移动的数量

以下是一个BlockQueue的基本使用参考:

Producer

package com.ithzk.BlockingQueueTest;

import java.util.concurrent.BlockingQueue;

/**
 * @author hzk
 * @date 2018/3/31
 */
public class Producer implements Runnable{

  BlockingQueue<String> blockingQueue;

  public Producer(BlockingQueue<String> blockingQueue) {
    this.blockingQueue = blockingQueue;
  }

  @Override
  public void run() {
    try {
      String threadIdentify = "A Producer,生产线程"+Thread.currentThread().getName();
      blockingQueue.put(threadIdentify);
      System.out.println("Produce success! Thread:"+Thread.currentThread().getName());
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

Consumer

package com.ithzk.BlockingQueueTest;

import java.util.concurrent.BlockingQueue;

/**
 * @author hzk
 * @date 2018/3/31
 */
public class Consumer implements Runnable{

  BlockingQueue<String> blockingQueue;

  public Consumer(BlockingQueue<String> blockingQueue) {
    this.blockingQueue = blockingQueue;
  }

  @Override
  public void run() {
    try {
      String consumer = Thread.currentThread().getName();
      System.out.println("Current Consumer Thread:"+consumer);
      //如果队列为空会阻塞当前线程
      String take = blockingQueue.take();
      System.out.println(consumer + " consumer get a product:"+take);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

BlockTest

package com.ithzk.BlockingQueueTest;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author hzk
 * @date 2018/3/31
 */
public class BlockTest {
  
  public static void main(String[] args) throws InterruptedException {
    // 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE
    // BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();
    // BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(2);
    BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(2);
    Consumer consumer = new Consumer(blockingQueue);
    Producer producer = new Producer(blockingQueue);
    for (int i = 0; i < 3; i++) {
      new Thread(producer, "Producer" + (i + 1)).start();
    }
    for (int i = 0; i < 5; i++) {
      new Thread(consumer, "Consumer" + (i + 1)).start();
    }

    Thread.sleep(5000);

    new Thread(producer, "Producer" + (5)).start();

  }
}

BlockingQueue有四个具体的实现类,常用的两种实现类为:

  1. ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。
  2. LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制。

若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。
LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

LinkedBlockingQueue和ArrayBlockingQueue区别

LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。

 类似资料:
  • 本文向大家介绍浅谈Android中线程池的管理,包括了浅谈Android中线程池的管理的使用技巧和注意事项,需要的朋友参考一下 说到线程就要说说线程机制 Handler,Looper,MessageQueue 可以说是三座大山了 Handler Handler 其实就是一个处理者,或者说一个发送者,它会把消息发送给消息队列,也就是Looper,然后在一个无限循环队列中进行取出消息的操作 mMyHa

  • 本文向大家介绍浅谈python 线程池threadpool之实现,包括了浅谈python 线程池threadpool之实现的使用技巧和注意事项,需要的朋友参考一下 首先介绍一下自己使用到的名词: 工作线程(worker):创建线程池时,按照指定的线程数量,创建工作线程,等待从任务队列中get任务; 任务(requests):即工作线程处理的任务,任务可能成千上万个,但是工作线程只有少数。任务通过 

  • 我正计划创建可调整队列大小的可调整线程池。我正在使用unbounded LinkedBlockingQueue和一个外部设置,该设置控制排队的消息数量。最初,my corepoolsize和maxpoolsize是相等的。现在,如果我想在运行时更新我的线程池大小,我通过一个公共设置将corepoolsize和maxpoolsize设置为不同的值。我想知道你对这种做法有什么看法。 当maxpools

  • 本文向大家介绍浅谈java常用的几种线程池比较,包括了浅谈java常用的几种线程池比较的使用技巧和注意事项,需要的朋友参考一下 1. 为什么使用线程池 诸如 Web 服务器、数据库服务器、文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务。请求以某种方式到达服务器,这种方式可能是通过网络协议(例如 HTTP、FTP 或 POP)、通过 JMS 队列或者可能通过

  • 本文向大家介绍浅谈chuck-lua中的多线程,包括了浅谈chuck-lua中的多线程的使用技巧和注意事项,需要的朋友参考一下 chuck-lua支持actor模式的线程模型.可以通过cthread.new创建线程,然后通过cthread.sendmail向线程发送消息. 与skynet这种框架不同,chuck-lua并不提供多线程的任务/消息调度功能,每个线程维护了一个简单的线程邮箱,用于缓存其

  • 本文向大家介绍Python中线程的MQ消息队列实现以及消息队列的优点解析,包括了Python中线程的MQ消息队列实现以及消息队列的优点解析的使用技巧和注意事项,需要的朋友参考一下 “消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。相