go和java – 从协程池和线程池的实现看权衡和妥协

  • Post author:
  • Post category:java


更多技术交流文章,请关注微信公众号【时代码农】

时代码农

我们在Java和其他编程语言中,都可以看到线程池的概念。而Go中,没有线程,只有协程。协程与Java中的线程有一个很大的区别:

Java中的线程是一个实实在在的对象,可以获取,例如:Thread t = new Thread();

这也给我们一个印象,线程池持有一组Thread对象。

而Go中的协程,并不能持有,例如,我们经常这样使用协程:go func() {} ()

并不会返回协程对象,因此,协程池也变得虚无缥缈——无法持有一组协程对象。

但是从本质上来说,线程和协程能够被复用才是出现“池”这个概念的本意。只要我们不让协程结束,就可以被复用。

而想持有协程/线程对象的目的,无非是期望能够控制协程/den程。只要我们将一个或多个对象传入协程/线程。在协程/线程的执行过程中,利用这些对象就可以控制协程/线程。就像我们像其中安插了一个间谍,通过间谍,来控制协程的执行。

在Java中,持有线程对象,有一个很大的好处,线程提供了各种方法,例如start()等来实现,在协程中,我们只能自己实现。

package routine

import (
   "framework/mqant/utils/uuid"
   "sync"
   "time"
)

type RoutineObject struct {
   Id string
   Signals chan string
   Status string
}

type Pool struct {
   routines []*RoutineObject
   jobs chan Runnable
   maxJobs int
   wg sync.WaitGroup
   coreRoutineCount int

   Status string
}

func (pool *Pool) AddJob(job Runnable) {
   if len(pool.jobs) > pool.maxJobs {
      return
   }

   pool.jobs <- job
}

func (pool *Pool) Start() {
   pool.maxJobs = 100
   pool.coreRoutineCount = 10
   pool.jobs = make(chan Runnable, pool.maxJobs)

   for i:=0; i< pool.coreRoutineCount; i++ {
      routineObj := RoutineObject{
         Id : uuid.Rand().Hex(),
         Status : "Start",
         Signals: make(chan string, 10),
      }

      routineObj.Signals <- "Start"
      pool.routines = append(pool.routines, &routineObj)

      pool.wg.Add(1)
   }

   for _, routine := range pool.routines {
      go runGoroutine(pool, routine, pool.jobs)
   }
}

func runGoroutine(pool *Pool, routineObj *RoutineObject, jobs <-chan Runnable ) {
   defer pool.wg.Done()
   for {
      select {
          case job := <-jobs :
            job.Run()
          case signal := <-routineObj.Signals :
             if signal == "Interrupt" {
                return
            }
          default :
             time.Sleep(10 * time.Millisecond)
      }
   }
}

func (pool *Pool) Stop() {
   for _, routineObject := range pool.routines {
       routineObject.Signals <- "Interrupt"
   }

   pool.wg.Wait()
   pool.Status = "Stopped"
}

RoutineObject就是我们用来控制协程的间谍,只要我们持有RoutineObject的集合,就能控制多个协程,从而实现协程池。

以下为测试代码:

package routine

import (
   "fmt"
)

type Runnable interface {
   Run()
}

type Job1 struct {
   Name string
}

func (this *Job1) Run() {
   fmt.Println("job1")
}

type Job2 struct {
   Name string
}

func (this *Job2) Run() {
   fmt.Println("job2")
}

package main

import (
   "time"
)
import . "routinepool/routine"

func main() {
   job1 := Job1{Name : "job1"}
   job2 := Job2{Name : "job2"}

   pool := Pool{}
   pool.Start()
   pool.AddJob(&job1)
   pool.AddJob(&job2)

   time.Sleep(time.Second * 5)

   pool.AddJob(&job1)
   pool.AddJob(&job2)
   time.Sleep(time.Second * 30)
}

我们实现协程池是为了重用协程,也是改变每个异步处理,都新开一个协程这种粗放型的使用方式。这样,必然会导致任务的积压。但是却可以减少系统开支。因此,协程池的大小,就成为一个关键点,到底多大的协程数量,才能保证请求延迟的时间在可容忍的范围内,而又能最大限度节省系统资源。

这是一种权衡和妥协,也是我们在功能设计和系统实现时,永远绕不开的话题。很多问题,经过向上抽象,其实都是在向统一的方向靠拢,例如CAP理论也是在不断权衡和妥协中,进行选取。

这世界,没有十全十美



版权声明:本文为gogy原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。