什么是原子操作?如何实现原子操作?
什么是原子性?
事务的一大特性就是原子性(事务具有ACID四大特性),一个事务包含多个操作,这些操作要么全部执行,要么全都不执行
并发里的原子性和原子操作是一样的内涵和概念,假定有两个操作A和B都包含多个步骤,如果从执行A的线程来看,当另一个线程执行B时,要么将B全部执行完,要么完全不执行B,执行B的线程看A的操作也是一样的,那么A和B对彼此来说是原子的。
实现原子操作可以使用锁,锁机制,满足基本的需求是没有问题的了,但是有的时候我们的需求并非这么简单,我们需要更有效,更加灵活的机制,synchronized关键字是基于阻塞的锁机制,也就是说当一个线程拥有锁的时候,访问同一资源的其它线程需要等待,直到该线程释放锁,
这里会有些问题:首先,如果被阻塞的线程优先级很高很重要怎么办?其次,如果获得锁的线程一直不释放锁怎么办?同时,还有可能出现一些例如死锁之类的情况,最后,其实锁机制是一种比较粗糙,粒度比较大的机制,相对于像计数器这样的需求有点儿过于笨重。为了解决这个问题,Java提供了Atomic系列的原子操作类。
这些原子操作类其实是使用当前的处理器基本都支持CAS的指令,比如Intel的汇编指令cmpxchg,每个厂家所实现的具体算法并不一样,但是原理基本一样。每一个CAS操作过程都包含三个运算符:一个内存地址V,一个期望的值A和一个新值B,操作的时候如果这个地址上存放的值等于这个期望的值A,则将地址上的值赋为新值B,否则不做任何操作
CAS的基本思路 :如果这个地址上的值和期望的值相等,则给其赋予新值,否则不做任何事儿,但是要返回原值是多少。自然CAS操作执行完成时,在业务上不一定完成了,这个时候我们就会对CAS操作进行反复重试,于是就有了循环CAS。很明显,循环CAS就是在一个循环里不断的做cas操作,直到成功为止。Java中的Atomic系列的原子操作类的实现则是利用了循环CAS来实现
CAS实现原子操作的三大问题
ABA问题
因为CAS需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。
ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A→B→A就会变成1A→2B→3A。举个通俗点的例子,你倒了一杯水放桌子上,干了点别的事,然后同事把你水喝了又给你重新倒了一杯水,你回来看水还在,拿起来就喝,如果你不管水中间被人喝过,只关心水还在,这就是ABA问题。
循环时间长开销大
自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。
只能保证一个共享变量的原子操作
对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。
还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java 1.5开始,JDK提供了AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。
Atomic系列
概述:java从JDK1.5开始提供了java.util.concurrent.atomic包(简称Atomic包),这个包中的原子操作类提供了一种用法简单,性能高效,线程安全地更新一个变量的方式。因为变量的类型有很多种,所以在Atomic包里一共提供了13个类,属于4种类型的原子更新方式,分别是原子更新基本类型、原子更新数组、原子更新引用和原子更新属性(字段)。本次我们只讲解
原理:
有3个操作数(内存值V, 旧的预期值A,要修改的值B)
当旧的预期值A == 内存值 此时修改成功,将V改为B
当旧的预期值A!=内存值 此时修改失败,不做任何操作并重新获取现在的最新值,再重复之前的方法(这个重新获取的动作就是自旋)
使用原子的方式更新基本类型,使用原子的方式更新基本类型Atomic包提供了以下3个类:
- AtomicBoolean: 原子更新布尔类型
- AtomicInteger: 原子更新整型
- AtomicLong: 原子更新长整型
以上3个类提供的方法几乎一模一样,所以本节仅以AtomicInteger为例进行讲解,AtomicInteger的常用方法如下:
- public AtomicInteger(): 初始化一个默认值为0的原子型Integer
- public AtomicInteger(int initialValue): 初始化一个指定值的原子型Integer
- int get(): 获取值
- int getAndIncrement(): 以原子方式将当前值加1,注意,这里返回的是自增前的值。
- int incrementAndGet(): 以原子方式将当前值加1,注意,这里返回的是自增后的值。
- int addAndGet(int data): 以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果。
- int getAndSet(int value): 以原子方式设置为newValue的值,并返回旧值。
- boolean compareAndSet(int expect, int update) 于传入的expect值进行比较,如果当前值等于expect值,则更新为update。
代码实现:
package com.itheima.threadatom3;
import java.util.concurrent.atomic.AtomicInteger;
public class MyAtomIntergerDemo1 {
// public AtomicInteger(): 初始化一个默认值为0的原子型Integer
// public AtomicInteger(int initialValue): 初始化一个指定值的原子型Integer
public static void main(String[] args) {
AtomicInteger ac = new AtomicInteger();
System.out.println(ac);
AtomicInteger ac2 = new AtomicInteger(10);
System.out.println(ac2);
}
}
package com.itheima.threadatom3;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicInteger;
public class MyAtomIntergerDemo2 {
// int get(): 获取值
// int getAndIncrement(): 以原子方式将当前值加1,注意,这里返回的是自增前的值。
// int incrementAndGet(): 以原子方式将当前值加1,注意,这里返回的是自增后的值。
// int addAndGet(int data): 以原子方式将参数与对象中的值相加,并返回结果。
// int getAndSet(int value): 以原子方式设置为newValue的值,并返回旧值。
public static void main(String[] args) {
// AtomicInteger ac1 = new AtomicInteger(10);
// System.out.println(ac1.get());
// AtomicInteger ac2 = new AtomicInteger(10);
// int andIncrement = ac2.getAndIncrement();
// System.out.println(andIncrement);
// System.out.println(ac2.get());
// AtomicInteger ac3 = new AtomicInteger(10);
// int i = ac3.incrementAndGet();
// System.out.println(i);//自增后的值
// System.out.println(ac3.get());
// AtomicInteger ac4 = new AtomicInteger(10);
// int i = ac4.addAndGet(20);
// System.out.println(i);
// System.out.println(ac4.get());
AtomicInteger ac5 = new AtomicInteger(100);
int andSet = ac5.getAndSet(20);
System.out.println(andSet);
System.out.println(ac5.get());
}
}
对于非基本类型可以使用 AtomicReference
代码演示:
public static void main(String[] args) {
StudentDemo studentDemo = new StudentDemo();
studentDemo.setName("张三");
studentDemo.setAge(18);
studentDemo.setSco(100f);
AtomicReference atomicReference = new AtomicReference(studentDemo);
StudentDemo studentDemo2 = new StudentDemo();
studentDemo2.setName("李四");
Object o = atomicReference.get();
studentDemo2.setName("王五");
atomicReference.compareAndSet(o,studentDemo2);
}
正如前面所讲过的aba问题,Atomic还提供了区别版本号的方法类
AtomicStampedReference代码演示
public static void main(String[] args) throws InterruptedException {
AtomicStampedReference<BigDecimal> bigDecimalAtomicReference = new AtomicStampedReference<>(new BigDecimal("10000"), 0);
for (int i = 0; i < 1000; i++) {
Thread x = new Thread(() -> {
while (true) {
BigDecimal bigDecimal = bigDecimalAtomicReference.getReference();
BigDecimal subtract = bigDecimal.subtract(BigDecimal.TEN);
int stamp = bigDecimalAtomicReference.getStamp();
if (bigDecimalAtomicReference.compareAndSet(bigDecimal, subtract, stamp, stamp + 1)) {
System.out.println(bigDecimalAtomicReference.getReference());
break;
}
}
});
x.start();
x.join();
}
}
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:
A - > B - > A - > C ,通过 AtomicStampedReference ,我们可以知道,引用变量中途被更改了几次。
但是有时候,并不关心引用变量更改了几次,只是单纯的关心 是否更改过 ,所以就有了AtomicMarkableReference
AtomicMarkableReference代码演示
AtomicMarkableReference<BigDecimal> bigDecimalAtomicReference = new AtomicMarkableReference<>(new BigDecimal("10000"), true);
for (int i = 0; i < 1000; i++) {
Thread x = new Thread(() -> {
while (true) {
BigDecimal bigDecimal = bigDecimalAtomicReference.getReference();
BigDecimal subtract = bigDecimal.subtract(BigDecimal.TEN);
if (bigDecimalAtomicReference.compareAndSet(bigDecimal, subtract, true, false)) {
System.out.println(bigDecimalAtomicReference.getReference());
break;
}
}
});
x.start();
x.join();
}
}
AtomicMarkableReference 提供了一个标记,在修改成功的时候把标记也一起修改,待到下一次变更的时候,如果标记改变了,则说明已经变化过值。
LongAdder
LongAdder 是并发大师 @author Doug Lea (大哥李)的作品,设计的非常精巧
LongAdder 类有几个关键域
// 累加单元数组 , 懒惰初始化
transient volatile Cell [] cells ;
// 基础值 , 如果没有竞争 , 则用 cas 累加这个域
transient volatile long base ;
// 在 cells 创建或扩容时 , 置为 1, 表示加锁
transient volatile int cellsBusy ;
伪共享原理
// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// 省略不重要代码
}
cell为累加单元,而方法上的@sun.misc.Contended 的作用在下面介绍
需要重缓存说起,在操作系统种由以下几中数据读取放方式,从cpu内部寄存器、一级缓存、二级缓存、三级缓存、内存
现在比较以下内存与缓存的数据差别
因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte ( 8 个 long )
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效
因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节( 16 字节的对象头和 8 字节的 value ),因
此缓存行可以存下 2 个的 Cell 对象。这样问题来了:
Core-0 要修改 Cell[0]
Core-1 要修改 Cell[1]
无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加
Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效
@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的
padding ,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
对于LongAdder来说,内部有一个base变量,一个Cell[]数组。
在实际运用的时候,只有从未出现过并发冲突的时候,base基数才会使用到,一旦出现了并发冲突,之后所有的操作都只针对Cell[]数组中的单元Cell。
而LongAdder最终结果的求和(如上图),并没有使用全局锁,返回值不是绝对准确的,因为调用这个方法时还有其他线程可能正在进行计数累加,所以只能得到某个时刻的近似值,这也就是LongAdder并不能完全替代LongAtomic的原因之一。
而且从测试情况来看,线程数越多,并发操作数越大,LongAdder的优势越大,线程数较小时,AtomicLong的性能还超过了LongAdder。
Semaphore
用于控制同时访问某个资源的线程数量
。
- 限流:Semaphore可以用于限制对共享资源的并发访问数量,以控制系统的流量。
- 资源池:Semaphore可以用于实现资源池,以维护一组有限的共享资源。
常用API
构造器
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
- permits 表示许可证的数量(资源数)
- fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程
常用方法
- acquire() 表示阻塞并获取许可
- tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
- release() 表示释放许可
举个栗子:模拟限流
/**
* 模拟限流场景
*/
@Slf4j
public class SemaphoreDemo {
/**
* 同一时刻最多只允许有两个并发
*/
private static Semaphore semaphore = new Semaphore(2);
private static Executor executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
for(int i=0;i<10;i++){
executor.execute(()->getProductInfo());
}
}
public static String getProductInfo() {
if(!semaphore.tryAcquire()){
log.error("请求被流控了");
return "请求被流控了";
}
try {
log.info("请求服务");
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
semaphore.release();
}
return "返回商品详情信息";
}
}
CountDownLatch
- 并行任务同步:CountDownLatch可以用于协调多个并行任务的完成情况,确保所有任务都完成后再继续执行下一步操作。
- 多任务汇总:CountDownLatch可以用于统计多个线程的完成情况,以确定所有线程都已完成工作。
- 资源初始化:CountDownLatch可以用于等待资源的初始化完成,以便在资源初始化完成后开始使用。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行
public void await() throws InterruptedException { };
// 和 await() 类似,若等待 timeout 时长后,count 值还是没有变为 0,不再等待,继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
// 会将 count 减 1,直至为 0
public void countDown() { };
CyclicBarrier
CyclicBarrier可以被重用。
- 多线程任务:CyclicBarrier 可以用于将复杂的任务分配给多个线程执行,并在所有线程完成工作后触发后续操作。
- 数据处理:CyclicBarrier 可以用于协调多个线程间的数据处理,在所有线程处理完数据后触发后续操作。
// parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
public CyclicBarrier(int parties)
// 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
public CyclicBarrier(int parties, Runnable barrierAction)
举个栗子
CyclicBarrier cb = new CyclicBarrier(2,()->{
System.out.println("执行完一轮");
}); // 个数为2时才会继续执行
new Thread(()->{
System.out.println("线程1开始.."+new Date());
try {
cb.await(); // 当个数不足时,等待
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程1继续向下运行..."+new Date());
}).start();
new Thread(()->{
System.out.println("线程2开始.."+new Date());
try { Thread.sleep(2000); } catch (InterruptedException e) { }
try {
cb.await(); // 2 秒后,线程个数够2,继续运行
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程2继续向下运行..."+new Date());
}).start();
}
文章评论