JUC并发编程第四章(共享模型之无锁)


JUC并发编程 共享模型之无锁

1. 问题提出

现在有如下代码:

interface Account {
    // 获取余额
    Integer getBalance();
    // 取款
    void withdraw(Integer amount);
    /**
     * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
     * 如果初始余额为 10000 那么正确的结果应当是 0
     */
    static void demo(Account account) {
        List<Thread> ts = new ArrayList<>();
        long start = System.nanoTime();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        //将所有线程收集到集合中,统一来启动
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                //这里主线程需要等待所有的其他线程执行完毕才往下继续执行
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance()
                + " cost: " + (end-start)/1000_000 + " ms");
    }
}

class AccountUnsafe implements Account {
    private Integer balance;
    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }
    @Override
    public Integer getBalance() {
        return balance;
    }
    @Override
    public void withdraw(Integer amount) {
        balance -= amount;
    }
}

public class JavaTestApplication {

    public static void main(String[] args)  {
        Account account = new AccountUnsafe(10000);
        Account.demo(account);
    }

}

执行结果:

Connected to the target VM, address: '127.0.0.1:58176', transport: 'socket'
590 cost: 95 ms
Disconnected from the target VM, address: '127.0.0.1:58176', transport: 'socket'

Process finished with exit code 0

可见结果并不对,分析不安全之处。

1.1 为什么不安全

仔细分析,你会发现,不安全之处其实就是操作共享变量balance的方法withdraw

public void withdraw(Integer amount) {
    balance -= amount;
}

1.2 解决思路——加锁

结合之前的所学,可以很自然的想到可以给操作共享变量的地方加锁。

首先想到的是给 Account 对象加锁,这里加锁有多种方式:

  1. 使用ReentrantLock

    private final ReentrantLock lock = new ReentrantLock();
    @Override
    public void withdraw(Integer amount) {
        while (true){
            if (lock.tryLock()){
                try {
                    balance -= balance;
                } finally {
                    lock.unlock();
                }
                break;
            }
            //如果未获取到锁则请求重试
        }
    }
    

    结果正确。

  2. 使用synchronized

    @Override
    public void withdraw(Integer amount) {
        synchronized (this){
            balance -= amount;
        }
    }
    

    结果正确。

1.3 解决思路——无锁

class AccountSafe implements Account {
    private AtomicInteger balance; //注意这个变量的类型
    public AccountSafe(Integer balance) {
        this.balance = new AtomicInteger(balance);
    }
    @Override
    public Integer getBalance() {
        return balance.get();
    }
    @Override
    public void withdraw(Integer amount) {
        while (true) {//如果有其他线程来改变了balance,那么我就重试
            //记录改变之前的余额
            int prev = balance.get();
            //记录改变之后的余额(此时还未真正改变)
            int next = prev - amount;
            //真正改变余额
            if (balance.compareAndSet(prev, next)) {//compareAndSet => cas机制
                break;
            }
        }
    }
}

public static void main(String[] args) {
    Account.demo(new AccountSafe(10000));
}

执行结果:

Connected to the target VM, address: '127.0.0.1:55371', transport: 'socket'
0 cost: 39 ms
Disconnected from the target VM, address: '127.0.0.1:55371', transport: 'socket'

Process finished with exit code 0

可以看见此时执行比加锁的更快。

2. CAS 与 volatile

2.1 CAS

前面看到的 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?

当你使用 AtomicInteger 来管理 balance 变量时,它会通过CAS(比较与交换)操作来确保线程安全。CAS是一种原子操作,用于在多线程环境中进行并发控制,它有以下关键特点:

  1. 原子性:CAS操作是原子的,即它要么完全执行,要么不执行,没有中间状态。这确保了多个线程并发执行CAS操作时不会导致数据不一致或竞争条件。

  2. 比较和交换:CAS操作涉及比较当前值预期值,如果它们相等,就将新值写入变量。如果当前值与预期值不相等,CAS操作不会修改变量的值。

在你的代码中,AtomicIntegercompareAndSet 方法用于执行CAS操作。具体来说,以下是你的 withdraw 方法的执行过程:

