数据库核心层
前面实现完了处理器的逻辑,现在到了核心的数据层实现了,核心的数据库主要是来执行用户发送的指令并且进行数据存储
1. Database
数据层的顶级接口定义,在
interface/database/database.go
文件中定义,
其中定义了
Database
接口以及
DataEntity
结构体用于包装数据,其中
interface
类型表示可以存放任何类型的数据
//Database 数据库的业务层
type Database interface {
//Exec 核心层执行指令,返回一个响应体
Exec(client resp.Connection, args [][]byte) resp.Reply
//Close 关闭
Close()
//AfterClientClose 客户端关闭之后需要做的一些操作
AfterClientClose(c resp.Connection)
}
//DataEntity 数据结构,用于包装任何的数据类型
type DataEntity struct {
//数据可以指代任何的数据
Data interface{}
}
2. Dict
redis中存储字典的数据结构,我们会实现这个接口来进行数据的存储,在
datastruct/dict/dict.go
中定义接口
/**
Dict redis中存储字典的数据结构
*/
type Dict interface {
//Get 返回value值,exist代表是否存在
Get(key string) (val interface{}, exist bool)
//Len 字典存在的数据
Len() int
//Put 存入数据,返回存入的个数
Put(key string, value interface{}) (result int)
//PutIfAbsent 如果不存在则设置
PutIfAbsent(key string, value interface{}) (result int)
//PutIfExists 如果存在则设置,如果不存在就不设置
PutIfExists(key string, value interface{}) (result int)
//Remove 移除
Remove(key string) (result int)
//ForEach 遍历方法
ForEach(consumer Consumer)
//Keys 返回所有的key
Keys() []string
//RandomKeys 需要随机返回多少个key
RandomKeys(limit int) []string
//RandomDistinctKeys 返回指定个数不重复的key
RandomDistinctKeys(limit int) []string
//Clear 清空字典表
Clear()
}
//Consumer 自定义的遍历函数
type Consumer func(key string, val interface{}) bool
3. 实现类
3.1 SyncDict
对
Dict
接口进行实现,正常存储数据的地方,我们采用的
sync.Map
来进行数据的存储线程安全的
//SyncDict 使用并发安全的字典,为什么需要包装一层,因为底层的数据接口可能会进行更换
type SyncDict struct {
//m 并发安全的map
m sync.Map
}
//Get get获取方法
func (dict *SyncDict) Get(key string) (val interface{}, exist bool) {
return dict.m.Load(key)
}
//Len 默认使用 map 的range方法来进行遍历个数
func (dict *SyncDict) Len() int {
length := 0
dict.m.Range(func(key, value any) bool {
length++
return true
})
return length
}
//Put 存入数据
func (dict *SyncDict) Put(key string, value interface{}) (result int) {
_, existed := dict.m.Load(key)
dict.m.Store(key, value)
if existed {
return 0
}
return 1
}
//PutIfAbsent 如果不存在则存入数据
func (dict *SyncDict) PutIfAbsent(key string, value interface{}) (result int) {
_, existed := dict.m.Load(key)
if !existed {
dict.m.Store(key, value)
return 1
}
return 0
}
//PutIfExists 如果存在则插入数据
func (dict *SyncDict) PutIfExists(key string, value interface{}) (result int) {
_, existed := dict.m.Load(key)
if existed {
dict.m.Store(key, value)
return 1
}
return 0
}
//Remove 移除数据
func (dict *SyncDict) Remove(key string) (result int) {
_, existed := dict.m.Load(key)
dict.m.Delete(key)
if existed {
return 1
}
return 0
}
//ForEach 遍历数据
func (dict *SyncDict) ForEach(consumer Consumer) {
dict.m.Range(func(key, value any) bool {
consumer(key.(string), value)
return true
})
}
//Keys 获取所有的key
func (dict *SyncDict) Keys() []string {
//创建一个切片,长度为当前字典的长度
keys := make([]string, dict.Len())
dict.m.Range(func(key, _ any) bool {
keys = append(keys, key.(string))
return true
})
return keys
}
//RandomKeys 随机返回指定的key值
func (dict *SyncDict) RandomKeys(limit int) []string {
keys := make([]string, limit)
//遍历传进来的次数,每一次进入dict都只作用一个key上面,因为map每次遍历是无序的
for i := 0; i < limit; i++ {
dict.m.Range(func(key, _ any) bool {
keys[i] = key.(string)
//直接返回false,关闭当前遍历
return false
})
}
return keys
}
//RandomDistinctKeys 随机取不重复的值
func (dict *SyncDict) RandomDistinctKeys(limit int) []string {
keys := make([]string, limit)
i := 0
dict.m.Range(func(key, _ any) bool {
keys[i] = key.(string)
i++
if i == limit {
//直接返回false,关闭当前遍历
return false
}
return true
})
return keys
}
//clear 直接换一个新的即可
func (dict *SyncDict) Clear() {
dict = MakeSyncDict()
}
func MakeSyncDict() *SyncDict {
return &SyncDict{}
}
3.2 DB
DB作为最底层支持的数据库存储层,DB对数据存储的
Dict
进行了一层包装对外提供了支持
字段说明
- index:数据库的索引,每个数据库都会有一个对应的索引id用于进行切换标记
-
data:数据存储的地方,这里我们定义了一个
Dict
的结构用于底层数据的存储 - addAof:定义的一个aof操作的匿名函数,后续来进行实现
//DB 每一个redis的分数据库
type DB struct {
//数据库的索引
index int
//数据存储的实现
data dict.Dict
//赋值一个addAof的方法,可以让指令执行的时候写入数据
addAof func(line CmdLine)
}
//Close 关闭数据库进行清空
func (db *DB) Close() {
db.data.Clear()
}
//AfterClientClose 关闭数据库需要做的操作
func (db *DB) AfterClientClose(c resp.Connection) {
}
//ExecFunc 后续所有的指令执行都要实现当前函数
type ExecFunc func(db *DB, args [][]byte) resp.Reply
//CmdLine 给二维字节组取一个别名
type CmdLine = [][]byte
//makeDB 创建数据库
func makeDB() *DB {
return &DB{
data: dict.MakeSyncDict(),
addAof: func(line CmdLine) {}, //设置一个空的默认方法,防止初始化开始的时候出现问题
}
}
//Exec 在数据库上进行执行
func (db *DB) Exec(c resp.Connection, args [][]byte) resp.Reply {
//获取到一个数据,存储的应该是指令 例如:setnx或者set;这里统一一下按照小写指令进行处理
cmdName := strings.ToLower(string(args[0]))
//获取到执行器
cmd, ok := cmdTable[cmdName]
if !ok {
//如果没有实现这个命令,恢复一个错误
return reply.MakeStandardErrReply("ERR unknown command " + cmdName)
}
//校验一下参数的个数是否合法,例如:set k,缺少了value
if !validateArity(cmd.arity, args) {
return reply.MakeArgNumErrReply(cmdName)
}
//获取到执行器
fun := cmd.executor
//set k,v --> 这里传入的参数就是 k,v
return fun(db, args[1:])
}
//validateArity 验证参数,参数分为两种情况 :set K v --> arity = 3 ; 如果是 exists k1 k2 k3 ---> arity 的参数最少都需要2个(属于变长的指令),统一定义为 -2,负号执行标记变长的
func validateArity(arity int, cmdLine CmdLine) bool {
argNum := len(cmdLine)
//指令为定长的
if arity >= 0 {
return argNum == arity
}
//这里负arity可以将变长的参数个数转换为正数进行判断,在注册指令的时候就指定好变长的参数
return argNum >= -arity
}
//GetEntity 公共的方法获取数据,db将dict进行包装了一层
func (db *DB) GetEntity(key string) (*database.DataEntity, bool) {
//获取到最原始的值
raw, ok := db.data.Get(key)
if !ok {
return nil, false
}
//将raw原始的格式需要转换为 DataEntity
dataEntity, _ := raw.(*database.DataEntity)
return dataEntity, true
}
//PutEntity 存入entity对象
func (db *DB) PutEntity(key string, entity *database.DataEntity) int {
return db.data.Put(key, entity)
}
//PutIfExists 存入entity对象,如果存在
func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {
return db.data.PutIfExists(key, entity)
}
//PutIfAbsent 如果不存在进行存入
func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {
return db.data.PutIfAbsent(key, entity)
}
//Remove 移除
func (db *DB) Remove(key string) int {
return db.data.Remove(key)
}
//Removes 批量移除
func (db *DB) Removes(keys ...string) int {
result := 0
for _, key := range keys {
_, ok := db.data.Get(key)
if ok {
result += db.Remove(key)
}
}
return result
}
//Flush 清空
func (db *DB) Flush() {
db.data.Clear()
}
3.3 Database
database/database.go
文件中定义的一个
Database
结构体主要是对
DB
进行了一层封装
字段说明
- dbSet:创建的数据库,根据配置文件中的初始化数据库的数量进行创建
- aofHandler:自定义的 AofHandler 接口,后续用于消息持久化
type Database struct {
//存储多个数据库
dbSet []*DB
//aof处理器
aofHandler *aof.AofHandler
}
//NewDatabase 创建一个db数据库内核,初始化数据,根据配置文件中的数据库数量进行初始化
func NewDatabase() *Database {
database := &Database{}
if config.Properties.Databases == 0 {
config.Properties.Databases = 16
}
//初始化16个数据库db
database.dbSet = make([]*DB, config.Properties.Databases)
for i := range database.dbSet {
db := makeDB()
db.index = i
database.dbSet[i] = db
}
//判断是否开启消息落盘的功能
if config.Properties.AppendOnly {
aofHandler, err := aof.NewAofHandler(database)
if err != nil {
panic(err)
}
database.aofHandler = aofHandler
//初始化db里面的匿名处理器的方法
for _, db := range database.dbSet {
tmpDb := db
db.addAof = func(line CmdLine) {
/**
匿名函数会出现闭包的问题,如果使用 db.index 会一直是15索引
*/
database.aofHandler.AddAof(tmpDb.index, line)
}
}
}
return database
}
//Exec 内核层进行执行
func (d *Database) Exec(client resp.Connection, args [][]byte) resp.Reply {
defer func() {
if err := recover(); err != nil {
logger.Error(err)
}
}()
//需要处理 select 1 命令来切换数据库,后面的db核心数据库不需要处理select
cmdName := strings.ToLower(string(args[0]))
if cmdName == "select" {
//select命令
if len(args) != 2 {
return reply.MakeArgNumErrReply("select")
}
return execSelect(client, d, args)
}
dbIndex := client.GetDBIndex()
db := d.dbSet[dbIndex]
if db == nil {
return reply.MakeStandardErrReply("ERR DB is nil")
}
return db.Exec(client, args)
}
//Close 关闭数据库
func (d *Database) Close() {
for _, db := range d.dbSet {
db.Close()
}
}
//AfterClientClose 关闭连接之后需要执行的回调
func (d *Database) AfterClientClose(c resp.Connection) {
}
//execSelect 根据用户发送的指令,来修改数据库的索引,例如:select 1
func execSelect(c resp.Connection, database *Database, args [][]byte) resp.Reply {
//参数进行转换为数字
dbIndex, err := strconv.Atoi(string(args[1]))
if err != nil {
return reply.MakeStandardErrReply("ERR invalid DB index")
}
if dbIndex >= len(database.dbSet) {
return reply.MakeStandardErrReply("ERR DB index is out of range")
}
c.SelectDB(dbIndex)
return reply.MakeOkReply()
}
以上就是数据库层的实现,主要是
Database –> DB –> Dict
这么一个层次的结构,Database获取到用户需要在哪个数据库进行指令的执行,然后交给对应索引的 DB 进行执行,DB然后去回去到对应的指令执行器进行执行,下面是指令的实现
4. 指令实现
4.1 command
这里定义一个
command
的命令结构体,主要用于对执行命令的函数进行包装
参数说明
-
executor :执行器函数,这个函数的定义是在
database/db.go
中进行定义就是上面定义
DB
结构体文件中的函数,后续我们只需要将对应的函数注册到一个 map 中跟命令进行关联上就可以方便的取到
//ExecFunc 后续所有的指令执行都要实现当前函数
type ExecFunc func(db *DB, args [][]byte) resp.Reply
- arity:当前指令需要几个参数,例如:set key value,就是需要3个参数;这里的 arity 有正负两种情况,负数代表变长指令,例如:keys key1 key2,那么arity就是 -2 也就是说指令最少都得两个参数
方法说明
- RegisterCommand:提供的一个公共方法可以将执行的函数注册到map中,跟命令相互关联上
//用于记录所有指令跟 command 的关系
var cmdTable = make(map[string]*command)
//command 用于区分指令的类型以及执行方式
type command struct {
//执行器
executor ExecFunc
//需要几个参数
arity int
}
//RegisterCommand 注册指令的执行方法,存储的时候就是 一个指令对应一个执行器
func RegisterCommand(name string, executor ExecFunc, arity int) {
name = strings.ToLower(name)
cmdTable[name] = &command{
executor: executor,
arity: arity,
}
}
4.2 keys指令集
在
database/keys.go
中我们定义了 keys相关指令集的实现,其中方法名称随意定义,主要是在下面的
init()
方法中进行注册,go在启动时,会自动执行每个文件中的
init()
方法,这样就可以将执行的函数跟命令相互关联上了。每个方法都需要
*DB
和
args [][]bytes
两个参数,上面说了
DB
是对
Dict
进行封装的,
Dict
又是真正存储数据的地方,所以命令执行的方法只需要去调用
DB
中对应的方法即可
/**
实现以下keys指令集:
DEL
EXISTS
KEYS
FLUSHDB
TYPE
RENAME
RENAMENX
*/
//execDel DEL k1 k2 k3,外面就已经切掉了DEL
func execDel(db *DB, args [][]byte) resp.Reply {
keys := make([]string, len(args))
for i, v := range keys {
keys[i] = string(v)
}
deleted := db.Removes(keys...)
if deleted > 0 {
//前面指令是被切掉了,这里需要恢复回来
db.addAof(utils.ToCmdLine2("del", args...))
}
return reply.MakeIntReply(int64(deleted))
}
//execExists Exists存在几个key
func execExists(db *DB, args [][]byte) resp.Reply {
keys := make([]string, len(args))
result := int64(0)
for _, v := range keys {
_, exists := db.GetEntity(v)
if exists {
result++
}
}
return reply.MakeIntReply(result)
}
//execKeys keys k1 k2 k3
func execKeys(db *DB, args [][]byte) resp.Reply {
//获取到第一个参数是否是通配符
pattern := wildcard.CompilePattern(string(args[0]))
//用于所有匹配的key
result := make([][]byte, 0)
db.data.ForEach(func(key string, val interface{}) bool {
match := pattern.IsMatch(key)
if match {
result = append(result, []byte(key))
}
return true
})
return reply.MakeMultiBulkReply(result)
}
//execType 查询key的类型,例如:type key1
func execType(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
entity, ok := db.GetEntity(key)
if !ok {
return reply.MakeStatusReply("none") //tcp报文里面就是 :none\r\n
}
//类型断言
switch entity.Data.(type) {
case []byte:
return reply.MakeStatusReply("string")
//todo: 判断后续其它的类型
}
return reply.MakeUnknownErrReply()
}
//execRename 改名,例如:rename old newKey
func execRename(db *DB, args [][]byte) resp.Reply {
old := string(args[0])
newKey := string(args[1])
entity, exists := db.GetEntity(old)
if !exists {
return reply.MakeStandardErrReply("no such key")
}
//将新的key存入进行
db.PutEntity(newKey, entity)
//删除老的key
db.Remove(old)
db.addAof(utils.ToCmdLine2("rename", args...))
return reply.MakeOkReply()
}
//execRenameNx renamenx:在改到新名称的时候,判断新的名称会不会把原来已经存在的key给干掉,例如:renamenx K1 K2 ,需要判断原来K2是否存在
func execRenameNx(db *DB, args [][]byte) resp.Reply {
old := string(args[0])
newKey := string(args[1])
//判断新的key是否存在了,如果存在就什么都不做
_, ok := db.GetEntity(newKey)
if ok {
//如果什么都没有操作就返回0
return reply.MakeIntReply(0)
}
//继续判断原来的逻辑
entity, exists := db.GetEntity(old)
if !exists {
return reply.MakeStandardErrReply("no such key")
}
//将新的key存入进行
db.PutEntity(newKey, entity)
//删除老的key
db.Remove(old)
db.addAof(utils.ToCmdLine2("renamenx", args...))
//如果执行了就返回一个1
return reply.MakeIntReply(1)
}
//execFlushDb
func execFlushDb(db *DB, args [][]byte) resp.Reply {
db.Flush()
db.addAof(utils.ToCmdLine2("flushdb", args...))
return reply.MakeOkReply()
}
//init 初始化注册命令
func init() {
RegisterCommand("del", execDel, -2)
RegisterCommand("exists", execExists, -2)
RegisterCommand("flushdb", execFlushDb, -1) //-1的参数无论 flushdb后面跟随什么都直接忽略
RegisterCommand("type", execType, 2)
RegisterCommand("rename", execRename, 3)
RegisterCommand("renamenx", execRenameNx, 3)
RegisterCommand("keys", execKeys, -2)
}
4.3 string指令集
string
指令集在
database\string.go
中进行定义,跟上面的
keys
集一样的实现方式
/**
实现string类型的指令集
GET
SET
SETNX
GETSET
STRLEN
*/
//init go语言在启动的时候就会执行init方法
func init() {
RegisterCommand("get", execGet, 2)
RegisterCommand("set", execSet, 3)
RegisterCommand("setnx", execSetnx, 3)
RegisterCommand("getset", execGetset, 3)
RegisterCommand("strlen", execStrLen, 2)
}
//execGet 获取数据
func execGet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
entity, ok := db.GetEntity(key)
if !ok {
return reply.MakeNullBulkReply()
}
//目前存入数据都是存入 []byte
bytes, b := entity.Data.([]byte)
//如果存的类型有其它的,所以需要判断是否转换成功
if !b {
//todo:转换其它的类型
}
return reply.MakeBulkReply(bytes)
}
//execSet set key value
func execSet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
//按照字节数组的方式存储数据
value := args[1]
data := &database.DataEntity{
Data: value, //数据存储依照字节数组的方式进行存储
}
result := db.PutEntity(key, data)
db.addAof(utils.ToCmdLine2("set", args...))
return reply.MakeIntReply(int64(result))
}
//execSetnx setnx key value
func execSetnx(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
//按照字节数组的方式存储数据
value := args[1]
data := &database.DataEntity{
Data: value, //数据存储依照字节数组的方式进行存储
}
result := db.PutIfAbsent(key, data)
db.addAof(utils.ToCmdLine2("setnx", args...))
return reply.MakeIntReply(int64(result))
}
//execGetset getset key value
func execGetset(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
//先获取原来的值,再设置现在的值
entity, exists := db.GetEntity(key)
db.PutEntity(key, &database.DataEntity{Data: value})
if !exists {
return reply.MakeNullBulkReply()
}
db.addAof(utils.ToCmdLine2("getset", args...))
return reply.MakeBulkReply(entity.Data.([]byte))
}
//execStrLen strlen key 获取到的key的value长度
func execStrLen(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
//先获取原来的值,再设置现在的值
entity, exists := db.GetEntity(key)
if !exists {
return reply.MakeNullBulkReply()
}
return reply.MakeIntReply(int64(len(entity.Data.([]byte))))
}
4.4 ping命令
ping
命令实现比较简单
//Ping ping的命令
func Ping(db *DB, args [][]byte) resp.Reply {
return reply.MakePongReply()
}
//init 随便写在哪个包下面,go语言在启动的时候都会调用这个方法
func init() {
RegisterCommand("ping", Ping, 1)
}