概述
Spark-SQL是通过DataFrame接口对各种数据源进行操作
- 各种数据源的加载
- 数据的转换
- 数据源的保存
- 注册临时视图,来允许SQL的形式直接对临时视图进行操作等
数据源的加载&保存
数据源加载
Spark-SQL的默认数据源为parquet(spark.sql.sources.default设置),一些加载例子如下:
//默认方式为parquet
spark.read.load("D:\\data\\users.parquet").show()
//加载非parquet的数据源,需要设置format
//对内置数据源,可以使用短名称.对外部数据源,必须使用全名称(包名+类名)
//内置数据源有 parquet,txt,json,orc,csv,libsvm
spark.read.format("csv").load("D:\\data\\people.csv").show()
//以SQL形式读取文件
spark.sql("select * from json.`D:\\data\\sku.json`").show()
数据源保存
保存模式
Scala/Java | 描述 |
---|---|
SaveMode.ErrorIfExists(默认) | 如果存储源已存在,将抛出错误 |
SaveMode.Append | 如果存储源已存在,数据将以追加的形式写入 |
SaveMode.Overwrite | 如果存储源已存在,数据将以覆盖的形式写入 |
SaveMode.Ignore | 如果存储源已存在,数据不会写入(不会抛错,即原数据不变,新数据忽略) |
注意:
数据保存并非是一个原子性操作.这一点务必注意
比如Append,可能输出过程中断而导致部分数据进入.或者Overwrite,本质是一个删除后新建的操作,这里是有删除后新建失败,导致有新数据无法写入老数据丢失的可能性的
存储源
Spark 支持两种类型的存储源 : 基于文件的存储源(file-based) 和 基于表的存储源(table-based)
**基于文件的存储源(file-based) **
数据保存的格式可以变化,比如将一个csv保存为一个txt的例子
spark.read.format("csv").load("InputPath").write.format("text").save("OutputPath")
基于Hive表的存储源(hive-table-based)
将数据保存到Hive的表中
- 元数据
数据保存到Hive的表中,将同时自动保存schema到Hive的MetaStore里 - 内/外表(数据保存时,根据是否指定path来区分内外表)
当手动设置为其它路径时成为外部表(表删除只删除元数据不删除表数据)
不设置path则成为内部表(表删除将同时删除元数据和表数据)
注意:从Spark2.0之后, CREATE TABLE 但指定path依然会成为一个外部表.即唯一标准是是否指定path - 表分区
如果表被设置为外部表,默认不会收集分区信息,需要手动同步(msck / add partition)
内置数据源
parquet
概述
Parquet是一种列式存储格式,无法直接阅读.但有非常好的压缩消耗和压缩比.
Parquet是Spark默认和推荐使用的数据格式.在很多方面,Spark都对Parquet有最大支持
在Spark中,出于兼容性的考虑,所有的Columns都将自动转换为可空类型
读写demo
spark.read.format("csv").load("InputPath").write.format("parquet").save("OutputPath")
分区发现(Partition Discovery)
在Hive之类中,分区是以目录的形式存在.分区键本身是目录的一部分.但分区信息是不会自动的被Hive发现
而在Parquet中,Spark可以自动的发现分区和推断数据类型.推断的依据就是目录
比如:
一个存放订单信息的目录,以类似这样的格式 order/time=xxxx-MM-dd}/area=xx/xxxx.parquet
传入路径(/order),读取将自动增加两列time和area作为分区列,并自动在实际使用中应用分区信息
注意:
- 对Parquet的分区发现字段现仅支持数字型和字符串型
- spark.sql.sources.partitionColumnTypeInference.enabled(默认为true)设置
可以关闭分区发现的数据类型推断
元数据合并(schema merging)
实际工作中,parquet数据文件很可能是由一个简单的逐渐变得复杂(随着业务的深入不断增加字段)
但因为parquet是自存储的,所以需要有一种途径来保证对历史数据的兼容.
这就是模式演进(schema evolution),而实现的手段就是元数据合并
注意:
模式合并,是一个昂贵的操作,在大多数的情况,都不会也不应该使用它,所以模式是默认关闭的
有以下两种方式可以打开模式合并
- 临时
在Option中设置mergeSchema=true
spark.read.option(“mergeSchema”, “true”) - 永久
配置文件配置 spark.sql.parquet.mergeSchema 设置为 true
parquet 与 Hive MetaStore
在Spark读取和写入Hive中的Parquet存储格式表时,将使用Spark自己的Parquet Support 而不是使用Hive Serde (spark.sql.hive.convertMetastoreParquet 默认开启)
Spark 与 Hive-MetaStore 在处理Parquet-Schema是有一些区别
- Hive-MetaStore不区分大小写,而Spark-Parquet-Schema 区分大小写
- Hive-MetaStore认为列都是可空的,而Spark认为列是否可空必须显式指定
所以在Parquet上,Spark会做出一些自协调如下
- 两个模式中相同名称的字段必须具有相同的数据类型,但不关心是否为空(都为可空)
- 如果列只出现在Spark-Parquet-Schema上但没有出现在Hive-MetaStore上,该列会被删除
- 如果列只出现在Hive-MetaStore而没有出现在Spark-Parquet-Schema上,该列将以可空形式追加
一个可以参考的最佳实践是:
对所有列名不采用驼峰命名法,全部使用纯小写下划线分割的下划线命名法
parquet元数据刷新
Spark会缓存Parquet-Schema以提高性能,但如果开启了spark.sql.hive.convertMetastoreParquet,也就是使用Spark自身的Parquet支持,这时来自Hive元数据的信息依然会被缓存,如果此时Hive表在Spark之外被改变,就必须刷新一下Parquet元数据以使用到新的元数据
spark.catalog.refreshTable("my_table")
parquet可用配置
参数名 | 默认值 | 描述 |
---|---|---|
spark.sql.parquet.binaryAsString | false | 一些其它的基于Parquet系统(比如Hive,Impala,旧版Spark-SQL),在写出Parquet-schema时不区分binary data (二进制数据)和 strings (字符串) 启用这个属性,将保持将二进制数据解释为字符串的兼容性 |
spark.sql.parquet.int96AsTimestamp | true | 一些其它的基于Parquet系统(比如Hive,Impala),会将Timestamp写为int96.该属性告知Spark-SQL将int96解析为Timestamp的兼容性 |
spark.sql.parquet.cacheMetadata | true | 指示是否打开parquet的元数据缓存(这可以加快查询静态数据的速度) |
spark.sql.parquet.compression.codec | snappy | 指示写出parquet文件的压缩类型(默认snappy).可选项为:uncompressed, snappy, gzip, lzo |
spark.sql.parquet.filterPushdown | true | 设置为 true 时启用 (过滤谓词下推,尝试调整算子执行顺序,将写在后面的过滤谓词提前处理以减少查询数据量,提高性能) |
spark.sql.hive.convertMetastoreParquet | true | 面对Hive时,是否启用使用Spark-SQL内置的parquet,设为否则放弃使用内置parquet转而使用Hive serDe |
spark.sql.parquet.mergeSchema | false | 是否全局启用parquet的元数据合并,设为否则从summary file或random file中随机挑选 |
spark.sql.optimizer.metadataOnly | true | 当设为true时,将使用metadata信息来构建分区列而不是走表扫描.(只是在查询的所有列都是分区列时才有意义,并且此时依然有各种聚合能力) |
Orc
ORC格式是Spark-2.3之后才支持的数据源格式.它的可配置信息如下
参数名 | 默认值 | 描述 |
---|---|---|
spark.sql.orc.impl | hive | ORC的实现名称.可选值hive(使用hive-1.2.1的ORC库),native(或者hive.native,将使用Apache ORC 1.4.1) |
spark.sql.orc.enableVectorizedReader | true | 是否在本机使用向量化ORC解码.如果为false,会在本机构造一个向量化ORC阅读器,对于上面设置为hive,这个属性将被忽略 |
Json
Spark-SQL可以自动推断Json的schema.(Json的schema依然是自存储的)
Sparl-SQL加载将以DataFrame[Row]的形式,并且比较轻松的转换为一个DatasetT
//读取Json数据源,如果某些行缺age属性的,补0
spark.read.json("D:\\data\\people.json").na.fill(0,Seq("age"))
Hive
Hive的支持
Spark-SQL还支持读取和写入存储在Hive中的数据.Spark-SQL使用Hive,但需要做一些补充配置
- Hive的库必须在classpath中被找到.因为Hive的库有大量的依赖,而这些依赖不一定是被Spark完全打包的,所以在运行时,必须要能找到这些库包
注意:这种依赖与找到是针对每个executor而言的,因为executor才是真正的执行者 - 需要在Spark.Conf中拷入 hive-site.xml, core-site.xml(用于安全配置)和 hdfs-site.xml (用于 HDFS 配置)文件
- 需要在SparkSession中启用对Hive的支持:spark.enableHiveSupport()
在实际运行中,不一定需要部署一个完整的Hive系统,比如当hive-site.xml不存在时,会在当前目录创建一个metastore_db用以存储元数据(不推荐如此)
demo
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.enableHiveSupport()
.getOrCreate()
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_ints").show()
// +---+
// |key|
// +---+
// | 0|
// | 1|
// | 2|
// ...
// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// | value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...
Hive的存储格式
创建一个Hive表,需要定义读取/写入文件系统的格式,以及数据的行断定义,列断定义等.Spark-Sql读取Hive表时,默认以文本格式读取
参数名 | 描述 |
---|---|
fileFormat | fileFormat是一种存储格式规范的包,包括serde,input format,output format.目前支持6个文件格式sequencefile,rcfile,orc,parquet,textfile,avro |
inputFormat, outputFormat | 这两个选项将相应的InputFormat和OutputFormat类的名称指定为字符串文字,例如: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat 。 这两个选项必须成对出现,如果已经指定了 “fileFormat” 选项,则无法指定它们 |
serde | 此选项指定 serde 类的名称。 当指定 fileFormat 选项时,如果给定的 fileFormat 已经包含 serde 的信息,那么不要指定这个选项。 目前的 “sequencefile”, “textfile” 和 “rcfile” 不包含 serde 信息,你可以使用这3个文件格式的这个选项 |
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim | 这些选项只能与 “textfile” 文件格式一起使用。它们定义如何将分隔的文件读入行 |
对Hive不支持的地方
Spark-SQL,相当于内置了一个Hive引擎,它具有Hive绝大部分的功能,但确实不是完全支持Hive.Spark-SQL不支持的如下
- Tables with buckets
- UNION 和 Unique join
- Hive的列静态数据统计收集功能
- File format for CLI Spark-SQL只支持 TextOutputFormat
- 对一些UDF函数,Spark-SQL结果也与原生Hive不同
SQRT(n) If n < 0, Hive returns null, Spark SQL returns NaN ACOS(n) If n < -1 or n > 1, Hive returns null, Spark SQL returns NaN ASIN(n) If n < -1 or n > 1, Hive returns null, Spark SQL returns NaN
详情请见官网描述
JDBC
概述
Spark-SQL 同样支持以JDBC的形式从其它的关系型数据库读取数据.
(此方式优于JdbcRDD,因为DF更容易与处理与其它数据源的交互,比如直接映射为临时视图等等)
JDBC依赖目标数据库驱动包.
需要注意的是,这个驱动必须同时对driver端和executor端所有执行程序的类默认加载器可见
(DriverManager会忽略不可见的驱动导致执行找不到驱动)
例如,要从 Spark Shell 连接到 postgres 必须指定驱动包位置
bin/spark-shell \
--driver-class-path postgresql-9.4.1207.jar \
--jars postgresql-9.4.1207.jar
一个比较方便的办法是修改所有工作节点 compute_classpath.sh 包含进驱动包
常用配置
可以在数据源选项中指定 JDBC 连接属性。用户 和 密码通常作为登录数据源的连接属性提供。 除了连接属性外,Spark 还支持以下不区分大小写的选项
参数名 | 描述 |
---|---|
url | 要连接的JDBC URL。 源特定的连接属性可以在URL中指定。 例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret |
dbtable | 应该读取的 JDBC 表。请注意,可以使用在SQL查询的 FROM 子句中有效的任何内容。 例如,您可以使用括号中的子查询代替完整表 |
driver | 用于连接到此 URL 的 JDBC driver 程序的类名 |
partitionColumn, lowerBound, upperBound | 这三个属性必须被一起设置.partitionColumn 必须是数字列.lowerBound 和 upperBound 仅用于决定分区的大小而不是用于过滤表中的行.因此,表中的所有行将被分区并返回 |
numPartitions | 在表读写中可以用于并行度的最大分区数。这也确定并发JDBC连接的最大数量。 如果要写入的分区数超过此限制,则在写入之前通过调用 coalesce(numPartitions) 将其减少到此限制 |
fetchsize | JDBC 抓取的大小,用于确定每次数据往返传递的行数。 这有利于提升 JDBC driver 的性能,它们的默认值较小(例如: Oracle 是 10 行)。 该选项仅适用于读取操作 |
batchsize | JDBC 批处理的大小,用于确定每次数据往返传递的行数。 这有利于提升 JDBC driver 的性能。 该选项仅适用于写操作。默认值为 1000 |
isolationLevel | 事务隔离级别,适用于当前连接。 它可以是 NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, 或 SERIALIZABLE 之一,对应于 JDBC 连接对象定义的标准事务隔离级别,默认为 READ_UNCOMMITTED 此选项仅适用于写操作。请参考 java.sql.Connection 中的文档 |
sessionInitStatement | 在每个数据库会话打开到远程DB并开始读取数据之后,该选项将执行一个自定义SQL语句(或PL\SQL块)。使用它来实现会话初始化代码 示例:选项(“sessionInitStatement”、“”开始执行即时的“alter session set”“_serial_direct_read”=true”;结束;”“”) |
truncate | 这是一个与 JDBC 相关的选项.启用 SaveMode.Overwrite 时,此选项会导致 Spark 截断现有表,而不是删除并重新创建。它默认为 false。 此选项仅适用于写操作 |
createTableOptions | 这是一个与JDBC相关的选项。 如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如:CREATE TABLE t (name string) ENGINE=InnoDB. )。此选项仅适用于写操作 |
createTableColumnTypes | 这是一个与JDBC相关的选项.如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如:CREATE TABLE t (name string) ENGINE=InnoDB.).此选项仅适用于写操作 |
customSchema | 用于从连接器读取JDBC数据的自定义模式。例如,“id DECIMAL(38,0), name STRING”。还可以指定部分字段,其他字段使用默认类型映射 例如,“id DECIMAL(38,0)”。列名应该与JDBC表对应的列名相同。用户可以指定Spark SQL的相应数据类型,而不是使用默认值。此选项仅适用于读取 |
demo
/**
* 连接RDBMS(SQLServer2008)Demo
*注:必须导入SQLServer驱动包(sqljdbc42.jar),(代码中不能体现,但实际已在项目中添加过了)
*/
val df = spark.read.format("jdbc")
.option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url","jdbc:sqlserver://127.0.0.1\\Sql2008:1433;DatabaseName=AdapterService")
.option("user", "sa")
.option("password", "12abAB")
.option("dbtable", "dbo.tm_flow")
.load()
df.show(10);