Go语言实现分布式缓存(二) —— 单机并发缓存

  • Post author:
  • Post category:其他


我跟着极客兔兔的教程实现了分布式缓存,该系列文章是对实现过程的总结。

详细实现教程:

7天用Go从零实现分布式缓存GeeCache

这篇文章会使用互斥锁:

Mutex

来保证缓存并发读写时的安全性,然后在此基础上实现单机的缓存。



保证并发读写安全性


my-cache2\byteValue.go

type BytesValue struct {
	Bytes []byte
}

func (v BytesValue) Len() int {
	return len(v.Bytes)
}

func (v BytesValue) String() string {
	return string(v.Bytes)
}

func (v BytesValue) ByteSlice() []byte {
	return cloneBytes(v.Bytes)
}

func cloneBytes(bytes []byte) []byte {
	b := make([]byte, len(bytes))
	copy(b, bytes)
	return b
}
  • BytesValue是后面对缓存进行读写的主要数据类型。BytesValue中只有一个[ ]byte类型的字段,使缓存支持多种数据类型的读写(图片、视频等)。并且,BytesValue实现了Len()方法,相当于实现了Value接口。


my-cache2/mycache/mainCache.go

type mainCache struct {
	cache    *evict.Cache
	maxBytes uint64
	mu       sync.Mutex
}

func (c *mainCache) get(key string) (my_cache2.BytesValue, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.cache == nil {
		c.cache = evict.NewCache(c.maxBytes)
		c.cache.DeleteExpired()
	}
	value, ok := c.cache.Get(key)
	if !ok {
		return my_cache2.BytesValue{}, false
	}
	return value.(my_cache2.BytesValue), true
}

func (c *mainCache) add(key string, value my_cache2.BytesValue, expire int64) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.cache == nil {
		c.cache = evict.NewCache(c.maxBytes)
		c.cache.DeleteExpired()
	}
	return c.cache.Add(key, value, expire)
}

  • 这里用到了延迟加载,当第一次调用get或者add方法时,cache才会被初始化,并且开启定期定期淘汰过期缓存的协程。



Group类型的编写


my-cache2\mycache\group.go



回调函数

type RntValue struct {
	Bytes  []byte
	Expire int64
}
type getter interface {
	get(key string) (RntValue, error)
}
type GetterFunc func(key string) (RntValue, error)

func (g GetterFunc) get(key string) (RntValue, error) {
	return g(key)
}

  • 用户使用缓存进行查询时,若缓存未命中,一般会从数据库中获取数据,然后再保存到缓存。
  • 这里getter接口中的get是一个回调函数,并且这里的get函数会由使用者自己传入。
  • 在后面的代码中会实现:当缓存未命中时就会掉用这个回调函数,并且将回调函数的返回值保存到缓存中。所以这里我们将回调函数的返回值封装到了RntValue,里面有[ ]byte和int64两个类型的字段,Bytes是要缓存的真实值,Expire是缓存数据的过期时间。
  • 这里GetterFunc类型的使用是十分巧妙的,我们可以借助这个类型将

    func(key string) (RntValue, error)

    类型的方法转换为接口类型。

    例如:
func test(){
	var g getter = GetterFunc(func(key string) (RntValue, error) {
		//code......
	})
}

下面就是核心数据类型:Group



Group

type Group struct {
	name   string
	cache  *mainCache
	getter getter
}

var (
	rw     sync.RWMutex
	groups map[string]*Group = make(map[string]*Group)
)

func NewGroup(name string, maxBytes uint64, getter getter) *Group {
	rw.Lock()
	defer rw.Unlock()
	if getter == nil {
		panic("the getter is not allowed to be nil")
	}
	group := &Group{
		name:   name,
		getter: getter,
		cache:  &mainCache{maxBytes: maxBytes},
	}
	groups[name] = group
	return group
}

func GetGroup(name string) (*Group, bool) {
	rw.RLock()
	defer rw.RUnlock()
	group := groups[name]
	if group != nil {
		return group, true
	} else {
		return nil, false
	}
}

  • 一个Group相当于缓存的命名空间,每个group都有自己的name,两个不同name的Group,相当于两个独立的缓存。
  • 这里使用了读写锁来保证Group读写的安全性,这里不再赘述。



缓存的读操作

func (g *Group) Get(key string) (my_cache2.BytesValue, error) {
	bytesValue, ok := g.cache.get(key)
	if ok {
		log.Printf("[MyCache] %s is hit in cache\n", key)
		return bytesValue, nil
	}
	return g.loadLocally(key)
}

func (g *Group) loadLocally(key string) (my_cache2.BytesValue, error) {
	rntValue, err := g.getter.get(key)
	if err != nil {
		return my_cache2.BytesValue{}, nil
	}
	log.Printf("[Slow DB] %s is searched in DB", key)
	value := my_cache2.BytesValue{Bytes: rntValue.Bytes}
	err = g.syncToCache(key, value, rntValue.Expire)
	return value, err
}

func (g *Group) syncToCache(key string, value my_cache2.BytesValue, expire int64) error {
	return g.cache.add(key, value, expire)
}
  • Get 方法就是通过key,从缓存中获取数据,若未获取到数据,就会调用loadLocally。
  • loadLocally 方法是在缓存未命中的情况下调用的,该方法会调用回调函数从数据库获取数据。
  • syncToCache 是将数据同步到缓存。

至此,单机并发缓存模块已经完成了,下面是测试。



测试

//模拟数据库
var db = map[string]string{
	"jok": "545",
	"klo": "323",
	"los": "232",
}

func TestGroup(t *testing.T) {
	mycache.NewGroup("group1", 2<<10, mycache.GetterFunc(func(key string) (mycache.RntValue, error) {
		s := db[key]
		if s == "" {
			return mycache.RntValue{}, errors.New("key is not find in db")
		}
		return mycache.RntValue{
			Bytes:  []byte(s),
			Expire: 2,
		}, nil
	}))

	group, _ := mycache.GetGroup("group1")
	fmt.Println(group.Get("jok"))
	fmt.Println(group.Get("klo"))
	fmt.Println(group.Get("jok"))
	fmt.Println(group.Get("klo"))
	time.Sleep(2 * time.Second)
}

测试结果:

=== RUN   TestGroup
2022/11/14 00:42:48 [Slow DB] jok is searched in DB
545 <nil>
2022/11/14 00:42:48 [Slow DB] klo is searched in DB
323 <nil>
2022/11/14 00:42:48 [MyCache] jok is hit in cache
545 <nil>
2022/11/14 00:42:48 [MyCache] klo is hit in cache
323 <nil>
--- PASS: TestGroup (0.14s)
PASS



版权声明:本文为m0_62969222原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。