首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

使用 MapReduce 和 InfoSphere BigInsights 对各种文档类型进行处理和内容分析(3)

使用 MapReduce 和 InfoSphere BigInsights 对各种文档类型进行处理和内容分析(3)

TikaOutputFormat                                指定了输出详细信息这个类确定作业的输出在何处和如何存储。它必须从 OutputFormat 类扩展。在本例中,它从                                        FileOutputFormat 扩展而来。如 清单 8 中所示,首先分配输出的路径,然后创建一个                                        TikaRecordWriter 实例来生成输出文件。就像 TikaInputFormat                                类一样,这个类必须在 main 方法中指定为用作 OutputFormat 类。
清单 8. TikaOutputFormat.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TikaOutputFormat extends FileOutputFormat<Text, Text>
{

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException
    {
        //to get output files in part-r-00000 format
        Path path = getDefaultWorkFile(context, "");
        FileSystem fs = path.getFileSystem(context.getConfiguration());
        FSDataOutputStream output = fs.create(path, context);
        return new TikaRecordWriter(output, context);
    }

}




TikaRecordWriter 创建了输出这个类用于创建输出。它必须从 RecordWriter 抽象类扩展。
在 清单 9                                        中所示的构造函数中,您获得输出流、上下文和自定义的配置参数,这个参数用作文件名和文件内容之间的分隔符。这个参数可在运行时进行设置(main                                方法)。如果未指定分隔符,默认情况下会选择 | 作为分隔符。
清单 9. TikaRecordWriter.java                                构造函数
1
2
3
4
5
6
7
8
public TikaRecordWriter(DataOutputStream output, TaskAttemptContext context)
{
    this.out = output;
    String cDel = context.getConfiguration().get("com.ibm.imte.tika.delimiter");
    if (cDel != null)
        delimiter = cDel;
    logger.info("Delimiter character: " + delimiter);
}




在 清单 10 中所示的 write 方法中,使用了映射器中创建的 Text                                类型的键和值,它们将被写入到输出流中。键包含文件名,值包含文件的文本内容。将键和值写入到输出中后,可以使用分隔符将它们分开,然后使用换行符将每行分开。
清单 10. TikaRecordWriter.java                                write
1
2
3
4
5
6
7
8
9
@Override
public void write(Text key, Text value) throws IOException,
        InterruptedException
{
    out.writeBytes(key.toString());
    out.writeBytes(delimiter);
    out.writeBytes(value.toString());
    out.writeBytes("\n");
}




TikaDriver 使用应用程序要运行 MapReduce 作业,需要定义一个驱动程序类 TikaDriver,它包含 main 方法,如 清单 11 中所示。可以将 TikaInputFormat 设置为自定义的                                        InputFormat,类似地,对于作业,可以将 TikaOutputFormat 设置为自定义的                                        OutputFormat。
清单 11. Main                                方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void main(String[] args) throws Exception
{
    int exit = ToolRunner.run(new Configuration(), new TikaDriver(), args);
    System.exit(exit);
}

@Override
public int run(String[] args) throws Exception
{
    Configuration conf = new Configuration();
    //setting the input split size 64MB or 128MB are good.
    conf.setInt("mapreduce.input.fileinputformat.split.maxsize", 67108864);
    Job job = new Job(conf, "TikaMapreduce");
    conf.setStrings("com.ibm.imte.tika.delimiter", "|");
    conf.setStrings("com.ibm.imte.tika.replaceCharacterWith", "");
    job.setJarByClass(getClass());
    job.setJobName("TikaRead");
     
    job.setInputFormatClass(TikaInputFormat.class);
    job.setOutputFormatClass(TikaOutputFormat.class);
     
    FileInputFormat.addInputPath(job, new Path(args[0]));
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
         
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
}




Tika 和 Log4j API 附件记住在运行任务时附加 Tika 和 Log4j API。要在 Eclipse 中完成此操作,可以单击 Run > Run                                                Configurations 转到作业配置,并在 Java MapReduce 部分中,单击 JAR                                                Settings 选项卡,通过将这些 API 添加到 Additional JAR Files                                        部分来找到它们。

请注意加粗的第一行。如果未定义最大碎片大小,那么该任务会将所有输入文件归为一个碎片,所以只有一个映射任务。为了预防出现这种情况,可以定义最大碎片大小。此值可通过为                                        mapreduce.input.fileinputformat.split.maxsize                                配置参数定义一个值来更改。这样,每个碎片都有一个可配置的大小,该大小在本例中为 64MB。
现在,我们已经完成了 MapReduce 作业。它夺取 HDFS                                输入文件夹中的所有文件,将它们转码为分隔的输出文件。然后,您可以很方便地使用文本分析工具继续分析数据,比如 IBM Annotation Query Language                                (AQL)。如果想要一种不同的输出格式或想要直接转换数据,则必须相应地修改代码。因为许多人都对 Java 代码编程不太熟悉,所以本文将介绍如何在 Jaql                                模块中使用同样的技术。
使用 Jaql 模块而不是 Java 类本节将介绍如何使用上一节中的相同技术创建一个 Jaql 模块,如何使用此模块转换文档,从外部文件系统加载它们,以及如何直接分析它们。Jaql                                模块使您能够使用一种直观的语法执行所有这些处理工作,无需编写任何 Java 代码。
前面描述的 InputFormat、OutputFormat、RecordReader 和                                        RecordWriter 类位于 org.apache.hadoop.mapreduce 和                                        org.apache.hadoop.mapreduce.lib.output 包中,这些包被称为新的 Hadoop API。
要为 Jaql 使用相同的方法,需要实现 org.apache.hadoop.mapred 包中的类,该包是一个较旧的 MapReduce                                API 版本。
首先,了解如何向旧包应用相同的方法。
TikaJaqlInputFormat 验证输入这个类用于验证作业的输入配置,拆分输入数据块,以及创建 RecordReader。它扩展自                                        org.apache.hadoop.mapred.MultiFileInputFormat 类,包含两个方法。
如 清单 12 中所示,构造函数创建了一个 TikaJaqlRecordReader                                        实例,isSplitable 方法被设置为返回 false 来覆盖阻止                                        InputFormat 拆分文件的默认行为。为了能够在载入 Jaql 中后操作输入,可以使用一般类型 JsonHolder。
清单 12. TikaJaqlInputFormat.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class TikaJaqlInputFormat extends MultiFileInputFormat<JsonHolder, JsonHolder>
{

    @Override
    public RecordReader<JsonHolder, JsonHolder> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter)
            throws IOException
    {
        return new TikaJaqlRecordReader(job, (MultiFileSplit) split);
    }
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename)
    {
        return false;
    }
}

返回列表