NightPxy 个人技术博客

SparkSql-外部数据源-源码解析

Posted on By NightPxy

spark.read.load

spark.read.load 是加载外部数据源的入口,这里的外部数据源源码解析,就从这里开始
首先 完整源码如下

def load(paths: String*): DataFrame = {
    ...
    val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
      val ds = cls.newInstance()
      val options = new DataSourceOptions((extraOptions ++
        DataSourceV2Utils.extractSessionConfigs(
          ds = ds.asInstanceOf[DataSourceV2],
          conf = sparkSession.sessionState.conf)).asJava)
          
      val reader = (ds, userSpecifiedSchema) match {
        case (ds: ReadSupportWithSchema, Some(schema)) =>
          ds.createReader(schema, options)

        case (ds: ReadSupport, None) =>
          ds.createReader(options)

        case (ds: ReadSupportWithSchema, None) =>
          throw new AnalysisException(s"A schema needs to be specified when using $ds.")

        case (ds: ReadSupport, Some(schema)) =>
          val reader = ds.createReader(options)
          if (reader.readSchema() != schema) {
            throw new AnalysisException(s"$ds does not allow user-specified schemas.")
          }
          reader

        case _ => null // fall back to v1
      }

      if (reader == null) {
        loadV1Source(paths: _*)
      } else {
        Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
      }
    } else {
      loadV1Source(paths: _*)
    }
  }
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
  ....
}
else {
  loadV1Source(paths: _*)
}

load的核心之处就是这个 if else
根据数据源本身的继承类型,将会区分为V1,V2两个部分.这两种数据源处理有非常大的不同

DataSourceV2

如果是数据源是DataSourceV2的,在load时就构造出Reader
Reader又分为两大类 ReadSupport 和 ReadSupportWithSchema,分别对应数据源本身是不是自带Schema.比如Parquet就是自身自带Schema的,而Text就是不带Schema的

Reader之外,还要处理spark.read.schema()传入的用户自定义的schema信息
数据源是否自带Schema 和 用户是否传入Schema ,就是2*2=4种分支情况 ,这就是如下match模式的来历

val reader = (ds, userSpecifiedSchema) match {
    //如果是数据源自带schema,用户又传入了schema,就以用户为准.
    case (ds: ReadSupportWithSchema, Some(schema)) =>
    ds.createReader(schema, options)

    //如果数据源不带schema,用户又没传,就以默认不带schema方式(毕竟数据源本身不带)
    case (ds: ReadSupport, None) =>
    ds.createReader(options)

    //如果是数据源自带schema,用户也没传,就以数据源为准
    case (ds: ReadSupportWithSchema, None) =>
    throw new AnalysisException(s"A schema needs to be specified when using $ds.")

    //如果数据源不带但是用户传入了schema,这里处理就比较特殊了
    //首先会以不带schema的形式去读取数据
    //然后根据readSchema(物理执行计划中的最终输出)来比较用户传入的schema
    //如果不匹配,不是用户多传入最终输出中不存在的列还是少传了列,都是不能接收的,所以异常抛出
    case (ds: ReadSupport, Some(schema)) =>
    val reader = ds.createReader(options)
    if (reader.readSchema() != schema) {
    throw new AnalysisException(s"$ds does not allow user-specified schemas.")
    }
    reader
    
    //异常或特殊情况,会以V1版本执行
    case _ => null // fall back to v1
    if (reader == null) {
      loadV1Source(paths: _*)
    } else {
      Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
    }
}

这里可以看出Spark其实是在尽可能的使用V2去读取,失败或异常就会再用V1去尝试
在使用V2时,出现两个比较重要的东西

DataSourceV2Relation

完整源码如下

case class DataSourceV2Relation(
    output: Seq[AttributeReference],
    reader: DataSourceReader)
  extends LeafNode 
  with MultiInstanceRelation 
  with DataSourceReaderHolder {

  override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]

  override def computeStats(): Statistics = reader match {
    case r: SupportsReportStatistics =>
      Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
    case _ =>
      Statistics(sizeInBytes = conf.defaultSizeInBytes)
  }

  override def newInstance(): DataSourceV2Relation = {
    copy(output = output.map(_.newInstance()))
  }
}

首先注意看类定义

  • 要求构造传入 output 和 reader
    这就是负责读取的实例和输出列.
    output 其实是reader.readSchema(),就是reader的物理执行计划中的输出,也就是最终输出
  • 继承自LeafNode
    这说明 DataSourceV2Relation 本质是一个 LogicalPlan (逻辑执行计划)
  • MultiInstanceRelation
    这代表这个逻辑
  • DataSourceReaderHolder
    自定义 equals/hashCode 的部分,这个不用管它

关注下这里

//获取读取的统计信息 
//这个统计信息就是指 数据大小
//如果能读取到数据大小,方便之后的优化.比如如果数据足够小就自动广播等等之类的
//注意如果此时没有读取数据大小,就会取默认的 defaultSizeInBytes 
//defaultSizeInBytes默认接近于一个Long大小,这就是说如果读取不到数据大小,就默认认为非常非常非常大
override def computeStats(): Statistics = reader match {
    case r: SupportsReportStatistics =>
       Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
    case _ =>
      Statistics(sizeInBytes = conf.defaultSizeInBytes)
  }

Dataset.ofRows

DataSourceV2Relation 是描述如果读取V2数据源的
Dataset.ofRows 就是执行这个逻辑计划(DataSourceV2Relation)


def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    // DataSourceV2Relation 包装成 Relation类逻辑计划的执行器
    // 这里代表了数据集都是惰性的.
    // 本质上,数据集是一个描述产生数据所需的逻辑执行计划  
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}

QueryExecution

Relation类逻辑计划的执行器