Go语言类库-sync

  • Post author:
  • Post category:其他


通道并不是Go支持的唯一的一种并发同步技术。而且对于一些特定的情形,通道并不是最有效和可读性最高的同步技术。 本文下面将介绍sync标准库包中提供的各种并发同步技术。相对于通道,这些技术对于某些情形更加适用。

sync标准库包提供了一些用于实现并发同步的类型。这些类型适用于各种不同的内存顺序需求。 对于这些特定的需求,这些类型使用起来比通道效率更高,代码实现更简洁。



基本原语

在这里插入图片描述



WaitGroup

每个sync.WaitGroup值在内部维护着一个计数,此计数的初始默认值为零。

type WaitGroup struct {
	noCopy noCopy
	state1 uint64
	state2 uint32
}
  • noCopy : 保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝;
  • state1/state2 : 存储着状态和信号量;

*sync.WaitGroup类型有

三个方法

:Add(delta int)、Done()和Wait()。

对于一个可寻址的sync.WaitGroup值wg,

  • 我们可以使用方法调用wg.Add(delta)来改变值wg维护的计数。
  • 方法调用wg.Done()和wg.Add(-1)是完全等价的。
  • 如果一个wg.Add(delta)或者wg.Done()调用将wg维护的计数更改成一个负数,一个恐慌将产生。
  • 当一个协程调用了wg.Wait()时,

    • 如果此时wg维护的计数为零,则此wg.Wait()此操作为一个空操作(no-op);
    • 否则(计数为一个正整数),此协程将进入阻塞状态。 当以后其它某个协程将此计数更改至0时(一般通过调用wg.Done()),此协程将重新进入运行状态(即wg.Wait()将返回)。



等待多个协程返回

一般,一个sync.WaitGroup值用来让某个协程等待其它若干协程都先完成它们各自的任务。

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	const N = 5
	var values [N]int32

	var wg sync.WaitGroup
	wg.Add(N)
	for i := 0; i < N; i++ {
		i := i
		go func() {
			values[i] = 50 + rand.Int31n(50)
			fmt.Println("Done:", i)
			wg.Done() // <=> wg.Add(-1)
		}()
	}

	wg.Wait()
	// 所有的元素都保证被初始化了。
	fmt.Println("values:", values)
}

在此例中,主协程等待着直到其它5个协程已经将各自负责的元素初始化完毕此会打印出各个元素值。

我们可以将上例中的Add方法调用拆分成多次调用:

...
	var wg sync.WaitGroup
	for i := 0; i < N; i++ {
		wg.Add(1) // 将被执行5次
		i := i
		go func() {
			values[i] = 50 + rand.Int31n(50)
			wg.Done()
		}()
	}
...



多个接收其他某个协程的完成通知

一个*sync.WaitGroup值的Wait方法可以在多个协程中调用。 当对应的sync.WaitGroup值维护的计数降为0,这些协程都将得到一个(广播)通知而结束阻塞状态。

func main() {
	rand.Seed(time.Now().UnixNano())

	const N = 5
	var values [N]int32

	var wgA, wgB sync.WaitGroup
	wgA.Add(N)
	wgB.Add(1)

	for i := 0; i < N; i++ {
		i := i
		go func() {
			wgB.Wait() // 等待广播通知
			log.Printf("values[%v]=%v \n", i, values[i])
			wgA.Done()
		}()
	}

	// 下面这个循环保证将在上面的任何一个
	// wg.Wait调用结束之前执行。
	for i := 0; i < N; i++ {
		values[i] = 50 + rand.Int31n(50)
	}
	wgB.Done() // 发出一个广播通知
	wgA.Wait()
}

一个WaitGroup在它的一个Wait方法返回之后可以被重用。 但是请注意,当一个WaitGroup值维护的基数为零时,它的带有正整数实参的Add方法调用不能和它的Wait方法调用并发运行,否则将可能出现数据竞争。

通过对 sync.WaitGroup 的分析和研究,我们能够得出以下结论:

  • sync.WaitGroup 必须在 sync.WaitGroup.Wait 方法返回之后才能被重新使用;
  • sync.WaitGroup.Done 只是对 sync.WaitGroup.Add 方法的简单封装,我们可以向 sync.WaitGroup.Add 方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒等待的 Goroutine;
  • 可以同时有多个 Goroutine 等待当前 sync.WaitGroup 计数器的归零,这些 Goroutine 会被同时唤醒;



Once

type Once struct {
	done uint32
	m    Mutex
}

