当前位置: 首页 > 面试题库 >

是否可以在Spark中按组缩放数据?

龚安民
2023-03-14
问题内容

我想用StandardScalerfrom pyspark.mllib.feature import StandardScaler)缩放数据,现在我可以通过将RDD的值传递给transform函数来做到这一点,但是问题是我想保留键。无论如何,我是否通过保留数据密钥来扩展数据?

样本数据集

0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,smurf.

进口货

import sys
import os
from collections import OrderedDict
from numpy import array
from math import sqrt
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.mllib.clustering import KMeans
    from pyspark.mllib.feature import StandardScaler
    from pyspark.statcounter import StatCounter

    print ("Successfully imported Spark Modules")
except ImportError as e:
    print ("Can not import Spark Modules", e)
    sys.exit(1)

代码部分

    sc = SparkContext(conf=conf)   
    raw_data = sc.textFile(data_file)
    parsed_data = raw_data.map(Parseline)

Parseline 功能:

def Parseline(line):
    line_split = line.split(",")
    clean_line_split = [line_split[0]]+line_split[4:-1]
    return (line_split[-1], array([float(x) for x in clean_line_split]))

问题答案:

这不是一个很好的解决方案,但是您可以调整我对类似Scala问题的答案。让我们从一个示例数据开始:

import numpy as np

np.random.seed(323)

keys = ["foo"] * 50 + ["bar"] * 50
values = (
    np.vstack([np.repeat(-10, 500), np.repeat(10, 500)]).reshape(100, -1) +
    np.random.rand(100, 10)
)

rdd = sc.parallelize(zip(keys, values))

不幸的MultivariateStatisticalSummary是,它只是围绕JVM模型的包装,并且它并不是真正的Python友好。幸运的是,有了NumPy数组,我们可以使用standardStatCounter通过键来计算统计信息:

from pyspark.statcounter import StatCounter

def compute_stats(rdd):
    return rdd.aggregateByKey(
        StatCounter(), StatCounter.merge, StatCounter.mergeStats
    ).collectAsMap()

最后我们可以map归一化:

def scale(rdd, stats):
    def scale_(kv):
        k, v = kv
        return (v - stats[k].mean()) / stats[k].stdev()
    return rdd.map(scale_)

scaled = scale(rdd, compute_stats(rdd))
scaled.first()

## array([ 1.59879188, -1.66816084,  1.38546532,  1.76122047,  1.48132643,
##    0.01512487,  1.49336769,  0.47765982, -1.04271866,  1.55288814])


 类似资料:
  • 可能重复: 在锚中放置div是否正确? 我想有一个复杂元素的链接(包含图像、段落和其他内容)。我知道它是有效的,但是有

  • 我可以粘贴/拖动 图像显示正确(但过大)。然而,我无法缩放它们。我尝试了这个问题的varius解决方案:在降价中更改图像大小 不幸的是,它们都不适合我。按shift Enter后,单元格只显示输入的文本,图像就不见了。我做错了什么?

  • 问题内容: 我想知道是否可以在Django模型中存储数组? 我问这个问题是因为我需要在一个字段中存储一个数组(例如[1,2,3]),然后能够搜索特定的数组并与之匹配,或者通过它的可能组合来进行匹配。 我当时正在考虑将该数组存储为s中的字符串,然后在需要搜索内容时,将值(通过过滤其他模型获得)与’[‘,’]’和’,’连接在一起,然后使用带有生成字符串。问题在于,我将必须生成每种可能的组合,然后逐个过

  • 我制作了一个类 在另一个类中 正如您所看到的,我创建了这个数组,并将UImages与txt和txt2一起放置在其中。简单地说,我要向用户显示一个图像,然后输入一个描述图像的输入,然后检查它是否匹配txt和txt2。在运行模拟器时,我得到以下错误: ***由于未捕获异常“nSunKnownKeyException”而终止应用程序,原因:“[ SetValue:ForUndefinedKey:]:对于

  • 一些上下文:我有多个cron作业每天、每周、每小时运行,其中一些需要很大的处理能力。我想向这些容器cron pod添加请求和限制,以尝试启用垂直扩展,并确保所分配的节点在初始化时具有足够的容量。这将使我不必在任何时候都有多个大型节点可用,也可以让我轻松地修改并行运行的cron数量。我想尽量避免定时缩放,因为cron作业的处理时间会随着应用程序的增长而增加。 编辑 - 附加信息 : 目前我正在使用数

  • 问题内容: 在Java中是否可以覆盖Objects数组的toString? 例如,假设我创建了一个简单的类(由于这是一个普遍的问题,所以实际上是哪个类并不重要)。客户端一旦创建了一个数组并使用了它,是否有可能不打印该数组的地址,而是打印一个定制的? PS:当然,我不能只在类中重写它,因为它与单个实例有关。 问题答案: 不会。您当然可以创建一个静态方法User.toString(User []),但