当前位置: 首页 > 知识库问答 >
问题:

使用Spark Scala运行上一小时事务的总和

澹台成龙
2023-03-14
Customer     TimeStamp        Tr Last_1Hr_RunningSum
Cust-1  6/1/2015 6:51:55      1        1
Cust-1  6/1/2015 6:58:34      3        4
Cust-1  6/1/2015 7:20:46      3        7
Cust-1  6/1/2015 7:40:45      4       11
Cust-1  6/1/2015 7:55:34      5       15
Cust-1  6/1/2015 8:20:34      0       12
Cust-1  6/1/2015 8:34:34      3       12
Cust-1  6/1/2015 9:35:34      7        7
Cust-1  6/1/2015 9:45:34      3       10
Cust-2  6/1/2015 16:26:34     2        2
Cust-2  6/1/2015 16:35:34     1        3
Cust-2  6/1/2015 17:39:34     3        3
Cust-2  6/1/2015 17:43:34     5        8
Cust-3  6/1/2015 17:17:34     6        6
Cust-3  6/1/2015 17:21:34     4       10
Cust-3  6/1/2015 17:45:34     2       12
Cust-3  6/1/2015 17:56:34     3       15
Cust-3  6/1/2015 18:21:34     4       13
Cust-3  6/1/2015 19:24:34     1        1
    null

提前谢了。

共有1个答案

马峻
2023-03-14

我尝试使用嵌套查询运行sqlcontext.sql,但它给我带来了错误

你试过用join吗?

df.registerTempTable("input")

val result = sqlContext.sql("""
        SELECT
           FIRST(a.Customer) AS Customer,
           FIRST(a.Timestamp) AS Timestamp,
           FIRST(a.Tr) AS Tr,
           SUM(b.Tr) AS Last_1Hr_RunningSum
        FROM input a
        JOIN input b ON
          a.Customer = b.Customer
          AND b.Timestamp BETWEEN (a.Timestamp - 3600000) AND a.Timestamp
        GROUP BY a.Customer, a.Timestamp
        ORDER BY a.Customer, a.Timestamp
        """)

result.show()

它打印预期的结果:

+--------+-------------+---+-------------------+
|Customer|    Timestamp| Tr|Last_1Hr_RunningSum|
+--------+-------------+---+-------------------+
|  Cust-1|1420519915000|  1|                1.0|
|  Cust-1|1420520314000|  3|                4.0|
|  Cust-1|1420521646000|  3|                7.0|
|  Cust-1|1420522845000|  4|               11.0|
|  Cust-1|1420523734000|  5|               15.0|
|  Cust-1|1420525234000|  0|               12.0|
|  Cust-1|1420526074000|  3|               12.0|
|  Cust-1|1420529734000|  7|                7.0|
|  Cust-1|1420530334000|  3|               10.0|
|  Cust-2|1420554394000|  2|                2.0|
|  Cust-2|1420554934000|  1|                3.0|
|  Cust-2|1420558774000|  3|                3.0|
|  Cust-2|1420559014000|  5|                8.0|
|  Cust-3|1420557454000|  6|                6.0|
|  Cust-3|1420557694000|  4|               10.0|
|  Cust-3|1420559134000|  2|               12.0|
|  Cust-3|1420559794000|  3|               15.0|
|  Cust-3|1420561294000|  4|               13.0|
|  Cust-3|1420565074000|  1|                1.0|
+--------+-------------+---+-------------------+
 类似资料:
  • 我尝试将Azure服务总线与ApacheQPID和Spring与事务集成。 但Azure服务总线AMQP实现似乎不支持事务。这是真的吗?我没有找到相关信息。 这是我的JMS配置 这是我的spring集成片段: 它与session transact=“false”配合使用,但与session transact=“true”配合使用时会产生错误: QPID跟踪

  • 我目前正在编写一个spring-webmvc应用程序,几天前,我的第一个问题是我不知道如何使用Hibernate和Spring进行交易。现在一切正常,一个事务管理器上线了,我可以成功地处理事务。 这是我基于Java的Spring配置的一部分: 因此,当我想让我的服务层或dao层中的方法在事务中运行时,我只需用@Transactional对其进行注释,一切正常。我的问题是,当我不想让服务方法在事务内

  • 我有windows 2008任务调度程序,我设置了一个PHP脚本,以这样运行 C:\php\php。exe-f等。。。 在windows任务计划程序中,我只能每天或每小时计划一次如何将其配置为每4小时运行一次?

  • 问题内容: 我是Postgres的新用户,我敢肯定已经有了答案,但是我找不到。 我需要分析活动日志表中的一些数据,并将结果按时间段分组。 一个简单的问题版本是一个包含三个字段的表: 我要捕获的操作字符串可能是“ create_entry”(是的,我知道这很不错,因为它的数据库设计不错,但我坚持使用它) 我正在寻找的输出是一个报告,该报告按年份和月份显示了“ create_entry”操作的计数。就

  • 我有一个更新LetsEncrypt证书的Ansible任务。我只希望这项任务每周运行一次,以避免因重击API而受阻,但其余的任务需要每天运行- 有没有可能对一个可完成的任务进行速率限制,如果它在过去n小时内已经运行过,那么它将被跳过? 我能想到的最好的方法是,如果任务已运行,则触摸文件,如果文件存在并且比某个时间戳更新,则跳过任务 — 我确实想过在<code>ansible中对任务应用标记,并将标

  • 相关的ignite配置如下所示: