概述
OOM即内存溢出,简单来说就是JVM堆满了,在GC清理掉不可达对象之后仍然无法继续分配,就会OOM
Spark运行时分为driver
和executor
两端启动执行,所以出现OOM后,首先也必须区分针对
Driver溢出
driver的职责是协调调度executor,必要时也会接收executor结果.
所以如果是在driver端出现OOM,常见的原因有
返回结果集过大
- 首先关注executor是否返回了过大的数据集结果
因为driver对executor结果拉回是针对所有所有executor的(1个10M,100个就1G了)
一般情况下,不应该依赖将executor执行结果拉回到driver处理.应该尽可能的在executor就地解决
如果确实有需要从executor拉回,数据集也应该是诸如聚合统计结果之类的非常非常小的结果集
driver堆过小
driver负责协调调度工作,它类似NN与RM的结合体,正常情况下确实需要适当调大driver的堆内存
- driver负责存储元数据
Stage DAG,TaskScheduler, BlockManager,Metric等等.这些都是内存存储的 - driver负责接收所有节点的心跳
Executor溢出
executor负责实际的执行工作.
executor的内存主要被Spark执行内存
,Spark存储内存
,用户使用内存
三块瓜分(详见Spark内存管理一章)
OOM也主要是针对这些部分处理
合适的executor堆大小
一个简单的executor内存指标是 两倍数据大小
依据来源是 用户内存(1-spark.memory.fraction
,默认占比0.4)能至少完整放下一份数据
此外,可以适当的压低存储占比来为执行留出更多的内存
存储占比默认0.5spark.memory.storageFracation
,但是Spark存储是基于LRU,如果存储占比过高,存储不释放而造成隐型的内存泄露
用户内存:代码逻辑优化
OOM是Java堆溢出,Java代码逻辑优化也是可行的
比如 循环内反复创建,使用StringBuilder拼接字符串等等
executor数据过重
分区数过低
分区数过低,则相应分区的数据越多,需要耗用的内存也就越多.
解决办法就是通过reparation
重分区来扩大分区
分区数过低有一种特殊情况,比如常用的小文件合并方案coalesce(1)
原数据有10分区(10个文件块),这时为了合并小文件coalesce(1),就可能产生OOM
原因在于
coalesce(1)用在降低分区时,会是一种窄依赖
=> 也就是说与前面是在一个Stage中
=> 也就是说整体会用一个Task来完成Stage流程,
=> 一个Task读取这10个文件块,这里就可能会产生OOM了
解决办法非常简单 coalesce时强制shuffle打断Stage,让前半段读取保持高并行度,最后再合并
数据倾斜
另一种数据过重的情况是某一个或某一些key或者Hash-Key都归到某一个分区中,造成这个分区的数据过重
解决办法
- 对同分区Key,依然可以采用
reparation
重分区来进行 - 对同Key倾斜,重分区就没有意义了,这时候可以采用
随机前缀打散+二次聚合
的思路解决