初始化
调度器的初始化从 schedinit()函数开始,将会设置m最大个数(maxmcount)及p最大个数(GOMAXPROCS)等
func schedinit() {
sched.maxmcount = 10000 // 设置m的最大值为10000
mcommoninit(_g_.m) //初始化当前m
// 确认P的个数
// 默认等于cpu个数,可以通过GOMAXPROCS环境变量更改
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
// 调整P的个数,这里是新分配procs个P
// 这个函数很重要,所有的P都是从这里分配的,以后也不用担心没有P了
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
...
}
procresize方法主要完成以下任务:
- 比较目标个数和原始p的个数,进行全局缓存的扩容或收缩
- 遍历p的缓存,将未初始化的p进行初始化
- 对于收缩的情况,将收缩的p进行回收处理
- 分别将空闲的p和有任务的p加入空闲链表和工作链表
下面是procresize()的源码:
//全局数据结构:
allp []*p // len(allp) == gomaxprocs; may change at safe points, otherwise immutable
sched schedt //全局调度器(综述文中有介绍)
// 所有的P都在这个函数分配,不管是最开始的初始化分配,还是后期调整
func procresize(nprocs int32) *p {
...
old := gomaxprocs
// 扩张allp数组
if nprocs > int32(len(allp)) {
lock(&allpLock)
if nprocs <= int32(cap(allp)) {
allp = allp[:nprocs]
} else {
// 分配nprocs个*p
nallp := make([]*p, nprocs)
copy(nallp, allp[:cap(allp)])
allp = nallp
}
unlock(&allpLock)
}
// 初始化新的p
for i := int32(0); i < nprocs; i++ {
pp := allp[i]
if pp == nil {
pp = new(p)
...
// 将pp保存到allp数组里, allp[i] = pp
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
...
}
// 释放无用的p
for i := nprocs; i < old; i++ {
p := allp[i]
// 任务转移
// 本地任务队列转换到全局队列
for p.runqhead != p.runqtail {
p.runqtail--
gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
globrunqputhead(gp)
}
// 优先执行的也转移到全局
if p.runnext != 0 {
globrunqputhead(p.runnext.ptr())
p.runnext = 0
}
// 后台标记的g也转移
if gp := p.gcBgMarkWorker.ptr(); gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
globrunqput(gp)
p.gcBgMarkWorker.set(nil)
}
// 做一些内存释放等操作
...
}
...
//将p放入队列
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- {
p := allp[i]
// 如果是当前的M绑定的P,不放入P空闲链表
// 否则更改P的状态为_Pidle,放入P空闲链表
if _g_.m.p.ptr() == p {
continue
}
p.status = _Pidle
if runqempty(p) {
pidleput(p)// 将空闲p放入全局空闲链表
} else {
// 非空闲的通过绑定m,链起来
p.m.set(mget())
p.link.set(runnablePs)
// 最后一个空闲的不加入空闲列表 直接返回去调度使用
runnablePs = p
}
}
}
新建的无任务p都会被放到空闲链表中:
func pidleput(_p_ *p) {
if !runqempty(_p_) {
throw("pidleput: P has non-empty run queue")
}
_p_.link = sched.pidle //通过p的link形成链表
sched.pidle.set(_p_)
// 将sched.npidle加1
atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic
}
默认只有schedinit和startTheWorld会调用procresize()schedinit初始化p,startTheWorld会激活所有有任务的p。
完成调度器初始化后,系统会引导生成 main goroutine,之前是在全局的g0上执行初始化工作
golang支持在运行间修改p数量:runtime.GOMAXPROCS(),但是带价很大,会触发STW
lock(&sched.lock)
ret := int(gomaxprocs)
unlock(&sched.lock)
if n <= 0 || n == ret {
return ret
}
// 有stw和重启世界的过程
stopTheWorld("GOMAXPROCS")
// newprocs will be processed by startTheWorld
newprocs = int32(n)
startTheWorld()
return ret
以上便是golang初始化调度器的所有步骤,具体:
- 调用schedinit,初始化maxmcount和gomaxprocs的数量
- sechdinit中调用procresize(),初始化所有的p,并放入空闲链表中
- schedinit结束后,引导创建main goroutine,执行main(之前是在全局的g0中执行),汇编执行引导,文中并没有描述
- 运行时可以调用runtime.GOMAXPROCS()函数修改p的数量,会触发STW,有带价。如果真的有需求,可以考虑启动前修改系统环境变量实现。
g的创建
在编写程序中,使用 go func() {}来创建一个goroutine(g),这条语句会被编译器翻译成函数 newproc()。
func newproc(siz int32, fn *funcval) {
//用fn + PtrSize 获取第一个参数的地址,也就是argp
//这里要了解一下go的堆栈
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
//用siz - 8 获取pc地址 (汇编实现)
pc := getcallerpc()
// 用g0的栈创建G对象
systemstack(func() {
newproc1(fn, (*uint8)(argp), siz, pc)
})
}
了解一下funcval:
// funcval 是一个变长结构,第一个成员是函数指针
// 所以上面的 add 是跳过这个 fn
type funcval struct {
fn uintptr
// variable-size, fn-specific data here
}
newproc()获取到参数的地址和callerpc,然后调用newproc1().
流程如下图:
代码如下:
// 根据函数参数和函数地址,创建一个新的G,然后将这个G加入队列等待运行
func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
_g_ := getg()
if fn == nil {
_g_.m.throwing = -1 // do not dump full stacks
throw("go of nil func value")
}
_g_.m.locks++ // disable preemption because it can be holding p in a local var
siz := narg
// 从m中获取p
_p_ := _g_.m.p.ptr()
// 从gfree list获取g
newg := gfget(_p_)
// 如果没获取到g,则新建一个
if newg == nil {
// 分配栈为 2k 大小的G对象
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead) //将g的状态改为_Gdead
// 添加到allg数组,防止gc扫描清除掉
allgadd(newg)
}
// 参数大小+稍微一点空间
totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize
totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
// 新协程的栈顶计算,将栈顶减去参数占用的空间
sp := newg.stack.hi - totalSize
spArg := sp
// 如果有参数
if narg > 0 {
// copy参数到栈上
memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
... //一些gc相关的工作省略
}
// 初始化G的gobuf,保存sp,pc,任务函数等
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
// 保存goexit的地址到sched.pc,后面会调节 goexit 作为任务函数返回后执行的地址,所以goroutine结束后会调用goexit
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
// sched.g保存当前新的G
newg.sched.g = guintptr(unsafe.Pointer(newg))
// 将当前的pc压入栈,保存g的任务函数为pc
gostartcallfn(&newg.sched, fn)
// gopc保存newproc的pc
newg.gopc = callerpc
// 任务函数的地址
newg.startpc = fn.fn
...
// 更改当前g的状态为_Grunnable
casgstatus(newg, _Gdead, _Grunnable)
// 生成唯一的goid
newg.goid = int64(_p_.goidcache)
// 将当前新生成的g,放入队列
runqput(_p_, newg, true)
// 如果有空闲的p 且 m没有处于自旋状态 且 main goroutine已经启动,那么唤醒某个m来执行任务
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
wakep()
}
}
g 默认会复用,会从p的free中获取,当p free为空,从全局的schedt中的gfreeStack或者gfreeNoStack中拉取到本地freelist
//从缓存列表获取一个空闲的g
func gfget(_p_ *p) *g {
retry:
gp := _p_.gfree
if gp == nil && (sched.gfreeStack != nil || sched.gfreeNoStack != nil) {
//本地空闲队列为空的时候,从全局中获取,需要加锁
lock(&sched.gflock)
//一次转移最多32个空闲到本地p
for _p_.gfreecnt < 32 {
if sched.gfreeStack != nil {
gp = sched.gfreeStack //获取g
sched.gfreeStack = gp.schedlink.ptr() //链表头指向下一个g
} else if sched.gfreeNoStack != nil {
gp = sched.gfreeNoStack
sched.gfreeNoStack = gp.schedlink.ptr()
} else {
break
}
_p_.gfreecnt++
sched.ngfree--
gp.schedlink.set(_p_.gfree)
_p_.gfree = gp
}
unlock(&sched.gflock)
goto retry
}
// 获取到g
if gp != nil {
// 调整链表头及个数
_p_.gfree = gp.schedlink.ptr()
_p_.gfreecnt--
// 堆栈为空就分配
if gp.stack.lo == 0 {
// Stack was deallocated in gfput. Allocate a new one.
systemstack(func() {
gp.stack = stackalloc(_FixedStack)
})
gp.stackguard0 = gp.stack.lo + _StackGuard
} else {
...
}
}
return gp
}
当一次调度执行完g后,调度器会将g放回p或者全局队列,当空闲任务个数超过64个的时候,会调整部分到全局任务队列,直到p本地空闲队列为32个的时候停止。
func gfput(_p_ *p, gp *g) {
// 处理堆栈
stksize := gp.stack.hi - gp.stack.lo
// 不是默认堆栈,直接释放(扩张后的堆栈可能会很大,留着占内存,下次重新分配就好了)
if stksize != _FixedStack {
stackfree(gp.stack)
gp.stack.lo = 0
gp.stack.hi = 0
gp.stackguard0 = 0
}
// 处理p的复用链表
gp.schedlink.set(_p_.gfree)
_p_.gfree = gp
_p_.gfreecnt++
// 超过64个,放回部分到全局队列
if _p_.gfreecnt >= 64 {
lock(&sched.gflock)
for _p_.gfreecnt >= 32 {
_p_.gfreecnt--
gp = _p_.gfree
_p_.gfree = gp.schedlink.ptr()
if gp.stack.lo == 0 {
gp.schedlink.set(sched.gfreeNoStack)
sched.gfreeNoStack = gp
} else {
gp.schedlink.set(sched.gfreeStack)
sched.gfreeStack = gp
}
sched.ngfree++
}
unlock(&sched.gflock)
}
}
malg()函数创建一个新的g,包括为该g申请栈空间(支持程序分配栈的系统)。系统中的每个g都是由该函数创建而来的。
//一般传入的堆栈大小默认为2k
func malg(stacksize int32) *g {
newg := new(g)
if stacksize >= 0 {
stacksize = round2(_StackSystem + stacksize)// 对齐
systemstack(func() {
newg.stack = stackalloc(uint32(stacksize))// 调用 stackalloc 分配栈
})
newg.stackguard0 = newg.stack.lo + _StackGuard // 设置 stackguard
newg.stackguard1 = ^uintptr(0)
}
return newg
}
创建成功会被放入到 allg的全局队列中,gc回收遍历扫描会使用,也防止gc回收分配好的g
var (
// 存储所有g的数组
allgs []*g
// 保护allgs的互斥锁
allglock mutex
allglen uintptr
)
func allgadd(gp *g) {
lock(&allglock)
allgs = append(allgs, gp)
allglen = uintptr(len(allgs))
unlock(&allglock)
}
当获取到一个可用的g之后:
- 初始化g的gobuf信息(上下文信息,包括sp,pc以及函数g执行完之后的返回指令pc(goexit函数))
- 添加到g到p的本地队列
- p的本地队列满了,便添加到全局队列,顺便转移部分本地队列的数据到全局队列,供其他的p获取。
- 若存在有空闲的p及未自旋的m,调用wakem()方法,这里会获取一个空闲的m或新建一个m,去和空闲的p绑点,调度。后文会有对该方法的解释
// 尝试将G放到P的本地队列
func runqput(_p_ *p, gp *g, next bool) {
if next {
retryNext:
oldnext := _p_.runnext
// 将G赋值给_p_.runnext
// 最新的G优先级最高,最可能先被执行。
// 剩下的G如果go运行时调度器发现有空闲的core,就会把任务偷走点,
// 让别的core执行,这样才能充分利用多核,提高并发能
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
gp = oldnext.ptr()
}
retry:
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
// 如果本地队列还有剩余的位置,将G插入本地队列的尾部
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
// 本地队列已满,放入全局队列
if runqputslow(_p_, gp, h, t) {
return
}
goto retry
}
// 如果本地满了以后,一次将本地的一半的G转移到全局队列
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
//首先转移一半到全局队列,省略
...
// 将拿到的G,添加到全局队列末尾, 全局数据处理是需要加锁的,所以slow。
lock(&sched.lock)
globrunqputbatch(batch[0], batch[n], int32(n+1))
unlock(&sched.lock)
return true
}
放入队列时,p队列满了会分一半到全局队列,其他的p可以获取全局队列中的g执行。newproc1最后会唤醒其他m p去执行任务
到这里go fun()流程就完成了。 g不会被删除,但是会清理过大的栈空间,防止内存爆炸。gc过程中也会调用shrinkstack()将栈空间回收。
这就是golang和可以创建大量g来支持并发的原因之一,g是复用的并且初始栈大小只有2k,超过2k的栈在g空闲的时候是会被回收的,这也减轻了系统内存的压力
。
系统线程m
在golang中有三种系统线程:
- 主线程:golang程序启动加载的时候就运行在主线程上,代码中由一个全局的m0表示
- 运行sysmon的线程
-
普通用户线程,用来与p绑定,运行g中的任务的线程,
主线程和运行sysmon都是单实例,单独一个线程。而用户线程会有很多事例,他会根据调度器的需求新建,休眠和唤醒。
在newproc1中我们发现创建g成功后,会尝试wakep唤醒一个用户线程m执行任务,这里详细描述下这个方法:
// 尝试获取一个M来运行可运行的G
func wakep() {
// 如果有其他的M处于自旋状态,那么就不管了,直接返回
// 因为自旋的M回拼命找G来运行的,就不新找一个M(劳动者)来运行了。
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
// startm是启动一个M,先尝试获取一个空闲P,如果获取不到则返回
// 获取到P后,在尝试获取M,如果获取不到就新建一个M
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
// 如果P为nil,则尝试获取一个空闲P
if _p_ == nil {
_p_ = pidleget()
if _p_ == nil {
unlock(&sched.lock)
return
}
}
// 获取一个空闲的M
mp := mget()
unlock(&sched.lock)
if mp == nil {
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
// 如果获取不到,则新建一个,新建完成后就立即返回
newm(fn, _p_)
return
}
// The caller incremented nmspinning, so set m.spinning in the new M.
mp.spinning = spinning //标记该M是否在自旋
mp.nextp.set(_p_) // 暂存P
notewakeup(&mp.park) // 唤醒M
}
上述代码可以发现m回去调用mget()方法,获取不成功后才会选择创建,这里表明m也是支持复用的。获取不到任务的m也会被加入到空闲的m链表中,等待唤醒。
下面从新建m开始:
func newm(fn func(), _p_ *p) {
// 根据fn和p和绑定一个m对象
mp := allocm(_p_, fn)
// 设置当前m的下一个p为_p_
mp.nextp.set(_p_)
...
// 真正的分配os thread
newm1(mp)
}
// 分配一个m,且不关联任何一个os thread
func allocm(_p_ *p, fn func()) *m {
_g_ := getg()
_g_.m.locks++ // disable GC because it can be called from sysmon
if _g_.m.p == 0 {
acquirep(_p_) // 如果没有绑定p的话,申请一个p,只有p有cache,可以供m来申请内存。
}
...
mp := new(m)
mp.mstartfn = fn
mcommoninit(mp) //初始化当前m
// 给g0分配一定的堆栈
if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" {
mp.g0 = malg(-1) //这些系统必须使用系统的栈
} else {
mp.g0 = malg(8192 * sys.StackGuardMultiplier) //go的栈是大小是8k
}
mp.g0.m = mp
//绑定的p和当前m的p一样,解绑
if _p_ == _g_.m.p.ptr() {
releasep()
}
return mp
}
m初始化:检查数量,超过10000个异常停机;接受信号的g创建初始化;
func mcommoninit(mp *m) {
_g_ := getg()
// g0 stack won't make sense for user (and is not necessary unwindable).
if _g_ != _g_.m.g0 {
callers(1, mp.createstack[:])
}
lock(&sched.lock)
if sched.mnext+1 < sched.mnext {
throw("runtime: thread ID overflow")
}
mp.id = sched.mnext
sched.mnext++
// m数量检查
checkmcount()
...
// signal g创建初始化
mpreinit(mp)
if mp.gsignal != nil {
mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
}
//加入全局m链表
mp.alllink = allm //链表
atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
unlock(&sched.lock)
}
func newm1(mp *m) {
// 对cgo的处理
...
execLock.rlock() // Prevent process clone.
// 创建一个系统线程,并且传入该 mp 绑定的 g0 的栈顶指针
// 让系统线程执行 mstart 函数,后面的逻辑都在 mstart 函数中
newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
execLock.runlock()
}
每个操作系统分配系统线程的流程是不一样的,下面代码展示了在linux和windows系统下该函数的实现,其他的环境暂时不做讨论:
//linux
// 分配一个系统线程,且完成 g0 和 g0上的栈分配
// 传入 mstart 函数,让线程执行 mstart
func newosproc(mp *m, stk unsafe.Pointer) {
// Disable signals during clone, so that the new thread starts
// with signals disabled. It will enable them in minit.
var oset sigset
sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
sigprocmask(_SIG_SETMASK, &oset, nil)
if ret < 0 {
print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", -ret, ")\n")
if ret == -_EAGAIN {
println("runtime: may need to increase max user processes (ulimit -u)")
}
throw("newosproc")
}
}
//windows
func newosproc(mp *m, stk unsafe.Pointer) {
const _STACK_SIZE_PARAM_IS_A_RESERVATION = 0x00010000
// stackSize must match SizeOfStackReserve in cmd/link/internal/ld/pe.go.
const stackSize = 0x00200000*_64bit + 0x00100000*(1-_64bit)
thandle := stdcall6(_CreateThread, 0, stackSize,
funcPC(tstart_stdcall), uintptr(unsafe.Pointer(mp)),
_STACK_SIZE_PARAM_IS_A_RESERVATION, 0)
if thandle == 0 {
if atomic.Load(&exiting) != 0 {
// CreateThread may fail if called
// concurrently with ExitProcess. If this
// happens, just freeze this thread and let
// the process exit. See issue #18253.
lock(&deadlock)
lock(&deadlock)
}
print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", getlasterror(), ")\n")
throw("runtime.newosproc")
}
// Close thandle to avoid leaking the thread object if it exits.
stdcall1(_CloseHandle, thandle)
}
创建m的时候,会给m的g0分配分配栈空间。g0是该m私有的,golang中系统命令都是在g0上执行的,函数systemstack(func())(汇编实现)会将方法转到g0栈上执行,然后转回当前的g。管理命令操作执行都在g0栈上执行,隔离了业务内容和指令的执行,避免做g共享内存。
下面将描述获取一个空闲的m
在startm中,m是优先去空闲队列中获取,未获取到空闲队列才选择创建
type schedt struct {
// idle状态的m
midle muintptr // idle m's waiting for work
// idle状态的m个数
nmidle int32 // number of idle m's waiting for work
// m允许的最大个数
maxmcount int32 // maximum number of m's allowed (or die)
}
func mget() *m {
//从idle 的m链表中搞一个
mp := sched.midle.ptr()
if mp != nil {
sched.midle = mp.schedlink
sched.nmidle--
}
return mp
}
被唤醒的进入工作状态的m会陷入调度循环,竭尽全力获取g执行,当找不到可执行的任务,或者任务用时过长,系统调用阻塞等原因被剥夺p,m会再次进入休眠状态。
// 停止M,使其休眠,但不会被系统回收
// 调用notesleep使M进入休眠,唤醒后就会从休眠出直接开始执行
// 线程可以处于三种状态: 等待中(Waiting)、待执行(Runnable)或执行中(Executing)。
func stopm() {
_g_ := getg()
retry:
lock(&sched.lock)
mput(_g_.m)
unlock(&sched.lock)
// 在lock_futex.go 中
// 休眠,等待被唤醒
notesleep(&_g_.m.park)
noteclear(&_g_.m.park)
...
// 绑定p
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
// 把mp添加到midle列表
func mput(mp *m) {
mp.schedlink = sched.midle
sched.midle.set(mp)
sched.nmidle++
checkdead()
}
到这里可以看到,m也是不会主动删除释放的,支持复用。当大量的m被创建的时候,对性能是有影响的:
- 系统线程调度上下文切换是有消耗的
- m本身是有占资源的(g0分配了8k的堆栈)