0%

并发基础及锁

[TOC]

并发

当并发的访问某个类或者方法时,主调代码中不需要额外的同步或协同,类或者方法都能表现出正确的行为,这就是线程安全的。并发安全必须保证以下三要素

并发编程的三要素(并发安全性)

  • 原子性: 一系列操作要么全部执行要么失败(一些cpu指令全部执行成功,或全部失败。最好就是只有一条cpu指令)或者也叫互斥性,互斥性有两种实现方式,锁或原子操作
  • 有序性 程序按照代码的顺序先后执行 避免指令重排列。
  • 可见性 当多线程访问同一个变量 其中一个线程修改了 其他线程也要读新的值(这其实)。

锁的分类

互斥锁与自旋锁

最底层的两种就是会「互斥锁和自旋锁」,有很多高级的锁都是基于它们实现的,你可以认为它们是各种锁的地基,所以我们必须清楚它俩之间的区别和应用。

  • 互斥锁加锁失败后,线程会释放 CPU ,给其他线程;
  • 自旋锁加锁失败后,线程会忙等待,直到它拿到锁;

互斥锁

互斥锁是一种「独占锁」,比如当线程 A 加锁成功后,此时互斥锁已经被线程 A 独占了,只要线程 A 没有释放手中的锁,线程 B 加锁就会失败,于是就会释放 CPU 让给其他线程,既然线程 B 释放掉了 CPU,自然线程 B 加锁的代码就会被阻塞

上下文切换

互斥锁加锁失败时,会从用户态陷入到内核态,让内核帮我们切换线程,虽然简化了使用锁的难度,但是存在一定的性能开销成本。

那这个开销成本是什么呢?会有两次线程上下文切换的成本

  • 当线程加锁失败时,内核会把线程的状态从「运行」状态设置为「睡眠」状态,然后把 CPU 切换给其他线程运行;
  • 接着,当锁被释放时,之前「睡眠」状态的线程会变为「就绪」状态,然后内核会在合适的时间,把 CPU 切换给该线程运行。

线程的上下文切换的是什么?当两个线程是属于同一个进程,因为虚拟内存是共享的,所以在切换时,虚拟内存这些资源就保持不动,只需要切换线程的私有数据、寄存器等不共享的数据。

自旋锁

自旋锁是通过 CPU 提供的 CAS 函数(Compare And Swap)来实现的,在「用户态」完成加锁和解锁操作,不会主动产生线程上下文切换,所以相比互斥锁来说,会快一些,开销也小一些。

一般加锁的过程,包含两个步骤:

  • 第一步,查看锁的状态,如果锁是空闲的,则执行第二步;
  • 第二步,将锁设置为当前线程持有;

CAS 函数就把这两个步骤合并成一条硬件级指令,形成原子指令,这样就保证了这两个步骤是不可分割的,要么一次性执行完两个步骤,要么两个步骤都不执行。

使用自旋锁的时候,当发生多线程竞争锁的情况,加锁失败的线程会「忙等待」,直到它拿到锁。这里的「忙等待」可以用 while 循环等待实现,不过最好是使用 CPU 提供的 PAUSE 指令来实现「忙等待」,因为可以减少循环等待时的耗电量。

自旋锁是最比较简单的一种锁,一直自旋,利用 CPU 周期,直到锁可用。需要注意,在单核 CPU 上,需要抢占式的调度器(即不断通过时钟中断一个线程,运行其他线程)。否则,自旋锁在单 CPU 上无法使用,因为一个自旋的线程永远不会放弃 CPU。

自旋锁开销少,在多核系统下一般不会主动产生线程切换,适合异步、协程等在用户态切换请求的编程方式,但如果被锁住的代码执行时间过长,自旋的线程会长时间占用 CPU 资源,所以自旋的时间和被锁住的代码执行的时间是成「正比」的关系,我们需要清楚的知道这一点。

自旋锁与互斥锁使用层面比较相似,但实现层面上完全不同:当加锁失败时,互斥锁用「线程切换」来应对,自旋锁则用「忙等待」来应对

