自定义线程池

结构

image-20220320135837804

阻塞队列

1
2
3
4
5
6
7
8
9
10
// 1. 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 2. 锁
private ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4. 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 5. 容量
private int capcity;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
+ blocking queue
+ 任务队列-双向链表
+ 可重入锁
+ 生产者条件变量
+ 队列容量限制
+ 消费者条件变量
+ 队列空限制
+ 容量上限
+ 阻塞获取
+ 上锁
+ 队列不为空,获取队列头元素
+ 唤醒队列满条件变量
+ 阻塞添加
+ 上锁
+ 队列不满,进入队列尾部
+ 唤醒队列空条件变量
+ poll增强
+ 带超时阻塞获取
+ await返回的是:被唤醒后剩余时间
+ 剩余时间赋值给下一个等待时间
+ 剩余时间用完,退出

线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
+ 实现
+ TheadPool
+ blockqueue
+ hashset:线程worker
+ 如果任务数没超过最大,交给线程执行
+ 如果任务数超过最大,任务进入阻塞队列
+ MAX
+ timeout
+ 执行任务
+ worker执行
+ 从传过来的task、任务队列中的task不断获取执行
+ 执行完毕,移除线程
+ 现在这个好像一旦没任务就扔掉,这不是享元模式啊
+ 并不会移除,因为taskQueue的take方法是阻塞的,也就是说线程会一直活在休息室
+ take死等&poll超时
+ take()死等
+ 不会停止线程池中的线程
+ 采用poll超时
+ 没有任务,超时结束线程
+ 当任务队列已满
+ 阻塞等待添加任务队列
+ offer增强
+ 带超时时间,阻塞添加队列
+ 拒绝策略
+ 策略模式
+ tryput
+ 拒绝策略测试
+ 面试手写线程池不慌了
+ 等待、放弃、自己执行

ThreadPoolExecutor

构造方法

image-20220320141145740

执行流程

image-20220320141746086

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
+ JDK的线程池
+ 线程池状态
+ 存在原子变量中
+ 一个整数同时保存了状态和线程个数
+ 构造方法
+ 工作方式
+ 救急线程
+ 阻塞队列满了之后,开启
+ 执行完毕销毁
+ 可设置生存时间
+ 核心线程一直保存
+ 救急线程都满了
+ 执行拒绝策略
+ 执行流程
+ 拒绝策略:四种
+ 抛出异常
+ 调用者执行任务
+ 放弃任务
+ 放弃队列中最早的任务,本任务取代
+ 其他框架
+ 站在架构角度思考问题
+ dubbo
+ 抛异常前记录日志,线程栈信息
+ netty
+ 创建新线程
+ activemq
+ 带超时放入队列
+ pinpoint
+ 拒绝策略链,逐一尝试每种策略
+ 将框架拆解开,从自己实现到带逐一分析别人怎么实现,思路清晰。
+ 工厂方法创建线程池
+ 固定大小线程池
+ 没有救急线程,线程不会结束
+ 无界队列
+ 厉害,底层使用到了许多设计模式
+ 带缓冲线程池
+ 全部救急线程,60s回收
+ 队列:没有容量
+ 没有线程取是放不进去的
+ 队列的put只有其他线程take才会被执行
+ 单线程线程池
+ 保证线程池始终有一个可用的线程
+ 装饰器模式
+ 只能调用返回包装对象中的方法
+ submit(带返回结果)
+ execute(runnable)
+ submit(callable)
+ 返回结果
+ 保护性暂停模式
+ TODO 异步编排和ThreadLocal
+ 测试
+ invokeAll(执行任务集合、超时时间)
+ invokeAny
+ 执行完一个即可(最先执行完毕)
+ 停止
+ shutdown(执行完毕所有任务(队列)停止)
+ 主线程异步,不会等待
+ showdownnow
+ STOP立即停止
+ 返回队列
+ 判断暂停,等待结束等
+ 测试
+ 结束
+ 等待

设计模式-工作线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+ 异步模式之工作线程
+ 定义
+ 工作线程:线程池、享元模式
+ 不同任务类型采用不同线程池
+ 不同类型的任务 执行时间不一样 所以同一个线程池可能导致有些线程很难或者很迟被执行 而且每个task的优先级不同
+ 饥饿
+ 现象
+ 线程数量不足导致饥饿(固定大小线程池)
+ 线程数不足,无法处理任务
+ 解决
+ 不同任务类型,应该使用不同线程池
+ 池大小
+ 过小、过大都不合适
+ 运算类型
+ IO密集型公式

任务调度线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
+ timer的缺点
+ timer:一个线程调度,串行执行
+ 前面任务执行时间会影响、终止后面的任务
+ ScheduledThreadPoolExecutor
+ 延时执行
+ 改变线程池大小
+ 出现异常,不影响下面的运行
+ 定时执行
+ 按照间隔不断执行一次任务
+ 一个任务开始为起点的
+ delay
+ 结束时间算
+ 这些还是属于比较基础的
+ 正确处理线程池异常
+ trycatch
+ future对象 + callable
+ get()
+ 线程池应用
+ 定时任务
+ 记得Linux或者spring里有个cron表达式专门做定时任务
+ 用quartz或springtask
+ 可以用Xxl-job
+ 下次执行时间 + 间隔时间
+ tomcat线程池
+ 线程池-分工
+ 拒绝策略:尝试放入队列后,抛异常
+ 配置
+ 最大线程不够,再放入队列
+ fork/join线程池
+ 分治、任务拆分线程池
+ 使用
+ 任务对象:recursivetask
+ 拆分任务:递归思想
+ 测试
+ 这个方法处理斐波那契数列非常慢,还是需要加上记忆化
+ 任务拆分优化
+ 不用一个任务依赖另一个任务
+ 和Java高并发程序设计里的例子一样,很好的分离任务之间的联系
+ 分治
+ 拆分两个任务,分治

Aqs原理

JUC包原理部分

image-20220320142555302

AQS同步器

image-20220320142530222

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
+ AQS原理
+ 概述
+ 阻塞式锁和同步工具的框架
+ state
+ 控制获取锁释放锁
+ 等待队列,entrylist
+ 条件变量,waitlist
+ reentrantLock就是用的aqs
+ 自定义锁(不可重入锁)
+ 同步器类-继承aqs
+ tryacquire
+ cas修改status
+ 设置owner为当前线程
+ tryrelease
+ owner为null
+ 直接修改status
+ 前面为写屏障,保证null被其他线程可见
+ isHeldexclusively
+ 是否持有独占锁
+ newcondition
+ 加锁
+ 加锁、可打断加锁
+ 尝试加锁,尝试加锁带超时
+ 解锁
+ release唤醒
+ 测试
+ 不可重入

ReentrantLock原理

image-20220320143257653

