NightPxy 个人技术博客

Spark(2.3)-常用配置参数

Posted on By NightPxy

压缩

  • spark.io.compression.codec 默认lz4 可选(lz4,lzf,snappy,zstd)
    广播变量压缩(spark.boardcast.compress,true)会使用该配置的压缩方式
    RDD缓存压缩(spark.rdd.compress,false)会使用该配置的压缩方式
    Shuffle-Map输出压缩(spark.shuffle.compress,true)会使用该配置的压缩方式
    Shuffle-Spill压缩(spark.shuffle.spill.compress,true)会使用该配置的压缩方式
    event-log压缩(spark.eventLog.compress,false)会使用该配置的压缩方式
  • spark.io.compression.lz4.blockSize 默认32K
    spark.io.compression.snappy.blockSize 默认32K
    使用lz4snappy压缩时的压缩块大小
  • spark.shuffle.compress Map输出压缩,默认true
  • spark.shuffle.spill.compress 溢写压缩,默认true
  • spark.shuffle.consolidateFiles 默认false ,可以设为true
    这是在Spark2.x已经废弃,但老版本可以使用的优化,在使用优化版的Hash-Shuffle

序列化

  • spark.serializer使用的序列化器 默认JavaSerializer.可选Kyro
    如果使用Kryo可以不从这里设置,而在conf.registerKryoClasses()注册类型时会自动转换spark.serializer为Kryo序列化器

内存管理

  • spark.memory.fraction Spark内存占据JVM堆比例,默认0.6
    Spark内存是一个泛概念,实际指 Spark存储内存+Spark执行内存的总和
  • spark.memory.storageFraction Spark存储内存占Spark内存比率,默认0.5
    没有特指的Spark执行内存设置(Spark剩余内存即是)
    一般来说需要压低Spark存储内存,特别是缓存使用较少但大量使用shuffle的情况
    存储内存占比0.5有点偏高,存储的LRU算法容易造成伪内存泄露,也不利于GC
    执行内存占比0.5又有点偏低,无论是Shuffle还是排序聚合消耗的都是执行内存(Spark2.X的动态资源调度没有再特指Shuffle内存,而是将其统一移入了执行内存)

执行

任务(Task&Stage)

  • spark.default.parallelism 默认并行度(非常重要)
    默认并行度指的是Spark中的无法通过上级推断出并行度的场景,作为默认并行度
    比如byKey或join等Shuffle场景(无法推断Key数量),又或者Spark读取一个本地文件(非HDFS),并且用户也没有特别在方法调用时指定并行度的时候
    默认并行度的默认值 是一个动态计算值,计算规则如下
    本地模式为核数,Mesos细粒度为8,其它(包括YARN)为所有executor核总数(或2,谁大取谁)
  • spark.stage.maxConsecutiveAttempts Stage的最大失败次数,默认4
  • spark.task.maxFailures Task的最大失败次数.默认4(必须大于1)
    每失败一次减一,然后重启执行,直到失败次数降为1为止
  • spark.task.reaper.enable

资源

  • spark.executor.cores executor的核数(YARN下默认为1)
  • spark.executor.heartbeatInterval executor对driver的心跳间隔,默认10S

调度

  • spark.scheduler.maxRegisteredResourcesWaitTime 执行前的等待资源分配的最大超时时间,默认30S
  • spark.scheduler.minRegisteredResourceRatio 实际执行的预期资源百分比,默认0.8(YARN)
  • spark.scheduler.mode 调度模式,默认FIFO,可选FIFO,FAIR
  • spark.scheduler.revive.interval 调度任务间隔 默认1S
    任务本身是在CoarseGrainedSchedulerBackend.DriverEndPoint的一个消息队列中,任务的调度是一个定时器线程定时从这个队列取出任务然后进行分配,这个定时器的时间间隔就是由该参数控制
  • spark.scheduler.listenerbus.eventqueue.capacity Spark的事件中枢队列上限 默认10000
    Spark的内部设计是一个基于事件总线的异步架构,所有组件的的交互(Stage解析提交,Task解析调度等等)都是通过这个事件总线来完成的,而事件队列的上限就由该参数控制
    大集群任务可以考虑适当增加该参数(超过事件队列上限的消息会被删除),但也会增加driver的内存消耗

