首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

使用 MapReduce 和 InfoSphere BigInsights 对各种文档类型进行处理和内容分析(2)

使用 MapReduce 和 InfoSphere BigInsights 对各种文档类型进行处理和内容分析(2)

实现自定义的 MapReduce 类在本文中使用的试验场景中,您可能希望在 Hadoop 中处理并对大量较小的二进制文件进行归档。例如,您可能需要让 Hadoop 分析多个 PDF                                格式的研究论文。使用传统的 MapReduce 技术,需要花费相对较长的时间才能完成此作业,因为您使用了太多的小文件作为输入。此外,MapReduce                                无法自然地读取您文件的 PDF 格式。除了这些限制之外,在 Hadoop 分布式文件系统中存储许多小文件也会消耗 NameNode 上的大量内存。每 100                                万个文件或数据块大约需要使用 1 GB 内存。因此,使用传统的 MapReduce                                技术处理小于一个数据块的文件的效率较低。开发一个具有以下特征的程序会提高效率:
  • 针对处理大量小文件而进行优化
  • 可读取二进制文件
  • 生成更少、更大的文件作为输出
一种更好的方法是使用 Apache Tika 读取所有受支持的文档格式的文本,开发一个 TikaInputFormat 类来使用                                MapReduce 任务读取和处理小文件,并使用 TikaOutputFormat 显示结果。使用                                        InputFormat、RecordReader 和 RecordWriter                                来处理解决方案。我们的目的是读取许多较小的 PDF 文件,生成具有类似于以下代码的分隔格式的输出。
清单 2. 想要的输出
1
2
3
4
<file1.pdf>|<content of file1>
<file2.pdf>|<content of file2>
<file3.pdf>|<content of file3>
...




此输出可以在以后用于下游分析。以下各节将会详细介绍每个类。
TikaHelper 将二进制数据转换为文本这个帮助器类的用途是将一个二进制数据流转换为文本格式。它接收一个 Java I/O 流作为输入,返回与该流等效的 string。
如果您熟悉 MapReduce,就会知道所有任务都包含在运行时设置的一些配置参数。通过使用这些参数,可以定义作业应如何运行 —                                例如输出所在的位置。您还可以添加将供这些类使用的参数。
在这个应用程序中,假设您希望输出一个分隔文件。那么您需要采用某种方式来将原始文本字段中所选的分隔字符替换为不同的字符,还需要采用某种方式来将文本中的新行替换为同样的取代字符。出于此用途,我们将添加两个参数:com.ibm.imte.tika.delimiter                                和 com.ibm.imte.tika.replaceCharacterWith。如 清单 3 中所示,在                                        TikaHelper 类中,从一个 Configuration                                        实例读取这些参数来获得替换选项。Configuration 从 RecordReader 传递而来,它创建了                                        TikaHelper 实例,我们将在本文的下一节中介绍它。
清单 3. TikaHelper.java 构造函数                               
1
2
3
4
5
6
7
8
9
10
11
12
13
public TikaHelper(Configuration conf)
{
    tika = new Tika();
    String confDelimiter = conf.get("com.ibm.imte.tika.delimiter");
    String confReplaceChar =
        conf.get("com.ibm.imte.tika.replaceCharacterWith");
    if (confDelimiter != null )
        this.delimiter = "["+ confDelimiter + "]";
    if (confReplaceChar != null )
        this.replaceWith = confReplaceChar;
    logger.info("Delimiter: " + delimiter);
    logger.info("Replace With character:" + replaceWith);
}




准备好选项后,请调用 readPath 方法来获取要转换为文本的数据流。替换了配置中所有想要替换的字符后,返回文件内容的                                        string 表示。
在一个 string 对象上调用 replaceAll                                方法,将所有循环出现的字符替换为该参数中指定的字符。因为它接受一个正则表达式作为输入,所以在字符两边加上正则表达式分组字符 [ 和                                        ]。在解决方案中,指示如果 com.ibm.imte.tika.replaceCharacterWith                                未指定,则将所有字符替换为一个空字符串。
在本文中,将输出保存为分隔文件。这会使得它们很容易被读取和处理。但是,您需要删除原始文本中的换行符和分隔字符。在情绪分析或欺诈检测等用例中,这些字符并不重要。如果需要                                100% 地保留原始文本,那么可以将结果输出为二进制 Hadoop 序列文件。
清单 4. TikaHelper                                构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public  String readPath(InputStream stream)
{
    try
    {
        String content = tika.parseToString(stream);
        content = content.replaceAll(delimiter, replaceWith);
        content = content.replaceAll(endLine, replaceWith);
        return content;
    }
    catch (Exception e)
    {
        logger.error("Malformed PDF for Tika: " + e.getMessage());
    }
    return "Malformed PDF";
}




