golang调度学习-初始化,创建goroutine,系统线程m

  • Post author:
  • Post category:golang




初始化

调度器的初始化从 schedinit()函数开始,将会设置m最大个数(maxmcount)及p最大个数(GOMAXPROCS)等

func schedinit() {
    sched.maxmcount = 10000  // 设置m的最大值为10000
    mcommoninit(_g_.m) //初始化当前m
    // 确认P的个数
    // 默认等于cpu个数,可以通过GOMAXPROCS环境变量更改
    procs := ncpu
    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
        procs = n
    }
    // 调整P的个数,这里是新分配procs个P
    // 这个函数很重要,所有的P都是从这里分配的,以后也不用担心没有P了
    if procresize(procs) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }
    ...
}

procresize方法主要完成以下任务:

  1. 比较目标个数和原始p的个数,进行全局缓存的扩容或收缩
  2. 遍历p的缓存,将未初始化的p进行初始化
  3. 对于收缩的情况,将收缩的p进行回收处理
  4. 分别将空闲的p和有任务的p加入空闲链表和工作链表

下面是procresize()的源码:

//全局数据结构:
allp []*p // len(allp) == gomaxprocs; may change at safe points, otherwise immutable
sched schedt //全局调度器(综述文中有介绍)

// 所有的P都在这个函数分配,不管是最开始的初始化分配,还是后期调整
func procresize(nprocs int32) *p {
    ...
    old := gomaxprocs
	// 扩张allp数组
	if nprocs > int32(len(allp)) {
		lock(&allpLock)
		if nprocs <= int32(cap(allp)) {
			allp = allp[:nprocs]
		} else {
			// 分配nprocs个*p
			nallp := make([]*p, nprocs)
			copy(nallp, allp[:cap(allp)])
			allp = nallp
		}
		unlock(&allpLock)
	}
	// 初始化新的p
	for i := int32(0); i < nprocs; i++ {
		pp := allp[i]
		if pp == nil {
			pp = new(p)
		    ...
			// 将pp保存到allp数组里, allp[i] = pp
			atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
		}
        ...
	}
	// 释放无用的p
	for i := nprocs; i < old; i++ {
		p := allp[i]
		// 任务转移
		// 本地任务队列转换到全局队列
		for p.runqhead != p.runqtail {
			p.runqtail--
			gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
			globrunqputhead(gp)
		}
		// 优先执行的也转移到全局
		if p.runnext != 0 {
			globrunqputhead(p.runnext.ptr())
			p.runnext = 0
		}
		// 后台标记的g也转移
		if gp := p.gcBgMarkWorker.ptr(); gp != nil {
			casgstatus(gp, _Gwaiting, _Grunnable)
			globrunqput(gp)
			p.gcBgMarkWorker.set(nil)
		}
		// 做一些内存释放等操作
       ...
	}
    ...
    //将p放入队列
	var runnablePs *p
	for i := nprocs - 1; i >= 0; i-- {
		p := allp[i]
		// 如果是当前的M绑定的P,不放入P空闲链表
		// 否则更改P的状态为_Pidle,放入P空闲链表
		if _g_.m.p.ptr() == p {
			continue
		}
		p.status = _Pidle
		if runqempty(p) { 
			pidleput(p)// 将空闲p放入全局空闲链表
		} else {
           // 非空闲的通过绑定m,链起来     
			p.m.set(mget())
			p.link.set(runnablePs)
            // 最后一个空闲的不加入空闲列表 直接返回去调度使用
			runnablePs = p
		}
	}
}

新建的无任务p都会被放到空闲链表中:

func pidleput(_p_ *p) {
    if !runqempty(_p_) {
        throw("pidleput: P has non-empty run queue")
    }
    _p_.link = sched.pidle //通过p的link形成链表
    sched.pidle.set(_p_)
    // 将sched.npidle加1
    atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic
}

默认只有schedinit和startTheWorld会调用procresize()schedinit初始化p,startTheWorld会激活所有有任务的p。

完成调度器初始化后,系统会引导生成 main goroutine,之前是在全局的g0上执行初始化工作

golang支持在运行间修改p数量:runtime.GOMAXPROCS(),但是带价很大,会触发STW

 lock(&sched.lock)
    ret := int(gomaxprocs)
    unlock(&sched.lock)
    if n <= 0 || n == ret {
        return ret
    }
    // 有stw和重启世界的过程
    stopTheWorld("GOMAXPROCS")
    // newprocs will be processed by startTheWorld
    newprocs = int32(n)
    startTheWorld()
    return ret

