MIT 6.824 2018 Lab 1: MapReduce

不愧是MIT,课程质量确实非常高,每节课都明确列出了preparation,读完preparation再读lecture,学习效率高了很多。别看lecture就是简单的txt文件,里面可以说都是浓缩的精华,条理很清晰。后悔没早点学习这个课程!

本文主要对Lab 1做一个总结,所有代码均通过测试,可以在这里查看.

MapReduce

MapReduce的目标是让那些没有并行和分布式系统经验的程序员,可以轻松地写出可以在多台机器上并行处理大规模数据的程序. 程序员不需要关心划分数据、程序调度、故障恢复和机器间通讯等细节,只需要编写MapReduce的顺序代码,就可以利用大规模分布式系统并行执行程序进行数据处理.

MapReduce就是一个编程模型.

Map函数接收一组键值对,处理后输出另一组键值对作为中间结果,MapReduce库会将同一个键的中间值合并到一起,然后传给Reduce函数.

Reduce函数接收一个键及其值的集合,然后对值进行处理,得到最终结果.

抽象出来如下所示:

现实生活中有很多场景适用于MapReduce编程模型,最经典的就是WordCount了,其他还有建立倒排索引、统计URL访问次数、匹配模式等场景.

Lab 1

Part I: Map/Reduce input and output

这部分需要完成doMap()和doReduce()函数的代码编写.

doMap()对应一个map task,一个map task对应一个输入文件,流程如下:

  • 读取输入文件的内容
  • 调用mapF()得到中间结果的键值对集合
  • 设reduce task的个数为R,每个map task都要将中间结果分为R个中间文件保存,调用reduceName()得到R个中间文件的文件名
  • 根据hash值,将每个键分配给对应的中间文件
  • 将R个中间文件编码为SON格式并保存到文件系统中

doReduce()对应一个reduce task,每个map task的R个中间文件中,都有一个中间文件作为当前的reduce task的输入文件,设map task有M个,则一个reduce task有M个输入文件流程如下:

  • 调用reduceName()得到M个中间文件的文件名
  • 读取并解码M个JSON格式的中间文件,得到中间结果的键值对
  • 根据键排序
  • 对每个键调用reduceF()得到最终结果,编码为JSON格式保存到文件系统

Part II: Single-worker word count

这一部分需要实现WordCount的mapF()和ReduceF()函数.

mapF()将文件内容分割为单词集合,每个单词作为键,值都为1,将键值对集合作为结果返回.

reduceF()将一个键对应的值的集合相加,作为结果返回.

Part III: Distributing MapReduce tasks

前面两部分是顺序执行map和reduce任务的,这一部分要实现并行执行map和reduce任务,先并行执行map task,再并行执行reduce task.

schedule()负责为map和reduce任务启动一个goroutine,用RPC框架调用worker执行任务,这里用channel获得worker的RPC地址,worker利用单机的多核CPU并行执行任务.

Part IV: Handling worker failures

Part IV就是在Part III的基础上增加了worker执行失败时的处理逻辑,如果call()返回false,那么就继续循环等待下一个可用的worker继续执行,直到call()返回trueschedule.go的完整代码如下所示,将代码中for循环中的代码改为无论执行成功与否,都只执行一次,就是Part III部分的代码.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package mapreduce
import (
"fmt"
"sync"
)
//
// schedule() starts and waits for all tasks in the given phase (mapPhase
// or reducePhase). the mapFiles argument holds the names of the files that
// are the inputs to the map phase, one per map task. nReduce is the
// number of reduce tasks. the registerChan argument yields a stream
// of registered workers; each item is the worker's RPC address,
// suitable for passing to call(). registerChan will yield all
// existing registered workers (if any) and new ones as they register.
//
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
// All ntasks tasks have to be scheduled on workers. Once all tasks
// have completed successfully, schedule() should return.
//
// Your code here (Part III, Part IV).
//
var wg sync.WaitGroup
for i := 0; i < ntasks; i++ {
wg.Add(1)
args := DoTaskArgs{JobName: jobName, Phase: phase, TaskNumber: i, NumOtherPhase: n_other}
if phase == mapPhase {
args.File = mapFiles[i]
}
go func(registerChan chan string, args DoTaskArgs) {
defer wg.Done()
// the worker may fails, so keep looping until the task complete
for {
worker := <-registerChan
msg := call(worker, "Worker.DoTask", &args, nil)
if msg {
// must use `go` statement(why???)
go func() {
registerChan <- worker
}()
break
}
}
}(registerChan, args)
}
wg.Wait()
fmt.Printf("Schedule: %v done\n", phase)
}

这里要特别注意,当worker成功执行完任务后,要将其RPC地址再次加入channel中,但是这一步必须用另启一个goroutine执行,不然会导致最后一个任务的goroutine卡在registerChan <- worker这一步,导致map phase的WaitGroup一直处于等待状态. 这是因为存在这样一种情况:worker是并行执行的,当倒数第二个任务调用的worker执行完之前,最后一个任务就能从channel里得到另一个worker开始执行,这样当倒数第二个任务执行完时,已经向channel里输入了对应的worker,但是这时已经没有任务需要调用worker了,所以当最后一个任务执行完时,想要将对应的worker输入到channel时,必须等待channel输出之前的worker,所以因为channel的阻塞导致最后一个任务的goroutine一直处于阻塞状态.

Part V: Inverted index generation

这一部分需要实现倒排索引的生成,要编写main/ii.go中的mapF()和reduceF()函数,理解了WordCount的处理流程,就很简单,原理是一样的,只不过mapF()生成的键值对中,键是单词,值是单词所在文档的文档名,reduceF()的返回值是包含当前单词的文档数以及这些文档名构成的字符串.


版权声明

作者:萝卜姓胡
许可协议:Creative Commons Attribution-ShareAlike 4.0 International License
本文永久链接:http://hw2007.com/2018/11/10/MIT-6-824-2018-Lab-1-MapReduce/