@Override
public void withdraw(Integer amount) {
    while (true) {
        // 记录改变之前的余额
        int prev = balance.get();
        // 计算改变之后的余额(此时还未真正改变)
        int next = prev - amount;
        // 真正改变余额,但只有当当前余额等于prev时才会成功
        /**
         * compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值
         * - 不一致了,next 作废,返回 false 表示失败
         * 比如,别的线程已经做了减法,当前值已经被减成了 990
         * 那么本线程的这次 990 就作废了,进入 while 下次循环重试
         * - 一致,以 next 设置为新值,返回 true 表示成功
         */
        if (balance.compareAndSet(prev, next)) {
            break;
        }
    }
}

在这个方法中,每个线程会不断尝试执行CAS操作,首先读取当前的 balance 值(prev),然后计算出新的 balance 值(next),接着使用 compareAndSet 尝试将新值写入 balance。如果当前的 balanceprev 不相等(即其他线程已经修改了 balance),CAS操作会失败,线程将继续循环重试。只有当CAS操作成功(表示没有其他线程在这期间修改了 balance),线程才会跳出循环。

这种方式保证了只有一个线程能够成功减少 balance,并且避免了竞争条件,因此不需要显式的锁定机制。这是一种高效的多线程并发控制方式,特别适用于需要在不加锁的情况下确保线程安全的场景。

其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。

sequenceDiagram
participant t1 as 线程1
participant a as Account对象
participant t2 as 线程2
t1->>a: 获取余额 100
t1->>t1: 余额-10=90
t2-->>a: 已经修改为90了
t1->>a: compareAndSet(100,90)——失败
t1->>a: 获取余额 90
t1->>t1: 余额-10=80
t2-->>a: 已经修改为80了
t1->>a: compareAndSet(90,80)——失败
t1->>a: 获取余额 80
t1->>t1: 余额-10=70
t1->>a: compareAndSet(80,70)——成功

对于上图的顺序图:

线程1开始获取余额为100,然后计算余额改变之后的值为90(此时余额并未真正改变),当线程1执行compareAndSet(100,90)之前,线程2先一步将余额改为了90,此时,当线程1执行compareAndSet(100,90)的时候,发现与自己读取到的余额不一致,就会导致cas失败,其他的以此内推。

2.2 volatile

对于AtomicInteger类,它里面的数值具体是存在value变量中:

volatile修饰了value

获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。这就是为什么这里使用原子类的一个原因

它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。

注意:

volatile 仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原子性)

并且对于使用compareAndSet,CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果。

为什么无锁效率高:

无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻:

  • 线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大
  • 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

CAS特点:

CAS(Compare And Swap)是一种多线程编程中常用的同步机制,它的特点在于可以实现无锁的并发操作,用于解决多线程环境下的数据竞争问题。在Java中,CAS是通过java.util.concurrent.atomic包中的原子类来实现的,这些原子类提供了一种无锁的方式来进行原子操作。

以下是Java中CAS的主要特点:

  1. 原子性操作:CAS操作是原子性的,即要么成功执行,要么不执行,不存在中间状态。这可以确保在多线程环境下对共享变量的操作是线程安全的。

  2. 无锁机制:CAS操作不需要使用传统的锁来保护共享资源,因此避免了锁带来的性能开销和潜在的死锁问题。它使用了底层硬件支持来实现原子性操作。

  3. 自旋重试:如果CAS操作失败(即其他线程已经修改了共享变量),则CAS会一直尝试重试,直到成功或者达到某个重试次数的上限。这避免了线程进入阻塞状态,但也可能导致一定的性能开销。

  4. 内存模型:CAS操作是基于内存模型的,它需要确保线程之间的可见性和内存操作的有序性。这意味着CAS操作可以用来实现一些高级的并发模式,如非阻塞算法和无锁数据结构。

  5. ABA问题:CAS操作可能存在ABA问题,即在CAS操作期间,共享变量的值从A变为B,然后再变回A。为了解决ABA问题,Java提供了AtomicStampedReferenceAtomicMarkableReference等原子类,它们可以通过版本号或标记来检测变量是否发生了ABA操作。

  6. 适用范围:CAS适用于一些简单的原子操作,如增加、减少、设置等操作。对于复杂的操作,CAS可能不够高效或无法满足需求。

