更新:似乎我的错误可能是因为我如何安装Spark和/或Hive。在Databricks(托管)笔记本中使用窗口函数似乎非常简单。我得想办法在本地设置这个。
我有一个Spark DataFrame,我需要在上面使用一个窗口函数。*我试着按照这里的说明操作,但我遇到了一些问题。
设置我的环境:
import os
import sys
import datetime as dt
os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2'
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip'
sys.path.append('/usr/bin/spark-1.5.2/python')
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip')
import pyspark
sc = pyspark.SparkContext()
hiveContext = pyspark.sql.HiveContext(sc)
sqlContext = pyspark.sql.SQLContext(sc)
from pyspark.sql import Row
from pyspark.sql.functions import struct
from pyspark.sql import DataFrame
from collections import OrderedDict
test_ts = {'adminDistrict': None,
'city': None,
'country': {'code': 'NA', 'name': 'UNKNOWN'},
'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89},
{'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44},
{'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3},
{'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6},
{'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84},
{'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74},
{'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33},
{'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33},
{'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5},
{'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79},
{'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3},
{'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0},
{'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35},
{'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82},
{'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24},
{'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61},
{'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14},
{'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0},
{'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82},
{'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11},
{'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46},
{'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8},
{'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74},
{'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63},
{'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64},
{'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}],
'maxDate': '2015-12-28T00:00:00Z',
'minDate': '2005-08-25T00:00:00Z',
'name': 'S&P GSCI Crude Oil Spot',
'offset': 0,
'resolution': 'DAY',
'sources': ['trf'],
'subtype': 'Index',
'type': 'Commodities',
'uid': 'TRF_INDEX_Z39824_PI'}
def ts_to_df(ts):
data = []
for line in ts['data']:
data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value']))
return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')])
test_df = ts_to_df(test_ts)
test_df.show()
+----------+----------------------+
| Date|SP_GSCI_Crude_Oil_Spot|
+----------+----------------------+
|2005-08-25| 369.89|
|2005-08-26| 362.44|
|2005-08-29| 368.3|
|2005-08-30| 382.6|
|2005-08-31| 377.84|
|2005-09-01| 380.74|
|2005-09-02| 370.33|
|2005-09-05| 370.33|
|2005-09-06| 361.5|
|2005-09-07| 352.79|
|2005-09-08| 354.3|
|2005-09-09| 353.0|
|2005-09-12| 349.35|
|2005-09-13| 348.82|
|2005-09-14| 360.24|
|2005-09-15| 357.61|
|2005-09-16| 347.14|
|2005-09-19| 370.0|
|2005-09-20| 362.82|
|2005-09-21| 366.11|
+----------+----------------------+
在这里,我不知道我在做什么,一切都开始出错:
from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()
这就给了我这个错误:
PY4JJavaError:调用O59.Select时出错。:org.apache.spark.sql.analysisException:无法解析窗口函数“lead”。注意,使用窗口函数目前需要一个HiveContext;
def ts_to_hive_df(ts):
data = []
for line in ts['data']:
data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
ts['name'].replace('&', '').replace(' ', '_'):line['value']})
temp_rdd = sc.parallelize(data).map(lambda x: Row(**x))
return hiveContext.createDataFrame(temp_rdd)
test_df = ts_to_hive_df(test_ts)
test_df.show()
*我需要知道我的数据是否有漏洞。我有“日期”列,对于按日期排序的每一行,我想知道下一行是什么,如果有遗漏的天数或错误的数据,那么我想使用该行上一天的数据。如果你知道一个更好的方法,让我知道。但我还是想知道如何让这些窗口函数工作。
这是一个古老的问题,因此没有实际意义,因为您可能已经使用了新版本的Spark。我自己在运行Spark2.0,所以这可能是作弊。
但是FWIW:2个可能的问题。在第一个示例中,我认为.todf()
可能默认为SQLContext,因为您都调用了。在第二种情况下,当您进行重构时,您是否正在函数内部调用hivecontext?
如果我重构第二个ts_to_df
函数,使其在函数外部调用hivecontext,那么一切都很好。
def ts_to_df(ts):
data = []
for line in ts['data']:
data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
ts['name'].replace('&', '').replace(' ', '_'):line['value']})
return data
data = ts_to_df(test_ts)
test_rdd = sc.parallelize(data).map(lambda x: Row(**x))
test_df = hiveContext.createDataFrame(test_rdd)
from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()
+----------+
| Next_Date|
+----------+
|2005-08-26|
|2005-08-29|
|2005-08-30|
|2005-08-31|
|2005-09-01|
|2005-09-02|
.....
test1 a=new test 1(3,“b”);
我试图获得对创建java的GoogleMap对象的引用,但我不知道如何获得这个引用。
Spring4.1实例化了Jackson实例。我有理由将该实例放入我的一个控制器中:该控制器使用Jackson进行一些小的JSON解析,但是它使用的应该是Spring本身使用的同一个实例。我该怎么做呢? 注意,我不是在问如何自定义配置Spring使用的;我对默认值很满意。我只想检索Spring out使用的实例,这样我就可以在自己的代码中重用现有的实例。
问题内容: 我正在尝试运行JMeter脚本,但登录失败。原因是,密码是使用RSA算法(即使用javascript)进行加密的。因此在录制时保存的密码将无法使用,并且由于使用JMeter不支持的javascript对其进行加密,因此无法获得加密密码的动态值。由于运行时使用javascript,因此我无法使用正则表达式查找响应数据,因为这不是响应的一部分。 我试图登录到Tableau报表服务器。 问题
问题内容: 我试图在这个问题中使用答案,但不能使其起作用:如何使用Python的ElementTree创建“虚拟根”? 这是我的代码: 当我打开生成的“ myfile.tmx”文件时,它包含以下内容: 我想念什么?还是有更好的工具? 问题答案: 您可以使用lxml及其功能: =>
问题内容: 我有一个SQL查询,该查询对3-4个表执行JOIN操作以获取数据。现在,我们正转向elasticsearch以获得更好的性能。如何使用elasticsearch复制相同的JOIN查询?我已经阅读了有关父/子文档的信息,但是我的数据没有任何严格的父/子类型的数据。 问题答案: Elasticsearch不支持JOIN,这首先是NoSQL技术的全部目的。有多种方法可以使用父/子关系(如您所