以上便是golang初始化调度器的所有步骤,具体:

  1. 调用schedinit,初始化maxmcount和gomaxprocs的数量
  2. sechdinit中调用procresize(),初始化所有的p,并放入空闲链表中
  3. schedinit结束后,引导创建main goroutine,执行main(之前是在全局的g0中执行),汇编执行引导,文中并没有描述
  4. 运行时可以调用runtime.GOMAXPROCS()函数修改p的数量,会触发STW,有带价。如果真的有需求,可以考虑启动前修改系统环境变量实现。



g的创建

在编写程序中,使用 go func() {}来创建一个goroutine(g),这条语句会被编译器翻译成函数 newproc()。

func newproc(siz int32, fn *funcval) {
    //用fn + PtrSize 获取第一个参数的地址,也就是argp
    //这里要了解一下go的堆栈
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    //用siz - 8 获取pc地址 (汇编实现)
    pc := getcallerpc()
    // 用g0的栈创建G对象
    systemstack(func() {
        newproc1(fn, (*uint8)(argp), siz, pc)
    })
}

了解一下funcval:

// funcval 是一个变长结构,第一个成员是函数指针
// 所以上面的 add 是跳过这个 fn
type funcval struct {
    fn uintptr
    // variable-size, fn-specific data here
}

newproc()获取到参数的地址和callerpc,然后调用newproc1().

流程如下图:

在这里插入图片描述

代码如下:

// 根据函数参数和函数地址,创建一个新的G,然后将这个G加入队列等待运行
func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
    _g_ := getg()
    if fn == nil {
        _g_.m.throwing = -1 // do not dump full stacks
        throw("go of nil func value")
    }
    _g_.m.locks++ // disable preemption because it can be holding p in a local var
    siz := narg
    // 从m中获取p
    _p_ := _g_.m.p.ptr()
    // 从gfree list获取g
    newg := gfget(_p_)
    // 如果没获取到g,则新建一个
    if newg == nil {
        // 分配栈为 2k 大小的G对象
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead) //将g的状态改为_Gdead
        // 添加到allg数组,防止gc扫描清除掉
        allgadd(newg) 
    }
    // 参数大小+稍微一点空间
    totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize 
    totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
    // 新协程的栈顶计算,将栈顶减去参数占用的空间
    sp := newg.stack.hi - totalSize
    spArg := sp
    // 如果有参数
    if narg > 0 {
        // copy参数到栈上
        memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
        ... //一些gc相关的工作省略
    }
    // 初始化G的gobuf,保存sp,pc,任务函数等
    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    newg.sched.sp = sp
    newg.stktopsp = sp
    // 保存goexit的地址到sched.pc,后面会调节 goexit 作为任务函数返回后执行的地址,所以goroutine结束后会调用goexit
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    // sched.g保存当前新的G
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    // 将当前的pc压入栈,保存g的任务函数为pc
    gostartcallfn(&newg.sched, fn)
    // gopc保存newproc的pc
    newg.gopc = callerpc
    // 任务函数的地址
    newg.startpc = fn.fn
    ...
    // 更改当前g的状态为_Grunnable
    casgstatus(newg, _Gdead, _Grunnable)
    // 生成唯一的goid
    newg.goid = int64(_p_.goidcache)
    // 将当前新生成的g,放入队列
    runqput(_p_, newg, true)
    // 如果有空闲的p 且 m没有处于自旋状态 且 main goroutine已经启动,那么唤醒某个m来执行任务
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
        wakep()
    }
}

g 默认会复用,会从p的free中获取,当p free为空,从全局的schedt中的gfreeStack或者gfreeNoStack中拉取到本地freelist

//从缓存列表获取一个空闲的g
func gfget(_p_ *p) *g {
retry:
    gp := _p_.gfree
    if gp == nil && (sched.gfreeStack != nil || sched.gfreeNoStack != nil) {
        //本地空闲队列为空的时候,从全局中获取,需要加锁
        lock(&sched.gflock)
        //一次转移最多32个空闲到本地p
        for _p_.gfreecnt < 32 {
            if sched.gfreeStack != nil {
                gp = sched.gfreeStack                    //获取g
                sched.gfreeStack = gp.schedlink.ptr() //链表头指向下一个g
            } else if sched.gfreeNoStack != nil {
                gp = sched.gfreeNoStack
                sched.gfreeNoStack = gp.schedlink.ptr()
            } else {
                break
            }
            _p_.gfreecnt++
            sched.ngfree--
            gp.schedlink.set(_p_.gfree)
            _p_.gfree = gp
        }
        unlock(&sched.gflock)
        goto retry
    }
    // 获取到g
    if gp != nil {
        // 调整链表头及个数
        _p_.gfree = gp.schedlink.ptr()
        _p_.gfreecnt--
        // 堆栈为空就分配
        if gp.stack.lo == 0 {
            // Stack was deallocated in gfput. Allocate a new one.
            systemstack(func() {
                gp.stack = stackalloc(_FixedStack)
            })
            gp.stackguard0 = gp.stack.lo + _StackGuard
        } else {
          ...
        }
    }
    return gp
}