总的来说,CAS是一种高效的并发控制机制,适用于一些简单的原子操作,并且可以避免传统锁所带来的性能开销。然而,开发人员需要注意处理CAS操作失败的情况,以及潜在的ABA问题。在选择是否使用CAS时,需要根据具体的场景和需求进行权衡。

  • CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。

  • synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。

  • CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思

    • 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    • 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

    适用于线程数少、多核 CPU 的场景下。如果线程大于你的CPU核数的时候,CAS的效率也不一定会好于synchronized

Java中也提供了一些实现CAS的工具类,如下节所示:3-6

3. 原子整数

对于原子类的操作,所有的原子类再调用方法的时候,如果发现值以及被改变了,那么就会导致操作失败。

java.util.concurrent.atomic并发包提供了:

  • AtomicBoolean
  • AtomicBoolean
  • AtomicLong

AtomicInteger 的加减法为例:

AtomicInteger i = new AtomicInteger(0);
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println(i.getAndIncrement());

// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());

// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());

// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());

// 获取并加值(i = 0, 结果 i = 5, 返回 0)类似于先获取i的值,之后再i += 5 
System.out.println(i.getAndAdd(5));

// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));

以上方法只能够完成简单的加法减法,如果要完成复杂一点的操作,就要使用到update相关的方法了。

对于上面的任意一个方法,如果再执行的时候,发现值已经被其他的线程改变了,那么就会本线程操作导致失败。

这样对于之前的代码:

@Override
public void withdraw(Integer amount) {
    while (true) {
        //记录改变之前的余额
        int prev = balance.get();
        //记录改变之后的余额(此时还未真正改变)
        int next = prev - amount;
        //真正改变余额
        /**
         * compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值
         * - 不一致了,next 作废,返回 false 表示失败
         * 比如,别的线程已经做了减法,当前值已经被减成了 990
         * 那么本线程的这次 990 就作废了,进入 while 下次循环重试
         * - 一致,以 next 设置为新值,返回 true 表示成功
         */
        if (balance.compareAndSet(prev, next)) {//compareAndSet => cas机制
            break;
        }
    }
}

就不用在使用while(true)来保证原子性了,可以改为如下代码:

@Override
public void withdraw(Integer amount) {
    balance.getAndAdd(-1 * amount);
}

结果也是完全正确的。

AtomicInteger 的复杂操作为例:

public static void main(String[] args) {
    AtomicInteger i = new AtomicInteger(5);
    i.updateAndGet(value->value*2);//可以做乘法
    System.out.println(i);
}

update相关的方法也是原子性,并且也再执行的时候,发现值已经被其他的线程改变了,那么就会本线程操作导致失败。

对于他的底层原理实现,其实很像我们之前使用的while(true)的那种方式:

update相关原理

拓展:

// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));

// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));

4. 原子引用

为什么需要原子引用类型?简单来说就是我们要保护的一些数据,并不一定是基本数据类型,有可能是包装类,也有可能是其他类型

原子引用类型是Java中提供的一种高级并发编程工具,用于解决一些特定的多线程编程问题,其中最常见的问题是ABA问题并发数据结构的设计。以下是一些需要原子引用类型的原因:

  1. ABA问题的解决:ABA问题是一种常见的并发问题,它发生在一个线程通过CAS操作将共享变量的值从A变为B,然后再变回A,而另一个线程可能会误以为共享变量没有发生变化。原子引用类型如AtomicStampedReferenceAtomicMarkableReference引入了版本号或标记,使得CAS操作不仅仅关注值的变化,还关注变化的上下文,从而解决了ABA问题。

  2. 并发数据结构的设计:在构建高性能的并发数据结构时,原子引用类型可以帮助确保数据结构的线程安全性。例如,原子引用可以用于实现无锁的链表、队列、堆栈等数据结构,从而提高并发性能。

  3. 状态标记:有些情况下,需要在共享对象上附加状态信息,以便进行特定的操作或检查。原子引用类型可以用于存储附加的状态信息,而不需要引入额外的锁或同步机制。

  4. 链接或映射关系:原子引用类型可以用于建立链接或映射关系,例如,将一个对象链接到另一个对象,而这个链接的建立需要原子性保证,以避免并发问题。

  5. 高级的并发模式:一些高级的并发模式,如无锁算法、自旋锁、乐观锁等,通常需要原子引用类型来实现。这些模式可以提高并发性能并减少锁竞争。

