首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

Spark 部署及示例代码讲解(2)

Spark 部署及示例代码讲解(2)

运行 Spark 示例本文所有的例子都是在单机环境下运行的,选择的都是本地模式。随 Spark 安装包下载的示例代码都在 examples/src/main 目录下面,可以通过运行                bin/run-example<class>[params] 命令方式运行示例程序。例如,运行 SparkPI 的程序,该程序会计算出一个 PI                值,并打印结果在控制台上。
我们这里把输出日志重定向到当前目录下的 Sparkpilong.txt 日志文件。
清单 4. 运行代码
1
[root@localhost:3 spark-1.2.1-bin-hadoop2.4]# ./bin/run-example SparkPi 10 > Sparkpilog.txt




输出的日志分为两部分,一部分是通用日志信息,它由后面会介绍的一系列脚本及程序产生,另一部分是运行程序的输出结果,此处是计算 PI 的输出结果。清单 5                所示是通用日志信息,清单 6 所示是 SparkPI 程序的运算结果。
清单 5. 通用日至信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/05/19 09:58:38 WARN Utils: Your hostname, localhost.localdomain resolves
to a loopback address: 127.0.0.1; using 10.10.19.186 instead (on interface eth0)
15/05/19 09:58:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/19 09:58:38 INFO SecurityManager: Changing view acls to: root
15/05/19 09:58:38 INFO SecurityManager: Changing modify acls to: root
15/05/19 09:58:38 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)

15/05/19 09:58:43 INFO DAGScheduler: Stopping DAGScheduler
15/05/19 09:58:44 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/05/19 09:58:44 INFO MemoryStore: MemoryStore cleared
15/05/19 09:58:44 INFO BlockManager: BlockManager stopped
15/05/19 09:58:44 INFO BlockManagerMaster: BlockManagerMaster stopped
15/05/19 09:58:44 INFO SparkContext: Successfully stopped SparkContext
15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator:
Remote daemon shut down; proceeding with flushing remote transports.
15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.




清单 6. 计算结果
1
Pi is roughly 3.142888




上面针对输入参数 10 的 PI 计算结果为 3.142888。
清单 7 所示代码是 Spark 安装包里自带的 SparkPI 类的源代码。
清单 7. SparkPI                    程序源代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public final class JavaSparkPi {

public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++) {
l.add(i);
}

JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);

int count = dataSet.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y < 1) ? 1 : 0;
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
});

System.out.println("Pi is roughly " + 4.0 * count / n);

jsc.stop();
}
}




一个 Spark 的任务对应一个 RDD,RDD 是弹性分布式数据集,即一个 RDD 代表一个被分区的只读数据集。一个 RDD                的生成只有两种途径,一是来自于内存集合和外部存储系统,另一种是通过转换操作来自于其他 RDD,比如 map、filter、join,等等。清单 7                所示程序定义了一个名为 dataSet 的 RDD。
清单 5 输出的大量信息都是计算机信息、Spark 信息,这些信息都是通过内部调用的若干脚本输出的,我们来看看具体运行示例代码的脚本。我们首先运行脚本                run-example,它的核心代码如清单 8-10 所示。
清单 8.                    run-example 脚本源代码
1
2
3
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
export SPARK_HOME="$FWDIR"
EXAMPLES_DIR="$FWDIR"/examples




清单 8 所示代码设置了示例代码目录,这里是当前目录下的 examples 文件夹。接下来指定第一个参数是运行的类名称,如清单 9 所示。
清单 9.                    run-example 脚本源代码
1
2
3
if [ -n "$1" ]; then
EXAMPLE_CLASS="$1"
Shift




脚本调用 spark-submit 脚本进入下一执行层级。
清单 10.                    run-example 脚本源代码
1
2
3
4
5
"$FWDIR"/bin/spark-submit \
--master $EXAMPLE_MASTER \
--class $EXAMPLE_CLASS \
"$SPARK_EXAMPLES_JAR" \
"$@"




接下来我们来看看 spark-submit 脚本里面的内容。
清单 11.                    spark-submit 脚本源代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
while (($#)); do
if [ "$1" = "--deploy-mode" ]; then
SPARK_SUBMIT_DEPLOY_MODE=$2
elif [ "$1" = "--properties-file" ]; then
SPARK_SUBMIT_PROPERTIES_FILE=$2
elif [ "$1" = "--driver-memory" ]; then
export SPARK_SUBMIT_DRIVER_MEMORY=$2
elif [ "$1" = "--driver-library-path" ]; then
export SPARK_SUBMIT_LIBRARY_PATH=$2
elif [ "$1" = "--driver-class-path" ]; then
export SPARK_SUBMIT_CLASSPATH=$2
elif [ "$1" = "--driver-java-options" ]; then
export SPARK_SUBMIT_OPTS=$2
elif [ "$1" = "--master" ]; then
export MASTER=$2
fi
shift
done




上面代码通过用户从 run-example 脚本里传入的参数,此处为 master,来确定运行模式,然后调用 spark-class 脚本,如清单 12 所示。
清单 12.                    spark-class 脚本源代码
1
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"




清单 12 所示代码调用了 Spark-Class 运行程序 SparkSubmit。
具体介绍 Spark-Class 程序代码之前,我们来尝试运行 WordCount 实验,假设已经创建了一个名为 wordcountdata.txt                的文本文件,该文件被放置在目录                (/home/zhoumingyao/spark/spark-1.2.1-bin-hadoop2.4/spark-1.2.1-bin-hadoop2.4),接着如清单                13 所示,开始运行程序。
清单 13. 运行                    WordCount 程序
1
root@localhost:3 spark-1.2.1-bin-hadoop2.4]# ./bin/run-example JavaWordCount ./wordcountdata.txt




输出如清单 14 所示,这里忽略了与清单 5 相同输出的内容,以及大部分清单 13 的计算结果,只列出少量结果。统计字符出现次数的详细信息如清单 14 所示。
清单 14. 输出结果
1
2
3
4
5
6
7
8
9
10
11
12
13
For: 4
SQLMLlib: 1
subfolder: 1
OS).: 1
Streaming,: 1
APIs: 1
full: 1
--master: 3
through: 1
Provisioning3rd-Party: 1
applications: 4
graph: 3
over: 1

返回列表