当前位置: 首页 > 知识库问答 >
问题:

GCP数据流+Apache波束缓存问题

段干德泽
2023-03-14

我对GCP、Dataflow、Apache Beam、Python和一般的OOP都是新手。我来自函数式javascript领域,对于上下文。

现在,我已经用Apache Beam python sdk构建了一个流管道,并将其部署到GCP的数据流中。管道的源是pubsub订阅,接收器是数据存储。

管道从pubsub订阅中获取消息,根据配置对象+消息内容做出决定,然后根据做出的决定将其放在数据存储中的适当位置。目前这一切都在工作。

def run(argv=None):

    options = PipelineOptions(
        streaming=True,
        save_main_session=True
    )

    configuration = get_configuration() # api call to fetch config

    with beam.Pipeline(options=options) as pipeline:

        # read incoming messages from pubsub
        incoming_messages = (
            pipeline
            | "Read Messages From PubSub"
            >> beam.io.ReadFromPubSub(subscription=f"our subscription here", with_attributes=True))

         # make a decision based off of the message + the config
         decision_messages = (
                incoming_messages
                | "Create Decision Messages" >> beam.FlatMap(create_decision_message, configuration)
        )

create_decision_message从流+配置文件中接收传入消息,然后(您猜对了)做出决定。这是相当简单的逻辑。思考“如果消息是苹果,而配置说我们只关心橘子,那么不要对消息做任何操作”。我们需要能够在飞行中更新它,以说“没关系,我们现在突然也关心苹果了”。

我需要找出一种方法让管道知道它需要每15分钟重新获取一次配置文件。我不完全确定用我正在使用的工具做这件事的最好方法是什么。如果是javascript,我会做一些类似的事情:

(请原谅伪代码,不确定这是否真的会运行,但你明白了)


let fetch_time = Date.now()  // initialized when app starts
let expiration = 900 // 900 seconds = 15 mins 
let config = getConfigFromApi() // fetch config right when app starts

function fetchConfig(now){
    if (fetch_time + expiration < now) { 
      // if fetch_time + expiration is less than the current time, we need to re-fetch the config
      config = getConfigFromApi() // assign new value to config var
      fetch_time = now // assign new value to fetch_time var
  } 
    return config 
}

...

const someLaterTime = Date.now() // later in the code, within the pipeline, I need to use the config object
const validConfig = fetchConfig(someLaterTime) // i pass in the current time and get back either the memory-cached config, or a just-recently-fetched config

我不确定如何将这个概念翻译成python,也不确定是否应该这样做。这是一件合理的事情吗?或者这种类型的行为与我使用的堆栈不一致?我是我的团队中唯一一个从事这方面工作的人,这是一个新的项目,所以没有任何地方的例子说明过去是如何做的。我不确定我是否应该试图解决这个问题,或者我是否应该说“对不起,博斯曼,我们需要另一个解决方案”。

任何帮助都很感激,不管有多小...谢谢!

共有1个答案

松高爽
2023-03-14

我认为有多种方法可以实现您想要实现的目标,最直接的方法可能是通过有状态处理,在有状态DoFn中通过状态记录配置,并设置循环计时器来刷新记录。

您可以在https://beam.apache.org/blog/timery-processing/阅读更多关于有状态处理的信息

更多关于状态和计时器的信息来自beam编程指南:https://beam.apache.org/documentation/programming-guide/#types-of-state。

class MakeDecision(beam.DoFn):
  CONFIG = ReadModifyWriteState('config', coders.StrUtf8Coder())
  REFRESH_TIMER = TimerSpec('output', TimeDomain.REAL_TIME)

  def process(self,
              element,
              config=DoFn.StateParam(CONFIG),
              timer=DoFn.TimerParam(REFRESH_TIMER)):
    valid_config={}
    if config.read():
      valid_config=json.loads(config.read())
    else: # config is None and hasn't been fetched before.
      valid_config=fetch_config() # your own fetch function.
      config.write(json.dumps(valid_config))
      timer.set(Timestamp.now() + Duration(seconds=900))
    # Do what ever you need with the config.
    ...
  
  @on_timer(REFRESH_TIMER)
  def refresh_config(self,
                     config=DoFn.StateParam(CONFIG),
                     timer=DoFn.TimerParam(REFRESH_TIMER)):
      valid_config=fetch_config()
      config.write(json.dumps(valid_config))
      timer.set(Timestamp.now() + Duration(seconds=900))
with beam.Pipeline(options=options) as pipeline:
    pipeline
            | "Read Messages From PubSub"
            >> beam.io.ReadFromPubSub(subscription=f"our subscription here", with_attributes=True))
            | "Make decision" >> beam.ParDo(MakeDecision())

 类似资料:
  • 我没有找到任何文档允许将错误处理应用于此步骤,也没有找到将其重写为DOFN的方法。对此应用错误处理有什么建议吗?谢谢

  • 嗨,我已经创建了一个apache beam管道,测试了它,并在eclipse内部运行了它,包括本地和使用dataflow Runner。我可以在eclipse控制台中看到管道正在运行。e.控制台上的日志。 这是我在cmd提示符中使用的maven命令, 这是我用来创建管道和设置选项的代码段。

  • 我们有一个波束/数据流管道(使用数据流SDK 2.0.0-beta3 但是,我们正在设置 参数,我们可以看到所有二进制文件/jar 等都已上传到我们在 参数中指定的存储桶。 但是,Beam/Dataflow 随后会在我们项目的 GCS 中创建以下僵尸存储桶: 为什么会发生这种情况,如果我们清楚地设置参数?

  • 这与这个问题最为相似。 我正在Dataflow 2.x中创建一个管道,它从Pubsub队列获取流式输入。进入的每一条消息都需要通过来自Google BigQuery的一个非常大的数据集进行流式传输,并且在写入数据库之前附加了所有相关的值(基于一个键)。 问题是来自BigQuery的映射数据集非常大--任何将其用作侧输入的尝试都失败了,数据流运行程序会抛出错误“java.lang.IllegalAr

  • 我们在实验中发现,在DataFlow/Apache Beam管道中设置显式的输出碎片#会导致更差的性能。我们的证据表明,Dataflow在最后秘密地做了另一个GroupBy。我们已经转向让Dataflow自动选择碎片数(shards=0)。但是,对于某些管道,这会导致大量相对较小的输出文件(~15K文件,每个<1MB)。

  • null 我注意到这太慢了--CPU资源只被利用了几%。我怀疑每个节点都得到了一个zip文件,但是工作不是在本地CPU之间分配的--所以每个节点只有一个CPU在工作。我不明白为什么会这样,因为我使用了平面地图。