本系列文章:
多线程(一)线程与进程、Thread
多线程(二)Java内存模型、同步关键字
多线程(三)线程池
多线程(四)显式锁、队列同步器
多线程(五)可重入锁、读写锁
多线程(六)线程间通信机制
多线程(七)原子操作、阻塞队列
多线程(八)并发容器
多线程(九)并发工具类
多线程(十)多线程编程示例
一、原子类(无锁类)
原子操作指不可被中断的一个或一系列操作
。
在多线程程序中,诸如++i 或 i++等运算不具有原子性,是不安全的线程操作之一。通常我们会使用synchronized将该操作变成一个原子操作,但JVM为此类操作特意提供了一些同步类,使得使用更方便,且使程序运行效率变得更高。
从JDK 1.5开始提供了java.util.concurrent.atomic包(简称Atomic包),Atomic包里面提供了一组原子类,它们可以自动的保证对于他们的操作是原子的并且不需要使用同步。
Atomic包大致可以属于4种类型的原子更新方式,分别是:原子更新基本类型、原子更新数组、原子更新引用、原子更新属性。
1.1 基本类型原子类*
Atomic包提高原子更新基本类型的工具类,主要有:AtomicInteger(以原子更新的方式更新Integer)、AtomicBoolean(以原子更新的方式更新boolean)、AtomicLong(以原子更新的方式更新Long)。3个类提供的方法几乎一模一样。
1.1.1 AtomicInteger*
AtomicInteger类常用的方法:
//获取当前的值
public final int get()
//获取当前的值,并设置新的值
public final int getAndSet(int newValue)
//获取当前的值,并自增
public final int getAndIncrement()
//获取当前的值,并自减
public final int getAndDecrement()
//获取当前的值,并加上预期的值
public final int getAndAdd(int delta)
//如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
boolean compareAndSet(int expect, int update)
//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
public final void lazySet(int newValue)
- 1、AtomicInteger的创建和获取值
AtomicInteger的创建分为两种:1、无参的,默认值0;有参的,指定默认值。
get():用于获取当前值,该方法不需要锁。
AtomicInteger i = new AtomicInteger();
System.out.println(i.get()); //0
AtomicInteger j = new AtomicInteger(10);
System.out.println(j.get()); //10
- 2、设置值
AtomicInteger i = new AtomicInteger();
i.set(12);
System.out.println(i.get()); //12
- 3、先取值,再设置值
AtomicInteger i = new AtomicInteger();
int result = i.getAndSet(10);
System.out.println(result); //0
System.out.println(i.get()); //10
- 4、先取值并且后加上指定的值
AtomicInteger i = new AtomicInteger(10);
int result = i.getAndAdd(10);
System.out.println(result); //10
System.out.println(i.get()); //20
- 5、先加上指定的值再取值
AtomicInteger i = new AtomicInteger(10);
int result = i.addAndGet(10);
System.out.println(result);//输出20
System.out.println(i.get());//输出20
- 6、先取值然后再+1
AtomicInteger i = new AtomicInteger();
int result = i.getAndIncrement();
System.out.println(result); //0
System.out.println(i.get()); //1
- 7、先+1再取值
AtomicInteger i = new AtomicInteger();
int result = i.incrementAndGet();
System.out.println(result); //1
System.out.println(i.get()); //1
- 8、先取值再-1
AtomicInteger i = new AtomicInteger(10);
int result = i.getAndDecrement();
System.out.println(result); //10
System.out.println(i.get()); //9
- 9、先-1再取值
AtomicInteger i = new AtomicInteger();
int result = i.decrementAndGet();
System.out.println(result); //9
System.out.println(i.get()); //9
- 10、快速失败(CAS)策略
是用于判断期望值是否与变量实际值相等,如果相等则将update赋值给变量,否则失败。示例:
//成功案例
AtomicInteger atomicInteger = new AtomicInteger(10);
boolean result = atomicInteger.compareAndSet(10, 12);
System.out.println(result); //true
System.out.println(atomicInteger.get()); //12
//失败案例
AtomicInteger atomicInteger1 = new AtomicInteger(10);
boolean result1 = atomicInteger1.compareAndSet(11, 12);
System.out.println(result1); //false
System.out.println(atomicInteger1.get()); //10
- 11、使用和不使用原子类实现线程安全的对比
class Test {
private volatile int count = 0;
//若要线程安全执行执行count++,需要加锁
public synchronized void increment() {
count++;
}
public int getCount() {
return count;
}
}
class Test2 {
private AtomicInteger count = new AtomicInteger();
public void increment() {
count.incrementAndGet();
}
//使用AtomicInteger之后,不需要加锁,也可以实现线程安全。
public int getCount() {
return count.get();
}
}
1.1.2 AtomicBoolean
- 1、AtomicBoolean的创建和获取值
AtomicBoolean的创建分为两种:1、无参的,默认值false;有参的,指定默认值。
AtomicBoolean bool = new AtomicBoolean();
System.out.println(bool.get()); //false
AtomicBoolean bool2 = new AtomicBoolean(true);
System.out.println(bool2.get()); //true
- 2、设置值
AtomicBoolean bool = new AtomicBoolean();
bool.set(true);
System.out.println(bool.get()); //true
- 3、先取值,再设置值
AtomicBoolean bool = new AtomicBoolean(true);
boolean result = bool.getAndSet(false);
System.out.println(result); //true
System.out.println(bool.get()); //false
- 4、快速失败(CAS)策略
是用于判断期望值是否与变量实际值相等,如果相等则将update赋值给变量,否则失败。示例:
//成功案例
AtomicBoolean bool = new AtomicBoolean(true);
boolean result = bool.compareAndSet(true, false);
System.out.println(result); //true
System.out.println(bool.get()); //false
//失败案例
AtomicBoolean bool1 = new AtomicBoolean(true);
boolean result1 = bool1.compareAndSet(false, true);
System.out.println(result1); //false
System.out.println(bool1.get()); //true
AtomicBoolean可以当做多线程中的开关flag,从而来代替synchronized这样比较重的锁
。
1.1.3 原子类实现*
原子类的用法基本一致,以AtomicInteger中的getAndIncrement方法为例,其源码:
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
unsafe实例是通过UnSafe类的静态方法getUnsafe获取:
private static final Unsafe unsafe = Unsafe.getUnsafe();
valueOffset是由AtomicInteger类中的变量转化而来,源码:
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex); }
}
private volatile int value;
Unsafe类提供了一些底层操作,Atomic包下的原子操作类的也主要是通过 Unsafe类提供的compareAndSwapInt、compareAndSwapLong等一系列提供CAS操作的方法来进行实现。CAS操作能够保证数据更新的时候是线程安全的,并且由于CAS是采用乐观锁策略,因此,这种数据更新的方法也具有高效性。
示例:
private static AtomicInteger atomicInteger = new AtomicInteger(1);
public static void main(String[] args) {
System.out.println(atomicInteger.getAndIncrement()); //1
System.out.println(atomicInteger.get()); //2
}
AtomicLong的实现原理和AtomicInteger 一致,只不过一个针对的是long变量,一个针对的是int变量。AtomicBoolean稍有不同,看下AtomicBoolean的compareAndSet方法:
public final boolean compareAndSet(boolean expect, boolean update) {
int e = expect ? 1 : 0;
int u = update ? 1 : 0;
return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}
上面的方法的实际上也是先转换成0、1的整型变量,然后是通过针对int型变量的原子更新方法compareAndSwapInt来实现的。
1.2 数组类型原子类
Atomic包下提供能原子更新数组中元素的类有:AtomicIntegerArray(整形数组原子类)、AtomicLongArray(长整形数组原子类)、AtomicReferenceArray(引用类型数组原子类)。
他们的用法基本一致,以AtomicIntegerArray来介绍下常用的方法:
//获取 index=i 位置元素的值
public final int get(int i)
//返回 index=i 位置的当前的值,并将其设置为新值:newValue
public final int getAndSet(int i, int newValue)
//获取 index=i 位置元素的值,并让该位置的元素自增
public final int getAndIncrement(int i)
//获取 index=i 位置元素的值,并让该位置的元素自减
public final int getAndDecrement(int i)
//获取 index=i 位置元素的值,并加上预期的值
public final int getAndAdd(int i, int delta)
//如果输入的数值等于预期值,则以原子方式将index=i位置的元素值设置为输入值(update)
boolean compareAndSet(int i, int expect, int update)
//最终 将index=i 位置的元素设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
public final void lazySet(int i, int newValue)
AtomicIntegerArray与AtomicInteger的方法基本一致,只不过在 AtomicIntegerArray的方法中会多一个指定数组索引位 i。示例:
private static int[] value = new int[]{
1, 2, 3};
private static AtomicIntegerArray integerArray = new AtomicIntegerArray(value);
public static void main(String[] args) {
//对数组中索引为1的位置的元素加5
int result = integerArray.getAndAdd(1, 5);
System.out.println(integerArray.get(1)); //7
System.out.println(result); //2
}
1.3 引用类型原子类
Atomic包下相关的原子引用类:AtomicReference(引用类型原子类)、AtomicStampedReference(带有版本号的引用类型原子类)、AtomicMarkableReference(原子更新带有标记的引用类型)。
AtomicStampedRerence:他内部不仅维护了对象的值,还维护了一个时间戳(我们这里把他称为时间戳,实际上它可以使用任何一个整形来表示状态值),当AtomicStampedRerence对应的数值被修改时,除了更新数据本身外,还必须要更新时间戳。当AtomicStampedRerence设置对象值时,对象值及时间戳都必须满足期望值,写入才会成功。因此,即使对象值被反复读写,写回原值,只要时间戳发生变量,就能防止不恰当的写入。可以解决使用 CAS 进行原子更新时可能出现的ABA问题。
AtomicMarkableReference :该类将boolean标记与引用关联起来。
AtomicStampedRerence的关键方法:
//比较设置,参数依次为:期望值、写入新值、期望时间戳、新时间戳
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp);
//获得当前对象引用
public V getReference();
//获得当前时间戳
public int getStamp();
//设置当前对象引用和时间戳
public void set(V newReference, int newStamp);
使用AtomicStampedRerence解决ABA问题示例:
public class Demo {
//账户原始余额
static int accountMoney = 19;
//用于对账户余额做原子操作
static AtomicStampedReference<Integer> money = new AtomicStampedReference<>(accountMoney, 0);
//模拟2个线程同时更新后台数据库,为用户充值
static void recharge() {
for (int i = 0; i < 2; i++) {
int stamp = money.getStamp();
new Thread(() -> {
for (int j = 0; j < 50; j++) {
Integer m = money.getReference();
if (m == accountMoney) {
if (money.compareAndSet(m, m + 20, stamp, stamp + 1)) {
System.out.println("当前时间戳:" + money.getStamp() + ",当前余额:" + m + ",小于20,充值20元成功,余额:" + money.getReference() + "元");
}
}
//休眠100ms
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
//模拟用户消费
static void consume() throws InterruptedException {
for (int i = 0; i < 50; i++) {
Integer m = money.getReference();
int stamp = money.getStamp();
if (m > 20) {
if (money.compareAndSet(m, m - 20, stamp, stamp + 1)) {
System.out.println("当前时间戳:" + money.getStamp() + ",当前余额:" + m + ",大于10,成功消费10元,余额:" + money.getReference() + "元");
}
}
//休眠50ms
TimeUnit.MILLISECONDS.sleep(50);
}
}
public static void main(String[] args) throws InterruptedException {
recharge();
consume();
}
}
结果:
当前时间戳:1,当前余额:19,小于20,充值20元成功,余额:39元
当前时间戳:2,当前余额:39,大于10,成功消费10元,余额:19元
1.4 字段类型原子类
如果需要原子更新某个类里的某个字段时,需要用到对象的属性修改类型原子类:
AtomicIntegerFieldUpdater:原子更新整形字段的更新器
AtomicLongFieldUpdater:原子更新长整形字段的更新器
AtomicReferenceFieldUpdater :原子更新引用类型里的字段的更新器
AtomicStampedReference:原子更新引用类型,这种更新方式会带有版本号,是为了解决 CAS 的 ABA 问题。
使用原子更新字段需要两步操作:
- 原子更新字段类都是抽象类,只能通过静态方法newUpdater来创建一个更新器,并且需要设置想要更新的类和属性;
- 更新类的属性必须使用public volatile进行修饰。
这几个类提供的方法基本一致,以AtomicIntegerFieldUpdater为例,看下其使用:
private static AtomicIntegerFieldUpdater updater = AtomicIntegerFieldUpdater.newUpdater(User.class,"age");
public static void main(String[] args) {
User user = new User("a", 1);
int oldValue = updater.getAndAdd(user, 5);
System.out.println(oldValue); //1
System.out.println(updater.get(user)); //6
}
static class User {
private String userName;
public volatile int age;
public User(String userName, int age) {
this.userName = userName;
this.age = age;
}
@Override
public String toString() {
return "User{" + "userName='" + userName + '\'' + ", age=" + age + '}';
}
}
1.5 原子操作的实现原理
原子操作相关的术语:
1.5.1 处理器如何实现原子操作*
处理器提供总线锁定和缓存锁定两个机制来保证复杂内存操作的原子性。
- 1、使用总线锁保证原子性
第一个机制是通过总线锁保证原子性。如果多个处理器同时对共享变量进行读改写操作(i++就是经典的读改写操作),那么共享变量就会被多个处理器同时进行操作,这样读改写操作就不是原子的,操作完之后共享变量的值会和期望的不一致。举个例子,如果i=1,我们进行两次i++操作,我们期望的结果是3,但是有可能结果是2。示例:
原因可能是多个处理器同时从各自的缓存中读取变量i,分别进行加1操作,然后分别写入系统内存中。那么,想要保证读改写共享变量的操作是原子的,就必须保证CPU1读改写共享变量的时候,CPU2不能操作缓存了该共享变量内存地址的缓存。
处理器使用总线锁就是来解决这个问题的。所谓总线锁就是使用处理器提供的一个LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占共享内存。 - 2、使用缓存锁保证原子性
第二个机制是通过缓存锁定来保证原子性。在同一时刻,我们只需保证对某个内存地址的操作是原子性即可,但总线锁定把CPU和内存之间的通信锁住了,这使得锁定期间,其他处理器不能操作其他内存地址的数据,所以总线锁定的开销比较大,目前处理器在某些场合下使用缓存锁定代替总线锁定来进行优化。
频繁使用的内存会缓存在处理器的L1、L2和L3高速缓存里,那么原子操作就可以直接在处理器内部缓存中进行,并不需要声明总线锁,在Pentium 6和目前的处理器中可以使用“缓存锁定”的方式来实现复杂的原子性。所谓“缓存锁定”是指内存区域如果被缓存在处理器的缓存行中,并且在Lock操作期间被锁定,那么当它执行锁操作回写到内存时,处理器不在总线上声言LOCK#信号,而是修改内部的内存地址,并允许它的缓存一致性机制来保证操作的原子性,因为缓存一致性机制会阻止同时修改由两个以上处理器缓存的内存区域数据,当其他处理器回写已被锁定的缓存行的数据时,会使缓存行无效,在如上图所示的例子中,当CPU1修改缓存行中的i时使用了缓存锁定,那么CPU2就不能同时缓存i的缓存行。
有两种情况下处理器不会使用缓存锁定:
- 当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行(cache line)时,则处理器会调用总线锁定。
- 有些处理器不支持缓存锁定。对于Intel 486和Pentium处理器,就算锁定的内存区域在处理器的缓存行中也会调用总线锁定。
1.5.2 Java如何实现原子操作*
在Java中可以通过锁和循环CAS的方式来实现原子操作。
- 1、使用循环CAS实现原子操作
JVM中的CAS操作正是利用了处理器提供的CMPXCHG指令实现的。自旋CAS实现的基本思路就是循环进行CAS操作直到成功为止。
从JDK1.5开始,JUC里提供了一些类来支持原子操作,如AtomicBoolean、AtomicInteger和AtomicLong。 - 2、使用锁机制实现原子操作
锁机制保证了只有获得锁的线程才能够操作锁定的内存区域。JVM内部实现了很多种锁机制,有偏向锁、轻量级锁和互斥锁。有意思的是除了偏向锁,JVM实现锁的方式都用了循环CAS,即当一个线程想进入同步块的时候使用循环CAS的方式来获取锁,当它退出同步块的时候使用循环CAS释放锁。
1.6 JDK1.8中提供的原子类
1.6.1 LongAdder
AtomicLong是JDK1.5开始出现的,里面主要使用了一个long类型的value作为成员变量,然后使用循环的CAS操作去操作value的值,并发量比较大的情况下,CAS操作失败的概率较高,内部失败了会重试,导致耗时可能会增加。
LongAdder是JDK1.8开始出现的,所提供的API基本上可以替换掉原先的AtomicLong。LongAdder在并发量比较大的情况下,操作数据的时候,相当于把这个数字分成了很多份数字,然后交给多个人去管控,每个管控者负责保证部分数字在多线程情况下操作的正确性。当多线程访问的时,通过hash算法映射到具体管控者去操作数据,最后再汇总所有的管控者的数据,得到最终结果。相当于降低了并发情况下锁的粒度,所以效率比较高。
用LongAdder实现计数器示例:
public class Demo {
static LongAdder count = new LongAdder();
public static void incr() {
count.increment();
}
public static void main(String[] args) throws ExecutionException,InterruptedException {
for (int i = 0; i < 10; i++) {
count.reset();
m1();
}
}
private static void m1() throws ExecutionException, InterruptedException {
long t1 = System.currentTimeMillis();
int threadCount = 50;
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 1000000; j++) {
incr();
}
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
long t2 = System.currentTimeMillis();
System.out.println(String.format("结果:%s,耗时(ms):%s", count.sum(), (t2 - t1)));
}
}
1.6.2 LongAccumulator
LongAccumulator是LongAdder的功能增强版。LongAdder的API只有对数值的加减,而LongAccumulator提供了自定义的函数操作,其构造函数:
//accumulatorFunction:需要执行的二元函数(接收2个long作为形参,并返回1个long)
//identity:初始值
public LongAccumulator(LongBinaryOperator accumulatorFunction, long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
同样实现计数器示例:
public class Demo4 {
static LongAccumulator count = new LongAccumulator((x, y) -> x + y, 0L);
public static void incr() {
count.accumulate(1);
}
public static void main(String[] args) throws ExecutionException,InterruptedException {
for (int i = 0; i < 10; i++) {
count.reset();
m1();
}
}
private static void m1() throws ExecutionException, InterruptedException {
long t1 = System.currentTimeMillis();
int threadCount = 50;
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 1000000; j++) {
incr();
}
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
long t2 = System.currentTimeMillis();
System.out.println(String.format("结果:%s,耗时(ms):%s",count.longValue(), (t2 - t1)));
}
}
1.7 原子类相关问题
1.7.1 Atomic的原理*
Atomic包中的类基本的特性就是在多线程环境下,当有多个线程同时对单个(包括基本类型及引用类型)变量进行操作时,具有排他性,即当多个线程同时对该变量的值进行更新时,仅有一个线程能成功,而未成功的线程可以向自旋锁一样,继续尝试,一直等到执行成功。
AtomicInteger类主要利用CAS + volatile
和native方法来保证原子操作,从而避免synchronized的高开销,提升执行效率。
CAS的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值。UnSafe 类的 objectFieldOffset() 方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址,返回值是 valueOffset。value 是一个volatile变量,在内存中可见,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值。
1.7.2 volatile变量和Atomic变量的区别*
volatile常见的功能是保证其修饰的变量在不同线程之间的可见性和禁止重排序, 但它并不能保证原子性
。例如用volatile修饰count变量,那么count++操作就不是原子性的。
Atomic变量提供的方法可以让类似count++的操作具有原子性
。如AtomicInteger类中的getAndIncrement()方法会原子性的进行增量操作把当前值加1,其它数据类型和引用变量也可以进行相似操作。
二、BlockingQueue
在经典的生产者消费者问题中,阻塞队列常常被用到。因为BlockingQueue提供了可阻塞的插入和移除的方法。即:当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
- 在队列空时,获取元素的线程会阻塞;
- 当队列满时,存储元素的线程会阻塞。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程
。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
BlockingQueue接口是Queue的子接口,它的主要用途并不是作为容器,而是作为线程同步的的工具,因此他具有一个很明显的特性,当生产者线程试图向BlockingQueue放入元素时,如果队列已满,则线程被阻塞,当消费者线程试图从中取出一个元素时,如果队列为空,则该线程会被阻塞,正是因为它所具有这个特性,所以在程序中多个线程交替向BlockingQueue中放入元素,取出元素,它可以很好的控制线程之间的通信。
2.1 阻塞队列的基本操作*
BlockingQueue基本操作:
抛异常 | 特殊值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
删除 | remove() | poll() | take() | poll(time,unit) |
获取 | element() | peek() |
2.1.1 从Queue继承的基本接口
BlockingQueue继承于Queue接口,对数据元素的基本接口有:
- 1、插入元素
//往队列插入数据
boolean add(E e);
//当往队列插入数据时,插入成功返回true,否则则返回false
boolean offer(E e);
- 2、删除元素
//从队列中删除数据,成功则返回true,否则为false
boolean remove(Object o);
//删除数据,当队列为空时,返回null
E poll(long timeout, TimeUnit unit) throws InterruptedException;
- 3、查找元素
//获取队首元素
E element();
//获取队首元素
E peek();
2.1.2 BlockingQueue的特有接口
- 1、插入数据
//当阻塞队列容量已经满时,往阻塞队列插入数据的线程会被阻塞,
//直至阻塞队列已经有空余的容量可供使用
void put(E e) throws InterruptedException;
//若阻塞队列已经满时,同样会阻塞插入数据的线程,直至阻塞队列
//已经有空余的地方,与put方法不同的是,该方法会有一个超时时
//间,若超过当前给定的超时时间,插入数据的线程会退出
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
- 2、删除数据
//获取队首数据,当阻塞队列为空时,获取队头数据的线程会被阻塞
E take() throws InterruptedException;
//当阻塞队列为空时,获取数据的线程会被阻塞;如果被阻塞的线程
//超过了指定时间,该线程会退出
E poll(long timeout, TimeUnit unit) throws InterruptedException;
2.2 BlockingQueue实现类*
JDK1.7提供了7个阻塞队列:
ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
- 1、ArrayBlockingQueue
ArrayBlockingQueue是由数组实现的有界(一旦创建,容量不能改变)阻塞队列。该队列中的元素FIFO(先进先出)
。因此,队头元素时队列中存在时间最长的数据元素,而队尾数据则是当前队列最新的数据元素。ArrayBlockingQueue可作为“有界数据缓冲区”,生产者插入数据到队列容器中,并由消费者提取。
当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。
ArrayBlockingQueue默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到ArrayBlockingQueue。而非公平性则是指访问ArrayBlockingQueue的顺序不是遵守严格的时间顺序,有可能存在,一旦ArrayBlockingQueue可以被访问时,长时间阻塞的线程依然无法访问到ArrayBlockingQueue。
如果保证公平性,通常会降低吞吐量。获得公平性的ArrayBlockingQueue示例:
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);
- 2、LinkedBlockingQueue
LinkedBlockingQueue是用链表实现的有界阻塞队列,同样满足FIFO的特性,与ArrayBlockingQueue相比起来具有更高的吞吐量,为了防止LinkedBlockingQueue容量迅速增,损耗大量内存
。
通常在创建LinkedBlockingQueue对象时,会指定其大小,如果未指定,容量等于Integer.MAX_VALUE。 - 3、PriorityBlockingQueue
PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现compareTo()方法来指定元素排序规则,或者初始化时通过构造器参数Comparator来指定排序规则。需要注意的是不能保证同优先级元素的顺序。 - 4、SynchronousQueue
SynchronousQueue每个插入操作必须等待另一个线程进行相应的删除操作,因此,SynchronousQueue实际上没有存储任何数据元素,因为只有线程在删除数据时,其他线程才能插入数据,同样的,如果当前有线程在插入数据时,线程才能删除数据。SynchronousQueue也可以通过构造器参数来为其指定公平性。
2.3 ArrayBlockingQueue实现原理
阻塞队列最核心的功能是,能够可阻塞式的插入和删除队列元素
。
当前队列为空时,会阻塞消费数据的线程,直至队列非空时,通知被阻塞的线程;当队列满时,会阻塞插入数据的线程,直至队列未满时,通知插入数据的线程(生产者线程)。
2.3.1 成员变量
ArrayBlockingQueue的主要属性:
//数据数组
final Object[] items;
//头节点下标
int takeIndex;
//尾节点下标
int putIndex;
//元素个数
int count;
//独占锁,入队和出队公用一个lock,说明不能同时出队和入队
final ReentrantLock lock;
//出队等待条件队列
private final Condition notEmpty;
//入队等待条件队列
private final Condition notFull;
可以看出ArrayBlockingQueue内部是采用数组进行数据存储的,为了保证线程安全,采用的是ReentrantLock。为了保证可阻塞式的插入删除数据利用的是Condition,当获取数据的消费者线程被阻塞时会将该线程放置到notEmpty等待队列中,当插入数据的生产者线程被阻塞时,会将该线程放置到notFull等待队列中。
notEmpty和notFull等重要属性在构造方法中进行创建:
public ArrayBlockingQueue(int capacity) {
//false默认lock为非公平锁
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
2.3.2 元素入队
入队方法有3种:put入队,满则等待;offer入队,满则返回;add入队,满则抛异常。
- offer(E e)
public boolean offer(E e) {
checkNotNull(e);
//获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果队列满了,返回false
if (count == items.length)
return false;
else {
//入队
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
- put(E e)
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果当前队列已满,将线程移入到notFull等待队列中
while (count == items.length)
notFull.await();
//满足插入数据的要求,直接进行入队操作
enqueue(e);
} finally {
lock.unlock();
}
}
put方法的逻辑很简单,当队列已满时(count == items.length)将线程移入到notFull等待队列中,如果当前满足插入数据的条件,就可以直接调用enqueue(e)插入数据元素。
- enqueue(E x)
private void enqueue(E x) {
final Object[] items = this.items;
//插入数据
items[putIndex] = x;
//如果putIndex超出数组范围了,就置为0
if (++putIndex == items.length)
putIndex = 0;
count++;
//通知消费者线程,当前队列中有数据可供消费
notEmpty.signal();
}
enqueue方法的逻辑同样也很简单,先完成插入数据,即往数组中添加数据(items[putIndex] = x),然后通知被阻塞的消费者线程,当前队列中有数据可供消费(notEmpty.signal())。
- offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
有的入队方法中中lock.lockInterruptibly()。ReentrantLock的中断和非中断加锁模式的区别在于:线程尝试获取锁操作失败后,在等待过程中,如果该线程被其他线程中断了,它是如何响应中断请求的。lock方法会忽略中断请求,继续获取锁直到成功;而lockInterruptibly则直接抛出中断异常来立即响应中断,由上层调用者处理中断。
2.3.3 元素出队
- take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列为空,没有数据,将消费者线程移入等待队列中
while (count == 0)
notEmpty.await();
//获取数据
return dequeue();
} finally {
lock.unlock();
}
}
take方法也主要做了两步:1. 如果当前队列为空的话,则将获取数据的消费者线程移入到等待队列中;2. 若队列不为空则获取数据,即完成出队操作dequeue。
- dequeue()
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取数据
E x = (E) items[takeIndex];
items[takeIndex] = null;
//如果takeIndex等于items.length,将takeIndex = 0
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//通知被阻塞的生产者线程
notFull.signal();
return x;
}
dequeue方法也主要做了两件事情:
- 获取队列中的数据,即获取数组中的数据元素((E) items[takeIndex]);
- 通知notFull等待队列中的线程,使其由等待队列移入到同步队列中,使其能够有机会获得lock,并执行完成功退出。
可以看出put和take方法主要是通过condition的通知机制来完成可阻塞式的插入数据和获取数据。
2.4 LinkedBlockingQueue实现原理
LinkedBlockingQueue是用链表实现的有界阻塞队列,当构造对象时未指定队列大小时,队列默认大小为Integer.MAX_VALUE。从它的构造方法可以看出:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
具有两把锁takeLock、putLock。takeLock作为消费线程获取的锁,同时有个对应的notEmpty条件变量用于消费线程的阻塞和唤醒,putLock作为生产线程获取的锁,同时有个对应的notFull条件变量用于生产线程的阻塞和唤醒。
2.4.1 成员变量
LinkedBlockingQueue的主要属性:
//阻塞队列的容量,默认为Integer.MAX_VALUE,最大为Integer.MAX_VALUE
private final int capacity;
//阻塞队列的元素个数,原子变量
private final AtomicInteger count = new AtomicInteger();
//阻塞队列的头结点,并不是真正的头结点
transient Node<E> head;
//阻塞队列的尾结点
private transient Node<E> last;
//消费线程使用的锁
private final ReentrantLock takeLock = new ReentrantLock();
//notEmpty条件对象,当队列为空时用于挂起消费线程
private final Condition notEmpty = takeLock.newCondition();
//生产线程使用的锁
private final ReentrantLock putLock = new ReentrantLock();
//notFull条件对象,当队列已满时用于挂起生产线程
private final Condition notFull = putLock.newCondition();
//链表的结点内部类,用于存储数据
static class Node<E> {
//数据域
E item;
//后继引用
Node<E> next;
//构造器
Node(E x) {
item = x;
}
}
可以看出与ArrayBlockingQueue主要的区别是,LinkedBlockingQueue在插入数据和删除数据时分别是由两个不同的lock(takeLock和putLock)来控制线程安全的,因此,也由这两个lock生成了两个对应的condition(notEmpty和notFull)来实现可阻塞的插入和删除数据。并且,采用了链表的数据结构来实现队列,链表中元素就是上面的Node内部类。
2.4.2 元素入队
以put(e)为例,此方法将指定的元素插入此队列的尾部,如果该队列已满,则线程等待。
如果因为获取不到锁而在同步队列中等待的时候被中断则抛出InterruptedException,即响应中断。如果因为队列满了在条件队列中等待的时候在其他线程调用signal、signalAll方法唤醒该线程之前就因为中断而被唤醒了,也会抛出InterruptedException。另外,如果指定元素为null则抛出NullPointerException异常。
在ArrayBlockingQueue中,生产(放入数据)线程阻塞的时候,需要消费(移除数据)线程才能唤醒,并且因为它们获取的同一个锁,消费和生产不能并发进行(假设一个线程仅仅从事生产或者消费工作的一种)。在LinkedBlockingQueue中,如果有线程因为获取不到锁或者队列已满而导致生产(放入数据)线程阻塞,那么他可能被后面的消费线程唤醒也可能被后面的生产线程唤醒。因为它内部有两个锁,生产和消费获取不同的锁,可以并行执行生产和消费任务,不仅在消费数据的时候会唤醒阻塞的生产线程,在生产数据的时候如果队列容量还没满,也会唤醒此前阻塞的生产线程继续生产。
put方法的大概步骤:
- 指定元素e的null校验;
- 新建结点node,lockInterruptibly可中断的等待获取生产者锁putLock,即响应中断;没有获取到锁就在同步队列中阻塞等待,被中断了则直接中断等待并抛出异常;
- 获取到锁之后,while循环判断此时结点数量是否等于容量,即队列是否满了,如果满了,那么该线程在notFull条件队列中等待并释放锁,被唤醒之后会继续尝试获取锁、并循环判断;
- 队列没有满,node结点添加到链表尾部成为新的尾结点;
- 获取此时计数器的值赋给c,并且计数器值自增1;
- 如果c+1小于capacity,说明此时队列未满,还可以入队,那么唤醒一个在notFull条件队列中等待的生产线程;
- 释放生产者锁putLock;
- 如果前面没有发生异常,那么执行最后的if语句:如果c为0,那么此时队列中还可能有存在1条数据,刚放进去的那么由于刚才队列没有数据,可能此时有消费者线程在等待,这里需要唤醒一个消费者线程。如果此前队列中就有数据没有消费完毕,那么也不必唤醒唤醒消费者。注意这里唤醒消费者线程的时候,必须先获取Condition关联的消费者锁。
put方法源码:
/** * 将指定的元素插入此队列的尾部,如果该队列已满,则线程等待。 * * @param e 指定元素 * @throws InterruptedException 如果因为获取不到锁而在同步队列中等待的时候被中断则抛出InterruptedException,即响应中断 * 如果因为队列满了在条件队列中等待的时候在其他线程调用signal、signalAll方法唤醒该线程之前就因为中断而被唤醒了,也会抛出InterruptedException。 * @throws NullPointerException 如果指定元素为 null */
public void put(E e) throws InterruptedException {
//e的null校验
if (e == null) throw new NullPointerException();
int c = -1;
//新建结点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//可中断的等待获取生产者锁,即响应中断
putLock.lockInterruptibly();
try {
//while循环判断此时结点数量是否等于容量,即队列是否满了
while (count.get() == capacity) {
//如果满了,那么该线程在notFull条件队列中等待并释放锁,被唤醒之后会继续尝试获取锁、并循环判断
notFull.await();
}
// 队列没有满,结点添加到链表尾部
enqueue(node);
//获取此时计数器的值赋给c,并且计数器值自增1
c = count.getAndIncrement();
//如果c+1小于capacity,说明还可以入队
if (c + 1 < capacity)
//唤醒一个在notFull条件队列中等待的生产线程
notFull.signal();
} finally {
//释放生产者锁
putLock.unlock();
}
//如果前面没有抛出异常,那么在finally之后会执行下面的代码
//如果c为0,那么此时队列中还可能有存在1条数据,刚放进去的
//那么由于刚才队列没有数据,可能此时有消费者线程在等待,这里需要唤醒一个消费者线程
//如果此前队列中就有数据没有消费完毕,那么也不必唤醒唤醒消费者
if (c == 0)
//获取消费者锁并且尝试唤醒一个消费者线程
signalNotEmpty();
}
/** * 指定结点链接到队列尾部成为新的尾结点,在获取锁之后才会调用该方法 * @param node 指定结点 */
private void enqueue(Node<E> node) {
//很简单,原尾结点的next引用指向node结点,然后last指向最新node结点
last = last.next = node;
}
//唤醒一个在notEmpty条件队列中等待的消费线程,需要先获取消费者锁
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
//阻塞式的获取消费者锁,即不响应中断
takeLock.lock();
try {
//唤醒一个在notEmpty条件队列中等待的消费线程
//要想调用Condition对象的方法,必须先要获取该Condition对象对应的lock锁
notEmpty.signal();
} finally {
//释放消费者锁
takeLock.unlock();
}
}
2.4.3 元素出队
以take方法为例,该方法的作用:获取并移除此队列的头部,在元素变得可用(队列非空)之前一直等待。
如果因为获取不到锁而在同步队列中等待的时候被中断则抛出InterruptedException,即响应中断。如果因为队列满了在条件队列中等待的时候在其他线程调用signal、signalAll方法唤醒该线程之前就因为中断而被唤醒了,也会抛出InterruptedException。
在ArrayBlockingQueue中,消费(移除数据)线程阻塞的时候,需要生产(放入数据)线程才能唤醒,并且因为它们获取的同一个锁,消费和生产不能并发进行(假设一个线程仅仅从事生产或者消费工作的一种)。在LinkedBlockingQueue中,如果有线程因为获取不到消费者锁或者队列已空而导致消费(移除数据)线程阻塞,那么他可能被后面的生产线程唤醒也可能被后面的消费线程唤醒。因为它内部有两个锁,生产和消费获取不同的锁,可以并行执行生产和消费任务,不仅在生产数据的时候会唤醒阻塞的消费线程,在消费数据的时候如果队列容量还没空,也会唤醒此前阻塞的消费线程继续消费。
take方法大概步骤:
- 指定元素e的null校验;
- lockInterruptibly可中断的等待获取消费者锁takeLock,即响应中断;没有获取到锁就在同步队列中阻塞等待,被中断了则直接中断等待并抛出异常;
- 获取到锁之后,while循环判断此时结点数量是否等于0,即队列是否空了,如果空了,那么该线程在notEmpty条件队列中等待并释放锁,被唤醒之后会继续尝试获取锁、并循环判断;
- 队列没有空,调用dequeue方法获取并移除此队列的头部;
- 获取此时计数器的值赋给c,并且计数器值自减1;
- 如果c大于1,说明此时队列未空,说明还可以出队列,那么唤醒一个在notEmpty条件队列中等待的消费线程;
- 释放消费者锁putLock;
- 如果前面没有发生异常,那么执行最后的if语句:如果c为capacity,那么此前队列中可能具有满的数据,可能此时有生产者线程在等待,这里需要唤醒一个生产者线程。如果此前队列中的数据没有满,那么也不必唤醒生产者。注意这里唤醒生产者线程的时候,必须先获取Condition关联的生产者锁。
take方法源码:
/** * 获取并移除此队列的头部 * * @return 被移除的队列头部元素 * @throws InterruptedException 因为获取不到锁而等待的时候被中断 */
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//可中断的等待获取消费者锁,即响应中断
takeLock.lockInterruptibly();
try {
//while循环判断此时结点数量是否等于0,即队列是否空了
while (count.get() == 0) {
//如果空了,那么该线程在notEmpty条件队列中等待并释放锁,被唤醒之后会继续尝试获取锁、并循环判断
notEmpty.await();
}
// 队列没有空,获取并移除此队列的头部
x = dequeue();
//获取此时计数器的值赋给c,并且计数器值自减1
c = count.getAndDecrement();
//如果c大于1,说明还可以出队列
if (c > 1)
//唤醒一个在notEmpty条件队列中等待的消费线程
notEmpty.signal();
} finally {
//释放消费者锁
takeLock.unlock();
}
//如果前面没有抛出异常,那么在finally之后会执行下面的代码
//如果c为capacity,那么此前队列中可能具有满的数据,可能此时有生产者线程在等待,
//这里需要唤醒一个生产者线程
//如果此前队列中的数据没有满,那么也不必唤醒唤醒生产者
if (c == capacity)
//获取生产者锁并且尝试唤醒一个生产者线程
signalNotFull();
//返回被移除的队列头部元素
return x;
}
//唤醒一个生产者线程,只会在take/poll方法中被调用
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
//阻塞式的获取生产者锁,即不响应中断
putLock.lock();
try {
//唤醒一个在notFull条件队列中等待的生产线程
notFull.signal();
} finally {
//释放生产者锁
putLock.unlock();
}
}
2.5 线程同步的7种方式*
- 1、同步方法
即有synchronized关键字修饰的方法。示例:
//synchronized关键字也可以修饰静态方法,此时如果调用该静态方法,将会锁住整个类
public synchronized void save(){
}
- 2、同步代码块
即有synchronized关键字修饰的语句块。示例:
synchronized(object){
}
同步是一种高开销的操作,因此应该尽量减少同步的内容。通常没有必要同步整个方法,使用synchronized代码块同步关键代码即可。
- 3、使用特殊域变量(volatile)实现线程同步
volatile关键字可以保证一个变量在不同线程之间的可见性。示例:
class Bank {
//需要同步的变量加上volatile
private volatile int account = 100;
public int getAccount() {
return account;
}
//这里不再需要synchronized
public void save(int money) {
account += money;
}
}
- 4、使用重入锁实现线程同步
即ReentrantLock。示例:
class Bank {
private int account = 100;
//需要声明这个锁
private Lock lock = new ReentrantLock();
public int getAccount() {
return account;
}
//这里不再需要synchronized
public void save(int money) {
lock.lock();
try{
account += money;
}finally{
lock.unlock();
}
}
}
- 5、使用局部变量实现线程同步
如果使用ThreadLocal管理变量,则每一个使用该变量的线程都获得该变量的副本,副本之间相互独立,这样每一个线程都可以随意修改自己的变量副本,而不会对其他线程产生影响。
ThreadLocal 类的常用方法
ThreadLocal():创建一个线程本地变量。
get():返回此线程局部变量的当前线程副本中的值。
initialValue():返回此线程局部变量的当前线程的"初始值"。
set(T value):将此线程局部变量的当前线程副本中的值设置为value。
示例:
public class Bank{
//使用ThreadLocal类管理共享变量account
private static ThreadLocal<Integer> account = new ThreadLocal<Integer>(){
@Override
protected Integer initialValue(){
return 100;
}
};
public void save(int money){
account.set(account.get()+money);
}
public int getAccount(){
return account.get();
}
}
- 6、使用阻塞队列实现线程同步
阻塞队列有多种,比如LinkedBlockingQueue。 LinkedBlockingQueue 类常用方法:
LinkedBlockingQueue():创建一个容量为Integer.MAX_VALUE的LinkedBlockingQueue
put(E e):在队尾添加一个元素,如果队列满则阻塞
size():返回队列中的元素个数
take():移除并返回队头元素,如果队列空则阻塞
示例:
package com.xhj.thread;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
/** * 用阻塞队列实现线程同步 LinkedBlockingQueue的使用 * * @author XIEHEJUN * */
public class BlockingSynchronizedThread {
/** * 定义一个阻塞队列用来存储生产出来的商品 */
private LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
/** * 定义生产商品个数 */
private static final int size = 10;
/** * 定义启动线程的标志,为0时,启动生产商品的线程;为1时,启动消费商品的线程 */
private int flag = 0;
private class LinkBlockThread implements Runnable {
@Override
public void run() {
int new_flag = flag++;
System.out.println("启动线程 " + new_flag);
if (new_flag == 0) {
for (int i = 0; i < size; i++) {
int b = new Random().nextInt(255);
System.out.println("生产商品:" + b + "号");
try {
queue.put(b);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("仓库中还有商品:" + queue.size() + "个");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} else {
for (int i = 0; i < size / 2; i++) {
try {
int n = queue.take();
System.out.println("消费者买去了" + n + "号商品");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("仓库中还有商品:" + queue.size() + "个");
try {
Thread.sleep(100);
} catch (Exception e) {
// TODO: handle exception
}
}
}
}
}
public static void main(String[] args) {
BlockingSynchronizedThread bst = new BlockingSynchronizedThread();
LinkBlockThread lbt = bst.new LinkBlockThread();
Thread thread1 = new Thread(lbt);
Thread thread2 = new Thread(lbt);
thread1.start();
thread2.start();
}
}
- 7、使用原子变量实现线程同步
需要使用线程同步的根本原因在于对普通变量的操作不是原子的。原子操作就是指将读取变量值、修改变量值、保存变量值看成一个整体来操作。即这几种行为要么同时完成,要么都不完成。
原子变量有很多,比如AtomicInteger,可以用原子方式更新int的值。AtomicInteger常用方法:
AtomicInteger(int initialValue):创建具有给定初始值的新的AtomicInteger
addAddGet(int dalta):以原子方式将给定值与当前值相加
get():获取当前值
示例:
class Bank {
private AtomicInteger account = new AtomicInteger(100);
public AtomicInteger getAccount() {
return account;
}
public void save(int money) {
account.addAndGet(money);
}
}
文章评论