它俩是锁的最基本处理方式,更高级的锁都会选择其中一个来实现,比如读写锁既可以选择互斥锁实现,也可以基于自旋锁实现。

读写锁

读写锁的工作原理是:

  • 当「写锁」没有被线程持有时,多个线程能够并发地持有读锁,这大大提高了共享资源的访问效率,因为「读锁」是用于读取共享资源的场景,所以多个线程同时持有读锁也不会破坏共享资源的数据。
  • 但是,一旦「写锁」被线程持有后,读线程的获取读锁的操作会被阻塞,而且其他写线程的获取写锁的操作也会被阻塞。

乐观锁与悲观锁

  • 悲观锁做事比较悲观,它认为多线程同时修改共享资源的概率比较高,于是很容易出现冲突,所以访问共享资源前,先要上锁
  • 乐观锁做事比较乐观,它假定冲突的概率很低,它的工作方式是:先修改完共享资源,再验证这段时间内有没有发生冲突,如果没有其他线程在修改资源,那么操作完成,如果发现有其他线程已经修改过这个资源,就放弃本次操作

乐观锁

乐观锁的实现:

  • CAS 实现:Java 中java.util.concurrent.atomic包下面的原子变量使用了乐观锁的一种 CAS 实现方式。
  • 版本号控制:一般是在数据表中加上一个数据版本号 version 字段,表示数据被修改的次数。当数据被修改时,version 值会+1。当线程A要更新数据值时,在读取数据的同时也会读取 version 值,在提交更新时,若刚才读取到的 version 值与当前数据库中的 version 值相等时才更新,否则重试更新操作,直到更新成功。

MySQL 乐观锁 (CAS)

扣减库存问题,通过乐观锁可以实现如下:

1
2
3
4
// 查询商品库存信息, quantity查出来为3
select quantity from items where id = 1;
// 修改商品库存为 2
update items set quantity = 2 where id = 1 and quantity = 3;

在更新之前,先查询一下库存表中当前库存数(quantity),然后在做 update 的时候,以库存数作为一个修改条件。当提交更新的时候,判断数据库表对应记录的当前库存数与第一次取出来的库存数进行比对,如果数据库表当前库存数与第一次取出来的库存数相等,则予以更新,否则认为是过期数据。

基于版本号解决 ABA 问题

通过一个单独的可以顺序递增的 version 字段,解决 ABA 问题。

乐观锁每次在执行数据修改操作时,都会带上一个版本号,一旦版本号和数据的版本号一致就可以执行修改操作并对版本号执行 +1 操作,否则就执行失败。因为每次操作的版本号都会随之增加,所以不会出现 ABA 问题。除了 version 以外,还可以使用时间戳,因为时间戳天然具有顺序递增性。

锁粒度

以上 SQL 其实还是有一定的问题的,就是一旦遇上高并发的时候,就只有一个线程可以修改成功,那么就会存在大量的失败。对于像淘宝这样的电商网站,高并发是常有的事,总让用户感知到失败显然是不合理的。所以,还是要想办法减少乐观锁的粒度。一个比较好的建议,就是减小乐观锁力度,最大程度的提升吞吐率,提高并发能力!

1
2
// 修改商品库存为 2
update items set quantity = 2 where id = 1 and quantity - 1 > 0;

以上 SQL 语句中,如果用户下单数为 1,则通过quantity - 1 > 0的方式进行乐观锁控制。在执行过程中,会在一次原子操作中查询一遍 quantity 的值,并将其扣减掉 1。

Golang 实现

互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"sync"
"time"
)

var gTotal int32 = 10000
var mutex sync.Mutex

// 加锁
func sellerSafe(id int) {
time.Sleep(1 * time.Second)
mutex.Lock()
defer mutex.Unlock()
gTotal--

}

// 这个函数之所以线程不安全,是因为没有支持 可见性,即在一个协程中的修改在另一个协程中不可见(不可见意味了另一个协程还在使用更新前的数据)
func sellerUnSafe(id int) {
gTotal--
}

