首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

使用 Spark 和 IBM Cloud Object Storage 更快地分析数据(3)

使用 Spark 和 IBM Cloud Object Storage 更快地分析数据(3)

从 Spark 访问 IBM COS 对象
在 Spark 上启用 Stocator 后,就可以使用模式                swift2d://<container>.<service>/ 从 Spark 访问 IBM                COS 对象。swift2d 关键字告诉 Spark 使用哪个驱动程序来访问存储。它表明您正在使用 Stocator                访问一个对象存储。容器和服务将在下一节中更详细地介绍。
例如,以下 Python 代码从 IBM COS 读取一个名为 data.json 的 JSON 对象,并将它作为一个名为 data.parquet                的 Parquet 对象写回。
清单 6. 访问 IBM COS                对象
1
2
df = sqlContext.read.json("swift2d://vault.spark/data.json”)
df.write.parquet("swift2d://vault.spark/data.parquet”)




6

测试 Spark 与 IBM COS 之间的连接

为了测试 Spark 与 IBM COS 之间的连接,我们使用了一段简单的 Python 脚本,该脚本将单一列表 的 6 个元素分布在 Spark                集群上,将数据写入 Parquet 对象中,最后读回该对象。Parquet 对象的名称被作为参数传入脚本中。
该数据显示了两次:第一次是在写入对象存储之前,与其模式一起显示;第二次是在从对象存储读回之后。
清单 7. 测试连接的 Python                脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import sys

sc = SparkContext()
sqlContext = SQLContext(sc)

if (len(sys.argv) != 2):
    print "ERROR: This program takes object name as input"
    sys.exit(0)

objectName = sys.argv[1]

myList = [[1,'a'],[2,'b'],[3,'c'],[4,'d'],[5,'e'],[6,'f']]
parallelList = sc.parallelize(myList).collect()
schema = StructType([StructField('column1', IntegerType(), False),
                     StructField('column2', StringType(), False)])
df = sqlContext.createDataFrame(parallelList, schema)
df.printSchema()
df.show()
dfTarget = df.coalesce(1)
dfTarget.write.parquet("swift2d://vault.spark/" + objectName)
dfRead = sqlContext.read.parquet("swift2d://vault.spark/" + objectName)
dfRead.show()
print "Done!"




要运行该脚本,请完成以下步骤:
  • 将代码以文件 sniff.test.py 的形式保存在 清单 7                    中。
  • 创建一个名为 vault 的容器。
  • 将服务(url 中的容器名称后显示的词语)设置为 core-site.xml 文件中定义的                    SERVICE_NAME(请记住,我们的示例中使用了 spark)。
  • 发出以下命令,其中的 testing.parquet                    是要创建并读取的对象的名称:spark-submit --jars stocator-1.0.8-SNAPSHOT-jar-with-dependencies.jar sniff.test.py testing.parquet。
您会在 IBM COS 中看到一个 testing.parquet 对象,以及以下 Spark 输出:
清单 8. 确认已连接的 Spark                结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
root                                                                           
|-- column1: integer (nullable = false)
|-- column2: string (nullable = false)

+-------+-------+
|column1|column2|
+-------+-------+
|      1|      a|
|      2|      b|
|      3|      c|
|      4|      d|
|      5|      e|
|      6|      f|
+-------+-------+

+-------+-------+
|column1|column2|
+-------+-------+
|      1|      a|
|      2|      b|
|      3|      c|
|      4|      d|
|      5|      e|
|      6|      f|
+-------+-------+

Done!




结束语通过配置 Spark、Stocator 和 IBM Cloud Object Storage                来协同工作,可以使用对象存储语义更快地访问和分析存储的数据,而无需使用为处理文件系统设计的旧式存储连接器。
返回列表