+-~/spark-example/
  +-job.properties
  +-workflow.xml
  +-lib/
    +-example-1.0.jar
    +-spark-assembly.jar
$ hadoop fs -put spark-example spark-example
$ cd ~/spark-example
$ oozie job -oozie http://oozie-host:11000/oozie -config ./job.properties -run
job: 0000012-151103233206132-oozie-oozi-W
$ oozie job -oozie http://oozie-host:11000/oozie -info 0000012-151103233206132-oozie-oozi-W
Job ID : 0000012-151103233206132-oozie-oozi-W
------------------------------------------------------------------------------------
Workflow Name : SparkWordCount
App Path      : hdfs://bdvs1211.svl.ibm.com:8020/user/root/spark-example
Status        : SUCCEEDED
Run           : 0
User          : root
Group         : -
Created       : 2015-11-04 15:19 GMT
Started       : 2015-11-04 15:19 GMT
Last Modified : 2015-11-04 15:23 GMT
Ended         : 2015-11-04 15:23 GMT
CoordAction ID: -

Actions
------------------------------------------------------------------------------------
ID                                                Status  Ext ID                  Ext Status  Err Code
------------------------------------------------------------------------------------
0000012-151103233206132-oozie-oozi-W@:start:      OK      -                       OK          -
0000012-151103233206132-oozie-oozi-W@spark-node   OK      job_1446622088718_0022  SUCCEEDED   -
0000012-151103233206132-oozie-oozi-W@end          OK      -                       OK          -
------------------------------------------------------------------------------------
/**
 * Word-count driver: reads a text file, counts occurrences of each
 * whitespace-delimited word (trimmed, lower-cased), sorts words by
 * descending count, and writes "word,count" lines to the output path.
 *
 * <p>Usage: {@code WordCountSparkMain <inputPath> <outputPath>}.
 * Exits with status 1 when fewer than two arguments are supplied.
 *
 * @param args {@code args[0]} = input path, {@code args[1]} = output path
 */
public static void main(String[] args) {
    if (args.length < 2) {
        // Fixed usage text: the original "<file> <file>" did not distinguish
        // the input path from the output path.
        System.err.println("Usage: WordCountSparkMain <inputPath> <outputPath>");
        System.exit(1);
    }
    String inputPath = args[0];
    String outputPath = args[1];

    SparkConf sparkConf = new SparkConf().setAppName("Word count");
    // try-with-resources guarantees the SparkContext is stopped even on failure.
    try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
        JavaRDD<String> lines = ctx.textFile(inputPath, 1);

        // Split each line on single spaces into trimmed, lower-cased words,
        // dropping empty/blank tokens.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String sentence) {
                List<String> result = new ArrayList<>();
                if (sentence != null) {
                    for (String word : sentence.split(" ")) {
                        if (word != null && word.trim().length() > 0) {
                            // NOTE(review): toLowerCase() is locale-sensitive;
                            // consider toLowerCase(Locale.ROOT) for stable results.
                            result.add(word.trim().toLowerCase());
                        }
                    }
                }
                return result;
            }
        });

        // Pair each word with an initial count of 1.
        JavaPairRDD<String, Integer> pairs =
                words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<>(s, 1);
            }
        });

        // Sum the counts per word, using 2 reduce partitions.
        JavaPairRDD<String, Integer> counts =
                pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        }, 2);

        // Swap to (count, word) so the pairs can be sorted by count...
        JavaPairRDD<Integer, String> countsAfterSwap =
                counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            private static final long serialVersionUID = 2267107270683328434L;

            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<>(t._2, t._1);
            }
        });

        // ...in descending order...
        countsAfterSwap = countsAfterSwap.sortByKey(false);

        // ...then swap back to (word, count), preserving the sorted order.
        counts = countsAfterSwap.mapToPair(
                new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            private static final long serialVersionUID = 2267107270683328434L;

            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                return new Tuple2<>(t._2, t._1);
            }
        });

        // Render each pair as "word,count" and persist to the output path.
        JavaRDD<String> results = counts.map(new Function<Tuple2<String, Integer>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(Tuple2<String, Integer> v1) throws Exception {
                // %d formats the Integer directly; the original's explicit
                // Integer.toString() inside a %s slot was redundant.
                return String.format("%s,%d", v1._1, v1._2);
            }
        });

        results.saveAsTextFile(outputPath);
    }
}
} // closes the enclosing class (its header is outside this chunk)
欢迎光临 电子技术论坛_中国专业的电子工程师学习交流社区-中电网技术论坛 (http://bbs.eccn.com/) | Powered by Discuz! 7.0.0 |