SparkSQL(一)

SparkSQL(一)简介 spark1 0 版本就已经退出 SparkSQL 最早叫 sharkShark 是基于 spark 框架并且兼容 hive 执行 SQL 执行引擎 因为底层使用了 Spark 比 MR 的 Hive 普遍要快上两倍左右 当数据全部 load 到内存中 此时会比 Hive 快上 10 倍以上 SparkSQL 就是一种交互式查询应用服务特点 1 内存列存储 可以大大优化内存的使用率 减少内存消耗 避免 GC 对大量数据性能的开销 2

简介

特点

为什么要学习SparkSQL?

DataFrames

RDD和DataFrame的区别

在这里插入图片描述

创建DataFrames

1)spark-shell版本
spark中已经创建好了SparkContext和SQLContext对象
2)代码
spark-shell –master spark://hadoop1:7077 –executor-memory 512m –total-executor-cores 2
//创建了一个数据集,实现了并行化
val seq= Seq((“1”,“xiaoming”,15),(“2”,“xiaohong”,20),(“3”,“xiaobi”,10))
在这里插入图片描述
在这里插入图片描述














在这里插入图片描述
_1:列名,String当前列的数据类型
//查看数据 show 算子来打印,show是一个action类型 算子
df.show
在这里插入图片描述
在这里插入图片描述










DSL 风格语法

1.查询:

df.select("name").show df.select("name","age")..show //条件过滤 df.select("name","age").filter("age >10").show //参数必须是一个字符串,filter中的表达式也需要时一个字符串 

在这里插入图片描述
//2.参数是类名col (“列名”)
df.select(“name”,“age”).filter(col(“age”) >10).show
在这里插入图片描述
//3.分组统计个数








df.groupBy("age").count().show() 

在这里插入图片描述
//4.打印DataFrame结构信息

df.printSchema 

在这里插入图片描述

在这里插入图片描述

Hive中orderby和sortby的区别?

5.orderby 是全局有序 distribute sort by :局部有序,全局无序

结构表信息
sqlContext.sql(“desc t_person”).show

以编码的形式来执行sparkSQL

先将工程中的maven添加配置

<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.6.3</version> </dependency> 

第一种通过反射方式推断

SparkSQLDemo1.scala

