golang-goroutine(协程)和channel(管道)

  • Post author:
  • Post category:golang


在这里插入图片描述



传统进程和线程概念介绍:



在这里插入图片描述

在这里插入图片描述

在这里插入图片描述



并发和并行概念介绍:



在这里插入图片描述

在这里插入图片描述




golang协程(goroutine)



golang协程的特点

  • 有独立的栈空间(栈可理解为数值类型,由堆中copy或引用到个体栈空间)
  • 共享程序堆空间(堆可理解为引用数据类型)
  • 调度由用户控制(线程启动和停止都可由用户控制,java则不行 )
  • 协程是轻量级的线程(理论上轻松可启上万条线程)

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述
package main

import (
	"fmt"
	"strconv"
	"time"

)
func goroutine(){
	for i:=1;i<=10;i++{
		fmt.Println("hello world" + strconv.Itoa(i))
		time.Sleep(time.Second)
	}
}

func main(){
	// 开启一个协程
	go goroutine()

	for i:=1;i<=10;i++{
		fmt.Println("hello goroutine" + strconv.Itoa(i))
		time.Sleep(time.Second)
	}
}

PowerShell输出:

PS E:\...\src\gojson\jsondemo-10-24\goroutine> go run .\main.go     
hello world1
hello goroutine1
hello goroutine2
hello world2
hello world3
hello goroutine3
hello goroutine4
hello world4
hello world5
hello goroutine5
hello goroutine6
hello world6
hello world7
hello goroutine7
hello goroutine8
hello world8
hello world9
hello goroutine9
hello goroutine10
hello world10


go协程执行过程:


在这里插入图片描述


go协程总结:


在这里插入图片描述

  • go协程一般是以函数为单位的
  • 协程依托于主线程执行,主线程执行完毕或停止退出则协程也直接退出,即使没有执行完毕,若协程先于主线程执行完毕则主线程继续执行完毕
  • go协程中,主线程可以理解为类似进程,协程类似于线程



goroutine调度模型MPG

  • M:主线程,直接运行在操作系统上,为物理线程
  • P:程序运行需要的资源或者说依赖
  • G:协程是一个逻辑程序依赖于M主线程 运行,有自己的栈独立运行
  • M可以有多个,多个M运行在一个cpu上叫做并发,多个M运行在多个cpu叫做并行
  • 如果一个协程发生了阻塞程序会另外开启一个线程把排队的协程引用到这个新开启的M中去,等到M0中的G不再堵塞则再进行M0的调度执行,保证程序得以继续并发执行

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述



golang中设置允许cpu数目



在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

package main

import (
	"fmt"
	"runtime"
)

func main(){
	// runtime.NumCPU() 查看本机有几颗逻辑cpu
	cpunum := runtime.NumCPU()
	fmt.Println(cpunum)

	// runtime.GOMAXPROCS() 可以设置可同时执行最大cpu数目
	runtime.GOMAXPROCS(cpunum-1)
	
	// runtime.Version() 返回go版本字符串,1.8以后默认多核允许不需要设置
	govarson := runtime.Version()
	fmt.Println(govarson)
}
PS E:\...\src\gojson\jsondemo-10-24\cupdemo> go run .\main.go                                     
2
go1.9.2

在这里插入图片描述



channel管道

在这里插入图片描述


多个协程资源竞争问题:

  • 下边这个程序我们通过一个 test() 函数,计算传入参数的阶乘并写入map中,主函数里开启200个协程来进行相应阶乘运算此时由于多个协程同时往一个map中写入数据就会出错,资源占用冲突
package main

import (
	"fmt"
	"time"
)

var mymap = make(map[int]int,10)

func test(n int){

	res := 1
	for i:=1;i<=n;i++{
		res *= i
	}
	mymap[n] = res
}

func main(){
	// 通过循环开启200个协程
	for i:=0;i<200;i++{
		go test(i)
	}

	// 休眠十秒钟
	time.Sleep(time.Second * 10)
	for k,v := range mymap{
		fmt.Println(k,v)
	}
}

在这里插入图片描述


在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

  • Mutex是一个结构体,有各种绑定这个结构体的方法,如 lock() ,将传入的参数锁住,Unlock() 方法可以解锁,锁和线程无关可以由任何线程加锁和解锁

在这里插入图片描述

package main

import (
	"fmt"
	"time"
	// "sync"
)

var (
	// int类型默认为int64位大小为 2的63次方减一等于9223372036854776000
	// 也就是说只能存放这么大的数字超出范围置为0
	mymap = make(map[int]int)
	// 实例化sync.Nutex这个struct,以便使用绑定这个struct的方法
	// 这里也是一个小技巧,实例化结构体
	lock sync.Mutex
)

func test(n int){

	res := 1
	for i:=1;i<=n;i++{
		res *= i
	}
	// 加锁
	lock.Lock()
	mymap[n] = res
	// 解锁
	lock.Unlock()
}

func main(){
	for i:=0;i<20;i++{
		go test(i)
	}
	
	// 休眠5秒钟
	time.Sleep(time.Second * 5)

	lock.Lock()
	for k,v := range mymap{
		fmt.Println(k,v)
	}
	lock.Unlock()
}
PS E:\...\src\gojson\goroutine-10-25\main> go run .\main.go                                        
5 120
6 720
0 1
3 6
4 24
7 5040
1 1
8 40320
2 2
9 362880

在这里插入图片描述


在这里插入图片描述

在这里插入图片描述



以下四条非常重要 !

  • 栈是先入后出,channel数据是先入先出
  • channel本质就是一个数据队列

在这里插入图片描述


管道的基本使用:


在这里插入图片描述


案例演示:

package main

import "fmt"

func main(){
	// 声明一个int type的channel
	var intchannel chan int
	// 注意这里channel容量必须设定不会自动扩容
	intchannel = make(chan int,5)

	// 给管道写数据时不能超出其容量
	intchannel <- 10
	num := 20
	intchannel <- num
	fmt.Printf("intchannel长度为: %v 容量为:%v\n",len(intchannel),cap(intchannel))

	// 从channel中取出一条数据
	var num2 int
	num2 = <- intchannel
	fmt.Printf("num2=%v\n",num2)
	fmt.Printf("intchannel长度为: %v 容量为:%v\n",len(intchannel),cap(intchannel))

	num3 := <- intchannel
	fmt.Printf("num3=%v\n",num3)
	fmt.Printf("intchannel长度为: %v 容量为:%v\n",len(intchannel),cap(intchannel))

	// 此时intchannel中数据已被取完再取将会报错deadlock死锁
	num4 := <- intchannel
	fmt.Println(num4)
}


shell输出:

PS E:\...\src\gojson\goroutine-10-25\main\channel> go run .\main.go                                intchannel长度为: 2 容量为:5
num2=10
intchannel长度为: 1 容量为:5
num3=20
intchannel长度为: 0 容量为:5
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
        E:/software/go-path/goproject/src/gojson/goroutine-10-25/main/channel/main.go:28 +0x529
exit status 2


案例二:


创建map类型的channel并进行数据的存取

package main

import "fmt"

func main(){
	var mapchannel chan map[string]string
	mapchannel = make(chan map[string]string,10)
	m1 := make(map[string]string,5)
	m1["name"] = "jack"
	m1["age"] = "tom"

	m2 := make(map[string]string,10)
	m2["name"] = "scpint"
	m2["age"] = "pg"
	
	// 把m1,m2存入mapchannel
	mapchannel <- m1
	mapchannel <- m2
	
	// 输出mapchannel的长度和容量
	fmt.Println(len(mapchannel),cap(mapchannel))
	
	// 取出数据存入p1,p2
	p1:=  <- mapchannel
	p2:=  <- mapchannel
	fmt.Println(p1,p2)
}
2 10
map[name:jack age:tom] map[name:scpint age:pg]


案例三:


struct类型channel存取

package main

import "fmt"

type Person struct{
	name string
	age int
}

func main(){
	var structchannel chan Person
	// make一下这个channel预置容量为10
	structchannel = make(chan Person,10)
	m1 := Person{"tom",20}
	m2 := Person{"jack",30}

	structchannel <- m1
	structchannel <- m2
	fmt.Printf("channel长度为:%v 容量为: %v\n",len(structchannel),cap(structchannel))
	fmt.Println(m1,m2)
}
channel长度为:2 容量为: 10{tom 20} {jack 30}


案例四:

  • interface{} (空接口) 类型channel存取,空接口默认任何类型都实现了接口,因此可以接收任何类型
  • 如果channel声明的是一个interface{}空接口,存入的是一个struct类型,把这个类型数据取出后想要之间访问这个struct中元素是不行的,因为在编译阶段编译器认为这个数据类型是一个interface{}空接口,如果想要访问其中的元素可以通过类型断言将其转为对应的struct类型,具体代码如下:
// 演示interface{} 类型channel的存取
package main

import "fmt"

type Person struct{
	name string
	age int
}

func main(){
	var interfacechannel chan interface{}
	interfacechannel = make(chan interface{},2)
	m1 := Person{"tom",20}

	interfacechannel <- m1

	newchannel := <- interfacechannel

	fmt.Printf("channel的长度为: %v 容量为: %v\n",len(interfacechannel),cap(interfacechannel))

	/* 报错:interfacechannel.go:22:28:newchannel.name未定义
	(类型interface {}是没有方法的接口)*/

	// newchannel运行时是main.Person{} 类型,编译时候是interface{}空接口

	/* 此时可以使用类型断言判断newchannel是否是来源于Person类型,如果是
	 就转为Person类型并赋值给typestruct 此时就可以取出结构体字段了*/
	typestruct := newchannel.(Person)
	fmt.Println(typestruct.name)
}
channel的长度为: 0 容量为: 2
tom

其他数据类型用法相似


对上边案例总结:


在这里插入图片描述

  • channel在make初始化时容量必须设定,不会自动扩容,不设定会报死锁
  • 给管道写数据时不能超出其容量
  • channel数据取完再取会报死锁


细节描述:

  • 通过

    <- stringchannel

    不指定接收者即为丢弃一个元素
	// 此时容量已满再写会报死锁
	// 通过下边这种方式丢弃channel一个元素(不指定接收方即为丢弃)
	<- stringchannel


channel的遍历和关闭:


在这里插入图片描述

在这里插入图片描述


channel关闭案例演示:

package main

import "fmt"

func main(){
	var intchannel chan int
	intchannel = make(chan int,3)
	intchannel <- 10
	intchannel <- 20
	// 关闭这个channel
	close(intchannel)
	// 再写入数据将会报错:panic: send on closed channel
	// intchannel <- 30
	// 此时无法写入数据但可以读取数据
	tom := <- intchannel
	fmt.Println(tom)
}
10


channel遍历演示:

  • channle遍历就是循环取出所有channel数据
  • 主要使用for-range进行遍历,因为for-in进行遍历主要是通过数组长度通过下标遍历,由于channel长度会随着元素取出动态变化所以不适用for-in
  • channel遍历完成后如果不关闭管道继续读取则会报错,所以在读取完成之前需关闭管道
  • 管道取数据遵循先入先出按序列取出原则,没有下标概念,所以for-range遍历不需要添加下标
package main

import "fmt"

func main(){
	var intchannel chan int
	intchannel = make(chan int,100)

	// 循环向channel写入100条数据
	for i:=0;i<100;i++{
		intchannel <- i*2
	}

	// 遍历之前关闭channel则可以正常遍历
	close(intchannel)
	// 管道取数据遵循先入先出原则,没有下标概念,所以for-range遍历不需要添加下标
	for v := range intchannel{
		fmt.Printf(v)
	}
}


gioroutine和channel协同使用:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

  • 读取完成会将通道置为false,用户调度关闭管道也会将其置为false
  • channel解决主线程等待时间原理就是:添加一个channel再主线程最后for{}循环读取知道协程执行完毕后将这个主线程读取的channel管道关闭,然后退出主线程的循环,感觉挺直接的
  • 注意channel是引用数据类型所以操作的是同一个channel
  • 通过协程实现了一边写一边读(注:读写数值过短可能看不到交叉写读效果)
  • 管道读取完毕后如果不进行关闭继续读取则会报错
// 开启一个协程写入50个int类型数值
// 开启一个协程读取
package main

import "fmt"

// 定义一个函数传入管道进行写操作
func writeDate(write chan int){
	for i:=0;i<50;i++{
		write <- i
		fmt.Println("写入: ",i)
	}
	// 写入完成关闭channel读取则不会报错
	close(write)
	fmt.Println("写入完成!------------- ")
}

// 定义一个函数传入管道进行读操作
func readDate(read chan int,bo chan bool){
	// 最后发送的值被接收后停止该通道
	for{
		x, ok := <- read
		if !ok {
			break
		}
		fmt.Println("读取到: ",x)
	}
	fmt.Println("读取完成!------------- ")
	/* 读取完毕后关闭boolchannel管道,则读取boolchannel的ok会被置为
	false 则退出主线程 */
	close(bo)
}

func main(){
	// 定义两个channel 
	// 第一个用于存取数据便于goroutine读写
	// 第二个用于堵塞主线程当协程写读完毕后将此channel关闭则退出
	wrchannel := make(chan int,50)
	boolchannel := make(chan bool,1)

	go writeDate(wrchannel)
	go readDate(wrchannel,boolchannel)

	// 协程写读完毕关闭boolchannel协程,则ok置为false退出循环
	for{
		_,ok := <- boolchannel
		if !ok{
			break
		}
	}
}

在这里插入图片描述


课后练习:

  • 开一个协程存入1-200并存入一个channel
  • 遍历计算这个channel中每一个元素的阶乘,并存入第二个channel
  • 遍历输出第二个channel
package main

import "fmt"

func arrlist(intchannel chan int){
	for i:=1;i<=200;i++{
		intchannel <- i
		fmt.Println("正在写入intchannel...")
	}
	close(intchannel)
}

func readlist(intchannel chan int,intchannel2 chan int,boolchannel chan bool){
	var b int
	
	for v := range intchannel{
		for i:=1;i<=v;i++{
			b += i
		}
		fmt.Println("正在写入intchannel2...")
		intchannel2 <- b
	}

	close(intchannel2)
	for v := range intchannel2{
		fmt.Println(v)
	}

	boolchannel <- true
	close(boolchannel)
}

func main(){
	var intchannel chan int
	var intchannel2 chan int
	var boolchannel chan bool

	intchannel = make(chan int,200)
	intchannel2 = make(chan int,200)
	boolchannel = make(chan bool,1)

	go arrlist(intchannel)
	go readlist(intchannel,intchannel2,boolchannel)

	for{
		_,ok := <- boolchannel
		if !ok{
			break
		}
	}	
}


管道阻塞机制

  • 如果编译器发现一个管道只有写没有读的包则该管道会阻塞
  • 写管道和读管道频率不一致无所谓
  • 编译的时候底层会有检测机制查看管道是否有读写操作

    在这里插入图片描述


示例:

package main

import (
	"fmt"
	"time"
)

func Writerchannel(intchannel chan int){
	// intchannel能够存放10个元素,这里循环写入50个元素
	// 添加一个读管道的操作,可以写入过量元素 
	for i:=0;i<50;i++{
		intchannel <- i
		fmt.Println("写入数据: ",i)
	}
	close(intchannel)
}

func Readchannel(intchannel chan int,boolchannel chan bool){
	for{
		v,ok := <- intchannel
		if !ok{
			break
		}
		time.Sleep(time.Second)
		fmt.Println("读取到数据: ",v)
	}
	close(boolchannel)
}

func main(){
	var intchannel chan int
	var boolchannel chan bool

	boolchannel = make(chan bool,1)
	intchannel = make(chan int,10)

	go Writerchannel(intchannel)
	go Readchannel(intchannel,boolchannel)

	for{
		_,ok := <- boolchannel
		if !ok{
			break
		}
	}
}


阻塞机制解析:

  • 如果一个channel定义了10个元素正常情况下只能写入10个元素
  • 如果添加了一个对该channel读的操作则可以写入过量元素(超过10个)
  • go运行机制为只要检测到读操作就能继续写入,只是需要等待读操作取出这个channel中的元素有空位了则可以继续写入,一直读就能一直写


协程求素数:


在这里插入图片描述

在这里插入图片描述

package main

import (
	"fmt"
	"time"
)

func writer(writechannel chan int){
	for i:=2;i<=10000;i++{
		writechannel <- i
	}
	close(writechannel)
}

func receivefunc(writechannel chan int,receive chan int,boolchannel chan bool){
	for v := range writechannel{
		bok := true
		for i:=2;i<v;i++{
			if v % i == 0{
				bok = false
				break
			}
		}
		if bok{
			receive <- v
		}
	}

	fmt.Println("其中一个协程取不到数据退出")
	boolchannel <- true
}

func main(){
	var writechannel chan int
	var receive chan int
	var boolchannel chan bool

	writechannel = make(chan int,10000)
	receive = make(chan int,10000)
	boolchannel = make(chan bool,3)

	// 返回当前程序开始运行时间戳
	start := time.Now().UnixNano()
	fmt.Println(start)

	go writer(writechannel)
	// 开启二个协程从writechannel中取数据实现并行(本机双核)
	for i:=0;i<2;i++{
		go receivefunc(writechannel,receive,boolchannel)
	}

	// 匿名函数协程
	go func(){
		for i:=0;i<2;i++{
			<-boolchannel
		}
		// 从boolchannel取出四个元素就可以关闭boolchannel和receive
		fmt.Println("receive已关闭")
		close(boolchannel)
		close(receive)
	}()
	
	// 结束时间戳
	end := time.Now().UnixNano()
	fmt.Println(end)
	fmt.Println("双核二个协程并行运行程序时间为: ",end-start) 

	for{
		k,ok := <- receive
			if !ok{
				break
			}
			fmt.Println("素数=",k)
	}
	fmt.Println("主线程退出")
}


程序运行性能统计(纳秒):


可以看到使用协程和不适用协程性能差距还是很大的

双核cpu单协程写入双协程读取:
1000200/999600/999700/999800/1002500

双核cpu单协程写入读取:
999800/1000700

双核cpu普通方法写入读取(不通过协程运行):
64965300/60979100


channel使用细节和注意事项:

在这里插入图片描述


channel声明为只读和只写:

// channel默认为可读可写也可以声明为只读或只写
package main

import "fmt"

func main(){
	// 声明为只写: chan<- 此时则只能写入
	var readchannel chan<- int
	readchannel = make(chan int,2)
	// 只能写入
	readchannel <- 100

	// 声明为只读: <-chan 此时则只能读取
	var wirtechannel <-chan int
	wirtechannel = make(chan int,2)
	// 只能读取
	num := <- wirtechannel 
}

在这里插入图片描述


注意点三:

  • return 表示退出当前函数系统
  • break跳出当前循环
  • 传统遍历管道的方法如果不关闭管道则会阻塞导致deadlock死锁
  • 实际开发中可能不能确定什么时候关闭channel
  • 可以使用select方法解决
// 通过select可以解决不关闭channel进行循环程序报错问题
package main

import (
	"fmt"
	"time"
)
func main(){
	intchan := make(chan int,5)
	for i:=0;i<5;i++{
		intchan <- i
	}

	stringchan := make(chan string,5)
	for i:=0;i<5;i++{
		stringchan <- "hello" + fmt.Sprintf("%d",i)
	}

	for{
		select{
		// 如果intchan一直没有关闭不会死锁会自动匹配到下一个进行遍历
		case v := <- intchan:
			fmt.Println("读取到的数据: ",v)
			time.Sleep(time.Second)
		case v := <- stringchan:
			fmt.Println("读取到的数据: ",v)
			time.Sleep(time.Second)
		default :
			fmt.Println("娶不到数据即将退出...")
			return
		}
	}
}
读取到的数据:  0
读取到的数据:  1
读取到的数据:  2
读取到的数据:  3
读取到的数据:  4
读取到的数据:  hello0
读取到的数据:  hello1
读取到的数据:  hello2
读取到的数据:  hello3
读取到的数据:  hello4
娶不到数据即将退出...


注意点四:

  • 通过添加recover()异常捕获解决协程出现panic导致程序崩溃
  • 在goroutine中使用defer+recover()+匿名函数进行异常捕获即使这个goroutine发生了panic也不会影响主线程运行
// 使用recover可以进行异常捕获
package main

import (
	"fmt"
	"time"
)
func n1(){
	for i:=0;i<5;i++{
		fmt.Printf("n1=%v\n",i*2)
		time.Sleep(time.Second)
	}
}

func n2(){
// defer+recover()+匿名函数进行异常捕获
	defer func(){
		if err := recover();err != nil{
			fmt.Println("发生错误: ",err)
		} 
	}()
	
	var a map[int]string
	a[1] = "sayhello"
}

func main(){
	go n1()
	go n2()

	for i:=0;i<5;i++{
		fmt.Println("main()输出:",i * 3 )
		time.Sleep(time.Second)
	}
}
main()输出: 0
n1=0
发生错误:  assignment to entry in nil map
n1=2
main()输出: 3
main()输出: 6
n1=4
n1=6
main()输出: 9
n1=8
main()输出: 12


对栈和channel数据出入顺序个人理解:

  • 栈可以理解为数值类型,数据都是拷贝入栈,无tag无指针地址标识,所以先入栈后入栈的数据理论上优先级是相同的能用即可,当程序需要资源调度就到栈中取数据,就近原则依次取出
  • channel数据本质上是一个队列也就是需要排队,则有标识tag区分从而维护队列,需要资源调度就来找自己对应需要的数据,先入栈对应的程序理论上是先执行,对应的channel数据也就先被取出和栈数据最大区别有tag,数据对应不同的功能,或者说程序


多个goroutine之间进行通信和资源的共享访问的两种方式

  • 方式一:使用sync.Mutex 结构体绑定的方法,可以通过创建全局互斥锁的方式实现多个goroutine协程之间的通信和内存数据共享,通过对全局读写资源上锁,主线程添加一定时间等待(等待所有协程执行完毕)一个协程使用资源的时候其他协程进入队列等待,等前一个协程使用完毕以此类推
  • 上边这种方式有几个弊端:首先是主线程等待时间不好估算,锁会消耗大量系统资源
  • 方式二:这里就引出了channel(管道)顾名思义就是一个管子,把同类型的资源例如sting类型的放入一个管道中,channel是在多个goroutine协程之间传递数据和同步的重要手段,channel是一个引用类型,是带tag所以遵循先入先出原则(个人猜测)
  • go通过channel实现了CSP通信模型,主要用户goroutine之间的消息传递和事件通知
  • channel底层也是通过sync.Mutex 结构体中的方法特性实现的,进行了更为完善的方法封装供我们使用(毕竟是大牛写的)
  • sync.Mutex相较于channel则更为原生,使用其绑定的方法通过添加全局互斥锁也可以实现goroutine协程状态同步,数据共享但有弊端,推荐使用channel
  • CSP全称:Communicating Sequential Processes(沟通顺序流程),是一个通信模型概念,csp被认为是Go在并发编程成功的关键因素,大多数编程语言并发编程模型是基于线程和内存同步访问控制,go的并发编程模型用 goroutine(协程)和channel(管道) 代替,goroutine和线程类型,channel和mutex(用于内存同步访问控制)类似
  • 像java是通过内存共享实现线程数据共享和线程状态同步的,是基于内核态的,golang是通过channel或mutex实现数据共享和线程状态同步是一种代码逻辑,不会耗费过量的cpu资源,所以依托channel理论上可以轻松起上万条goroutine
  • go的并发哲学:不要通过共享内存来通信,而要通过通信来实现内存共享
  • 互斥锁是一种简单的加锁的方法来控制对共享资源的访问,互斥锁只有两种状态,即上锁( lock )和解锁( unlock )。即上锁后一个goroutine调度使用被锁资源其他协程不能使用,只有该goroutine使用完该资源解锁后,其他协程才能继续上锁使用



版权声明:本文为weixin_43867700原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。