运行 Spark 示例本文所有的例子都是在单机环境下运行的,选择的都是本地模式。随 Spark 安装包下载的示例代码都在 examples/src/main 目录下面,可以通过运行 bin/run-example<class>[params] 命令方式运行示例程序。例如,运行 SparkPI 的程序,该程序会计算出一个 PI 值,并打印结果在控制台上。
我们这里把输出日志重定向到当前目录下的 Sparkpilong.txt 日志文件。
清单 4. 运行代码1
| [root@localhost:3 spark-1.2.1-bin-hadoop2.4]# ./bin/run-example SparkPi 10 > Sparkpilog.txt
|
输出的日志分为两部分,一部分是通用日志信息,它由后面会介绍的一系列脚本及程序产生,另一部分是运行程序的输出结果,此处是计算 PI 的输出结果。清单 5 所示是通用日志信息,清单 6 所示是 SparkPI 程序的运算结果。
清单 5. 通用日至信息1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/05/19 09:58:38 WARN Utils: Your hostname, localhost.localdomain resolves
to a loopback address: 127.0.0.1; using 10.10.19.186 instead (on interface eth0)
15/05/19 09:58:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/19 09:58:38 INFO SecurityManager: Changing view acls to: root
15/05/19 09:58:38 INFO SecurityManager: Changing modify acls to: root
15/05/19 09:58:38 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/05/19 09:58:43 INFO DAGScheduler: Stopping DAGScheduler
15/05/19 09:58:44 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/05/19 09:58:44 INFO MemoryStore: MemoryStore cleared
15/05/19 09:58:44 INFO BlockManager: BlockManager stopped
15/05/19 09:58:44 INFO BlockManagerMaster: BlockManagerMaster stopped
15/05/19 09:58:44 INFO SparkContext: Successfully stopped SparkContext
15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator:
Remote daemon shut down; proceeding with flushing remote transports.
15/05/19 09:58:44 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
|
清单 6. 计算结果
上面针对输入参数 10 的 PI 计算结果为 3.142888。
清单 7 所示代码是 Spark 安装包里自带的 SparkPI 类的源代码。
清单 7. SparkPI 程序源代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| public final class JavaSparkPi {
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++) {
l.add(i);
}
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
int count = dataSet.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y < 1) ? 1 : 0;
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
});
System.out.println("Pi is roughly " + 4.0 * count / n);
jsc.stop();
}
}
|
一个 Spark 的任务对应一个 RDD,RDD 是弹性分布式数据集,即一个 RDD 代表一个被分区的只读数据集。一个 RDD 的生成只有两种途径,一是来自于内存集合和外部存储系统,另一种是通过转换操作来自于其他 RDD,比如 map、filter、join,等等。清单 7 所示程序定义了一个名为 dataSet 的 RDD。
清单 5 输出的大量信息都是计算机信息、Spark 信息,这些信息都是通过内部调用的若干脚本输出的,我们来看看具体运行示例代码的脚本。我们首先运行脚本 run-example,它的核心代码如清单 8-10 所示。
清单 8. run-example 脚本源代码1
2
3
| FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
export SPARK_HOME="$FWDIR"
EXAMPLES_DIR="$FWDIR"/examples
|
清单 8 所示代码设置了示例代码目录,这里是当前目录下的 examples 文件夹。接下来指定第一个参数是运行的类名称,如清单 9 所示。
清单 9. run-example 脚本源代码1
2
3
| if [ -n "$1" ]; then
EXAMPLE_CLASS="$1"
Shift
|
脚本调用 spark-submit 脚本进入下一执行层级。
清单 10. run-example 脚本源代码1
2
3
4
5
| "$FWDIR"/bin/spark-submit \
--master $EXAMPLE_MASTER \
--class $EXAMPLE_CLASS \
"$SPARK_EXAMPLES_JAR" \
"$@"
|
接下来我们来看看 spark-submit 脚本里面的内容。
清单 11. spark-submit 脚本源代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| while (($#)); do
if [ "$1" = "--deploy-mode" ]; then
SPARK_SUBMIT_DEPLOY_MODE=$2
elif [ "$1" = "--properties-file" ]; then
SPARK_SUBMIT_PROPERTIES_FILE=$2
elif [ "$1" = "--driver-memory" ]; then
export SPARK_SUBMIT_DRIVER_MEMORY=$2
elif [ "$1" = "--driver-library-path" ]; then
export SPARK_SUBMIT_LIBRARY_PATH=$2
elif [ "$1" = "--driver-class-path" ]; then
export SPARK_SUBMIT_CLASSPATH=$2
elif [ "$1" = "--driver-java-options" ]; then
export SPARK_SUBMIT_OPTS=$2
elif [ "$1" = "--master" ]; then
export MASTER=$2
fi
shift
done
|
上面代码通过用户从 run-example 脚本里传入的参数,此处为 master,来确定运行模式,然后调用 spark-class 脚本,如清单 12 所示。
清单 12. spark-class 脚本源代码1
| exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
|
清单 12 所示代码调用了 Spark-Class 运行程序 SparkSubmit。
具体介绍 Spark-Class 程序代码之前,我们来尝试运行 WordCount 实验,假设已经创建了一个名为 wordcountdata.txt 的文本文件,该文件被放置在目录 (/home/zhoumingyao/spark/spark-1.2.1-bin-hadoop2.4/spark-1.2.1-bin-hadoop2.4),接着如清单 13 所示,开始运行程序。
清单 13. 运行 WordCount 程序1
| root@localhost:3 spark-1.2.1-bin-hadoop2.4]# ./bin/run-example JavaWordCount ./wordcountdata.txt
|
输出如清单 14 所示,这里忽略了与清单 5 相同输出的内容,以及大部分清单 13 的计算结果,只列出少量结果。统计字符出现次数的详细信息如清单 14 所示。
清单 14. 输出结果1
2
3
4
5
6
7
8
9
10
11
12
13
| For: 4
SQLMLlib: 1
subfolder: 1
OS).: 1
Streaming,: 1
APIs: 1
full: 1
--master: 3
through: 1
Provisioning3rd-Party: 1
applications: 4
graph: 3
over: 1
|
|