首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

在 Bluemix 上使用 Spark 在浏览器中分析天气数据(3)

在 Bluemix 上使用 Spark 在浏览器中分析天气数据(3)

第 5 步. 解析和分析数据
  • 要真正开始处理数据,需要将它们解析成列。这是通过将 RDD 中的每一行映射到一个(用逗号划分行的)函数来完成的。                        
    1
    weatherParse = weather.map(lambda line : line.split(","))




    Python                        中的 lambda                        符号用于创建未绑定到某个名称的匿名函数。此概念被用于上述代码中,以便将某个函数作为映射函数的参数进行传递。匿名函数接收来自 RDD                            weather 的每一行,并用逗号分隔它们。
    因此,新的 RDD                            weatherParse 被定义为字符串列表的一个列表。并且每个列表中的字符串是该行的各个元素。
  • 下面就让我们来看看第一个列表:                        
    1
    weatherParse.first()




  • 现在,让我们来看看第一个列表的各个元素,该列表的第一项开始于偏移量零:                        
    1
    weatherParse.first()[0]




  • 您还可以通过索引拉取其他元素。                        
    1
    weatherParse.first()[2]




  • 现在,您可以通过只选择包含降水量数据值的行来减少数据集,在这些行中,METRIC 列等于 PRCP。
  • RDD weatherPrecp 包含一个队值 (v1, v2) 列表,其中 v1 是一个气象站标识符,v2                    是该气象站的一次降水数据点(一天)。下面的表 1 展示了这种结构。表 1.键值 气象站 1  值 1  气象站 2  值 2  气象站 1  值 3  气象站 2  值 4  气象站 3  值 5  ...  ...
  • 接着,将该数据集转换(映射)到一个新的数据集,其中每行(数据对)都增加 1。下面的表 2 展示了这种新的结构。表 2.键 值  气象站 1  (值 1,1)  气象站 2  (值 2,1)  气象站 1  (值 3,1)  气象站 2  (值 4,1)  气象站 3  (值 5,1)  ...  ...
  • 现在,通过运行下列代码创建 weatherPrecpCountByKey。                        
    1
    2
    3
    4
    # x[0] is the station
    # x[3] is the precipitation value
    weatherPrecpCountByKey = weatherPrecp.map(lambda x : (x[0], (int(x[3]), 1)))
    weatherPrecpCountByKey.first()




    表                        2 只是一个过渡。因为随后能够将表 2 的映射精简成表 3 所表示的形式。
    表 3.键 值  气象站 1  (值 1 + 值 3,2)  气象站 2  (值 2 + 值 4,2)  气象站 3  (值 5,1)  ...  ...
    在该表中,通过用相应的计数除以值的总和,可以计算出每个气象站的平均降水量。使用来自 Spark API 的                            reduceByKey 函数生成表                        3。
    1
    2
    weatherPrecpAddByKey = weatherPrecpCountByKey.reduceByKey(lambda v1,v2 : (v1[0]+v2[0], v1[1]+v2[1]))
    weatherPrecpAddByKey.first()




  • 现在,您可以最终计算每个气象站的平均值。通过用一个函数映射 weatherPrecpAddByKey RDD 来创建                        weatherAverages RDD,该函数将用总的读数除以总的降水量。                        
    1
    2
    weatherAverages = weatherPrecpAddByKey.map(lambda k: (k[0], k[1][0] / float(k[1][1] ) ) )
    weatherAverages.first()




  • 现在,您可以打印前十个气象站及其平均降水量。                        
    1
    2
    for pair in weatherAverages.top(10):
        print "Station %s had average precipitations of %f" % (pair[0],pair[1])




  • 如果想要输出具有最高平均降水量的 10 个气象站,可反转成对出现的气象站 ID 和平均值的顺序。要实现此操作,只需使用一个交换对值顺序的函数。                        
    1
    2
    3
    4
    5
    6
    precTop10=[]
    stationsTop10=[]
    for pair in weatherAverages.map(lambda (x,y) : (y,x)).top(10):
        precTop10.append(pair[0])
        stationsTop10.append(pair[1])
        print "Station %s had average precipitations of %f" % (pair[1],pair[0])




  • 使用交互式 notebook,很容易创建这些结果的曲线图。                        
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    %matplotlib inline
    import numpy as np
    import matplotlib.pyplot as plt
    N = 10
    index = np.arange(N)  
    bar_width = 0.5
    plt.bar(index, precTop10, bar_width,
    color='b')
    plt.xlabel('Stations')
    plt.ylabel('Precipitations')
    plt.title('10 stations with the highest average precipitation')
    plt.xticks(index + bar_width, stationsTop10, rotation=90)
    plt.show()




返回列表