线程同步

在 Go 语言中,经常会遇到并发的问题,当然我们会优先考虑使用通道,同时 Go 语言也给出了传统的解决方式 Mutex(互斥锁)RWMutex(读写锁) 来处理竞争条件。

临界区

​ 首先我们要理解并发编程中临界区的概念。当程序并发地运行时,多个 Go 协程不应该同时访问那些修改共享资源的代码。这些修改共享资源的代码称为临界区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"fmt"
"sync"
)

type Bank struct {
balance int
}

func (b *Bank) Deposit(amount int) {
b.balance += amount
}

func (b *Bank) Balance() int {
return b.balance
}

func main() {
var wg sync.WaitGroup
b := &Bank{}

n := 1000
wg.Add(n)
for i := 1; i <= n; i++ {
go func() {
b.Deposit(1000)
wg.Done()
}()
}
wg.Wait()
fmt.Println(b.Balance()) //969000,990000,941000
}

​ 当然,对于只有一个协程的程序来说,上面的代码没有任何问题。但是,如果有多个协程并发运行时,就会发生错误,这种情况就称之为数据竞争(data race)。使用下面的互斥锁 Mutex 就能避免这种情况的发生。

互斥锁 Mutex

互斥锁(Mutex,mutual exclusion) 用于提供一种 加锁机制(Locking Mechanism) ,可确保在某时刻只有一个协程在临界区运行,以防止出现竞争。也是为了来保护一个资源不会因为并发操作而引起冲突导致数据不准确。

Mutex 有两个方法,分别是 Lock()Unlock() ,即对应的加锁和解锁。在 Lock()Unlock() 之间的代码,都只能由一个协程执行,就能避免竞争条件。

如果有一个协程已经持有了锁(Lock),当其他协程试图获得该锁时,这些协程会被阻塞,直到Mutex解除锁定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"sync"
)

type Bank struct {
balance int
m sync.Mutex
}

func (b *Bank) Deposit(amount int) {
b.m.Lock()
b.balance += amount
b.m.Unlock()
}

func (b *Bank) Balance() int {
return b.balance
}

func main() {
var wg sync.WaitGroup
b := &Bank{}

n := 1000
wg.Add(n)
for i := 1; i <= n; i++ {
go func() {
b.Deposit(1000)
wg.Done()
}()
}
wg.Wait()
fmt.Println(b.Balance())
}


// 输出1000000

关键代码:

1
2
3
4
5
6
7
8
9
10
type Bank struct {
balance int
m sync.Mutex
}

func (b *Bank) Deposit(amount int) {
b.m.Lock()
b.balance += amount
b.m.Unlock()
}

​ 注意同一协程里不要在尚未解锁时再次加锁,也不要对已经解锁的锁再次解锁。当然,使用通道也可以处理竞争条件。

读写锁 RWMutex

sync.RWMutex 类型实现读写互斥锁,适用于读多写少的场景,它规定了当有人还在读取数据(即读锁占用)时,不允许有人更新这个数据(即写锁会阻塞);为了保证程序的效率,多个人(协程)读取数据(拥有读锁)时,互不影响不会造成阻塞,它不会像 Mutex 那样只允许有一个人(协程)读取同一个数据。读锁与读锁兼容,读锁与写锁互斥,写锁与写锁互斥。

  • 可以同时申请多个读锁。
  • 有读锁时申请写锁将阻塞,有写锁时申请读锁将阻塞。
  • 只要有写锁,后续申请读锁和写锁都将阻塞。

定义一个 RWMuteux 读写锁:

1
var rwMutex sync.RWMutex

RWMutex 里提供了两种锁,每种锁分别对应两个方法,为了避免死锁,两个方法应成对出现,必要时请使用 defer

  • 读锁:调用 RLock 方法开启锁,调用 RUnlock 释放锁;
  • 写锁:调用 Lock 方法开启锁,调用 Unlock 释放锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"sync"
)

type Bank struct {
balance int
rwMutex sync.RWMutex
}

func (b *Bank) Deposit(amount int) {
b.rwMutex.Lock() // write lock
b.balance += amount
b.rwMutex.Unlock() // write unlock
}

func (b *Bank) Balance() (balance int) {
b.rwMutex.RLock() // read lock
balance = b.balance
b.rwMutex.RUnlock() // read unlock
return
}

func main() {
var wg sync.WaitGroup
b := &Bank{}

n := 1000
wg.Add(n)
for i := 1; i <= n; i++ {
go func() {
b.Deposit(1000)
wg.Done()
}()
}
wg.Wait()
fmt.Println(b.Balance())
}

条件变量 sync.Cond

用途

​ 与互斥量不同,条件变量的作用并不是保证在同一时刻仅有一个线程访问某一个共享数据,而是在对应的共享数据的状态发生变化时,通知其他因此而被阻塞的线程。条件变量总是与互斥量组合使用互斥量为共享数据的访问提供互斥支持,而条件变量可以就共享数据的状态的变化向相关线程发出通知。

声明

1
2
3
4
5
lock := new(sync.Mutex)
cond := sync.NewCond(lock)

// 或者
cond := sync.NewCond(new(sync.Mutex))

方法

​ Cond 实现了一个条件变量,在 Locker 的基础上增加的一个消息通知的功能,保存了一个通知列表,用来唤醒一个或所有因等待条件变量而阻塞的 Go 程,以此来实现多个 Go 程间的同步。

1
2
3
4
5
6
7
8
9
10
11
12
// 创建一个带锁的条件变量,Locker 通常是一个 *Mutex 或 *RWMutex
func NewCond(l Locker) *Cond

// 唤醒所有因等待条件变量 c 阻塞的 goroutine
func (c *Cond) Broadcast()

// 唤醒一个因等待条件变量 c 阻塞的 goroutine
func (c *Cond) Signal()

// 等待 c.L 解锁并挂起 goroutine,在稍后恢复执行后,Wait 返回前锁定 c.L,
// 只有当被 Broadcast 和 Signal 唤醒,Wait 才能返回。
func (c *Cond) Wait()

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

import (
"fmt"
"os"
"os/signal"
"sync"
"time"
)

func listen(name string, s []string, c *sync.Cond) {
c.L.Lock()
c.Wait()
fmt.Println(name, "姓名:", s)
c.L.Unlock()
}

func broadcast(event string, c *sync.Cond) {
time.Sleep(time.Second)
c.L.Lock()
fmt.Println(event)
c.Broadcast()
c.L.Unlock()
}

func main() {
s1 := []string{"张三"}
s2 := []string{"赵四"}
s3 := []string{"刘能"}

var m sync.RWMutex

// 创建一个带锁的条件变量, Locker通常是Mutex或者RWMutex
cond := sync.NewCond(&m)

/*
Broadcast 唤醒所有因等待条件变量c阻塞的goroutine
Signal 唤醒所有因等待条件变量c阻塞的goroutine
Wait 等待c.L解锁并且挂起goroutine, 在稍后恢复执行后, Wait返回锁定c.L, 只有当被Broadcast或者Signal唤醒, wait才返回
*/

// listener 1
go listen("室友1", s1, cond)
go listen("室友2", s2, cond)
go listen("室友3", s3, cond)
go broadcast("唤醒:", cond)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}

channel和sync区别

一般并发就是指多个程序逻辑上同时运行,但肯定会遇到一些非线程安全的东西需要做同步(无法并行,只能串行),这个时候就有sync和channel2种方式实现同步。只不过channel除了做同步,还能用于传递变量(即通过通信来共享变量),而sync只有同步作用(通过共享变量来通信)