当一次调度执行完g后,调度器会将g放回p或者全局队列,当空闲任务个数超过64个的时候,会调整部分到全局任务队列,直到p本地空闲队列为32个的时候停止。

func gfput(_p_ *p, gp *g) {
    // 处理堆栈
    stksize := gp.stack.hi - gp.stack.lo
    // 不是默认堆栈,直接释放(扩张后的堆栈可能会很大,留着占内存,下次重新分配就好了)
    if stksize != _FixedStack {
        stackfree(gp.stack)
        gp.stack.lo = 0
        gp.stack.hi = 0
        gp.stackguard0 = 0
    }
    // 处理p的复用链表
    gp.schedlink.set(_p_.gfree)
    _p_.gfree = gp
    _p_.gfreecnt++
    // 超过64个,放回部分到全局队列
    if _p_.gfreecnt >= 64 {
        lock(&sched.gflock)
        for _p_.gfreecnt >= 32 {
            _p_.gfreecnt--
            gp = _p_.gfree
            _p_.gfree = gp.schedlink.ptr()
            if gp.stack.lo == 0 {
                gp.schedlink.set(sched.gfreeNoStack)
                sched.gfreeNoStack = gp
            } else {
                gp.schedlink.set(sched.gfreeStack)
                sched.gfreeStack = gp
            }
            sched.ngfree++
        }
        unlock(&sched.gflock)
    }
}

malg()函数创建一个新的g,包括为该g申请栈空间(支持程序分配栈的系统)。系统中的每个g都是由该函数创建而来的。

//一般传入的堆栈大小默认为2k
func malg(stacksize int32) *g {
    newg := new(g)
    if stacksize >= 0 {
        stacksize = round2(_StackSystem + stacksize)// 对齐
        systemstack(func() {
            newg.stack = stackalloc(uint32(stacksize))// 调用 stackalloc 分配栈
        })
        newg.stackguard0 = newg.stack.lo + _StackGuard        // 设置 stackguard
        newg.stackguard1 = ^uintptr(0)
    }
    return newg
}

创建成功会被放入到 allg的全局队列中,gc回收遍历扫描会使用,也防止gc回收分配好的g

var (
    // 存储所有g的数组
    allgs []*g
    // 保护allgs的互斥锁
    allglock mutex  
    allglen uintptr
)
func allgadd(gp *g) {
    lock(&allglock)
    allgs = append(allgs, gp)
    allglen = uintptr(len(allgs))
    unlock(&allglock)
}

当获取到一个可用的g之后:

  1. 初始化g的gobuf信息(上下文信息,包括sp,pc以及函数g执行完之后的返回指令pc(goexit函数))
  2. 添加到g到p的本地队列
  3. p的本地队列满了,便添加到全局队列,顺便转移部分本地队列的数据到全局队列,供其他的p获取。
  4. 若存在有空闲的p及未自旋的m,调用wakem()方法,这里会获取一个空闲的m或新建一个m,去和空闲的p绑点,调度。后文会有对该方法的解释
// 尝试将G放到P的本地队列
func runqput(_p_ *p, gp *g, next bool) {
    if next {
    retryNext:
        oldnext := _p_.runnext
        // 将G赋值给_p_.runnext
        // 最新的G优先级最高,最可能先被执行。
        // 剩下的G如果go运行时调度器发现有空闲的core,就会把任务偷走点,
        // 让别的core执行,这样才能充分利用多核,提高并发能
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        gp = oldnext.ptr()
    }
retry:
    h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    // 如果本地队列还有剩余的位置,将G插入本地队列的尾部
    if t-h < uint32(len(_p_.runq)) {
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    // 本地队列已满,放入全局队列
    if runqputslow(_p_, gp, h, t) {
        return
    }
    goto retry
}

// 如果本地满了以后,一次将本地的一半的G转移到全局队列
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
    //首先转移一半到全局队列,省略
    ...
    // 将拿到的G,添加到全局队列末尾, 全局数据处理是需要加锁的,所以slow。
    lock(&sched.lock)
    globrunqputbatch(batch[0], batch[n], int32(n+1))
    unlock(&sched.lock)
    return true
}