总之,原子引用类型提供了一种更高级的并发控制机制,用于解决特定的并发问题和设计高性能的并发数据结构。它们可以帮助开发人员更精细地控制共享数据的访问,同时减少了锁带来的性能开销和复杂性。然而,在使用原子引用类型时,开发人员需要谨慎考虑并发问题和内存模型,以确保正确性和性能。

  • AtomicReference
  • AtomicMarkableReference
  • AtomicStampedReference

有如下方法:

public class JavaTestApplication {

    public static void main(String[] args) {
        DecimalAccount.demo(new DecimalAccountSafeCas(new BigDecimal(10000)));
    }

}

class DecimalAccountSafeCas implements DecimalAccount {
    AtomicReference<BigDecimal> ref; //改用原子引用,也能够完成
    public DecimalAccountSafeCas(BigDecimal balance) {
        ref = new AtomicReference<>(balance);
    }
    @Override
    public BigDecimal getBalance() {
        return ref.get();
    }
    @Override
    public void withdraw(BigDecimal amount) {
        while (true) {
            BigDecimal prev = ref.get();
            BigDecimal next = prev.subtract(amount);
            if (ref.compareAndSet(prev, next)) {
                break;
            }
        }
    }
}

interface DecimalAccount {
    // 获取余额
    BigDecimal getBalance();
    // 取款
    void withdraw(BigDecimal amount);
    /**
     * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
     * 如果初始余额为 10000 那么正确的结果应当是 0
     */
    static void demo(DecimalAccount account) {
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(BigDecimal.TEN);
            }));
        }
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(account.getBalance());
    }
}

4.1 ABA问题

下面有这么一端代码:

static AtomicReference<String> ref = new AtomicReference<>("A");
public static void main(String[] args) throws InterruptedException {
    log.debug("main start...");
    // 获取值 A
    // 这个共享变量被它线程修改过?
    String prev = ref.get();
    other();
    Thread.sleep(1000);
    // 尝试改为 C
    log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
}
private static void other() {
    new Thread(() -> {
        log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
    }, "t1").start();
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    new Thread(() -> {
        log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
    }, "t2").start();
}

执行结果:

17:11:13.464 [t1] DEBUG com.example.javatest.JavaTestApplication - change A->B true
17:11:13.966 [t2] DEBUG com.example.javatest.JavaTestApplication - change B->A true
17:11:14.967 [main] DEBUG com.example.javatest.JavaTestApplication - change A->C true

可以看见,当主线程想要将变量A修改为C的时候,首先会获取最新的值prev(此时为A),然后调用other方法,主线程阻塞,在other方法中,有两个线程顺序执行,先是t1将A改为B,然后线程t2将B又改为A,此时1s后主线程苏醒之后,当执行ref.compareAndSet(prev, “C”)方法的时候,发现prev仍然为A,并未改变,此时修改成功。

此时里面其实就存在一个隐患:并不能监控其他线程是否在中途修改过值,然后又改回员状态。

主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程希望:

只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号

4.2 AtomicStampedReference

static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);
public static void main(String[] args) throws InterruptedException {
    log.debug("main start...");
    // 获取值 A
    // 这个共享变量被它线程修改过?
    String prev = ref.getReference();
    //获取版本号
    int stamp = ref.getStamp();
    log.debug("版本 {}", stamp);
    other();
    Thread.sleep(1000);
    // 尝试改为 C
    log.debug("change A->C {}", ref.compareAndSet(prev, "C",stamp,stamp+1));
}
private static void other() {
    new Thread(() -> {
        //获取版本号
        int stamp = ref.getStamp();
        log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",stamp,stamp+1));
        log.debug("更新版本为 {}", ref.getStamp());
    }, "t1").start();
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    new Thread(() -> {
        //获取版本号
        int stamp = ref.getStamp();
        log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",stamp,stamp+1));
        log.debug("更新版本为 {}", ref.getStamp());
    }, "t2").start();
}

执行结果:

18:04:47.038 [main] DEBUG com.example.javatest.JavaTestApplication - main start...
18:04:47.039 [main] DEBUG com.example.javatest.JavaTestApplication - 版本 0
18:04:47.040 [t1] DEBUG com.example.javatest.JavaTestApplication - change A->B true
18:04:47.040 [t1] DEBUG com.example.javatest.JavaTestApplication - 更新版本为 1
18:04:47.553 [t2] DEBUG com.example.javatest.JavaTestApplication - change B->A true
18:04:47.553 [t2] DEBUG com.example.javatest.JavaTestApplication - 更新版本为 2
18:04:48.557 [main] DEBUG com.example.javatest.JavaTestApplication - change A->C false

AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A ->C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。

但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference

结合AtomicMarkableReference实现如下流程:

graph TD
s(保洁阿姨)
m(主人)
g1(垃圾袋)
g2(新垃圾袋)
m--检查-->g1
s-.倒空.->g1
g1--还空-->g1
g1--已满-->g2

现在有两个线程,线程1(主人)会检查垃圾袋,如果垃圾袋已经满了,那么就要更换新的垃圾袋,如果垃圾袋还是空的,那么就继续使用这个垃圾袋,这时候还有个线程2(保洁阿姨),主要就是倒空垃圾袋里面的垃圾

4.3 AtomicMarkableReference

static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);
public static void main(String[] args) throws InterruptedException {
    GarbageBag bag = new GarbageBag("装满了垃圾");
    // 参数2 mark 可以看作一个标记,表示垃圾袋满了
    AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
    log.debug("主线程 start...");
    GarbageBag prev = ref.getReference();
    log.debug(prev.toString());
    new Thread(() -> {
        log.debug("打扫卫生的线程 start...");
        bag.setDesc("空垃圾袋");
        //这里的旧值和新值相同,只是改变了里面的属性
        while (!ref.compareAndSet(bag, bag, true, false)) {}
        log.debug(bag.toString());
    }).start();
    Thread.sleep(1000);

    log.debug("主线程想换一只新垃圾袋?");
    boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
    log.debug("换了么?" + success);
    log.debug(ref.getReference().toString());
}

执行:

18:22:57.591 [main] DEBUG com.example.javatest.JavaTestApplication - 主线程 start...
18:22:57.592 [main] DEBUG com.example.javatest.JavaTestApplication - com.example.javatest.GarbageBag@2db0f6b2 装满了垃圾
18:22:57.592 [Thread-1] DEBUG com.example.javatest.JavaTestApplication - 打扫卫生的线程 start...
18:22:57.592 [Thread-1] DEBUG com.example.javatest.JavaTestApplication - com.example.javatest.GarbageBag@2db0f6b2 空垃圾袋
18:22:58.597 [main] DEBUG com.example.javatest.JavaTestApplication - 主线程想换一只新垃圾袋?
18:22:58.597 [main] DEBUG com.example.javatest.JavaTestApplication - 换了么?false
18:22:58.597 [main] DEBUG com.example.javatest.JavaTestApplication - com.example.javatest.GarbageBag@2db0f6b2 空垃圾袋

5. 原子数组

上一节我们了解了原子引用,它其实是改变的对象本身的引用,但是有时候我们不想改变对象本身的引用,而是只想改变它内部的值,就比如数组,我们不想改变数组的引用,只是想改变数组里面元素的一些值,这时候就不能用之前的原子引用了,只能够使用原子数组。

