集群环境下的变量
Spark作为一个分布式计算框架,所有的操作都会以分布式的形式进行执行
这体现 在执行时会由 driver端以数据副本的形式 交给 executor端去执行
这带来的问题是如果变更变量,实际上变更是变量的副本,也就是这种变更是不会传递到 driver和其它executor 的
针对这种情况,Spark提供了两种共享变量 广播变量(broadcast) 和 累加(accumulators)
广播变量(broadcast)
概述
广播变量允许将一个变量的副本发送到每个executor.这带来两个好处
- 以executor节点会单位重用变量而不是作业.这样在同一个executor的不同作业,都可以反复使用这个变量
- 如果变量发生变更,可以以重新广播的形式将变量推送到所有节点.
注意:
一般情况下,广播变量是不可变的.
但如果需要变更广播变量,变更变量本身依然是没有意义的,必须重新广播
使用场景
广播变量本身是会产生序列化传输的.所以只有在数据本身是可能会在多个Stage或多个Job中反复使用,使用广播变量才会变的有意义
用法
// SparkContext.broadcast(v)进行创建,返回的是广播变量的封装器,以.value读取广播变量的的值
val broadcastVar = sc.broadcast(Array(1, 2, 3))
val v = broadcastVar.value
累加器(accumulators)
概述
累加器变量仅支持累加(合并)操作,并且累加器是可以在UI中可见的
因此累加器可以在并行计算时执行一些特殊合并计算
注意,累加器依然遵循RDD的Lazy原则,即:
- 累加器更新操作依然是在action中,并且在每个Job中,只会执行一次(如果Job失败重启,累加器更新不会执行)
- 在transform中,累加器依然保持Lazy执行.(如果transform被重新执行了,则累加器会重复执行)
用法
数值类型累加器(内置)
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
println(accum.value)
自定义类型累加器
自定义累加器,需要继承实现 AccumulatorV2
//两个泛型参数->累加的元素类型和结果类型可以不同的
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = { myVector.reset() }
def add(v: MyVector): Unit = { myVector.add(v) }
...
}
// 创建一个自定义的累加器类型:
val myVectorAcc = new VectorAccumulatorV2
//将这个触发器注册到SparkContext:
sc.register(myVectorAcc, "MyVectorAcc1")