一、实现一个线程安全的单例模式
单例模式
- 在有些程序中,某个类在代码中只能创建一个实例,就可以将这个实例成为“单例”
- 实现一个代码,保证这个类只创建一个实例,这段代码就叫做“单例模式”
1.饿汉模式
- 创建实例,设置为私有的类属性(private、static)
- 设置私有的构造方法(private),防止在类外再次进行实例化
- 设置一个静态方法,用于获取唯一的实例
class MyDataSource{
private static MyDataSource instance = new MyDataSource();
private MyDataSource(){
}
public static MyDataSource getMyDataSource(){
return instance;
}
}
2.懒汉模式
- 设置私有的构造方法(private),防止在类外再次进行实例化
- 设置一个静态方法,创建实例,返回唯一的实例
- 与饿汉模式不同的是,懒汉模式是什么时候用,就什么时候创建实例
class MyDataSource{
private static MyDataSource instance = null;
private MyDataSource(){
}
public static MyDataSource getMyDataSource() {
if (instance == null) {
instance = new MyDataSource();
}
}
return instance;
}
}
3.线程安全性分析
- 饿汉模式是线程安全的,多线程获取实例,只是在进行“读”操作
- 懒汉模式是线程不安全的,会涉及到“读”和“修改”两个操作(第一次获取实例时,其余都是“读”操作)
未创建实例前,多线程调用getInstance()方法,因为线程的抢占式执行,可能会使类实例化两次
懒汉模式的线程不安全性
如果第一次获取实例时,t1 、t2 线程都调用getInstance()方法,可能会创建出两个实例
1 . 解决方法
通过synchronized加锁操作,将“读”和“修改”操作打包成原子的
class MyDataSource{
private static MyDataSource instance = null;
private static Object locker = new Object();
private MyDataSource(){
}
public static MyDataSource getInstance() {
synchronized (locker) {
if (instance == null) {
instance = new MyDataSource();
}
}
return instance;
}
}
2 . 优化
- 对于懒汉模式来说,线程不安全只是发生在为创建实例之前,即多线程第一次获取实例
- 创建实例之后,多线程调用getInstance()只是“读”操作,不加锁也能保证线程安全;如果每次调用getInstance()都要加锁的话,多线程之间势必会产生过多的锁竞争
- 第一个 if 判断是否需要加锁;第二个 if 判断是否需要创建实例
class MyDataSource{
private static MyDataSource instance = null;
private static Object locker = new Object();
private MyDataSource(){
}
public static MyDataSource getInstance() {
if(instance == null) {
synchronized (locker) {
if (instance == null) {
instance = new MyDataSource();
}
}
}
return instance;
}
}
- 针对指令重排序(可能会发生)问题优化
借鉴知乎大佬的文章:码农小胖
- 了解下 new MyDataSource() 这个操作的指令
a. 分配内存
b. 在内存上创建对象
c. 将这个内存的地址赋值给instance这个变量
- t1线程初始化instance的时候,b和c指令交换顺序;导致t2线程在某个时间点返回一个不完整的instance对象(还没有初始化)
- 使用volatile关键字 禁止指令重排序
class MyDataSource{
private static volatile MyDataSource instance = null;
private static Object locker = new Object();
private MyDataSource(){
}
public static MyDataSource getInstance() {
if(instance == null) {
synchronized (locker) {
if (instance == null) {
instance = new MyDataSource();
}
}
}
return instance;
}
}
二、阻塞队列
- 什么是阻塞队列
- 也是先进先出,是线程安全的队列
- 有“阻塞功能”,如果队列满了,尝试入队列,就阻塞;如果队列为空,尝试出队列,就阻塞;满足条件后就不在阻塞
- 生产者消费者模型
- 服务之间“解耦合”
A挂了时,B挂不了 - 在请求突然暴增时,能够“削峰填谷”
- 设置一个阻塞队列
设置一个生产者消费者模型,用阻塞队列作为交易场所
队列满了,入队列,生产者线程阻塞等待;队列空了,出队列,消费者线程阻塞等待
class MyBlockingQueue {
private int[] items = new int[100];
// 记录队首的元素
private int head;
// 记录队尾的位置,放下一个元素的位置
private int tail;
// 记录队列中的 元素个数
private int size;
private Object locker = new Object();
// 添加元素
public void put(int value) throws InterruptedException {
synchronized (locker) {
if (size == items.length) {
// 等待队列中出元素 腾出位置
locker.wait();
}
if (tail >= items.length) {
tail = 0;
}
items[tail] = value;
tail++;
size++;
// 唤醒调用take()后处于等待状态的线程
locker.notify();
}
}
// 删除元素
public Integer take() throws InterruptedException {
synchronized (locker) {
if (size == 0) {
// 等待队列入元素
locker.wait();
}
head++;
size--;
// 唤醒调用put()后处于等待状态的线程
locker.notify();
return items[head];
}
}
}
public class Test16 {
// 使用阻塞队列作为交易场所
private static MyBlockingQueue queue = new MyBlockingQueue();
public static void main(String[] args) {
// 实现一个生产者消费者模型
// 搞两个线程,一个作为生产者,一个作为消费者(生产者和消费者可以有多个)
Thread producer = new Thread(() -> {
int n = 1;
while(true){
try {
System.out.println("生产者生产了:" + n);
queue.put(n);
n++;
// 让生产者生产的慢,消费者消费的快
// Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.start();
Thread customer = new Thread(() -> {
while(true){
try {
int ret = queue.take();
System.out.println("消费者消费了:" + ret);
// 让消费者消费的快,生产者生产的慢
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
customer.start();
}
}
- Java标准库中的阻塞队列
public static void main(String[] args) {
}
BlockingQueue<Runnable> queue = new LinkedBlockingDeque<>();
// 核心方法 put()
try {
queue.put(new Runnable() {
@Override
public void run() {
System.out.println("yyds");
}
});
} catch (InterruptedException e) {
e.printStackTrace();
}
// take() 返回队头元素
try {
Runnable runnable = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
三、实现一个定时器
定时器就相当于一个“闹钟”,到达一个特定的时间后,就开始执行指定的代码
1. 定时器的实现
(1)描述任务 ,使用一个worker类
- 描述任务具体作什么工作(Runnable)
- 描述任务执行的时间,使用绝对时间,不要使用相对时间(time)
- 任务执行(run())
(2)组织任务
- 使用一个最小堆(根据执行时间排序),堆顶的任务就是最近要执行的;同时应该使用优先级阻塞队列(PriorityBlockingQueue),因为线程使用堆时,要保证线程安全
扫描线程就是消费者,调用addwork()方法的线程就是生产者;队列中没有元素,消费者线程阻塞等待;队列满了,生产者线程阻塞等待- 分析一下PriorityBlockingQueue的构造方法 4中构造方法
- 无参数, 队列(数组)默认大小是11,
- 可以指定队列的大小
- 指定队列大小,同时设置比较器;不可以单独设置比较器;
- 还可以让Collection<? extends E> c作为参数,队列的大小就是c的大小
- 没有传递比较器时,一定要让需要比较的对象的类实现Comparable接口,重写compareTo()方法
- 设置一个方法,向队列中添加任务
(3) 设置一个扫描线程
- 不断获取堆顶的任务(最近要执行的任务),到达最近时间点就执行任务
- 没到最近时间点,就不要让线程“忙等”,即在到达最近时间点前,一直扫,一直扫,一直扫…这对于CPU是开销巨大的。
可以在到达最近时间点前,让线程处于阻塞等待状态,到达时间,就唤醒。- 扫描线程可以放到一个实例代码块中,也可以放到定时器的构造方法中
// 描述任务(任务内容 + 执行的时间)
class Worker implements Comparable<Worker>{
// 任务
private Runnable runnable;
// 执行的时间
private long time;
// 创建任务
// 此处的参数time是相对时间,执行的时间用绝对时间表示
public Worker(Runnable runnable, long after) {
this.runnable = runnable;
this.time = after + System.currentTimeMillis();
}
// 执行任务
public void run(){
runnable.run();
}
// 获取到执行的时间
public long getTime() {
return time;
}
@Override
public int compareTo(Worker o) {
// 从小到大排
return (int)(this.time - o.getTime());
}
@Override
public String toString() {
return "Worker{" +
"runnable=" + runnable +
", time=" + time +
'}';
}
}
class MyTimer{
// 使用一个最小堆(根据执行时间排序)组织任务,堆顶的任务就是最先要执行的任务
private PriorityBlockingQueue<Worker> queue = new PriorityBlockingQueue<Worker>();
// 向这个监视器中添加任务
public void addWork(Worker worker){
queue.put(worker);
}
// 使用一个 扫描线程 不断地扫描堆顶的任务,到达时间就执行任务
{
Thread t = new Thread(() -> {
try {
while (true) {
// 堆顶的任务出队列
Worker worker = queue.take();
// 这个地方啊,千万不要转为int类型,当前的时间戳数值太大了,超过了int所表示的范围了,如果类型转换的话,会得到一个错误的数值
long currentTime = System.currentTimeMillis();
// 判断是否执行
if (currentTime < worker.getTime()) {
queue.put(worker);
// 等待状态
synchronized (this) {
this.wait(worker.getTime() - currentTime);
}
} else {
worker.run();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t.start();
}
}
测试
public static void main(String[] args) {
MyTimer myTimer = new MyTimer();
Worker worker = new Worker(new Runnable() {
@Override
public void run() {
System.out.println("hello");
}
}, 3000);
myTimer.addWork(worker);
System.out.println("main结束");
}
此时 此时阻塞队列中没有元素,t 线程处在等待状态
2. Java标准库(java.util)中的定时器
Timer 核心方法是 schedule(TimeTask timeTask, long time),两个参数分别描述任务,和多长时间后执行
Timer timer = new Timer();
timer.schedule(new TimerTask(){
@Override
public void run() {
System.out.println("yyds");
}
}, 3000);
四、线程池
1. Java标准库中的线程池
- ThreadPoolExecutor的构造方法很复杂,
- corePoolSize 核心线程数
- maximumPoolSize 最大线程数
- keepAliveTime 除了核心线程数外,允许其他线程存在的时间
- unit keepAliveTime的单位
- workQueue 组织任务的阻塞队列
- threadFactory 线程池里的线程创建的方式
- handler 如果任务队列满了,新增任务的处理方式(直接忽略新增的任务;干掉最老的任务;阻塞等待)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
}
- Executors
Java对ThreadPoolExecutor这个类进行了进一步的封装,提供了更简单的类Executors
Executors创建线程池的方式:
- newFixedThreadPool 创建固定线程数的线程池
- newCachedThreadPool 创建线程数目动态增长的线程池
- newSingleThreadExecutor 创建只包含单个线程的线程池
- newScheduledThreadPool 设定延迟时间后执行命令,或者定期执行命令,是进阶版的Timer ,返回ScheduledExecutorService,继承自ExecutorService
ExecutorService pool2 = Executors.newFixedThreadPool(10);
ExecutorService pool3 = Executors.newCachedThreadPool();
ExecutorService pool4 = Executors.newSingleThreadExecutor();
ExecutorService pool5 = Executors.newScheduledThreadPool(10)
2.实现一个线程池
(1)关于任务
- 描述任务----------Runnable
- 组织任务----------阻塞队列,BlockingQueue,线程安全的
队列满了,添加任务的线程阻塞等待(不太可能);队列空了,线程池中的线程阻塞等待- 新增任务----------设置方法
(2)关于工作线程
- 描述一个工作线程 --------子类继承自Thread,重写run()方法 ,扫描阻塞队列,执行任务
- 组织工作线程---------List
(3)线程池的构造方法,构造一个固定线程数的线程池
class MyThreadPool{
// 1. 描述任务就用 Runnable
// 2. 组织任务用 阻塞队列
private static BlockingQueue<Runnable> queue = new LinkedBlockingDeque<>()
// 3.新增任务
public void AddRunnable(Runnable runnable){
try {
queue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 4. 描述工作线程
static class ThreadWorker extends Thread{
@Override
public void run() {
while(true){
try {
Runnable work = queue.take();
work.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 5.组织工作线程
private List<ThreadWorker> list = new ArrayList<>();
// 6.线程池构造方法
public MyThreadPool(int n) {
for (int i = 0; i < n; i++) {
ThreadWorker threadWorker = new ThreadWorker();
threadWorker.start();
list.add(threadWorker);
}
}
}
public class Pool {
public static void main(String[] args) {
// 创建线程池
MyThreadPool pool = new MyThreadPool(10);
// 创建10个任务
for (int i = 0; i < 10; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("hello");
}
};
// 向线程池中添加任务
pool.AddRunnable(runnable);
}
System.out.println("main线程结束");
}
}
// 打印10个hello
第二种
class ThreadPool{
// 组织任务
private BlockingQueue<Runnable> queue = new LinkedBlockingDeque<>();
// 新增任务
public void AddWork(Runnable runnable){
try {
queue.put(runnable);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 组织工作线程
private List<Thread> threads = new ArrayList<>();
// 构造方法
public ThreadPool(int n) {
for (int i = 0; i < n; i++) {
Thread t = new Thread( () -> {
while(true){
try {
Runnable runnable = queue.take();
runnable.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
threads.add(t);
}
}
}
public class Pool2 {
public static void main(String[] args) {
ThreadPool pool = new ThreadPool(10);
for (int i = 0; i < 10; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("hello");
}
};
pool.AddWork(runnable);
}
System.out.println("main线程结束");
}
}
文章评论