在Java中,原子数组是一种特殊的数据结构,用于解决多线程并发访问共享数据时可能出现的竞态条件和数据不一致性问题。原子数组提供了一种能够确保多个线程同时访问数组元素时操作的原子性的机制。下面详细解释为什么需要Java中的原子数组:

  1. 并发控制:在多线程应用程序中,多个线程可能同时访问和修改共享的数据结构,如数组。如果没有适当的同步机制,这可能导致竞态条件,即多个线程尝试同时修改相同的数据,导致数据不一致性和错误的结果。原子数组提供了一种方式来确保数组操作是原子性的,即它们要么全部成功,要么全部失败,从而避免了竞态条件。

  2. 性能优化:原子数组通常比使用锁来同步数组访问的方式性能更好。使用锁会引入竞争和上下文切换,而原子数组可以在硬件级别上实现原子性操作,减少了竞争和上下文切换的开销,从而提高了性能。

  3. 避免死锁:使用锁来同步数组访问可能导致死锁问题,特别是在复杂的多线程应用中。原子数组不需要锁,因此可以避免死锁问题,使代码更加可靠。

  4. 简化编程:使用原子数组可以简化编程,因为开发人员不需要手动管理锁和同步。这使得多线程编程更容易,降低了出错的可能性。

  5. 保持数据一致性:原子数组确保对数组元素的修改是原子性的,这意味着任何时刻都可以看到一个完全一致的数组状态。这对于需要确保数据一致性的应用程序非常重要,例如金融交易系统。

  6. 支持高级操作:原子数组通常支持一些高级操作,如CAS(Compare-and-Swap)操作,允许您在满足特定条件时原子地更新数组元素。这些操作对于某些并发算法和数据结构非常有用。

  • AtomicIntegerArray:要保护数组内的元素是Integer类型的
  • AtomicLongArray:要保护数组内的元素是Long类型的
  • AtomicReferenceArray:要保护数组内的元素是引用类型的

以下是一个简单的示例代码,演示了如何使用Java中的AtomicIntegerArray来实现一个多线程应用程序,其中多个线程同时增加一个整数数组中的元素的值。这个示例展示了如何使用原子数组来确保线程安全性。

import java.util.concurrent.atomic.AtomicIntegerArray;

public class AtomicArrayExample {
    public static void main(String[] args) {
        final int arraySize = 5;
        AtomicIntegerArray atomicArray = new AtomicIntegerArray(arraySize);
        
        // 创建多个线程,每个线程增加数组元素的值
        Thread[] threads = new Thread[arraySize];
        
        for (int i = 0; i < arraySize; i++) {
            final int index = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    atomicArray.getAndIncrement(index);
                }
            });
            threads[i].start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        // 打印数组的最终状态
        for (int i = 0; i < arraySize; i++) {
            System.out.println("Element " + i + ": " + atomicArray.get(i));
        }
    }
}

在这个示例中,我们首先创建了一个包含5个元素的AtomicIntegerArray。然后,我们创建了多个线程,每个线程都会增加数组中的一个元素的值1000次。由于我们使用AtomicIntegerArray,每个增加操作都是原子性的,因此不会出现竞态条件或数据不一致性问题。

最后,我们等待所有线程完成,并打印数组的最终状态。由于原子数组的使用,最终数组中的每个元素都会正确地增加到5000。这个示例演示了如何使用原子数组来确保多线程环境下的数据安全性。

6. 字段更新器

字段更新器主要是对成员变量的修改,也就是当有多个线程来操作某个对象的成员变量(属性)的时候,为了保证操作的安全,可以使用字段更新器来完成。

在Java中,字段更新器(Field Updater)是一种用于原子性更新类的字段的工具。它们通常用于具有多个线程同时访问和修改字段的情况,以确保线程安全性和避免竞态条件。字段更新器可以用于原子地更新类的字段,而无需使用锁或其他同步机制。

Java中有三种主要类型的字段更新器:

  1. **AtomicIntegerFieldUpdater**:用于原子性地更新int类型字段。
  2. **AtomicLongFieldUpdater**:用于原子性地更新long类型字段。
  3. AtomicReferenceFieldUpdater :用于原子性地更新引用类型字段

这些字段更新器是使用反射机制来操作类的字段的,并提供了一种更高效的方式来进行原子性操作,而不需要锁

以下是一个简单的示例,演示了如何使用AtomicIntegerFieldUpdater来原子性地更新一个类的int字段:

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class FieldUpdaterExample {
    public static class MyClass {
        // 要更新的字段必须是volatile类型的,并且不能是static的
        public volatile int myField;
    }

    public static void main(String[] args) {
        MyClass myObject = new MyClass();
        
        // 创建字段更新器,指定要更新的字段和所属类
        AtomicIntegerFieldUpdater<MyClass> updater = AtomicIntegerFieldUpdater.newUpdater(MyClass.class, "myField");
        
        // 原子性地增加字段的值
        updater.incrementAndGet(myObject);
        
        // 打印更新后的字段值
        System.out.println("Updated field value: " + myObject.myField);
    }
}

