AtomicInteger解析

本文代码基于 JDK 8

前言

AtomicIntegerInteger 类型的线程安全原子类,可以在应用程序中以原子方式更新 int 值,是 atomic 框架中经常使用的原子类。

AtomicInteger 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class AtomicTest {
public static AtomicInteger atomicRace = new AtomicInteger(0);
public static int race = 0;

public static void increase() {
atomicRace.incrementAndGet();
race++;
}

private static final int THREADS_COUNT = 20;

public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
threads[i] = new Thread(() -> {
for (int i1 = 0; i1 < 10000; i1++) {
increase();
}
});
threads[i].start();
}
for (int i = 0; i < THREADS_COUNT; i++) {
threads[i].join();
}
System.out.println("atomic race: " + atomicRace);
System.out.println(" race: " + race);
}
}

执行结果如下:

1
2
atomic race: 200000
race: 114084

由于自增操作并不能保证原子性,所以并发情况下 race 的值每次不太一样,而使用 AtomicInteger 可以保证自增操作原子性,使得每次结果都是 200000。

AtomicInteger 底层实现

AtomicInteger 继承 Number,底层是通过 Unsafe 类做 CAS 操作,来原子的更新状态值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// 使用 Unsafe.compareAndSwapInt 去更新状态值
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
// AtomicIntegerd 的值
private volatile int value;
// 根据 initialValue 创建一个新的 AtomicInteger 对象
public AtomicInteger(int initialValue) {
value = initialValue;
}
// 创建一个新的 AtomicInteger
public AtomicInteger() {
}
}

AtomicInteger 常用方法

AtomicInteger 常用方法包括原子的加、减等,类似 i++、++i、i–、–i 操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 获取当前值
public final int get() {
return value;
}
// 将 newValue 设置为当前值
public final void set(int newValue) {
value = newValue;
}
// 原子的将旧状态值设置为 newValue 并返回旧状态值
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
// 如果expect值和当前值相等且原子的设置当前值为 update,则返回 true,否则返回 false
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// 原子的将当前状态值加一,返回之前的状态值,先取后增
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
// 原子的将当前状态值减一,返回之前的状态值,先取后减
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
// 原子的将当前状态值加 delta,返回之前的状态值,先取后增
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
// 原子的将当前状态值减 delta,返回之前的状态值,先增后取
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
// 原子的将当前状态值减一,返回当前的状态值,先减后取
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}
// 原子的将当前状态值加 delta,返回当前的状态值,先增后取
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

AtomicInteger 特殊方法

lazySet 方法

1
2
3
4
5
// set() 方法的不可见版本
public final void lazySet(int newValue) {
// putOrderedInt 设置值并且马上写入主存,该变量必须是 volatile 类型
unsafe.putOrderedInt(this, valueOffset, newValue);
}

lazySet()⽅法是 set() ⽅法的不可⻅版本。通过 volatile 修饰的变量,可以保证在多处理器环境下的可⻅性,即当⼀个线程修改⼀个共 享变量时,其它线程能⽴即读到这个修改的值。

volatile 的实现加了内存屏障:

  1. 保证写 volatile 变量时会强制把 CPU 写缓存区的数据刷新到内存。
  2. volatile 变量时,使缓存失效,强制从内存中读取最新的值。
  3. 由于内存屏障的存在,volatile变量还能阻⽌重排序。

lazySet 内部调⽤了 Unsafe 类的 putOrderedInt ⽅法,通过该⽅法对共享变量值的改变,不⼀定能被其他线 程⽴即看到。也就是说以普通变量的操作⽅式来写变量。

什么情况下需要使⽤ lazySet 呢? 考虑下⾯这样⼀个场景:

1
2
3
4
5
6
7
AtomicInteger ai = new AtomicInteger();
lock.lock();
try {
// ai.set(1);
} finally {
lock.unlock();
}

由于锁的存在:

  • lock() ⽅法获取锁时,和 volatile 变量的读操作⼀样,会强制使 CPU 缓存失效,强制从内存读取变量。
  • unlock() ⽅法释放锁时,和 volatile 变量的写操作⼀样,会强制刷新 CPU 写缓冲区,把缓存数据写到主内存。