放入队列时,p队列满了会分一半到全局队列,其他的p可以获取全局队列中的g执行。newproc1最后会唤醒其他m p去执行任务

到这里go fun()流程就完成了。 g不会被删除,但是会清理过大的栈空间,防止内存爆炸。gc过程中也会调用shrinkstack()将栈空间回收。

这就是golang和可以创建大量g来支持并发的原因之一,g是复用的并且初始栈大小只有2k,超过2k的栈在g空闲的时候是会被回收的,这也减轻了系统内存的压力



系统线程m

在golang中有三种系统线程:

  • 主线程:golang程序启动加载的时候就运行在主线程上,代码中由一个全局的m0表示
  • 运行sysmon的线程
  • 普通用户线程,用来与p绑定,运行g中的任务的线程,

    主线程和运行sysmon都是单实例,单独一个线程。而用户线程会有很多事例,他会根据调度器的需求新建,休眠和唤醒。

在newproc1中我们发现创建g成功后,会尝试wakep唤醒一个用户线程m执行任务,这里详细描述下这个方法:

// 尝试获取一个M来运行可运行的G
func wakep() {
    // 如果有其他的M处于自旋状态,那么就不管了,直接返回
    // 因为自旋的M回拼命找G来运行的,就不新找一个M(劳动者)来运行了。
    if !atomic.Cas(&sched.nmspinning, 0, 1) {
        return
    }
    startm(nil, true)
}

// startm是启动一个M,先尝试获取一个空闲P,如果获取不到则返回
// 获取到P后,在尝试获取M,如果获取不到就新建一个M
func startm(_p_ *p, spinning bool) {
    lock(&sched.lock)
    // 如果P为nil,则尝试获取一个空闲P
    if _p_ == nil {
        _p_ = pidleget()
        if _p_ == nil {
            unlock(&sched.lock)
            return
        }
    }
    // 获取一个空闲的M
    mp := mget()
    unlock(&sched.lock)
    if mp == nil {
        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        // 如果获取不到,则新建一个,新建完成后就立即返回
        newm(fn, _p_)
        return
    }
    // The caller incremented nmspinning, so set m.spinning in the new M.
    mp.spinning = spinning //标记该M是否在自旋
    mp.nextp.set(_p_) // 暂存P
    notewakeup(&mp.park) // 唤醒M
}

上述代码可以发现m回去调用mget()方法,获取不成功后才会选择创建,这里表明m也是支持复用的。获取不到任务的m也会被加入到空闲的m链表中,等待唤醒。

下面从新建m开始:

func newm(fn func(), _p_ *p) {
    // 根据fn和p和绑定一个m对象
    mp := allocm(_p_, fn)
    // 设置当前m的下一个p为_p_
    mp.nextp.set(_p_)
    ...
    // 真正的分配os thread
    newm1(mp)
}

// 分配一个m,且不关联任何一个os thread
func allocm(_p_ *p, fn func()) *m {
    _g_ := getg()
    _g_.m.locks++ // disable GC because it can be called from sysmon
    if _g_.m.p == 0 {
        acquirep(_p_) // 如果没有绑定p的话,申请一个p,只有p有cache,可以供m来申请内存。
    }
    ...
    mp := new(m)
    mp.mstartfn = fn
    mcommoninit(mp)   //初始化当前m
    // 给g0分配一定的堆栈
    if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" {
        mp.g0 = malg(-1)   //这些系统必须使用系统的栈
    } else {
        mp.g0 = malg(8192 * sys.StackGuardMultiplier) //go的栈是大小是8k
    }
    mp.g0.m = mp
    //绑定的p和当前m的p一样,解绑
    if _p_ == _g_.m.p.ptr() {
        releasep()
    }
    return mp
}

m初始化:检查数量,超过10000个异常停机;接受信号的g创建初始化;

