当前位置: 首页 > 知识库问答 >
问题:

PySpark UDF生成的DF无法显示“值错误:”MyColumn“名称不在列表中”

郭德惠
2023-03-14
data.show()
+---------------+--------------------+--------------------+
|       features|                meta|           telemetry|
+---------------+--------------------+--------------------+
|   [seattle, 3]|[seattle, 3, 5344...|[[47, 1, 27, 92, ...|
|     [miami, 1]|[miami, 1, 236881...|[[31, 84, 24, 67,...|
|     [miami, 3]|[miami, 3, 02f4ca...|[[84, 5, 4, 93, 2...|
|   [seattle, 3]|[seattle, 3, ec48...|[[43, 16, 94, 93,...|
|   [seattle, 1]|[seattle, 1, 7d19...|[[70, 22, 45, 74,...|
|[kitty hawk, 3]|[kitty hawk, 3, d...|[[46, 15, 56, 94,...|

您可以从以下链接下载生成的.json示例:https://aiaccqualitytelcapture.blob.core.windows.net/streamanalytics/2019/08/21/10/0_43cbc7b0c9e845a187ce182b46eb4a3a_1.json?

特别是,您可以看到其中每一个中的实际数据实际上都是一个字典:我们感兴趣的“features”列的形式如下:{“factory_id”:“西雅图”,“line_id”:“3”}

我试图通过经典的函数方法将特性中的数据编码为one_hot。

见下文:

def one_hot(value, categories_list):
  num_cats = len(categories_list)
  one_hot = np.eye(num_cats)[categories_list.index(value)]
  return one_hot

def one_hot_features(row, feature_keys, u_features):
  """
  feature_keys must be sorted.
  """
  cur_key = feature_keys[0]
  vector = one_hot(row["features"][cur_key], u_features[cur_key])
  for i in range(1, len(feature_keys)):
    cur_key = feature_keys[i]
    n_vector = one_hot(row["features"][cur_key], u_features[cur_key])
    vector = np.concatenate((vector,  n_vector), axis=None)
  return vector

本例中的feature_keys和u_features包含以下数据

feature_keys = ['factory_id', 'line_id']

u_features = {'factory_id': ['kitty hawk', 'miami', 'nags head', 'seattle'], 'line_id': ['1', '2', '3']}
def calc_onehot_udf(feature_keys, u_features):
  return udf(lambda x: one_hot_features(x, feature_keys, u_features))

n_data = data.withColumn("hot_feature", calc_onehot_udf(feature_keys, 
u_features)( col("features") ))

n_data.show()

理想的输出是一个新的dataframe,其列为:“hot_features”,其中包含features列中的1维一个hot encoded数组。

共有1个答案

苏边浩
2023-03-14

结果发现有几个关键问题:

  1. 您应该或必须在UDF中指定返回类型。在本例中,它是ArrayType(FloatType())
  2. 没有从one_hot_features返回nd数组,而是调用了vectors.tolist()
  3. 传递col(“features”)逐行发送features列内的实际值,而不是实际行数据;因此,像最初那样调用row[“features”]是不正确的,因为没有访问器,因为我已经有了该行的值。因此,我将第一个参数重命名为“features_val”,而不是“row”,以更好地反映预期的输入。

下面是one_hot_features的新代码。

def one_hot_features(features_val, feature_keys, u_features):
  cur_key = feature_keys[0]
  vector = one_hot(features_val[cur_key], u_features[cur_key])
  for i in range(1, len(feature_keys)):
    cur_key = feature_keys[i]
    n_vector = one_hot(features_val[cur_key], u_features[cur_key])
    vector = np.concatenate((vector,  n_vector), axis=None)
  return vector.tolist()
def calc_onehot_udf(feature_keys, u_features):
  return udf(lambda x: one_hot_features(x, feature_keys, u_features), 
ArrayType(FloatType()))

n_data = data.withColumn("hot_feature", calc_onehot_udf(feature_keys, 
u_features)(col("features")))
n_data.show()
 类似资料:
  • 问题内容: 这是我的代码: 但是,当我运行代码时,该表不显示列名。我在这一行设置1时,它仅显示一个空行: 请告诉我我的代码在哪里错误?谢谢! 编辑:有人问了同样的问题(JTable中没有标题),但是答案是将Jtable添加到JScrollPane,这对@@无济于事。编辑:嗨, 丹 ,我已经添加了完整的Constructor,这是方法的代码: 谢谢! 问题答案: 您使用了错误的方式。为使其正常工作,

  • 突然间得到了所有这些错误所有构建错误“在Android Studio中存在多个错误的清单合并失败” 当我没有得到这个建议的时候,我就用android studio提出了替换建议 "清单合并失败:属性application@appComponentFactoryvalue=(android.support.v4.app.CoreComponentFactory)from[com.android.su

  • 我创建了一个debezium连接器到docker MySQL容器。我尝试为消息设置过滤器: 我看到这个: { "connector": { "state": "RUNNING "," worker_id": "172.21.0.13:8083" }," name": "my_connector "," tasks": [ { "id": 0," state": "FAILED "," trace

  • 问题内容: 桌子: 我想生成一个子弹名称列: 可以通过SQL实际完成吗?还是我需要使用其他语言编写脚本? 编辑:我正在使用此函数在PHP中生成段塞: 到目前为止,我的SQL技能还很基本。 如何在SQL中遍历每一行并进行设置? 问题答案: 您当然可以使用MySQL进行字符串替换。在官方文件列出相当多的字符串函数您可能会发现有用的。 我还浏览了有关在MySQL中使用正则表达式的博客文章。 更新:我提到

  • 我有一个大约有100列的xlsx文件。当我使用函数时,它会显示一些第一列和最后一列,但不是全部列。我想通过for循环显示(打印)所有列名(标题)。我该怎么做呢?

  • 问题内容: 我有一个 包含4个不同的。对于每一个孤单一个重要性的多数民众赞成。我想回到的地方是显示。因此,对于以下内容,我想在标记值为2时返回名称。 输出: 原来如此 我通过这样做 然后更改行。但这不是很有效。 我也希望将输出从 问题答案: 用途: 要么, 或者,作为列表: …或系列: