五种模式解决go中的并发问题
For-Select-Done
我们应该防止程序中发生任何泄露。所以我们应该对于留在程序中的go例程发送信号,让它知道它可以退出。
最常见的就是将for-select循环与通道结合起来,向go程序发送一个关闭信号。我们称它为“完成”通道。
func printIntegers(done <-chan struct{}, intStream <-chan int) {
for{
select {
case i := <-intStream:
fmt.Println(i)
case <-done:
return
}
}
}
没有看懂,先记在这里……
扇入模式
func fanIn(ctx context.Context, fetchers ...<-chan interface{}) <-chan interface{} {
combinedFetcher := make(chan interface{})
// 1
var wg sync.WaitGroup
wg.Add(len(fetchers))
// 2
for _, f := range fetchers {
f := f
go func() {
// 3
defer wg.Done()
for{
select{
case res := <-f:
combinedFetcher <- res
case <-ctx.Done():
return
}
}
}()
}
// 4
// Channel cleanup
go func() {
wg.Wait()
close(combinedFetcher)
} ()
return combinedFetcher
}
从流中获取前 n 个值
func takeFirstN(ctx context.Context, dataSource <-chan interface{}, n int) <-chan interface{} {
// 1
takeChannel := make(chan interface{})
// 2
go func() {
defer close(takeChannel)
// 3
for i := 0; i< n; i++ {
select {
case val, ok := <-dataSource:
if !ok{
return
}
takeChannel <- val
case <-ctx.Done():
return
}
}
}()
return takeChannel
}
订阅模式
type Subscription interface {
Updates() <-chan Item
}
//On the other hand, we are going to use another interface as an abstraction to fetch the data we
//need:
//另一方面,我们将使用另一个接口作为抽象来获取我们需要的数据。
type Fetcher interface {
Fetch() (Item, error)
}
//For each of these we are going to have a concrete type implementing them.
//对于其中的每一个,我们都将有一个具体的类型来实现它们。
//For the subscription:
//对于订阅来说。
func NewSubscription(ctx context.Context, fetcher Fetcher, freq int) Subscription {
s := &sub{
fetcher: fetcher,
updates: make(chan Item),
}
// Running the Task Supposed to fetch our data
go s.serve(ctx, freq)
return s
}
type sub struct {
fetcher Fetcher
updates chan Item
}
func (s *sub) Updates() <-chan Item {
return s.updates
}
//We are going to go into more details about what happens inside the serve method.
我们将更详细地介绍服务方法内部发生的事情。
//For the fetcher:
//对于取物者来说。
func NewFetcher(uri string) Fetcher {
f := &fetcher{
uri: uri,
}
return f
}
type fetcher struct {
uri string
}
//Inside the serve method
//The serve method consists of a for-select-done type of loop:
//服务方法由一个 for-select-done 类型的循环组成。
func (s *sub) serve(ctx context.Context, checkFrequency int) {
clock := time.NewTicker(time.Duration(checkFrequency) * time.Second)
type fetchResult struct {
fetched Item
err error
}
fetchDone := make(chan fetchResult, 1)
for {
select {
// Clock that triggers the fetch
case <-clock.C:
go func() {
fetched, err := s.fetcher.Fetch()
fetchDone <- fetchResult{fetched, err}
}()
// Case where the fetch result is
// Ready to be consumed
case result := <-fetchDone:
fetched := result.fetched
if result.err != nil {
log.Println("Fetch error: %v \n Waiting the next iteration", result.err.Error())
break
}
s.updates <-fetched
// Case where we need to close the server
case <-ctx.Done():
return
}
}
}
有一个修正,暂时先不提。
地图模式
func Map(done <-chan struct{}, inputStream <-chan int, operator func(int)int) <-chan int {
// 1
mappedStream := make(chan int)
go func() {
defer close(mappedStream)
// 2
for {
select {
case <-done:
return
// 3
case i, ok := <-inputStream:
if !ok { return }
//4
select {
case <-done:
return
case mappedStream <- operator(i):
}
}
}
}()
return mappedStream
func main() {
done := make(chan struct{})
defer close(done)
// Generates a channel sending integers
// From 0 to 9
range10 := rangeChannel(done, 10)
multiplierBy10 := func(x int) int {
return x * 10
}
for num := range Map(done, range10, multiplierBy10) {
fmt.Println(num)
}
}
过滤模式
func Filter(done <-chan struct{}, inputStream <-chan int, operator func(int)bool) <-chan int {
filteredStream := make(chan int)
go func() {
defer close(filteredStream)
for {
select {
case <-done:
return
case i, ok := <-inputStream:
if !ok {
return
}
if !operator(i) { break }
select {
case <-done:
return
case filteredStream <- i:
}
}
}
}()
return filteredStream
}
func main() {
done := make(chan struct{})
defer close(done)
// Generates a channel sending integers
// From 0 to 9
range10 := rangeChannel(done, 10)
isEven := func(x int) bool {
return x % 2 == 0
}
for num := range Filter(done, range10, isEven) {
fmt.Println(num)
}
}
这五个模式是构建更大、更复杂的 Golang 应用程序的基石。这些解决方案可以解决你在处
理 GO 中的并发问题时可能遇到的问题。此外,你还可以在此基础上修改、扩展和创建新的
模式。
版权声明:本文为weixin_43786143原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。