我决定使用一个自定义的combinefn
函数来确定每个键的最小值和最大值。然后,使用COGROUPBYKEY
将它们与输入数据联接,并应用所需的映射来规范化值。
"""Normalize PCollection values."""
import logging
import argparse
import sys
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
# custom CombineFn that outputs min and max value
class MinMaxFn(beam.CombineFn):
# initialize min and max values (I assumed int type)
def create_accumulator(self):
return (sys.maxint, 0)
# update if current value is a new min or max
def add_input(self, min_max, input):
(current_min, current_max) = min_max
return min(current_min, input), max(current_max, input)
def merge_accumulators(self, accumulators):
return accumulators
def extract_output(self, min_max):
return min_max
def run(argv=None):
"""Main entry point; defines and runs the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument('--output',
dest='output',
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=pipeline_options)
# create test data
pc = [('foo', 1), ('bar', 5), ('foo', 5), ('bar', 9), ('bar', 2)]
# first run through data to apply custom combineFn and determine min/max per key
minmax = pc | 'Determine Min Max' >> beam.CombinePerKey(MinMaxFn())
# group input data by key and append corresponding min and max
merged = (pc, minmax) | 'Join Pcollections' >> beam.CoGroupByKey()
# apply mapping to normalize values according to 'norm_value = (value - min) / (max - min)'
normalized = merged | 'Normalize values' >> beam.Map(lambda (a, (b, c)): (a, [float(val - c[0][0][0])/(c[0][0][1] -c[0][0][0]) for val in b]))
# write results to output file
normalized | 'Write results' >> WriteToText(known_args.output)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
该代码段可以使用Python script_name.py--output output_filename
运行。我的测试数据(按键分组)是:
('foo', [1, 5])
('bar', [5, 9, 2])
CombineFn将按键最小值和最大值返回:
('foo', [(1, 5)])
('bar', [(2, 9)])
键操作的JOIN/COGROUP的输出:
('foo', ([1, 5], [[(1, 5)]]))
('bar', ([5, 9, 2], [[(2, 9)]]))
而正常化后:
('foo', [0.0, 1.0])
('bar', [0.42857142857142855, 1.0, 0.0])
这只是一个简单的测试,所以我确信它可以针对所提到的数据量进行优化,但它似乎可以作为一个起点。考虑到可能需要进一步考虑(例如,如果min=max,则避免除以零)
问题内容: 我正在寻找python中整数的最小值和最大值。例如,在Java中,我们有和。python中是否有类似的东西? 问题答案: Python 3 在Python 3中,此问题不适用。普通int类型是无界的。 但是,你实际上可能正在寻找有关当前解释器的字长的信息,在大多数情况下,该信息将与机器的字长相同。该信息在Python 3中仍以形式提供,这是一个有符号的单词可以表示的最大值。等效地,它是
问题内容: 我有一本这样的字典: 我想获得该字典的5个最大值,并以此存储一个新的字典。要获得最大值,我做了: 也许这是一项容易的任务,但是我坚持了很长时间。请帮忙!!! 问题答案: 你近了 您可以使用 [docs] 对列表进行 排序 ,并采用前五个元素: __ 另请参阅:Python排序方法
问题内容: 我的代码没有给出错误,但是没有显示最小值和最大值。代码是: 我是否需要system.out.println()来显示它,否则返回应该起作用吗? 问题答案: 您正在调用方法,但不使用返回的值。
这是一个非常基本的算法(不能再简单了),但我被难住了。我们有一个元素数组,我们必须确定最小值和最大值。 通常的方法是遍历数组,找出最小值和最大值,即2n比较。 稍微有效的方法是首先对数组的连续元素进行比较,以确定任意两个元素的最大值和最小值(N/2比较)。我们现在有n/2 min和n/2 max元素。现在我们可以在n/2+n/2+n/2(前一步)=3/2*n或1.5n中得到最终的max和min 那
我正在编写一个代码,用户会被问到:“有多少个标记?”然后他们输入他们提到的分数。然后,它打印出最大标记和最小标记。 我不是最擅长编码的,所以我没有方向去寻找循环中的最大值和最小值。我找到了他们能够输入标记的部分,但我不确定如何找到最大值和最小值。我查找了如何进行最大值和最小值,但它通常显示为在数组中查找最大值和最小值,这不是我想要做的。
问题内容: 我想输出二维数组的最大值和最小值。Max可以很好地工作,但是即使在数组中没有零的情况下min也总是输出零。在本例中,我设置为99以防止较小的机会在数组中获得零。继承人完整代码: 问题答案: 由于您在中选择随机值的方式,不会存在小于零的值- 但也无法保证任何值都将恰好为零。但是,您将初始化为零,因为这是数组元素的默认值;没有什么比这更小了,所以答案总是零。 您应该在标记为“查找最小值”的