使用 MapReduce 和 InfoSphere BigInsights 对各种文档类型进行处理和内容分析(4)
 
- UID
- 1066743
|

使用 MapReduce 和 InfoSphere BigInsights 对各种文档类型进行处理和内容分析(4)
TikaJaqlRecordReader 生成键-值对此类用于生成 MapReduce 中使用的键-值对。它派生自 org.apache.hadoop.mapred.RecordReader 类,以便保持与 Jaql 的兼容性。本节将介绍该构造函数和 next 方法。
在 清单 13 中所示的构造函数中,初始化所要的类变量。获取包含有关文件的信息的碎片,然后创建一个新的 TikaHelper 实例来读取二进制文件。
清单 13. TikaJaqlRecordReader 构造函数1
2
3
4
5
6
7
8
| public TikaJaqlRecordReader(Configuration conf, MultiFileSplit split)
throws IOException
{
this.split = split;
this.conf = conf;
this.paths = split.getPaths();
this.tikaHelper = new TikaHelper(conf);
}
|
和 如何使用?不需要实现任务的输出部分,因为在使用 Jaql 加载数据后,可以使用现有的预定义 Jaql 模块来操作数据,并以各种格式写出它。
在 next 方法中,如 清单 14 中所示,逐个迭代碎片中的所有文件。打开每个文件的一个流后,将它的名称和内容作为元素分配给一个新的 BufferedJsonRecord 实例。BufferedJsonRecord 帮助您以合适的格式保留各项。Jaql 在内部在 JSON 文档上运行,所以所有数据都由 I/O 适配器转换为有效的 JSON 对象。然后,BufferedJsonRecord 被分配为记录的值。但是,键仍然是空的。
清单 14. TikaJaqlRecordReader next 方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| public boolean next(JsonHolder key, JsonHolder value) throws IOException
{
if (count >= split.getNumPaths())
{
done = true;
return false;
}
Path file = paths[count];
fs = file.getFileSystem(conf);
InputStream stream = fs.open(file);
BufferedJsonRecord bjr = new BufferedJsonRecord();
bjr.setNotSorted();
bjr.add(new JsonString("path"), new JsonString(file.getName()));
bjr.add(new JsonString("content"),
new JsonString(this.tikaHelper.readPath(stream)));
value.setValue(bjr);
stream.close();
count++;
return true;
}
|
创建 Jaql 模块Jaql 模块使用户能够创建可重用的 Jaql 函数和资源的包。创建一个包含 I/O 适配器的 tika 模块。I/O 适配器被传递给 I/O 函数,允许 Jaql 从各种类型的来源读取或写入数据,这些来源包括分隔文件、序列文件、AVRO 文件、HBase 和 Hive 表,等等。这个 tika 模块使用户能够读取 Apache Tika 所支持的二进制文件(比如 Word 文件或 PDF 文档),提取文件名和文本内容。要创建 tika 模块,可以导出之前开发的 TikaJaql 类作为 JAR 文件。Jaql 可动态地加载 Java 资源,使用函数 addRelativeClassPath() 将它们添加到类路径中,以便注册这些额外的库。
在 Jaql 中,创建和引用模块很简单。通过将每个 Jaql 脚本添加到 Jaql 的搜索路径中,可以将它们添加为模块。实现此操作的最简单的方式是:在 $JAQL_HOME/modules 目录中创建一个新文件夹,并在这里包含您的文件。在本例中,该模块名为 tika,所以您需要创建文件夹 $JAQL_HOME/modules/tika。然后,可以使用 Jaql 脚本创建函数,并将它们包含在此文件夹中。
创建一个名为 tikaRead() 的自定义函数,它使用 com.ibm.imte.tika.jaql.TikaJaqlInputFormat 作为输入格式组件。此函数用于读取数据,所以只需更改 inoptions(无需更改 outoptions)。基于上一节中开发的已实现的类,调用 tikaRead() 函数作为读取的输入,这会为每个输入文件生成一个包含两个字段的记录:path 是完整的文件名,content 是文件的文本内容。对 tikaRead() 函数的调用类似于调用其他任何 Jaql 输入 I/O 适配器,比如 lines() 或 del()。后面的一节中包含一些使用示例。
创建文件 tika.jaql,如 清单 15 中所示,并将它放在 $JAQL_HOME/modules/tika 目录中,以便能够轻松地将它导入到其他 Jaql 脚本中。Jaql 文件的名称无关紧要,但在 modules 文件夹下创建的文件夹的名称很重要。您还可以使用命令行选项从 Jaql 支持的终端动态地添加模块。
此代码在 /home/biadmin/ 中寻找已生成的 JAR 文件。您需要将 Tika JAR 文件复制到此文件夹,将创建的类文件作为 TikaJaql.jar 导出到此文件夹。在 Eclipse 中,可以使用 Export 命令从一个项目创建一个 JAR 文件。
清单 15. tika.jaql1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| addRelativeClassPath(getSystemSearchPath(), '/home/biadmin/tika-app-1.5.jar,/hom
e/biadmin/TikaJaql.jar');
//creating the function
tikaRead = fn (
location : string,
inoptions : {*}? = null,
outoptions : {*}? = null
)
{
location,
"inoptions": {
"adapter": "com.ibm.jaql.io.hadoop.DefaultHadoopInputAdapter",
"format": "com.ibm.imte.tika.jaql.TikaJaqlInputFormat",
"configurator": "com.ibm.jaql.io.hadoop.FileInputConfigurator"
}
};
|
使用 Jaql创建该模块后,使用以下示例帮助了解此函数的一些可能用法。
Jaql 非常灵活,可用于转换和分析数据。它拥有与分析工具的连接器,比如数据挖掘和文本分析 (AQL)。它拥有各种文件格式(比如线、序列和 Avro)和外部来源(比如 Hive 和 HBase)的连接器。您还可以使用它从本地文件系统或者甚至从 Web 直接读取文件。
下一节将演示在 Jaql 中使用 tika 模块的 3 个示例。第一个示例展示了 HDFS 上的二进制文档向包含文本内容的分隔文件的基本转换。这个示例演示了模块的基本功能;它等效于在前几节中使用 MapReduce 作业执行的任务。第二个示例将展示如何使用 Jaql 直接从外部文件系统来源将二进制文档加载到 HDFS 并进行转换。实践证明,如果不希望在 HDFS 中存储二进制文档,仅希望以文本或序列文件格式存储内容,那么这个示例是一个有用的过程。在这种情况下,加载是单线程的,所以它没有与第一个方法相同的吞吐量。第三个示例展示了在读取文件后,如何直接在 Jaql 中执行文本分析,无需首先提取并持久化文本内容。
使用 清单 16 中的代码,从 HDFS 的一个目录内读取文件,然后将结果写回 HDFS 中。此方法非常类似于第一节的 MapReduce 作业中完成的操作。必须先导入所创建的 tika 模块,然后才能使用 tikaRead() 功能。然后,使用 read() 函数读取指定文件夹中的文件,以带分隔符的文件格式将文件名称和文本内容写入到 HDFS 中的一个文件中。
在 中,可以找到 Jaql 的更多信息。
演示输入是一个文件夹中的一组 Word 格式的客户评论,如 清单 16 中所示。在 10 条评论中,一些评论是正面的,一些评论是负面的。假设您希望提取文本并存储为分隔格式。随后,您可能希望在文本之上执行文本分析。您希望保留文件名,因为它会告诉您是谁创建了该评论。通常,此关系记录在一个单独的表中。
清单 16. hdfs:/tmp/reviews/ 中的输入文件1
2
3
4
| review1.doc
review2.doc
review3.doc
...
|
如 清单 17 中所示,运行 Jaql 命令来读取此文件夹中的所有支持的文档,提取文本,将其保存为一个分隔文件中,每个原始文档对应这个文件中的一行。
清单 17. 使用 Jaql 将 HDFS 转换为 HDF1
2
3
4
5
| import tika(*);
read(tikaRead("/tmp/reviews")) //You could put data transformations here
-> write(del("/tmp/output",
{schema:schema{path,content}, delimiter:"|", quoted:true}));
|
现在,可以在 /tmp/output 文件夹中找到输出。此文件夹包含来自 /tmp/reviews 的 Word 文档的文本内容,内容格式如下。 |
|
|
|
|
|