image-20220320143313550

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
+ ReentrantLock原理
+ 实现lock
+ 内部sync同步器,继承aqs,实现了非公平和公平锁
+ 非公平锁实现原理(默认)
+ 加锁成功流程——lock()
+ cas修改status
+ 修改owner为当前线程
+ cas修改失败
+ JDK11 已经没有了,直接调用的AQS抽象类的acquire
+ JDK11 对于失败采用的是短路与的方式
+ tryacquire——cas重试
+ 重试失败,acquirequeue加入队列
+ 第1个线程直接抢占同步器,第2个线程初始化队列:产生哑元和Node1节点,第三个线程产生Node2节点并将其插入到哑元和Node1节点之后。就是这样一个过程
+ 队列大体是这样的:head->哑元->Node1(Thead-1)->Node2(Thread-2)->...->tail)
+ 获取前驱节点
+ 若为头结点(哨兵)
+ 再次尝试获取锁tryacquire
+ 仍然获取失败
+ 把前驱节点waitstatus改为-1(有责任唤醒后继节点)
+ 返回false
+ 重新进入acquirequeue
+ 再次tryacquire
+ 这次前驱node为-1
+ park阻塞当前线程
+ 解锁竞争成功流程
+ tryrelease
+ status、owner
+ unpark
+ 找到最近没取消的node,unpark
+ 唤醒后,重新进入循环tryacquire
+ 变成owner线程
+ 当前node变为头结点,哨兵节点
+ 解锁竞争失败流程
+ 不在等待队列中的线程,解锁时来竞争
+ node尝试获取锁失败
+ 重新进入park阻塞
+ 可重入原理(非公平锁为例)
+ 同一线程再次调用tryacquire
+ 判断状态为上锁
+ 再次判断owner是否为当前线程
+ 发生锁重入
+ status++表示重入两次
+ 释放锁tryrelease
+ status--
+ 但是status不为0,返回false
+ 再次调用tryrelease时将status变为0,设owner为null
+ 返回true
+ 可打断原理
+ 不可打断模式(默认)
+ 就是在获取到锁之前不能被打断
+ 不可打断就是会用interrupted标记自己被打断过,但不会立即响应打断,等获得锁之后再自己打断自己
+ 可打断模式interruptibly
+ park时打断,直接抛出异常,停止等待锁
+ 公平锁原理
+ 非公平锁
+ 没有锁时,直接去cas获取锁,不会检测等待队列
+ 公平锁
+ 先去检查aqs队列中是否有前驱节点,没有才去获取锁
+ 如果队列中没有第二个节点、第二个节点不是此线程
+ 返回没资格获取
+ 条件变量实现原理
+ 每个条件变量对应一个等待队列
+ await
+ 进入addconditionwaiter流程
+ 把当前线程加入条件变量的链表
+ 创建新的node状态为-2(每个node都有状态)
+ 加入链表
+ fullyrelease流程
+ 释放同步器上的所有锁
+ owner空出、 unpark后序节点
+ park当前线程,等待被唤醒
+ signal
+ 判断当前线程是否为锁持有者
+ 非持有者无法唤醒条件变量线程
+ 条件队列,找到队首元素
+ 断开node
+ node转移到竞争锁的队列中
+ 转移成功,修改node状态
+ 阻塞队列除了最后一个node状态为0,其他都为-1
+ 然后如果是CAS设置waitStatus出错的话,源码翻译好像说这个状态错误是短暂且无害的,但是为了正确,可能还是得重新让他抢锁,然后重新入队,再次CAS到状态正确就结束
+ 如果CAS错误,可能也有一种情况就是他超时正在取消,然后还没改waitStatus,等你调用CAS他的watiStatus已经改成取消了
+ 转移失败,使用条件队列下一个node唤醒

读写锁

image-20220320143730503

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
+ ReentrantReadWriteLock
+ 使用
+ 读读并发,读写互斥
+ 只是读就可以读,读的时候写了就阻塞读
+ 测试
+ 同时读取,不互斥
+ 读写,阻塞无法写,等待读锁释放
+ 写写互斥
+ 注意事项
+ 读锁不支持条件变量,写锁支持
+ 重入的时候,不能升级获取
+ 读锁时不能重入获取写
+ 重入时降级支持
+ 写锁时可以再次获取读
+ 有缓存数据,直接读取返回
+ 没有缓存
+ 获取写锁(提前释放读锁)
+ 再次判断缓存是否已经更新
+ 直接返回数据
+ 缓存仍不存在
+ 重新计算数据
+ 获取读锁,防止写线程干扰
+ 释放写锁
+ 读取数据
+ 应用之缓存
+ 添加缓存
+ 缓存map集合:key:标志缓存(sql、参数)
+ 查询:先从缓存中找,没有查询后添加到缓存

+ 查询key
+ 更新:删除缓存,更新
+ 问题分析
+ hashmap不安全
+ 高并发查询不安全
+ 缓存更新策略
+ 先清缓存,再更新数据库
+ 清空后还未更新,其他线程查询旧数据,同步到缓存(旧)
+ 先更新数据库 √
+ 还未清空缓存,其他线程查询到旧数据,才清空缓存(短暂不一致)
+ 可以加原子操作
+ 实现
+ 创建读写锁
+ update(更新、删缓存)添加写锁
+ 保证原子性
+ 直接查询缓存添加读锁
+ 查库(查询、放入缓存)添加写锁
+ 可能多个线程阻塞在写锁
+ 双重检查,再次判断缓存存在
+ 保证原子性
+ 缓存意义是为了减少连接数据库的查询用的,多线程下缓存的访问需要考虑线程安全问题,所以加锁
+ 读本来可以不加锁,但是写操作会导致读错误,所以加了读锁,会导致写锁加了不能写,还有其他原因
+ 补充
+ 适合读多写少
+ 问题
+ 缓存容量
+ 缓存过期
+ 单机
+ 并发低
+ 锁细分
+ 针对不同操作分区,上不同锁