什么是CSP
- CSP经常被认为是Go在并发编程上成功的关键因素.CSP全称是“Communicating Sequential Processes”(通讯顺序进程)、
- CSP 也是一门自定义的编程语言,作者定义了输入输出语句,用于 processes 间的通信(communicatiton)。processes 被认为是需要输入驱动,并且产生输出,供其他 processes 消费,processes 可以是进程、线程、甚至是代码块。
- Go 是第一个将 CSP 的这些思想引入,并且发扬光大的语言。Go 一开始就把 CSP 的思想融入到语言的核心里,所以并发编程成为 Go 的一个独特的优势,而且很容易理解。
- 大多编语言的并发编程模型是基于线程和内存同步访问控制,Go的并发编程的模型规则用goroutine和channel来代替。Goroutine和线程类似,channel和mutex(用于内存访问控制)类似。
- Goruntine解放了程序员,让我们更能贴近业务去思考问题。而不用考虑各种线程库,线程开销,线程调度等等这些繁琐的底层问题,goroutine天生替你解决了
- Channel 则天生就可以和其他channel组合。我们可以收集各种子系统结果的channel输入到同一个channel。channel还可以和selest,cance,timeout 结合起来。而mutex就没有这些功能。l
什么是channel
- Goroutime 和 channel 是Go语言并发编程的两大基石。Goroutine用于执行并发任务,channel用于goroutine之间的同步通信。
- Channel 在 gouroutine 间架起了一条管道,在管道里传输数据,实现 gouroutine 间的通信;由于它是线程安全的,所以用起来非常方便;channel 还提供“先进先出”的特性;它还能影响 goroutine 的阻塞和唤醒。
相信大家一定见过一句话:
Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,而要通过通信来实现共享内存
*理解:
前半句说的是sync包里的一些组件进行编发编程;后半句则是说Go推荐使用channel进行并发编程。两者其实都是必要且有效的。实际上看完channel源码分析,你就会发现,channel的底层就是通过mutex来并发的。只是channel是更高层次的并发编程原语,封装了更多的功能。
channel实现CSP
- channel是Go语言中一个非常重要的类型,是Go里的第一对象。通过channel,Go实现了通过通信来实现内存共享,Channel是在多个goroutine之间传递数据和同步的重要手段。
- channel字面意义是“通道”,类似于Linux中的管道。声明channel的语法如下:
chan T //声明一个双向通道
chan<- T //声明一个只能用于发送的通道
<-chan T //声明一个只能用于接收的通道
单向通道的声明,用
<-
来表示,它指明通道的方向。你只要明白,代码的书写顺序是从左到右就能马上掌握通道的方向是怎样的。
- 因为channel是一个引用类型,所以它被初始化之前,它的值是nil,channel使用make函数进行初始化。可以向它传递一个int值,代表channel缓冲区的大小(容量),构造出来的是一个缓冲型的channel;不传或传0,构造的就是一个非缓冲型的channel。
- 两者有一些差别:非缓冲型 channel 无法缓冲元素,对它的操作一定顺序是“发送-> 接收 -> 发送 -> 接收 -> ……”,如果连续向一个非缓冲 chan 发送 2 个元素,并且没有接收的话,第二次一定会被阻塞;对于缓冲型 channel 的操作,则要“宽松”一些,毕竟是带了“缓冲”光环
为什么要 channel
- Go通过channel实现CSP通信模型,主要用于goroutine之间消息传递和事件通知。
- 有了channel和goroutine之后,Go的并发编程变的异常容易和安全,得以让程序员把注意力留到业务上去,实现并发效率的提升。
channel的实现原理
- 对chan的发送和接收操作都会在编译期间换成为底层的发送接收函数
- Channel分为两种:带缓冲,不带缓冲。对不带缓冲的channel进行的操作实际上可以看作“同步模式”,带缓冲的则为“异步模式”
- 同步模式下,发送方和接收方要同步就绪,只有在两者都 ready(准备好) 的情况下,数据才能在两者间传输(后面会看到,实际上就是内存拷贝)。否则,任意一方先行进行发送或接收操作,都会被挂起,等待另一方的出现才能被唤醒。
- 异步模式下,在缓冲槽可用的情况下(有剩余容量),发送和接收操作都可以顺利进行。否则,操作的一方(如写入)同样会被挂起,直到出现相反操作(如接收)才会被唤醒。
小结:同步模式下,必须要使发送方和接收方配对,操作才会成功,否则会被阻塞;异步模式下,缓冲槽要有剩余容量,操作才会成功,否则也会被阻塞。
数据结构
type hchan struct {
// chan 里元素数量
qcount uint
// chan 底层循环数组的长度
dataqsiz uint
// 指向底层循环数组的指针
// 只针对有缓冲的 channel
buf unsafe.Pointer
// chan 中元素大小
elemsize uint16
// chan 是否被关闭的标志
closed uint32
// chan 中元素类型
elemtype *_type // element type
// 已发送元素在循环数组中的索引
sendx uint // send index
// 已接收元素在循环数组中的索引
recvx uint
// receive index
// 等待接收的 goroutine 队列
recvq waitq // list of recv waiters
// 等待发送的 goroutine 队列
sendq waitq // list of send waiters
// 保护 hchan 中所有字段
lock mutex
}
关于字段的含义都写在注释里了,再来重点说几个字段:
- buf 指向底层循环数组,只有缓冲型的channel才有。
- sendx,recvx均指向底层循环数组,表示当前可发送和接收的元素位置索引值(相对于底层数组)。
- sendq,recvq分别表示被阻塞的goroutine,这些goroutine由于尝试读取channel或向channel发送数据而被阻塞
- waitq是sudog的一个双向链表,而sudog实际上是对goroutine的一个封装:
type waitq struct{
first *sudog
last *sudog
}
-
lock 用来保证每个读channel或写channel都是原子的。
列如,创建一个容量为6的,元素为int型的channel数据结构如下
创建
一般而言,使用 make 创建一个能收能发的通道:
// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)
创建 chan 的函数是 makechan:
func makechan(t *chantype, size int64) *hchan
具体来看下代码:
新建一个 chan 后,内存在堆上分配,大概长这样:
func goroutineA(a <-chan int) {
val :=<-a
fmt.Println("G1 received data",val)
return
}
func goroutineB(b <-chan int) {
val :=<-b
fmt.Println("G2 received data",val)
return
}
func main() {
ch :=make(chan int)
go goroutineA(ch)
go goroutineB(ch)
ch <- 3
time.Sleep(time.Second)
}
首先创建了一个无缓冲的 channel,接着启动两个 goroutine,并将前面创建的 channel 传递进去。然后,向这个 channel 中发送数据 3,最后 sleep 1 秒后程序退出。
程序第 14 行创建了一个非缓冲型的 channel,我们只看 chan 结构体中的一些重要字段,来从整体层面看一下 chan 的状态,一开始什么都没有:
接收
- 接收操作有两种写法,一种带 “ok”,反应 channel 是否关闭;
- 一种不带 “ok”,这种写法,当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值。
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
chanrecv1 函数处理不带 “ok” 的情形, chanrecv2 则通过返回 “received” 这个字段来反应 channel 是否被关闭。接收值则比较特殊,“放到”参数 elem 所指向的地址了,这很像 C/C++ 里的写法。如果代码里忽略了接收值,这里的 elem 为 nil。
无论如何,最终转向了 chanrecv 函数:
// 位于 src/runtime/chan.go
// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// 如果是一个 nil 的 channel
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
上面的代码注释地比较详细了,你可以对着源码一行行地去看,我们再来详细看一下。
- 如果channel是一个空值(nil),在非阻塞模式下,会直接返回,在阻塞模式下,会调用gopark函数挂起goroutine,这个会一直阻塞下去。因为channel是nil的情况下,要想不阻塞,只有关闭它,但关闭一个 nil 的 channel 又会发生 panic,所以没有机会被唤醒了。更详细地可以在 closechan 函数的时候再看。
- 和发送函数一样,接下来搞了一个在非阻塞模式下,不用获取锁,快速检测到失败并且返回的操作。
以上省略了好大一部分的讲解~~
因为看不懂,后续再来补充~
原文地址
我们继续之前的例子。前面说到第 14 行,创建了一个非缓冲型的 channel,接着,第 15、16 行分别创建了一个 goroutine,各自执行了一个接收操作。通过前面的源码分析,我们知道,这两个 goroutine (后面称为 G1 和 G2 好了)都会被阻塞在接收操作。G1 和 G2 会挂在 channel 的 recq 队列中,形成一个双向循环链表。
在程序的 17 行之前,chan 的整体数据结构如下:
buf 指向一个长度为 0 的数组,qcount 为 0,表示 channel 中没有元素。重点关注 recvq 和 sendq,它们是 waitq 结构体,而 waitq 实际上就是一个双向链表,链表的元素是 sudog,里面包含 g 字段, g 表示一个 goroutine,所以 sudog 可以看成一个 goroutine。recvq 存储那些尝试读取 channel 但被阻塞的 goroutine,sendq 则存储那些尝试写入 channel,但被阻塞的 goroutine。
此时,我们可以看到,recvq 里挂了两个 goroutine,也就是前面启动的 G1 和 G2。因为没有 goroutine 接收,而 channel 又是无缓冲类型,所以 G1 和 G2 被阻塞。sendq 没有被阻塞的 goroutine。
recvq(等待接收的groutine队列) 的数据结构如下。这里直接引用文章中的一幅图,用了三维元素,画得很好:
再从整体上来看一下 chan 此时的状态:
- G1 和 G2 被挂起了,状态是 WAITING。
- 关于goroutine调度器:goroutine是用户态的协程,由Go runtime 进行管理,作为对比,内核线程由OS进行管理。Goroutine更轻量,因此我们可以轻松创建数万goroutine
- 一个内核可以管理多个goroutine,当其中一个goroutine阻塞时内核线程可以调度其他goroutine来运行,内核线程本身不会阻塞,这就是我们通常说的M:N模型:
-
M:N模型通常由三部分构成:M,P,G。M是内核线程,负责运行goroutine;p是,保存goroutine运行所需要的上下文,它还维护了可运行(runnable)的goroutine列表;G是待运行的goroutine。M和P是G运行的基础。
继续回到例子。假设我们只有一个M,当G1(go goroutineA(ch))运行到val:=<-a时,它由本来的running状态变成waiting状态(调用了gopark结果)
G1脱离与M的关系,但调度器不会让M闲着,所以会接着调度另一个goroutine来运行:
G2也是同样的遭遇。现在G1和G2都被挂起了,等待这一个sender往channel里发送数据,才能得到解救
发送
接着上面的例子,G1 和 G2 现在都在 recvq(等待接收的goroutine) 队列里了。
第 17 行向 channel 发送了一个元素 3。
ch <- 3
- 如果检测到channel是空的,当前的goroutine会被挂起
- 对于不足塞的发送操作,如果channel未关闭并且没有多余的缓冲空间(说明:a.channel是非缓冲型的,且等待接收队列里没有goroutine;b. channel 是缓冲型的,但循环数组已经装满了元素)
以上省略了好大一部分的讲解~~
因为看不懂,后续再来补充~
原文地址
在发送小节里我们说到 G1 和 G2 现在被挂起来了,等待 sender 的解救。在第 17 行,主协程向 ch 发送了一个元素 3,来看下接下来会发生什么。
根据前面源码分析的结果,我们知道,sender 发现 ch 的 recvq 里有 receiver 在等待着接收,就会出队一个 sudog,把 recvq 里 first 指针的 sudo “推举”出来了,并将其加入到 P 的可运行 goroutine 队列中。
然后,sender 把发送元素拷贝到 sudog 的 elem 地址处,最后会调用 goready 将 G1 唤醒,状态变为 runnable。
当调度器光顾 G1 时,将 G1 变成 running 状态,执行 goroutineA 接下来的代码。G 表示其他可能有的 goroutine。
这里其实涉及到一个协程写另一个协程栈的操作。有两个 receiver 在 channel 的一边虎视眈眈地等着,这时 channel 另一边来了一个 sender 准备向 channel 发送数据,为了高效,用不着通过 channel 的 buf “中转”一次,直接从源地址把数据 copy 到目的地址就可以了,效率高啊!
上图是一个示意图,
3
会被拷贝到 G1 栈上的某个位置,也就是 val 的地址处,保存在 elem 字段。
关闭
关闭某个 channel,会执行函数 closechan:
以上省略了好大一部分的讲解~~
因为看不懂,后续再来补充~
原文地址
- close逻辑比较简单,对于一个channel,recvq和sendq中分别保存了阻塞的发送者和接收者。关闭channel后,对于等待接收者而言,会收到一个相应类型的零值。对于等待发送者,会直接panic,所以在不了解channel还有没有接收者的情况下,不能贸然关闭channel。
- close 函数先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒。
- 唤醒之后,该干嘛干嘛。sender 会继续执行 chansend 函数里 goparkunlock 函数之后的代码,很不幸,检测到 channel 已经关闭了,panic。receiver 则比较幸运,进行一些扫尾工作后,返回。这里,selected 返回 true,而返回值 received 则要根据 channel 是否关闭,返回不同的值。如果 channel 关闭,received 为 false,否则为 true。这我们分析的这种情况下,received 返回 false。
channel 进阶
操作 | nil channel | closed chan nel | not nil ,not closed channel |
---|---|---|---|
close | panic | panic | 正常关闭 |
接收(读) <- ch | 阻塞 | 读到对应类型的零值以及返回false | 阻塞或正常读取数据,缓冲型channel为空或非缓冲型channel没有发送者时会阻塞 |
发送(写) ch <- | 阻塞 | panic | 阻塞或正常写入数据。非缓冲channel没有接收者时或缓冲通道填满时 |
*小结:
1.发生panic的情况有三种:向一个关闭的channel进行写操作;关闭一个nil的channel;重复关闭一个channel。
2.读,写一个nil channel都会被阻塞
发送和接收元素的本质
Remember all transfer of value on the go channels happens with the copy of value.
- 就是说channel的发送和接收的操作都是“值拷贝”,无论是从sender goroutine的栈到chan buf,还是chan buf 到received goroutine,或者是直接从sender goroutine到recipe goroutine。
资源泄漏
如何优雅的关闭channel
关于 channel 的使用,有几点不方便的地方:
- 在不改变channel自身状态的情况下,无法获知一个channel是否关闭
- 关闭一个closed channel 会导致panic。所以,如果关闭channel的一方在不知道channel是否处于关闭状态时就去贸然关闭channel是很危险的事情
- 向一个closed channel 发送数据会导致panic。所以,如果向 channel 发送数据的一方不知道 channel 是否处于关闭状态时就去贸然向 channel 发送数据是很危险的事情。
有一条广泛流传的关闭 channel 的原则:
don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.
- 不要从一个 receiver 侧关闭 channel,也不要在有多个 sender 时,关闭 channel。
-
比较好理解,向 channel 发送元素的就是 sender,因此 sender 可以决定何时不发送数据,并且关闭 channel。但是如果有多个 sender,某个 sender 同样没法确定其他 sender 的情况,这时也不能贸然关闭 channel。
但是上面所说的并不是最本质的,最本质的原则就只有一条:
don’t close (or send values to) closed channels.
channel 应用
Channel 和 goroutine 的结合是 Go 并发编程的大杀器。而 Channel 的实际应用也经常让人眼前一亮,通过与 select,cancel,timer 等结合,它能实现各种各样的功能。接下来,我们就要梳理一下 channel 的应用。
停止信号
前面一节如何优雅关闭 channel 那一节已经讲得很多了,这块就略过了。
channel 用于停止信号的场景还是挺多的,经常是关闭某个 channel 或者向 channel 发送一个元素,使得接收 channel 的那一方获知道此信息,进而做一些其他的操作。
任务定时
与 timer 结合,一般有两种玩法:实现超时控制,实现定期执行某个任务。
有时候,需要执行某项操作,但又不想它耗费太长时间,上一个定时器就可以搞定:
select
{
case <-time.After(100* time.Millisecond):
case<-s.stopc:
return false
}
等待 100 ms 后,如果 s.stopc 还没有读出数据或者被关闭,就直接结束。这是来自 etcd 源码里的一个例子,这样的写法随处可见。
定时执行某个任务,也比较简单:
解耦生产方和消费方
服务启动时,启动 n 个 worker,作为工作协程池,这些协程工作在一个 for{} 无限循环里,从某个 channel 消费工作任务并执行:
5 个工作协程在不断地从工作队列里取任务,生产方只管往 channel 发送任务即可,解耦生产方和消费方。
程序输出:
控制并发数
有时需要定时执行几百个任务,例如每天定时按城市来执行一些离线计算的任务。但是并发数又不能太高,因为任务执行过程依赖第三方的一些资源,对请求的速率有限制。这时就可以通过 channel 来控制并发数。
下面的例子来自《Go 语言高级编程》:
构建一个缓冲型的 channel,容量为 3。接着遍历任务列表,每个任务启动一个 goroutine 去完成。真正执行任务,访问第三方的动作在 w() 中完成,在执行 w() 之前,先要从 limit 中拿“许可证”,拿到许可证之后,才能执行 w(),并且在执行完任务,要将“许可证”归还。这样就可以控制同时运行的 goroutine 数。
这里, limit<-1 放在 func 内部而不是外部,书籍作者柴大在读者群里的解释是:
如果在外层,就是控制系统 goroutine 的数量,可能会阻塞 for 循环,影响业务逻辑。
limit 其实和逻辑无关,只是性能调优,放在内层和外层的语义不太一样。
还有一点要注意的是,如果 w() 发生 panic,那“许可证”可能就还不回去了,因此需要使用 defer 来保证。
未完待续~
原文链接