首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

Spark 部署及示例代码讲解(3)

Spark 部署及示例代码讲解(3)

解释示例运行过程图 2. Spark                    示例代码运行过程描述图
图 2 所示是整个程序执行的路线图,具体我们下面一点点解释。
通过上面的介绍,我们可以大致看到,示例代码的运行顺序是依次从左向右的,
Run-example.sh->load-spark-env.sh->lib 目录下的 jar                包文件->spark-submit.sh->spark-class
清单 15 所示是 lib 目录下的文件。
清单 15. 文件内容
1
2
3
4
5
6
7
8
[root@localhost:3 bin]# ls -lrt ../lib
总用量 236232
-rw-rw-r--. 1 1000 1000 87065934 2 月 3 11:45 spark-examples-1.2.1-hadoop2.4.0.jar
-rw-rw-r--. 1 1000 1000 148865850 2 月 3 11:45 spark-assembly-1.2.1-hadoop2.4.0.jar
-rw-rw-r--. 1 1000 1000 1916671 2 月 3 11:45 spark-1.2.1-yarn-shuffle.jar
-rw-rw-r--. 1 1000 1000 1809447 2 月 3 11:45 datanucleus-rdbms-3.2.9.jar
-rw-rw-r--. 1 1000 1000 1890075 2 月 3 11:45 datanucleus-core-3.2.10.jar
-rw-rw-r--. 1 1000 1000 339666 2 月 3 11:45 datanucleus-api-jdo-3.2.6.jar




spark-examples 脚本里面有清单 16 所示代码,主要是用于加载类。
清单 16.                    spark-examples 代码 1
1
2
3
4
5
6
7
8
9
for f in ${JAR_PATH}/spark-examples-*hadoop*.jar; do
if [[ ! -e "$f" ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
echo "You need to build Spark before running this program" 1>&2
exit 1
fi
SPARK_EXAMPLES_JAR="$f"
JAR_COUNT=$((JAR_COUNT+1))
done




清单 17 所示代码做了一层保护,如果发现超过 1 个以上的 spark-example 包文件,抛出错误。
清单 17.                    spark-examples 代码 2
1
2
3
4
5
6
if [ "$JAR_COUNT" -gt "1" ]; then
echo "Found multiple Spark examples assembly jars in ${JAR_PATH}" 1>&2
ls ${JAR_PATH}/spark-examples-*hadoop*.jar 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi




清单 11 和清单 12 已经介绍过,最终程序由 spark-class 执行。Client 模式会运行 Spark 驱动在同一个 JVM 里面,然后调用                spark-class 运行程序。清单 12 运行的输出如清单 18 所示。
清单 18. 运行输出
1
2
3
master local
  • --class org.apache.spark.examples.SparkPi
    /home/zhoumingyao/spark/spark-1.2.1-bin-hadoop2.4/spark-1.2.1-bin-hadoop2.4/
    lib/spark-examples-1.2.1-hadoop2.4.0.jar 10




  • spark-class 脚本首先确定运行模式,如清单 19 所示。
    清单 19. 目录列表
    1
    2
    3
    4
    5
    6
    case "$1" in
    # Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) +
            SPARK_DAEMON_MEMORY. 'org.apache.spark.deploy.master.Master')
    OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
    OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
    ;;




    对于 JDK8 有特殊的设置,JDK8 开始不再支持 MaxPermSize 等参数设置 JVM。
    清单 20. JDK8
    1
    2
    3
    4
    5
    6
    7
    # Set JAVA_OPTS to be able to load native libraries and to set heap size
    if [ "$JAVA_VERSION" -ge 18 ]; then
    JAVA_OPTS="$OUR_JAVA_OPTS"
    else
    JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS"
    fi
    JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"




    绕了一大圈,最终的启动程序如清单 21 所示。
    清单 21. 启动程序
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then
    # This is used only if the properties file actually contains these special configs
    # Export the environment variables needed by SparkSubmitDriverBootstrapper
    export RUNNER
    export CLASSPATH
    export JAVA_OPTS
    export OUR_JAVA_MEM
    export SPARK_CLASS=1
    shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own
    exec "$RUNNER" org.apache.spark.deploy.SparkSubmitDriverBootstrapper "$@"
    else
    # Note: The format of this command is closely echoed in SparkSubmitDriverBootstrapper.scala
    if [ -n "$SPARK_PRINT_LAUNCH_COMMAND" ]; then
    echo -n "Spark Command: " 1>&2
    echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2
    echo -e "========================================\n" 1>&2
    fi
    exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
    fi




    最终执行示例的容器是由 org.apache.spark.deploy.SparkSubmitDriverBootstrapper 类产生的,参数为清单 22 所示。
    清单 22. 设置运行参数
    1
    2
    3
    org.apache.spark.deploy.SparkSubmit --master local
  • --class
    org.apache.spark.examples.SparkPi /home/zhoumingyao/spark/spark-1.2.1-bin-hadoop2.4/
                      spark-1.2.1-bin-hadoop2.4/lib/spark-examples-1.2.1-hadoop2.4.0.jar 10




  • 清单 23. SparkSubmitDriverBootstrapper 代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    private[spark] object SparkSubmitDriverBootstrapper {
    // Start the driver JVM
    val filteredCommand = command.filter(_.nonEmpty)
    val builder = new ProcessBuilder(filteredCommand)
    val env = builder.environment()

    if (submitLibraryPath.isEmpty && confLibraryPath.nonEmpty) {
    val libraryPaths = confLibraryPath ++ sys.env.get(
                                    Utils.libraryPathEnvName)
    env.put(Utils.libraryPathEnvName, libraryPaths.mkString(
                                    sys.props("path.separator")))
    }

    val process = builder.start()

    // If we kill an app while it's running, its sub-process should be killed too.
    Runtime.getRuntime().addShutdownHook(new Thread() {
    override def run() = {
    if (process != null) {
    process.destroy()
    process.waitFor()
    }
    }
    })

    // Redirect stdout and stderr from the child JVM
    val stdoutThread = new RedirectThread(process.getInputStream,
                         System.out, "redirect stdout")
    val stderrThread = new RedirectThread(process.getErrorStream,
                          System.err, "redirect stderr")
    stdoutThread.start()
    stderrThread.start()

    // Redirect stdin to child JVM only if we're not running Windows. This is because the
    // subprocess there already reads directly from our stdin, so we should avoid spawning a
    // thread that contends with the subprocess in reading from System.in.
    val isWindows = Utils.isWindows
    val isSubprocess = sys.env.contains("IS_SUBPROCESS")
    if (!isWindows) {
    val stdinThread = new RedirectThread(System.in, process.getOutputStream,
                  "redirect stdin",propagateEof = true)
    stdinThread.start()
    // Spark submit (JVM) may run as a subprocess,and so this JVM should terminate on
    // broken pipe, signaling that the parent process has exited.
    //This is the case if the application is launched directly from python,
    //as in the PySpark shell. In Windows,the termination logic is handled in java_gateway.py
    if (isSubprocess) {
    stdinThread.join()
    process.destroy()
    }
    }
    val returnCode = process.waitFor()
    sys.exit(returnCode)
    }

    }




    从上面的 Scala 代码里面可以看到,Scala 最终启动的是 JVM 线程,所以它可以访问 Java 的库文件,例如 java.io.File。通过 Main 函数的方式启动了一个 JVM 进程,随后针对该进程又托管了一系列线程级别的操作。
    返回列表