6.824 Lab 4B: Sharded Key/Value Service





Approach

  • Each leader is responsible for polling for new configs: it submits a config to raft, then updates its configuration once that config arrives back on applyCh.
  • The leader polls configs in increasing number order, and applies the next config only after every shard it must send has been received by its destination and every shard it must receive has arrived.
  • Both receiving a shard and deleting a shard are written to the raft log, so that a majority of the group performs the same operation.
  • If a receiver fails to accept a shard within 5 seconds (i.e., the AddShard RPC keeps failing), we assume that server has already received the shard and is offline. In real production, a more practical approach would be to keep the shard around and retry sending it later.
  • To look up all the k/v pairs of a shard efficiently, the db from lab 3 is split into ShardId2Shard (declared as a []Shard indexed by shardId in the code below), so a shard can be looked up by its shardId; the key-to-shard mapping is sketched right after this list.
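
For reference, the shardId of a key comes from key2shard, which Get and PutAppend call below; this is essentially the lab skeleton's version from client.go:

// key2shard maps a key to a shard index by the key's first byte
// (essentially the 6.824 skeleton's key2shard from client.go).
func key2shard(key string) int {
	shard := 0
	if len(key) > 0 {
		shard = int(key[0])
	}
	shard %= shardmaster.NShards
	return shard
}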



Implementation



db.go

type Shard struct {
	Data      map[string]string
	ConfigNum int // what version this Shard is in
}
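
Note that Data is a Go map, a reference type: copying a Shard struct directly would alias the underlying map between two owners. That is why the migration code below always copies shards key by key. A hypothetical helper (not part of the lab code) capturing the same idea:

// copyShard returns a deep copy of s; assigning the struct directly
// would share the Data map. Hypothetical helper, for illustration only.
func copyShard(s Shard) Shard {
	c := Shard{Data: make(map[string]string, len(s.Data)), ConfigNum: s.ConfigNum}
	for k, v := range s.Data {
		c.Data[k] = v
	}
	return c
}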



common.go

package shardkv

//
// Sharded key/value server.
// Lots of replica groups, each running Raft.
// Shardmaster decides which group serves each Shard.
// Shardmaster may change Shard assignment from time to time.
//
// You will have to modify these definitions.
//

const (
	OK               Err = "OK"
	ErrNoKey         Err = "ErrNoKey"
	ErrWrongGroup    Err = "ErrWrongGroup"
	ErrWrongLeader   Err = "ErrWrongLeader"
	ShardNotArrived  Err = "ShardNotArrived"
	ConfigNotArrived Err = "ConfigNotArrived"
)

const (
	Put         Operation = "Put"
	Append      Operation = "Append"
	Get         Operation = "Get"
	Config      Operation = "Config"
	AddShard    Operation = "AddShard"
	RemoveShard Operation = "RemoveShard"
)

type Operation string

type Err string

// Put or Append
type PutAppendArgs struct {
	// You'll have to add definitions here.
	Key       string
	Value     string
	Op        Operation // "Put" or "Append"
	ClientId  int64
	RequestId int
}

type PutAppendReply struct {
	Err Err
}

type GetArgs struct {
	Key       string
	ClientId  int64
	RequestId int
}

type GetReply struct {
	Err   Err
	Value string
}

type AddShardArg struct {
	LastAppliedRequestId map[int64]int // for receiver to update its state
	ShardId              int
	Shard                Shard // Shard to be sent
	ClientId             int64
	RequestId            int
}

type AddShardReply struct {
	Err Err
}
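
The ClientId/RequestId pair in these argument structs is what duplicate detection keys on: each server remembers, per client, the largest request id it has applied. A hypothetical sketch of the client side (clerkIds and its helpers are illustrative, not from the lab skeleton):

import (
	"crypto/rand"
	"math/big"
)

// clerkIds illustrates the assumed client behavior: a random ClientId
// identifies the clerk, and RequestId increases by one per operation, so
// a server may ignore any request with id <= LastAppliedRequestId[ClientId].
type clerkIds struct {
	clientId  int64
	requestId int
}

func newClerkIds() clerkIds {
	n, _ := rand.Int(rand.Reader, big.NewInt(1<<62))
	return clerkIds{clientId: n.Int64()}
}

// nextPut stamps a fresh RequestId; retries of the same operation must
// reuse the same args so servers can deduplicate them.
func (c *clerkIds) nextPut(key, value string) PutAppendArgs {
	c.requestId++
	return PutAppendArgs{Key: key, Value: value, Op: Put,
		ClientId: c.clientId, RequestId: c.requestId}
}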



server.go

package shardkv

import (
	"bytes"
	"log"
	"sync"
	"time"

	"../labgob"
	"../labrpc"
	"../raft"
	"../shardmaster"
)

const (
	Timeout            = 800 * time.Millisecond // request timeout duration
	PollPeriod         = 100 * time.Millisecond // poll configuration period
	SendPeriod         = 180 * time.Millisecond // interval between sending a shard to a group
	CheckReceivePeriod = 200 * time.Millisecond // interval between checking received shards
)

// Op is used to indicate an operation on the application.
// Op is passed to raft and persisted in its log;
// fields of an Op struct must not be changed once it has been passed to raft.
type Op struct {
	ClientId  int64
	RequestId int
	Type      Operation // Get/Put/Append/Config/AddShard/RemoveShard
	// field used for client put/get/append requests
	Key   string
	Value string
	// field used for changing configuration
	Config shardmaster.Config
	// field used for shard migration add/remove
	ShardId              int
	Shard                Shard
	LastAppliedRequestId map[int64]int // for receiver to update its state
}

// Notification is used to wake a waiting RPC caller after its Op arrives from applyCh
type Notification struct {
	ClientId  int64
	RequestId int
	Err       Err
}

type ShardKV struct {
	mu                   sync.Mutex
	me                   int
	rf                   *raft.Raft
	applyCh              chan raft.ApplyMsg
	makeEnd              func(string) *labrpc.ClientEnd
	gid                  int
	masters              []*labrpc.ClientEnd
	maxRaftState         int                       // snapshot if log grows this big
	Config               shardmaster.Config        // current configuration
	LastConfig           shardmaster.Config        // last configuration, server needs both configurations to judge what shard to send/receive
	snapshotCh           chan []byte               // snapshotCh is used to send snapshot to underlying raft
	ShardId2Shard        []Shard                   // ShardId -> Shard, data == nil if this server is not in charge of a shard in Config/LastConfig
	index2Ch             map[int]chan Notification // log index -> notification channel, to notify waiting goroutine
	LastAppliedRequestId map[int64]int             // clientId -> last applied request id. Detect duplicate client request to avoid applying a duplicated command
	masterClerk          *shardmaster.Clerk        // masterClerk is a client used to contact shard master
}

// pollConfig periodically asks the shard master for the next configuration
// relative to the current one. Before moving on to the next configuration,
// pollConfig checks whether this server has completed all tasks required by
// the current one; if not, pollConfig picks up that work and gets it done.
func (kv *ShardKV) pollConfig() {
	kv.mu.Lock()
	gid := kv.gid
	oldConfig := kv.Config
	rf := kv.rf
	kv.mu.Unlock()

	for {
		// only leader needs to deal with configuration tasks
		if _, isLeader := rf.GetState(); !isLeader {
			time.Sleep(Timeout)
			continue
		}
		kv.mu.Lock()
		if !kv.allSent() {
			lastApplied := make(map[int64]int)
			for k, v := range kv.LastAppliedRequestId {
				lastApplied[k] = v
			}
			for shardId, lastGid := range kv.LastConfig.Shards {
				// this shard was ours under LastConfig but is no longer ours
				// under Config, and the handoff has not completed: send it away
				if lastGid == kv.gid && kv.Config.Shards[shardId] != kv.gid && kv.ShardId2Shard[shardId].ConfigNum < kv.Config.Num {
					shardCopy := Shard{
						Data:      make(map[string]string),
						ConfigNum: kv.Config.Num,
					}
					for k, v := range kv.ShardId2Shard[shardId].Data {
						shardCopy.Data[k] = v
					}

					args := AddShardArg{
						LastAppliedRequestId: lastApplied,
						ShardId:              shardId,
						Shard:                shardCopy,
						ClientId:             int64(gid),
						RequestId:            kv.Config.Num,
					}
					var reply AddShardReply
					// shardId -> gid -> server names
					serverNames := kv.Config.Groups[kv.Config.Shards[shardId]]
					servers := make([]*labrpc.ClientEnd, len(serverNames))
					for i, name := range serverNames {
						servers[i] = kv.makeEnd(name)
					}
					go func(servers []*labrpc.ClientEnd, args *AddShardArg, reply *AddShardReply) {
						index := 0
						start := time.Now()
						for {
							ok := servers[index].Call("ShardKV.AddShard", args, reply)
							DPrintf("[%d-%d] calls %s's AddShardRPC, args = %v, reply = %v", kv.gid, kv.me, serverNames[index], args, reply)
							if (ok && reply.Err == OK) || time.Since(start) >= 5*time.Second {
								// either the destination accepted the shard, or it has been
								// unreachable for 5 seconds and we assume it already has the
								// shard and is offline (see the approach notes above); either
								// way, start a command to remove this shard locally
								kv.mu.Lock()
								command := Op{
									Type:      RemoveShard,
									ClientId:  int64(kv.gid),
									RequestId: kv.Config.Num,
									ShardId:   args.ShardId,
								}
								kv.mu.Unlock()
								kv.startCommand(command, Timeout)
								break
							}
							index = (index + 1) % len(servers)
							if index == 0 {
								time.Sleep(SendPeriod)
							}
						}
					}(servers, &args, &reply)
				}
			}
			kv.mu.Unlock()
			time.Sleep(PollPeriod)
			continue
		}
		if !kv.allReceived() {
			kv.mu.Unlock()
			time.Sleep(CheckReceivePeriod)
			continue
		}

		// all tasks for the current configuration are done; poll for the next one
		oldConfig = kv.Config
		masterClerk := kv.masterClerk
		kv.mu.Unlock()
		newConfig := masterClerk.Query(oldConfig.Num + 1)
		if newConfig.Num != oldConfig.Num+1 {
			// failed to get the next configuration; it is not published yet
			time.Sleep(PollPeriod)
			continue
		}
		command := Op{
			Type:      Config,
			ClientId:  int64(gid),
			RequestId: newConfig.Num,
			Config:    newConfig,
		}
		kv.startCommand(command, Timeout)
	}
	// once a new configuration is successfully applied, the tasks it
	// requires (sending and receiving shards) are tackled on the next iteration
}
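
// Worked example (gids illustrative): suppose LastConfig assigns shard 3
// to this group (gid 100) and Config moves it to gid 101. On gid 100,
// allSent() stays false until the RemoveShard command commits and sets
// ShardId2Shard[3].ConfigNum = Config.Num; on gid 101, allReceived() stays
// false until its AddShard command commits. Only then does each group's
// pollConfig query for Config.Num+1.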

// allSent checks whether every shard that needs to be sent out has been handed off
func (kv *ShardKV) allSent() bool {
	for shard, gid := range kv.LastConfig.Shards {
		// are shards sent?
		if gid == kv.gid && kv.Config.Shards[shard] != kv.gid && kv.ShardId2Shard[shard].ConfigNum < kv.Config.Num {
			return false
		}
	}
	return true
}

// allReceived checks whether every shard that needs to be received has arrived
func (kv *ShardKV) allReceived() bool {
	for shard, gid := range kv.LastConfig.Shards {
		// are shards received?
		if gid != kv.gid && kv.Config.Shards[shard] == kv.gid && kv.ShardId2Shard[shard].ConfigNum < kv.Config.Num {
			return false
		}
	}
	return true
}

// AddShard moves a shard from the caller to this server
func (kv *ShardKV) AddShard(args *AddShardArg, reply *AddShardReply) {
	command := Op{
		Type:                 AddShard,
		ClientId:             args.ClientId,
		RequestId:            args.RequestId,
		ShardId:              args.ShardId,
		Shard:                args.Shard,
		LastAppliedRequestId: args.LastAppliedRequestId,
	}
	reply.Err = kv.startCommand(command, Timeout)
	return
}

func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {
	shardId := key2shard(args.Key)
	kv.mu.Lock()
	if kv.Config.Shards[shardId] != kv.gid {
		reply.Err = ErrWrongGroup
	} else if kv.ShardId2Shard[shardId].Data == nil {
		reply.Err = ShardNotArrived
	}
	kv.mu.Unlock()
	if reply.Err == ErrWrongGroup || reply.Err == ShardNotArrived {
		return
	}
	command := Op{
		Type:      Get,
		ClientId:  args.ClientId,
		RequestId: args.RequestId,
		Key:       args.Key,
	}
	err := kv.startCommand(command, Timeout)
	if err != OK {
		reply.Err = err
		return
	}
	kv.mu.Lock()
	// double-check in case the configuration changed while the lock was released
	if kv.Config.Shards[shardId] != kv.gid {
		reply.Err = ErrWrongGroup
	} else if kv.ShardId2Shard[shardId].Data == nil {
		reply.Err = ShardNotArrived
	} else {
		reply.Err = OK
		reply.Value = kv.ShardId2Shard[shardId].Data[args.Key]
	}
	kv.mu.Unlock()
	return
}

func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	shardId := key2shard(args.Key)
	kv.mu.Lock()
	if kv.Config.Shards[shardId] != kv.gid {
		reply.Err = ErrWrongGroup
	} else if kv.ShardId2Shard[shardId].Data == nil {
		reply.Err = ShardNotArrived
	}
	kv.mu.Unlock()
	if reply.Err == ErrWrongGroup || reply.Err == ShardNotArrived {
		return
	}
	command := Op{
		Type:      args.Op,
		ClientId:  args.ClientId,
		RequestId: args.RequestId,
		Key:       args.Key,
		Value:     args.Value,
	}
	reply.Err = kv.startCommand(command, Timeout)
	return
}

func (kv *ShardKV) startCommand(command Op, timeoutPeriod time.Duration) Err {
	kv.mu.Lock()
	index, _, isLeader := kv.rf.Start(command)
	if !isLeader {
		kv.mu.Unlock()
		return ErrWrongLeader
	}
	if _, ok := kv.index2Ch[index]; !ok {
		kv.index2Ch[index] = make(chan Notification, 1)
	}
	ch := kv.index2Ch[index]
	kv.mu.Unlock()
	select {
	case no := <-ch:
		kv.mu.Lock()
		delete(kv.index2Ch, index)
		if no.RequestId != command.RequestId || no.ClientId != command.ClientId {
			// One way to do this is for the server to detect that it has lost leadership,
			// by noticing that a different request has appeared at the index returned by Start()
			kv.mu.Unlock()
			return ErrWrongLeader
		}
		kv.mu.Unlock()
		return no.Err
	case <-time.After(timeoutPeriod):
		// timed out: clean up this index's channel and report ErrWrongLeader
		// so the client retries with another server
		kv.mu.Lock()
		delete(kv.index2Ch, index)
		kv.mu.Unlock()
		return ErrWrongLeader
	}
}

// listener reads from applyCh and executes state-changing commands
func (kv *ShardKV) listener() {
	for msg := range kv.applyCh {
		//DPrintf("listener: %v", msg)
		if !msg.CommandValid {
			if msg.CommandType == raft.InstallSnapshot {
				kv.mu.Lock()
				snapshot := msg.Command.([]byte)
				kv.installSnapshot(snapshot)
				kv.mu.Unlock()
			} else if msg.CommandType == raft.TakeSnapshot {
				kv.mu.Lock()
				snapshot := kv.snapshot()
				kv.mu.Unlock()
				kv.snapshotCh <- snapshot
			}
			continue
		}

		kv.mu.Lock()
		op := msg.Command.(Op)
		no := Notification{
			ClientId:  op.ClientId,
			RequestId: op.RequestId,
			Err:       OK,
		}
		// only client requests need duplication detection
		if op.Type == Put || op.Type == Get || op.Type == Append {
			shardId := key2shard(op.Key)
			if kv.Config.Shards[shardId] != kv.gid {
				// this server doesn't serve requested Shard
				no.Err = ErrWrongGroup
			} else if kv.ShardId2Shard[shardId].Data == nil {
				// this shard has not arrived yet
				no.Err = ShardNotArrived
			} else {
				lastAppliedRequestId, ok := kv.LastAppliedRequestId[op.ClientId]
				if !ok || lastAppliedRequestId < op.RequestId {
					kv.LastAppliedRequestId[op.ClientId] = op.RequestId
					switch op.Type {
					case Put:
						kv.ShardId2Shard[shardId].Data[op.Key] = op.Value
					case Append:
						kv.ShardId2Shard[shardId].Data[op.Key] += op.Value
					case Get:
						// nothing to apply; the waiting RPC goroutine reads the value once notified
					default:
						log.Fatalf("invalid command type: %v.", op.Type)
					}
				}
			}
		} else {
			// request from server of other group
			switch op.Type {
			case Config:
				kv.doConfig(op)
			case AddShard:
				if kv.Config.Num < op.RequestId {
					no.Err = ConfigNotArrived
					break
				}
				kv.doAddShard(op)
			case RemoveShard:
				// remove operation is from previous Config
				kv.doRemoveShard(op)
			default:
				log.Fatalf("invalid command type: %v.", op.Type)
			}
		}
		// even if this command was applied before (the same command may have
		// been appended to the log twice), some client may still be waiting
		// for its result, so notify the waiting goroutine anyway
		ch, ok := kv.index2Ch[msg.CommandIndex]
		kv.mu.Unlock()
		if ok {
			ch <- no
		}
	}
}

// doConfig updates the server's current configuration;
// shard sending and removal are handled by pollConfig
func (kv *ShardKV) doConfig(op Op) {
	oldConfig := kv.Config
	newConfig := op.Config
	if oldConfig.Num >= newConfig.Num {
		return
	}
	for shard, gid := range newConfig.Shards {
		if gid == kv.gid && oldConfig.Shards[shard] == 0 {
			// gid 0 means this shard was not assigned to any group before; simply create it
			kv.ShardId2Shard[shard].Data = make(map[string]string)
			kv.ShardId2Shard[shard].ConfigNum = newConfig.Num
		}
	}
	kv.LastConfig = oldConfig
	kv.Config = newConfig
	DPrintf("[%d-%d] installs a new Config, oldConfig = %v, newConfig = %v",
		kv.gid, kv.me, oldConfig, newConfig)
}

func (kv *ShardKV) doAddShard(op Op) {
	// the shard has already been added, or this is an outdated command
	if kv.ShardId2Shard[op.ShardId].Data != nil || op.Shard.ConfigNum < kv.Config.Num {
		return
	}
	DPrintf("[%d-%d] adds a shard, shardId = %d, shard = %v, op.RequestId = %d, kv.ShardId2Shard[%d].ConfigNum = %d",
		kv.gid, kv.me, op.ShardId, op.Shard, op.RequestId, op.ShardId, op.Shard.ConfigNum)
	// make a copy of received shard because shard in Op struct needs to be logged and persisted
	shard := Shard{
		Data:      make(map[string]string),
		ConfigNum: op.Shard.ConfigNum,
	}
	for k, v := range op.Shard.Data {
		shard.Data[k] = v
	}
	kv.ShardId2Shard[op.ShardId] = shard
	// update duplication detection state
	for clientId, requestId := range op.LastAppliedRequestId {
		if r, ok := kv.LastAppliedRequestId[clientId]; !ok || r < requestId {
			kv.LastAppliedRequestId[clientId] = requestId
		}
	}
}

func (kv *ShardKV) doRemoveShard(op Op) {
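	// ignore an outdated remove command issued under an older configuration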
	if op.RequestId < kv.Config.Num {
		return
	}
	DPrintf("[%d-%d] removes shard = %d, Config = %v, kv.ShardId2Shard[%d].ConfigNum = %d",
		kv.gid, kv.me, op.ShardId, kv.Config, op.ShardId, kv.Config.Num)
	kv.ShardId2Shard[op.ShardId].Data = nil
	kv.ShardId2Shard[op.ShardId].ConfigNum = op.RequestId
}

// snapshot encodes the kvserver's state; the encode order here must match
// the decode order in installSnapshot
func (kv *ShardKV) snapshot() []byte {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	if e.Encode(kv.ShardId2Shard) != nil ||
		e.Encode(kv.LastAppliedRequestId) != nil ||
		e.Encode(kv.maxRaftState) != nil ||
		e.Encode(kv.Config) != nil ||
		e.Encode(kv.LastConfig) != nil {
		log.Fatalf("[%d-%d] fails to take snapshot.", kv.gid, kv.me)
	}
	return w.Bytes()
}

// install a given snapshot
func (kv *ShardKV) installSnapshot(snapshot []byte) {
	if snapshot == nil || len(snapshot) < 1 { // bootstrap without any state?
		return
	}
	r := bytes.NewBuffer(snapshot)
	d := labgob.NewDecoder(r)
	var Id2Shard []Shard
	var LastAppliedRequestId map[int64]int
	var MaxRaftState int
	var Config, LastConfig shardmaster.Config
	if d.Decode(&Id2Shard) != nil || d.Decode(&LastAppliedRequestId) != nil ||
		d.Decode(&MaxRaftState) != nil || d.Decode(&Config) != nil || d.Decode(&LastConfig) != nil {
		log.Fatalf("[%d] fails to read persistent snapshot", kv.me)
	} else {
		kv.ShardId2Shard = Id2Shard
		kv.LastAppliedRequestId = LastAppliedRequestId
		kv.maxRaftState = MaxRaftState
		kv.Config = Config
		kv.LastConfig = LastConfig
		DPrintf("[%d-%d] installs snapshot, db = %v, Config = %v, LastConfig = %v", kv.gid, kv.me, Id2Shard, kv.Config, kv.LastConfig)
	}
}

// the tester calls Kill() when a ShardKV instance won't
// be needed again. you are not required to do anything
// in Kill(), but it might be convenient to (for example)
// turn off debug output from this instance.
func (kv *ShardKV) Kill() {
	kv.rf.Kill()
	// Your code here, if desired.
}

// servers[] contains the ports of the servers in this group.
//
// me is the index of the current server in servers[].
//
// the k/v server should store snapshots through the underlying Raft
// implementation, which should call persister.SaveStateAndSnapshot() to
// atomically save the Raft state along with the snapshot.
//
// the k/v server should snapshot when Raft's saved state exceeds
// maxRaftState bytes, in order to allow Raft to garbage-collect its
// log. if maxRaftState is -1, you don't need to snapshot.
//
// gid is this group's GID, for interacting with the shardmaster.
//
// pass masters[] to shardmaster.MakeClerk() so you can send
// RPCs to the shardmaster.
//
// make_end(servername) turns a server name from a
// Config.Groups[gid][i] into a labrpc.ClientEnd on which you can
// send RPCs. You'll need this to send RPCs to other groups.
//
// look at client.go for examples of how to use masters[]
// and make_end() to send RPCs to the group owning a specific Shard.
//
// StartServer() must return quickly, so it should start goroutines
// for any long-running work.
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *ShardKV {
	// call labgob.Register on structures you want
	// Go's RPC library to marshall/unmarshall.
	labgob.Register(Op{})

	kv := new(ShardKV)
	kv.me = me
	kv.maxRaftState = maxraftstate
	kv.makeEnd = make_end
	kv.gid = gid
	kv.masters = masters

	// Your initialization code here.
	kv.index2Ch = make(map[int]chan Notification)
	kv.LastAppliedRequestId = make(map[int64]int)
	kv.ShardId2Shard = make([]Shard, shardmaster.NShards)
	kv.snapshotCh = make(chan []byte)

	// Use something like this to talk to the shardmaster:
	// kv.mck = shardmaster.MakeClerk(kv.masters)
	kv.masterClerk = shardmaster.MakeClerk(kv.masters)

	kv.applyCh = make(chan raft.ApplyMsg)
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)
	kv.rf.SetSnapshotCh(kv.snapshotCh)
	kv.rf.SetMaxRaftState(kv.maxRaftState)

	go kv.listener()
	go kv.pollConfig()

	return kv
}
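
A note on the raft interface used above: SetSnapshotCh and SetMaxRaftState are not part of the lab's raft skeleton. They assume a raft modified so that, once its persisted state exceeds maxRaftState, it asks the service for a snapshot through applyCh and reads the encoded bytes back from snapshotCh. A minimal sketch of that assumed raft-side check follows; Persister.RaftStateSize and Persister.SaveStateAndSnapshot are real skeleton methods, while maxRaftState, snapshotCh, CommandType, and encodeState are assumed modifications.

// maybeSnapshot sketches the contract that listener() assumes.
// Hypothetical code, not part of the lab skeleton.
func (rf *Raft) maybeSnapshot() {
	if rf.maxRaftState < 0 || rf.persister.RaftStateSize() < rf.maxRaftState {
		return
	}
	// ask the service for an application-level snapshot
	rf.applyCh <- ApplyMsg{CommandValid: false, CommandType: TakeSnapshot}
	// wait for the encoded snapshot, then persist it atomically together
	// with raft's own state; the log can then be truncated
	snapshot := <-rf.snapshotCh
	rf.persister.SaveStateAndSnapshot(rf.encodeState(), snapshot)
}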



Copyright notice: this is an original article by weixin_43116322, licensed under CC 4.0 BY-SA. Please include a link to the original source and this notice when reposting.