当前位置: 首页 > 知识库问答 >
问题:

用python实现apache beam中PCollection中几个字段的最大值和最小值

汪晨
2023-03-14

共有1个答案

易自珍
2023-03-14

我决定使用一个自定义的combinefn函数来确定每个键的最小值和最大值。然后,使用COGROUPBYKEY将它们与输入数据联接,并应用所需的映射来规范化值。

"""Normalize PCollection values."""

import logging
import argparse
import sys

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


# custom CombineFn that outputs min and max value
class MinMaxFn(beam.CombineFn):
  # initialize min and max values (I assumed int type)
  def create_accumulator(self):
    return (sys.maxint, 0)

  # update if current value is a new min or max      
  def add_input(self, min_max, input):
    (current_min, current_max) = min_max
    return min(current_min, input), max(current_max, input)

  def merge_accumulators(self, accumulators):
    return accumulators

  def extract_output(self, min_max):
    return min_max


def run(argv=None):
  """Main entry point; defines and runs the pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--output',
                      dest='output',
                      required=True,
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  p = beam.Pipeline(options=pipeline_options)

  # create test data
  pc = [('foo', 1), ('bar', 5), ('foo', 5), ('bar', 9), ('bar', 2)]

  # first run through data to apply custom combineFn and determine min/max per key
  minmax = pc | 'Determine Min Max' >> beam.CombinePerKey(MinMaxFn())

  # group input data by key and append corresponding min and max 
  merged = (pc, minmax) | 'Join Pcollections' >> beam.CoGroupByKey()

  # apply mapping to normalize values according to 'norm_value = (value - min) / (max - min)'
  normalized = merged | 'Normalize values' >> beam.Map(lambda (a, (b, c)): (a, [float(val - c[0][0][0])/(c[0][0][1] -c[0][0][0]) for val in b]))

  # write results to output file
  normalized | 'Write results' >> WriteToText(known_args.output)

  result = p.run()
  result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

该代码段可以使用Python script_name.py--output output_filename运行。我的测试数据(按键分组)是:

('foo', [1, 5])
('bar', [5, 9, 2])

CombineFn将按键最小值和最大值返回:

('foo', [(1, 5)])
('bar', [(2, 9)])

键操作的JOIN/COGROUP的输出:

('foo', ([1, 5], [[(1, 5)]]))
('bar', ([5, 9, 2], [[(2, 9)]]))

而正常化后:

('foo', [0.0, 1.0])
('bar', [0.42857142857142855, 1.0, 0.0])

这只是一个简单的测试,所以我确信它可以针对所提到的数据量进行优化,但它似乎可以作为一个起点。考虑到可能需要进一步考虑(例如,如果min=max,则避免除以零)

 类似资料:
  • 问题内容: 我正在寻找python中整数的最小值和最大值。例如,在Java中,我们有和。python中是否有类似的东西? 问题答案: Python 3 在Python 3中,此问题不适用。普通int类型是无界的。 但是,你实际上可能正在寻找有关当前解释器的字长的信息,在大多数情况下,该信息将与机器的字长相同。该信息在Python 3中仍以形式提供,这是一个有符号的单词可以表示的最大值。等效地,它是

  • 问题内容: 我有一本这样的字典: 我想获得该字典的5个最大值,并以此存储一个新的字典。要获得最大值,我做了: 也许这是一项容易的任务,但是我坚持了很长时间。请帮忙!!! 问题答案: 你近了 您可以使用 [docs] 对列表进行 排序 ,并采用前五个元素: __ 另请参阅:Python排序方法

  • 问题内容: 我的代码没有给出错误,但是没有显示最小值和最大值。代码是: 我是否需要system.out.println()来显示它,否则返回应该起作用吗? 问题答案: 您正在调用方法,但不使用返回的值。

  • 这是一个非常基本的算法(不能再简单了),但我被难住了。我们有一个元素数组,我们必须确定最小值和最大值。 通常的方法是遍历数组,找出最小值和最大值,即2n比较。 稍微有效的方法是首先对数组的连续元素进行比较,以确定任意两个元素的最大值和最小值(N/2比较)。我们现在有n/2 min和n/2 max元素。现在我们可以在n/2+n/2+n/2(前一步)=3/2*n或1.5n中得到最终的max和min 那

  • 我正在编写一个代码,用户会被问到:“有多少个标记?”然后他们输入他们提到的分数。然后,它打印出最大标记和最小标记。 我不是最擅长编码的,所以我没有方向去寻找循环中的最大值和最小值。我找到了他们能够输入标记的部分,但我不确定如何找到最大值和最小值。我查找了如何进行最大值和最小值,但它通常显示为在数组中查找最大值和最小值,这不是我想要做的。

  • 问题内容: 我想输出二维数组的最大值和最小值。Max可以很好地工作,但是即使在数组中没有零的情况下min也总是输出零。在本例中,我设置为99以防止较小的机会在数组中获得零。继承人完整代码: 问题答案: 由于您在中选择随机值的方式,不会存在小于零的值- 但也无法保证任何值都将恰好为零。但是,您将初始化为零,因为这是数组元素的默认值;没有什么比这更小了,所以答案总是零。 您应该在标记为“查找最小值”的