首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

五种基于 MapReduce 的并行计算框架介绍及性能测试(7)

五种基于 MapReduce 的并行计算框架介绍及性能测试(7)

Mars                MapReduceMars 框架中,Map 和 Reduce 的处理阶段都在 GPU 内进行,Map 和 Reduce 的分割数据阶段都在 CPU 内进行,这是与其他基于 CPU 的                MapReduce 框架的最大不同。Mars 更多的是利用 CPU、GPU 缓存来替代内存,执行数据分割、处理过程。
具体的 Word count 的流程如下所示:
  • 准备 key/value 键值对,将这些键值对存储在数组里面;
  • 初始化 MapReduce 上下文,设置参数 (根据不同的 GPU 需要根据 CUDA 核心数目设置并发线程数);
  • 数据预处理,首先打开文件,将文件所有内容读入内存,然后申请一块同文件大小的显存,将文件内容中小写字符转为大写 (这样的影响 word,Word                    算通一个单词)。
  • 开始 MapReduce 阶段。根据并发线程数和文件大写切换内存中的文件,每块切分后的任务记录下该任务在内存中的偏移位置和长度视为 value,                    显存的指针地址视为 key,将任务添加的任务池。将处理后的内存内容复制到刚刚申请的显存中。接着开始 Map 流程,将内存中的任务池复制到显存,申请显存块用于存放                    Map 产生的数据,开启多线程并发执行用户定义的 map 流程 MAP_COUNT_FUNC,这个是 Mars 由于 GPU 程序的特殊性而设计的,用于记录                    map 产生的 key 和 value 的长度 (sizeof)。调用 MAP_FUNC 方法,输入任务记录,输出单词以及单词所在的位置;
  • 如果 noSort 是 F,对结果排序;
  • 如果 noReduce 是 F,GPU 开始 reduce 阶段,生成最终的结果集。否则,立即输出最后的结果集;
  • 结果输出,从 GPU 设备拷贝最终的结果集到内存,然后输出到屏幕。
通过上述的 7 个步骤,WordCount 的计算过程全部完成并且输出结果集。
清单 7 . Mars 的 Map                    程序段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#ifndef __MAP_CU__
#define __MAP_CU__
#include "MarsInc.h"
#include "global.h"
__device__ int hash_func(char* str, int len)
{
int hash, i;
for (i = 0, hash=len; i < len; i++)
hash = (hash<<4)^(hash>>28)^str;
return hash;
}
__device__ void MAP_COUNT_FUNC//(void *key, void *val, size_t keySize, size_t valSize)
{
WC_KEY_T* pKey = (WC_KEY_T*)key;
WC_VAL_T* pVal = (WC_VAL_T*)val;
char* ptrBuf = pKey->file + pVal->line_offset;
int line_size = pVal->line_size;
char* p = ptrBuf;
int lsize = 0;
int wsize = 0;
char* start = ptrBuf;
while(1)
{
for (; *p >= 'A' && *p <= 'Z'; p++, lsize++);
*p = '\0';
++p;
++lsize;
wsize = (int)(p - start);
if (wsize > 6)
{
//printf("%s, wsize:%d\n", start, wsize);
EMIT_INTER_COUNT_FUNC(wsize, sizeof(int));
}
for (; (lsize < line_size) && (*p < 'A' || *p > 'Z'); p++, lsize++);
if (lsize >= line_size) break;
start = p;
}
}
__device__ void MAP_FUNC//(void *key, void val, size_t keySize, size_t valSize)
{
WC_KEY_T* pKey = (WC_KEY_T*)key;
WC_VAL_T* pVal = (WC_VAL_T*)val;
char* filebuf = pKey->file;
char* ptrBuf = filebuf + pVal->line_offset;
int line_size = pVal->line_size;
char* p = ptrBuf;
char* start = ptrBuf;
int lsize = 0;
int wsize = 0;
while(1)
{
for (; *p >= 'A' && *p <= 'Z'; p++, lsize++);
*p = '\0';
++p;
++lsize;
wsize = (int)(p - start);
int* o_val = (int*)GET_OUTPUT_BUF(0);
*o_val = wsize;
if (wsize > 6)
{
//printf("%s, %d\n", start, wsize);
EMIT_INTERMEDIATE_FUNC(start, o_val, wsize, sizeof(int));
}
for (; (lsize < line_size) && (*p < 'A' || *p > 'Z'); p++, lsize++);
if (lsize >= line_size) break;
start = p;
}
}
#endif //__MAP_CU__




清单 8 . Mars 的                    Reduce 程序段
1
2
3
4
5
6
7
8
9
10
#ifndef __REDUCE_CU__
#define __REDUCE_CU__
#include "MarsInc.h"
__device__ void REDUCE_COUNT_FUNC//(void* key, void* vals, size_t keySize, size_t valCount)
{
}
__device__ void REDUCE_FUNC//(void* key, void* vals, size_t keySize, size_t valCount)
{
}
#endif //__REDUCE_CU__

返回列表