6.824 lab1

6.824简介

6.824为MIT的分布式系统课程。同TinyKV类似,6.824的主要内容也为实现Raft共识算法。以笔者做完Lab1的感受,相比TinyKV,6.824仅提供了基本框架,在测试用例当中并未对实现过程中所使用的数据结构进行具体要求,因此编程自由度较高。

相关资料

6.824 Home Page: Spring 2022

MIT6.824分布式系统课程中文翻译 - 知乎 (zhihu.com)

如何的才能更好地学习 MIT6.824 分布式系统课程? - 知乎 (zhihu.com)

【MIT 6.824 Distributed Systems Spring 2020 分布式系统 中文翻译版合集】_哔哩哔哩_bilibili

实验要求

实现一个分布式的MapReduce系统,实验文档中文翻译参照:

【翻译】6.824 lab1 (自用不负责) - Greenty - 博客园 (cnblogs.com)

MapReduce

MapReduce是Google提出的一个软件架构,用于大规模数据集的并行运算。MapReduce包含Map和Reduce两个步骤,可以用函数来表示。

  • Map函数读取一个文件,根据文件内容输出一个称为中间件的键值对列表。每个Map函数分别处理一个输入文件。
1
func Map(fileName string, content string) intermediates []KVPair
  • 所有Map函数输出的中间件集合在一起,作为完整的中间件。对于中间件中每个不同的key对应的所有键值对,分别执行一个Reduce函数。Reduce函数根据键值对列表的内容输出这个key对应的一个value。
1
func Reduce(key string, values []string) string

整个MapReduce流程的输入为文件列表,输出为键值对列表。

系统实现

MapReduce的分布式执行系统包含一个master(coordinator)和若干个worker。master负责任务分发、协调任务的运行,worker负责执行具体的map和reduce函数。master启动后,执行注册RPC的操作,使得其方法对worker可见。worker通过本地网络对master的方法进行远程调用。

注意:使用RPC包时,外部方法名称和被传输结构体当中的成员名称的首字母均需要大写,否则会出现数据无法正常传输的错误。

worker

由于master无法知道worker的存在,也无法知道worker是否故障,因此worker的主循环中需要反复向master请求分发任务,在任务完成之后需要向master发送完成信息,分别对应master的DispatchTaskHandleTaskDone两个方法。worker主循环如下,master返回TASK_WAIT时代表map任务未全部完成,worker等待一段时间之后再次向master请求可能的reduce任务或者超时的map任务。

1
2
3
4
5
6
7
8
9
10
11
12
for {
reply := SendRequest()
if reply == nil || reply.TaskType == TASK_NONE {
break
} else if reply.TaskType == TASK_MAP {
doMap(reply, mapf)
} else if reply.TaskType == TASK_REDUCE {
doReduce(reply, reducef)
} else { // TASK_WAIT
time.Sleep(100 * time.Millisecond)
}
}

master

master结构体定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Coordinator struct {
// Your definitions here.
files []string
nReduce int
nodeId int // dispatch task with a node id to identify temp files generated by different nodes

mapStates []int // 0:not dispatched, 1:dispatched, 2:done, 3:dispatched_timeout
mapIds []int
mapDone bool
mapMutex sync.Mutex

reduceStates []int // 0:not dispatched, 1:dispatched, 2:done, 3:dispatched_timeout
reduceIds []int
reduceDone bool
reduceMutex sync.Mutex
}

master将每个map任务和每个reduce任务的状态分别保存与mapStatesreduceStates切片中,0表示未分发,1表示已分发但未完成,2表示完成,3表示超时(在下一节会详细介绍超时处理)。在DispatchTask方法中,master首先根据mapDone变量判断map是否完成,然后相应地遍历mapStatesreduceStates切片,找到第一个未分发的任务进行分发。由于两个切片和两个done变量可能同时被多个worker访问,需进行加锁。我用了mapMutexreduceMutex两把锁。

超时处理

超时检测

当master持续一段时间未收到来自worker的完成信息时,可认为对应的worker故障或者速度太慢,此时需要重新分发任务。可以通过新建一个goroutine进行后台计时。goroutine会在父函数返回的时候退出,因此不能在DispatchTask方法当中对于每个任务单独创建一个用于计时的goroutine,否则,当RPC调用返回的时候,这个goroutine也会随之结束。

由上,goroutine只能在MakeCoordinator函数中创建。出于简洁性,我实现了如下函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
go func() {
for {
if c.Done() {
break
}
//fmt.Printf("checking timeout\n")
time.Sleep(10 * time.Second)
c.mapMutex.Lock()
for i, state := range c.mapStates {
if state == 1 {
c.mapStates[i] = 3
} else if state == 3 {
fmt.Printf("map timeout %v\n", i)
c.mapStates[i] = 0
}
}
c.mapMutex.Unlock()
c.reduceMutex.Lock()
for i, state := range c.reduceStates {
if state == 1 {
c.reduceStates[i] = 3
} else if state == 3 {
fmt.Printf("reduce timeout %v\n", i)
c.reduceStates[i] = 0
}
}
c.reduceMutex.Unlock()
}
}()

该goroutine每隔一段时间检查state的值,当state为1时,将其设为3。当state为3时视为超时。该实现的超时时间不是固定的,而是一个范围,但在实现上较为简洁。

临时文件处理

假设超时是由于节点速度过慢,当新节点被指派重复执行该任务的时候,新节点和旧节点产生的临时文件会冲突。因此,我在worker生成的临时文件的文件名当中加上了节点的ID以进行区分,当worker向master报告任务完成之后,master才将对应节点产生的临时文件重命名为所需要的形式。为了避免因某个任务的基础执行时间超过了超时阈值而导致的重复超时,master在接收到任意一个worker报告的完成信息时立即将该任务标记为完成,即使该worker已经超时。