需要注意的是,使用字段更新器来原子性地更新字段有一些限制和要求:

  • 要更新的字段必须volatile类型的,以确保可见性。否则会出现异常:java.lang.IllegalArgumentException
  • 字段不能static的,必须是实例字段。
  • 字段的访问级别必须足够,以便字段更新器可以访问它。

举例:

private volatile int field;

public static void main(String[] args) {
    /**
     * newUpdater有两个参数
     * 参数一:表示保存字段的对象的类
     * 参数二:表示要更新的字段的名称
     */
    AtomicIntegerFieldUpdater<AtomicReferenceFieldUpdaterTest> fieldUpdate =
            AtomicIntegerFieldUpdater.newUpdater(AtomicReferenceFieldUpdaterTest.class, "field");
    AtomicReferenceFieldUpdaterTest test = new AtomicReferenceFieldUpdaterTest();
    /**
     * compareAndSet参数的含义
     * 参数一:要对哪一个类进行操作
     * 参数二:修改之前的值为多少(期望值)
     * 参数三:要修改为的值为多少(新值)
     */
    boolean isSuccess = fieldUpdate.compareAndSet(test, 0, 10);
    System.out.println("是否修改成功:"+isSuccess);
    System.out.println("修改过后field的值为:"+test.field);
}

执行结果:

是否修改成功:true
修改过后field的值为:10

如果我修改之前的值不为0:

修改失败,并且值仍然为修改之前的值

7. 原子累加器

前面我们了解了AtomicIntegerAtomicLong等原子整数,他们都能够做累加操作,为什么这里又要提出原子累加器的概念呢?

因为在Java8之后,新增了一些类,来增强累加的操作,比原始的原子整数的累加操作效率更高

就比如以前有AtomicLong来对Long类型的数据进行原子累加,但是他的效率可能并不是很好,所以后面引入了LongAdder,它可以处理(intlong)类型的数据。还有DoubleAdder,它可以处理浮点数。

性能对比:

public class AtomicReferenceFieldUpdaterTest {
    //为了查看JVM多次执行可能优化的场景,我们这里多执行几次更有说服力
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            demo(() -> new LongAdder(), adder -> adder.increment());
        }
        for (int i = 0; i < 5; i++) {
            demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
        }
    }

    /**
     *
     * @param adderSupplier ()->返回值 提供者:无中生有
     * @param action (值)->无返回值 消费者
     * @param <T>
     */
    private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
        T adder = adderSupplier.get();
        long start = System.nanoTime();
        List<Thread> ts = new ArrayList<>();
        // 40 个线程,每人累加 50 万 总共累加到20000000
        for (int i = 0; i < 40; i++) {
            ts.add(new Thread(() -> {
                for (int j = 0; j < 500000; j++) {
                    action.accept(adder);
                }
            }));
        }
        ts.forEach(t -> t.start());
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(adder + " cost:" + (end - start)/1000_000);
    }
}
20000000 cost:54
20000000 cost:14
20000000 cost:22
20000000 cost:10
20000000 cost:8
20000000 cost:288
20000000 cost:282
20000000 cost:282
20000000 cost:232
20000000 cost:266

可以看见,使用了原子累加器相关的类比普通的原子整数类的效率要高很多。

性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,最后再把各个累加单元(Cell)汇总即可。因此减少了 CAS 重试失败,从而提高性能。

7.1 源码之 LongAdder

LongAdder 是并发大师 @author Doug Lea (大哥李)的作品,设计的非常精巧

LongAdder 类有几个关键域:

// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;

// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;

// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;

不是说CAS不加锁吗?但是在关键域中当cellsBusy=1的时候又表示加锁,这是怎么回事呢?

这是因为当多个线程同时来执行累加的时候,初始时是没有cells数组的,多个线程会竞争的来创建cells数组,同理在扩容的时候也是一样,所以这里会有一个标记来判断是否加锁。

