Java原子变量类
# 为何需要原子变量类
保证线程安全是 Java 并发编程必须要解决的重要问题。Java 从原子性、可见性、有序性这三大特性入手,确保多线程的数据一致性。
确保线程安全最常见的做法是利用锁机制(Lock、sychronized)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的,线程安全的。互斥同步最主要的问题是线程阻塞和唤醒所带来的性能问题。
volatile 是轻量级的锁(自然比普通锁性能要好),它保证了共享变量在多线程中的可见性,但 无法保证原子性 。所以,它只能在一些特定场景下使用。
为了兼顾原子性以及锁带来的性能问题,Java 引入了 CAS (主要体现在 Unsafe 类)来实现非阻塞同步(也叫乐观锁)。并基于 CAS ,提供了一套原子工具类。
# 分类
原子变量类可以分为 4 组:
基本类型
- AtomicBoolean** - 布尔类型原子类**
- AtomicInteger- 整型原子类
- AtomicLong- 长整型原子类
引用类型
- AtomicReference- 引用类型原子类
- AtomicMarkableReference - 带有标记位的引用类型原子类
- AtomicStampedReference - 带有版本号的引用类型原子类。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
数组类型
- AtomicIntegerArray- 整形数组原子类
- AtomicLongArray- 长整型数组原子类
- AtomicReferenceArray** - 引用类型数组原子类**
属性更新器类型
- AtomicIntegerFieldUpdater- 整型字段的原子更新器。
- AtomicLongFieldUpdater- 长整型字段的原子更新器。
- AtomicReferenceFieldUpdater - 原子更新引用类型里的字段。
# 基本类型
# AtomicInteger
- AtomicInteger常用api:
public final int get():获取当前的值
public final int getAndSet(int newValue):获取当前的值,并设置新的值
public final int getAndIncrement():获取当前的值,并自增
public final int getAndDecrement():获取当前的值,并自减
public final int getAndAdd(int delta):获取当前的值,并加上预期的值
void lazySet(int newValue): 最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
- 源码分析:
public class AtomicInteger extends Number implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;// value 属性的偏移量,通过这个偏移量可以快速定位到 value 字段,这个是实现 AtomicInteger 的关键。
static {
try {
//用于获取value字段相对当前对象的“起始地址”的偏移量
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;// 使用volatile修饰变量
//返回当前值
public final int get() {
return value;
}
//递增加detla
public final int getAndAdd(int delta) {
//三个参数,1、当前的实例 2、value实例变量的偏移量 3、当前value要加上的数(value+delta)。
return unsafe.getAndAddInt(this, valueOffset, delta);
}
//递增加1
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
...
}
我们可以看到 AtomicInteger 底层用的是volatile的变量和CAS来进行更改数据的。
- volatile保证线程的可见性,多线程并发时,一个线程修改数据,可以保证其它线程立马看到修改后的值
- CAS 保证数据更新的原子性。
# 引用类型
# AtomicReference
- 基于AtomicReference实现一个自旋锁:
public class AtomicReferenceDemo2 {
private static int ticket = 10;
public static void main(String[] args) {
threadSafeDemo();
}
private static void threadSafeDemo() {
SpinLock lock = new SpinLock();
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread(lock));
}
executorService.shutdown();
}
/**
* 基于 {@link AtomicReference} 实现的简单自旋锁
*/
static class SpinLock {
private AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void lock() {
Thread current = Thread.currentThread();
while (!atomicReference.compareAndSet(null, current)) {}
}
public void unlock() {
Thread current = Thread.currentThread();
atomicReference.compareAndSet(current, null);
}
}
/**
* 利用自旋锁 {@link SpinLock} 并发处理数据
*/
static class MyThread implements Runnable {
private SpinLock lock;
public MyThread(SpinLock lock) {
this.lock = lock;
}
@Override
public void run() {
while (ticket > 0) {
lock.lock();
if (ticket > 0) {
System.out.println(Thread.currentThread().getName() + " 卖出了第 " + ticket + " 张票");
ticket--;
}
lock.unlock();
}
}
}
}
# AtomicMarkableReference
原子类的实现基于 CAS 机制,而 CAS 存在 ABA 问题(不了解 ABA 问题,可以参考:https://www.yuque.com/hanchanmingqi-zjjw3/kb/bboky9 (opens new window))。正是为了解决 ABA 问题,才有了 AtomicMarkableReference 和 AtomicStampedReference。
AtomicMarkableReference 使用一个布尔值作为标记,修改时在 true / false 之间切换。这种策略不能根本上解决 ABA 问题,但是可以降低 ABA 发生的几率。常用于缓存或者状态描述这样的场景。
public class AtomicMarkableReferenceDemo {
private final static String INIT_TEXT = "abc";
public static void main(String[] args) throws InterruptedException {
final AtomicMarkableReference<String> amr = new AtomicMarkableReference<>(INIT_TEXT, false);
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(Math.abs((int) (Math.random() * 100)));
} catch (InterruptedException e) {
e.printStackTrace();
}
String name = Thread.currentThread().getName();
if (amr.compareAndSet(INIT_TEXT, name, amr.isMarked(), !amr.isMarked())) {
System.out.println(Thread.currentThread().getName() + " 修改了对象!");
System.out.println("新的对象为:" + amr.getReference());
}
}
});
}
executorService.shutdown();
executorService.awaitTermination(3, TimeUnit.SECONDS);
}
}
# AtomicStampedReference
AtomicMarkableReference类不能彻底解决ABA的问题,所以AtomicStampedReference采用版本号的机制来完全避免ABA问题的出现。
AtomicStampedReference源码实现如下:
public class AtomicStampedReference<V> {
private static class Pair<T> {
final T reference; //维护对象引用
final int stamp; //用于标志版本
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
....
/**
* expectedReference :更新之前的原始值
* newReference : 将要更新的新值
* expectedStamp : 期待更新的标志版本
* newStamp : 将要更新的标志版本
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
// 获取当前的(元素值,版本号)对
Pair<V> current = pair;
return
// 引用没变
expectedReference == current.reference &&
// 版本号没变
expectedStamp == current.stamp &&
// 新引用等于旧引用
((newReference == current.reference &&
// 新版本号等于旧版本号
newStamp == current.stamp) ||
// 构造新的Pair对象并CAS更新
casPair(current, Pair.of(newReference, newStamp)));
}
private boolean casPair(Pair<V> cmp, Pair<V> val) {
// 调用Unsafe的compareAndSwapObject()方法CAS更新pair的引用为新引用
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
}
分析:
如果元素值和版本号都没有变化,并且和新的也相同,返回true;
如果元素值和版本号都没有变化,并且和新的不完全相同,就构造一个新的Pair对象并执行CAS更新pair。
- 使用举例:
private static AtomicStampedReference<Integer> atomicStampedRef =
new AtomicStampedReference<>(1, 0);
public static void main(String[] args){
Thread main = new Thread(() -> {
System.out.println("操作线程" + Thread.currentThread() +",初始值 a = " + atomicStampedRef.getReference());
int stamp = atomicStampedRef.getStamp(); //获取当前标识别
try {
Thread.sleep(1000); //等待1秒 ,以便让干扰线程执行
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean isCASSuccess = atomicStampedRef.compareAndSet(1,2,stamp,stamp +1); //此时expectedReference未发生改变,但是stamp已经被修改了,所以CAS失败
System.out.println("操作线程" + Thread.currentThread() +",CAS操作结果: " + isCASSuccess);
},"主操作线程");
Thread other = new Thread(() -> {
Thread.yield(); // 确保thread-main 优先执行
atomicStampedRef.compareAndSet(1,2,atomicStampedRef.getStamp(),atomicStampedRef.getStamp() +1);
System.out.println("操作线程" + Thread.currentThread() +",【increment】 ,值 = "+ atomicStampedRef.getReference());
atomicStampedRef.compareAndSet(2,1,atomicStampedRef.getStamp(),atomicStampedRef.getStamp() +1);
System.out.println("操作线程" + Thread.currentThread() +",【decrement】 ,值 = "+ atomicStampedRef.getReference());
},"干扰线程");
main.start();
other.start();
}
输出:
// 输出
> 操作线程Thread[主操作线程,5,main],初始值 a = 2
> 操作线程Thread[干扰线程,5,main],【increment】 ,值 = 2
> 操作线程Thread[干扰线程,5,main],【decrement】 ,值 = 1
> 操作线程Thread[主操作线程,5,main],CAS操作结果: false
# 数组类型
Java 提供了以下针对数组的原子类:
- AtomicIntegerArray- 整形数组原子类
- AtomicLongArray- 长整型数组原子类
- AtomicReferenceArray - 引用类型数组原子类
已经有了针对基本类型和引用类型的原子类,为什么还要提供针对数组的原子类呢?
数组类型的原子类为 数组元素 提供了 volatile 类型的访问语义,这是普通数组所不具备的特性——volatile 类型的数组仅在数组引用上具有 volatile 语义 。
# AtomicIntegerArray
public class AtomicIntegerArrayDemo {
private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
public static void main(final String[] arguments) throws InterruptedException {
System.out.println("Init Values: ");
for (int i = 0; i < atomicIntegerArray.length(); i++) {
atomicIntegerArray.set(i, i);
System.out.print(atomicIntegerArray.get(i) + " ");
}
System.out.println();
Thread t1 = new Thread(new Increment());
Thread t2 = new Thread(new Compare());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Final Values: ");
for (int i = 0; i < atomicIntegerArray.length(); i++) {
System.out.print(atomicIntegerArray.get(i) + " ");
}
System.out.println();
}
static class Increment implements Runnable {
@Override
public void run() {
for (int i = 0; i < atomicIntegerArray.length(); i++) {
int value = atomicIntegerArray.incrementAndGet(i);
System.out.println(Thread.currentThread().getName() + ", index = " + i + ", value = " + value);
}
}
}
static class Compare implements Runnable {
@Override
public void run() {
for (int i = 0; i < atomicIntegerArray.length(); i++) {
boolean swapped = atomicIntegerArray.compareAndSet(i, 2, 3);
if (swapped) {
System.out.println(Thread.currentThread().getName() + " swapped, index = " + i + ", value = 3");
}
}
}
}
}
# 属性更新器
更新器类支持基于反射机制的更新字段值的原子操作。
- AtomicIntegerFieldUpdater- 整型字段的原子更新器。
- AtomicLongFieldUpdater- 长整型字段的原子更新器。
- AtomicReferenceFieldUpdater - 原子更新引用类型里的字段。
这些类的使用有一定限制:
- 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
- **字段必须是 ****volatile ** 类型的 ;
- 不能作用于静态变量(static);
- 不能作用于常量(final);
# AtomicReferenceFieldUpdater
public class AtomicReferenceFieldUpdaterDemo {
static User user = new User("begin");
static AtomicReferenceFieldUpdater<User, String> updater =
AtomicReferenceFieldUpdater.newUpdater(User.class, String.class, "name");
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executorService.execute(new MyThread());
}
executorService.shutdown();
}
static class MyThread implements Runnable {
@Override
public void run() {
if (updater.compareAndSet(user, "begin", "end")) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 已修改 name = " + user.getName());
} else {
System.out.println(Thread.currentThread().getName() + " 已被其他线程修改");
}
}
}
static class User {
volatile String name;
public User(String name) {
this.name = name;
}
public String getName() {
return name;
}
public User setName(String name) {
this.name = name;
return this;
}
}
}