A short summary of MapReduce optimization: broadly speaking, there are structural optimizations and algorithmic optimizations. Structural optimizations 【1】【2】 include things like modifying the scheduler or restructuring the key-value pairs so that fewer pairs go through the shuffle; one example is packing values into a MapWritable, which reduces the number of keys that have to be stored, so the shuffle and sort phases run faster. Basic introductions to MapReduce can be found in 【9】【3】【6】【12】.

Basic usage of Hadoop's Context 【4】: the Context is the data-flow channel in MapReduce. Data that the mapper writes to the Context is shuffled and sorted and then handed to the reducer, and whatever the reducer writes to the Context ends up in the output files. Text 【14】 is Hadoop's string API (the Writable counterpart of Java's String).
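To make the Context and Text usage concrete, here is a minimal word-count-style mapper sketch (the WordMapper class name and the whitespace tokenization are illustrative assumptions, not part of the hashtag example later in this post): each token is written to the Context as a (Text, IntWritable) pair, and after shuffle and sort the reducer receives every distinct Text key together with all of its values.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical minimal mapper, for illustration only: every whitespace-separated
// token becomes a (Text, IntWritable) pair written to the Context. After shuffle
// and sort, the reducer sees each Text key with all the IntWritable values
// written for it, and whatever the reducer writes goes to the output files.
public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text(); // Text is Hadoop's Writable string type

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            word.set(token);          // reuse the Text object and replace its contents
            context.write(word, ONE); // hand the pair to the framework for shuffle and sort
        }
    }
}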
The other kind is algorithmic optimization, which may lower accuracy somewhat but speeds things up dramatically. The job I ran originally took 90 minutes on AWS; structural optimizations brought it down to about 60 minutes, while the algorithmic optimization cut the whole AWS run to 8 minutes, and the actual compute time was even less than that. So in practice, algorithmic optimization gives the bigger payoff.
Using MapWritable 【5】【7】【8】
For Writable, here is an example I wrote recently that computes hashtag similarity. It includes a mapper, a combiner, and a reducer.
The Mapper:
package mapred.hashtagsim;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import mapred.util.Tokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HashtagMapper extends Mapper<LongWritable, Text, Text, MapWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = Tokenizer.tokenize(line);

        // Count each hashtag's occurrences in this line and collect the plain words.
        MapWritable tags_map = new MapWritable();
        List<String> normal_words = new ArrayList<String>();
        for (String word : words) {
            if (word.startsWith("#")) {
                Text tag = new Text(word);
                IntWritable cnt_tag = new IntWritable(1);
                if (tags_map.containsKey(tag)) {
                    IntWritable cnt = (IntWritable) tags_map.get(tag);
                    cnt_tag.set(cnt.get() + 1);
                }
                tags_map.put(tag, cnt_tag);
            } else {
                normal_words.add(word);
            }
        }

        // Emit one (word, hashtag-count map) pair per plain word instead of one
        // pair per (word, hashtag) combination, which reduces the number of
        // pairs that go through the shuffle.
        for (String norm_word : normal_words) {
            context.write(new Text(norm_word), tags_map);
        }
    }
}

The Combiner:
package mapred.hashtagsim;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;

public class HashtagCombiner extends Reducer<Text, MapWritable, Text, MapWritable> {

    @Override
    protected void reduce(Text key, Iterable<MapWritable> value, Context context)
            throws IOException, InterruptedException {
        // Merge all of the per-record hashtag-count maps for this word into one map.
        MapWritable tags_map = new MapWritable();
        for (MapWritable val : value) {
            if (!val.isEmpty()) {
                for (Writable ele : val.keySet()) {
                    IntWritable cnt = (IntWritable) val.get(ele);
                    if (tags_map.containsKey(ele)) {
                        int merged = cnt.get() + ((IntWritable) tags_map.get(ele)).get();
                        tags_map.put(ele, new IntWritable(merged));
                    } else {
                        tags_map.put(ele, cnt);
                    }
                }
            }
        }
        // The combiner must emit the same (Text, MapWritable) types that the mapper emits.
        context.write(key, tags_map);
    }
}

The Reducer:
package mapred.hashtagsim;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;

public class HashtagReducer extends Reducer<Text, MapWritable, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<MapWritable> value, Context context)
            throws IOException, InterruptedException {
        // Merge the hashtag-count maps for this word into an ordinary Java map.
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (MapWritable val : value) {
            if (!val.isEmpty()) {
                for (Writable ele : val.keySet()) {
                    int cnt = ((IntWritable) val.get(ele)).get();
                    String ele_key = ((Text) ele).toString();
                    if (counts.containsKey(ele_key)) {
                        cnt += counts.get(ele_key);
                    }
                    counts.put(ele_key, cnt);
                }
            }
        }

        // Format the merged counts as "tag:count;tag:count;..." and write one line per word.
        if (counts.size() > 0) {
            StringBuilder builder = new StringBuilder();
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                builder.append(e.getKey() + ":" + e.getValue() + ";");
            }
            context.write(key, new Text(builder.toString()));
        }
    }
}
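For completeness, here is a sketch of how the three classes above might be wired together using the standard org.apache.hadoop.mapreduce.Job API. The HashtagDriver class name and the use of command-line arguments for the input and output paths are my own assumptions, not part of the original code.

package mapred.hashtagsim;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver (assumed, not from the original post): wires the mapper,
// combiner, and reducer above into a single MapReduce job.
public class HashtagDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "hashtag similarity");
        job.setJarByClass(HashtagDriver.class);

        job.setMapperClass(HashtagMapper.class);
        job.setCombinerClass(HashtagCombiner.class);
        job.setReducerClass(HashtagReducer.class);

        // Mapper and combiner emit (Text, MapWritable); the reducer emits (Text, Text).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output paths taken from the command line (an assumption).
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note that the combiner keeps the mapper's (Text, MapWritable) output format, which is what allows it to be plugged in as a combiner; only the reducer converts the merged counts into the final Text output.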