func mcommoninit(mp *m) {
    _g_ := getg()
    // g0 stack won't make sense for user (and is not necessary unwindable).
    if _g_ != _g_.m.g0 {
        callers(1, mp.createstack[:])
    }
    lock(&sched.lock)
    if sched.mnext+1 < sched.mnext {
        throw("runtime: thread ID overflow")
    }
    mp.id = sched.mnext
    sched.mnext++
    // m数量检查
    checkmcount()
    ...
    // signal g创建初始化
    mpreinit(mp)
    if mp.gsignal != nil {
        mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
    }
    //加入全局m链表
    mp.alllink = allm //链表
    atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
    unlock(&sched.lock)
}
func newm1(mp *m) {
    // 对cgo的处理
    ...
    execLock.rlock() // Prevent process clone.
    // 创建一个系统线程,并且传入该 mp 绑定的 g0 的栈顶指针
    // 让系统线程执行 mstart 函数,后面的逻辑都在 mstart 函数中
    newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
    execLock.runlock()
}

每个操作系统分配系统线程的流程是不一样的,下面代码展示了在linux和windows系统下该函数的实现,其他的环境暂时不做讨论:

//linux
// 分配一个系统线程,且完成 g0 和 g0上的栈分配
// 传入 mstart 函数,让线程执行 mstart
func newosproc(mp *m, stk unsafe.Pointer) { 
    // Disable signals during clone, so that the new thread starts
    // with signals disabled. It will enable them in minit.
    var oset sigset
    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
    ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
    sigprocmask(_SIG_SETMASK, &oset, nil)
    if ret < 0 {
        print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", -ret, ")\n")
        if ret == -_EAGAIN {
            println("runtime: may need to increase max user processes (ulimit -u)")
        }
        throw("newosproc")
    }
}

//windows
func newosproc(mp *m, stk unsafe.Pointer) {
    const _STACK_SIZE_PARAM_IS_A_RESERVATION = 0x00010000
    // stackSize must match SizeOfStackReserve in cmd/link/internal/ld/pe.go.
    const stackSize = 0x00200000*_64bit + 0x00100000*(1-_64bit)
    thandle := stdcall6(_CreateThread, 0, stackSize,
        funcPC(tstart_stdcall), uintptr(unsafe.Pointer(mp)),
        _STACK_SIZE_PARAM_IS_A_RESERVATION, 0)
    if thandle == 0 {
        if atomic.Load(&exiting) != 0 {
            // CreateThread may fail if called
            // concurrently with ExitProcess. If this
            // happens, just freeze this thread and let
            // the process exit. See issue #18253.
            lock(&deadlock)
            lock(&deadlock)
        }
        print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", getlasterror(), ")\n")
        throw("runtime.newosproc")
    }
    // Close thandle to avoid leaking the thread object if it exits.
    stdcall1(_CloseHandle, thandle)
}

创建m的时候,会给m的g0分配分配栈空间。g0是该m私有的,golang中系统命令都是在g0上执行的,函数systemstack(func())(汇编实现)会将方法转到g0栈上执行,然后转回当前的g。管理命令操作执行都在g0栈上执行,隔离了业务内容和指令的执行,避免做g共享内存。

下面将描述获取一个空闲的m

在startm中,m是优先去空闲队列中获取,未获取到空闲队列才选择创建

type schedt struct {
    // idle状态的m
    midle muintptr // idle m's waiting for work
    // idle状态的m个数
    nmidle int32 // number of idle m's waiting for work
    // m允许的最大个数
    maxmcount int32 // maximum number of m's allowed (or die)

}
func mget() *m {
   //从idle 的m链表中搞一个
    mp := sched.midle.ptr()
    if mp != nil {
        sched.midle = mp.schedlink
        sched.nmidle--
    }
    return mp
}

被唤醒的进入工作状态的m会陷入调度循环,竭尽全力获取g执行,当找不到可执行的任务,或者任务用时过长,系统调用阻塞等原因被剥夺p,m会再次进入休眠状态。

// 停止M,使其休眠,但不会被系统回收
// 调用notesleep使M进入休眠,唤醒后就会从休眠出直接开始执行
// 线程可以处于三种状态: 等待中(Waiting)、待执行(Runnable)或执行中(Executing)。
func stopm() {
    _g_ := getg()
retry:
    lock(&sched.lock)
    mput(_g_.m)
    unlock(&sched.lock)
    // 在lock_futex.go 中
    // 休眠,等待被唤醒
    notesleep(&_g_.m.park)
    noteclear(&_g_.m.park)
   ...
    // 绑定p
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}

// 把mp添加到midle列表
func mput(mp *m) {
    mp.schedlink = sched.midle
    sched.midle.set(mp)
    sched.nmidle++
    checkdead()
}

到这里可以看到,m也是不会主动删除释放的,支持复用。当大量的m被创建的时候,对性能是有影响的:

  • 系统线程调度上下文切换是有消耗的
  • m本身是有占资源的(g0分配了8k的堆栈)



版权声明:本文为diaosssss原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。