压缩
spark.io.compression.codec
默认lz4
可选(lz4
,lzf
,snappy
,zstd
)
广播变量压缩(spark.boardcast.compress
,true)会使用该配置的压缩方式
RDD缓存压缩(spark.rdd.compress
,false)会使用该配置的压缩方式
Shuffle-Map输出压缩(spark.shuffle.compress
,true)会使用该配置的压缩方式
Shuffle-Spill压缩(spark.shuffle.spill.compress
,true)会使用该配置的压缩方式
event-log压缩(spark.eventLog.compress
,false)会使用该配置的压缩方式spark.io.compression.lz4.blockSize
默认32K
spark.io.compression.snappy.blockSize
默认32K
使用lz4
或snappy
压缩时的压缩块大小spark.shuffle.compress
Map输出压缩,默认truespark.shuffle.spill.compress
溢写压缩,默认truespark.shuffle.consolidateFiles
默认false ,可以设为true
这是在Spark2.x已经废弃,但老版本可以使用的优化,在使用优化版的Hash-Shuffle
序列化
spark.serializer
使用的序列化器 默认JavaSerializer.可选Kyro
如果使用Kryo可以不从这里设置,而在conf.registerKryoClasses()
注册类型时会自动转换spark.serializer
为Kryo序列化器
内存管理
spark.memory.fraction
Spark内存占据JVM堆比例,默认0.6
Spark内存是一个泛概念,实际指Spark存储内存
+Spark执行内存
的总和spark.memory.storageFraction
Spark存储内存占Spark内存比率,默认0.5
没有特指的Spark执行内存
设置(Spark剩余内存即是)
一般来说需要压低Spark存储内存
,特别是缓存使用较少但大量使用shuffle的情况
存储内存
占比0.5有点偏高,存储的LRU算法容易造成伪内存泄露,也不利于GC
执行内存
占比0.5又有点偏低,无论是Shuffle还是排序聚合消耗的都是执行内存
(Spark2.X的动态资源调度没有再特指Shuffle内存
,而是将其统一移入了执行内存
)
执行
任务(Task&Stage)
spark.default.parallelism
默认并行度(非常重要)
默认并行度指的是Spark中的无法通过上级推断出并行度的场景,作为默认并行度
比如byKey或join等Shuffle场景(无法推断Key数量),又或者Spark读取一个本地文件(非HDFS),并且用户也没有特别在方法调用时指定并行度的时候
默认并行度的默认值 是一个动态计算值,计算规则如下
本地模式
为核数,Mesos细粒度
为8,其它(包括YARN
)为所有executor核总数(或2,谁大取谁)spark.stage.maxConsecutiveAttempts
Stage的最大失败次数,默认4spark.task.maxFailures
Task的最大失败次数.默认4(必须大于1)
每失败一次减一,然后重启执行,直到失败次数降为1为止spark.task.reaper.enable
资源
spark.executor.cores
executor的核数(YARN下默认为1)spark.executor.heartbeatInterval
executor对driver的心跳间隔,默认10S
调度
spark.scheduler.maxRegisteredResourcesWaitTime
执行前的等待资源分配的最大超时时间,默认30Sspark.scheduler.minRegisteredResourceRatio
实际执行的预期资源百分比,默认0.8(YARN)spark.scheduler.mode
调度模式,默认FIFO
,可选FIFO
,FAIR
spark.scheduler.revive.interval
调度任务间隔 默认1S
任务本身是在CoarseGrainedSchedulerBackend.DriverEndPoint
的一个消息队列中,任务的调度是一个定时器线程定时从这个队列取出任务然后进行分配,这个定时器的时间间隔就是由该参数控制spark.scheduler.listenerbus.eventqueue.capacity
Spark的事件中枢队列上限 默认10000
Spark的内部设计是一个基于事件总线的异步架构,所有组件的的交互(Stage解析提交,Task解析调度等等)都是通过这个事件总线来完成的,而事件队列的上限就由该参数控制
大集群任务可以考虑适当增加该参数(超过事件队列上限的消息会被删除),但也会增加driver的内存消耗
本地化
spark.locality.wait
本地化的降级间隔,默认3S
一般情况下这都是需要下调的.每一个级别降低间隔3S,总体就是十几秒,这对任务的延时影响比较大 如果本地化要求没这么高的话,一般1S足矣
黑名单
spark.backlist.enabled
是否开启黑名单机制,默认falsespark.backlist.timeout
黑名单恢复时间,默认1小时(实验参数)spark.blacklist.task.maxTaskAttemptsPerExecutor
executor被加入黑名单之前,Task在该Executor可以被重试多少次.默认1次spark.blacklist.task.maxTaskAttemptsPerNode
node被加入黑名单之前,Task在该Node可以被重试多少次,默认2次spark.blacklist.stage.maxFailedTasksPerExecutor
executor被加入黑名单之前,一个Stage中有多少个Task在该Executor失败,默认2个spark.blacklist.stage.maxFailedExecutorsPerNode
node被加入黑名单之前,一个Stage有多少个Task在该Node失败,默认2个spark.blacklist.application.maxFailedTasksPerExecutor
在成功的任务集中,必须有多少个不同的Task失败才能将executor加入黑名单,默认2个spark.blacklist.application.maxFailedExecutorsPerNode
在成功的任务集中,必须有多少个不同的Task失败才能将Node加入黑名单,默认2个spark.blacklist.killBlacklistedExecutors
如果一个executor或node被加入黑名单,是否杀死该executor或node(在其上的所有Task也会被杀死).默认falsespark.blacklist.application.fetchFailure.enabled
是否将数据拉取请求也计入黑名单考察范围.(如果是动态资源调度的外部Shuffle,则会以node为单位),默认false
推测执行
spark.speculation
是否开启推测执行,默认falsespark.speculation.interval
推测执行的检测频率,默认100ms
可以适当调高(3秒5秒都可以),推测执行检测没必要这么高频spark.speculation.multiplier
任务执行时间超过平均执行X倍即视为慢任务,默认1.5spark.speculation.quantile
任务执行完毕占比X之后开始检测推测执行.默认0.75
这个可以调高一点(0.95或者更高都可以),慢任务推测应该是少量情况,检测设的太低会导致大量的任务都被推测执行了
动态资源调度
spark.dynamicAllocation.enabled
是否启用动态资源调度.默认false (必须同时开启spark.shuffle.service.enabled
)spark.dynamicAllocation.executorIdleTimeout
在动态资源调度中,如果executor保持空闲超过该时间,则该executor会被杀死回收,默认60Sspark.dynamicAllocation.cachedExecutorIdleTimeout
在动态资源调度中,如果executor作为数据存储点空闲超过多少时间则会被杀死回收,默认无限,也就是不回收spark.dynamicAllocation.initialExecutors
动态资源调度中,初始executor数,默认等于spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.minExecutors
动态资源调度中维持最少executor数,默认0spark.dynamicAllocation.maxExecutors
动态资源调度中维持最多executor数,默认无限spark.dynamicAllocation.executorAllocationRatio
动态资源调度中默认会按照任务本身的并行度*比率
去申请executor.默认为1
这个比率的结果最终将被minExecutors
和maxExecutors
覆盖spark.dynamicAllocation.schedulerBacklogTimeout
动态资源调度中,如果Task积压超过该时间则会尝试申请新的executor,默认1S
网络
spark.netwrok.timeout
执行的默认最大超时时间,最大120S 这个默认值本身没有意义,但会作为以下配置的默认值
spark.core.connection.ack.wait.timeout
连接在超时和放弃之前等待ACK响应时间
spark.storage.blockManagerSlaveTimeoutMs
BlockManagerSlaver
超时时间
spark.shuffle.io.connectionTimeout
spark.rpc.askTimeout
RPC请求超时时间
spark.rpc.lookupTimeout
spark.rpc.numRetries
RPC请求的重试此时,默认3次,可以适当调高spark.rpc.retry.wait
PRC请求的创世间隔,默认3S
Shuffle
spark.shuffle.file.buffer
文件溢写缓冲大小 默认32K
如果内存充足可以适当调大,这可以减少Map端文件溢写次数spark.shuffle.io.maxReties
默认3次
这是拉取请求的错误重试次数,一般可以适当调大(30次)
因为网络波动或目标executor处于GC过程导致不可访问,shuffle请求很可能不成功,调大可以提高稳定性spark.shuffle.io.retryWait
默认5S
这是拉取请求的最大超时时间,一般可以适当调大(30S),原因同上spark.reducer.maxSizeInFlight
默认48M
拉取请求的缓冲块(一次拉取多少大小的数据),如果内存充足可以适当调大,可以减少拉取次数spark.reducer.maxBlocksInFlightPerAddress
默认Int.Max
每个主机(Address)可以被多少个Reducer主机(Address)拉取数据,在大集群中可以适当调低来降低某个热点executor的压力spark.reducer.maxReqsInFlight
默认Int.Max
每个主机(Address)可以接收多少个Reducer拉取请求,大集群中可以适当调低来降低某个热点executor的压力
内存管理
spark.memory.fraction
Spark内存,默认0.6- Spark1.6
spark.shuffle.io.memoryFraction
默认0.2 适当调大
Spark2.Xspark.memory,storageFracation
默认0.5,适当压低
Spark2.X是动态内存管理,Shuffle是占据执行内存的一部分,通过压低存储内存可以调高执行内存
Streaming
spark.streaming.backpressure.enabled
是否启用背压,默认false
建议开启,特别是在Streaming+Kafka的时候,配合单分区最大拉取数可以很好的控制消费速率spark.streaming.blockInterval
块拆分间隔,默认200MSspark.streaming.receiver.writeAheadLog.enable
是否启用Receiver预写日志,默认falsespark.streaming.unpersist
是否自动清理RDD缓存,默认为truespark.streaming.stopGracefullyOnShutdown
Streaming 是否优雅关闭,默认falsespark.streaming.kafka.maxRatePerPartition
Kafka的Direct模式下单分区最大拉取消息数,默认无限
建议显式设置,Streaming控制消费速率是非常重要的spark.streaming.kafka.maxRetries
Kafka的Direct模式下拉取的重试次数,默认为1
(这里是指重试次数1次,即总共最大会请求两次)
环境
spark.driver.extraJavaOptions
传递给driver端的JVM配置参数
不应该使用该参数调整堆大小(-Xmx
)
Client模式下,不能通过SparkConf传递该参数(因为事实上driver此时已经启动了),而必须使用命名参数driver-java-options
spark.executor.extraJavaOptions
传递给executor的JVM配置参数
同driver设置,不应该用该参数调整堆大小
--conf spark.executor.extraJavaOptions="-XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:gc.log"
参数总结
spark-submit \
--name AppName \
#--properties-file spark-submit-common.conf \# 通用提交参数包装,
###############
# 资源和调度相关
###############
--master yarn \
--queue ${queueName} \
--deploy-mode ${cluster} \
--class org.apache.spark.examples.SparkPi \
--driver-cores ${1} \
--driver-memory ${1g} \
--num-executors ${2} \
--executor-core ${1} \
--executor-memory ${1g} \
#############
# 常用Conf配置
#############
--conf spark.default.parallelism=${10} \ # 默认并行度,必调参数,但取值依赖实际情况
--conf spark.memory.storageFraction=${0.3} \ # 压低存储内存,必调参数,但取值依赖实际情况
--conf spark.locality.wait=1s \ # 本地化的降级等待时间压低为1S
--conf spark.netwrok.timeout=300s \ # 提高网络超时时间来提升总稳定性,默认120S,会影响包括连接请求ACK,BlockSlayer超时,shuffle的IO连接超时,RPC连接超时等等
--conf spark.shuffle.io.maxRetries=30 \ # 提高Shuffle拉取请求重试次数,默认3次
--conf spark.shuffle.io.retryWait=30S \ # 提高Shuffle拉取的最大超时,默认3S
--conf spark.io.compression.codec=snappy \ # 使用Snappy快压,默认lz4,会影响shuffle-map-output,shuffle.spill,boardcast-compress,rdd-compress等等
--conf spark.driver.memoryOverhead=2g \ # 堆外内存,提高到2G
# Streaming相关
--conf spark.streaming.backpressure.enable=true \ # 必开背压(默认false)
--conf spark.streaming.kafka.maxRatePerPatition=${5000} \ # 必开Kafka限速(每个Kafka分区)
--conf spark.streaming.stopGracefullyOnShutdown=true \ # 必开优雅关闭
# 黑名单相关(可选)
--conf spark.blacklist.enabled=true \ # 开启黑名单
--conf spark.blacklist.timeout=${15m} \ # 黑名单恢复,默认1小时,小集群缩短到15分钟
--conf spark.blacklist.stage.maxFailedTasksPerExecutor=${5} \ # executor的Stage级别Task失败,默认2次,这个小集群需要调大
--conf spark.blacklist.stage.maxFailedExecutorsPerNode=${5} \ # node的Stage级别Task失败,默认2次,这个小集群需要调大
# 推测执行相关(可选)
--conf spark.speculation=true \ # 开启推测执行(可选)
--conf spark.speculation.interval=3s \ # 监测间隔(提高为3S,默认200Ms)
--conf spark,speculation.quantile=${0.95} \ # 完成百分比监测(默认0.75容易产生太多的推测)
# 动态资源调度相关(可选)
--conf spark.shuffle.service.enabled=true \ # 必开Shuffle外部服务
--conf spark.dynamicAllocation.enabled=true \ # 开启动态资源调度
--conf spark.dynamicAllocation.executorIdleTimeout=300S \ # 提高executor空闲回收时间来避免大量产生资源申请,默认60S
--conf spark.dynamicAllocation.minExecutors=${1} \ # 资源最低保留(根据任务和业务),默认0
--conf spark.dynamicAllocation.maxExecutors=${10} \ # 资源最多申请(根据任务和业务),默认无限
--conf spark.dynamicAllocation.schedulerBacklogTimeout=${1s} \ # Task积压后申请资源,根据业务上可以忍耐的任务延时而定,默认1S
# 在应用级别的变更日志级别方式(调试专用)
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties"
#############
# GC调优
#############
# G1(常规使用,深入调优必须依赖GC日志,这个专章描述)
# JVM:关闭应用级GC,固定堆大小,打印GC日志,设定元空间上限 G1:常规启用,使用最大Region
--conf "spark.executor.extraJavaOptions=-XX:-DisableExplicitGC -Xms${?} -Xmx${?} -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:${app}-gc.log -XX:MaxMetaSpaceSize=1024M -XX:+UseG1GC -XX:G1HeapRegionSize=32M"
# CMS(常规使用,深入调优必须依赖GC日志,这个专章描述)
# JVM:关闭应用级GC,固定堆大小,打印GC日志,设定元空间上限 CMS:常规启用,限定5次清理配一次整理
--conf "spark.executor.extraJavaOptions=-XX:-DisableExplicitGC -Xms${?} -Xmx${?} -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:${app}-gc.log -XX:MaxMetaSpaceSize=1024M -XX:+UseConcMarkSweep -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCBeforeCompaction=5"
#############################
# 最后的jar以及传入的main函数参数
#############################
/mypath/app.jar \
100(my_main_args)