该结构体对外提供一个方法:func (o *Once) Do(f func()) {},用来保障传入的方法在多个协程中只被执行一次。

package main

import (
	"log"
	"sync"
)

func main() {
	log.SetFlags(0)

	x := 0
	doSomething := func() {
		x++
		log.Println("Hello")
	}

	var wg sync.WaitGroup
	var once sync.Once
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			once.Do(doSomething)
			log.Println("world!")
		}()
	}

	wg.Wait()
	log.Println("x =", x) // x = 1
}

在此例中,Hello将仅被输出一次,而world!将被输出5次,并且Hello肯定在所有的5个world!之前输出。



小结

作为用于保证函数执行次数的

sync.Once

结构体,它使用互斥锁和

sync/atomic

包提供的方法实现了某个函数在程序运行期间只能执行一次的语义。在使用该结构体时,我们也需要注意以下的问题:


  • sync.Once.Do

    方法中传入的函数只会被执行一次,哪怕函数中发生了 panic;
  • 两次调用

    sync.Once.Do

    方法传入不同的函数只会执行第一次调传入的函数;



Mutext和RWMutext


sync.Mutex和

sync.RWMutex类型都实现了sync.Locker接口类型。 所以这两个类型都有两个方法:Lock()和Unlock(),用来保护一份数据不会被多个使用者同时读取和修改。

除了Lock()和Unlock()这两个方法,

sync.RWMutex类型还有两个另外的方法:RLock()和RUnlock(),用来支持多个读取者并发读取一份数据但防止此份数据被某个数据写入者和其它数据访问者(包括读取者和写入者)同时使用。

(注意:这里的

*

数据读取者





数据写入者

**不应该从字面上理解。有时候某些数据读取者可能修改数据,而有些数据写入者可能只读取数据。)



互质锁

一个Mutex值常称为一个互斥锁。 一个Mutex零值为一个尚未加锁的互斥锁。 一个(可寻址的)Mutex值m只有在未加锁状态时才能通过m.Lock()方法调用被成功加锁。 换句话说,一旦m值被加了锁(亦即某个m.Lock()方法调用成功返回), 一个新的加锁试图将导致当前协程进入阻塞状态,直到此Mutex值被解锁为止(通过m.Unlock()方法调用)。

package main

import (
	"fmt"
	"runtime"
	"sync"
)

type Counter struct {
	m sync.Mutex
	n uint64
}

func (c *Counter) Value() uint64 {
	c.m.Lock()
	defer c.m.Unlock()
	return c.n
}

func (c *Counter) Increase(delta uint64) {
	c.m.Lock()
	c.n += delta
	c.m.Unlock()
}

func main() {
	var c Counter
	for i := 0; i < 100; i++ {
		go func() {
			for k := 0; k < 100; k++ {
				c.Increase(1)
			}
		}()
	}

	// 此循环仅为演示目的。
	for c.Value() < 10000 {
		runtime.Gosched()
	}
	fmt.Println(c.Value()) // 10000
}

一个Counter值使用了一个Mutex字段来确保它的字段n永远不会被多个协程同时使用。



读写锁

一个RWMutex值常称为一个读写互斥锁,它的内部包含两个锁:一个写锁和一个读锁。 对于一个可寻址的RWMutex值rwm,数据写入者可以通过方法调用rwm.Lock()对rwm加写锁,或者通过rwm.RLock()方法调用对rwm加读锁。 方法调用rwm.Unlock()和rwm.RUnlock()用来解开rwm的写锁和读锁。 rwm的读锁维护着一个计数。当rwm.RLock()调用成功时,此计数增1;当rwm.Unlock()调用成功时,此计数减1; 一个零计数表示rwm的读锁处于未加锁状态;反之,一个非零计数(肯定大于零)表示rwm的读锁处于加锁状态。

