
Usage of SQLContext in Spark

邢冷勋
2023-12-01
vi people.json
{"name":"zhangsan","job number":"101","age":33,"gender":"male","deptno":1,"sal":18000}
{"name":"lisi","job number":"102","age":30,"gender":"male","deptno":2,"sal":20000}
{"name":"wangwu","job number":"103","age":35,"gender":"female","deptno":3,"sal":50000}
{"name":"zhaoliu","job number":"104","age":31,"gender":"male","deptno":1,"sal":28000}
{"name":"tianqi","job number":"105","age":36,"gender":"female","deptno":3,"sal":90000}

vi dept.json
{"name":"development","deptno":1}
{"name":"personnel","deptno":2}
{"name":"testing","deptno":3}

New employee table
vi newpeople.json
{"name":"zhaosi","job number":"121","age":53,"gender":"male","deptno":1,"sal":98000}
{"name":"liuneng","job number":"122","age":60,"gender":"male","deptno":1,"sal":91000}

-------------------
[hadoop@h201 ss]$ hadoop fs -put people.json /user/hadoop/
[hadoop@h201 ss]$ hadoop fs -put dept.json /user/hadoop/
[hadoop@h201 ss]$ hadoop fs -put newpeople.json /user/hadoop/     newpeople.json is read from HDFS later, so it must be uploaded as well

1.
The entry point for Spark SQL is the SQLContext.
Since version 1.3, spark-shell creates a sqlContext instance automatically.
Version 1.3 also introduced a new component: the DataFrame.
DataFrames
A DataFrame is a distributed collection of data organized into named columns. It can be thought of as a table in a relational database, or as a data frame in R/Python. DataFrames can be constructed from many kinds of sources, such as structured data files, Hive tables, external databases, or RDDs produced during a Spark computation.
The DataFrame API supports four languages: Scala, Java, Python, and R.

Note: the DataFrame operations below require Spark 1.3 or later.
[hadoop@h201 spark-1.1.0-bin-hadoop2.4]$ bin/spark-shell
scala> val sqlContext=new org.apache.spark.sql.SQLContext(sc)
scala> import sqlContext.implicits._                // enables the $"col" syntax used below
scala> import org.apache.spark.sql.functions._      // provides col(), max(), count()

scala> val people=sqlContext.jsonFile("hdfs://h201:9000/user/hadoop/people.json")
scala> val dept=sqlContext.jsonFile("hdfs://h201:9000/user/hadoop/dept.json")

View the table contents
scala> people.show
scala> people.show(10)     shows the first ten records

View the column names
scala> people.columns
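
To also see each column's inferred type, the schema can be printed (a small extra step beyond the original walkthrough):
scala> people.printSchema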

Apply a filter condition
scala> people.filter("gender='male'").count
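
The same filter can also be written as a column expression rather than a SQL string, for example:
scala> people.filter(people("gender") === "male").count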
 
scala> people.where($"age" >20).show

scala> people.where($"age" >30 && $"gender" ==="male").show

scala> people.sort($"age".desc).show

scala> people.sort($"age").show(3)     ascending by default
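
Sorting by several columns at once should also work, e.g. by department ascending and age descending:
scala> people.sort($"deptno", $"age".desc).show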

Add a column
scala> people.withColumn("level",people("age")/10).show
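
Since age/10 produces a decimal value, the new column can be cast back to an integer if preferred (an optional variation):
scala> people.withColumn("level", (people("age")/10).cast("int")).show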

Rename a column
scala> val xp=people.withColumnRenamed("job number","jobId")
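
The rename can be verified with:
scala> xp.columns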


scala> val newpeople=sqlContext.jsonFile("hdfs://h201:9000/user/hadoop/newpeople.json")

View all rows from both tables
scala> people.unionAll(newpeople).show

Count the number of people in each department
scala> people.unionAll(newpeople).groupBy(col("deptno")).count.show
scala> people.unionAll(newpeople).groupBy(col("deptno")).count.filter($"count">2).show

Grouped aggregation
scala> val deptagg = people.groupBy("deptno").agg(Map("age" -> "max","gender" ->"count"))
scala> deptagg.show
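
The same aggregation can be written with column functions instead of a Map (using the functions import from the setup above):
scala> people.groupBy("deptno").agg(max("age"), count("gender")).show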

Deduplicate
scala> people.select("deptno").distinct.show

join
scala> people.join(dept,people("deptno")===dept("deptno"),"outer").show
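
Because both tables contain name and deptno columns, the joined result has duplicate column names; selecting and aliasing specific columns avoids the ambiguity (an illustrative variation):
scala> people.join(dept, people("deptno")===dept("deptno")).select(people("name"), dept("name").as("deptname"), people("sal")).show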

Group after the join
scala> val joinP=people.join(dept,people("deptno")===dept("deptno"),"outer")
scala> val joinGp =joinP.groupBy(dept("deptno")).agg(Map("age" ->"max","gender" ->"count"))
scala> joinGp.show

Save as a JSON file
scala> joinP.save("hdfs://h201:9000/user/hadoop/joinP.json","json")
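
The saved data can be read back with the matching load call from the same API generation (if the duplicate column names from the join cause trouble when writing, select and alias columns first, as in the join example above):
scala> val joinP2=sqlContext.load("hdfs://h201:9000/user/hadoop/joinP.json","json")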

==================================================
Register as a temporary table
scala> people.registerTempTable("pp")
scala> sqlContext.sql("show databases").show

scala> sqlContext.tableNames

View the table structure
scala> sqlContext.table("pp")

scala> sqlContext.tables("default").show

Execute SQL statements
scala> val a1=sqlContext.sql("select name from pp where sal>20000 and sal<50000")
scala> a1.collect()
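
More complex SQL should also run against the temporary table, for example a grouped aggregate (illustrative):
scala> sqlContext.sql("select deptno, max(sal) from pp group by deptno").show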


Cache a table (used for performance tuning)
Cached tables are stored in an in-memory columnar format; when the table is scanned, only the required columns are automatically decompressed, which reduces memory usage.
scala> val a2=sqlContext.sql("select name from pp")
scala> sqlContext.cacheTable("pp")

scala> a2.show
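
Whether the table is currently cached can be checked with:
scala> sqlContext.isCached("pp")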

Release the cached table
scala> sqlContext.uncacheTable("pp")

