一、基础概念
线程池是一种多线程开发的处理方式,线程池可以方便得对线程进行创建,执行、销毁和管理等操作。主要用来解决需要异步或并发执行任务的程序
简单点来说,就是预先保存好大量的资源,这些是可复用的资源,你需要的时候给你。
1.谈谈池化技术
在开发中我们经常会碰到“池”的概念,比如数据库连接池、内存池、线程池、常量池,对象池等。
为什么需 要“池”呢?
程序运行的本质,就是通过使用系统资源(CPU、内存、网络、磁盘等)来完成信息的处理, 比如在JVM中创建一个对象实例需要消耗CPU和内存资源,如果你的程序需要频繁创建大量的对象,并且这 些对象的存活时间短,就意味着需要进行频繁销毁,那么很有可能这部分代码会成为性能的瓶颈。
程序中当你创建一个线程或者在堆上申请一块内存时,都涉及到很多系统调用,也是非常消耗CPU的,如果你的程序需要很多类似的工作线程或者需要频繁的申请释放小块内存,如果没有在这方面进行优化,那很有可能这部分代码将会成为影响你整个程序性能的瓶颈。
而“池”就是用来解决这个问题的,简单来说,对象池就是把用过的对象保存起来,等下一次需要这种对象 的时候,直接从对象池中拿出来重复使用,避免频繁地创建和销毁。
在Java中万物皆对象,线程也是一个对 象,Java线程是对操作系统线程的封装,创建Java线程也需要消耗系统资源,因此就有了线程池。JDK中提 供了线程池的默认实现,我们也可以通过扩展Java原生线程池来实现自己的线程池。
2.线程池的作用
以下摘自《Java并发编程的艺术》
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,
还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。
自己理解
1.线程池未出现前:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间,而且会消耗系统资源。
如果使用线程池后:线程在run()方法执行完后,不用将其销毁,让它继续保持空闲状态,当有新任务时让它继续执行新的任务。最后统一交给线程池来决定是否销毁线程。
2.可以控制需要创建的最大线程数,避免无限制的创建线程导致服务器CPU过高,影响服务器处理性能。
3.线程池的使用场景
首先系统的线程是非常宝贵的资源,不能进行滥用,能不用进行并发处理的,尽量不用。
1. 可以适当用的场景
-
io操作过多,如在垂直的业务下,需要非常多的去查询和统计数据,如果使用多个线程并行的去查询,然后交给主线程去统计汇总,那将能有效的解决性能问题。
-
2.需要异步操作
-
3.主流运用容器,如tomcat,dubbo等。
2. 不建议用的场景
1.如为了提升消费速度,多线程去处理mq消息队列的消息等,这种最好结合中间件的产品特征,借助中间件的特性如增加分片等方式。
二、线程池实现原理
1. 核心构造参数
在ThreadPoolExecutor类中提供了四个构造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
核心构造参数
1.corePoolSize
核心池的大小。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。
如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
2.maximumPoolSize
线程池最大线程数。线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果
3.keepAliveTime
表示线程没有任务执行时最多保持多久时间会终止,其中unit参数表示keepAliveTime的时间单位。
默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
4.workQueue阻塞队列
用于保存等待执行的任务的阻塞队列。
这个参数的选择也很重要,会对线程池的运行过程产生重大影响可以选择以下几个阻塞队列。
-
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,初始化时必须指定其大小。此队列按FIFO(先进先出)原则对元素进行排序。内部通过ReentrantLock 来保证并发的安全性
-
LinkedBlockingQueue:一个基于链表结构的阻塞队列。LinkedBlockingQueue的容量为Integer.MAX_VALUE即2^31-1.此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
-
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,
静态工厂方法Executors.newCachedThreadPool使用了这个队列。 -
PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
5.threadFactory
线程工厂,主要用来创建线程;
通过它你可以扩展原生的线程工厂,比如给创建出来的 线程取个有意义的名字。
6.RejectedExecutionHandler(饱和策略)
当队列和线程池都满了,说明线程池处于饱和状 态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。
几种策略如下:
拒绝策略1:将抛出 RejectedExecutionException. RejectedExecutionHandler
handler = new ThreadPoolExecutor.AbortPolicy();表示无法处理新任务时抛出异常
策略2:ThreadPoolExecutor.CallerRunsPolicy 由调用线程( 提交任务的线程) 处理该任务。
它直接在 execute方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。
源码如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
- 策略3:RejectedExecutionHandler handler = new
ThreadPoolExecutor.DiscardOldestPolicy();把最老的加入队列的线程丢弃掉。源码如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
- 策略4:ThreadPoolExecutor.DiscardPolicy 用于被拒绝任务的处理程序,默认情况下它将丢弃新来的被拒绝的任务。
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
这四种策略是独立无关的,是对任务拒绝处理的四中表现形式。
最简单的方式就是直接丢弃任务。但是却有两种方式,到底是该丢弃哪一个任务,比如可以丢弃当前将要加入队列的任务本身(DiscardPolicy)或者丢弃任务队列中最旧任务(DiscardOldestPolicy)。丢弃最旧任务也不是简单的丢弃最旧的任务,而是有一些额外的处理。
除了丢弃任务还可以直接抛出一个异常(RejectedExecutionException),这是比较简单的方式。抛出异常的方式(AbortPolicy)尽管实现方式比较简单,但是由于抛出一个RuntimeException,因此会中断调用者的处理过程。也可以让调用者去捕获抛出来的异常,而实现自己自定义的逻辑
除了抛出异常以外还可以不进入线程池执行,在这种方式(CallerRunsPolicy)中任务将有调用者线程去执行。
最后在生产中常常使用自定义的拒绝策略,即多余的任务再次放入队列中,如果队列已满不成功将进入阻塞状态,直到队列中有空余的节点再唤醒阻塞的线程。
taskExecutor.setRejectedExecutionHandler((Runnable r, ThreadPoolExecutor executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
log.error(e.toString(), e);
Thread.currentThread().interrupt();
}
}
);
2. 线程池的处理流程
当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?
下图就展示了线程池对任务的处理流程。
从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下。
1.线程池判断核心线程池里的线程是否都在执行任务。
- 如果不是,则创建一个新的工作线程来执行任务。
- 如果核心线程池里的线程都在执行任务,则进入下个流程。
举例
如核心线程池的容量为5个线程。
1)如果有3个线程在工作,另外有2个线程没有创建或者处于空闲状态,那么线程池就创建一个线程或者让空闲状态的线程来执行任务。
2)如果有5个线程都在工作,则进入下个流程。
2.线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这
个工作队列里(期间不会创建新的线程)。
如果工作队列满了,则进入下个流程。
3.线程池发现队列已经满了,此时线程池就会紧急创建新的临时线程来救场,如果总的线程数达到了最大线程数maximumPoolSize,则不能再创建新的临时线程了,转而执行拒 绝策略handler,比如抛出异常或者由调用者线程来执行任务等。
名词解释:
工作线程:线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务
后,还会循环获取工作队列里的任务来执行。
1.ThreadPoolExecutor执行流程
ThreadPoolExecutor执行execute()方法的示意图,如图9-2所示。
ThreadPoolExecutor执行execute方法分下面4种情况。
1.如果当前运行的线程少于corePoolSize,则创建新线程来执行任务
2.如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
线程池会让corePoolSize里执行完任务的线程,反复的获取BlockingQueue的任务执行。
3.如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务
4.如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用
RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能 地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后 (当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而 步骤2不需要获取全局锁。
java.util.concurrent.ThreadPoolExecutor#execute 源码如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
思考:
1).全局锁
2.ThreadPoolExecutor类
ThreadPoolExecutor类的层级结构如下:
Executor接口
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
Executor接口只有一个方法execute(),通过这个方法可以向线程池提交一个任务,交由线程池去执行
ExecutorService接口
1.submit():
提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
submit()和execute()的区别
1.最大的区别是submit()可以有返回值
2.submit()里面实际上也会执行execute()方法,只不过它利用了Future来获取任务执行结果。而execute没有返回结果
2.shutdown()
启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
如何提交线程
如可以先随便定义一个固定大小的线程池
ExecutorService es = Executors.newFixedThreadPool(3);
提交一个线程
es.submit(xxRunnble);
es.execute(xxRunnble);
如何关闭线程池
es.shutdown();
不再接受新的任务,之前提交的任务等执行结束再关闭线程池。
es.shutdownNow();
不再接受新的任务,试图停止池中的任务再关闭线程池,返回所有未处理的线程list任务列表。
3. Future和FutureTask
Future:Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。
1)多用于获取callable的返回结果
FutureTask:对所继承的接口进行了基本的实现。
其对应关系如下
思考
1.队列满了后是直接创建一个新的工作线程来执行任务吗?
答:是的,不会使用核心线程池里的线程,任务执行完后,这个线程的生命周期由所设置的keepAliveTime的大小控制。
2.线程池中线程复用原理
线程池将线程和任务进行解耦,线程是线程,任务是任务,摆脱了之前通过 Thread 创建线程时的一个线程必须对应一个任务的限制。
在线程池中,同一个线程可以从阻塞队列中不断获取新任务来执行,其核心原理在于线程池对Thread 进行了封装,并不是每次执行任务都会调用 Thread.start() 来创建新线程,而是让每个线程去执行一个“循环任务”。
在这个“循环任务”中不停检查是否有任务需要被执行,如果有则直接执行,也就是调用任务中的 run 方法,将 run 方法当成一个普通的方法执行,执行完后又开始循环检查。
通过这种方式只使用固定的线程就将所有任务的 run 方法串联起来。
三、线程池的应用
3.1 合理地配置线程池
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
- 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
- 任务的优先级:高、中和低。
- 任务的执行时间:长、中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。
1.CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。(cpu太忙了,线程很难抢到CPU资源,用不好可能导致系统的cpu使用率太高导致系统处理能力下降)如代码内有循环执行的线程,开太多了会是cpu标高。
2.由于IO密集型任务线程并不是一直在执行任务,则应配尽可能多的线程,提高线程的利用率如2*Ncpu。
混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
3.优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。
注意 如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
4.执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。
5.依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
建议使用有界队列:有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。如果设置成无界队列可能会撑爆。如依赖于数据库的线程,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里或者当数据库发生超时等异常时,其他线程将不断进入阻塞队列,可能会撑爆jvm内存空间,导致整个系统不可用。
3.2 Executors框架
3.2.1 Executors总览
Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。
在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线 程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程 也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。
在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器 (Executor框架)将这些任务映射为固定数量的线程;
在底层,操作系统内核将这些线程映射到 硬件处理器上。这种两级调度模型的示意图如图10-1所示。
从图中可以看出,应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
前面提到过,SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一 个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主线程提交的任务传递给空闲线程执行。CachedThreadPool中任务传递的示意图如图10-7所示。
3.2.2 Executors几种创建线程池的方式
一般如果对线程池没有特别深入的研究或特别复杂的业务,不建议开发人员自己手动配置线程池。如果要手动配置线程池可以使用spring提供ThreadPoolTaskExecutor类进行实现。
java中的Executors提供了很多静态工厂来配置线程池如下所示(图片来源于core java):
推荐使用Executors.newCachedThreadPool():
- Executors.newCachedThreadPool()
其源码如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
创建一个核心数为0,最大数为Integer.MAX_VALUE即(2^31)-1。
被创建的线程60秒没有任务的时候就会被回收。由于采用的是SynchronousQueue(一个不存储元素的阻塞队列),当任务超过核心数的时候,就会创建线程去执行任务。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。
极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
偏向于需要较多线程的业务,即cup空闲多需提升cup利用率的业务,主线程提交的任务需要及时执行的场景。主要针对于io密集性。
CachedThreadPool的实现原理如图:
对图10-6的说明如下。
1.首先执行SynchronousQueue.offer(Runnable task)。
如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成;否则执行下面的步骤2)。
2.当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤1将失败。此时CachedThreadPool会创建一个新线程去执行任务,execute()方法执行完成。
3.在步骤2)中新创建的线程将任务执行完后,会执行
SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1)),那么这个空闲线程将执行主线程提交的新任务;
否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。
前面提到过,SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一
个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主线程提交的任务传递给空闲线程执行。CachedThreadPool中任务传递的示意图如图10-7所示。
SynchronousQueue详解
作为BlockingQueue中的一员,SynchronousQueue与其他BlockingQueue有着不同特性:
1.SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
2.因为没有容量,所以对应 peek, contains, clear, isEmpty … 等方法其实是无效的。例如clear是不执行任何操作的,contains始终返回false,peek始终返回null。
3.SynchronousQueue分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)。
4.若使用 TransferQueue, 则队列中永远会存在一个 dummy node(这点后面详细阐述)
2.Executors.newFixedThreadPool()
FixedThreadPool被称为可重用固定线程数的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool 创建一个固定核心数和最大线程数的线程池,被创建的线程将不会被回收,超出的线程会在基于链表的阻塞队列中等待。
极端情况下,newFixedThreadPool会因为任务过多,导致任务队
列占满,最终oom。
偏向于控制线程数的业务,即需要较少线程的业务,即不能让你创建太多的线程。偏向于cpu密集型。
对图10-4的说明如下。
1)如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
2)在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入 LinkedBlockingQueue。
3)线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。 FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。
使用无界队列作为工作队列会对线程池带来如下影响。
1)当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中 的线程数不会超过corePoolSize。
2)由于1,使用无界队列时maximumPoolSize将是一个无效参数。
3)由于1和2,使用无界队列时keepAliveTime将是一个无效参数。
4)由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或 shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)。
3.Executors.newSingleThreadExecutor
单线程线程池。其源码如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
单线程线程池,核心线程数和最大线程数均为1,空闲线程存活0毫秒同样无意思,意味着每次只有一个线程执行任务,多余的先存储到工作队列,一个一个执行,保证了线程的顺序执行。
极端情况下,因为任务过多,导致任务队列占满,最终oom。
对图10-5的说明如下。
1)如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线 程来执行任务。
2)在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加入Linked- BlockingQueue。
3)线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来 执行。
4.Executors.newScheduledThreadPool
调度线程池。其源码如下:
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
即按一定的周期执行任务,即定时任务,对ThreadPoolExecutor进行了包装而已。
调度线程池的实现原理待续???
四、Java中的并发工具类
4.1 CountDownLatch
CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。
CountDownLatch类只提供了一个构造器如下:
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
然后下面这3个方法是CountDownLatch类中最重要的方法:
public void await() throws InterruptedException { }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown() { }; //将count值减1
注意与join()方法的联系
参考资料
1.《Java并发编程的艺术》方腾飞 魏鹏 程晓明 著
2.《core java》
3. Java并发编程:线程池的使用:http://www.cnblogs.com/dolphin0520/p/3932921.html
4. 【死磕Java并发】—– 死磕 Java 并发精品合集http://cmsblogs.com/?p=2611
5. java高级应用:线程池全面解析https://mp.weixin.qq.com/s/fFZfEe10bdVKBndrEFH4fA
文章评论