使用 MapReduce 和 InfoSphere BigInsights 对各种文档类型进行处理和内容分析(3)
 
- UID
- 1066743
|

使用 MapReduce 和 InfoSphere BigInsights 对各种文档类型进行处理和内容分析(3)
TikaOutputFormat 指定了输出详细信息这个类确定作业的输出在何处和如何存储。它必须从 OutputFormat 类扩展。在本例中,它从 FileOutputFormat 扩展而来。如 清单 8 中所示,首先分配输出的路径,然后创建一个 TikaRecordWriter 实例来生成输出文件。就像 TikaInputFormat 类一样,这个类必须在 main 方法中指定为用作 OutputFormat 类。
清单 8. TikaOutputFormat.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| public class TikaOutputFormat extends FileOutputFormat<Text, Text>
{
@Override
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException
{
//to get output files in part-r-00000 format
Path path = getDefaultWorkFile(context, "");
FileSystem fs = path.getFileSystem(context.getConfiguration());
FSDataOutputStream output = fs.create(path, context);
return new TikaRecordWriter(output, context);
}
}
|
TikaRecordWriter 创建了输出这个类用于创建输出。它必须从 RecordWriter 抽象类扩展。
在 清单 9 中所示的构造函数中,您获得输出流、上下文和自定义的配置参数,这个参数用作文件名和文件内容之间的分隔符。这个参数可在运行时进行设置(main 方法)。如果未指定分隔符,默认情况下会选择 | 作为分隔符。
清单 9. TikaRecordWriter.java 构造函数1
2
3
4
5
6
7
8
| public TikaRecordWriter(DataOutputStream output, TaskAttemptContext context)
{
this.out = output;
String cDel = context.getConfiguration().get("com.ibm.imte.tika.delimiter");
if (cDel != null)
delimiter = cDel;
logger.info("Delimiter character: " + delimiter);
}
|
在 清单 10 中所示的 write 方法中,使用了映射器中创建的 Text 类型的键和值,它们将被写入到输出流中。键包含文件名,值包含文件的文本内容。将键和值写入到输出中后,可以使用分隔符将它们分开,然后使用换行符将每行分开。
清单 10. TikaRecordWriter.java write1
2
3
4
5
6
7
8
9
| @Override
public void write(Text key, Text value) throws IOException,
InterruptedException
{
out.writeBytes(key.toString());
out.writeBytes(delimiter);
out.writeBytes(value.toString());
out.writeBytes("\n");
}
|
TikaDriver 使用应用程序要运行 MapReduce 作业,需要定义一个驱动程序类 TikaDriver,它包含 main 方法,如 清单 11 中所示。可以将 TikaInputFormat 设置为自定义的 InputFormat,类似地,对于作业,可以将 TikaOutputFormat 设置为自定义的 OutputFormat。
清单 11. Main 方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| public static void main(String[] args) throws Exception
{
int exit = ToolRunner.run(new Configuration(), new TikaDriver(), args);
System.exit(exit);
}
@Override
public int run(String[] args) throws Exception
{
Configuration conf = new Configuration();
//setting the input split size 64MB or 128MB are good.
conf.setInt("mapreduce.input.fileinputformat.split.maxsize", 67108864);
Job job = new Job(conf, "TikaMapreduce");
conf.setStrings("com.ibm.imte.tika.delimiter", "|");
conf.setStrings("com.ibm.imte.tika.replaceCharacterWith", "");
job.setJarByClass(getClass());
job.setJobName("TikaRead");
job.setInputFormatClass(TikaInputFormat.class);
job.setOutputFormatClass(TikaOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
|
Tika 和 Log4j API 附件记住在运行任务时附加 Tika 和 Log4j API。要在 Eclipse 中完成此操作,可以单击 Run > Run Configurations 转到作业配置,并在 Java MapReduce 部分中,单击 JAR Settings 选项卡,通过将这些 API 添加到 Additional JAR Files 部分来找到它们。
请注意加粗的第一行。如果未定义最大碎片大小,那么该任务会将所有输入文件归为一个碎片,所以只有一个映射任务。为了预防出现这种情况,可以定义最大碎片大小。此值可通过为 mapreduce.input.fileinputformat.split.maxsize 配置参数定义一个值来更改。这样,每个碎片都有一个可配置的大小,该大小在本例中为 64MB。
现在,我们已经完成了 MapReduce 作业。它夺取 HDFS 输入文件夹中的所有文件,将它们转码为分隔的输出文件。然后,您可以很方便地使用文本分析工具继续分析数据,比如 IBM Annotation Query Language (AQL)。如果想要一种不同的输出格式或想要直接转换数据,则必须相应地修改代码。因为许多人都对 Java 代码编程不太熟悉,所以本文将介绍如何在 Jaql 模块中使用同样的技术。
使用 Jaql 模块而不是 Java 类本节将介绍如何使用上一节中的相同技术创建一个 Jaql 模块,如何使用此模块转换文档,从外部文件系统加载它们,以及如何直接分析它们。Jaql 模块使您能够使用一种直观的语法执行所有这些处理工作,无需编写任何 Java 代码。
前面描述的 InputFormat、OutputFormat、RecordReader 和 RecordWriter 类位于 org.apache.hadoop.mapreduce 和 org.apache.hadoop.mapreduce.lib.output 包中,这些包被称为新的 Hadoop API。
要为 Jaql 使用相同的方法,需要实现 org.apache.hadoop.mapred 包中的类,该包是一个较旧的 MapReduce API 版本。
首先,了解如何向旧包应用相同的方法。
TikaJaqlInputFormat 验证输入这个类用于验证作业的输入配置,拆分输入数据块,以及创建 RecordReader。它扩展自 org.apache.hadoop.mapred.MultiFileInputFormat 类,包含两个方法。
如 清单 12 中所示,构造函数创建了一个 TikaJaqlRecordReader 实例,isSplitable 方法被设置为返回 false 来覆盖阻止 InputFormat 拆分文件的默认行为。为了能够在载入 Jaql 中后操作输入,可以使用一般类型 JsonHolder。
清单 12. TikaJaqlInputFormat.java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| public class TikaJaqlInputFormat extends MultiFileInputFormat<JsonHolder, JsonHolder>
{
@Override
public RecordReader<JsonHolder, JsonHolder> getRecordReader(
InputSplit split, JobConf job, Reporter reporter)
throws IOException
{
return new TikaJaqlRecordReader(job, (MultiFileSplit) split);
}
@Override
protected boolean isSplitable(FileSystem fs, Path filename)
{
return false;
}
}
|
|
|
|
|
|
|