对于一个可寻址的RWMutex值rwm,下列规则存在:

  • rwm的写锁只有在它的写锁和读锁都处于未加锁状态时才能被成功加锁。 换句话说,rwm的写锁在任何时刻最多只能被一个数据写入者成功加锁,并且rwm的写锁和读锁不能同时处于加锁状态。
  • 当rwm的写锁正处于加锁状态的时候,任何新的对之加写锁或者加读锁的操作试图都将导致当前协程进入阻塞状态,直到此写锁被解锁,这样的操作试图才有机会成功。
  • 当rwm的读锁正处于加锁状态的时候,新的加写锁的操作试图将导致当前协程进入阻塞状态。 但是,一个新的加读锁的操作试图将成功,只要此操作试图发生在任何被阻塞的加写锁的操作试图之前(见下一条规则)。 换句话说,一个读写互斥锁的读锁可以同时被多个数据读取者同时加锁而持有。 当rwm的读锁维护的计数清零时,读锁将返回未加锁状态。
  • 假设rwm的读锁正处于加锁状态的时候,为了防止后续数据写入者没有机会成功加写锁,后续发生在某个被阻塞的加写锁操作试图之后的所有加读锁的试图都将被阻塞。
  • 假设rwm的写锁正处于加锁状态的时候,(至少对于标准编译器来说,)为了防止后续数据读取者没有机会成功加读锁,发生在此写锁下一次被解锁之前的所有加读锁的试图都将在此写锁下一次被解锁之后肯定取得成功,即使所有这些加读锁的试图发生在一些仍被阻塞的加写锁的试图之后。

后两条规则是为了确保数据读取者和写入者都有机会执行它们的操作。

package main

import (
	"fmt"
	"time"
	"sync"
)

func main() {
	var m sync.RWMutex
	go func() {
		m.RLock()
		fmt.Print("a")
		time.Sleep(time.Second)
		m.RUnlock()
	}()
	go func() {
		time.Sleep(time.Second * 1 / 4)
		m.Lock()
		fmt.Print("b")
		time.Sleep(time.Second)
		m.Unlock()
	}()
	go func() {
		time.Sleep(time.Second * 2 / 4)
		m.Lock()
		fmt.Print("c")
		m.Unlock()
	}()
	go func () {
		time.Sleep(time.Second * 3 / 4)
		m.RLock()
		fmt.Print("d")
		m.RUnlock()
	}()
	time.Sleep(time.Second * 3)
	fmt.Println()
}
//输出abdc

请注意:一个锁并不会绑定到一个协程上,即一个锁并不记录哪个协程成功地加锁了它。 换句话说,一个锁的加锁者和此锁的解锁者可以不是同一个协程,尽管在实践中这种情况并不多见。



读写锁提高读性能

在上一个例子中,如果Value方法被十分频繁调用而Increase方法并不频繁被调用,则Counter类型的m字段的类型可以更改为sync.RWMutex,从而使得执行效率更高,如下面的代码所示。

type Counter struct {
	//m sync.Mutex
	m sync.RWMutex
	n uint64
}

func (c *Counter) Value() uint64 {
	//c.m.Lock()
	//defer c.m.Unlock()
	c.m.RLock()
	defer c.m.RUnlock()
	return c.n
}



Mutex和RWMutex值实现通知

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var m sync.Mutex
	m.Lock()
	go func() {
		time.Sleep(time.Second)
		fmt.Println("Hi")
		m.Unlock() // 发出一个通知
	}()
	m.Lock() // 等待通知
	fmt.Println("Bye")
}

在此例中,Hi将确保在Bye之前打印出来。



Cond

Go 语言标准库中还包含条件变量 sync.Cond,它可以让一组的 Goroutine 都在满足特定条件时被唤醒。每一个 sync.Cond 结构体在初始化时都需要传入一个互斥锁。

type Cond struct {
	noCopy  noCopy
	L       Locker
	notify  notifyList
	checker copyChecker
}
type notifyList struct {
	wait uint32
	notify uint32

	lock mutex
	head *sudog
	tail *sudog
}
  • noCopy : 用于保证结构体不会在编译期间拷贝;
  • copyChecker : 用于禁止运行期间发生的拷贝;
  • L : 用于保护内部的 notify 字段,Locker 接口类型的变量;
  • notify : 一个 Goroutine 的链表,它是实现同步机制的核心结构;

在 sync.notifyList 结构体中,head 和 tail 分别指向的链表的头和尾,wait 和 notify 分别表示当前正在等待的和已经通知到的 Goroutine 的索引。

sync.Cond 对外暴露的 sync.Cond.Wait 方法会将当前 Goroutine 陷入休眠状态,它的执行过程分成以下两个步骤:

  1. 调用 runtime.notifyListAdd 将等待计数器加一并解锁;
  2. 调用 runtime.notifyListWait 等待其他 Goroutine 的唤醒并加锁;
  3. runtime.notifyListWait 会获取当前 Goroutine 并将它追加到 Goroutine 通知链表的最末端;

除了将当前 Goroutine 追加到链表的末端之外,我们还会调用 runtime.goparkunlock 将当前 Goroutine 陷入休眠,该函数也是在 Go 语言切换 Goroutine 时经常会使用的方法,它会直接让出当前处理器的使用权并等待调度器的唤醒。