所以,上述 ai.set(1)可以⽤ ai.lazySet(1) ⽅法替换: 由锁来保证共享变量的可⻅性,以设置普通变量的⽅式来修改共享变量,减少不必要的内存屏障,从⽽提⾼ 程序执⾏的效率。

weakCompareAndSet 方法

weakCompareAndSet 操作仅保留了 volatile自身变量的特性,而去除了 happens-before 规则带来的内存语义。也就是说,weakCompareAndSet 无法保证处理操作目标的 volatile 变量外的其他变量的执行顺序( 编译器和处理器为了优化程序性能而对指令序列进行重新排序 ),同时也无法保证这些变量的可见性。

1
2
3
public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

在 JDK 8 及之前的版本,weakCompareAndSet 方法并没有被真正意义上的实现,目前该方法所呈现出来的效果与 compareAndSet 方法是一样的。

在 JDK 9 中 compareAndSetweakCompareAndSet 方法的实现有些许的不同

  1. 底层调用的 native 方法的实现中,cmpxchgb 指令前都会有“lock”前缀,而在 JDK 8 中,程序会根据当前处理器的类型来决定是否为 cmpxchg 指令添加 lock 前缀,只有在 CPU 是多处理器(multi processors)的时候,会添加一个 lock 前缀)。
  2. 新增 @HotSpotIntrinsicCandidate 注解,该注解是特定于 Java 虚拟机的注解。通过该注解表示的方法可能( 但不保证 )通过 HotSpot VM 自己来写汇编或IR编译器来实现该方法以提供性能。

AtomicInteger 新增方法

在 JDK 8 中,AtomicInteger 新增了一些方法,使用 JDK 8 的函数式接口。不过这些函数应该是无副作用的,因为当尝试的更新时,如果由于线程争用更新失败,可能会重新应用它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 原子的将给定函数的结果更新当前值,并返回旧值。
public final int getAndUpdate(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
// 将此运算符 applyAsInt 应用于给定的操作数 prev
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return prev;
}

// 原子的将函数的结果更新当前值,并返回新值。
public final int updateAndGet(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return next;
}

// 原子的将函数的结果和 x 的计算结果更新当前值,并返回新值。
public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}

// 原子地根据给定函数的结果和 x 的计算结果更新当前值
public final int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
// 将此运算符 applyAsInt 应用于给定的操作数 prev
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return next;
}

AtomicInteger 溢出问题

使用了 AtomicIntegerincrementAndGet 方法不断的增加,如果 AtomicInteger 增加到了2147483647 (即) 再加一,AtomicInteger 的值会变成负数 -2147483648。如果不对其作出处理,当资源数目不断累积超过最大值变成负数的时候。

比如可以在 AtomicInteger 变量达到最大值的时候,转为零重新开始计数,并保证 AtomicInteger 在多线程环境下的原子性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private final AtomicInteger i = new AtomicInteger(0);

public final int incrementAndGet() {
int current;
int next;
do {
current = this.i.get();
next = current >= Integer.MAX_VALUE ? 0 : current + 1;
} while (!this.i.compareAndSet(current, next));

return next;
}

public final int decrementAndGet() {
int current;
int next;
do {
current = this.i.get();
next = current <= 0 ? Integer.MAX_VALUE : current - 1;
} while (!this.i.compareAndSet(current, next));

return next;
}

总结

AtomicInteger 使用非阻塞算法实现并发控制,在一些高并发程序中非常适合,但并不代表每一种场景都适合,不同场景要使用使用不同的数值类。比如 AtomicBooleanAtomicLong 等。

AtomicIntegerArrayAtomicLongArray 操作类似,不过它们对应的数组,操作的是数组中的某个元素,通过索引对值进行操作。

其他 Atomic 类

AtomicReference

