sync.Map详解

  • Post author:
  • Post category:其他




前言

sync.Map是在Go 1.9版本中增加的支持并发安全的Map。在此之前,需要通过自行管理锁才能实现并发安全的map。

更多内容分享,欢迎关注公众号:

Go开发笔记



Map

Map的结构如下:

type Map struct {
	mu Mutex // 互斥锁,用以处理并发读写的问题

	read atomic.Value // 包含可以并发访问的map部分,可以直接Load,但Store必须先持有mu

	dirty map[interface{}]*entry //包含需要持有mu才能访问的部分,为确保dirty快速升级为read,dirty中还包含read中未删除的部分

	misses int //自上次更新后需要mu才能决定key是否存在的数目。misses大到超过dirty拷贝的消耗时,会直接将dirty提升至read,后续的store操作会生成新的dirty。 
}

Map的实现原理与自行实现基本一致,均通过锁及map实现,不同的是sync.Map的结构及实现的过程要相对复杂些。

read的类型为readOnly,其结构如下:

type readOnly struct {
	m       map[interface{}]*entry
	amended bool // true if the dirty map contains some key not in m.

}

type entry struct {
	p unsafe.Pointer // *interface{}
}

readOnly包含一个map及一个是否有新增key的标记,最终的数据存储在entry中,entry存储的是数据的指针。



Store

// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		return
	}

	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
			// The entry was previously expunged, which implies that there is a
			// non-nil dirty map and this entry is not in it.
			m.dirty[key] = e
		}
		e.storeLocked(&value)
	} else if e, ok := m.dirty[key]; ok {
		e.storeLocked(&value)
	} else {
		if !read.amended {
			// We're adding the first new key to the dirty map.
			// Make sure it is allocated and mark the read-only map as incomplete.
			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
}

Store的处理过程:

  1. 加载read
  2. 如果当前read包含此key,则尝试存入,若成功则结束.
func (e *entry) tryStore(i *interface{}) bool {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == expunged {
			return false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
			return true
		}
	}
}

尝试存入逻辑:

(1)若当前entry已删除则失败,返回。

(2)否则,更新entry value对应指针为存入值的指针

  1. 获取互斥锁
  2. 重新加载read
  3. 检测key是否存在

    (1)存在,若key之前在dirty中已删除,则重新存入dirty,更新对应entry指向的值

    (2)若dirty中已存在,直接更新对应entry指向的值

    (3)若dirty中没有包含m中不存在的key,如果read中的key未删除,则复制到dirty中,存入read.m,并标记;创建新的entry存入dirty
  4. 释放锁

需要说明的是:

新数据的存入路径为: dirty->read

但存入read的时机不在Store中,而是在missLocked中。



Load

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		// Avoid reporting a spurious miss if m.dirty got promoted while we were
		// blocked on m.mu. (If further loads of the same key will not miss, it's
		// not worth copying the dirty map for this key.)
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			// Regardless of whether the entry was present, record a miss: this key
			// will take the slow path until the dirty map is promoted to the read
			// map.
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	return e.load()
}

func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

func (e *entry) load() (value interface{}, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == nil || p == expunged {
		return nil, false
	}
	return *(*interface{})(p), true
}

Store的处理过程:

  1. 加载read
  2. 确认key是否存在
  3. 若不存在,且dirty中包含read中没有的key

    (1)获取互斥锁

    (2)加载read

    (3) 确认key是否存在

    (4) 若不存在,且dirty中包含read中没有的key,获取dirty中是否存在;若未匹配次数大于dirty的大小,则复制dirty至read中。

    (5)释放锁
  4. 若不存在则返回
  5. 若存在返回entry的值

注意:missLocked中,misses次数不小于dirty的size时,会触发dirty数据拷贝至read中,然后清空dirty及misses。



Delete

// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
	m.LoadAndDelete(key)
}
// LoadAndDelete deletes the value for a key, returning the previous value if any.
// The loaded result reports whether the key was present.
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	if !ok && read.amended {
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			e, ok = m.dirty[key]
			// Regardless of whether the entry was present, record a miss: this key
			// will take the slow path until the dirty map is promoted to the read
			// map.
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if ok {
		return e.delete()
	}
	return nil, false
}

func (e *entry) delete() (value interface{}, ok bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == nil || p == expunged {
			return nil, false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
			return *(*interface{})(p), true
		}
	}
}

Delete的处理过程:

  1. 加载read
  2. 确认key是否存在
  3. 若不存在,且dirty中包含read中没有的key

    (1)获取互斥锁

    (2)加载read

    (3)确认key是否存在

    (4)若不存在,且dirty中包含read中没有的key,获取dirty中是否存在,标记

    (5)释放锁
  4. 若存在置entry为nil或expunged(删除标志)
  5. 若不存在则返回

Delete中也会触发missLocked。



Range

func (m *Map) Range(f func(key, value interface{}) bool) {
	// We need to be able to iterate over all of the keys that were already
	// present at the start of the call to Range.
	// If read.amended is false, then read.m satisfies that property without
	// requiring us to hold m.mu for a long time.
	read, _ := m.read.Load().(readOnly)
	if read.amended {
		// m.dirty contains keys not in read.m. Fortunately, Range is already O(N)
		// (assuming the caller does not break out early), so a call to Range
		// amortizes an entire copy of the map: we can promote the dirty copy
		// immediately!
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		if read.amended {
			read = readOnly{m: m.dirty}
			m.read.Store(read)
			m.dirty = nil
			m.misses = 0
		}
		m.mu.Unlock()
	}

	for k, e := range read.m {
		v, ok := e.load()
		if !ok {
			continue
		}
		if !f(k, v) {
			break
		}
	}
}

Range的处理过程:

  1. 加载read
  2. 如果read外还有数据

    (1)获取互斥锁

    (2)加载read

    (3)如果read外还有数据,加载dirty

    (4)存储dirty中的内容至m.read,清空标记

    (5)释放锁
  3. 遍历read.m
  4. 若key不存在跳过
  5. 若key存在,则执行func f

Range中dirty中存在read中没有的数据时,会直接将dirty数据拷贝至read中。



总结

Store/Load/Delete整体思路:

  1. 先从read中获取key是否存在,若存在,进行Store/Load/Delete,对于存在的key此处无需用锁
  2. 若不存在或处理失败(如tryStore失败),获取互斥锁,再次加载确认read中是否存在,若存在,进行Store/Load/Delete
  3. 若dirty中不存在,Store新增/Load返回nil/Delete返回nil
  4. 释放锁

Store一个新key,其处理步骤是:

  1. 拷贝read中未删除的数据至dirty中,标记新增amended为true,存入dirty
  2. 后续Load/Delete/LoadAndDelete/LoadOrStore,若read中key不存在时,misses满足条件时,会将dirty中的数据存入read中,标记amended为false,从而实现数据的循环流转。

对于一个已存在的key,则更新其值,如果key之前已删除,则需先存入dirty中。

存储时会先拷贝read的数据至dirty中,有新增数据时,dirty中的数据是多于read的。正因为dirty中包含read中的数据,所以可以在

sync.Map实际就是通过更细粒度的数据划分,从而将锁的数据范围缩小,进而提升并发的性能。

关于删除的操作,并不是立刻删除,只是将entry置为删除或nil,具体的删除操作在dirty拷贝read数据时进行。



公众号

鄙人刚刚开通了公众号,专注于分享Go开发相关内容,望大家感兴趣的支持一下,在此特别感谢。



版权声明:本文为xz_studying原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。