线程池ThreadPoolExecutor
创始人
2024-03-03 18:30:33
0

线程池的生命周期

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

 ThreadPoolExecutor使用一个ctl变量代表两个信息,线程池的运行状态 (runState) 和 线程池内有效线程的数量 (workerCount),高三位表示状态。

workerCount:线程池内有效线程的数量,工作线程允许开始,不允许停止,使用ctl的低29位表示

runState:线程的生命周期,是有序的随时间递增,也就是状态不可逆的。

    private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctlprivate static int runStateOf(int c)     { return c & ~CAPACITY; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }
状态描述

   状态位

(高三位)

RUNNING运行状态,允许提交新的任务,正常处理阻塞队列中的任务111
SHUTDOWN关闭状态,不允许提交新的任务,但可以正常处理队列中的任务000
STOP停止状态,不允许提交新的任务,也不处理队列中的任务,也会停止正在运行的状态001
TIDYING整理状态,所有的任务都停止,workerCount清零,将会调用terminated()010
TERMINATED终止状态,terminated()方法调用完成后的状态011

 

 任务调度

execute

  public void execute(Runnable command) {
//判断任务是否为空if (command == null)throw new NullPointerException();
//获取当前工作线程数量和线程状态int c = ctl.get();
//如果工作线程数小于核心线程数if (workerCountOf(c) < corePoolSize) {
//增加一个核心线程去执行任务
//增加成功结束if (addWorker(command, true))return;
//增加失败,再次获取当前线程池的工作线程数量和线程状态c = ctl.get();}
//如果当前线程池正在运行
//将任务放入阻塞队列if (isRunning(c) && workQueue.offer(command)) {
//插入成功★
//再次获取当前线程池的工作线程数量和线程状态int recheck = ctl.get();
//此时(并发情况)线程池非Running状态
//将任务移除阻塞队列if (! isRunning(recheck) && remove(command))
//移除成功,执行抛弃策略reject(command);
//移除失败,则判断当前的工作线程数是否为0else if (workerCountOf(recheck) == 0)
//为0则增加一个空工作线程,(去执行★之前插入成功的任务)addWorker(null, false);}
//创建一个非核心线程去执行任务else if (!addWorker(command, false))
//创建失败,执行拒绝策略reject(command);}

 submit

submit传入任务以及其返回值类型,如果任务是Runnable的话上会将任务包装成Callable任务也就是RunnableFuture,然后再丢给execute去执行。

executesubmit
是否有返回值有,需要阻塞去获取
外部对的异常处理无法处理,使用execute任务出现异常,外部无法感知可以在Future.get()中捕获异常,进行处理
执行过程直接放到工作线程如果参数是Runnable则需要包装成Callable然后执行execute去执行
接受任务类型RunnableRunnable、Callable

阻塞队列

线程池使用阻塞队列将要执行的任务与线程分开,用生产者和消费者模型实现。

当任务数量超过核心线程数,后续任务就会放入阻塞队列中进行排队。

因为实现了BlockingQueue所以都是线程安全的。

Array

Blocking

Queue

Linked

Blocking

Queue

Priority

Blocking

Queue

Delay

Queue

Synchronous

Queue

Linked

Transfer

Queue

Linked

Blocking

Deque

名称数组有界阻塞队列链表有界阻塞队列优先级无界阻塞队列使用优先级队列实现的无界阻塞队列不存储元素的阻塞队列链表无界阻塞队列链表双向阻塞队列
实现的方式数组链表自定义优先级使用

Priority

Blocking

Queue

链表、tryTransfer双向链表
队列长度自定义Integer.MAX_VALUE无界无界无界无界无界
特点支持公平和非公平锁可以自定义compareTo()方法进行排序,不能保证同优先级元素的排序可以指定所有才能从队列中获取当前元素,延迟期满了之后才能从队列种获取元素每put操作必须等待take,否则不能put成功。支持公平和非公平锁。

LinkedBlockingQueue多了tryTransfer,transfer方法

头部和尾部都可以插入元素,多线程并发时们可以将所得竞争降低到一半

任务拒绝

当线程池的缓存队列已满,线程数也达到了maximumPoolSize 时,再新增任务就会触发拒绝策略。

拒绝策略收到三个参数影响:corePoolSize 、maximumPoolSize 、workQueue 。

当任务提交后,如果没达到核心线程数,则会直接创建核心线程运行,如果达到了核心线程数则会放入队列中等待,当任务数大于workQueue.size+maximumPoolSize则触发拒绝策略。

名称AbortPolicy CallerRunsPolicy DiscardPolicy DiscardOldestPolicy 
策略抛出异常,终止任务使用调用线程执行任务,当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务直接丢弃丢弃workQueue队列最老任务,添加新任务
场景需要捕获异常进行处理适合并发小,性能要求不高且不允许失败的情况。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大允许任务不执行允许任务不执行,且希望执行最新的任务
JDK是否默认使用

常用线程池

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...