CAS (compare-and-swap)

```java
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
```
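CAS effectively says: "I think location valueOffset should contain the value expect; if it does, put update in this location; otherwise, leave the location unchanged and just tell me what it currently holds." (The Java method reports the outcome as a boolean rather than returning the current value.) In Java, the sun.misc.Unsafe class provides hardware-level atomic operations that implement CAS, and a large number of classes in the java.util.concurrent package rely on the CAS operations of this Unsafe class.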
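As a small illustration of the semantics described above, the sketch below uses only the public AtomicInteger API. The class name CasRetryDemo and the helper updateMax are hypothetical names introduced for this example; they are not part of the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasRetryDemo {

    // Hypothetical helper: raise `target` to at least `candidate` using a CAS retry loop.
    static int updateMax(AtomicInteger target, int candidate) {
        for (;;) {
            int current = target.get();
            if (candidate <= current) {
                return current;                       // already large enough, nothing to write
            }
            if (target.compareAndSet(current, candidate)) {
                return candidate;                     // our expectation held, the write went through
            }
            // Another thread changed the value between get() and compareAndSet(); retry.
        }
    }

    public static void main(String[] args) {
        AtomicInteger value = new AtomicInteger(5);
        System.out.println(value.compareAndSet(5, 7)); // true: the value was 5, it is now 7
        System.out.println(value.compareAndSet(5, 9)); // false: the value is 7, nothing changes
        System.out.println(updateMax(value, 10));      // 10
        System.out.println(updateMax(value, 3));       // 10: a smaller candidate is ignored
    }
}
```

The loop shape (read, compute, compareAndSet, retry on failure) is the same pattern the JDK listing uses for getAndIncrement.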
```java
// Excerpt from the JDK's AtomicInteger
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // Unsafe exposes the hardware-level CAS operation
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // Memory offset of the "value" field inside an AtomicInteger instance
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    // volatile guarantees that every thread sees the latest value
    private volatile int value;

    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    public AtomicInteger() {
    }

    public final int get() {
        return value;
    }

    public final void set(int newValue) {
        value = newValue;
    }

    // Retry until the CAS succeeds, then return the previous value
    public final int getAndSet(int newValue) {
        for (;;) {
            int current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }

    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    // Read the current value, compute current + 1, and retry if another thread got in first
    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }
}
```
```java
// Not thread-safe: count++ is a read-modify-write sequence, so concurrent updates can be lost
public class Counter {
    private int count;

    public Counter() {}

    public int getCount() {
        return count;
    }

    public void increase() {
        count++;
    }
}
```

```java
// Thread-safe, but every call has to acquire the object's lock
public class Counter {
    private int count;

    public Counter() {}

    public synchronized int getCount() {
        return count;
    }

    public synchronized void increase() {
        count++;
    }
}
```
```java
import java.util.concurrent.atomic.AtomicInteger;

// Thread-safe without a lock: the increment is done with CAS
public class Counter {
    private AtomicInteger count = new AtomicInteger();

    public Counter() {}

    public int getCount() {
        return count.get();
    }

    public void increase() {
        count.getAndIncrement();
    }
}
```
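Three drawbacks of CAS

The ABA problem

When CAS operates on a value it has to check whether that value has changed, and it updates the value only if it has not. But if a value was originally A, became B, and then became A again, the CAS check finds the value unchanged even though it did in fact change. The usual remedy for the ABA problem is a version number: prepend a version number to the variable and increment it on every update, so that A-B-A becomes 1A-2B-3A.

Since Java 1.5, the JDK's atomic package has provided the class AtomicStampedReference to solve the ABA problem. Its compareAndSet method first checks whether the current reference equals the expected reference and whether the current stamp equals the expected stamp; if both match, it atomically sets the reference and the stamp to the given new values.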
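The following sketch replays the A-B-A sequence in a single thread to show how the stamp makes a stale update fail. The class name AbaDemo and the method run are hypothetical; the AtomicStampedReference calls are the standard JDK API.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

class AbaDemo {

    // Returns { result of the stale CAS, result of the CAS with the current stamp }.
    static boolean[] run() {
        String a = "A";
        String b = "B";
        AtomicStampedReference<String> ref = new AtomicStampedReference<>(a, 1);

        // A "slow" reader remembers the reference and the stamp it saw.
        int[] stampHolder = new int[1];
        String seen = ref.get(stampHolder);
        int seenStamp = stampHolder[0];

        // Meanwhile the value goes A -> B -> A, and every update bumps the stamp.
        ref.compareAndSet(a, b, 1, 2);
        ref.compareAndSet(b, a, 2, 3);

        // The reference is A again, but the stamp is 3 rather than 1, so the stale CAS fails.
        boolean stale = ref.compareAndSet(seen, "C", seenStamp, seenStamp + 1);

        // Re-reading the stamp first lets the update succeed.
        int currentStamp = ref.getStamp();
        boolean fresh = ref.compareAndSet(a, "C", currentStamp, currentStamp + 1);

        return new boolean[] { stale, fresh };
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("stale CAS succeeded: " + result[0]); // false
        System.out.println("fresh CAS succeeded: " + result[1]); // true
    }
}
```

A plain AtomicReference would have accepted the stale update here, because only the reference is compared.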
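Long spin times are expensive

If a CAS spin keeps failing for a long time, it imposes a very large execution overhead on the CPU. If the JVM can use the pause instruction provided by the processor, efficiency improves somewhat. The pause instruction has two effects. First, it delays the pipelined execution of instructions so that the CPU does not consume excessive execution resources; the length of the delay depends on the specific implementation, and on some processors it is zero. Second, it prevents the CPU pipeline from being flushed because of a memory-order conflict when the loop exits, which improves CPU execution efficiency.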
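At the Java level, one way to give the runtime such a hint is Thread.onSpinWait(), available since Java 9. It tells the runtime that the caller is busy-waiting, and whether it is translated into a pause-like instruction depends on the JVM and the platform. The sketch below assumes Java 9 or later; SpinLockSketch and runDemo are hypothetical names, and this is an illustration rather than a production-grade lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SpinLockSketch {
    private final AtomicBoolean locked = new AtomicBoolean(false);

    void lock() {
        // Spin until the CAS from false to true succeeds.
        while (!locked.compareAndSet(false, true)) {
            Thread.onSpinWait(); // hint to the runtime that this is a spin-wait loop (Java 9+)
        }
    }

    void unlock() {
        locked.set(false);
    }

    // Hypothetical demo: several threads increment a plain int guarded by the spin lock.
    static int runDemo(int threads, int perThread) {
        SpinLockSketch lock = new SpinLockSketch();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int k = 0; k < perThread; k++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 10_000)); // 40000
    }
}
```

When contention is high, it is often better to bound the spin and fall back to blocking rather than spin indefinitely.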
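Atomicity is guaranteed for only one shared variable

When operating on a single shared variable, we can use a CAS loop to make the operation atomic. When several shared variables are involved, a CAS loop can no longer guarantee atomicity, and a lock is needed. There is also a workaround: merge the shared variables into one. For example, given two shared variables i=2 and j=a, combine them into ij=2a and apply CAS to ij. Since Java 1.5, the JDK has provided the AtomicReference class to guarantee atomicity for object references, so several variables can be placed in one object and updated with a single CAS operation.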
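A minimal sketch of the "put several variables into one object" approach, reusing the i=2, j=a example from the text. The names PairCasDemo, Pair, update, and runDemo are hypothetical. The important assumption is that the holder object is immutable, so every change creates a new object and swaps the reference with CAS.

```java
import java.util.concurrent.atomic.AtomicReference;

class PairCasDemo {

    // Immutable holder for two values that must always change together.
    static final class Pair {
        final int i;
        final String j;

        Pair(int i, String j) {
            this.i = i;
            this.j = j;
        }
    }

    private final AtomicReference<Pair> state = new AtomicReference<>(new Pair(2, "a"));

    // Update both fields as one atomic step: build a new Pair and CAS the reference.
    void update(int delta, String suffix) {
        for (;;) {
            Pair old = state.get();
            Pair next = new Pair(old.i + delta, old.j + suffix);
            if (state.compareAndSet(old, next)) {
                return;
            }
            // Another thread replaced the Pair first; retry against the new one.
        }
    }

    Pair current() {
        return state.get();
    }

    static Pair runDemo(int threads, int perThread) {
        PairCasDemo demo = new PairCasDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int k = 0; k < perThread; k++) {
                    demo.update(1, "x");
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return demo.current();
    }

    public static void main(String[] args) {
        Pair p = runDemo(4, 250);
        System.out.println(p.i);          // 1002: the initial 2 plus 1000 increments
        System.out.println(p.j.length()); // 1001: the initial "a" plus 1000 appended characters
    }
}
```

Because a reader always gets one Pair object, it can never observe i from one update combined with j from another.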
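Atomic classes

AtomicReference: atomic references

The AtomicReference class provides an object reference variable that can be read and written atomically. Atomic here means that multiple threads trying to change the same AtomicReference (for example, with a compare-and-swap operation) will not leave it in an inconsistent state. AtomicReference also has a more advanced compareAndSet() method, which compares the current reference with an expected value (a reference) and, if they are equal, sets a new reference inside the AtomicReference object.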
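One detail worth seeing in code is that compareAndSet() compares references with ==, not with equals(). In the sketch below (ReferenceIdentityDemo and its two methods are hypothetical names), an object that is equal but distinct does not count as the expected value.

```java
import java.util.concurrent.atomic.AtomicReference;

class ReferenceIdentityDemo {

    // CAS with an equal but distinct object as the expected value.
    static boolean casWithEqualCopy() {
        String original = new String("rumenz");
        AtomicReference<String> ref = new AtomicReference<>(original);
        String equalCopy = new String("rumenz");      // equals() is true, but it is a different object
        return ref.compareAndSet(equalCopy, "updated");
    }

    // CAS with the very same object as the expected value.
    static boolean casWithSameObject() {
        String original = new String("rumenz");
        AtomicReference<String> ref = new AtomicReference<>(original);
        return ref.compareAndSet(original, "updated");
    }

    public static void main(String[] args) {
        System.out.println(casWithEqualCopy());  // false
        System.out.println(casWithSameObject()); // true
    }
}
```

This matters for boxed types such as Integer: only values in the autoboxing cache range (-128 to 127) are guaranteed to box to the same object.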
AtomicReference: safely modifying a variable's value

```java
package com.keytech.task;

import java.util.concurrent.atomic.AtomicReference;

public class AtomicIntegerTest {

    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        // compareAndSet compares references; small autoboxed Integer values are cached,
        // so the boxed 0 and 2 below are the same objects as the ones stored in count.
        count.compareAndSet(0, 2); // count is 0, so it becomes 2
        count.compareAndSet(1, 4); // count is 2, not 1, so nothing changes
        count.compareAndSet(2, 8); // count is 2, so it becomes 8
        System.out.println(count.get()); // prints 8
    }
}
```
If the referenced value is a custom object, is it thread-safe?

```java
// Excerpt from the JDK's AtomicReference
public class AtomicReference<V> implements java.io.Serializable {
    private static final long serialVersionUID = -1848883965231344442L;

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // Memory offset of the "value" field inside an AtomicReference instance
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicReference.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile V value;

    public AtomicReference(V initialValue) {
        value = initialValue;
    }

    public AtomicReference() {
    }

    public final V get() {
        return value;
    }

    public final void set(V newValue) {
        value = newValue;
    }

    // Atomically replaces the reference if it is currently == expect
    public final boolean compareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }

    // Atomically sets the new reference and returns the previous one
    @SuppressWarnings("unchecked")
    public final V getAndSet(V newValue) {
        return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
    }
}
```
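Some of the methods AtomicReference provides, such as compareAndSet and getAndSet, are atomic, but that atomicity covers only the reference itself.

AtomicReference does not make changes to the referenced object's fields thread-safe. Suppose the referenced object is a person and we modify its name and age: if several threads obtain the object from the reference at the same time and modify it, the result is not thread-safe. The code below verifies this conclusion.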
Unsafely modifying a custom object's field values

```java
package com.keytech.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {

    // total number of requests
    private static Integer clientTotal = 5000;
    // maximum number of threads allowed to run update() at the same time
    private static Integer threadTotal = 200;

    private static Rumenz rumenz = new Rumenz(0, 0);
    private static AtomicReference<Rumenz> rumenzReference = new AtomicReference<>(rumenz);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(threadTotal);
        for (int i = 0; i < clientTotal; i++) {
            final Integer n = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(n);
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
        // wait for all tasks to finish before reading the result
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        // a thread-safe version would print 12497500 (0 + 1 + ... + 4999) for both fields
        System.out.println("rumenzReference=" + rumenzReference.get().getAge());
        System.out.println("rumenzReference=" + rumenzReference.get().getName());
    }

    // Not thread-safe: the get-then-set on each field is a read-modify-write sequence,
    // and AtomicReference only protects the reference, not the object's fields.
    private static void update(int i) {
        rumenzReference.get().setAge(rumenzReference.get().getAge() + i);
        rumenzReference.get().setName(rumenzReference.get().getName() + i);
    }
}

class Rumenz {
    private Integer age;
    private Integer name;

    public Rumenz(Integer age, Integer name) {
        this.age = age;
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public Integer getName() {
        return name;
    }

    public void setName(Integer name) {
        this.name = name;
    }
}
```
Safely modifying a custom object

AtomicIntegerFieldUpdater: modifying a field's value through CAS
```java
package com.keytech.task;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class AtomicIntegerFieldUpdaterTest {

    private static AtomicIntegerFieldUpdater<AtomicIntegerFieldUpdaterTest> upCount =
            AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdaterTest.class, "count");

    // the updated field must be a volatile int
    public volatile int count = 100;

    public int getCount() {
        return count;
    }

    public static void main(String[] args) {
        AtomicIntegerFieldUpdaterTest obj = new AtomicIntegerFieldUpdaterTest();
        // count is 100, so this CAS succeeds and sets it to 200
        if (upCount.compareAndSet(obj, 100, 200)) {
            System.out.println("update succeeded: " + obj.getCount());
        }
        // count is now 200, so a CAS that expects 100 fails
        if (upCount.compareAndSet(obj, 100, 200)) {
            System.out.println("update succeeded");
        } else {
            System.out.println("update failed");
        }
    }
}
```
Source code analysis

```java
// Excerpt from the JDK's AtomicIntegerFieldUpdater
public abstract class AtomicIntegerFieldUpdater<T> {

    @CallerSensitive
    public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
                                                              String fieldName) {
        return new AtomicIntegerFieldUpdaterImpl<U>
            (tclass, fieldName, Reflection.getCallerClass());
    }

    // CAS retry loop: keep trying until the swap succeeds
    public int getAndSet(T obj, int newValue) {
        int prev;
        do {
            prev = get(obj);
        } while (!compareAndSet(obj, prev, newValue));
        return prev;
    }

    private static class AtomicIntegerFieldUpdaterImpl<T>
            extends AtomicIntegerFieldUpdater<T> {
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private final long offset;
        private final Class<T> tclass;
        private final Class<?> cclass;

        AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
                                      final String fieldName,
                                      final Class<?> caller) {
            final Field field;
            final int modifiers;
            try {
                // look up the field by name via reflection
                field = AccessController.doPrivileged(
                    new PrivilegedExceptionAction<Field>() {
                        public Field run() throws NoSuchFieldException {
                            return tclass.getDeclaredField(fieldName);
                        }
                    });
                modifiers = field.getModifiers();
                // the caller must be allowed to access the field
                sun.reflect.misc.ReflectUtil.ensureMemberAccess(
                    caller, tclass, null, modifiers);
                ClassLoader cl = tclass.getClassLoader();
                ClassLoader ccl = caller.getClassLoader();
                if ((ccl != null) && (ccl != cl) &&
                    ((cl == null) || !isAncestor(cl, ccl))) {
                    sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
                }
            } catch (PrivilegedActionException pae) {
                throw new RuntimeException(pae.getException());
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }

            // the field must be of type int ...
            Class<?> fieldt = field.getType();
            if (fieldt != int.class)
                throw new IllegalArgumentException("Must be integer type");

            // ... and it must be declared volatile
            if (!Modifier.isVolatile(modifiers))
                throw new IllegalArgumentException("Must be volatile type");

            this.cclass = (Modifier.isProtected(modifiers) &&
                           caller != tclass) ? caller : null;
            this.tclass = tclass;
            // memory offset of the field, used by the CAS operations
            offset = unsafe.objectFieldOffset(field);
        }
    }
}
```
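The constructor above rejects fields that are not of type int or not declared volatile. A short sketch of what that looks like from the caller's side (FieldUpdaterChecks, Sample, and tryCreate are hypothetical names):

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class FieldUpdaterChecks {

    static class Sample {
        volatile int ok;    // volatile int: accepted
        int plain;          // int but not volatile: rejected
        volatile long wide; // volatile but not int: rejected
    }

    // Returns "ok" if an updater can be created for the field, otherwise the failure message.
    static String tryCreate(String fieldName) {
        try {
            AtomicIntegerFieldUpdater.newUpdater(Sample.class, fieldName);
            return "ok";
        } catch (RuntimeException e) {
            // IllegalArgumentException for a wrong type or a missing volatile modifier,
            // RuntimeException wrapping NoSuchFieldException for an unknown field name
            return String.valueOf(e.getMessage());
        }
    }

    public static void main(String[] args) {
        System.out.println("ok      -> " + tryCreate("ok"));
        System.out.println("plain   -> " + tryCreate("plain"));
        System.out.println("wide    -> " + tryCreate("wide"));
        System.out.println("missing -> " + tryCreate("missing"));
    }
}
```

Both checks run when the updater is created, so a misdeclared field fails fast instead of producing silent races later.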
AtomicIntegerFieldUpdater: updating a custom object's field values in a thread-safe way

```java
package com.keytech.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class AtomicReferenceTest {

    // total number of requests
    private static Integer clientTotal = 5000;
    // maximum number of threads allowed to run update() at the same time
    private static Integer threadTotal = 200;

    public static Rumenz rumenz = new Rumenz(0, 0);

    public static AtomicIntegerFieldUpdater<Rumenz> rumenzReferenceAge =
            AtomicIntegerFieldUpdater.newUpdater(Rumenz.class, "age");
    private static AtomicIntegerFieldUpdater<Rumenz> rumenzReferenceName =
            AtomicIntegerFieldUpdater.newUpdater(Rumenz.class, "name");

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(threadTotal);
        for (int i = 0; i < clientTotal; i++) {
            final Integer n = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update(n);
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
        // wait for all tasks to finish before reading the result
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        // each field is incremented once per request, so both print 5000
        System.out.println("rumenzReference=" + rumenz.getAge());
        System.out.println("rumenzReference=" + rumenz.getName());
    }

    // each increment is a single CAS-based atomic operation on one field
    public static void update(int i) {
        rumenzReferenceAge.incrementAndGet(rumenz);
        rumenzReferenceName.incrementAndGet(rumenz);
    }
}

class Rumenz {
    // fields used with AtomicIntegerFieldUpdater must be volatile int
    public volatile int age;
    public volatile int name;

    public Rumenz(Integer age, Integer name) {
        this.age = age;
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public Integer getName() {
        return name;
    }

    public void setName(Integer name) {
        this.name = name;
    }
}
```
Thread safety through atomicity: Atomic (AtomicInteger | LongAdder | AtomicLong)

Thread safety

AtomicXXX: CAS, Unsafe.compareAndSwapXXX

CAS (compare-and-swap)
Thread-safe

```java
package com.keytech.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicTest {

    // total number of requests
    private static Integer clientTotal = 5000;
    // maximum number of threads allowed to run update() at the same time
    private static Integer threadTotal = 200;

    private static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(threadTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
        // wait for all tasks to finish before reading the result
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("count:" + count); // count:5000
    }

    private static void update() {
        count.incrementAndGet();
    }
}
```
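getAndAddInt source code

```java
// var1: the object, var2: the field's memory offset, var4: the amount to add
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        // read the current value from main memory
        var5 = this.getIntVolatile(var1, var2);
        // retry if the value changed between the read and the CAS
    } while (!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    return var5;
}
```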
compareAndSwapInt(this, stateOffset, expect, update)
```java
package com.keytech.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class LongAddrTest {

    // total number of requests
    private static Integer clientTotal = 5000;
    // maximum number of threads allowed to run update() at the same time
    private static Integer threadTotal = 200;

    private static LongAdder count = new LongAdder();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(threadTotal);
        for (int i = 0; i < clientTotal; i++) {
            // submit the work to the pool so the updates actually run concurrently
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    update();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
        // wait for all tasks to finish before reading the result
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("count:" + count); // count:5000
    }

    private static void update() {
        count.increment();
    }
}
```
```java
package com.keytech.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class AtomicLongTest {

    // total number of requests
    private static Integer clientTotal = 5000;
    // maximum number of threads allowed to run update() at the same time
    private static Integer threadTotal = 200;

    private static AtomicLong count = new AtomicLong();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(threadTotal);
        for (int i = 0; i < clientTotal; i++) {
            // submit the work to the pool so the updates actually run concurrently
            executorService.execute(() -> {
                try {
                    // acquire() blocks until a permit is free; tryAcquire() would not wait
                    semaphore.acquire();
                    update();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
        // wait for all tasks to finish before reading the result
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("count:" + count); // count:5000
    }

    private static void update() {
        count.incrementAndGet();
    }
}
```
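To compare the two counters side by side, the sketch below drives a LongAdder and an AtomicLong from the same threads and reads both once every thread has finished. AdderVsAtomicLong and count are hypothetical names. Note that LongAdder.sum() is not an atomic snapshot while updates are still in flight, so it is read only after join().

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

class AdderVsAtomicLong {

    // Returns { LongAdder total, AtomicLong total } after all threads have finished.
    static long[] count(int threads, int perThread) {
        LongAdder adder = new LongAdder();
        AtomicLong atomic = new AtomicLong();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int k = 0; k < perThread; k++) {
                    adder.increment();        // may land in a per-thread cell under contention
                    atomic.incrementAndGet(); // CAS retry loop on a single value
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // With no updates in flight, sum() returns an accurate total.
        return new long[] { adder.sum(), atomic.get() };
    }

    public static void main(String[] args) {
        long[] totals = count(8, 10_000);
        System.out.println("LongAdder:  " + totals[0]); // 80000
        System.out.println("AtomicLong: " + totals[1]); // 80000
    }
}
```

Both give the same total. The practical difference is that LongAdder tends to scale better under heavy write contention, whereas AtomicLong offers an exact value at every read.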