6.824 lab2a, 2b

简介

6.824 lab2a、2b要求实现raft算法的领导选举和日志追加部分。

相关资料

Raft算法

Raft算法的核心可参考Raft原论文In Search of an Understandable Consensus Algorithm的Figure 2,以下为中文翻译。

State

所有服务器上需要被持久化的状态(回复RPC之前在稳定存储介质中更新)

  • currentTerm:服务器看见的最新任期(第一次启动时初始化为0,单调增加)

  • votedFor:在当前任期收到的投票请求的候选者ID(若没有,则为null)

  • **log[]**:日志记录;每条记录包含用于状态机的指令,以及领导者收到此条记录时的任期(第一个下标为1)

所有服务器上可变的状态:

  • commitIndex:已知的被提交的最大下标(初始化为0,单调增加)

  • lastApplied:被应用到状态机的最大下标(初始化为0,单调增加)

领导者的可变状态(选举后重新初始化)

  • **nextIndex[]**:对于每个服务器,为即将发送给这个服务器的记录的下标(初始化为领导者最后一条记录的下标+1)

  • **matchIndex[]**:对于每个服务器,为已知的被复制到该服务器的最大下标(初始化为0,单调增加)

AppendEntries RPC

参数:

  • term:领导者任期

  • leaderId:使得参与者可以重定向客户端

  • prevLogIndex:新记录的直接前驱记录的下标

  • prevLogTerm:新纪录的直接前驱记录的任期

  • **entries[]**:新纪录(心跳则为空,为了效率可以发送多个记录)

  • leaderCommit:领导者的commitIndex

结果:

  • term:currentTerm,供领导者更新自己

  • success:如果参与者包含与prevLogIndex和prevLogTerm相匹配的记录,则为真

接收者实现:

  1. 如果term<currentTerm,返回假

  2. 如果不包含与prevLogIndex和prevLogTerm相匹配的记录,则返回假

  3. 如果一条现有的记录与新纪录冲突(下标相同但任期不同),删除这条现有的条目,以及所有后继的条目

  4. 将不在log当中的所有记录追加到log当中

  5. 如果leaderCommit>commitIndex,设置commitIndex=min(leaderCommit, 最后一条新记录的下标)

RequestVote RPC

参数:

  • term:候选者任期

  • candidateId:候选者ID

  • lastLogIndex:候选者最后一条记录的下标

  • lastLogTerm:候选者最后一条记录的任期

结果:

  • term:currentTerm,供候选者更新自己

  • voteGranted:为真意味着候选者收到一票

接收者实现:

  1. 如果term<currentTerm,返回假

  2. 如果votedFor为null或者candidateId,并且候选者的记录至少与接收者一样新(下面会定义),则投票

所有服务器的规则

所有服务器:

  • 如果commitIndex>lastApplied:递增lastApplied,将log[lastApplied]应用到状态机

  • 如果RPC请求或RPC回复中包含的任期T>currentTerm:设置currentTerm=T,转换为参与者

参与者:

  • 回复来自候选者和领导者的RPC

  • 如果在选举超时时间内,没有收到来自当前领导者的AppendEntries RPC或没有投票给候选者:转变为领导者

候选者:

  • 当转变为候选者时,开始选举:

    • 递增currentTerm

    • 投票给自己

    • 重置选举定时器

    • 发送RequestVote RPC给所有其他服务器

  • 如果收到来自大多数服务器的投票:转变为领导者

  • 如果收到来自新领导者的AppendEntries RPC:转变为参与者

  • 如果选举超时:开始新的选举

领导者:

  • 当选举之后:发送初始的空AppendEntries RPC(心跳)给每个服务器;在空闲时段内重复发送,以避免选举超时

  • 如果收到来自客户端的命令:追加命令到本地的log,在将命令应用到状态机之后回复客户端

  • 如果最后一条记录的下标>=某个参与者的nextIndex:发送由AppendEntries RPC,包含由nextIndex开始的记录

    • 如果成功:更新每个参与者的nextIndex和matchIndex

    • 如果由于记录的不一致而失败:递减nextIndex并重试

  • 如果存在一个N,使得N>commitIndex,大多数的matchIndex[i]>=N,且log[N].term == currentTerm:设置commitIndex=N