可以原子更新的对象引用。和 AtomicInteger 类似,少了一些数字类型的“加”、”减”操作,支持泛型,具体含义和方法名息息相关,具体实现和 AtomicInteger 也类似。AtomicReferenceArray 则和 AtomicIntegerArray 类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class AtomicReference<V> implements java.io.Serializable {
private static final long serialVersionUID = -1848883965231344442L;
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicReference.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

private volatile V value;

// 根据 initialValue 创建一个 AtomicReference 对象
public AtomicReference(V initialValue) {
value = initialValue;
}

/**
* Creates a new AtomicReference with null initial value.
*/
// 创建一个空 AtomicReference 对象,value 为 null
public AtomicReference() {
}
}

方法如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final V get();

public final void set(V newValue);

public final void lazySet(V newValue);

public final boolean compareAndSet(V expect, V update);

public final boolean weakCompareAndSet(V expect, V update);

@SuppressWarnings("unchecked")
public final V getAndSet(V newValue);

public final V getAndUpdate(UnaryOperator<V> updateFunction);

public final V updateAndGet(UnaryOperator<V> updateFunction);

public final V getAndAccumulate(V x, BinaryOperator<V> accumulatorFunction);

public final V accumulateAndGet(V x, BinaryOperator<V> accumulatorFunction);

public String toString();

AtomicIntegerFieldUpdater

基于反射的实用工具,可以对指定类的指定 volatile int 字段进行原子更新。此类用于原子数据结构,该结构中同一节点的几个字段都独立受原子更新控制。此类中 compareAndSet方法的保证弱于其他原子类中该方法的保证。因为此类不能确保所有使用的字段都适合于原子访问目的,所以对于相同更新器上的 compareAndSetset 的其他调用,它仅可以保证原子性和可变语义。

有如下限制:

  1. 操作的 int 字段必须是基本类型数据,用volatile修饰,不能是包装类型,int、long就可以,但是不可以是Integer和Long;
  2. 必须是实例变量,不可以是类变量;
  3. 必须是可变的变量,不能是final修饰的变量。

示例如下:

定义一个 Person 类,有一个 id 字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Person {
volatile int id;

public Person(int id) {
this.id = id;
}

public void setId(int id) {
this.id = id;
}

public int getId() {
return id;
}
}

修改 id 字段

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) {
// 指定 Person 对象和 id 字段
AtomicIntegerFieldUpdater<Person> personAtomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Person.class, "id");
Person person = new Person(99999);
personAtomicIntegerFieldUpdater.compareAndSet(person, 99999, 99999);
System.out.println("id=" + person.getId());
Person person1 = new Person(99999);
personAtomicIntegerFieldUpdater.incrementAndGet(person1);
System.out.println("id=" + person1.getId());
IntBinaryOperator intBinaryOperator = (left, right) -> left+right;
personAtomicIntegerFieldUpdater.getAndAccumulate(person1, 1, intBinaryOperator);
System.out.println("id=" + person1.getId());
}

从示例来看,具体操作和 AtomicInteger 操作起来其实和是一样的。只不过操作之前需要指定对哪个对象的哪个字段进行操作。

AtomicStampedReference

在运用 CAS 做锁释放操作中有一个经典的 ABA问题:

在线程1准备用 CAS 将变量的值由A替换为B之前,线程2将变量的值由A替换为C,又由C替换为A,然后线程1执行 CAS 时发现变量的值仍然为A,所以 CAS 成功。但实际上这时的现场已经和最初不同了,尽管 CAS 成功,但可能存在潜藏的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class AtomicStampedReference<V> {
// 内部 Pair 类,一个[reference, stamp]元组 。
private static class Pair<T> {
final T reference;
// 版本戳
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}

private volatile Pair<V> pair;

// 根据初始 initialRef 和 initialStamp 创建一个 AtomicStampedReference 对象
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
}

各种乐观锁的实现中通常都会用版本戳 version 来对记录或对象标记,避免并发操作带来的问题,在 Java 中,AtomicStampedReference<V>也实现了这个作用,它通过包装 [E,Integer] 的元组来对对象标记版本戳 stamp,从而避免 ABA 问题。

AtomicMarkableReferenceAtomicStampedReference ,不过前者版本戳 final int stamp 变成了 final boolean mark