NightPxy 个人技术博客

RDD-变量

Posted on By NightPxy

集群环境下的变量

Spark作为一个分布式计算框架,所有的操作都会以分布式的形式进行执行
这体现 在执行时会由 driver端以数据副本的形式 交给 executor端去执行
这带来的问题是如果变更变量,实际上变更是变量的副本,也就是这种变更是不会传递到 driver和其它executor 的
针对这种情况,Spark提供了两种共享变量 广播变量(broadcast)累加(accumulators)

广播变量(broadcast)

概述

广播变量允许将一个变量的副本发送到每个executor.这带来两个好处

  • 以executor节点会单位重用变量而不是作业.这样在同一个executor的不同作业,都可以反复使用这个变量
  • 如果变量发生变更,可以以重新广播的形式将变量推送到所有节点.

注意:
一般情况下,广播变量是不可变的.
但如果需要变更广播变量,变更变量本身依然是没有意义的,必须重新广播

使用场景

广播变量本身是会产生序列化传输的.所以只有在数据本身是可能会在多个Stage或多个Job中反复使用,使用广播变量才会变的有意义

用法

// SparkContext.broadcast(v)进行创建,返回的是广播变量的封装器,以.value读取广播变量的的值
val broadcastVar = sc.broadcast(Array(1, 2, 3))
val v = broadcastVar.value

累加器(accumulators)

概述

累加器变量仅支持累加(合并)操作,并且累加器是可以在UI中可见的
因此累加器可以在并行计算时执行一些特殊合并计算

注意,累加器依然遵循RDD的Lazy原则,即:

  • 累加器更新操作依然是在action中,并且在每个Job中,只会执行一次(如果Job失败重启,累加器更新不会执行)
  • 在transform中,累加器依然保持Lazy执行.(如果transform被重新执行了,则累加器会重复执行)

用法

数值类型累加器(内置)

val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
println(accum.value)

自定义类型累加器

自定义累加器,需要继承实现 AccumulatorV2

//两个泛型参数->累加的元素类型和结果类型可以不同的
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

private val myVector: MyVector = MyVector.createZeroVector

def reset(): Unit = { myVector.reset() }

def add(v: MyVector): Unit = { myVector.add(v) }
...
}

// 创建一个自定义的累加器类型:
val myVectorAcc = new VectorAccumulatorV2
//将这个触发器注册到SparkContext:
sc.register(myVectorAcc, "MyVectorAcc1")