写在前面
本篇内容是学习记录的一些笔记,在学习过程中有许多疑惑,通过多写Demo测试验证自己的想法,该过程比较杂乱无章,所以本篇内容更侧重于记录结论和自己的一些总结以及一些辅助自己容易回忆起的简单Demo等等。但也就如此而已,如果我真想不起来,还是更愿意去找网上的文章看看,毕竟人家的文章足够的好。
即使每个知识点网上都有,但我觉得通过自己整理过的东西会更容易理解,同时也能加深自己的记忆,而且在整理过程中,自己脑袋偶尔会产生一些想法,“如果我这么做,那他得到的结果会不会还是一样呢”,我不会放过它,这说不定能让我更深一步领悟该知识点。
学习收获
- api的使用(也就是JDK提供的各个并发类的玩法)
- api的了解,知道了可以拥有各种骚操作的玩法
- 如果遇到这方面的问题或者情景,知道有什么东西可以使用,提供给我问题排查的思路
- 学习到了一些思维,比如:无锁–CAS、AQS的原理—以及里面的等待队列唤醒机制、不会让CPU空转浪费资源、公平锁与非公平锁。
- 同步工具类底层实现都是unsafe类,unsafe类的使用
- 源码级别–shutdown和shutdownNow的区别,抛出中断异常底层是因为shutdownNow调用了interrupt方法
- 一些并发模式的思想
- 大任务分治处理,无序–Future 、有序–ForkJoin
- ……
文章目录
- 写在前面
- 学习收获
- 1. 并发编程基础篇
- 2. 并发编程进阶篇
- 3. 并发编程精通篇
-
- CountDownLatch工具类
- CyclicBarrier工具类
- Phaser(阶段器/移相器)
- Semaphore工具类
- Exchanger交换器
- ReentrantLock和ReentrantReadWriteLock
- StampedLock简介&写锁writeLock
- LockSupport
- AbstractQueuedSynchronizer(AQS)
- 锁的分类
- ThreadLocalRandom与Random类
- LongAdder高性能累加器
- Accumulator高性能累加器
- [
- 线程池拒绝策略
- ThreadFactory
- 处理线程池内未捕获异常
- 线程池shutdown 和shutdownNow方法
- 线程池状态
- 核心线程超时策略
- 线程池其他用法
- 线程及线程池切面
- 移除线程池中的任务
- Future模式
- Master-Worker模式
- CompletionService
- ForkJoin
1. 并发编程基础篇
线程中断(interrupt)
- 线程调用
interrupt()
方法不会让线程中断,只会给线程设置一个中断的标志(设置中断的flag为true),具体的中断仍需我们自己写程序来控制
其他方法:
Thread.currentThread().isInterrupted()
:获取当前线程的中断标记,调用此方法不会改变中断的状态,也就是说这个只是简单的get操作Thread.interrupted()
:不同于上面的方法,这是一个静态方法, 也是获取当前线程的中断标记,但是调用此方法会改变中断的状态,清除了线程的中断标记。也就是说get操作后会将是否处于中断的标记设置为false。
public boolean isInterrupted() {
return isInterrupted(false);
}
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
//demo
public class InterruptsTest {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
for (int i = 0; i <= 99999; i++) {
if (Thread.interrupted()) {
}
if (Thread.currentThread().isInterrupted()) {
System.out.println(Thread.currentThread().getName() + i + "线程处于中断状态");
System.out.println(Thread.currentThread().isInterrupted());
break;
}
System.out.println(Thread.currentThread().isInterrupted());
}
});
thread.start();
// 先睡一会觉
Thread.sleep(20);
// 再中断,
thread.interrupt();
System.out.println("main程序");
}
}
如果该线程在调用 Object 类的 wait()方法或 join()时被阻塞, sleep(long), 这个类的方法,那么它的中断状态会被清除并且会收到一个InterruptedException(中断异常)
如果遇到了中断异常,我们不必恐惧它,它的出现 说明我们在调用中断的时候被中断的线程中执行着sleep()、wait()、join()方法,可以利用这个异常来进行一些数据补偿之类的操作。
线程安全概念
- 线程安全的概念:当多个线程访问某一个类、对象或方法时,这个类、对象或方法都能表现出与单线程执行时一致的行为,那么这个类、对象或方法就是线程安全的。
- 线程安全问题都是由全局变量及静态变量引起的。
- 若每个线程中对全局变量、静态变量只有读操作,而无写操作,一般来说,这个全局变量是线程安全的;若有多个线程同的执行写操作,一般都需要考虑线程同步,否则的话就可能影响线程安全。
Synchronized概念
- Synchronized的作用是加锁,所有的synchronized方法都会顺序执行,(这里只占用CPU的顺序)。
- Synchronized方法执行方式:
- 首先尝试获得锁
- 如果获得锁,则执行Synchronized的方法体内容。
- 如果无法获得锁则等待,并且不断的尝试去获得锁,一旦锁被释放,则多个线程会同时去尝试获得锁,造成锁竞争问题。
锁竞争问题,在高并发、线程数量高时会引起CPU占用居高不下,或者直接宕机。
类锁和对象锁
- Synchronized作用在非静态方法上代表的对象锁(理解为资源),一个对象一个锁、多个对象之间不会发生锁竞争(各自获取自己的资源,就不会发生竞争)
- Synchronized作用在静态方法上则升级为类锁,所有对象共享一把锁,存在锁竞争(静态是供大家一起使用的,所以会发生锁竞争)。
对象锁的同步和异步
- 对象锁只针对synchronized修饰的方法生效、对象中的所有synchronized方法都会同步执行、而非synchronized方法异步执行
- 避免误区:类中有两个synchronized方法,两个线程分别调用两个方法,相互之间也需要竞争锁,因为两个方法属于同一个对象,而我们是在对象上加锁
并发脏读问题
- 多个线程访问同一个资源,在一个线程修改数据的过程中,有另外的线程来读取数据,就会引起脏读的产生。
- 为了避免脏读我们一定要保证数据修改操作的原子性、并且对读取操作也要进行同步控制
银行存钱和取钱的例子
synchronized锁重入
- 同一个线程得到了一个对象的锁之后,再次请求此对象时可以再次获得该对象的锁。
- 同一个对象内的多个synchronized方法可以锁重入
- 父子类可以锁重入,不用担心死锁问题
理解:自己拿到了资源自己可以反复的利用
抛出异常和锁的关系
一个线程在获得锁之后执行操作,发生错误抛出异常,则自动释放锁
- 可以利用抛出异常,主动释放锁
- 程序异常时防止资源被死锁、无法释放
- 异常释放锁可能导致数据不一致
Synchronized代码块和锁失效问题
Synchronized代码块:
- 同类型锁之间互斥,不同类型的锁之间互不干扰(比如类锁和对象锁互不影响)
不要在线程中修改对象锁的引用,引用被改变会导致锁失效
- 在线程中修改了锁对象的属性,而不修改引用则不会引起锁失效、不会产生线程安全问题。如代码//1
- 线程A修改了对象锁的引用,则线程B实际的到了新的对象锁,而不是锁被释放了,因此引发了线程安全问题。如代码//2
Person person = new Person();
synchronized(person){
//1
person.setName("周三");
//2
person = new Person();
}
线程之间通讯
- 每个线程都是独立运行的个体,线程通讯能让多个线程之间协同工作
- Object类中的wait/notify方法可以实现线程间通讯
- Wait/notify必须与synchronized一同使用
- Wait释放锁、notify不释放锁
使用关键字volatile
定义的变量也可以实现同样的效果
- Notify只会通知一个wait中的线程,并把锁给他,不会产生锁竞争问题,但是该线程处理完毕之后必须再次notify或notifyAll,完成类似链式的操作。
- NotifyAll会通知所有wait中的线程,会产生锁竞争问题。(理解:当同时通知所有等待的线程,这些线程争抢同一个资源)
守护线程和用户线程
- 线程分类: daemon线程(守护线程)、user线程(用户线程)
- 易混淆知识点:main函数所在的线程就是一个用户线程
- 重要知识点1∶最后一个user线程结束时,JVM会正常退出(在main启动时会启动一个DestroyJVM的用户线程来关闭JVM),不管是否有守护线程正在运行。反过来说∶只要有一个用户线程还没结束,JVM进程就不会结束。
- 重要知识点2∶父线程结束后,子线程还可以继续存活,子线程的生命周期不受父线程的影响
2. 并发编程进阶篇
volatile关键字
作用在类的变量上
- 强制线程到共享内存中读取数据,而不从线程工作内存中读取,从而使变量在多个线程间可见。
- volatile无法保证原子性,volatile属于轻量级的同步,性能比synchronized强很多(因为不加锁),但是只保证线程见的可见性,并不能替代synchronized的同步功能。netty框架中大量使用volatile
volatile只能解决可见性,不能保证原子性
volatile和static的区别
- Static保证唯一性,不保证一致性,多个实例共享一个静态变量。
- Volatile保证一致性,不保证唯一性,多个实例有多个volatile变量
static变量的修改方式和普通变量一样都是先拷贝再修改后写回主内存,只有一份(唯一性)
volatile直接在主内存中修改(可见性、一致性),每个实例都有单独的变量,只是一个普通变量
Atomic类的原子性
- 使用Atomiclnteger等原子类可以保证共享变量的原子性
- 但是Atomic类不能保证成员方法的原子性
原理是使用CAS
CAS原理解析
- JDK提供的非阻塞原子操作,通过硬件保证了比较、更新操作的原子性
- JDK的Unsafe类提供了一系列的compareAndSwap*方法来支持CAS操作
CAS-ABA问题:解决->(增加版本号、使用时间戳)
ThreaLocal和InheritableThreadLocal
使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本
- 父线程使用
ThreadLocal
创建的变量,在子线程中获取不到 - 如果子线程想要得到可以使用
InheritableThreadLocal
,且子线程之间不会互相影响
使用InheritableThreadLocal
的
- 好处是可以知道他的父线程,这样就方便我们跟踪程序执行到了什么地方
- 与ThreadLocal不同的是local变量的初始化时机(在Thread类里面找),在线程创建的时候会判断父线程是否是
InheritableThreadLocal
,如果是的话就将父线程的值赋值给当前线程,否则赋值为null,所以他的get方法会返回父线程的值
小结:
- Thread类中的threadLocals、inheritableThreadLocals成员变量为ThreadLocal.ThreadLocalMap对象Map的key值是ThreadLocal对象本身,查看set、set、remove方法
- Thread无法解决继承问题,而InheritableThreadLocal可以
- InheritableThreadLocal继承自ThreadLocal
- InheritableThreadLocal可以帮助我们做链路追踪、日志输出等
Unsafe类安全限定
AtomicXXX类的底层就是依赖Unsafe类里的相关方法(本地方法)来完成原子性操作
- Unsafe类是单例的
- 使用Unsafe类可以直接操控内存,所以他是不安全的类
- Unsafe类在rt.jar包里,是核心类,是BootStrapClassLoad来加载的,我们不能调用他的方法,原因是他会判断是否是根加载器调加载的类调用的他方法,如果不是抛出异常,所以Atomic类能调用
- 需要看它的源码,可以去github
@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}
Unsafe实操
Unsafe -突破安全限制
通过反射模式可以突破Unsafe类的安全限制
putInt、getInt、getAndSetInt、getAndAddInt、CAS、实例demo:
public class UnsafeDemo01 {
private int age;
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
UnsafeDemo01 demo01 = new UnsafeDemo01();
// 通过反射获取Unsafe的静态变量成员
Field field = Unsafe.class.getDeclaredField("theUnsafe");
// 设置无障碍
field.setAccessible(true);
// 强转获取Unsafe对象
Unsafe unsafe = (Unsafe) field.get(null);
// 获取age属性的内存偏移地址
long ageOffset = unsafe.objectFieldOffset(UnsafeDemo01.class.getDeclaredField("age"));
// 设置age值为11
unsafe.putInt(demo01, ageOffset, 11);
System.out.println("unsafe的put方法设置age :" + demo01.age);
// 获取age
int age = unsafe.getInt(demo01, ageOffset);
System.out.println("unsafe的get方法获取age : " + age);
// 验证CAS方法,如果它是10,设置为88,cas不一定保证成功,有默认的自旋次数,失败
boolean flag = unsafe.compareAndSwapInt(demo01, ageOffset, 10, 88);
System.out.println("CAS test :" + flag + " value is " + demo01.age);
// 成功
flag = unsafe.compareAndSwapInt(demo01, ageOffset, 11, 666);
System.out.println("CAS test :" + flag + " value is " + demo01.age);
// 获取并设置,返回的是旧的值
int setInt = unsafe.getAndSetInt(demo01, ageOffset, 333);
System.out.println("old value :" + setInt + "new value :" + demo01.age);
// 获取并增加
int addInt = unsafe.getAndAddInt(demo01, ageOffset, 7);
System.out.println("old value :" + setInt + "new value :" + demo01.age);
}
}
- 如果是针对static变量操作,则上面的方法不是再操作实例,而是需要操作类模板,例如
private static int age;
...
unsafe.staticFieldOffset(UnsafeDemo01.class.getDeclaredField("age"));
int age = unsafe.getInt(UnsafeDemo01.class, ageOffset);
-
如果是针对volatile变量操作,则几乎和普通变量没区别,只是put、get方法调用改成有volatile后缀的方法
-
如果是数组,则传入数组类模板来获取地址偏移量,设置数组元素值 使用偏移量+类型大小*下标
Unsafe unsafe = (Unsafe) field.get(null);
long arrOffset = unsafe.arrayBaseOffset(long[].class);
// 设置数组下标为1的元素的值为9
unsafe.putLong(demo01.getArray(),arrOffset + arrSize * 1)
**注意:**如果设置的大小超过了数组的长度,不会报错,但也不会扩容
- 直接操作内存**(危险性操作体现)**
public native long allocateMemory(long bytes);//分配内存
public native long reallocateMemory(long address, long bytes);//重新分配内存
public native void setMemory(Object o, long offset, long bytes, byte value);//初始化内存
public void setMemory(long address, long bytes, byte value)//初始化内存
public native void copyMemory(Object srcBase, long srcOffset, Object destBase, long destOffset, longbytes);//复制内存
public void copyMemory(long srcAddress, long destAddress, long bytes);//复制内存
public native void freeMemory(long address);//释放内存
有C++基础理解起来不难
Unsafe线程调度
- public native void park(boolean isAbsolute, long time);挂起线程
- public native void unpark(Object thread);唤醒线程
- 需要注意线程的interrupt方法同样能唤醒线程,但是不报错
- java.util.concurrent.locks.LockSupport使用unsafe实现,可以点进去看一下该类的源码和相关方法
public class UnsafeDemo02 {
public static void main(String[] args) throws Exception {
UnsafeDemo01 demo01 = new UnsafeDemo01();
// 通过反射获取Unsafe的静态变量成员
Field field = Unsafe.class.getDeclaredField("theUnsafe");
// 设置无障碍
field.setAccessible(true);
// 强转获取Unsafe对象
Unsafe unsafe = (Unsafe) field.get(null);
Thread thread = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "--start");
// 线程挂起2000 * 100000 纳秒 = 2s
unsafe.park(false, 2000 * 1000000);
if (Thread.currentThread().isInterrupted()) {
System.out.println("Thread status is interrupted");
}
// 挂起当前时间往后推2s
//unsafe.park(true, System.currentTimeMillis() + 2000);
System.out.println(Thread.currentThread().getName() + "--end");
});
thread.start();
// 唤醒线程
unsafe.unpark(thread);
// 设置中断flag
//thread.interrupt();
System.out.println("main run over");
}
}
同步并发类容器
- Collections.synchronizedXXX():可以将集合变成线程安全的同步类容器,原理是在整个集合上加了synchronized锁,并发会比较差
- jdk1.5后增加了
Concurrent
并发类容器:- ConcurrentHashMap替代HashMap、HashTable
- ConcurrentSkipListMap替代TreeMap
- ConcurrentHashMap:将hash表分为16个segment(分区),每个segment单独进行锁控制,从而减小了锁的粒度,提升了性能。
- COW(copy on write)
COWlterator的弱一致性
- 使用COW容器的iterator方法实际返回的是COWlterator实例,遍历的数据为快照数据,其他线程对于容器元素增加、删除、修改不对快照产生影响。
- 对java.util.concurrent.CopyOnWriteArrayList、java.util.concurrent.CopyOnWriteArraySet均适用。
常见并发队列
并发-无阻塞队列
**ConcurrentLinkedQueue**
:无阻塞、无锁、高性能、无界、线程安全,性能优于BlockingQueue(并发阻塞)、不允许null值
并发-阻塞队列
**ArrayBlockingQueue**
:基于数组实现的阻塞有界队列、创建时可指定长度,内部实现维护了一个定长数组用于缓存数据,内部没有采用读写分离,写入和读取数据不能同时进行,不允许null值
使用的时候要注意,该队列的offer、add、put方法有很大区别
**LinkedBlockingQueue**
:基于链表的阻塞队列,内部维护一个链表存储缓存数据,支持写入和读取的并发操作,创建时可指定长度也可以不指定,不指定时代表无界队列,不允许null值**SynchronousQueue **
:没有任何容量,必须先有线程先从队列中take,才能向queue中add数据,否则会抛出队列已满的异常。不能使用peek方法取数据,此方法底层没有实现,会直接返回null**PriorityBlockingQueue**
:一个无界阻塞队列,默认初始化长度11,也可以手动指定,但是队列会自动扩容。资源被耗尽时导致OutOfMemoryError。不允许使用null元素。不允许插入不可比较的对象(导致抛出 ClassCastException),加入的对象实现comparable接口(维护了take取出元素的优先级)**DelayQueue**
:https://www.cnblogs.com/myseries/p/10944211.html
3. 并发编程精通篇
CountDownLatch工具类
**CountDownLatch**
是一个辅助工具类(可以理解为一个同步计数器),它允许一个或多个线程等待系列指定操作的完成。CountDownLatch 以一个给定的数量初始化。countDown()每被调用一次,这一数量就减一。通过调用await()方法,线程可以阻塞等待这一数量到达零(当计数器数值减为0时,所有受其影响而等待的线程将会被激活)。
- 只能使用一次,计数器的值只能在构造方法中初始化一次,之后无法再改变它的值
CyclicBarrier工具类
它的作用就是会让所有线程都等待完成后才会继续下一步行动
CyclicBarrier 使用场景
可以用于多线程计算数据,最后合并计算结果的场景。(CountDownLatch也可以)
CyclicBarrier 与 CountDownLatch 区别
- CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
- CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。
public class CyclicBarrierDemo {
public static void main(String[] args) {
test1();
}
private static void test1() {
int size = 4;
ExecutorService executorService = Executors.newFixedThreadPool(size);
CyclicBarrier cyclicBarrier = new CyclicBarrier(size, () -> {
System.out.println("最后一名是:" + Thread.currentThread().getName());
});
for (int i = 0; i < size; i++) {
Runnable r = () -> {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " :达到栅栏点A");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " :从栅栏点A出发");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " :达到栅栏点B");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " :从栅栏点B出发");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
};
executorService.execute(r);
}
executorService.shutdown();
}
}
运行结果
private static void test2() {
// 分为2个线程去查询,需要加1,主线程也必须算上.
int size = 3;
ExecutorService executorService = Executors.newFixedThreadPool(size);
CyclicBarrier cyclicBarrier = new CyclicBarrier(size, () -> {
System.out.println("查询结束");
});
AtomicInteger sum = new AtomicInteger();
Runnable r1 = ()->{
sum.getAndAdd(QueryUtil.querySaas());
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
};
Runnable r2 = ()->{
sum.getAndAdd(QueryUtil.queryDatabases());
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
};
executorService.execute(r1);
executorService.execute(r2);
try {
// 等待结果,主线程也必须参与等待
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("sum = " + sum);
executorService.shutdown();
}
class QueryUtil {
/** * 模拟查询中台 * * @return */
public static int querySaas() {
// 模拟业务耗时
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " :查询中台……");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 10;
}
/** * 模拟查询数据库 * * @return */
public static int queryDatabases() {
// 模拟业务耗时
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " :查询数据库……");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 5;
}
}
Phaser(阶段器/移相器)
https://cloud.tencent.com/developer/article/1350849
Phaser(移相器,一种电子元件)是JDK7中引入的新的并发工具辅助类,oralce官网文档描述Phaser是一个可重复使用的同步栅栏,功能上与 CountDownLatch 和 CyclicBarrier类似但支持的场景更加灵活,这个类可能是目前并发包里面实现最复杂的一个了。
Phaser的灵活性主要体现在在构造函数时不需要强制指定目前有多少参与协作的线程,可以在运行时动态改变
- 在Phaser中计数器叫做parties
- 注册,使用Phaser的构造函数或者register()方法
- 调用register()方法,可以动态的控制phaser的个数
- 取消注册,调用arriveAndDeregister()方法
- arriveAndAwaitAdvance 到达此移相器并等待其他移相器
package study.demo;
import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
/** * @author yhchen * @date 2022/9/13 19:40 */
public class PhaserDemo5 {
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("=================step-" + phase + "===================" + registeredParties);
return super.onAdvance(phase, registeredParties);
}
};
Bus bus1 = new Bus(phaser, "小张");
Bus bus2 = new Bus(phaser, "小李");
Bus bus3 = new Bus(phaser, "小王");
bus1.start();
bus2.start();
bus3.start();
System.out.println(phaser.getRegisteredParties());
}
static public class Bus extends Thread {
private Phaser phaser;
private Random random;
public Bus(Phaser phaser, String name) {
this.phaser = phaser;
setName(name);
random = new Random();
phaser.register();
}
private void trip(int sleepRange, String cityName) {
System.out.println(this.getName() + " 准备去" + cityName + "....");
int sleep = random.nextInt(sleepRange);
try {
TimeUnit.SECONDS.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getName() + " 达到" + cityName + "...... ");
if (this.getName().equals("小王1")) {
// 测试掉队的情况
try {
TimeUnit.SECONDS.sleep(7);
} catch (InterruptedException e) {
e.printStackTrace();
}
phaser.arriveAndDeregister();
} else {
phaser.arriveAndAwaitAdvance();
}
}
@Override
public void run() {
try {
int s = random.nextInt(3);
TimeUnit.SECONDS.sleep(s);
System.out.println(this.getName() + " 准备好了,旅行路线=北京=>上海=>杭州 ");
phaser.arriveAndAwaitAdvance();// 等待所有的汽车准备好
} catch (InterruptedException e) {
e.printStackTrace();
}
trip(5, "北京");
trip(5, "上海");
trip(3, "杭州");
}
}
}
Semaphore工具类
**Semaphore**
一个计数信号量。信号量维护了一个许可证集合,可以用来做流量分流。
- 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源
Semaphore
内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。- 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可
- 通过release释放许可证,其他线程才能进行获取。
- Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。
公平性:没有办法保证线程能够公平地可从信号量中获得许可。也就是说,无法拉保掉第一个调用acquire()的线程会是第一个获得一个许可的线程。如果第一个线程在等待一个许可时发生阻塞,而第二个线程前来索要一个许可的时候刚好有一个许可被释放出来,那么它就可能会在第一个线程之前获得许可证。如果你想要强制公平,Semaphore类具有一个布尔类型的参数的构造子,通过这个参数以告知Semaphore是否要强制公平。强制公平会影响到并发性能,所以除非你确实需要它否则不要启用它。(默认不公平)
public class DemoSemaphore {
public static void main(String[] args) {
test1();
}
/** * 模拟10个顾客点餐,只有2个服务员 */
private static void test1() {
int size = 10;
Semaphore semaphore = new Semaphore(2);
ExecutorService executorService = Executors.newFixedThreadPool(size);
for (int i = 0; i < size; i++) {
Runnable r = () -> {
try {
System.out.println(Thread.currentThread().getName() + " :顾客呼叫服务员");
// 线程拿到许可证,可以继续执行
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "正在点餐……");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "点餐结束");
//执行完毕释放许可证,其他线程才能获取,若不释放,程序回一直阻塞
//semaphore.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
executorService.execute(r);
}
executorService.shutdown();
}
}
Exchanger交换器
Exchanger 原理
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。因此使用Exchanger的重点是成对的线程使用exchange()方法,当有一对线程达到了同步点,就会进行交换数据。因此该工具类的线程对象是成对的
- 只能用于成对线程之间,并且两个线程必须都到达汇合点才会进行数据交换.(如果是不成对的有一个线程回一直等待永远被阻塞,或者设置等待时间)
- String exchange(V x):用于交换,启动交换并等待另一个线程调用exchange
- String exchange(V x,long timeout,TimeUnit unit):用于交换,启动交换并等待另一个线程调用exchange,并且设置最大等待时间,当等待时间超过timeout便停止等待
package com.concurrent.demo;
import java.util.concurrent.Exchanger;
/** * @author cyh * @date 2022/9/17 16:25 */
public class DemoExChange {
public static void main(String[] args) {
test1();
}
public static void test1() {
Exchanger<String> exchanger = new Exchanger<>();
TaskExchange task1 = new TaskExchange(exchanger, "月饼");
TaskExchange task2 = new TaskExchange(exchanger, "蛋糕");
new Thread(task1, "小王").start();
new Thread(task2, "大明").start();
//new Thread(task2, "张三").start();
//new Thread(task2, "李四").start();
}
static class TaskExchange implements Runnable {
Exchanger<String> exchanger;
String goods;
public TaskExchange(Exchanger<String> exchanger, String goods) {
this.exchanger = exchanger;
this.goods = goods;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name + " 拥有: " + goods);
try {
System.out.println(name + "交换出" + goods);
Thread.sleep(1000);
exchanger.exchange(goods);
System.out.println(name + " 交换到了: " + goods);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
ReentrantLock和ReentrantReadWriteLock
ReentrantLock
可以用来替代synchronized
,在需要同步的代码块加上锁,最后一定要释放锁,否则其他线程永远进不来。
- 底层实现原理(AQS)
- 公平与非公平性:是否尝试获取锁
ReentrantLock使用方法
- 显式声明
- 可以使用Condition代替wait和notify来进行线程间的通讯,一个ReentrantLock可以创建多个Condition,一个Condition只针对一把锁
- 公平性,在new的时候设置参数
ReentrantLock.newCondition()
创建的每一个Condition对象,实质上都是AQS.ConditionObject
对象,而这个对象也是一个FIFO的队列
解决线程安全问题使用ReentrantLock就可以了,但是ReentrantLock是独占锁,某一时刻只有一个线程可以获取该锁,而实际中会有写少读多的场景,显然ReentrantLock满足不了这个需求,所以ReentrantReadWriteLock应运而生。ReentrantReadWriteLock采用读写分离的策略,允许多个线程可以同时获取读锁
ReentrantReadWriteLock使用方法
- 读写锁内部维护了一个ReadLock和一个WriteLock,他们依赖Sync实现具体功能,而Sync继承自AQS,并且提供了公平和非公平的实现。
package com.concurrent.demo;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** * @author cyh * @date 2022/9/17 20:19 */
public class DemoReentrantLock {
public static void main(String[] args) {
//test01();// 重入
test02();// 读写锁
}
private static void test02() {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
Thread read1 = new Thread(() -> {
readLock.lock();
System.out.println(Thread.currentThread().getName() + "获取读锁……");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
readLock.unlock();
System.out.println(Thread.currentThread().getName() + "释放读锁");
});
Thread read2 = new Thread(() -> {
readLock.lock();
System.out.println(Thread.currentThread().getName() + "获取读锁……");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
readLock.unlock();
System.out.println(Thread.currentThread().getName() + "释放读锁");
});
Thread write1 = new Thread(() -> {
writeLock.lock();
System.out.println(Thread.currentThread().getName() + "获取写锁……");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
writeLock.unlock();
System.out.println(Thread.currentThread().getName() + "释放写锁");
});
Thread write2 = new Thread(() -> {
writeLock.lock();
System.out.println(Thread.currentThread().getName() + "获取写锁……");
System.out.println(Thread.currentThread().getName() + "持有写锁次数:" + writeLock.getHoldCount());
bbb(writeLock);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
writeLock.unlock();
System.out.println(Thread.currentThread().getName() + "释放写锁");
System.out.println(Thread.currentThread().getName() + "持有写锁次数:" + writeLock.getHoldCount());
});
// 读读不互斥
//read1.start();
//read2.start();
//读写互斥
//read1.start();
//write1.start();
//写写互斥
//write1.start();
write2.start(); //测试写锁重入
}
private static void bbb(ReentrantReadWriteLock.WriteLock writeLock) {
writeLock.lock();
System.out.println(Thread.currentThread().getName() + "持有写锁次数:" + writeLock.getHoldCount());
writeLock.unlock();
System.out.println(Thread.currentThread().getName() + "释放写锁");
}
private static void test01() {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "获得锁,当前锁数量:" + lock.getHoldCount());
Thread.sleep(1000);
aaa(lock);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + "释放锁");
}
});
t1.start();
}
private static void aaa(ReentrantLock lock) {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "获得锁,当前锁数量:" + lock.getHoldCount());
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + "释放锁");
}
}
}
StampedLock简介&写锁writeLock
StampedLock
特点:**写写互斥、读写互斥、读读共享 **
StampedLock
与ReentrantReadWriteLock
功能类似
- stampedLock中引入了一个stamp(邮戳)的概念。它代表线程获取到锁的版本,每一把锁都有一个唯一的stamp.
- 其写锁
StampedLock.writeLock
,类似于ReetrantReadWriteLock.writeLock
,区别是StampedLock.writeLock
是不可重入锁。 writeLock
与unlockWrite
必须成对儿使用,解锁时必须需要传入相对应的stamp才可以释放锁。每次获得锁之后都会得到一个新stamp值。
StampedLock悲观读锁
- 在多个线程之间依然存在写写互斥、读写互斥、读读共享的关系
- 读锁可以多次获取(没有写锁占用的情况下),写锁必须在读锁全部释放之后才能获取写锁
- 使用方法:默认悲观锁
悲观读锁并算不上绝对的悲观,排他锁才是真正的悲观锁,由于读锁具有读读共享的特性,所以对于读多写少的场景十分适用,可以大大提高并发性能。
StampedLock乐观读锁
https://www.jianshu.com/p/c9447688dc62
LockSupport
LockSupport
的底层采用Unsafe
类来实现,他是其他同步类的阻塞与唤醒的基础。- park与unpark需要成对适用,parkUntil与parkNanos可以单独适用
- 先调用unpark再调用park会导致unpark失效
- 线程中断interrupte会导致park失效并且不抛异常(所有park都失效)
- 例如blocker可以对堆栈进行追踪,官方推荐,例如结合jstack进行使用(也可以使用arthas工具来查看)
AbstractQueuedSynchronizer(AQS)
- 抽象队列同步器简称AQS,它是同步器的基础组件,JUC各种锁的底层实现均依赖于AQS,开发不需要使用
- 采用FIFO的双向队列实现,队列元素为Node(静态内部类),Node内的thread变量用于存储进入队列的线程。
- Node节点内部的SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的,EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的。waitStatus记录当前线程等待状态可以为CANCELLED(线程被取消了)、SIGNAL(线程需要被唤醒)、CONDITION(线程在条件队列里面等待)、PROPAGATE(释放共享资源时需要通知其他节点);prev记录当前节点的前驱节点,next记录当前节点的后继节点。
- 在AQS中维持了一个单一的状态信息state,可以通过getState、setState、compareAndSetState函数修改其值。
- 对于
ReentrantLock
的实现来说,state可以用来表示当前线程获取锁的可重入次数; - 对于读写锁
ReentrantReadWriteLock
来说,state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数; - 对于
semaphore
来说,state用来表示当前可用信号的个数; - 对于
CountDownlatch
来说,state用来表示计数器当前的值。
- 对于
锁的分类
- **乐观锁/悲观锁: **是否在修改之前给记录增加排它锁
- **公平锁/非公平锁: **请求锁的时间顺序是否与获得锁的时间顺序一致。一致为公平锁,不一致为非公平锁
- **独占锁/共享锁: **是否可以被多个线程共同持有,可以则为共享锁、不可以则为独占锁
- **可重入锁: **一个线程再次获取它自己已经获取的锁时是否会被阻塞
- **自旋锁: **无法获取锁时是否立刻阻塞,还是继续尝试获取指定次数
乐观锁和悲观锁
- 乐观锁和悲观锁的概念来自于数据库
- 悲观锁对数据被修改持悲观态度,认为数据很容易就会被其他线程修改,所以在处理数据之前先加锁,处理完毕释放锁。
- 乐观锁对数据被修改持乐观态度,认为数据一般情况下不会被其他线程修改,所以在处理数据之前不会加锁,而是在数据进行更新时进行冲突检测。
- 对于数据库的悲观锁就是排它锁,在处理数据之前,先尝试给记录加排它锁,如果成功则继续处理,如果失败则挂起或抛出异常,直到数据处理完毕释放锁。
- 对于数据库的乐观锁所典型的就是CAS方式更新,例如:update name=‘zhangsan’ where id=1 and name='lisi’,在更新数据的时候校验这个值是否发生了变化,类似于CAS的操作。
公平锁与非公平锁
- 据线程获取锁的抢占机制,锁可以分为公平锁和非公平锁,最早请求锁的线程将最早获取到锁。而非公平锁则先请求不一定先得。JUC中的ReentrantLock提供了公平和非公平锁特性。
- 公平锁:ReentrantLock pairLock = new ReentrantLock(true)
- 非公平锁:ReentrantLock pairLock = new ReentrantLock(false),如果构造函数不传递参数,则默认是非公平锁。
- 非必要情况下使用非公平锁,公平锁存在性能开销
在入FIFO同步等待队列前是否尝试获取锁,如果有,那就是非公平;如果已进入同步等待队列,那么则按先进先出的顺序被唤醒
独占锁与共享锁
- 只能被单个线程所持有的锁是独占锁,可以被多个线程持有的锁是共享锁。
ReentrantLock
就是以独占方式实现的,属于悲观锁ReadWriteLock
读写锁是以共享锁方式实现的,属于乐观锁StampedLock
的写锁,属于悲观锁。StampedLock
的乐观读锁、悲观读锁,属于乐观锁。
可重入锁
- 当一个线程想要获取本线程已经持有的锁时,不会被阻塞,而是能够再次获得这个锁,这就是重入锁。
- Synchornized是一种可重入锁,内部维护一个线程标志(谁持有锁),以及一个计数器。
- ReentrantLock也是一种可重入锁
- ReadWriteLock、StampedLock的读锁也是可重入锁
自旋锁
- 当获取锁的时候如果发现锁已经被其他线程占有,则不阻塞自己,也不释放CPU使用权,而是尝试多
次获取,如果尝试了指定次数之后仍然没有获得锁,再阻塞线程。
- 自旋锁认为锁不会被长时间持有,使用CPU时间来换取线程上下文切换的开销,从而提高性能。但是可能会浪费CPU资源。
- -XX:PreBlockSpin=n可以设置自旋次数(已经成为了历史),在Jdk7u40时被删除了,其实在jdk6的时候就已经无效了,现在HotSpotVM采用的是adaptive spinning (自适应自旋),虚拟机会根据情况来对每个线程使用不同的自旋次数。
ThreadLocalRandom与Random类
Random
存在性能缺陷,主要原因是要不断的计算新的种子更新原种子,使用CAS方法。高并发的情况下会造成大量的线程自旋,而只有一个线程会更新成功。ThreadLocalRandom
采用ThreadLocal
的机制,每一个线程都是用自己的种子去进行计算下—个种子,规避CAS在并发下的问题。
SecureRandom坑记录——阻塞程序
SecureRandom.getInstanceStrong()
; 是jdk1.8里新增的加强版随机数实现
如果服务器在Linux操作系统上,这里的罪魁祸首是SecureRandom generateSeed()。它使用/dev/random生成种子。但是/dev/random是一个阻塞数字生成器,如果它没有足够的随机数据提供,它就一直等,这迫使JVM等待。键盘和鼠标输入以及磁盘活动可以产生所需的随机性或熵。但在一个服务器缺乏这样的活动,可能会出现问题
LongAdder高性能累加器
背景:AtomicLong
存在性能瓶颈,由于使用CAS方法。高并发的情况下会造成大量的线程自旋,而只有一个线程会更新成功,浪费CPU资源。
LongAdder的思想是将单一的原子变量拆分为多个变量,从而降低高并发下的资源争抢。
理解和总结:
AtomicLong
操作的使用CAS操作一个volatile修饰的变量,多个线程操作一个共享变量;LongAdder
类似分治归并,几个线程为一个单元操作一个变量,最后多个合并为一个- 合并的时机是请求结果的时候,多个单元将自己的结果合为最终结果,(sum()方法数组求和)
- 多个单元变量使用的是一个数组来存储,基础值是0,increment()和decrement()方法使值±1,也就是说操作的步长为1
- 有繁忙判定机制,是否是多并发场景,根据线程数与CPU数量的对比(有算法)
- 每个线程每次操作不一定都是同一个变量
没有incrementAndGet、decrementAndGet这种累加后获取值的原子性方法,只有单独的increment、longValue这种方法,如果组合使用则需要自己做同步控制,否则无法保证原子性。
Accumulator高性能累加器
LongAdder
是LongAccumulator
的特例,DoubleAdder
是DoubleAccumulator
的特列
Accumulator
牛b的地方在于可以设置初始值、自定义累加算法
***Accumulator 和 ***Adder 非常相似,实际上 ***Accumulator 就是一个更通用版本的 ***Adder,比如 LongAccumulator 是 LongAdder 的功能增强版,因为 LongAdder 的 API 只有对数值的加减,而 LongAccumulator 提供了自定义的函数操作。
使用方法
package study.demo;
import utils.LookTime;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.LongBinaryOperator;
/** * @author yhchen * @date 2022/9/20 15:37 */
public class DemoLongAccumulator {
public static void main(String[] args) throws InterruptedException {
test1();
}
private static void test1() throws InterruptedException {
int size = 1000;
// 自定义累加算法
AtomicLong atomicLong = new AtomicLong();
LongBinaryOperator binaryOperator = (left, right) -> left + right;
//累加器 参数(累加算法,初始值)
LongAccumulator accumulator = new LongAccumulator(binaryOperator, 3);
System.out.println("初始值=" + accumulator.longValue());
// System.out.println("初始值=" + atomicLong.longValue());
ArrayList<Thread> list = new ArrayList<>();
LookTime.setStartTime();
// 创建1000个线程
for (int i = 0; i < size; i++) {
Thread thread = new Thread(() -> {
// 每个线程要对累加器加多少值
accumulator.accumulate(1);
// atomicLong.getAndAdd(1);
});
list.add(thread);
}
// 线程全部启动
for (Thread thread : list) {
thread.start();
}
// 等待线程全部执行完
for (Thread thread : list) {
thread.join();
}
LookTime.lookSpendMill();
System.out.println("最终值=" + accumulator.longValue());
// System.out.println("最终值=" + atomicLong.longValue());
}
}
[
](https://blog.csdn.net/inthat/article/details/108469200)四种线程池
**CachedThreadPool**
具有缓存性质的线程池,线程最大空闲时间60s,线程可重复利用(缓存特性),没有最大线程数限制。任务耗时端,数量大。**FixedThreadPool**
具有固定数量的线程池,核心线程数等于最大线程数,线程最大空闲时间为0,执行完毕即销毁,超出最大线程数进行等待。高并发下控制性能(控制适合机子的线程数量,让服务不会被打死)。**ScheduledThreadPool**
具有时间调度特性的线程池,必须初始化核心线程数,底层使**DelayedWorkQueue**
实现延迟特性。**SingleThreadExecutor**
核心线程数与最大线程数均为1,用于不需要并发顺序执行。
总结:可以发现,这四个线程池都是通过new一个ThreadPoolExecutor来实现的,我们也可以通过这种方式来实现我们自己的线程池
线程池拒绝策略
jdk提供了四种拒绝策略
AbortPolicy
(默认):抛出异常,不影响其他线程运行CallerRunsPolicy
:调用当前任务(线程池没空闲线程了,使用调用的线程来执行,有点特别)DiscardOldestPolicy
丢弃最旧的任务DiscardPolicy
:直接丢弃,什么也不做
可以发现,四个策略都是实现了RejectedExecutionHandler接口,我们也可以做实现改接口来自定义拒绝策略
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 6, 30L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("做操作:打日志、发邮件、数据库操作等……");
}
});
}
}
ThreadFactory
我们通常使用线程池的submit方法将任务提交到线程池内执行。
如果此时线程池内有空闲的线程,则会立即执行该任务,如果没有则需要根据线程池的类型选择等待,或者新建线程。
所以线程池内的线程并不是线程池对象初始化( new )的时候就创建好的。而是当有任务被提交进来之后才创建的,而创建线程的过程是无法干预的。
如果我们想在每个线程创建时记录一些日志,或者推送一些消息那怎么做?
使用ThreadFactory
第一步: 编写ThreadFactory接口的实现类
第二步: 创建线程池时传入ThreadFactory对象
package study.thread01;
import java.util.concurrent.ThreadFactory;
/** * @author yhchen * @date 2022/9/27 21:15 */
public class ThreadFactoryDemo implements ThreadFactory {
/** * 执行标志 */
private boolean flag;
/** * 工厂名称 */
private String factoryName;
public ThreadFactoryDemo(boolean flag, String factoryName) {
this.flag = flag;
this.factoryName = factoryName;
}
@Override
public Thread newThread(Runnable r) {
if (flag) {
System.out.println("线程池--创建线程前--吃了一个水果");
}
Thread thread = new Thread(r);
// 设置线程名字
thread.setName("改了个名字:血汗"+factoryName);
//执行各种 初始化等等
if (flag) {
System.out.println("线程创建完成后又吃了一个西瓜");
}
return thread;
}
}
package study.thread01;
import java.util.concurrent.*;
/** * @author yhchen * @date 2022/9/27 21:24 */
public class ThreadFactoryClient {
public static void main(String[] args) {
Runnable runnable = () -> {
System.out.println("hhhhhhhhhh");
};
//创建工厂
ThreadFactoryDemo factoryDemo = new ThreadFactoryDemo(true, "工厂1");
//创建自定义线程池
ExecutorService executorService = Executors.newCachedThreadPool(factoryDemo);
for (int i = 0; i < 3; i++) {
executorService.execute(runnable);
}
executorService.shutdown();
}
}
只有使用Executors创建的线程池可以放这个线程创建工厂
处理线程池内未捕获异常
- 线程池内运行的线程如果发生异常,一定要捕获,要养成习惯。
- 可以采用更优雅的方式处理所有线程的异常,例如记录日志,发送预警消息等。
- 结合ThreadFactory以及线程的setUncaughtExceptionHandler方法来处理最为优雅
- 注意:上一点对execute方法提交的任务有效,对submit方法提交的任务无效,巨坑!
execute()方法和submit()的区别
- 接收的参数不同
- 返回值不同
虽然submit()方法可以提交Runnable类型的参数,但执行Future方法的get()时,线程执行完会返回null,不会有实际的返回值,这是因为Runable本来就没有返回值
当用submit()提交线程时,run()orcall()方法尽量显式的catch异常,这样才不至于任务提交线程池后丢失异常信息
线程池shutdown 和shutdownNow方法
shutdown
让线程池内的任务继续执行完毕,但是不允许新的任务提交shutdown
方法不阻塞,等所有线程执行完毕后,销毁线程shutdown
之后提交的任务会抛出RejectedExecutionException
异常,代表拒绝接收shutdownNow
之后提交的任务会抛出RejectedExecutionException
异常,代表拒绝接收,也不会执行线程池内的任务shutdownNow
之后会引发sleep、join、wait方法的InterruptedException
异常,如果任务中没有触发InterruptedException
的条件,则任务会继续运行直到结束
如果是使用Executors.newCachedThreadPool()创建的线程池,等线程执行完后程序不会马上停止,而是等待60s后(默认),线程池销毁,jvm才退出,如果是其他线程池有可能会无限等待,不会销毁,如果需要马上退出 在最后执行shutdown方法
扩展:
- 给线程设置的状态不一样,SHUTDOWN、STOP
shutdownNow
试图终止线程的方法是通过调用Thread.interrupt()
方法来实现的。这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出
线程池状态
- isShutdown用来判断线程池是否已经关闭
- isTerminated任务全部执行完毕,并且线程池已经关闭,才会返回true
- awaitTermination阻塞,直到所有任务在关闭请求后完成执行,或发生超时,或当前线程中断(以先发生者为准)。
核心线程超时策略
- 核心线程也允许销毁,
allowsCoreThreadTimeOut
就用来做这个事 - 设置控制核心线程是否可能超时的策略,如果在保持活动时间内没有任务到达,则该策略将在新任务到达时根据需要被替换。
- 如果为false,则不会由于缺少传入任务而终止核心线程。
- 如果为true,则应用于非核心线程的相同保持活动策略(keepAliveTime)也适用于核心线程。为避免连续更换线程,设置为true时保持活动时间必须大于零。
- 通常应该在池被激活之前调用此方法。(测试发现效果都一样,暂时没注意到有什么坑)
应用场景:
- 核心线程开的比较多,需要销毁
线程池其他用法
prestartCoreThread
:预启动,每调用一次,线程池中初始化一个核心线程prestartAllCoreThreads
:全启动
获取线程池的各种动态和静态数据,用于程序控制
- getCorePoolSize :返回核心线程数
- getPoolSize:返回当前线程池中的线程数
- getMaximumPoolSize:返回最大允许的线程数
- getLargestPoolSize:返回池中同时存在的最大线程数
- getTaskCount:返回预定执行的任务总和
- getCompletedTaskCount:返回当前线程池已经完成的任务数
- getActiveCount:返回正在执行任务的线程的大致数目
- getKeepAliveTime:返回线程池空闲时间
线程及线程池切面
- 在线程执行前、执行后增加切面,在线程池关闭时执行某段程序。
- 实现自己的线程池类继承
ThreadPoolExecutor
,并覆写beforeExecute、afterExecute、terminated方法
这样使用自己的实现的线程池来执行线程,切面功能会在每个线程生效
移除线程池中的任务
- 使用remove方法
- 正在运行中的任务不可以删除
- execute方法提交的,未运行的任务可以删除
- submit方法提交的,未运行任务就不可以删除,小心踩坑!
Future模式
Future模式是多线程开发中常见的一种设计模式。它的核心思想是异步调用。当我们需要调用一个函数方法时。如果这个函数执行很慢,那么我们就要进行等待。但有时候,我们可能并不急着要结果。因此,我们可以让被调用者立即返回,让他在后台慢慢处理这个请求。对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获取需要的数据。
普通方式和Future模式的差别
普通模式是串行的,在遇到耗时操作的时候只能等待。
Future模式,只是发起了耗时操作,函数立马就返回了,并不会阻塞客户端线程。所以在工作线程执行耗时操作的时候客户端无需等待,可以继续做其他事情,等到需要的时候再向工作线程获取结果
public static void main(String[] args) {
LookTimeUtils.setStartTime();
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(QueryUtils::querySaas); //此方法耗时1s
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(QueryUtils::querySaas); //此方法耗时1s
int dataBase = QueryUtils.queryDataBase();//此方法耗时1s
// 可以将 future1-2塞到一个List<CompletableFuture>里面
CompletableFuture.allOf(future1,future2).join();
LookTimeUtils.lookSpendMill(); //总共耗时1064ms
System.out.println(dataBase + future1.join()+future2.join());
}
class QueryUtils {
public static int querySaas() {
System.out.println(Thread.currentThread().getName() + "-查询中台");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "查询中台结束");
return 2;
}
public static int queryDataBase() {
System.out.println(Thread.currentThread().getName() + "-查询数据库");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "查询数据库结束");
return 1;
}
}
将耗时任务拆分,分到多个future去做,可提高效率
Master-Worker模式
- Master-Worker模式是一种将串行任务并行化的方案,被分解的子任务在系统中可以被并行处理,同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果集
- 客户端将所有任务提交给Master,Master分配Worker去并发处理任务,并将每一个任务的处理结果返回给Master,所有的任务处理完毕后,由Master进行结果汇总再返回给Client
使用CompletionService类可以完成Master-Worker模式的开发
CompletionService
如图说明了CompletionService
的使用模式:
- 可用它不断的提交任务(线程)给
Executor
处理处理后的结果都会自动放入BlockedQueue
,另外一个线程不断的从队列里取得处理结果 - 优点:哪个任务先处理完就能先得到哪个结果最后做汇总处理,从而轻松完成MasterWorker模式相同的功能
**CompletionService**
本质是线程池+阻塞队列
- 线程池用来处理任务
- 阻塞队列用来获取每个线程的执行结果
ForkJoin
核心思想:递归分治,如图
使用
- 创建任务池ForkJoinPool
- 创建自定义递归类实现RecursiveTask
- 重写compute方法
- 使用任务池提交任务
注意
- 递归分治思想
- 可以是不同的任务,不一定都是对同一类型的任务进行细分
- 本质上也是使用线程池来完成
- 模型和功能与
CompletionService
都很像,与大数据MapReduce也像 - 模型特点:任务盗取,完成了子任务的线程可以去获取执行慢的线程的子任务
- 任务调用执行的有序性,不同于
CompletionService
的无序性 - 应用场景很少,自我要求–了解即可
文章评论