Usage of SQLContext in Spark
邢冷勋
2023-12-01
vi people.json
{"name":"zhangsan","job number":"101","age":33,"gender":"male","deptno":1,"sal":18000}
{"name":"lisi","job number":"102","age":30,"gender":"male","deptno":2,"sal":20000}
{"name":"wangwu","job number":"103","age":35,"gender":"female","deptno":3,"sal":50000}
{"name":"zhaoliu","job number":"104","age":31,"gender":"male","deptno":1,"sal":28000}
{"name":"tianqi","job number":"105","age":36,"gender":"female","deptno":3,"sal":90000}
vi dept.json
{"name":"development","deptno":1}
{"name":"personnel","deptno":2}
{"name":"testing","deptno":3}
New employee table
vi newpeople.json
{"name":"zhaosi","job number":"121","age":53,"gender":"male","deptno":1,"sal":98000}
{"name":"liuneng","job number":"122","age":60,"gender":"male","deptno":1,"sal":91000}
-------------------
[hadoop@h201 ss]$ hadoop fs -put people.json /user/hadoop/
[hadoop@h201 ss]$ hadoop fs -put dept.json /user/hadoop/
1.
The entry point to Spark SQL is the SQLContext.
Since version 1.3, spark-shell creates a sqlContext automatically.
Version 1.3 also introduced a new component: the DataFrame.
DataFrames
A DataFrame is a distributed collection of data organized into named columns. It can be thought of as a table in a relational database, or as a data frame in R/Python. DataFrames can be constructed from many kinds of sources, such as structured data files, Hive tables, external databases, or RDDs produced during a Spark computation.
The DataFrame API supports four languages: Scala, Java, Python, and R.
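Besides reading files, a DataFrame can also be built from an existing RDD. A minimal sketch, assuming Spark 1.3+ and the sqlContext created below (Emp is a hypothetical case class used only for illustration; toDF comes from the implicits import):
scala> case class Emp(name: String, age: Int)
scala> import sqlContext.implicits._
scala> val empDF = sc.parallelize(Seq(Emp("zhangsan", 33), Emp("lisi", 30))).toDF()
scala> empDF.printSchema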
[hadoop@h201 spark-1.1.0-bin-hadoop2.4]$ bin/spark-shell
scala> val sqlContext=new org.apache.spark.sql.SQLContext(sc)
scala> import sqlContext.implicits._   needed for the $"column" syntax used below
scala> import org.apache.spark.sql.functions.col   needed for col(...) used below
scala> val people=sqlContext.jsonFile("hdfs://h201:9000/user/hadoop/people.json")
scala> val dept=sqlContext.jsonFile("hdfs://h201:9000/user/hadoop/dept.json")
View the table contents
scala> people.show
scala> people.show(10) show the first 10 records
View the column information
scala> people.columns
Add filter conditions
scala> people.filter("gender='male'").count
scala> people.where($"age" >20).show
scala> people.where($"age" >30 && $"gender" ==="male").show
scala> people.sort($"age".desc).show
scala> people.sort($"age").show(3) ascending by default
Add a column
scala> people.withColumn("level",people("age")/10).show
Rename a column
scala> val xp=people.withColumnRenamed("job number","jobId")
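Note that withColumnRenamed returns a new DataFrame rather than modifying people in place, since DataFrames are immutable. A quick check:
scala> xp.columns        now contains "jobId"
scala> people.columns    still contains "job number"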
scala> val newpeople=sqlContext.jsonFile("hdfs://h201:9000/user/hadoop/newpeople.json")
View all rows from both tables
scala> people.unionAll(newpeople).show
Count the people in each department
scala> people.unionAll(newpeople).groupBy(col("deptno")).count.show
scala> people.unionAll(newpeople).groupBy(col("deptno")).count.filter($"count">2).show
Grouped aggregation
scala> val deptagg = people.groupBy("deptno").agg(Map("age" -> "max","gender" ->"count"))
scala> deptagg.show
Deduplication
scala> people.select("deptno").distinct.show
join
scala> people.join(dept,people("deptno")===dept("deptno"),"outer").show
Grouping after a join
scala> val joinP=people.join(dept,people("deptno")===dept("deptno"),"outer")
scala> val joinGp =joinP.groupBy(dept("deptno")).agg(Map("age" ->"max","gender" ->"count"))
scala> joinGp.show
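Because both people and dept carry a deptno column, referring to it by its bare name after the join would be ambiguous; columns can instead be qualified through their parent DataFrame. A sketch, using the joinP defined above:
scala> joinP.select(dept("deptno"), people("name"), people("sal")).show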
Save as a JSON file
scala> joinP.save("hdfs://h201:9000/user/hadoop/joinP.json","json")
==================================================
Register as a temporary table
scala> people.registerTempTable("pp")
scala> sqlContext.sql("show databases").show
scala> sqlContext.tableNames
View the table structure
scala> sqlContext.table("pp")
scala> sqlContext.tables("default").show
Execute SQL commands
scala> val a1=sqlContext.sql("select name from pp where sal>20000 and sal<50000")
scala> a1.collect()
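collect() pulls the entire result set back to the driver as an Array[Row], so it should only be used on small results; show or take are safer for browsing. For example, with the a1 result above:
scala> a1.take(2)                      fetch only the first two rows
scala> a1.collect().foreach(println)   print every row on the driver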
Caching a table (used for performance tuning)
Cached tables are stored in an in-memory columnar format, so when the table is scanned only the needed columns are automatically decompressed, reducing memory usage.
scala> val a2=sqlContext.sql("select name from pp")
scala> sqlContext.cacheTable("pp")
scala> a2.show
Release the cached table
scala> sqlContext.uncacheTable("pp")
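The same cache lifecycle can also be driven through SQL statements; a sketch of the equivalents (available on SQLContext in Spark 1.3+):
scala> sqlContext.sql("CACHE TABLE pp")
scala> sqlContext.isCached("pp")       check whether the table is cached
scala> sqlContext.sql("UNCACHE TABLE pp")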