1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | public class TikaOutputFormat extends FileOutputFormat<Text, Text> { @Override public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { //to get output files in part-r-00000 format Path path = getDefaultWorkFile(context, ""); FileSystem fs = path.getFileSystem(context.getConfiguration()); FSDataOutputStream output = fs.create(path, context); return new TikaRecordWriter(output, context); } } |
/**
 * Creates a writer that emits records to {@code output}.
 *
 * <p>If the task configuration defines {@code com.ibm.imte.tika.delimiter}, that
 * value replaces the current delimiter; otherwise the delimiter is left as it is.
 * The delimiter in effect is logged at info level.
 *
 * @param output  stream that records are written to
 * @param context task attempt whose configuration is consulted for the delimiter
 */
public TikaRecordWriter(DataOutputStream output, TaskAttemptContext context) {
    final String configuredDelimiter =
            context.getConfiguration().get("com.ibm.imte.tika.delimiter");
    if (configuredDelimiter != null) {
        delimiter = configuredDelimiter;
    }
    this.out = output;
    logger.info("Delimiter character: " + delimiter);
}
1 2 3 4 5 6 7 8 9 | @Override public void write(Text key, Text value) throws IOException, InterruptedException { out.writeBytes(key.toString()); out.writeBytes(delimiter); out.writeBytes(value.toString()); out.writeBytes("\n"); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | public static void main(String[] args) throws Exception { int exit = ToolRunner.run(new Configuration(), new TikaDriver(), args); System.exit(exit); } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); //setting the input split size 64MB or 128MB are good. conf.setInt("mapreduce.input.fileinputformat.split.maxsize", 67108864); Job job = new Job(conf, "TikaMapreduce"); conf.setStrings("com.ibm.imte.tika.delimiter", "|"); conf.setStrings("com.ibm.imte.tika.replaceCharacterWith", ""); job.setJarByClass(getClass()); job.setJobName("TikaRead"); job.setInputFormatClass(TikaInputFormat.class); job.setOutputFormatClass(TikaOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | public class TikaJaqlInputFormat extends MultiFileInputFormat<JsonHolder, JsonHolder> { @Override public RecordReader<JsonHolder, JsonHolder> getRecordReader( InputSplit split, JobConf job, Reporter reporter) throws IOException { return new TikaJaqlRecordReader(job, (MultiFileSplit) split); } @Override protected boolean isSplitable(FileSystem fs, Path filename) { return false; } } |
欢迎光临 电子技术论坛_中国专业的电子工程师学习交流社区-中电网技术论坛 (http://bbs.eccn.com/) | Powered by Discuz! 7.0.0 |