如何通过 Oozie 在 YARN 上运行 Spark 作业(2)
- UID
- 1066743
|
如何通过 Oozie 在 YARN 上运行 Spark 作业(2)
- 下载 spark-assembly.jar 文件:浏览 HDFS NameNode 用户界面,下载 spark-assembly.jar 文件。
- 从 Ambari 控制台中选择 HDFS,然后选择 Quick Links –> NameNode UI。
- 单击 Utilities –> Browse the file system。
- 在 Hadoop 文件资源管理器中,导航到 /iop/apps/4.1.0.0/spark/jars,选择 spark-assembly.jar,单击 Download 并保存该文件。
- 将下载的 spark-assembly.jar 文件转移到 lib 目录,这会得到以下目录结构:
1
2
3
4
5
6
| +-~/spark-example/
+-job.properties
+-workflow.xml
+-lib/
+-example-1.0.jar
+-spark-assembly.jar
|
- 将应用程序复制到 HDFS:将 spark-example/ 目录复制到 HDFS 中的用户 HOME 目录。确保 HDFS 中的 spark-example 位置与 job.properties 中的 oozie.wf.application.path 值匹配。
1
| $ hadoop fs -put spark-example spark-example
|
- 运行示例作业:运行以下命令来提交 Oozie 作业:
1
2
3
| $ cd ~/spark-example
$ oozie job -oozie http://oozie-host:11000/oozie -config ./job.properties -run
job: 0000012-151103233206132-oozie-oozi-W
|
检查工作流作业状态:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| $ oozie job -oozie http://oozie-host:11000/oozie -info 0000012-151103233206132-oozie-oozi-W
Job ID : 0000012-151103233206132-oozie-oozi-W
————————————————————————————————————————————
Workflow Name : SparkWordCount
App Path : hdfs://bdvs1211.svl.ibm.com:8020/user/root/spark-example
Status : SUCCEEDED
Run : 0
User : root
Group : –
Created : 2015-11-04 15:19 GMT
Started : 2015-11-04 15:19 GMT
Last Modified : 2015-11-04 15:23 GMT
Ended : 2015-11-04 15:23 GMT
CoordAction ID: –
Actions
————————————————————————————————————————————
ID Status Ext ID Ext Status Err Code
————————————————————————————————————————————
0000012-151103233206132-oozie-oozi-W@:start: OK – OK –
0000012-151103233206132-oozie-oozi-W@spark-node OK job_1446622088718_0022 SUCCEEDED –
0000012-151103233206132-oozie-oozi-W@end OK – OK –
————————————————————————————————————————————
|
完整的 Java 程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
| /**
 * Entry point of the Spark word-count job.
 *
 * <p>Reads the text file at {@code args[0]}, splits every line on single space characters,
 * trims and lower-cases each non-blank token, counts the occurrences of each word, orders
 * the words by their count and writes the result to {@code args[1]} as text lines of the
 * form {@code word,count}.
 *
 * <p>When fewer than two arguments are supplied, a usage message is printed to standard
 * error and the JVM exits with status 1.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
 */
public static void main(String[] args) {
    if (args.length < 2) {
        System.err.println("Usage: WordCountSparkMain <file> <file>");
        System.exit(1);
    }
    String inputPath = args[0];
    String outputPath = args[1];
    SparkConf sparkConf = new SparkConf().setAppName("Word count");
    // try-with-resources closes the Spark context even when a stage fails.
    try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
        JavaRDD<String> lines = ctx.textFile(inputPath, 1);

        // Tokenize: one output element per non-blank, trimmed, lower-cased token.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String sentence) {
                List<String> result = new ArrayList<>();
                if (sentence != null) {
                    // NOTE: only the space character separates tokens; tokens joined by
                    // tabs or other whitespace stay together as a single word.
                    String[] words = sentence.split(" ");
                    for (String word : words) {
                        if (word != null && word.trim().length() > 0) {
                            // Locale.ROOT keeps the keys identical regardless of the
                            // default locale of the JVM that runs this task.
                            result.add(word.trim().toLowerCase(java.util.Locale.ROOT));
                        }
                    }
                }
                return result;
            }
        });

        // Pair every word with an initial count of 1.
        JavaPairRDD<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<>(s, 1);
                    }
                });

        // Sum the counts per word.
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer a, Integer b) {
                        return a + b;
                    }
                }, 2);

        // sortByKey orders by key, so make the count the key before sorting ...
        JavaPairRDD<Integer, String> countsAfterSwap = counts.mapToPair(
                new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                    private static final long serialVersionUID = 2267107270683328434L;

                    @Override
                    public Tuple2<Integer, String> call(Tuple2<String, Integer> t)
                            throws Exception {
                        return new Tuple2<>(t._2, t._1);
                    }
                });
        // false = not ascending, i.e. the highest counts come first.
        countsAfterSwap = countsAfterSwap.sortByKey(false);

        // ... and swap back to (word, count) once the order is established.
        counts = countsAfterSwap.mapToPair(
                new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                    private static final long serialVersionUID = 2267107270683328434L;

                    @Override
                    public Tuple2<String, Integer> call(Tuple2<Integer, String> t)
                            throws Exception {
                        return new Tuple2<>(t._2, t._1);
                    }
                });

        // Render each pair as a "word,count" text line.
        JavaRDD<String> results = counts.map(
                new Function<Tuple2<String, Integer>, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public String call(Tuple2<String, Integer> v1) throws Exception {
                        return String.format("%s,%s", v1._1, Integer.toString(v1._2));
                    }
                });
        results.saveAsTextFile(outputPath);
    }
}
}
|
|
|
|
|
|
|