MIT6.5840-2024 Lab1: MapReduce

MIT6.5840-2024 Lab1: MapReduce

Lab1 URL: http://nil.csail.mit.edu/6.5840/2024/labs/lab-mr.html

Your Job

Your job is to implement a distributed MapReduce, consisting of two programs, the coordinator and the worker. You should put your implementation in mr/coordinator.go, mr/worker.go, and mr/rpc.go.

  • Coordinator:
    • There will be just one coordinator process
    • Notice if a worker hasn’t completed its task in a reasonable amount of time and give the same task to a different worker.
  • Worker:
    • One or more worker processes executing in parallel
    • In a loop, ask the coordinator for a task,
    • Read the task’s input from one or more files, execute the task, write the task’s output to one or more files, and again ask the coordinator for a new task.
  • RPC:
    • The workers will talk to the coordinator via RPC.

首先要仔细阅读A few rules和Hints.

实验涉及的要点

  • RPC传输自定义数据结构要先注册。但是RPC没必要传整个Task,这样还得给gob编码器注册这个自定义类型。可以只传输必要信息,比如TaskID, TaskType等。
// 在 Go 语言中,init() 函数是一个特殊的函数,它会在​​包被导入时自动执行​​。这是 Go 语言设计的一个核心特性,用于完成包的初始化工作。
func init() {
gob.Register(&MapTask{}) // 注册 MapTask
gob.Register(&ReduceTask{}) // 注册 ReduceTask
}
  • Coordinator是必须是并发的(并发是指多个计算任务在同一时间段内交替执行),因为它需要同时处理多个Worker的请求。

  • 对Coordinator中的共享数据的操作需要原子操作互斥锁线程安全数据结构。例如completed++不是原子的,在底层,completed++分为 读取->修改->写入 三步,如果两个Worker同时执行completed++,可能导致计数错误(Data Race)。解决方案就是使用sync.Mutex保护completed或者改用atomic.AddInt32;Task队列可以用chan。Go语言的chan是线程安全的队列,可以安全地分发任务(无需锁)。

    • 原子操作 (Atomic Operation):是指不可分割的操作,即在执行过程中不会被其他操作中断。在并发编程中,原子操作用于确保共享数据的正确性。原子操作的特性:不可分割线程安全适用于简单操作
    • 复杂操作应该还是要加互斥锁。
    • Go的chan​​不仅线程安全,还支持阻塞(Blocking):阻塞指一个线程或协程在执行某个操作时,必须等待该操作完成才能继续执行后续代码。如果操作不能立即完成,线程会进入等待状态(暂停执行),直到条件满足(注意:使用select+default就是非阻塞读取了)。
  • 如果Task队列是阻塞的,那么Worker的轮询是多余的(chan已经实现了等待逻辑)

  • Coordinator并不能可靠地区分出crashed worker,有时候worker是alive的但是因为某些原因停止了,或者worker执行太慢了没有用。这时候最好的解决方法就是让coordinator等一定的时间,超过这个时间就放弃这个worker并且重新把这个任务交给不同的worker。这里可以让coordinator等10s,10s后没收到任务完成就假设worker挂了。

  • To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile (or os.CreateTemp if you are running Go 1.17 or later) to create a temporary file and os.Rename to atomically rename it.

  • TaskHandler(args, reply)里有一句reply.Task = task。RPC处理函数(TaskHandler())返回后,RPC框架会调用gob编码器序列化replygob编码器会递归访问task的所有字段并读取它们的值。如果另一个 goroutine(如 checkTimeouts)正在修改 task 的字段​​,就会导致​​并发读写冲突​​(data race)。即使在RPC处理函数里加锁也可能发生,因为锁只保护了reply.Task = task的赋值操作,而gob编码发生在RPC返回后(锁已释放),此时checkTimeOuts可能正在修改task的字段。这时只能用Task的深拷贝。

    • 锁​​的局限性​​:锁只能保护​显式的代码段​​,但 gob 编码是 RPC 框架隐式触发的,无法通过常规锁控制。
  • 除非全部任务完成,否则Worker不要退出。

Reference