实现注意事项

  • RequestVote RPC中比较日志新旧的方法在5.4.1中提到:如果两个日志最后的记录的任期不同,则包含更新任期记录的日志更新;如果两份日志最后的记录的任期相同,则更长的日志更新

  • 日志记录的下标从1开始,为了方便访问和避免下标越界,每个服务器在启动时都会先包含一个空记录

  • 实验提供的代码模板当中,日志记录包含下标字段。但下标可用切片的下标直接代替,目前实验暂不需要用到下标字段。考虑到后续实验有可能会用到,仍使用该字段而不是切片下标作为日志记录的下标

  • 实验提供的代码模板当中的日志记录不包含任期字段,暂不清楚原因,实现中手动添加了该字段

  • 实验提供的代码模板中未包含领导者应用状态机之后回复客户端的方式,未实现

  • 所有服务器的规则2在每次处理RPC请求和收到RPC回复的时候均需要实现,容易遗漏

  • AppendEntries RPC的接收者实现步骤3、4可以合并为:删除prevLogIndex之后的所有记录,然后追加所有新记录。在后续持久化的实验当中可能需要再进行更改。步骤3的原意是prevLogIndex之后的记录与新记录可能存在部分匹配,暂未想到出现这种情况的场景。

实现方式

goroutine的使用

除了RPC和外部接口之外,系统的所有功能都通过goroutine实现。按照功能划分,这些goroutine可分为两种类型:监听型和触发型。

监听型goroutine

Raft算法包含一些需要在后台不断监听,在符合条件时执行的操作,包括:

  • 候选者和参与者的选举超时

  • 领导者的心跳发送

  • 领导者的AppendEntries RPC发送

  • 每个服务器日志记录的应用

  • 领导者commitIndex变量的更新

这些操作各自通过一个goroutine实现,在Make函数中创建,goroutine循环检查rf.killed()的值,为真时goroutine退出,否则,在一段时间间隔之后执行相应操作,在执行操作之前可能还需要检查动作触发条件是否满足,如服务器是否为某个状态。

触发型goroutine

Raft算法中包含一些由动作触发的过程,包括:

  • 候选者规则1、2、3

一般的函数调用是顺序执行的,调用者在函数返回前会阻塞。如果调用者是一个监听型goroutine,会导致系统暂时无法监听该功能。因此上述过程也需要通过goroutine实现,调用者与该goroutine进行通信,触发该功能运行,然后立即返回。

这种一个调用者触发一个或多个特定的被调用者执行的并行模型可以使用Go的sync.Cond包实现。数据结构定义如下:

1
2
3
cond       *sync.Cond
condLock sync.Mutex
wakeUpType int

包含一个sync.Cond类,一个锁和一个条件变量。教程中提到这个锁可以是互斥锁或读写锁,但我查阅了sync.Cond的实现,发现sync.Cond类中关联的锁只实现了Lock()Unlock()这两个方法,对应读写锁中的写锁操作,而无法执行读锁操作。因此我暂时还没搞明白如何搭配sync.Cond和读写锁。上述并行模型采用互斥锁即可实现,这个锁传入NewCond()方法与Cond类进行关联,可以单独定义,也可以使用cond.L字段。

初始化:

1
2
rf.cond = sync.NewCond(&rf.condLock)
rf.wakeUpType = NONE

 调用者实现:

1
2
3
4
5
6
if rf.state != LEADER {
rf.condLock.Lock()
rf.wakeUpType = NEW_ELECTION
rf.condLock.Unlock()
rf.cond.Broadcast()
}

调用者通过cond.Broadcast()方法唤醒所有被cond阻塞的goroutine,唤醒时无需锁的保护。

被调用者实现:

1
2
3
4
5
6
7
8
9
10
for rf.killed() == false {
rf.condLock.Lock()
for rf.wakeUpType != NEW_ELECTION {
rf.cond.Wait()
}
rf.wakeUpType = NONE
rf.condLock.Unlock()

[功能实现]
}

其中,cond.Wait()方法会释放锁,阻塞当前goroutine,直到被唤醒,被唤醒时会加锁,然后再次检查for循环的条件是否满足,如果满足,才跳出循环,释放锁,执行功能。此处的for循环是必要的,不能用if代替,因为Broadcast()方法会唤醒所有被cond阻塞的goroutine,但只有符合对应功能的goroutine需要被执行,其他goroutine需要再次阻塞。

锁的使用

服务器的状态用一把互斥锁rf.mu保护。

锁的使用注意事项:

  • 对服务器状态的任何读写操作均需要锁保护

  • 一个完整的过程需要全程用锁保护,以保证状态的一致性和过程的原子性。

  • RPC调用应当是一个非阻塞的过程,调用前应当解锁。调用后的服务器状态可能会发生变化(例如候选者发送选举请求后发现有其他任期更高的领导者而转变成参与者),应当在加锁之后检查服务器状态是否发生改变。

错误与调试

这个错误花了我比较长的时间debug,还伴随着其他错误:

— FAIL: TestFailAgree2B (13.48s)
config.go:609: one(106) failed to reach agreement

阅读调试信息后发现,“106”指令被重复多次地追加到参与者的日志当中,且一直没有被apply。判断为matchIndex的值错误导致commitIndex没有及时更新的问题。从而发现在AppendEntries RPC返回之后,nextIndex被错误地写成了加1,而不是加上AppendEntries RPC参数中log的长度,导致matchIndex的值错误。