本地化

  • spark.locality.wait 本地化的降级间隔,默认3S
    一般情况下这都是需要下调的.每一个级别降低间隔3S,总体就是十几秒,这对任务的延时影响比较大 如果本地化要求没这么高的话,一般1S足矣

黑名单

  • spark.backlist.enabled 是否开启黑名单机制,默认false
  • spark.backlist.timeout 黑名单恢复时间,默认1小时(实验参数)
  • spark.blacklist.task.maxTaskAttemptsPerExecutor executor被加入黑名单之前,Task在该Executor可以被重试多少次.默认1次
  • spark.blacklist.task.maxTaskAttemptsPerNode node被加入黑名单之前,Task在该Node可以被重试多少次,默认2次
  • spark.blacklist.stage.maxFailedTasksPerExecutor executor被加入黑名单之前,一个Stage中有多少个Task在该Executor失败,默认2个
  • spark.blacklist.stage.maxFailedExecutorsPerNodenode被加入黑名单之前,一个Stage有多少个Task在该Node失败,默认2个
  • spark.blacklist.application.maxFailedTasksPerExecutor 在成功的任务集中,必须有多少个不同的Task失败才能将executor加入黑名单,默认2个
  • spark.blacklist.application.maxFailedExecutorsPerNode 在成功的任务集中,必须有多少个不同的Task失败才能将Node加入黑名单,默认2个
  • spark.blacklist.killBlacklistedExecutors 如果一个executor或node被加入黑名单,是否杀死该executor或node(在其上的所有Task也会被杀死).默认false
  • spark.blacklist.application.fetchFailure.enabled 是否将数据拉取请求也计入黑名单考察范围.(如果是动态资源调度的外部Shuffle,则会以node为单位),默认false

推测执行

  • spark.speculation 是否开启推测执行,默认false
  • spark.speculation.interval 推测执行的检测频率,默认100ms
    可以适当调高(3秒5秒都可以),推测执行检测没必要这么高频
  • spark.speculation.multiplier 任务执行时间超过平均执行X倍即视为慢任务,默认1.5
  • spark.speculation.quantile 任务执行完毕占比X之后开始检测推测执行.默认0.75
    这个可以调高一点(0.95或者更高都可以),慢任务推测应该是少量情况,检测设的太低会导致大量的任务都被推测执行了

动态资源调度

  • spark.dynamicAllocation.enabled 是否启用动态资源调度.默认false (必须同时开启spark.shuffle.service.enabled)
  • spark.dynamicAllocation.executorIdleTimeout 在动态资源调度中,如果executor保持空闲超过该时间,则该executor会被杀死回收,默认60S
  • spark.dynamicAllocation.cachedExecutorIdleTimeout 在动态资源调度中,如果executor作为数据存储点空闲超过多少时间则会被杀死回收,默认无限,也就是不回收
  • spark.dynamicAllocation.initialExecutors 动态资源调度中,初始executor数,默认等于spark.dynamicAllocation.minExecutors
  • spark.dynamicAllocation.minExecutors 动态资源调度中维持最少executor数,默认0
  • spark.dynamicAllocation.maxExecutors 动态资源调度中维持最多executor数,默认无限
  • spark.dynamicAllocation.executorAllocationRatio 动态资源调度中默认会按照任务本身的并行度*比率去申请executor.默认为1
    这个比率的结果最终将被 minExecutorsmaxExecutors 覆盖
  • spark.dynamicAllocation.schedulerBacklogTimeout 动态资源调度中,如果Task积压超过该时间则会尝试申请新的executor,默认1S

