首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

五种基于 MapReduce 的并行计算框架介绍及性能测试(5)Disco WordCount 实验

五种基于 MapReduce 的并行计算框架介绍及性能测试(5)Disco WordCount 实验

Disco WordCount                实验MapReduce 框架由于 Disco 有分布式文件系统存在,所以一般情况下都不会单独使用,都是从分布式文件系统内取数据后读入内存,然后再切分数据、进入                MapReduce 阶段。首先需要调用 ddfs 的 chunk 命令把文件上传到 DDFS,然后开始编写 MapReduce 程序,Disco 外层应用程序采用                Python 编写。Map 程序实例如清单 3 所示,Reduce 程序示例如清单 4 所示。
清单 3 . Map 程序段
1
2
3
def fun_map(line, params):
for word in line.split():
yield word, 1




清单 4 . Reduce                    程序段
1
2
3
4
def fun_reduce(iter, params):
from disco.util import kvgroup
for word, counts in kvgroup(sorted(iter)):
yield word, sum(counts)




清单 5 . Map/Reduce                    任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from disco.core import Job, result_iterator
def map(line, params):
for word in line.split():
yield word, 1
def reduce(iter, params):
from disco.util import kvgroup
for word, counts in kvgroup(sorted(iter)):
yield word, sum(counts)
if __name__ == '__main__':
job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
map=map,
reduce=reduce)
for word, count in result_iterator(job.wait(show=True)):
print(word, count)
Note

返回列表