首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

Spark Core 问题详解(3)

Spark Core 问题详解(3)

安全如何支持多用户 Spark 运行的时候使用的是同一个账户,但是在某些场景下需要支持多用户,比如 Spark                                SQL 的 ThriftServer 服务要面向所有用户开放,如果都使用一个账户执行任务,数据的安全就无法保障了。尝试过创建多个 SparkContext,但是 Spark 目前的实现是不支持在同一个 JVM 里创建多个 SparkContext。下面介绍一种基于 Task 的实现方式,它需要注意以下两点:
  • 能够模拟某个用户操作,并且这个操作是线程安全的,避免影响到别的任务。
  • 执行某个任务时,driver 端和 executor 端都要同时使用该模拟用户执行。
Hadoop 本身提供了一个线程安全的模拟其他用户的方法(UserGroupInformation 的 doAs 方法),具体的实现有三点:
  • 在 driver 端使用 UserGroupInformation 的 doAs 方法模拟用户操作。
  • 给 Task 类添加 user 属性,在 DAGSchedular 创建 Task 的时候把当前模拟用户传给 Task 的 user 属性。
  • 在 Task 的 run 方法里使用 UserGroupInformation 的 doAs 方法模拟该用户的操作。
展望 Spark 1.5 的 Tungsten-sort                                升级到 1.4.1 之后,在 Shuffle 的时候大任务总是时不时挂掉一批任务,运行时间大大延长了。除了增加 partition、在某些场景下用 reduceByKey 代替 groupByKey 等一些常见的方法之后,貌似也没什么好的方法,而且 partition 数增大到一定数量之后弊端很大。
Databricks 之前发布了一个“钨丝计划”,号称要榨干 JVM 的性能。现在 Spark                                1.5 已经发布了,它会引入一种新的 Shuffle 方式,不过暂时只在使用 Spark-SQL 的时候才默认开启。现在一起来看看新的 Shuffle 方式 tungsten-sort 它是怎么实现的。
要查看 Shuffle 的过程可以直接找到 ShuffleMapTask 这个类,它是 Shuffle 的起点。
下图是整个 tungsten-sort 的写入每条记录的过程:
                                1、Record 的 key 和 value 会以二进制的格式存储写入到 ByteArrayOutputStream 当中,用二进制的形式存储的好处是可以减少序列化和反序列化的时间。然后判断当前 Page 是否有足够的内存,如果有足够的空间就写入到当前 Page(注:Page 是一块连续的内存)。写入 Page 之后,会把内存地址 address 和 partitionId 编码成一个 8 字节的长整形记录在 InMemorySorter 当中。
                                2、当前 Page 内存不够的时候会去申请新的 Page,如果内存不够就要把当前数据 Spill 到磁盘了。Shuffle 可以利用的内存默认是 Executor 内存的 0.2*0.8=0.16,它是由下面两个参数来决定的,如果数据量比较大,建议增大下面两个值,减少 Spill 的次数。
spark.shuffle.memoryFraction 0.2
spark.shuffle.safetyFraction 0.8
3、Spill 的过程,从 InMemorySorter 反编码出来内存地址,按照 partitionId 的顺序把数据从内存写入到一个文件当中,不会对同一个 partition 的数据做排序。
                                4、Spill 完了内存足够就申请新的 Page,内存不够就要报错了!因为内存不够就直接抛异常的做法是无法在生产环境运行。Bug 产生的原因和它为每个任务平均分配内存的机制有关系,在数据倾斜的场景很容易复现该问题,并且这个异常不应该抛,内存不足就继续 Spill。请关注下面这个 Bug。

实践的时候发现有两个方法可以降低它产生的几率,增加 partition 数量和减小 Page 的大小。Page 的大小通过参数 spark.buffer.pageSize 来设置,单位是 bytes,最小是 1MB,最大是 64MB。默认的计算公式是:nextPowerOf2(maxMemory                                / cores / 16)                                (注:maxMemory 指的是上面提到的 Shuffle 可用内存,nextPowerOf2 是 2 的次方)。
                                5、所有数据写入完毕之后,会把 Spill 产生的所有文件合并成一个数据文件,并生成一个索引文件,如果 map 数是 M,那生成的文件个数就是 2M。Shuffle                                Writer 的工作到这里就结束了,Shuffle                                Reader 沿用了 Sort-based 的 Reader 来读取 Shuffle 产生的数据。合并的过程有个优化点,它会使用 NIO 的 FileChannel 去合并文件,不过使用条件比较苛刻,必须设置以下参数并且 Kernel 内核不能是 2.6.32 版本。
spark.shuffle.compress true
spark.io.compression.codec org.apache.spark.io.LZFCompressionCodec
spark.file.transferTo true
返回列表