Approach

- Each leader is responsible for polling for new configs; it submits a config to raft and updates its configuration after receiving that config on applyCh.
- A leader polls configs in order of their numbers, and it only applies the next config after every shard it must send has been received by its destination and every shard it must receive has arrived.
- Both receiving a shard and deleting a shard are written to the raft log, so that a majority of the group performs the same operations.
- If a receiver does not accept a shard within 5 seconds (i.e., the AddShard RPC keeps failing), the sender assumes that server has already received the shard and is now offline. In real production systems a more practical approach would be to keep the shard around and retry sending it later.
- To efficiently find all k/v pairs belonging to one shard, the db from lab3 is split per shard into ShardId2Shard (indexed by shardId in the code below), so the shard for a key can be looked up directly by its shardId; see the sketch after this list.
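For context, the shard a key belongs to is computed by the lab skeleton's key2shard helper, which server.go below uses but this post does not reproduce. The sketch below shows that helper plus a hypothetical lookup function that only illustrates how the per-shard storage is addressed; lookup is not part of the implementation and assumes kv.mu is held by the caller.

// key2shard maps a key to a shard id by its first byte (standard lab helper).
func key2shard(key string) int {
	shard := 0
	if len(key) > 0 {
		shard = int(key[0])
	}
	shard %= shardmaster.NShards
	return shard
}

// lookup is only an illustration of the per-shard layout: pick the shard
// first, then the key inside it (caller must hold kv.mu).
func (kv *ShardKV) lookup(key string) (string, bool) {
	shardId := key2shard(key)
	shard := kv.ShardId2Shard[shardId]
	if shard.Data == nil {
		// this server does not own the shard, or it has not arrived yet
		return "", false
	}
	value, ok := shard.Data[key]
	return value, ok
}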
Implementation
db.go
type Shard struct {
Data map[string]string
ConfigNum int // what version this Shard is in
}
common.go
package shardkv
//
// Sharded key/value server.
// Lots of replica groups, each running op-at-a-time paxos.
// Shardmaster decides which group serves each Shard.
// Shardmaster may change Shard assignment from time to time.
//
// You will have to modify these definitions.
//
const (
OK Err = "OK"
ErrNoKey = "ErrNoKey"
ErrWrongGroup = "ErrWrongGroup"
ErrWrongLeader = "ErrWrongLeader"
ShardNotArrived = "ShardNotArrived"
ConfigNotArrived = "ConfigNotArrived"
)
const (
Put Operation = "Put"
Append = "Append"
Get = "Get"
Config = "Config"
AddShard = "AddShard"
RemoveShard = "RemoveShard"
)
type Operation string
type Err string
// Put or Append
type PutAppendArgs struct {
// You'll have to add definitions here.
Key string
Value string
Op Operation // "Put" or "Append"
ClientId int64
RequestId int
}
type PutAppendReply struct {
Err Err
}
type GetArgs struct {
Key string
ClientId int64
RequestId int
}
type GetReply struct {
Err Err
Value string
}
type AddShardArg struct {
LastAppliedRequestId map[int64]int // for receiver to update its state
ShardId int
Shard Shard // Shard to be sent
ClientId int64
RequestId int
}
type AddShardReply struct {
Err Err
}
server.go
package shardkv

import (
"bytes"
"log"
"sync"
"time"

"../labgob"
"../labrpc"
"../raft"
"../shardmaster"
)
const (
Timeout = 800 * time.Millisecond // request timeout duration
PollPeriod = 100 * time.Millisecond // poll configuration period
SendPeriod = 180 * time.Millisecond // interval between sending a shard to a group
CheckReceivePeriod = 200 * time.Millisecond // interval between checking received shards
)
// Op describes an operation on the application state.
// An Op is passed to raft and persisted in its log;
// its fields must not be changed once it has been handed to raft.
type Op struct {
ClientId int64
RequestId int
Type Operation // type of operation: Get/Put/Append/Config/AddShard/RemoveShard
// fields used for client Put/Get/Append requests
Key string
Value string
// field used for changing configuration
Config shardmaster.Config
// fields used for shard migration (AddShard/RemoveShard)
ShardId int
Shard Shard
LastAppliedRequestId map[int64]int // for receiver to update its state
}
// Notification is used to wake the waiting RPC caller after its Op arrives from applyCh
type Notification struct {
ClientId int64
RequestId int
Err Err
}
type ShardKV struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
makeEnd func(string) *labrpc.ClientEnd
gid int
masters []*labrpc.ClientEnd
maxRaftState int // snapshot if log grows this big
Config shardmaster.Config // current configuration
LastConfig shardmaster.Config // last configuration, server needs both configurations to judge what shard to send/receive
snapshotCh chan []byte // snapshotCh is used to send snapshot to underlying raft
ShardId2Shard []Shard // ShardId -> Shard, data == nil if this server is not in charge of a shard in Config/LastConfig
index2Ch map[int]chan Notification // log index -> notification channel, to notify waiting goroutine
LastAppliedRequestId map[int64]int // clientId -> last applied request id. Detect duplicate client request to avoid applying a duplicated command
masterClerk *shardmaster.Clerk // masterClerk is a client used to contact shard master
}
// pollConfig periodically asks the shard master for the next configuration relative to the current one.
// pollConfig checks whether this server has completed all tasks required by the current configuration
// before moving on to the next configuration. If not, pollConfig picks up the remaining work and gets it done.
func (kv *ShardKV) pollConfig() {
kv.mu.Lock()
gid := kv.gid
oldConfig := kv.Config
rf := kv.rf
kv.mu.Unlock()
for {
// only leader needs to deal with configuration tasks
if _, isLeader := rf.GetState(); !isLeader {
time.Sleep(Timeout)
continue
}
kv.mu.Lock()
if !kv.allSent() {
lastApplied := make(map[int64]int)
for k, v := range kv.LastAppliedRequestId {
lastApplied[k] = v
}
for shardId, gid := range kv.LastConfig.Shards {
// need to send this shard away
if gid == kv.gid && kv.Config.Shards[shardId] != kv.gid && kv.ShardId2Shard[shardId].ConfigNum < kv.Config.Num {
shardCopy := Shard{
Data: make(map[string]string),
ConfigNum: kv.Config.Num,
}
for k, v := range kv.ShardId2Shard[shardId].Data {
shardCopy.Data[k] = v
}
args := AddShardArg{
LastAppliedRequestId: lastApplied,
ShardId: shardId,
Shard: shardCopy,
ClientId: int64(gid),
RequestId: kv.Config.Num,
}
var reply AddShardReply
// shardId -> gid -> server names
serverNames := kv.Config.Groups[kv.Config.Shards[shardId]]
servers := make([]*labrpc.ClientEnd, len(serverNames))
for i, name := range serverNames {
servers[i] = kv.makeEnd(name)
}
go func(servers []*labrpc.ClientEnd, args *AddShardArg, reply *AddShardReply) {
index := 0
start := time.Now()
for {
ok := servers[index].Call("ShardKV.AddShard", args, reply)
DPrintf("[%d-%d] calls %s's AddShardRPC, args = %v, reply = %v", kv.gid, kv.me, serverNames[index], args, reply)
if (ok && reply.Err == OK) || time.Since(start) >= 5*time.Second {
// this shard is successfully sent to destination
// start a command and remove this shard
kv.mu.Lock()
command := Op{
Type: RemoveShard,
ClientId: int64(kv.gid),
RequestId: kv.Config.Num,
ShardId: args.ShardId,
}
kv.mu.Unlock()
kv.startCommand(command, Timeout)
break
}
index = (index + 1) % len(servers)
if index == 0 {
time.Sleep(SendPeriod)
}
}
}(servers, &args, &reply)
}
}
kv.mu.Unlock()
time.Sleep(PollPeriod)
continue
}
if !kv.allReceived() {
kv.mu.Unlock()
time.Sleep(CheckReceivePeriod)
continue
}
// all work for the current configuration is done, poll for the next configuration
oldConfig = kv.Config
masterClerk := kv.masterClerk
kv.mu.Unlock()
newConfig := masterClerk.Query(oldConfig.Num + 1)
if newConfig.Num != oldConfig.Num+1 {
// fail to get next configuration
time.Sleep(PollPeriod)
continue
}
command := Op{
Type: Config,
ClientId: int64(gid),
RequestId: newConfig.Num,
Config: newConfig,
}
kv.startCommand(command, Timeout)
// this server has successfully started the new configuration in raft;
// the shard sending/receiving it requires will be tackled in the next loop iteration
}
}
// allSent checks whether every shard that needs to be sent out has been sent
func (kv *ShardKV) allSent() bool {
for shard, gid := range kv.LastConfig.Shards {
// are shards sent?
if gid == kv.gid && kv.Config.Shards[shard] != kv.gid && kv.ShardId2Shard[shard].ConfigNum < kv.Config.Num {
return false
}
}
return true
}
// allReceived checks whether every shard that needs to be received has arrived
func (kv *ShardKV) allReceived() bool {
for shard, gid := range kv.LastConfig.Shards {
// are shards received?
if gid != kv.gid && kv.Config.Shards[shard] == kv.gid && kv.ShardId2Shard[shard].ConfigNum < kv.Config.Num {
return false
}
}
return true
}
// AddShard moves a shard from the caller (its previous owner) to this server
func (kv *ShardKV) AddShard(args *AddShardArg, reply *AddShardReply) {
command := Op{
Type: AddShard,
ClientId: args.ClientId,
RequestId: args.RequestId,
ShardId: args.ShardId,
Shard: args.Shard,
LastAppliedRequestId: args.LastAppliedRequestId,
}
reply.Err = kv.startCommand(command, Timeout)
return
}
func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {
shardId := key2shard(args.Key)
kv.mu.Lock()
if kv.Config.Shards[shardId] != kv.gid {
reply.Err = ErrWrongGroup
} else if kv.ShardId2Shard[shardId].Data == nil {
reply.Err = ShardNotArrived
}
kv.mu.Unlock()
if reply.Err == ErrWrongGroup || reply.Err == ShardNotArrived {
return
}
command := Op{
Type: Get,
ClientId: args.ClientId,
RequestId: args.RequestId,
Key: args.Key,
}
err := kv.startCommand(command, Timeout)
if err != OK {
reply.Err = err
return
}
kv.mu.Lock()
// double-check in case the configuration changed while we were not holding the lock
if kv.Config.Shards[shardId] != kv.gid {
reply.Err = ErrWrongGroup
} else if kv.ShardId2Shard[shardId].Data == nil {
reply.Err = ShardNotArrived
} else {
reply.Err = OK
reply.Value = kv.ShardId2Shard[shardId].Data[args.Key]
}
kv.mu.Unlock()
return
}
func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
shardId := key2shard(args.Key)
kv.mu.Lock()
if kv.Config.Shards[shardId] != kv.gid {
reply.Err = ErrWrongGroup
} else if kv.ShardId2Shard[shardId].Data == nil {
reply.Err = ShardNotArrived
}
kv.mu.Unlock()
if reply.Err == ErrWrongGroup || reply.Err == ShardNotArrived {
return
}
command := Op{
Type: args.Op,
ClientId: args.ClientId,
RequestId: args.RequestId,
Key: args.Key,
Value: args.Value,
}
reply.Err = kv.startCommand(command, Timeout)
return
}
func (kv *ShardKV) startCommand(command Op, timeoutPeriod time.Duration) Err {
kv.mu.Lock()
index, _, isLeader := kv.rf.Start(command)
if !isLeader {
kv.mu.Unlock()
return ErrWrongLeader
}
if _, ok := kv.index2Ch[index]; !ok {
kv.index2Ch[index] = make(chan Notification, 1)
}
ch := kv.index2Ch[index]
kv.mu.Unlock()
select {
case no := <-ch:
kv.mu.Lock()
delete(kv.index2Ch, index)
if no.RequestId != command.RequestId || no.ClientId != command.ClientId {
// One way to do this is for the server to detect that it has lost leadership,
// by noticing that a different request has appeared at the index returned by Start()
kv.mu.Unlock()
return ErrWrongLeader
}
kv.mu.Unlock()
return no.Err
case <-time.After(timeoutPeriod):
return ErrWrongLeader
}
}
// listener reads from applyCh and executes state-changing commands
func (kv *ShardKV) listener() {
for msg := range kv.applyCh {
//DPrintf("listener: %v", msg)
if msg.CommandValid == false {
if msg.CommandType == raft.InstallSnapshot {
kv.mu.Lock()
snapshot := msg.Command.([]byte)
kv.installSnapshot(snapshot)
kv.mu.Unlock()
} else if msg.CommandType == raft.TakeSnapshot {
kv.mu.Lock()
snapshot := kv.snapshot()
kv.mu.Unlock()
kv.snapshotCh <- snapshot
}
continue
}
kv.mu.Lock()
op := msg.Command.(Op)
no := Notification{
ClientId: op.ClientId,
RequestId: op.RequestId,
Err: OK,
}
// only client requests need duplication detection
if op.Type == Put || op.Type == Get || op.Type == Append {
shardId := key2shard(op.Key)
if kv.Config.Shards[shardId] != kv.gid {
// this server doesn't serve the requested Shard
no.Err = ErrWrongGroup
} else if kv.ShardId2Shard[shardId].Data == nil {
// this shard has not arrived yet
no.Err = ShardNotArrived
} else {
lastAppliedRequestId, ok := kv.LastAppliedRequestId[op.ClientId]
if !ok || lastAppliedRequestId < op.RequestId {
kv.LastAppliedRequestId[op.ClientId] = op.RequestId
switch op.Type {
case Put:
kv.ShardId2Shard[shardId].Data[op.Key] = op.Value
case Append:
kv.ShardId2Shard[shardId].Data[op.Key] += op.Value
case Get:
// do nothing, wake waiting RPC goroutine to return requested value
default:
log.Fatalf("invalid command type: %v.", op.Type)
}
}
}
} else {
// configuration / shard-migration commands (issued by pollConfig or triggered by another group via AddShard)
switch op.Type {
case Config:
kv.doConfig(op)
case AddShard:
if kv.Config.Num < op.RequestId {
no.Err = ConfigNotArrived
break
}
kv.doAddShard(op)
case RemoveShard:
// remove operation is from previous Config
kv.doRemoveShard(op)
default:
log.Fatalf("invalid command type: %v.", op.Type)
}
}
// even if this command was applied before (the same command can be
// appended to the log more than once), a client may be waiting for its
// result, so notify the waiting goroutine anyway
ch, ok := kv.index2Ch[msg.CommandIndex]
kv.mu.Unlock()
if ok {
ch <- no
}
}
}
// doConfig updates the server's current configuration;
// shard sending and removal are handled by pollConfig
func (kv *ShardKV) doConfig(op Op) {
oldConfig := kv.Config
newConfig := op.Config
if oldConfig.Num >= newConfig.Num {
return
}
for shard, gid := range newConfig.Shards {
if gid == kv.gid && oldConfig.Shards[shard] == 0 {
// this is a Shard not assigned to any group before, simply create it
kv.ShardId2Shard[shard].Data = make(map[string]string)
kv.ShardId2Shard[shard].ConfigNum = newConfig.Num
}
}
kv.LastConfig = oldConfig
kv.Config = newConfig
DPrintf("[%d-%d] installs a new Config, oldConfig = %v, newConfig = %v",
kv.gid, kv.me, oldConfig, newConfig)
}
func (kv *ShardKV) doAddShard(op Op) {
// this shard has already been added, or this is an outdated command
if kv.ShardId2Shard[op.ShardId].Data != nil || op.Shard.ConfigNum < kv.Config.Num {
return
}
DPrintf("[%d-%d] adds a shard, shardId = %d, shard = %v, op.RequestId = %d, kv.ShardId2Shard[%d].ConfigNum = %d",
kv.gid, kv.me, op.ShardId, op.Shard, op.RequestId, op.ShardId, op.Shard.ConfigNum)
// make a copy of received shard because shard in Op struct needs to be logged and persisted
shard := Shard{
Data: make(map[string]string),
ConfigNum: op.Shard.ConfigNum,
}
for k, v := range op.Shard.Data {
shard.Data[k] = v
}
kv.ShardId2Shard[op.ShardId] = shard
// update duplication detection state
for clientId, requestId := range op.LastAppliedRequestId {
if r, ok := kv.LastAppliedRequestId[clientId]; !ok || r < requestId {
kv.LastAppliedRequestId[clientId] = requestId
}
}
}
func (kv *ShardKV) doRemoveShard(op Op) {
if op.RequestId < kv.Config.Num {
return
}
DPrintf("[%d-%d] removes shard = %d, Config = %v, kv.ShardId2Shard[%d].ConfigNum = %d",
kv.gid, kv.me, op.ShardId, kv.Config, op.ShardId, kv.Config.Num)
kv.ShardId2Shard[op.ShardId].Data = nil
kv.ShardId2Shard[op.ShardId].ConfigNum = op.RequestId
}
// get snapshot data of kvserver
func (kv *ShardKV) snapshot() []byte {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
// check every Encode error, not just the last one
if e.Encode(kv.ShardId2Shard) != nil ||
e.Encode(kv.LastAppliedRequestId) != nil ||
e.Encode(kv.maxRaftState) != nil ||
e.Encode(kv.Config) != nil ||
e.Encode(kv.LastConfig) != nil {
log.Fatalf("[%d-%d] fails to take snapshot.", kv.gid, kv.me)
}
return w.Bytes()
}
// install a given snapshot
func (kv *ShardKV) installSnapshot(snapshot []byte) {
if snapshot == nil || len(snapshot) < 1 { // bootstrap without any state?
return
}
r := bytes.NewBuffer(snapshot)
d := labgob.NewDecoder(r)
var Id2Shard []Shard
var LastAppliedRequestId map[int64]int
var MaxRaftState int
var Config, LastConfig shardmaster.Config
if d.Decode(&Id2Shard) != nil || d.Decode(&LastAppliedRequestId) != nil ||
d.Decode(&MaxRaftState) != nil || d.Decode(&Config) != nil || d.Decode(&LastConfig) != nil {
log.Fatalf("[%d] fails to read persistent snapshot", kv.me)
} else {
kv.ShardId2Shard = Id2Shard
kv.LastAppliedRequestId = LastAppliedRequestId
kv.maxRaftState = MaxRaftState
kv.Config = Config
kv.LastConfig = LastConfig
DPrintf("[%d-%d] installs snapshot, db = %v, Config = %v, LastConfig = %v", kv.gid, kv.me, Id2Shard, kv.Config, kv.LastConfig)
}
}
// the tester calls Kill() when a ShardKV instance won't
// be needed again. you are not required to do anything
// in Kill(), but it might be convenient to (for example)
// turn off debug output from this instance.
func (kv *ShardKV) Kill() {
kv.rf.Kill()
// Your code here, if desired.
}
// servers[] contains the ports of the servers in this group.
// me is the index of the current server in servers[].
//
// the k/v server should store snapshots through the underlying Raft
// implementation, which should call persister.SaveStateAndSnapshot() to
// atomically save the Raft state along with the snapshot.
//
// the k/v server should snapshot when Raft's saved state exceeds
// maxRaftState bytes, in order to allow Raft to garbage-collect its
// log. if maxRaftState is -1, you don't need to snapshot.
//
// gid is this group's GID, for interacting with the shardmaster.
//
// pass masters[] to shardmaster.MakeClerk() so you can send
// RPCs to the shardmaster.
//
// make_end(servername) turns a server name from a
// Config.Groups[gid][i] into a labrpc.ClientEnd on which you can
// send RPCs. You'll need this to send RPCs to other groups.
//
// look at client.go for examples of how to use masters[]
// and make_end() to send RPCs to the group owning a specific Shard.
//
// StartServer() must return quickly, so it should start goroutines
// for any long-running work.
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *ShardKV {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(Op{})
kv := new(ShardKV)
kv.me = me
kv.maxRaftState = maxraftstate
kv.makeEnd = make_end
kv.gid = gid
kv.masters = masters
// Your initialization code here.
kv.index2Ch = make(map[int]chan Notification)
kv.LastAppliedRequestId = make(map[int64]int)
kv.ShardId2Shard = make([]Shard, shardmaster.NShards)
kv.snapshotCh = make(chan []byte)
// Use something like this to talk to the shardmaster:
// kv.mck = shardmaster.MakeClerk(kv.masters)
kv.masterClerk = shardmaster.MakeClerk(kv.masters)
kv.applyCh = make(chan raft.ApplyMsg)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
kv.rf.SetSnapshotCh(kv.snapshotCh)
kv.rf.SetMaxRaftState(kv.maxRaftState)
go kv.listener()
go kv.pollConfig()
return kv
}
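The StartServer comment above points at client.go for how masters[] and make_end() are used; client.go is not part of this post. As a reference only, here is a rough sketch of the clerk's routing loop following the standard lab skeleton: the Clerk fields ck.config, ck.sm and ck.make_end are assumed from that skeleton, and a real clerk would also fill in ClientId/RequestId for duplicate detection.

func (ck *Clerk) Get(key string) string {
	args := GetArgs{Key: key} // a real clerk also sets ClientId and RequestId
	for {
		shard := key2shard(key)
		gid := ck.config.Shards[shard]
		if servers, ok := ck.config.Groups[gid]; ok {
			// try each server in the group that currently owns the shard
			for si := 0; si < len(servers); si++ {
				srv := ck.make_end(servers[si])
				var reply GetReply
				ok := srv.Call("ShardKV.Get", &args, &reply)
				if ok && (reply.Err == OK || reply.Err == ErrNoKey) {
					return reply.Value
				}
				if ok && reply.Err == ErrWrongGroup {
					break // configuration is stale, refresh it
				}
				// ErrWrongLeader / ShardNotArrived / lost RPC: try the next server
			}
		}
		time.Sleep(100 * time.Millisecond)
		// ask the shardmaster for the latest configuration and retry
		ck.config = ck.sm.Query(-1)
	}
}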