当前位置: 首页 > 知识库问答 >
问题:

Spark:组RDD Sql查询

滕渝
2023-03-14
[1001,4929102,LOC01,2015-01-20 10:44:39]
[1001,4929103,LOC02,2015-01-20 10:44:39]
[1001,4929104,LOC03,2015-01-20 10:44:39]
[2009,4929101,R01,2015-01-20 20:44:39]
[2009,4929102,R02,2015-01-20 14:00:00] (RPM)
[2009,4929102,P01,2015-01-20 12:00:00] (PPM)
[2009,4929102,R03,2015-01-20 15:00:00] (RPM)
[2009,4929102,C01,2015-01-20 13:00:00] (RPM)
[2009,4929103,R01,2015-01-20 14:44:39]
[2009,4929105,R01,2015-01-20 12:44:39]
[2009,4929105,V01,2015-01-20 11:44:39]
[2009,4929106,R01,2015-01-20 13:44:39]

val celllookuprdd:[celltype,cellname](cellname有4个值)

[R01,RPM]
[R02,RPM]
[R03,RPM]
[C01,RPM]
[P01,PPM]
[V01,PPM]

预期结果:[id,1001的位置,1001的日期1,2009年的第一个转数日期,2009年的最后一个转数日期,2009年的第一个PPM日期,2009年的最后一个PPM日期]

4929101,NULL,NULL,2015-01-20 20:44:39,NULL,NULL,NULL
4929102,LOC01,2015-01-20 10:44:39,2015-01-20 13:00:00,2015-01-20 15:00:00,2015-01-20 12:00:00,NULL
4929103,LOC02,2015-01-20 10:44:39,2015-01-20 14:44:39,NULL,NULL,NULL
4929104,LOC03,2015-01-20 10:44:39,NULL,NULL,NULL,NULL
4929105,NULL,NULL,2015-01-20 12:44:39,NULL,2015-01-20 11:44:39,NULL
4929106,NULL,NULL,2015-01-20 13:44:39,NULL,NULL,NULL

这是我当前的查询(在这里,我还指示了一个可选的eventtype作为第一列;但是在我以前的event2009RDD中,我选择了一个最小和最大日期,这是错误的,因为我需要通过celllookuprdd-rpm和PPM确定四个日期):

select if(event1001Table.eventtype is not null, event1001Table.eventtype,
          event2009Table.eventtype), 
       if(event1001Table.id is not null, event1001Table.id, 
          event2009Table.id), 
       event1001Table.date1, event2009Table.minDate, event2009Table.maxDate  
       from event1001Table full outer join event2009Table  
       on event1001Table.id=event2009Table.id")

编辑后显示应用答案后的结果:

  " min(if(l.cn = 'RPM' or l.cn = 'RPM2', r.date1, 'NULL')) as rpmmn, " +
  " max(if(l.cn = 'RPM' or l.cn = 'RPM2', r.date1, 'NULL')) as rpmmx, " +
  " min(if(l.cn = 'PPM' or l.cn = 'PPM2', r.date1, 'NULL')) as ppmmn, " +
  " max(if(l.cn = 'PPM' or l.cn = 'PPM2', r.date1, 'NULL')) as ppmmx " +


[2009,4929102,R01,2015-01-20 13:00:00] min_rpm
[2009,4929102,C01,2015-01-20 14:00:00] max_rpm
---res: [2009,4929102,NULL,NULL,2015-01-20 13:00:00,2015-01-20 14:00:00] 
--- CORRECT

[2009,4929102,R01,2015-01-20 13:00:00] min_rpm
[2009,4929102,P01,2015-01-20 14:00:00] min_ppm
---res: [2009,4929102,2015-01-20 13:00:00,NULL,2015-01-20 14:00:00,NULL] 
--- INCORRECT (max should be equal to MIN although NULL is preferred if possible but I could just check in the code later on if min=max)

[2009,4929102,R01,2015-01-20 13:00:00] min_rpm
[2009,4929102,C01,2015-01-20 14:00:00] max_rpm
[2009,4929102,P01,2015-01-20 09:00:00] min_ppm
---res: [2009,4929102,2015-01-20 13:00:00,NULL,2015-01-20 09:00:00,NULL] 
--- INCORRECT (max is not working)

共有1个答案

柴文林
2023-03-14

让我们一步一步来做吧。让我们先构建2009年的部分

event2009RDD.registerTempTable("base2009")
cellLookupRDD.registerTempTable("lookup")

trns2009 = ssc.sql("select eventtype, id, \
                          min(case when l.cn = 'RPM' then r.date1 else null end) rpmmn, \
max(case when l.cn = 'RPM' then r.date1 else null end) rpmmx, \
min(case when l.cn = 'PPM' then r.date1 else null end) ppmmn, \
max(case when l.cn = 'PPM' then r.date1 else null end) ppmmx, \
from base2009 r inner join lookup l on r.celltype=l.celltype \
group by eventtype,id "

trns2009 .registerTempTable("transformed2009")

现在,您可以使用1001数据集进行完整的外部联接并获得输出。

注意:您不应该有

4929101,NULL,NULL,2015-01-20 20:44:39,NULL,NULL,NULL
4929101,NULL,NULL,2015-01-20 20:44:39,2015-01-20 20:44:39,NULL,NULL
 类似资料:
  • 为了给出backfround,我使用

  • Spark项目由不同类型的紧密集成组件组成。Spark是一个计算引擎,可以组织,分发和监控多个应用程序。 下面我们来详细了解每个Spark组件。 Spark Core Spark Core是Spark的核心,并执行核心功能。 它包含用于任务调度,故障恢复,与存储系统和内存管理交互的组件。 Spark SQL Spark SQL构建于Spark Core之上,它为结构化数据提供支持。 它允许通过SQ

  • 我正在通过Spark Cassandra连接器应用以下内容:

  • 我正在尝试通过以下步骤通过 spark-1.6.0 和 scala-2.11.7 从 Cassandra 2.0.17 表中获取值 已启动cassandra--服务cassandr启动 已启动spark--sbin/start all。sh 规定的火花标度-箱/火花壳-罐子火花-盒-连接器_2.10-1.5.0-M1.jar 在 Scala 中执行了这些命令 直到现在一切都很好,但当我执行- 它给

  • 如果我试图缓存一个巨大的(例如:100GB表),当我对缓存的执行查询时,它会执行全表扫描吗?火花将如何索引数据。火花留档说: Spark SQL可以通过调用Spark,使用内存中的列格式缓存表。目录cacheTable(“tableName”)或dataFrame。缓存()。然后Spark SQL将只扫描所需的列,并将自动调整压缩以最小化内存使用和GC压力。你可以打电话给spark。目录uncac

  • 问题内容: 当我尝试将我的数据框分组到一个列上,然后尝试查找每个分组的最小值时,似乎无法在非数字列上执行此操作。然后,如何正确过滤分组依据上的最短(最早)日期? 我正在从Postgresql S3实例流式传输数据帧,因此已经配置了数据。 问题答案: 只需直接执行聚合,而不使用辅助程序即可: 与之不同的是,它将适用于任何类型。