func main() {
wg := sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
// 正确 9000
//sellerSafe(i)
// 不正确 != 9000
sellerUnSafe(i)
wg.Done()
}()
}
wg.Wait()
fmt.Println("gTotal: ", gTotal)
}

cas实现的乐观锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package main

import (
"fmt"
"sync"
"sync/atomic"
"time"
)

var valueCas int32 = 0

func main() {
fmt.Println("======old valueCas=======")
fmt.Println(valueCas)
fmt.Println("======CAS valueCas=======")
wg := sync.WaitGroup{}
for i := 0; i < 100000; i++ {
wg.Add(1)
go func(tmp int) {
time.Sleep(1 * time.Second)
addValue(1)
//addValueBad(int32(tmp))
//addValueBadSwap(1)
wg.Done()
}(i)
}
wg.Wait()
fmt.Println(valueCas)

}

/*
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
1. 调用函数后,会先判断参数addr指向的被操作值与参数old的值是否相等
(这是 cas 操作的精髓所在, 这能保证 只有在old值没有被修改时,才会做交换操作。所以将 old 从地址中取出来,并使用old参与其他的,这都是多线程安全的)。
2. 只有判断到是相等时(说明其他线程没有修改old),才会用参数new代表的新值替换掉原先的旧值,否则操作就会被忽略。
3. 因此, 需要用for循环不断进行尝试,直到成功为止。

cas: 以下几个操作,一般情况下是对应多条指令, 但是在 cas 操作中可以理解为为了一条cpu指令,包括从 addr中取址, 和old比较,赋值等
{
if *addr == old {
*addr = new
return true
}
return false
}


swap:
{
old = *addr
*addr = new
return old
}

*/

// cas
func addValue(delta int32) {
for {
// 这里直接读数据就可以了,因为 cas 会将 v 和 地址中的值做比较
old := valueCas
new_ := old + delta
// 1. 如果 v 和 valueCas 地址中的值不一致,不会 swap
// 2. v 和 valueCas 地址中的值不一致, swap
if atomic.CompareAndSwapInt32(&valueCas, old, new_) {
break
}
// hits 这里触发了, 说明发生了 race
//fmt.Println("hits")
}
}

// 这个数据是错误的
func addValueBad(delta int32) {
valueCas += delta
}

// swap 也是不能保证的, 因为 v + delta 不是原子的
func addValueBadSwap(delta int32) {
v := atomic.LoadInt32(&valueCas)
atomic.SwapInt32(&valueCas, v+delta)
}

CAS

CAS 的问题

ABA问题

因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。

开销大

循环时间长开销大。自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。

只能保证一个共享变量的原子操作

当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。

CAS 总结

可以用CAS在无锁的情况下实现原子操作,但要明确应用场合,非常简单的操作且又不想引入锁可以考虑使用CAS操作,当想要非阻塞地完成某一操作也可以考虑CAS。不推荐在复杂操作中引入CAS,会使程序可读性变差,且难以测试,同时会出现ABA问题。

并发和并行

  • 并行性是指两个或多个事件在同一时刻发生
  • 并发性是指两个或多个事件在同一时间间隔内发生
    • 并发性是指在一段时间内宏观上有多个程序在同时运行,但在单处理机系统中每一时刻却仅能有一道程序执行,故在一个处理器上,微观上这些程序只能是分时地交替执行。倘若在计算机系统中有多个处理机,则这些可以并发执行的程序便可被分配到多个处理机上,实现并行执行,即利用每个处理机来处理一个可并发执行的程序,这样,多个程序便可同时执行

简单说,并发编程指的是赋予程序可以并行运行的能力。并行如上所说,指的是同时执行。

参考

面试官:你说说互斥锁、自旋锁、读写锁、悲观锁、乐观锁的应用场景

并发编程—CAS(Compare And Swap)

什么是乐观锁,什么是悲观锁