cellsBusy的类似实现:其实底部就是用的CAS机制来解决。

@Slf4j
public class AtomicReferenceFieldUpdaterTest {
    public static void main(String[] args) {
        LockCas lock = new LockCas();
        new Thread(() -> {
            log.debug("begin...");
            lock.lock();
            try {
                log.debug("lock...");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }).start();
        new Thread(() -> {
            log.debug("begin...");
            lock.lock();
            try {
                log.debug("lock...");
            } finally {
                lock.unlock();
            }
        }).start();
    }
}

@Slf4j
// 不要用于实践!!!
class LockCas {
    private AtomicInteger state = new AtomicInteger(0);

    /**
     * 加锁操作
     * 如果有多个线程同时进入lock
     * 当有一个线程执行了compareAndSet之后,其他线程就会发现expect的值不一样,就会不断重试
     */
    public void lock() {
        while (true) {
            if (state.compareAndSet(0, 1)) {
                break;
            }
        }
    }

    /**
     * 解锁操作只有成功加锁的线程可以执行,其他线程都还在lock中循环(重试)
     */
    public void unlock() {
        log.debug("unlock...");
        state.set(0);
    }
}

执行结果:

10:03:13.900 [Thread-0] DEBUG com.example.javatest.AtomicReferenceFieldUpdaterTest - begin...
10:03:13.900 [Thread-1] DEBUG com.example.javatest.AtomicReferenceFieldUpdaterTest - begin...
10:03:13.901 [Thread-0] DEBUG com.example.javatest.AtomicReferenceFieldUpdaterTest - lock...
10:03:14.908 [Thread-0] DEBUG com.example.javatest.LockCas - unlock...
10:03:14.908 [Thread-1] DEBUG com.example.javatest.AtomicReferenceFieldUpdaterTest - lock...
10:03:14.908 [Thread-1] DEBUG com.example.javatest.LockCas - unlock...

可见,刚开始时,两个线程同时开始,但是只有线程0获取到了锁,线程1在线程0获取到锁的操作期间就不断的在while(true)循环,当线程0花费1秒中执行结束,并且释放了锁之后,线程1才能够获取到锁进行操作。

7.2 原理之伪共享

其中 Cell 即为累加单元,查看他的源码如下:

// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
    final boolean cas(long prev, long next) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
    }
    // 省略不重要代码
}

Cell源码

可以看见上面有一个注解@sun.misc.Contended,这个注解的作用就是防止缓存行伪共享。

那么什么是缓存行?什么又是伪共享呢?

这一点得从缓存说起

CPU缓存结构

可见上图中CPU核心下面有很多缓存,读取数据的时候CPU是先读取缓存中的数据,并没有直接和内存进行交互,为什么要这样做呢?这就得说说缓存得内存的速度问题了:

缓存与内存的速度比较

从CPU到 大约需要的时钟周期
寄存器 1 cycle (4GHz 的 CPU 约为0.25ns)
L1 3~4 cycle
L2 10~20 cycle
L3 40~45 cycle
内存 120~240 cycle

可见CPU读取不同级别的缓存以及读取内存各个之间的速度差距还是很大的。

因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。

缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long);但是缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中,CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效;

模拟累加单元被多个CPU核心使用的问题

前面我们说了,不同的线程需要读取不同的Cell[i]单元,所以,假设线程1对应左边的CPU核心,线程2对应右边的CPU核心,现在线程1读取Cell[0],线程2读取Cell[1],但是前面又提到,CPU不会一开始就和内存交互,而是先和缓存交互,所以会将Cell加载到缓存行中,但是一个缓存行是否可以将上图中的两个Cell给放下呢

因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象。这样问题来了:

  • Core-0 要修改 Cell[0]
  • Core-1 要修改 Cell[1]

无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000,要累加Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效。

那么如何解决上面的问题呢?其实放在不同的缓存行就可以了,一行的失效不会影响到另外一行。

@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效。

伪共享其实就是一个缓存行加入了多个Cell。

本章后面还有点源码,暂时未记录笔记,后续会补上。


文章作者: 念心卓
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 念心卓 !
  目录