NightPxy 个人技术博客

Sql-概述

Posted on By NightPxy

概述

Spark-SQL是Spark处理结构化数据的一个模块.

结构化数据
结构化数据的一个简单理解就是指带有Schema信息的数据
Spark-SQL是专门用来处理结构化数据的意思是:

  • Spark-SQL只能处理带Schema的数据
  • 通过Schema信息,Spark-SQL会带来额外的执行优化的收益

Spark-SQL 与 SQL Spark-SQL本身并不等于SQL,它包含SQL,又不仅限于SQL

  • 包含SQL是指,Spark-SQL提供了以类SQL的形式使用Spark
  • 不仅限于SQL是指,是指Spark-SQL在SQL之外,还提供了DF/DS Api,以及各种外部数据源的整合,让使用者以一种高维视角来同时处理HDFS,RDBMS,MongoDB,HBASE等等可以结构化的数据

结构化处理方案

Spark-SQL提供三个组件来处理结构化数据: DataFrame,Dataset以及SQL

  • Dataset
    Dataset 是 Spark 1.6+ 提供的一种新的分布式数据集,类似一个Java的强类型集合
  • DataFrame
    DataFrame相当于是Dataset[Row].它提供一个统一对象(包含列名,数据类型等)Row作为Dataset强类型的封装,类似于RDB的表或者弱类型集合的概念
  • SQL
    以SQL的形式,执行Spark作业

注意:
这三种组件都只是针对开发者而言,事实上在Spark内部,这三种组件都会转化使用同一种计算引擎执行.这带来一个额外的好处是,可以在这三种组件中做任意的切换

组件

SparkSession

SparkSession 是 Spark-SQL的统一入口.一个简单的SparkSession声明如下

import org.apache.spark.sql.SparkSession

  val spark = SparkSession
    .builder()
    .getOrCreate()

  // For implicit conversions like converting RDDs to DataFrames
  import spark.implicits._

Dataset

import org.apache.spark.sql.SparkSession;

  val spark = SparkSession
    .builder()
    .appName("Spark-Sql-DataFrame-App").master("local[2]")
    .getOrCreate();
  import spark.implicits._;

  /**
    * 构建一个DF
    *   读取一个本地Json文件(可以是HDFS或任意Hadoop支持的存储系统)
    */
  val peopleDF =  spark.read.json("D:\\data\\people.json");

  /**
    * DF的一些基本操作
    */
  println("******************************** DF的一些基本操作 *************************************")
  //打印架构
  peopleDF.printSchema()
  //打印前10行数据
  peopleDF.show(10)
  //dataFrame依然有RDD的支持(比如持久化机制,注意在Spark-SQL中,缓存是立即执行的)
  val peopleCache = peopleDF.filter($"age">0).cache();
  //一个简单的dataFrame查询实例
  peopleCache.filter($"age"< 30).select($"name",$"age").groupBy($"name",$"age").count().show(10)


  /**
    * DF转SQL的临时表
    * 这样可以非常容易的将一个DF转到SQL层面来处理
    */
  println("******************************** DF转SQL *************************************")
  //类似RDBMS的会话临时表,SparkSession关闭后自动关闭临时表
  peopleCache.createOrReplaceTempView("peopleTmp")
  spark.sql("select name,age,count(1) from peopleTmp where age < 30 group by name,age").show(10)
  //类似RDBMS的全局临时表,可以在跨Session状态下使用
  //全局临时表必须注册在global_temp,且使用时必须包含全名: global_temp.xxxx
  peopleCache.createOrReplaceGlobalTempView("peopleGlobTmp")
  spark.newSession().sql("select name,age,count(1) from global_temp.peopleGlobTmp where age < 30 group by name,age").show(10)

  spark.close();

DataFrame

import org.apache.spark.sql.SparkSession;

  val spark = SparkSession
    .builder()
    .appName("Spark-Sql-DataSet-App").master("local[2]")
    .getOrCreate();

  import spark.implicits._;

  //定义描述DS结构的Person类型
  case class Person(name: String, age: Long)

  /**
    * 构建DS
    */
  //从一个Java对象集合中创建DS
  val dsFromScalaCollection = Seq(Person("SuperMan", 20)).toDS();
  //从一个结构化的外部系统中创建DS
  val dsFromLocalSystem_Json = spark.read.format("json").load("D:\\data\\people.json").na.fill(0,Seq("age")).as[Person];
  //从一个非结构化的外部数据源中创建DS
  val dsFromLocalSystem_Txt = spark.read.textFile("D:\\data\\people.txt")
    .map(line => line.split(",")).map(columns => Person(columns(0), columns(1).trim.toInt))
  //使用unionByName而不使用union  union:根据模式中的位置解析列(模式中的列位置不一定与数据集中强类型对象中的字段匹配)
  val peopleDS = dsFromScalaCollection.unionByName(dsFromLocalSystem_Txt).unionByName(dsFromLocalSystem_Json)

  //数据任意使用,不关心来自什么外部系统(外部系统异构透明)
  peopleDS.filter($"age"< 30).select($"name",$"age").groupBy($"name",$"age").count().show(10)

  spark.stop()

Sql

val spark = SparkSession.builder().appName("SQL-App").master("local[2]").getOrCreate()
  import  spark.implicits._;
  case class Person(name: String, age: Long)

  //DS读取非结构化数据至SQL临时表
  spark.read.textFile("D:\\data\\people.txt")
    .map(line=>line.split(",")).map(columns=>Person(columns(0),columns(1).trim.toLong))
    .createTempView("data_from_txt")

  //Sql方式读取数据
  val sql =
    """
      |CREATE TEMPORARY VIEW data_from_json
      |USING org.apache.spark.sql.json
      |OPTIONS (
      |  path "D:\\data\\people.json"
      |)
    """.stripMargin;
  spark.sqlContext.sql(sql);
  //简单使用
  spark.sqlContext.sql("SELECT * FROM  data_from_json json join data_from_txt txt on json.name = txt.name").show();

  spark.close();

Spak-SQL 与 RDD

Spark-SQL 支持两种方式,将一个RDD转换为一个DF/DS

  • 用反射的方式推导出类型的schema信息.
    这种情况下,需要一个RDD本身有一个明确的指定类型
  • 手动构造一个schema,然后赋予一个RDD来转换为一个Dataset

反射推导

case class Person(name: String, age: Long)
//构造出一个RDD
val rdd = spark.sparkContext.textFile("D:\\data\\people.txt")
//RDD TO DF
val df = rdd
   .map(row => row.split(","))
   .map(columns => Person(columns(0), columns(1).trim.toInt))
   .toDF()
df.show(10)

构造schema

当类型无法在执行之前被定义时,通过以下步骤构建schema

  • 从RDD中构建出行定义
  • 创建一个StructType(schema定义类),匹配刚才的行定义
  • 使用SparkSession提供的createDataFrame应用schema到RDD
//一个非结构化的原始RDD
val rdd = spark.sparkContext.textFile("D:\\data\\people.txt")
 //原始RDD转RDD[Row]
val rowRdd = rdd.map(row=>row.split(",")).map(columns=>Row(columns(0),columns(1).trim.toLong))
//定义schema
val schema = StructType(
StructField(name = "name", dataType = org.apache.spark.sql.types.StringType, nullable = true) ::
StructField(name = "age", dataType = org.apache.spark.sql.types.LongType, nullable = true) :: Nil)
// DF 等于 RDD[Row] + schema
val df =spark.createDataFrame(rowRdd,schema)