Spark-EventBus

Overview

An event bus (EventBus) is an architectural design pattern whose main benefit is decoupling communication between complex components (or systems).
Technically it can be realized simply as the observer pattern; in distributed scenarios a distributed message queue (a message bus) is usually introduced instead.

Spark also uses this technique (Spark has a large number of internal components; Core alone involves Job, Stage, Task, Block, and so on).
Spark's implementation is the observer-pattern flavor: it is used for communication between internal components and does not involve any third-party systems.

ListenerBus

ListenerBus is the high-level abstraction of Spark's event bus pattern.
What ListenerBus encapsulates is the registration of listeners and the broadcasting of events; it carries no business logic of its own.
ListenerBus is applied in three areas of Spark:

  • Component interaction in Core: SparkListenerBus
  • ExternalCatalog in SQL (InMemoryCatalog, HiveExternalCatalog)
  • Streaming applications (StreamingListenerBus, StreamingQueryListenerBus)

The core source code is as follows:

// Subclasses supply the type parameters: L = listener type, E = event type
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
  // Listener container; thread-safe via copy-on-write
  // Each entry is a (listener, timer) pair: when broadcasting, each listener's processing time is recorded for metrics
  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]
  
  // Register a listener
  final def addListener(listener: L): Unit = {
    listenersPlusTimers.add((listener, getTimer(listener)))
  }
  
  // Remove a listener
  final def removeListener(listener: L): Unit = {
    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
      listenersPlusTimers.remove(listenerAndTimer)
    }
  }
  
  // Broadcast an event to all listeners
  def postToAll(event: E): Unit = {
      ...
  } 
}
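
postToAll (elided above) simply iterates over the registered listeners and hands each one the event through an abstract doPostEvent(listener, event) method, which every concrete bus must implement. To make the register/broadcast contract concrete, here is a minimal self-contained sketch of the same pattern in plain Scala; it deliberately does not reuse Spark's private[spark] trait, and all names (SimpleListenerBus, DemoListener, JobStarted, ...) are made up for illustration.

import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.JavaConverters._

// Simplified stand-in for Spark's trait: L = listener type, E = event type
trait SimpleListenerBus[L <: AnyRef, E] {
  private val listeners = new CopyOnWriteArrayList[L]

  final def addListener(listener: L): Unit = listeners.add(listener)
  final def removeListener(listener: L): Unit = listeners.remove(listener)

  // Broadcast: deliver the event to every registered listener in turn
  final def postToAll(event: E): Unit =
    listeners.asScala.foreach(listener => doPostEvent(listener, event))

  // Concrete buses decide how one listener handles one event
  protected def doPostEvent(listener: L, event: E): Unit
}

// Illustrative listener/event types
case class JobStarted(jobId: Int)
trait DemoListener { def onJobStarted(event: JobStarted): Unit }

class DemoListenerBus extends SimpleListenerBus[DemoListener, JobStarted] {
  override protected def doPostEvent(listener: DemoListener, event: JobStarted): Unit =
    listener.onJobStarted(event)
}

object DemoListenerBusApp extends App {
  val bus = new DemoListenerBus
  bus.addListener(new DemoListener {
    def onJobStarted(event: JobStarted): Unit = println(s"job ${event.jobId} started")
  })
  bus.postToAll(JobStarted(1))   // prints: job 1 started
}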

SparkListenerBus

SparkListenerBus defines the interaction events among Core components, such as job submission & completion, stage submission & completion, task start & completion, and so on.
It is an implementation subclass of ListenerBus.

private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    // Pattern-match the event to the corresponding handler on the listener
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
      case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        listener.onEnvironmentUpdate(environmentUpdate)
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        listener.onBlockManagerAdded(blockManagerAdded)
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        listener.onBlockManagerRemoved(blockManagerRemoved)
      case unpersistRDD: SparkListenerUnpersistRDD =>
        listener.onUnpersistRDD(unpersistRDD)
      case applicationStart: SparkListenerApplicationStart =>
        listener.onApplicationStart(applicationStart)
      case applicationEnd: SparkListenerApplicationEnd =>
        listener.onApplicationEnd(applicationEnd)
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        listener.onExecutorMetricsUpdate(metricsUpdate)
      case executorAdded: SparkListenerExecutorAdded =>
        listener.onExecutorAdded(executorAdded)
      case executorRemoved: SparkListenerExecutorRemoved =>
        listener.onExecutorRemoved(executorRemoved)
      case executorBlacklisted: SparkListenerExecutorBlacklisted =>
        listener.onExecutorBlacklisted(executorBlacklisted)
      case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
        listener.onExecutorUnblacklisted(executorUnblacklisted)
      case nodeBlacklisted: SparkListenerNodeBlacklisted =>
        listener.onNodeBlacklisted(nodeBlacklisted)
      case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
        listener.onNodeUnblacklisted(nodeUnblacklisted)
      case blockUpdated: SparkListenerBlockUpdated =>
        listener.onBlockUpdated(blockUpdated)
      case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
        listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
      case _ => listener.onOtherEvent(event)
    }
  }

}
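
User code rarely implements SparkListenerInterface directly. The SparkListener base class supplies no-op defaults for every callback, so you only override the events you care about, and the listener can be registered via SparkContext.addSparkListener (or the spark.extraListeners configuration). A minimal sketch; the callback bodies and app setup are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Only job start/end are overridden; every other callback keeps its no-op default
class JobLoggingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stage(s)")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
}

object JobLoggingDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("listener-demo"))
    sc.addSparkListener(new JobLoggingListener)   // ends up on the LiveListenerBus (see below)
    sc.parallelize(1 to 100).count()              // fires SparkListenerJobStart / SparkListenerJobEnd
    sc.stop()
  }
}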

AsyncEventQueue

AsyncEventQueue is a concrete implementation of SparkListenerBus; as the name suggests, it is an asynchronous event bus.
Asynchronous means events are not handled on the posting thread: each event is first placed into a buffer queue, and a dedicated child thread loops, pulling events from the queue and processing them.

private class AsyncEventQueue(
    val name: String,
    conf: SparkConf,
    metrics: LiveListenerBusMetrics,
    bus: LiveListenerBus)
  extends SparkListenerBus
  with Logging {
  
  // Buffer queue for pending events; the capacity defaults to 10000 (LISTENER_BUS_EVENT_QUEUE_CAPACITY)
  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
 
  // Event dispatch thread (a daemon thread)
  private val dispatchThread = new Thread(s"spark-listener-group-$name")   {
    setDaemon(true)
    // If dispatching dies with an uncaught exception, stop the SparkContext
    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
      dispatch()
    }
  }
  // The dispatch loop
  private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
    // Take an event from the queue (blocks while the queue is empty)
    var next: SparkListenerEvent = eventQueue.take()
    // Keep broadcasting events to all listeners until the shutdown marker (AsyncEventQueue.POISON_PILL) is taken
    while (next != POISON_PILL) {
      val ctx = processingTime.time()
      try {
        super.postToAll(next)
      } finally {
        ctx.stop()
      }
      eventCount.decrementAndGet()
      next = eventQueue.take()
    }
    eventCount.decrementAndGet()
  }
}
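
stop() (not shown) enqueues a special marker event, the "poison pill"; when the dispatch loop takes that marker it exits, which is how the background thread is shut down cleanly. A stripped-down, self-contained sketch of the same queue-plus-daemon-thread pattern (all names here are illustrative, not Spark's):

import java.util.concurrent.LinkedBlockingQueue

class TinyAsyncQueue(handle: String => Unit) {
  // Sentinel used only as a shutdown marker; compared by reference so it can
  // never collide with a real event
  private val POISON_PILL = new String("POISON_PILL")
  private val queue = new LinkedBlockingQueue[String](10000)

  private val dispatchThread = new Thread("tiny-async-queue") {
    setDaemon(true)
    override def run(): Unit = {
      var next = queue.take()            // blocks until an event (or the pill) arrives
      while (next ne POISON_PILL) {
        handle(next)
        next = queue.take()
      }
    }
  }

  def start(): Unit = dispatchThread.start()
  def post(event: String): Unit = queue.put(event)
  def stop(): Unit = { queue.put(POISON_PILL); dispatchThread.join() }
}

object TinyAsyncQueueDemo extends App {
  val q = new TinyAsyncQueue(event => println(s"handled $event"))
  q.start()
  (1 to 3).foreach(i => q.post(s"event-$i"))
  q.stop()   // all three events are handled before the dispatch thread exits
}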

ReplayListenerBus

ReplayListenerBus is another implementation: rather than receiving live events, it reads events back from the persisted event log (JSON) and replays them to its listeners; this is how the History Server rebuilds the UI of a finished application.

LiveListenerBus

LiveListenerBus is not really an event bus itself; it is a wrapper that owns and coordinates multiple independent AsyncEventQueue instances.

private[spark] class LiveListenerBus(conf: SparkConf) {
  // The dispatch lanes: several dedicated, mutually independent AsyncEventQueue instances
  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
  
  // Key point: when a listener is registered, the target queue is named at the same time.
  //   If a queue with that name already exists, the listener is added to it.
  //   If it does not exist, the queue is created first and the listener is added to the new queue.
  private[spark] def addToQueue(
      listener: SparkListenerInterface,
      queue: String): Unit = synchronized {
    if (stopped.get()) {
      throw new IllegalStateException("LiveListenerBus is stopped.")
    }

    queues.asScala.find(_.name == queue) match {
      case Some(queue) =>
        queue.addListener(listener)

      case None =>
        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
        newQueue.addListener(listener)
        if (started.get()) {
          newQueue.start(sparkContext)
        }
        queues.add(newQueue)
    }
  }
  
  // Post an event
  def post(event: SparkListenerEvent): Unit = {
    if (stopped.get()) {
      return
    }

    metrics.numEventsPosted.inc()

    // queuedEvents (declared elsewhere in the class) buffers events posted before start();
    // it is set to null once start() has drained it, so a null buffer means we can post straight to the queues.
    if (queuedEvents == null) {
      postToQueues(event)
      return
    }

    // The buffer may still be live: if the bus has not started yet, append the event to it under the lock.
    synchronized {
      if (!started.get()) {
        queuedEvents += event
        return
      }
    }

    // The bus started concurrently between the checks above, so post directly to the queues after all.
    postToQueues(event)
  }

  // Posting is straightforward: hand the event to every listener queue.
  private def postToQueues(event: SparkListenerEvent): Unit = {
    val it = queues.iterator()
    while (it.hasNext()) {
      it.next().post(event)
    }
  }
}
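
Since Spark 2.3 these queues are identified by name, and LiveListenerBus adds small convenience methods that route listeners to a few well-known queues, so that a slow consumer (for example the event-log writer) only delays its own lane. Roughly, paraphrased from the same class (treat the exact constants and method names as indicative; they may differ between versions):

// Well-known queue names (from the LiveListenerBus companion object)
private[spark] val SHARED_QUEUE = "shared"                           // sc.addSparkListener / spark.extraListeners
private[spark] val APP_STATUS_QUEUE = "appStatus"                    // AppStatusListener, drives the Web UI
private[spark] val EXECUTOR_MANAGEMENT_QUEUE = "executorManagement"  // dynamic allocation, heartbeat handling
private[spark] val EVENT_LOG_QUEUE = "eventLog"                      // EventLoggingListener, feeds the History Server

// Thin wrappers around addToQueue
def addToSharedQueue(listener: SparkListenerInterface): Unit = addToQueue(listener, SHARED_QUEUE)
def addToStatusQueue(listener: SparkListenerInterface): Unit = addToQueue(listener, APP_STATUS_QUEUE)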

ExternalCatalog

ExternalCatalog

// Defines the metadata operations; all of them go through the event bus (a pre-event before the operation, a post-event after)
abstract class ExternalCatalog
  extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
  import CatalogTypes.TablePartitionSpec
  
  protected def doCreateDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
  
  final def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    postToAll(DropDatabasePreEvent(db))
    doDropDatabase(db, ignoreIfNotExists, cascade)
    postToAll(DropDatabaseEvent(db))
  }
  ....
}
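
Because ExternalCatalog extends ListenerBus, any component can observe metadata changes just by registering a listener: every DDL method posts a pre-event before delegating to the do* implementation and a post-event afterwards (as dropDatabase does above). A hedged sketch of what that looks like from the outside; the listener body is illustrative and the exact constructor/event signatures vary slightly between Spark versions:

import java.net.URI
import org.apache.spark.sql.catalyst.catalog._

object CatalogEventsDemo extends App {
  val catalog = new InMemoryCatalog()

  // ExternalCatalogEventListener has a single onEvent callback
  catalog.addListener(new ExternalCatalogEventListener {
    override def onEvent(event: ExternalCatalogEvent): Unit =
      println(s"catalog event: $event")
  })

  val db = CatalogDatabase("demo_db", "", new URI("file:/tmp/demo_db"), Map.empty)
  catalog.createDatabase(db, ignoreIfExists = true)
  // expected: CreateDatabasePreEvent(demo_db), then CreateDatabaseEvent(demo_db)
  catalog.dropDatabase("demo_db", ignoreIfNotExists = true, cascade = true)
  // expected: DropDatabasePreEvent(demo_db), then DropDatabaseEvent(demo_db)
}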

InMemoryCatalog

// The in-memory version of the catalog
class InMemoryCatalog(
    conf: SparkConf = new SparkConf,
    hadoopConfig: Configuration = new Configuration)
  extends ExternalCatalog {

  import CatalogTypes.TablePartitionSpec

  private class TableDesc(var table: CatalogTable) {
    val partitions = new mutable.HashMap[TablePartitionSpec, CatalogTablePartition]
  }

  private class DatabaseDesc(var db: CatalogDatabase) {
    val tables = new mutable.HashMap[String, TableDesc]
    val functions = new mutable.HashMap[String, CatalogFunction]
  }

  // Database name -> description
  private val catalog = new scala.collection.mutable.HashMap[String, DatabaseDesc]
  ....
}

HiveExternalCatalog

// The Hive-backed catalog
private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
  extends ExternalCatalog with Logging {

  import CatalogTypes.TablePartitionSpec
  import HiveExternalCatalog._
  import CatalogTableType._

  /**
   * A Hive client used to interact with the metastore.
   */
  lazy val client: HiveClient = {
    HiveUtils.newClientForMetadata(conf, hadoopConf)
  }
  ...
}