TikaInputFormat 定义作业每个 MapReduce 任务都必须有一个 InputFormat。TikaInputFormat                                是此解决方案中开发的 InputFormat。它从 CombineFileInputFormat 类扩展而来,使用                                        Text 作为键和值的输入参数。Text 是一个可写对象 (writable),它是                                Hadoop 要用于键-值对的序列化格式。
TikaInputFormat 用于验证作业的配置,拆分输入数据块,并创建一个合适的 RecordReader。如                                        createRecordReader 方法中的 清单 5 中所示,可以返回一个                                        RecordReader 实例。前面已经介绍过,不需要拆分 TikaInputFormat                                        格式的文件,因为这些文件被视为很小。无论如何,TikaHelper 都无法读取文件的各个部分。因此,必须将                                        isSplitable 方法的返回值设置为 false。
清单 5. TikaInputFormat.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TikaInputFormat extends CombineFileInputFormat<Text, Text>
{
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException
    {
        return new TikaRecordReader((CombineFileSplit) split, context);
    }
     
    @Override
    protected boolean isSplitable(JobContext context, Path file)
    {
        return false;
    }
}




TikaRecordReader 生成键-值对TikaRecordReader 使用提供给 TikaInputFormat 的数据生成键-值对。这个类派生自                                        RecordReader 抽象类。本节将介绍该构造函数和 nextKeyValue 方法。
在 清单 6 中所示的构造函数中,存储执行从 TikaInputFormat                                        传送的作业所需的信息。Path[] paths 存储每个文件的路径,FileSystem fs 表示                                Hadoop 中的一个文件系统,CombineFileSplit split 包含碎片的条件。请注意,我们还使用                                        Configuration 创建了一个 TikaHelper 实例,以便解析                                        TikaRecordReader 类中的文件。
清单 6. TikaRecordReader.java                                构造函数
1
2
3
4
5
6
7
8
public TikaRecordReader(CombineFileSplit split, TaskAttemptContext context)
            throws IOException
{
    this.paths = split.getPaths();
    this.fs = FileSystem.get(context.getConfiguration());
    this.split = split;
    this.tikaHelper = new TikaHelper(context.getConfiguration());
}




在 清单 7 中所示的 nextKeyValue 方法中,遍历 Path[]                                中的每个文件,返回一个 Text                                类型的键和值,其中分别包含文件路径和每个文件的内容。为此,首先确定是否已位于文件数组的末尾。如果不是,则前进到数组中的下一个可用文件。然后打开一个连接该文件的                                        FSDataInputStream 流。在这种情况下,键是文件的路径,值是文本内容。将该流传递给                                        TikaHelper 来读取值的内容。(始终指向迭代中的当前文件的 currentStream                                字段。)接下来,关闭用完的流。
了解 HadoopDev查找扩大的 InfoSphere BigInsights 开发团队带给您的资源,帮助您开始使用受 InfoSphere BigInsights 支持的                                        Hadoop。文档、产品下载、实验、代码示例、帮助、事件、专家博客 — 这里一应俱全。您还可以直接联系开发人员。。

为输入中的每个文件运行此方法一次。每个文件都生成一个键-值对。之前已经解释过,读取一个碎片时,会打开下一个碎片来获取记录,依此类推。此过程也会在其他碎片上并行发生。最终,通过返回                                        false 值来停止循环。
除了以下代码之外,还必须覆盖一些默认函数,如可通过  获得的完整代码所示。
清单 7. TikaInputFormat.java                                        nextKeyValue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public boolean nextKeyValue() throws IOException, InterruptedException
{
    if (count >= split.getNumPaths())
    {
        done = true;
        return false;
        //we have no more data to parse
    }
     
    Path path = null;
    key = new Text();
    value = new Text();
         
         
    try {
        path = this.paths[count];
    } catch (Exception e) {
        return false;
    }
         
    currentStream = null;
    currentStream = fs.open(path);

    key.set(path.getName());
    value.set(tikaHelper.readPath(currentStream));

    currentStream.close();
    count++;

    return true; //we have more data to parse
}

返回列表