当前位置: 首页 > 知识库问答 >
问题:

在Apache Beam中为每个键应用有状态DoFn

商琛
2023-03-14

共有1个答案

韩博简
2023-03-14

我想你想要的是一把CombinePerkey。它只对值或每个键应用一个组合。

此外,在使用组合时考虑Reduce阶段也很重要。

希望这个例子对你有帮助。(添加了打印,这样您就可以看到Reduce阶段以及为什么要使用ifs)

with beam.Pipeline(options=pipeline_options) as p:

    keyed_elements = [
        (47001, {"user_id": 1, "fake_key":"fake_value"}),
        (47001, {"user_id": 2, "fake_key": "fake_value"}),
        (47002, {"user_id": 3, "fake_key": "fake_value"}),
        (47002, {"user_id": 4, "fake_key": "fake_value"}),
        (47003, {"user_id": 5, "fake_key": "fake_value"}),
        (47001, {"user_id": 6, "fake_key": "fake_value"}),
        (47001, {"user_id": 7, "fake_key": "fake_value"}),
        (47001, {"user_id": 8, "fake_key": "fake_value"}),
        (47001, {"user_id": 9, "fake_key": "fake_value"}),
        (47001, {"user_id": 10, "fake_key": "fake_value"}),
        (47001, {"user_id": 11, "fake_key": "fake_value"}),
    ]

    def group_users(elements_values):

        #to test paralellism in reduce phase
        print(f"ELEMENT: {elements_values}")


        final_output = []
        for value in elements_values:
            if isinstance(value, dict):
                final_output.append(value['user_id'])
            elif isinstance(value, list):
                final_output += value
            else:
                pass

        return final_output

    (p | Create(keyed_elements)
       | beam.CombinePerKey(group_users)
       | Map(print)
     )
 类似资料:
  • 我写了以下代码: 我只使用了一个useState在一个对象中存储“name”、“link”、“error”等属性。因为我想将FormObj和ValidateLink的逻辑保持在一起。所有三个属性仅使用一个useEffect。因此,我认为最好将所有三个属性都保留在useState中,而不是创建3个不同的useState。 但是我的经理和技术架构师告诉我要为每个属性创建3个useState,一个use

  • 我有一个哈希地图如下 现在我想用这些值填充一个对象。如何在 for 循环中调用 setter,而不是手动调用每个方法。假设我想用这些属性填充一个对象 ob。

  • 我们正在尝试使用Drool作为我们的规则引擎服务。我们到目前为止所做的如下 部署的工作台7.2.final 已部署的KIE服务器7.2.0。final 配置了一些数据对象、规则,将更改部署到KIE服务器,我们可以使用rest API执行规则 无状态会话满足了我们的大部分需求(给出一组数据,执行规则并返回数据,仅此而已)。但是使用无状态时,我们必须牺牲Drools有状态会话提供的许多重要特性。 我们

  • 我使用@ControllerAdvision在Spring Boot控制器中捕获异常。在@ExceptionHandler中,我可以手动指定数量有限的异常及其状态代码。 我的问题是如何处理所有其他异常及其状态?

  • 从输入字段中,我将值作为参数发送给设置状态的函数。我有多个输入字段,所以希望使用它们的名称(等于它们的状态键),然后使用相同的函数,并将键和值传递给设置状态的函数。 这是我的代码。