用 Linux 和 Apache Hadoop 进行云计算(3)
- UID
- 1066743
|
用 Linux 和 Apache Hadoop 进行云计算(3)
创建 MapReduce 应用程序 MapReduce 应用程序必须具备 “映射” 和 “缩减” 的性质,也就是说任务或作业可以分割为小片段以进行并行处理。然后,可以缩减每个子任务的结果,得到原任务的结果。这种任务之一是网站关键字搜索。搜索和抓取任务可以分割为子任务并分配给从节点,然后在主节点上聚合所有结果并得到最终结果。
试用示例应用程序Hadoop 附带一些用于测试的示例应用程序。其中之一是单词计数器,它统计某一单词在几个文件中出现的次数。通过运行这个应用程序检查 Hadoop 集群。
首先,把输入文件放在分布式文件系统中(conf/ 目录下面)。我们将统计单词在这些文件中出现的次数。
1
| $ bin/hadoop fs –put conf input
|
然后,运行这个示例应用程序,以下命令统计以 “dfs” 开头的单词出现的次数。
1
| $ bin/hadoop jar hadoop-*-examples.jar grep input output 'dfs[a-z.]+'
|
命令的输出说明映射和缩减过程。
前两个命令会在 HDFS 中生成两个目录,“input” 和 “output”。可以使用以下命令列出它们。
查看分布式文件系统中已经输出的文件。它以键-值对的形式列出以 “dfs*” 开头的单词出现的次数。
1
| $ bin/hadoop fs -cat ouput/*
|
现在,访问 JobTracker 站点查看完成的作业日志。
创建 Log Analyzer MapReduce 应用程序现在创建一个 Portal (IBM WebSphere® Portal v6.0) Log Analyzer 应用程序,它与 Hadoop 中的 WordCount 应用程序有许多共同点。这个分析程序搜索所有 Portal 的 SystemOut*.log 文件,显示在特定的时间段内应用程序在 Portal 上启动了多少次。
在 Portal 环境中,所有日志分割为 5MB 的片段,很适合由几个节点并行地分析。
hadoop.sample.PortalLogAnalyzer.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
| publicclass PortalLogAnalyzer {
publicstaticclass Map extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
privatestatic String APP_START_TOKEN = "Application started:";
private Text application = new Text();
publicvoid map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
if(line.indexOf(APP_START_TOKEN) > -1) {
int startIndex = line.indexOf(APP_START_TOKEN);
startIndex += APP_START_TOKEN.length();
String appName = line.substring(startIndex).trim();
application.set(appName);
output.collect(application, new IntWritable(1));
}
}
}
publicstaticclass Reduce extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
publicvoid reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
int sum = 0;
while(values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
publicstaticvoid main(String[] args) throws IOException {
JobConf jobConf = new JobConf(PortalLogAnalyzer.class);
jobConf.setJobName("Portal Log Analizer");
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(IntWritable.class);
jobConf.setMapperClass(Map.class);
jobConf.setCombinerClass(Reduce.class);
jobConf.setReducerClass(Reduce.class);
jobConf.setInputFormat(TextInputFormat.class);
jobConf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
JobClient.runJob(jobConf);
}
}
|
对 Hadoop API 的完整解释请参见 Hadoop 网站上的 API 文档。这里只做简要说明。
Map 类实现映射功能,它搜索日志文件的每一行,寻找应用程序的名称。然后把应用程序名称以键-值对的形式放在输出集合中。
Reduce 类计算具有相同键(相同应用程序名称)的所有值的总和。因此,这个应用程序最终输出的键-值对表示每个应用程序在 Portal 上启动的次数。
Main 函数配置并运行 MapReduce 作业。
运行 PortalLogAnalyzer首先,把这些 Java 代码复制到主节点并编译。把 Java 代码复制到 <hadoop_home>/workspace 目录中。对它执行编译并存档在一个 Jar 文件中,后面 hadoop 命令将运行这个文件。
1
2
3
4
| $ mkdir classes
$ javac –cp ../hadoop-0.19.1-core.jar –d classes
hadoop/sample/PortalLogAnalyzer.java
$ jar –cvf PortalLogAnalyzer.jar –C classes/ .
|
把 Portal 日志复制到 workspace/input 中。假设有多个日志文件,其中包含 2009 年 5 月的所有日志。把这些日志放到 HDFS 中。
1
| $ bin/hadoop fs –put workspace/input input2
|
在运行 PortalLogAnalyzer 时,输出说明映射和缩减过程。
1
2
| $ bin/hadoop jar workspace/PortalLogAnalizer.jar hadoop.sample.PortalLogAnalizer input2
output2
|
图 3. 任务的输出应用程序执行完之后,输出应该与图 4 相似。
1
| $ bin/hadoop fs –cat output2/*
|
图 4. 部分输出在访问 JobTracker 站点时,会看到另一个完成的作业。注意图 5 中的最后一行。
图 5. 完成的作业 |
|
|
|
|
|