首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

MapReduce优化小结

MapReduce优化小结

MapReduce优化小结一般来说有结构上的优化和算法上的优化。结构上的优化【1】【2】就比如修改调度器(scheduler),修改key-value pairs,这样来减少key的数量进行shuffle,这样的优化就是使用诸如MapWritable进行存储,这样就可以减少key的存储数量。进行shuffle和sort的时候就更快。MapReduce的基本介绍【9】【3】【6】【12】。Hadoop的Context的基本使用【4】Context是MapReduce里面的数据流模型,比如我们把数据写入mapper,经过shuffle和sort以后,在reducer里面就可以对mapper写入的数据进行使用。在最后context的内容会写入输出文件。Text【14】(hadoop中的string API)
另外一种呢就是算法的优化,算法的优化可以使得精度降低,但是速度大大提高。我跑的程序在aws上花了90分钟,使用结构上的优化可以到达60分钟,但是算法上的优化最后在aws上跑了8分钟,真正跑的时间那就更少了。所以可以看出优化还是使用算法优化见效更快。
MapWritable的使用【5】【7】【8】
在Writable这里,我举一个我最近写的计算hashtag similarity的例子。包含了mapper、combiner、reduce。
Mapper的写法:
package mapred.hashtagsim;import java.io.IOException;import java.util.*;import java.lang.*;import mapred.util.Tokenizer;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.MapWritable;import org.apache.hadoop.mapreduce.Mapper;public class HashtagMapper extends Mapper<LongWritable, Text, Text, MapWritable> {    @Override    protected void map(LongWritable key, Text value,            Context context)            throws IOException, InterruptedException {        String line = value.toString();        String[] words = Tokenizer.tokenize(line);        MapWritable tags_map = new MapWritable();        Vector<String> normal_words = new Vector<String>();        for (String word : words) {            if(word.startsWith("#")==true) {                Text tag = new Text(word);                IntWritable cnt_tag = new IntWritable(1);                if (tags_map.containsKey(tag)) {                    IntWritable cnt = (IntWritable) tags_map.get(tag);                    cnt_tag.set(cnt.get() + 1);                }                tags_map.put(tag, cnt_tag);            }            else {                normal_words.addElement(word);            }        }        Integer cnt_norm = normal_words.size();        for(Integer idx1 = 0; idx1 < cnt_norm; idx1++) {            String norm_word = normal_words.get(idx1);            context.write(new Text(norm_word), tags_map);         }    }}Combiner的写法:
package mapred.hashtagsim;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.lang.*;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.MapWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Reducer;public class HashtagCombiner extends Reducer<Text, MapWritable, Text, MapWritable> {    @Override    protected void reduce(Text key, Iterable<MapWritable> value,            Context context)            throws IOException, InterruptedException {              MapWritable tags_map = new MapWritable();        for (MapWritable val:value) {            if(val.isEmpty()==false) {                for (Writable ele:val.keySet()) {                    IntWritable cnt = (IntWritable)val.get((Text)ele);                    if(tags_map.containsKey((ele))) {                        tags_map.put((Text)ele, new IntWritable(cnt.get() + ((IntWritable)tags_map.get((Text)ele)).get()));                     }                    else {                        tags_map.put((Text)ele, cnt);                       }                }               }        }        context.write(key, tags_map);       }}       Reducer的写法:
package mapred.hashtagsim;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.lang.*;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.MapWritable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Reducer;public class HashtagReducer extends Reducer<Text, MapWritable, Text, Text> {    @Override    protected void reduce(Text key, Iterable<MapWritable> value,            Context context)            throws IOException, InterruptedException {              Map<String, Integer> counts = new HashMap<String, Integer>();        for (MapWritable val:value) {            if(val.isEmpty()==false) {                for (Writable ele:val.keySet()) {                    int cnt = ((IntWritable)val.get(ele)).get();                    String ele_key = ((Text)ele).toString();                    if(counts.containsKey(ele_key)) {                        cnt += counts.get(ele_key);                    }                    counts.put(ele_key, cnt);                }               }        }        StringBuilder builder = new StringBuilder();        Integer cnt = 0;        if(counts.size() > 0) {            for (Map.Entry<String, Integer> e : counts.entrySet()) {                isWrite = true;                builder.append(e.getKey() + ":" + e.getValue() + ";");              }        }    }}
返回列表