在 Bluemix 上使用 Spark 在浏览器中分析天气数据(3)
 
- UID
- 1066743
|

在 Bluemix 上使用 Spark 在浏览器中分析天气数据(3)
第 5 步. 解析和分析数据- 要真正开始处理数据,需要将它们解析成列。这是通过将 RDD 中的每一行映射到一个(用逗号划分行的)函数来完成的。
1
| weatherParse = weather.map(lambda line : line.split(","))
|
Python 中的 lambda 符号用于创建未绑定到某个名称的匿名函数。此概念被用于上述代码中,以便将某个函数作为映射函数的参数进行传递。匿名函数接收来自 RDD weather 的每一行,并用逗号分隔它们。
因此,新的 RDD weatherParse 被定义为字符串列表的一个列表。并且每个列表中的字符串是该行的各个元素。
- 下面就让我们来看看第一个列表:
- 现在,让我们来看看第一个列表的各个元素,该列表的第一项开始于偏移量零:
1
| weatherParse.first()[0]
|
- 您还可以通过索引拉取其他元素。
1
| weatherParse.first()[2]
|
 - 现在,您可以通过只选择包含降水量数据值的行来减少数据集,在这些行中,METRIC 列等于 PRCP。
- RDD weatherPrecp 包含一个队值 (v1, v2) 列表,其中 v1 是一个气象站标识符,v2 是该气象站的一次降水数据点(一天)。下面的表 1 展示了这种结构。表 1.键值 气象站 1 值 1 气象站 2 值 2 气象站 1 值 3 气象站 2 值 4 气象站 3 值 5 ... ...
- 接着,将该数据集转换(映射)到一个新的数据集,其中每行(数据对)都增加 1。下面的表 2 展示了这种新的结构。表 2.键 值 气象站 1 (值 1,1) 气象站 2 (值 2,1) 气象站 1 (值 3,1) 气象站 2 (值 4,1) 气象站 3 (值 5,1) ... ...
- 现在,通过运行下列代码创建 weatherPrecpCountByKey。
1
2
3
4
| # x[0] is the station
# x[3] is the precipitation value
weatherPrecpCountByKey = weatherPrecp.map(lambda x : (x[0], (int(x[3]), 1)))
weatherPrecpCountByKey.first()
|
表 2 只是一个过渡。因为随后能够将表 2 的映射精简成表 3 所表示的形式。
表 3.键 值 气象站 1 (值 1 + 值 3,2) 气象站 2 (值 2 + 值 4,2) 气象站 3 (值 5,1) ... ...
在该表中,通过用相应的计数除以值的总和,可以计算出每个气象站的平均降水量。使用来自 Spark API 的 reduceByKey 函数生成表 3。
1
2
| weatherPrecpAddByKey = weatherPrecpCountByKey.reduceByKey(lambda v1,v2 : (v1[0]+v2[0], v1[1]+v2[1]))
weatherPrecpAddByKey.first()
|
- 现在,您可以最终计算每个气象站的平均值。通过用一个函数映射 weatherPrecpAddByKey RDD 来创建 weatherAverages RDD,该函数将用总的读数除以总的降水量。
1
2
| weatherAverages = weatherPrecpAddByKey.map(lambda k: (k[0], k[1][0] / float(k[1][1] ) ) )
weatherAverages.first()
|
 - 现在,您可以打印前十个气象站及其平均降水量。
1
2
| for pair in weatherAverages.top(10):
print "Station %s had average precipitations of %f" % (pair[0],pair[1])
|
- 如果想要输出具有最高平均降水量的 10 个气象站,可反转成对出现的气象站 ID 和平均值的顺序。要实现此操作,只需使用一个交换对值顺序的函数。
1
2
3
4
5
6
| precTop10=[]
stationsTop10=[]
for pair in weatherAverages.map(lambda (x,y) : (y,x)).top(10):
precTop10.append(pair[0])
stationsTop10.append(pair[1])
print "Station %s had average precipitations of %f" % (pair[1],pair[0])
|
- 使用交互式 notebook,很容易创建这些结果的曲线图。
1
2
3
4
5
6
7
8
9
10
11
12
13
| %matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
N = 10
index = np.arange(N)
bar_width = 0.5
plt.bar(index, precTop10, bar_width,
color='b')
plt.xlabel('Stations')
plt.ylabel('Precipitations')
plt.title('10 stations with the highest average precipitation')
plt.xticks(index + bar_width, stationsTop10, rotation=90)
plt.show()
|
 |
|
|
|
|
|