
How to Run Spark Jobs on YARN via Oozie (2)

  • Download the spark-assembly.jar file. Browse the HDFS NameNode UI and download the spark-assembly.jar file:
    • From the Ambari console, select HDFS, then select Quick Links -> NameNode UI
    • Click Utilities -> Browse the file system
    • In the Hadoop file explorer, navigate to /iop/apps/4.1.0.0/spark/jars, select spark-assembly.jar, click Download, and save the file.
    • Move the downloaded spark-assembly.jar file into the lib directory, which gives the following directory structure:
      ~/spark-example/
      +- job.properties
      +- workflow.xml
      +- lib/
         +- example-1.0.jar
         +- spark-assembly.jar
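The workflow.xml listed above defines a single Spark action; part 1 of this series covers it in detail. As a rough sketch of its shape (the class name, jar path, and argument values are illustrative assumptions, not the exact file from part 1):

```xml
<workflow-app xmlns="uri:oozie:workflow:0.5" name="SparkWordCount">
    <start to="spark-node"/>
    <action name="spark-node">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>${master}</master>
            <name>SparkWordCount</name>
            <class>WordCountSparkMain</class>
            <jar>${nameNode}/user/${wf:user()}/spark-example/lib/example-1.0.jar</jar>
            <arg>${nameNode}/user/${wf:user()}/input</arg>
            <arg>${nameNode}/user/${wf:user()}/output</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Spark job failed: [${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
```

The action and workflow names match the ones that appear in the job status output later in this article.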




  • Copy the application to HDFS. Copy the spark-example/ directory to your HOME directory in HDFS. Make sure that the spark-example location in HDFS matches the oozie.wf.application.path value in job.properties.
    $ hadoop fs -put spark-example spark-example
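For reference, a minimal job.properties might look like the following; the host names and port numbers are placeholders, and this is a sketch rather than the exact file from part 1. The important line is oozie.wf.application.path, which must point at the spark-example directory just uploaded:

```properties
nameNode=hdfs://namenode-host:8020
jobTracker=resourcemanager-host:8050
master=yarn-cluster
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/spark-example
```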




  • Run the example job. Run the following commands to submit the Oozie job:
    $ cd ~/spark-example
    $ oozie job -oozie http://oozie-host:11000/oozie -config ./job.properties -run
    job: 0000012-151103233206132-oozie-oozi-W




    Check the workflow job status:

    $ oozie job -oozie http://oozie-host:11000/oozie -info 0000012-151103233206132-oozie-oozi-W

    Job ID : 0000012-151103233206132-oozie-oozi-W
    ------------------------------------------------------------------------
    Workflow Name : SparkWordCount
    App Path      : hdfs://bdvs1211.svl.ibm.com:8020/user/root/spark-example
    Status        : SUCCEEDED
    Run           : 0
    User          : root
    Group         : -
    Created       : 2015-11-04 15:19 GMT
    Started       : 2015-11-04 15:19 GMT
    Last Modified : 2015-11-04 15:23 GMT
    Ended         : 2015-11-04 15:23 GMT
    CoordAction ID: -

    Actions
    ------------------------------------------------------------------------
    ID                                               Status  Ext ID                  Ext Status  Err Code
    ------------------------------------------------------------------------
    0000012-151103233206132-oozie-oozi-W@:start:     OK      -                       OK          -
    0000012-151103233206132-oozie-oozi-W@spark-node  OK      job_1446622088718_0022  SUCCEEDED   -
    0000012-151103233206132-oozie-oozi-W@end         OK      -                       OK          -
    ------------------------------------------------------------------------




    The complete Java program:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;

    import scala.Tuple2;

    public class WordCountSparkMain {

        public static void main(String[] args) {
            if (args.length < 2) {
                System.err.println("Usage: WordCountSparkMain <input> <output>");
                System.exit(1);
            }
            String inputPath = args[0];
            String outputPath = args[1];
            SparkConf sparkConf = new SparkConf().setAppName("Word count");
            try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
                JavaRDD<String> lines = ctx.textFile(inputPath, 1);

                // Tokenize each line: split on spaces, trim, and lowercase each word.
                JavaRDD<String> words = lines.flatMap(
                        new FlatMapFunction<String, String>() {
                            private static final long serialVersionUID = 1L;

                            public Iterable<String> call(String sentence) {
                                List<String> result = new ArrayList<>();
                                if (sentence != null) {
                                    for (String word : sentence.split(" ")) {
                                        if (word != null && word.trim().length() > 0) {
                                            result.add(word.trim().toLowerCase());
                                        }
                                    }
                                }
                                return result;
                            }
                        });

                // Map each word to a (word, 1) pair.
                JavaPairRDD<String, Integer> pairs = words.mapToPair(
                        new PairFunction<String, String, Integer>() {
                            private static final long serialVersionUID = 1L;

                            public Tuple2<String, Integer> call(String s) {
                                return new Tuple2<>(s, 1);
                            }
                        });

                // Sum the counts per word, using 2 partitions.
                JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
                        new Function2<Integer, Integer, Integer>() {
                            private static final long serialVersionUID = 1L;

                            public Integer call(Integer a, Integer b) {
                                return a + b;
                            }
                        }, 2);

                // Swap to (count, word) so the pairs can be sorted by count.
                JavaPairRDD<Integer, String> countsAfterSwap = counts.mapToPair(
                        new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                            private static final long serialVersionUID = 2267107270683328434L;

                            @Override
                            public Tuple2<Integer, String> call(Tuple2<String, Integer> t)
                                    throws Exception {
                                return new Tuple2<>(t._2, t._1);
                            }
                        });

                // Sort by count in descending order.
                countsAfterSwap = countsAfterSwap.sortByKey(false);

                // Swap back to (word, count) after sorting.
                counts = countsAfterSwap.mapToPair(
                        new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                            private static final long serialVersionUID = 2267107270683328434L;

                            @Override
                            public Tuple2<String, Integer> call(Tuple2<Integer, String> t)
                                    throws Exception {
                                return new Tuple2<>(t._2, t._1);
                            }
                        });

                // Format each pair as "word,count" and write the result to HDFS.
                JavaRDD<String> results = counts.map(
                        new Function<Tuple2<String, Integer>, String>() {
                            @Override
                            public String call(Tuple2<String, Integer> v1) throws Exception {
                                return String.format("%s,%s", v1._1, Integer.toString(v1._2));
                            }
                        });
                results.saveAsTextFile(outputPath);
            }
        }
    }
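The transformation pipeline above (tokenize, count, sort by count in descending order) can be checked locally without a cluster. The following plain-Java sketch mirrors the same logic on an in-memory list; the class and method names are illustrative, not part of the Spark program:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WordCountLocal {

    // Mirrors the Spark pipeline: split each line on spaces, trim and
    // lowercase each word, count occurrences, then sort by count descending
    // and format each entry as "word,count".
    public static List<String> wordCount(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String sentence : lines) {
            if (sentence == null) {
                continue;
            }
            for (String word : sentence.split(" ")) {
                String w = word.trim().toLowerCase();
                if (!w.isEmpty()) {
                    counts.merge(w, 1, Integer::sum);
                }
            }
        }
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .map(e -> String.format("%s,%d", e.getKey(), e.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "world" appears three times, "hello" twice, so "world,3" sorts first.
        System.out.println(wordCount(Arrays.asList("Hello hello world", "world world")));
    }
}
```

Running this with small inputs is a quick way to confirm the expected output format before packaging the Spark version into example-1.0.jar.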