网络

  • spark.netwrok.timeout 执行的默认最大超时时间,最大120S 这个默认值本身没有意义,但会作为以下配置的默认值
    spark.core.connection.ack.wait.timeout 连接在超时和放弃之前等待ACK响应时间
    spark.storage.blockManagerSlaveTimeoutMsBlockManagerSlaver超时时间
    spark.shuffle.io.connectionTimeout
    spark.rpc.askTimeoutRPC请求超时时间
    spark.rpc.lookupTimeout
  • spark.rpc.numRetriesRPC请求的重试此时,默认3次,可以适当调高
  • spark.rpc.retry.waitPRC请求的创世间隔,默认3S

Shuffle

  • spark.shuffle.file.buffer 文件溢写缓冲大小 默认32K
    如果内存充足可以适当调大,这可以减少Map端文件溢写次数
  • spark.shuffle.io.maxReties 默认3次
    这是拉取请求的错误重试次数,一般可以适当调大(30次)
    因为网络波动或目标executor处于GC过程导致不可访问,shuffle请求很可能不成功,调大可以提高稳定性
  • spark.shuffle.io.retryWait 默认5S
    这是拉取请求的最大超时时间,一般可以适当调大(30S),原因同上
  • spark.reducer.maxSizeInFlight 默认48M
    拉取请求的缓冲块(一次拉取多少大小的数据),如果内存充足可以适当调大,可以减少拉取次数
  • spark.reducer.maxBlocksInFlightPerAddress 默认Int.Max
    每个主机(Address)可以被多少个Reducer主机(Address)拉取数据,在大集群中可以适当调低来降低某个热点executor的压力
  • spark.reducer.maxReqsInFlight 默认 Int.Max
    每个主机(Address)可以接收多少个Reducer拉取请求,大集群中可以适当调低来降低某个热点executor的压力

内存管理

  • spark.memory.fraction Spark内存,默认0.6
  • Spark1.6 spark.shuffle.io.memoryFraction 默认0.2 适当调大
    Spark2.X spark.memory,storageFracation 默认0.5,适当压低
    Spark2.X是动态内存管理,Shuffle是占据执行内存的一部分,通过压低存储内存可以调高执行内存

Streaming

  • spark.streaming.backpressure.enabled 是否启用背压,默认false
    建议开启,特别是在Streaming+Kafka的时候,配合单分区最大拉取数可以很好的控制消费速率
  • spark.streaming.blockInterval 块拆分间隔,默认200MS
  • spark.streaming.receiver.writeAheadLog.enable 是否启用Receiver预写日志,默认false
  • spark.streaming.unpersist 是否自动清理RDD缓存,默认为true
  • spark.streaming.stopGracefullyOnShutdown Streaming 是否优雅关闭,默认false
  • spark.streaming.kafka.maxRatePerPartition Kafka的Direct模式下单分区最大拉取消息数,默认无限
    建议显式设置,Streaming控制消费速率是非常重要的
  • spark.streaming.kafka.maxRetries Kafka的Direct模式下拉取的重试次数,默认为1
    (这里是指重试次数1次,即总共最大会请求两次)

环境

  • spark.driver.extraJavaOptions 传递给driver端的JVM配置参数
    不应该使用该参数调整堆大小(-Xmx)
    Client模式下,不能通过SparkConf传递该参数(因为事实上driver此时已经启动了),而必须使用命名参数driver-java-options
  • spark.executor.extraJavaOptions 传递给executor的JVM配置参数
    同driver设置,不应该用该参数调整堆大小
--conf spark.executor.extraJavaOptions="-XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:gc.log"

参数总结

spark-submit \
--name AppName \
#--properties-file spark-submit-common.conf  \# 通用提交参数包装,

###############
# 资源和调度相关
###############
--master yarn \
--queue ${queueName} \
--deploy-mode ${cluster} \
--class  org.apache.spark.examples.SparkPi \
--driver-cores ${1} \ 
--driver-memory ${1g} \
--num-executors ${2} \ 
--executor-core ${1} \
--executor-memory ${1g} \

