我正在学习dask,并在这里和那里得到这个错误:
InvalidIndexError: Reindexing only valid with uniquely valued Index objects
有一个预处理的dask df,我用它做了很多操作,但是一些简单的操作会引发这个错误。
我尝试重新设置索引,但似乎没有帮助。
问题:对于可能出现的问题,是否有高lvl的答案?我能在哪里读到它?为什么经常发生这种情况?谷歌搜索没有帮助。
例如,这里有一个奇怪的例子:我在每次操作后测试df统计数据,以可能看到任何可疑的东西。
df = load_data()
df.shape[0].compute(), df.npartitions
#ok
df = prepare_target(df)
df.shape[0].compute(), df.npartitions
#ok
x_train, x_test, y_train, y_test = dask_tts(df.drop('target', 1), df['target'], random_state=1)
#ok
x_train['target'] = y_train
x_test['target'] = y_test
#ok
x_train.shape[0].compute(), x_train.npartitions
x_test.shape[0].compute(), x_test.npartitions
#ok
x_train.index.nunique().compute()
x_test.index.nunique().compute()
#ok
train, smooth_dict = smoothed_likelyhood(x_train) # returns df and dict
train.shape[0].compute()
#ok
test, _ = smoothed_likelyhood(x_test)
test.shape[0].compute()
#ok
train.index.nunique().compute()
#ok
test.index.nunique().compute() # after this line - error
# InvalidIndexError: Reindexing only valid with uniquely valued Index objects
要指出的是,只有测试才会抛出错误
在这里,我试图重现它,但它的工作原理是:
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split
def smoothed_likelyhood(df, alpha=1): # works with dask df
global_mean = df['target'].mean()
smooth_dict = {}
final_df = df.copy()
for c in [c for c in df.columns if c!='target']:
n_rows = df[c].value_counts()
all_means = df.groupby(by=c)['target'].mean()
temp_result = ((all_means * n_rows + global_mean + alpha) / (n_rows + alpha))
final_df[c] = df[c].map(temp_result)
smooth_dict[c] = temp_result.compute().to_dict()
return final_df, smooth_dict
# TOY EXAMPLE
test = pd.DataFrame({'a':['mos', 'mos', 'london', 'dubai', 'ny', 'mos', 'london', 'dubai', 'shanghai', 'dubai', 'mos', 'london', 'dubai', 'dubai'],
'b':['big', 'big', 'big', 'med', 'med', 'med', 'small', 'small', 'small', 'small', 'big', 'med', 'small', 'med'],
'target':[1,0,0,1,0,1,1,0,1,1,1,0,0,0]})
df = dd.from_pandas(test, npartitions=2)
# -----------------------------------------------
print(f'npartitions: {df.npartitions}')
x_train, x_test, y_train, y_test = train_test_split(df.drop('target', 1), df['target'], test_size=0.3, shuffle=True)
x_train['target'] = y_train
x_test['target'] = y_test
print(x_train.shape[0].compute(), x_train.index.nunique().compute())
print(x_test.shape[0].compute(), x_test.index.nunique().compute())
train, smooth_dict = smoothed_likelyhood(x_train)
test, _ = smoothed_likelyhood(x_test)
print(train.shape[0].compute(), train.index.nunique().compute())
print(test.shape[0].compute(), test.index.nunique().compute())
# train.compute()
print(train['target'].mean().compute())
print(test['target'].mean().compute())
这部分工作正常,但当我尝试对真实数据执行相同操作时:
%%time
df = load_data(stage='prep_2', frac=config.FRAC, verbose=False) # loading many parquet files
df = prepare_target(df) # some small changes to df
# -----------------------------------------------
print(f'npartitions: {df.npartitions}')
x_train, x_test, y_train, y_test = train_test_split(df.drop('target', 1), df['target'], random_state=1)
x_train['target'] = y_train
x_test['target'] = y_test
print(x_train.shape[0].compute(), x_train.index.nunique().compute())
print(x_test.shape[0].compute(), x_test.index.nunique().compute())
train, smooth_dict = smoothed_likelyhood(x_train)
test, _ = smoothed_likelyhood(x_test)
print(x_train.shape[0].compute(), x_train.index.nunique().compute())
print(x_test.shape[0].compute(), x_test.index.nunique().compute())
这些是打印的结果:
npartitions: 10
1476758 164300
164018 106750
1476758 164300
164018 106750
这2项中的任何一项都会抛出上面提到的索引错误:
train['target'].mean().compute()
test['target'].mean().compute()
如果你有什么想法,我可能会进一步调查,除非我知道去哪里找。
谢谢
问题出现在平滑函数中:
final_df[c] = df[c].map(temp_result)
smooth_dict[c] = temp_result.compute().to_dict()
和DASK系列有关temp_result
通常,对于pandas,如果我想用df中的其他值替换一些值(映射它们),我会这样做:
dict = {'a':{'a1':1, 'a2':2...}, 'b':{'b1':1, 'b2':2...}}
for c in df.columns: # df.columns = ['a', 'b', ...]
df[c] = df[c].map(dict[c].get) # swap it inplace
它超快
但在这里,在dask,我无法将temp_结果转换为dict(仅在计算之后)。我尝试了一些方法,令人惊讶的是,简单的df.map(series)
成功了。正如在映射系列中一样,不是dict.Ok,但这引入了带有无效索引的超级狡猾的问题。
解决方案是使用计算的dict(平滑dict)作为熊猫,而不是系列(temp_结果),并用它进行映射。我认为在dask中过早地计算某些东西是次优的,但我需要结果,而且在这里,当我调用它时,它只计算一次。
以下是一个工作函数:
def smoothed_likelyhood(df, alpha=1):
"""
Discription:
preprocess based on mean values of positive events in each category
Args:
df: [df] - df to encode
alpha: [int/float] - regularization param. We can find it with CV
Returns:
encoded df, dict to encode user during prediction
"""
global_mean = df['target'].mean()
smooth_dict = {}
for c in [c for c in df.columns if c!='target']:
n_rows = df[c].value_counts()
all_means = df.groupby(by=c)['target'].mean()
temp_result = ((all_means * n_rows + global_mean + alpha) / (n_rows + alpha))
smooth_dict[c] = temp_result.compute().to_dict()
df[c] = df[c].map(smooth_dict[c])
return df, smooth_dict
尽管如此,它需要25分钟来咀嚼800k*90的数据帧,这只是我数据的5%。如果有人能建议如何在dask中加速这个功能,那将不胜感激。
我试图训练一个数据集来预测输入的文本是否来自科幻小说。我对python比较陌生,所以我不知道我到底做错了什么。 代码: 错误:回溯(最近一次呼叫上次): 文件“”,第1行,在main()中 文件"C:/用户/用户/桌面/分配/SQL /Python/DA项目/class17.py",第36行,在主model_novels() modelêselectedModel中的文件“C:/Users/use
null null 为什么要使用UDF/UADF而不是map(假设map保留在数据集表示中)?
表元数据 下面这些方法用于获取表信息: 列出数据库的所有表 $this->db->list_tables(); 该方法返回一个包含你当前连接的数据库的所有表名称的数组。例如: $tables = $this->db->list_tables(); foreach ($tables as $table) { echo $table; } 检测表是否存在 $this->db->table_
我需要根据一些共享的键列将许多数据帧连接在一起。对于键值RDD,可以指定一个分区程序,以便具有相同键的数据点被洗牌到相同的执行器,因此连接更有效(如果在之前有与洗牌相关的操作)。可以在火花数据帧或数据集上做同样的事情吗?
问题内容: 我想获取基于条件选择的数据帧行数。我尝试了以下代码。 输出: 输出显示数据帧中每一列的计数。相反,我需要获得满足以上所有条件的单一计数?这该怎么做?如果您需要有关我的数据框的更多说明,请告诉我。 问题答案: 您要的是所有条件都为真的条件,所以答案是len,除非我误解了您的要求
我有一个如下所示的数据帧: 我需要提取lat=30.75和lon 76.25的行,对于我使用的行: 但这表明了这个错误:
数据与数据结构 1. 数据 1.1 数据(data) 数据:是信息的载体,是描述客观事物的数、字符,以及所有能输入到计算机中并被计算机程序识别和处理的符号的集合。 1.2 数据大致分的两类:(1)数值性数据;(2)非数值数据 数值性数据:主要包括整数、浮点数、复数、双精度数等,主要用于工程和科学计算,以及商业事务处理。 非数值数据:主要包括字符和字符串,以及文字、图形、图像、语音等数据。 1.3