Spark-EventBus
Overview
An event bus (EventBus) is an architectural design pattern. Its benefit is decoupling the communication between complex components (or systems).
Technically it can be implemented simply as the observer pattern; in distributed scenarios a distributed message queue (a message bus) is usually introduced instead.
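To make that claim concrete, here is a minimal, generic observer-pattern sketch (Subject and Listener are made-up names for illustration, not Spark APIs):

trait Listener[E] {
  def onEvent(event: E): Unit
}

class Subject[E] {
  // Registered observers; publish broadcasts one event to each of them
  private val listeners = scala.collection.mutable.ArrayBuffer.empty[Listener[E]]
  def subscribe(listener: Listener[E]): Unit = listeners += listener
  def publish(event: E): Unit = listeners.foreach(_.onEvent(event))
}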
Spark also uses this technique: Spark has a great many internal components (Core alone has Job, Stage, Task, Block, and so on).
Spark's implementation is the observer pattern. It is used for communication between internal components and does not involve interaction with third-party systems.
ListenerBus
ListenerBus is the high-level abstraction of Spark's event bus pattern.
What ListenerBus encapsulates is the core mechanics of listener registration and event broadcasting; it carries no concrete business logic itself.
ListenerBus is applied in three areas in Spark:
- Core component interaction: SparkListenerBus
- SQL's ExternalCatalog: InMemoryCatalog, HiveExternalCatalog
- Streaming applications: StreamingListenerBus, StreamingQueryListenerBus
The core source code is as follows:
// Subclasses bind the type parameters -- L: the listener type, E: the event type
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  // Listener container; CopyOnWriteArrayList makes it thread-safe.
  // Each entry is a (listener, timer) pair: when broadcasting, each
  // listener's processing time is recorded for metrics.
  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

  // Register a listener
  final def addListener(listener: L): Unit = {
    listenersPlusTimers.add((listener, getTimer(listener)))
  }

  // Unregister a listener
  final def removeListener(listener: L): Unit = {
    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
      listenersPlusTimers.remove(listenerAndTimer)
    }
  }

  // Broadcast an event to all registered listeners
  def postToAll(event: E): Unit = {
    ...
  }
}
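Since the body of postToAll is elided above, the following self-contained sketch (MiniListenerBus is a made-up name) shows its general shape: iterate the listener list and hand each event to doPostEvent, which concrete buses implement. The real Spark version additionally times each call with the listener's timer and logs, rather than rethrows, listener failures:

import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.JavaConverters._

trait MiniListenerBus[L <: AnyRef, E] {
  private val listeners = new CopyOnWriteArrayList[L]

  final def addListener(listener: L): Unit = listeners.add(listener)

  def postToAll(event: E): Unit = {
    listeners.asScala.foreach { l =>
      // One misbehaving listener must not break delivery to the others
      try doPostEvent(l, event)
      catch { case e: Exception => println(s"listener failed: $e") }
    }
  }

  // Subclasses route the event to the right callback on the listener
  protected def doPostEvent(listener: L, event: E): Unit
}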
SparkListenerBus
SparkListenerBus defines the interaction events between the Core components, such as job submission & completion, stage submission & completion, task start & completion, and so on.
It is an implementing subtrait of ListenerBus.
private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    // Pattern-match the event to the listener's concrete handler method
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
      case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        listener.onEnvironmentUpdate(environmentUpdate)
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        listener.onBlockManagerAdded(blockManagerAdded)
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        listener.onBlockManagerRemoved(blockManagerRemoved)
      case unpersistRDD: SparkListenerUnpersistRDD =>
        listener.onUnpersistRDD(unpersistRDD)
      case applicationStart: SparkListenerApplicationStart =>
        listener.onApplicationStart(applicationStart)
      case applicationEnd: SparkListenerApplicationEnd =>
        listener.onApplicationEnd(applicationEnd)
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        listener.onExecutorMetricsUpdate(metricsUpdate)
      case executorAdded: SparkListenerExecutorAdded =>
        listener.onExecutorAdded(executorAdded)
      case executorRemoved: SparkListenerExecutorRemoved =>
        listener.onExecutorRemoved(executorRemoved)
      case executorBlacklisted: SparkListenerExecutorBlacklisted =>
        listener.onExecutorBlacklisted(executorBlacklisted)
      case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
        listener.onExecutorUnblacklisted(executorUnblacklisted)
      case nodeBlacklisted: SparkListenerNodeBlacklisted =>
        listener.onNodeBlacklisted(nodeBlacklisted)
      case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
        listener.onNodeUnblacklisted(nodeUnblacklisted)
      case blockUpdated: SparkListenerBlockUpdated =>
        listener.onBlockUpdated(blockUpdated)
      case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
        listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
      case _ => listener.onOtherEvent(event)
    }
  }
}
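As a usage example: user code does not implement SparkListenerInterface directly. It typically extends the no-op SparkListener class and registers it with SparkContext.addSparkListener (or via the spark.extraListeners configuration), after which the pattern match above routes events to the overridden callbacks. A minimal sketch, assuming a local deployment:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted}

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("listener-demo"))
    // SparkListener provides empty default implementations of every callback,
    // so only the events of interest need to be overridden
    sc.addSparkListener(new SparkListener {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
        println(s"stage ${stageCompleted.stageInfo.stageId} completed")
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
        println(s"job ${jobEnd.jobId} ended: ${jobEnd.jobResult}")
    })
    sc.parallelize(1 to 100).count()
    sc.stop()
  }
}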
AsyncEventQueue
AsyncEventQueue is a concrete implementation of SparkListenerBus; as the name suggests, it is an asynchronous event bus.
Asynchronous means events are not processed on the posting thread: each event is first appended to a buffering queue, and a dedicated child thread takes events off the queue in a loop and dispatches them.
private class AsyncEventQueue(
    val name: String,
    conf: SparkConf,
    metrics: LiveListenerBusMetrics,
    bus: LiveListenerBus)
  extends SparkListenerBus
  with Logging {

  // Buffering queue for the asynchronous events; capacity defaults to 10000
  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

  // Event dispatch thread
  private val dispatchThread = new Thread(s"spark-listener-group-$name") {
    setDaemon(true)
    // Keep dispatching; stop the SparkContext on an uncaught fatal error
    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
      dispatch()
    }
  }

  // Event dispatch loop
  private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
    // Take an event from the queue (blocks while the queue is empty)
    var next: SparkListenerEvent = eventQueue.take()
    // Until the stop marker (AsyncEventQueue.POISON_PILL) arrives,
    // broadcast each event to all listeners
    while (next != POISON_PILL) {
      val ctx = processingTime.time()
      try {
        super.postToAll(next)
      } finally {
        ctx.stop()
      }
      eventCount.decrementAndGet()
      next = eventQueue.take()
    }
    eventCount.decrementAndGet()
  }
}
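The stop mechanism deserves a note: stopping the queue enqueues the POISON_PILL sentinel, so the dispatch thread drains every event posted before it and only then exits its loop. A generic, self-contained sketch of this pattern (MiniAsyncQueue is a made-up name; the real AsyncEventQueue additionally tracks event counts, dropped-event metrics and start/stop state):

import java.util.concurrent.LinkedBlockingQueue

class MiniAsyncQueue[E <: AnyRef](handle: E => Unit) {
  private val POISON_PILL = new AnyRef
  private val queue = new LinkedBlockingQueue[AnyRef](10000)

  private val worker = new Thread("mini-dispatch") {
    setDaemon(true)
    override def run(): Unit = {
      var next = queue.take()
      while (next ne POISON_PILL) {   // drain until the sentinel arrives
        handle(next.asInstanceOf[E])
        next = queue.take()
      }
    }
  }

  def start(): Unit = worker.start()
  def post(event: E): Unit = queue.offer(event)  // drops events when full, as Spark does
  def stop(): Unit = { queue.put(POISON_PILL); worker.join() }
}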
ReplayListenerBus
ReplayListenerBus is another SparkListenerBus implementation: instead of receiving live events, it re-posts events parsed from persisted event log files; this is how the History Server rebuilds the UI of a finished application.
LiveListenerBus
LiveListenerBus is not actually an event bus itself; it is a wrapper around several AsyncEventQueue instances.
private[spark] class LiveListenerBus(conf: SparkConf) {

  // Independent dispatch lanes: each named AsyncEventQueue runs on its own thread
  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()

  // Registering a listener also names the queue it should attach to:
  //   if a queue with that name exists, the listener joins it;
  //   otherwise the queue is created first, then the listener joins it
  private[spark] def addToQueue(
      listener: SparkListenerInterface,
      queue: String): Unit = synchronized {
    if (stopped.get()) {
      throw new IllegalStateException("LiveListenerBus is stopped.")
    }
    queues.asScala.find(_.name == queue) match {
      case Some(queue) =>
        queue.addListener(listener)
      case None =>
        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
        newQueue.addListener(listener)
        if (started.get()) {
          newQueue.start(sparkContext)
        }
        queues.add(newQueue)
    }
  }

  // Posting an event
  def post(event: SparkListenerEvent): Unit = {
    if (stopped.get()) {
      return
    }
    metrics.numEventsPosted.inc()
    // queuedEvents is null once the bus has started: no buffering needed,
    // deliver straight to the queues
    if (queuedEvents == null) {
      postToQueues(event)
      return
    }
    // Before the bus starts, events are buffered; appending to the buffer
    // must happen under the lock
    synchronized {
      if (!started.get()) {
        queuedEvents += event
        return
      }
    }
    // The bus started concurrently after all: deliver straight to the queues
    postToQueues(event)
  }

  // Delivery itself is trivial: broadcast the event to every queue
  private def postToQueues(event: SparkListenerEvent): Unit = {
    val it = queues.iterator()
    while (it.hasNext()) {
      it.next().post(event)
    }
  }
}
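The payoff of this wrapper design is isolation: each named queue has its own dispatch thread, so a slow listener group (say, the event-log writer) cannot stall another (say, the UI status listeners). In Spark itself the well-known queue names include "shared", "appStatus", "executorManagement" and "eventLog". A hedged sketch of the fan-out, reusing the MiniAsyncQueue sketch above and simplifying registration to one handler per queue:

class MiniLiveBus[E <: AnyRef] {
  private val queues = scala.collection.mutable.Map.empty[String, MiniAsyncQueue[E]]

  // Create the named queue on first use, mirroring LiveListenerBus.addToQueue
  def addToQueue(name: String, handle: E => Unit): Unit = synchronized {
    queues.getOrElseUpdate(name, {
      val q = new MiniAsyncQueue[E](handle)
      q.start()
      q
    })
  }

  // One post fans out to every independent queue
  def post(event: E): Unit = synchronized { queues.values.foreach(_.post(event)) }
}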
ExternalCatalog
// Metadata operations are all defined on top of the event bus: each operation
// posts a pre-event, performs the action, then posts a post-event
abstract class ExternalCatalog
  extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
  import CatalogTypes.TablePartitionSpec

  protected def doCreateDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit

  final def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    postToAll(DropDatabasePreEvent(db))
    doDropDatabase(db, ignoreIfNotExists, cascade)
    postToAll(DropDatabaseEvent(db))
  }
  ....
}
InMemoryCatalog
// In-memory implementation of the catalog
class InMemoryCatalog(
    conf: SparkConf = new SparkConf,
    hadoopConfig: Configuration = new Configuration)
  extends ExternalCatalog {
  import CatalogTypes.TablePartitionSpec

  private class TableDesc(var table: CatalogTable) {
    val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]
  }

  private class DatabaseDesc(var db: CatalogDatabase) {
    val tables = new mutable.HashMap[String, TableDesc]
    val functions = new mutable.HashMap[String, CatalogFunction]
  }

  // Database name -> description
  private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
  ....
}
HiveExternalCatalog
// Hive-backed catalog: metadata is persisted in the Hive metastore
private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
  extends ExternalCatalog with Logging {
  import CatalogTypes.TablePartitionSpec
  import HiveExternalCatalog._
  import CatalogTableType._

  /**
   * A Hive client used to interact with the metastore.
   */
  lazy val client: HiveClient = {
    HiveUtils.newClientForMetadata(conf, hadoopConf)
  }
  ...
}