sync.Cond.Signal 和 sync.Cond.Broadcast 就是用来唤醒陷入休眠的 Goroutine 的方法,它们的实现有一些细微的差别:

  • sync.Cond.Signal 方法会唤醒队列最前面的 Goroutine;
  • sync.Cond.Broadcast 方法会唤醒队列中全部的 Goroutine;
func (c *Cond) Signal() {
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

runtime.notifyListNotifyOne 只会从 sync.notifyList 链表中找到满足 sudog.ticket == l.notify 条件的 Goroutine 并通过 runtime.readyWithTime 唤醒,runtime.notifyListNotifyAll 会依次通过 runtime.readyWithTime 唤醒链表中 Goroutine。Goroutine 的唤醒顺序也是按照加入队列的先后顺序,先加入的会先被唤醒,而后加入的可能 Goroutine 需要等待调度器的调度。

在一般情况下,我们都会先调用 sync.Cond.Wait 陷入休眠等待满足期望条件,当满足唤醒条件时,就可以选择使用 sync.Cond.Signal 或者 sync.Cond.Broadcast 唤醒一个或者全部的 Goroutine。

我们可以通过下面的例子了解它的使用方法:

var status int64

func main() {
	c := sync.NewCond(&sync.Mutex{})
	for i := 0; i < 10; i++ {
		go listen(c)
	}
	time.Sleep(1 * time.Second)
	go broadcast(c)

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	<-ch
}

func broadcast(c *sync.Cond) {
	c.L.Lock()
	atomic.StoreInt64(&status, 1)
	c.Broadcast()
	c.L.Unlock()
}

func listen(c *sync.Cond) {
	c.L.Lock()
	for atomic.LoadInt64(&status) != 1 {
		c.Wait()
	}
	fmt.Println("listen")
	c.L.Unlock()
}

$ go run main.go
listen
...
listen

上述代码同时运行了 11 个 Goroutine,这 11 个 Goroutine 分别做了不同事情:

  • 10 个 Goroutine 通过 sync.Cond.Wait 等待特定条件的满足;
  • 1 个 Goroutine 会调用 sync.Cond.Broadcast 唤醒所有陷入等待的 Goroutine;

在这里插入图片描述



小结

sync.Cond 不是一个常用的同步机制,但是在条件长时间无法满足时,与使用 for {} 进行忙碌等待相比,sync.Cond 能够让出处理器的使用权,提高 CPU 的利用率。使用时我们也需要注意以下问题:

  • sync.Cond.Wait 在调用之前一定要使用获取互斥锁,否则会触发程序崩溃;
  • sync.Cond.Signal 唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;
  • sync.Cond.Broadcast 会按照一定顺序广播通知等待的全部 Goroutine;



Map

Go中原生的map不是线程安全的,但是Go提供了一个sync.Map的结构体支持线程安全。

type Map struct {
    mu Mutext
    read atomic.Value
    dirty map[interface{}]*entity
    misses int
}

字段read是atomic.Value类型,可以并发读,但是如果需要更新read,则需要加锁保护。

字段dirty是非线程安全的原生map,包含新写入的key,并且包含read中的所有未被删除的key。

对外提供以下方法:

Store:更新或者添加某对key-value

Load:查找map中的key

Delete:删除key

LoadOrStore:如果map中存在这个key就返回这个key的value;否则将k-v存入map

Range:参数是一个func(key,value interface{})bool函数,由使用者提供实现,Range将遍历调用时刻的map中所有k-v,将他们传入这个方法,如果返回false,则停止遍历。



Pool

sync.Pool可以作为保存临时取还对象的一个池子。大量重复创建很多对象可能引起GC压力大,可以使用Pool存储对象。对于很多需要重复分配、回收内存的地方sync.Pool是一个很好的选择。sync.Pool可以将暂时不用的对象缓存起来,待下次需要的时候直接使用,不用再次经过内存分配。



使用方法

sync.Pool是协程安全的。使用前设置好创建对象的New函数,之后可以通过Get()和Put()方法可以取、还对象。

type Person struct {
    Name string
}
func initPool(){
    pool := &sync.Pool{
        fmt.Println("new一个对象")
        return new(Person)
    }
}
func main(){
    intPool()
    p := pool.Get().(*Person)
    fmt.Println(p)
    p.Name = "keke"
    pool.Put(p)
}



版权声明:本文为jeremy_ke原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。