spark.read.load
spark.read.load 是加载外部数据源的入口,这里的外部数据源源码解析,就从这里开始
首先 完整源码如下
def load(paths: String*): DataFrame = {
...
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val ds = cls.newInstance()
val options = new DataSourceOptions((extraOptions ++
DataSourceV2Utils.extractSessionConfigs(
ds = ds.asInstanceOf[DataSourceV2],
conf = sparkSession.sessionState.conf)).asJava)
val reader = (ds, userSpecifiedSchema) match {
case (ds: ReadSupportWithSchema, Some(schema)) =>
ds.createReader(schema, options)
case (ds: ReadSupport, None) =>
ds.createReader(options)
case (ds: ReadSupportWithSchema, None) =>
throw new AnalysisException(s"A schema needs to be specified when using $ds.")
case (ds: ReadSupport, Some(schema)) =>
val reader = ds.createReader(options)
if (reader.readSchema() != schema) {
throw new AnalysisException(s"$ds does not allow user-specified schemas.")
}
reader
case _ => null // fall back to v1
}
if (reader == null) {
loadV1Source(paths: _*)
} else {
Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
}
} else {
loadV1Source(paths: _*)
}
}
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
....
}
else {
loadV1Source(paths: _*)
}
load的核心之处就是这个 if else
根据数据源本身的继承类型,将会区分为V1,V2两个部分.这两种数据源处理有非常大的不同
DataSourceV2
如果是数据源是DataSourceV2的,在load时就构造出Reader
Reader又分为两大类 ReadSupport 和 ReadSupportWithSchema,分别对应数据源本身是不是自带Schema.比如Parquet就是自身自带Schema的,而Text就是不带Schema的
Reader之外,还要处理spark.read.schema()传入的用户自定义的schema信息
数据源是否自带Schema 和 用户是否传入Schema ,就是2*2=4种分支情况 ,这就是如下match模式的来历
val reader = (ds, userSpecifiedSchema) match {
//如果是数据源自带schema,用户又传入了schema,就以用户为准.
case (ds: ReadSupportWithSchema, Some(schema)) =>
ds.createReader(schema, options)
//如果数据源不带schema,用户又没传,就以默认不带schema方式(毕竟数据源本身不带)
case (ds: ReadSupport, None) =>
ds.createReader(options)
//如果是数据源自带schema,用户也没传,就以数据源为准
case (ds: ReadSupportWithSchema, None) =>
throw new AnalysisException(s"A schema needs to be specified when using $ds.")
//如果数据源不带但是用户传入了schema,这里处理就比较特殊了
//首先会以不带schema的形式去读取数据
//然后根据readSchema(物理执行计划中的最终输出)来比较用户传入的schema
//如果不匹配,不是用户多传入最终输出中不存在的列还是少传了列,都是不能接收的,所以异常抛出
case (ds: ReadSupport, Some(schema)) =>
val reader = ds.createReader(options)
if (reader.readSchema() != schema) {
throw new AnalysisException(s"$ds does not allow user-specified schemas.")
}
reader
//异常或特殊情况,会以V1版本执行
case _ => null // fall back to v1
if (reader == null) {
loadV1Source(paths: _*)
} else {
Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
}
}
这里可以看出Spark其实是在尽可能的使用V2去读取,失败或异常就会再用V1去尝试
在使用V2时,出现两个比较重要的东西
DataSourceV2Relation
完整源码如下
case class DataSourceV2Relation(
output: Seq[AttributeReference],
reader: DataSourceReader)
extends LeafNode
with MultiInstanceRelation
with DataSourceReaderHolder {
override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
override def computeStats(): Statistics = reader match {
case r: SupportsReportStatistics =>
Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
case _ =>
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}
override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}
}
首先注意看类定义
- 要求构造传入 output 和 reader
这就是负责读取的实例和输出列.
output 其实是reader.readSchema(),就是reader的物理执行计划中的输出,也就是最终输出 - 继承自LeafNode
这说明 DataSourceV2Relation 本质是一个 LogicalPlan (逻辑执行计划) - MultiInstanceRelation
这代表这个逻辑 - DataSourceReaderHolder
自定义 equals/hashCode 的部分,这个不用管它
关注下这里
//获取读取的统计信息
//这个统计信息就是指 数据大小
//如果能读取到数据大小,方便之后的优化.比如如果数据足够小就自动广播等等之类的
//注意如果此时没有读取数据大小,就会取默认的 defaultSizeInBytes
//defaultSizeInBytes默认接近于一个Long大小,这就是说如果读取不到数据大小,就默认认为非常非常非常大
override def computeStats(): Statistics = reader match {
case r: SupportsReportStatistics =>
Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
case _ =>
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}
Dataset.ofRows
DataSourceV2Relation 是描述如果读取V2数据源的
Dataset.ofRows 就是执行这个逻辑计划(DataSourceV2Relation)
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
// DataSourceV2Relation 包装成 Relation类逻辑计划的执行器
// 这里代表了数据集都是惰性的.
// 本质上,数据集是一个描述产生数据所需的逻辑执行计划
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
QueryExecution
Relation类逻辑计划的执行器