import org.apache.spark.rdd.RDD import org.apache.spark.sql.{ 
   DataFrame, SQLContext} import org.apache.spark.{ 
   SparkConf, SparkContext} / * sparkSQL --就是查询 */ object SparkSQLDemo1 { 
    def main(args: Array[String]): Unit = { 
    //之前在spark-shell中,sparkContext和SQLContext是创建好的 所以不需要创建 //因为是代码编程,需要进行创建 val conf = new SparkConf().setAppName("SparkSQLDemo1").setMaster("local") val sc =new SparkContext(conf) //创建SQLContext对象 val sqlc = new SQLContext(sc) //集群中获取数据生成RDD val lineRDD: RDD[Array[String]] = sc.textFile("hdfs://hadoop2:8020/Person.txt").map(_.split(" ")) //lineRDD.foreach(x => println(x.toList)) //将获取数据 关联到样例类中 val personRDD: RDD[Person] = lineRDD.map(x => Person(x(0).toInt,x(1),x(2).toInt)) import sqlc.implicits._ //toDF相当于反射,这里若要使用的话,需要导入包 / * DataFrame [_1:int,_2:String,_3:Int] * spark-shell 数据是一个自己生成并行化数据并没有使用样例类来 存数据而是直接使用 * 直接调用toDF的时候,使用就是默认列名 _+数字 数字从1开始逐渐递增 * 可以在调用toDF方法的时候指定类的名称(指定名称多余数据会报错) * * 列名不要多余,也不要少于 * 也就是说列名要和数据一一对应 * * 使用代码编程数据是存储到样例类中,样例类中的构造方法中的参数就是对应的列名 * 所以通过toDF可以直接获取对应的属性名作为列名使用 * 同时也可以自定义列名 * */ val personDF: DataFrame = personRDD.toDF() //val personDF: DataFrame = personRDD.toDF("ID","NAME","AGE") personDF.show() //使用Sql语法 //注册临时表,这个表相当于存储在 SQLContext中所创建对象中 personDF.registerTempTable("t_person") val sql = "select * from t_person where age > 20 order by age" //查询 val res = sqlc.sql(sql) // def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate)) //默认打印是20行 res.show() //固化数据 //将数据写到文件中mode是以什么形式写 写成什么文件 / * def mode(saveMode: String): DataFrameWriter = { * this.mode = saveMode.toLowerCase match { * case "overwrite" => SaveMode.Overwrite -复写 * case "append" => SaveMode.Append -- 追加 * case "ignore" => SaveMode.Ignore * case "error" | "default" => SaveMode.ErrorIfExists * case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " + * "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.") * */ // res.write.mode("append").json("out3") // hdfs://hadoop2:8020/out111") //除了这两种还可以csv模式,json模式 //csv在 1.6.3 spark中需要第三方插件,才能使用能使用,,,,2.0之后自动集成 //这个方法不要使用因为在2.0会被删除 res.write.mode("append").save("hdfs://hadoop2:8020/out111") } case class Person(id:Int,name:String,age:Int) } 

第二通过StructType

SparkSQLStructTypeDemo.scala

import org.apache.spark.rdd.RDD import org.apache.spark.sql.{ 
   DataFrame, Row, SQLContext} import org.apache.spark.sql.types.{ 
   IntegerType, StringType, StructField, StructType} import org.apache.spark.{ 
   SparkConf, SparkContext} object SparkSQLStructTypeDemo { 
    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("SparkSQLStructTypeDemo").setMaster("local") val sc = new SparkContext(conf) val sqlcontext = new SQLContext(sc) //获取数据并拆分 val lineRDD = sc.textFile("hdfs://hadoop2:8020/Person.txt").map(_.split(" ")) //创建StructType对象 封装了数据结构(类似于表的结构) val structType: StructType = StructType { 
    List( //列名 数据类型 是否可以为空值 StructField("id", IntegerType, false), StructField("name", StringType, true), StructField("name", IntegerType, false) //列需要和数据对应,但是StructType这种可以: / * 列的数据大于数据,所对应列的值应该是null * 列数是不能小于数据,不然会抛出异常 * StructField("oop", IntegerType, false) * StructField("poo", IntegerType, false) */ ) } //将数据进行一个映射操作 val rowRDD: RDD[Row] = lineRDD.map(arr => Row(arr(0).toInt,arr(1),arr(2).toInt)) //将RDD转换为DataFrame val personDF: DataFrame = sqlcontext.createDataFrame(rowRDD,structType) personDF.show() } } 

1.将当前程序打包操作提交到集群,需要做 一定的更改 ,注意path路径 修改为 args(下标)

模式
spark-submit
–class 类名(类的全限定名(包名+类名))
–master spark://集群:7077
/root/jar包路径
输入数据路径
输出路径数据












JDBC数据源

SparkSql可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,在通过对DataFrame的一系列操作,还可以将数据写到关系型数据库中

 spark-shell --master spark://hadoop1:7077 --executor-memory 512m --total-executor-cores 2 --jars /root/mysql-connector-java-5.1.32.jar --driver-class-path /root/mysql-connector-java-5.1.32.jar 

将数据写入到Mysql中

import org.apache.spark.rdd.RDD import org.apache.spark.sql.{ 
   DataFrame, Row, SQLContext} import org.apache.spark.sql.types.{ 
   IntegerType, StringType, StructField, StructType} import org.apache.spark.{ 
   SparkConf, SparkContext} object DataFormeInputJDBC { 
    /* def createSC(AppName:String,Master:String):SparkContext = { } def createSC(AppName:String,Master:String,sc:SparkContext):SQLContext = { }*/ def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("DataFormeInputJDBC").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //获取数据拆分 val lines = sc.textFile("hdfs://hadoop1:8020/Person.txt").map(_.split(" ")) // StructType 存的表结构 val structType: StructType = StructType(Array(StructField("id", IntegerType, false), StructField("name", StringType, true), StructField("age", IntegerType, true))) //开始映射 val rowRDD: RDD[Row] = lines.map(arr => Row(arr(0).toInt,arr(1),arr(2).toInt)) //将当前RDD转换为DataFrame val personDF: DataFrame = sqlContext.createDataFrame(rowRDD,structType) //创建一个用于写入mysql配置信息 val prop = new Properties() prop.put("user","root") prop.put("password","123") prop.put("driver","com.mysql.jdbc.Driver") //提供mysql的URL val jdbcurl = "jdbc:mysql://hadoop1:3306/mydb1" //表名 val table = "person" //数据库要对,表若不存在会自动创建并存储 //需要将数据写入到jdbc //propertities的实现是HashTable personDF.write.mode("append").jdbc(jdbcurl,table,prop) println("插入数据成功") sc.stop() } } 

HIVE-on-Spark

hive底层是通过MR进行计算,将其改变为SparkCore来执行

配置步骤
1.在不是高可用集群的前提下,只需要将Hadoop安装目录中的core-site.xml拷贝到spark的配置conf文件目录下即可
2.将hive安装路径下的hive-site.xml拷贝到spark的配置conf配置文件目录下即可
注意
若是高可用:需要将hadoop安装路径下的core-site,xml和hdfs-site.xml拷到spark的conf目录下








操作完成后建议重启集群
通过sparksql来操作,需要在spark安装路径中sbin目录

启动: spark-sql \ --master spark://hadoop1:7077 \ --executor-memory 512m \ --total-executor-cores 2 \ --jars /root/mysql-connector-java-5.1.32.jar \ --driver-class-path /root/mysql-connector-java-5.1.32.jar 基本操作: 1.创建表: create table person1(id int,name string,age int)row format delimited fields terminated by ' ' 2.加载数据:(本地加载) load data local inpath '/root/Person.txt' into table person1; 3.查询: select * from person1; select name,age from person where age > 20 order by age; 4.删除 drop table person 

内部表和外部表

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/228073.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月16日 下午7:53
下一篇 2026年3月16日 下午7:53


相关推荐

  • HTML5快速设计网页[通俗易懂]

    HTML5快速设计网页目录一、认识web开发和软件安装二、使用HTML/HTML5搭建页面骨架一、认识web开发和软件安装1、认识网页:商城网页比较经典,比如京东,淘宝、小米商城还有锤子官网等有图片、文字还有一些多媒体组合而成的。我们还需要善于观察然后模仿成自己的2、网站:由多个网页组织在一起而成的,网页和网页之间是有联系的。就像蜘蛛网一样织成一张大网3、用户眼中…

    2022年4月11日
    52
  • 张钹:人工智能技术已进入第三代

    张钹:人工智能技术已进入第三代原文编注:近日,中科院院士、清华大学人工智能研究院院长张钹教授接受记者采访时认为,目前基于深度学习的人工智能在技术上已经触及天花板。从长远来看,必须得走人类智能这条路,最…

    2022年7月26日
    9
  • 最详细的maven配置——报错了你打我[通俗易懂]

    最详细的maven配置——报错了你打我[通俗易懂]目录1、前言2、下载3、配置PATH、settings.xml以及本地仓库3.1、配置path3.2、配置settings.xml和本地仓库4、在IDEA中配置Maven1、前言maven说的简短一点就是一个大型的jar包管理工具,类似于工具人。只要有了maven,就不用去幸幸苦苦的找jar包了。wc,爽哉。好了,还是不多bb,我么还是直接干正事。(切记切记:安装maven必须装好jdk)2、下载首先我们还是去官网。瞅瞅最新版的是哪个版本。点我直达当然,玩Windows的人都知道,我们下

    2022年5月28日
    54
  • ZooKeeper 常用应用场景原理详解

    ZooKeeper 常用应用场景原理详解ZooKeeper 常用应用场景原理详解,zookepper存放数据的目录结构类似于标准的文件系统格式,如果使用过window或linux就能体会到其内部的数据结构

    2022年6月17日
    46
  • JSP格式化Date

    JSP格式化Date问题描述前端从后台获取到的时间格式为 ThuApr0510 30 00CST2018 现在传参后台需要的时间格式为 2018 04 0510 30 解决方法 jsp 页面引用格式化标签库 lt taglibprefix fmt uri http java sun com jsp jstl fmt gt

    2026年3月17日
    2
  • 第一范式、第二范式、第三范式、BCNF范式详解

    第一范式、第二范式、第三范式、BCNF范式详解范式是“符合某一种级别的关系模式的集合,表示一个关系内部各属性之间的联系的合理化程度”。

    2022年5月24日
    40

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号