概述
Spark是一个高度依赖内存的计算引擎.这也是它比传统MapReduce计算速度更快的原因之一
所以Spark的内存分配和管理,是Spark一个比较重要的模块
Spark 内存分配
Spark 的 executor ,本质是一个JVM进程,所以其内存分配必然是基于JVM的(堆内内存,On-heap)
但同时,Spark也同时引入了堆外内存(Off-heap),使之可以在JVM之外在节点内存中直接开辟
堆内内存
堆内内存的大小,就是启动时–executor-memory或者配置spark.executor.memory参数配置
所有该executor内执行的Task共享堆内内存
堆内内存可能被用于executor数据存储的被称为存储内存,比如缓存,或者是广播变量存储等,也可能被用于计算,比如shuffle中间数据存储等等,被称为计算内存.(后面详解)
所谓的堆内内存的本质就是JVM内存,所以这部分的内存是由JVM进行管理
这种管理体现在两部分 申请和释放
- 内存申请
堆内内存的申请,是向JVM进行申请,由JVM进行分配,并且会存储引用根等等 - 内存释放
堆内内存的释放也是由JVM进行自动释放.(就是GC那一套)
堆内内存也会有GC的一切特征,比如
- 对象的释放并不是精准释放,
- 对象的释放是有延迟的
堆外内存
堆内内存的劣势在于GC,即每一个对象的释放都不是精准释放而是必须通过GC扫描
所以Spark也利用Java-UnSafe Api引入了堆外内存.直接从JVM之外的操作系统中开辟内存
在默认情况下堆外内存并不启用,可通过配置spark.memory.offHeap.enabled参数启用,并由spark.memory.offHeap.size参数设定堆外空间的大小。除了没有other空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存
堆外与堆内的选择
堆外内存的好处是
- 可以精准释放(谁申请谁释放),而无需通过频繁的GC扫描
- 堆外内存的序列化的数据占用的空间可以被精确计算
堆外内存的坏处是
- 堆外内存只能存储二进制数据
这意味着要使用堆外内存,必须要经过数据序列化为二进制和二进制反序列化数据
所以堆外内存并不一定会比堆内内存效率高.
具体如何选择在于权衡 GC的压力更大还是序列化的压力更大
- 如果小内存的大量的小对象或者大量超大对象等等GC的压力会非常大的时候,就可以考虑堆外内存
- 反之,如果堆内的GC运行的非常正常,就无需再耗费序列化去启用堆外内存
executor 内存管理
概述
org.apache.spark.memory.UnifiedMemoryManager
Spark executor 中的内存分为三大块 Spark内存,用户内存和保留内存
核心源码如下
object UnifiedMemoryManager {
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
....
private def getMaxMemory(conf: SparkConf): Long = {
val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
.......
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
(usableMemory * memoryFraction).toLong
}
预留内存
保留内存是留出一定数量的内存用于非存储和非执行目的.
这是一个固定保留值 300M
ps:spark.testing.memory其实可以调节,但这明显是个测试配置,所以不应该使用它
可用内存 = systemMemory(申请资源) - reservedMemory(固定值300)
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
}
有一个隐含条件是 必须是1.5倍保留内存,即至少450M,才能启动
用户内存
用户内存是指用户产生的,或者说应用程序本身主导的数据存储.比如使用的一些临时数据或者数据结构等等.
用户内存占比 (申请内存-预留内存300M)(1-spark.memory.fraction)
spark.memory.fraction 1.X版本是0.75 2.x版本是0.6
如果如果默认申请 2G 则在一个Stage内部Task算子执行最大内存
在2.x版本中不能超过 (2048-300)0.4=699M
在1.x版本不能超过(2048-300)*0.25=437M
有一个面试题
100个Executor,每个4G.处理100G数据,则每个executor分配1G数据
1G远小于4G为什么使用mapPartition会报OOM
这就是因为4G真正用户能用的只有949M,而mapPartition加载一次分区数据需要1G,所以会OOM
注意: 这是1.x的情况 2.x 版本用户内存系数默认提升至40% 所以可用为1518M
Spark内存
这部分内存是由Spark控制和使用,由spark.memory.fraction控制,默认0.6(2.x版本)
由Spark使用,主要是指两种情况
- RDD缓存以及广播变量存储等.这部分被称为存储内存
- shuffle中间数据存储,连接,排序等用于Spark计算的存储,这部分被称为执行存储
存储内存占比,由 spark.memory.storageFraction 控制,默认0.5
剩余部分为执行内存,默认为 1- spark.memory.storageFraction
在Spark2.x,Spark内存管理已经是以统一内存管理方式进行分配
它的意思是
- 如果是存储内存少,而执行内存需求大,执行内存可以借用存储内存进行执行
- 存储内存可以设定一个最小阈值,保证在被借用时始终保持一下最小存储内存使用量
- 如果是执行内存小,存储内存大,同上
- 在借用的过程中,如果执行内存因为被借出而导致内存不够,执行内存会杀死部分存储内存而夺回资源,但是如果存储内存因为借出而导致的内存不够,它会等待执行内存用完释放而不会杀死对方