############# 
# 常用Conf配置
#############
--conf spark.default.parallelism=${10} \  # 默认并行度,必调参数,但取值依赖实际情况
--conf spark.memory.storageFraction=${0.3} \ # 压低存储内存,必调参数,但取值依赖实际情况
--conf spark.locality.wait=1s \ # 本地化的降级等待时间压低为1S
--conf spark.netwrok.timeout=300s \ # 提高网络超时时间来提升总稳定性,默认120S,会影响包括连接请求ACK,BlockSlayer超时,shuffle的IO连接超时,RPC连接超时等等  
--conf spark.shuffle.io.maxRetries=30 \ # 提高Shuffle拉取请求重试次数,默认3次
--conf spark.shuffle.io.retryWait=30S \ # 提高Shuffle拉取的最大超时,默认3S 
--conf spark.io.compression.codec=snappy \ # 使用Snappy快压,默认lz4,会影响shuffle-map-output,shuffle.spill,boardcast-compress,rdd-compress等等
--conf spark.driver.memoryOverhead=2g \ # 堆外内存,提高到2G

# Streaming相关
--conf spark.streaming.backpressure.enable=true \ # 必开背压(默认false)
--conf spark.streaming.kafka.maxRatePerPatition=${5000} \ # 必开Kafka限速(每个Kafka分区)
--conf spark.streaming.stopGracefullyOnShutdown=true \ # 必开优雅关闭

# 黑名单相关(可选)
--conf spark.blacklist.enabled=true \ # 开启黑名单 
--conf spark.blacklist.timeout=${15m} \ # 黑名单恢复,默认1小时,小集群缩短到15分钟
--conf spark.blacklist.stage.maxFailedTasksPerExecutor=${5} \ # executor的Stage级别Task失败,默认2次,这个小集群需要调大
--conf spark.blacklist.stage.maxFailedExecutorsPerNode=${5} \ # node的Stage级别Task失败,默认2次,这个小集群需要调大

# 推测执行相关(可选)
--conf spark.speculation=true \ # 开启推测执行(可选)
--conf spark.speculation.interval=3s \ # 监测间隔(提高为3S,默认200Ms)
--conf spark,speculation.quantile=${0.95} \ # 完成百分比监测(默认0.75容易产生太多的推测)

# 动态资源调度相关(可选)  
--conf spark.shuffle.service.enabled=true \ # 必开Shuffle外部服务
--conf spark.dynamicAllocation.enabled=true \ # 开启动态资源调度
--conf spark.dynamicAllocation.executorIdleTimeout=300S \ # 提高executor空闲回收时间来避免大量产生资源申请,默认60S
--conf spark.dynamicAllocation.minExecutors=${1} \ # 资源最低保留(根据任务和业务),默认0
--conf spark.dynamicAllocation.maxExecutors=${10} \ # 资源最多申请(根据任务和业务),默认无限
--conf spark.dynamicAllocation.schedulerBacklogTimeout=${1s} \ # Task积压后申请资源,根据业务上可以忍耐的任务延时而定,默认1S

# 在应用级别的变更日志级别方式(调试专用)
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties"

############# 
# GC调优
#############
# G1(常规使用,深入调优必须依赖GC日志,这个专章描述) 
# JVM:关闭应用级GC,固定堆大小,打印GC日志,设定元空间上限 G1:常规启用,使用最大Region  
--conf "spark.executor.extraJavaOptions=-XX:-DisableExplicitGC -Xms${?} -Xmx${?}  -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:${app}-gc.log -XX:MaxMetaSpaceSize=1024M -XX:+UseG1GC -XX:G1HeapRegionSize=32M" 

# CMS(常规使用,深入调优必须依赖GC日志,这个专章描述)
# JVM:关闭应用级GC,固定堆大小,打印GC日志,设定元空间上限 CMS:常规启用,限定5次清理配一次整理  
--conf "spark.executor.extraJavaOptions=-XX:-DisableExplicitGC -Xms${?} -Xmx${?} -XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -XX:+PrintGCApplicationConcurrentTime -Xloggc:${app}-gc.log -XX:MaxMetaSpaceSize=1024M -XX:+UseConcMarkSweep -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCBeforeCompaction=5" 

#############################
# 最后的jar以及传入的main函数参数
#############################
/mypath/app.jar \
100(my_main_args)