当前位置: 首页 > 面试题库 >

SparkSQL-滞后功能?

唐法
2023-03-14
问题内容

我在此DataBricks帖子中看到,SparkSql支持窗口函数,特别是我正在尝试使用lag()窗口函数。

我有几行信用卡交易,并且已经对它们进行了排序,现在我要遍历各行,并为每一行显示交易金额,以及当前行金额与上一行金额的差额。

在DataBricks帖子之后,我提出了这个查询,但是它给我抛出了一个异常,我无法理解为什么。

这是在PySpark中。tx是我在注册为临时表时创建的数据框。

test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")

和异常(被截断)。

py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found

我真的很感激任何见解,该功能是相当新的功能,就现有示例或其他相关帖子而言,没有太多事情要做。

编辑

我也尝试过在没有SQL语句的情况下执行此操作,如下所示,但继续出现错误。我已经将其与Hive和SQLContext一起使用,并收到相同的错误。

windowSpec = \
Window \
    .partitionBy(h_tx_df_ordered['cc_num']) \
    .orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])

windowSpec.rowsBetween(-1, 0)

lag_amt = \
   (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])
    tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    lag_amt.alias("prev_amt")).show()
Traceback (most recent call last):
  File "rdd_raw_data.py", line 116, in <module>
    lag_amt.alias("prev_amt")).show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)

问题答案:
  1. 框架规格应以关键字开头,ROWS而不是ROW
  2. 框架规格要求下限值
    ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
    

UNBOUNDED关键字

    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  1. LAG 函数根本不接受框架,因此带有滞后的正确SQL查询看起来像这样
    SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, LAG(tx.amt) OVER (
     PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time
    

    ) as prev_amt from tx

编辑

关于SQL DSL使用:

  1. 正如您所读的错误消息

注意,使用窗口函数当前需要一个HiveContex

请务必sqlContext使用HiveContextnot进行初始化SQLContext

  1. windowSpec.rowsBetween(-1, 0)什么都不做,但是功能再次不支持帧说明lag


 类似资料:
  • 问题内容: 我正在此SQL函数中编写SQL Server 2012 但在第一行 ** LAG(Data)我有一个错误。所以我不知道如何在SQL Server 2008中使用LAG()函数。 有人可以帮我吗? 问题答案: 遗憾的是,SQL Server 2012之前的版本不可用。您必须使用自连接来代替:

  • 在以前的工作中,我们必须比较项目x和项目x-1以获得大量数据(~10亿行)。由于这是在SQL Server2008R2上完成的,我们必须使用自联接。很慢。 我想我要试验一下滞后函数;如果速度快,这将是非常有价值的。我发现它快了2到3倍,但由于它应该是一个简单的操作,而且它的查询计划/表扫描更简单/大大减少了,所以我非常失望。下面复制的代码。 创建数据库: 返回: 编辑: 根据@vnov的评论,在我

  • 我有一个使用Kafka 1.0作为队列的应用程序。Kafka主题有80个分区和80个正在运行的使用者。(Kafkapython消费者)。 通过运行命令: 我看到其中一个分区被卡在一个偏移位置,并且随着新记录的添加,延迟会不断增加。 上面命令的输出如下所示: 这是什么原因?此外,不需要使用重置偏移量命令重置偏移量,因为可能不会定期手动监视此服务器。 客户端作为Linux m/c中的并行进程在后台运行

  • 问题内容: 请考虑下表: 该列会自动增加,但包含空白。该列为数字。 我想通过设置相对于上述两行的方式来查看随时间的增长。那是针对行,我要相对于行(334)设置行(546 )的。因此,要为行计算的值是546/334 = 1.63473。 这是我想要实现的结果: 如何在MySQL中执行此类滞后? 请注意,该列包含空格,因此仅在与同一表上的连接将不起作用。 问题答案: 这是一个返回MySQL所需内容的解

  • 我在我的一个活动中使用了Android设计支持库中的BottomNavigationView,以及每个导航项的片段。 每次我在栏上选择一个项目时,我都会进行一个片段交易,就像下面的片段一样(为了简洁起见,删除了代码的某些部分): 问题是...底部栏动画变得超级滞后,只有在片段完全加载并显示在屏幕上后才会完成。 这个问题并不完全是新问题,因为在使用导航菜单时也会发生,但至少可以通过使用抽屉布局来解决

  • 我正在建立一个新的Kafka集群,为了测试目的,我创建了一个有1个分区和3个副本的主题。 有什么想法哪种配置或其他东西可以帮助我消费更多的数据吗?? 提前致谢