当前位置: 首页 > 知识库问答 >
问题:

Apache Beam中的窗口连接

印振国
2023-03-14

我对Apache Beam很陌生,并实现了我的第一个管道。

但现在我有点困惑了,我不知道如何将窗口和连接结合起来。

问题定义:

我有两个数据流,一个包含用户的页面视图,另一个包含用户的请求。它们共享描述用户会话的密钥session\u id,但每个都有其他附加数据。

目标是在请求发生之前计算会话中的页面浏览量。这意味着,我希望有一个包含每个请求的数据流,以及请求之前的页面浏览量。只要有过去5分钟的浏览量就足够了。

我所尝试的

为了加载请求,我使用这个片段,它从pubsub订阅加载请求,然后提取session_id作为密钥。最后,我应用一个窗口,当收到每个请求时直接发出它。

    requests = (p
               | 'Read Requests' >> (
                    beam.io.ReadFromPubSub(subscription=request_sub)
                    | 'Extract'        >> beam.Map(lambda x: json.loads(x))
                    | 'Session as Key' >> beam.Map(lambda request: (request['session_id'], request))
                    | 'Window'         >> beam.WindowInto(window.SlidingWindows(5 * 60, 1 * 60, 0),
                            trigger=trigger.AfterCount(1),
                            accumulation_mode=trigger.AccumulationMode.DISCARDING
                    )
                )
            )

类似地,这个片段加载页面视图,这将应用一个滑动窗口,每当页面视图进入时,该窗口就会累积发出。

pageviews = (p
               | 'Read Pageviews' >> (
                  beam.io.ReadFromPubSub(subscription=pageview_sub)
                  | 'Extract'        >> beam.Map(lambda x: json.loads(x))
                  | 'Session as Key' >> beam.Map(lambda pageview: (pageview['session_id'], pageview))
                  | 'Window'         >> beam.WindowInto(
                            windowfn=window.SlidingWindows(5 * 60, 1 * 60, 0),
                            trigger=trigger.AfterCount(1),
                            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
                  )
               )
            )

要应用联接,我尝试了

combined = (
        {
            'requests': requests,
            'pageviews': pageviews
        }
    | 'Merge' >> beam.CoGroupByKey()
    | 'Print' >> beam.Map(print)
)

当我运行这个管道时,合并行中从来没有包含请求和页面浏览量的行,只有其中一个在那里。

我的想法是在请求之前过滤掉页面浏览量,在cogroupby之后计算它们。我需要做什么?我想我的问题是窗口和触发策略。

同样重要的是,处理请求的延迟要低,可能会丢弃延迟的页面视图。

共有1个答案

郎飞航
2023-03-14

我自己找到了一个解决方案,如果有人感兴趣,可以这样做:

诀窍是使用束组合这两个流。扁平化操作并使用状态DoFn来计算一个请求之前的页面浏览量。每个流都包含json字典。我通过使用{'请求':请求}{'页面浏览':页面浏览}作为周围块嵌入它们,这样我就可以在状态DoFn中将不同的事件分开。我还计算了第一个页面浏览时间戳和第一次页面浏览后的秒数。流必须使用session_id作为键,这样状态DoFn只接收一个会话的所有事件。

首先,这是管道代码:

# Beam pipeline, that extends requests by number of pageviews before request in that session
with beam.Pipeline(options=options) as p:
    # The stream of requests
    requests = (
          'Read from PubSub subscription'   >> beam.io.ReadFromPubSub(subscription=request_sub)
        | 'Extract JSON'                    >> beam.ParDo(ExtractJSON())
        | 'Add Timestamp'                   >> beam.ParDo(AssignTimestampFn())
        | 'Use Session ID as stream key'    >> beam.Map(lambda request: (request['session_id'], request))
        | 'Add type of event'               >> beam.Map(lambda r: (r[0], ('request', r[1])))
    )

    # The stream of pageviews
    pageviews = (
          'Read from PubSub subscription'   >> beam.io.ReadFromPubSub(subscription=pageview_sub)
        | 'Extract JSON'                    >> beam.ParDo(ExtractJSON())
        | 'Add Timestamp'                   >> beam.ParDo(AssignTimestampFn())
        | 'Use Session ID as stream key'    >> beam.Map(lambda pageview: (pageview['session_id'], pageview))
        | 'Add type of event'               >> beam.Map(lambda p: (p[0], ('pageview', p[1])))
    )

    # Combine the streams and apply Stateful DoFn
    combined = (
        (
            p | ('Prepare requests stream' >> requests),
            p | ('Prepare pageviews stream' >> pageviews)
        )
        | 'Combine event streams'       >> beam.Flatten()
        | 'Global Window'               >> beam.WindowInto(windowfn=window.GlobalWindows(),
                                                            trigger=trigger.AfterCount(1),
                                                            accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | 'Stateful DoFn'               >> beam.ParDo(CountPageviews())
        | 'Compute processing delay'    >> beam.ParDo(LogTimeDelay())
        | 'Format for BigQuery output'  >> beam.ParDo(FormatForOutputDoFn())
    )

    # Write to BigQuery.
    combined | 'Write' >> beam.io.WriteToBigQuery(
        requests_extended_table,
        schema=REQUESTS_EXTENDED_TABLE_SCHEMA,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

有趣的部分是使用束组合两个流。扁平化并应用有状态的DoFnCountPageviews()

以下是所用自定义DOFN的代码:

# This DoFn just loads a json message
class ExtractJSON(beam.DoFn):
  def process(self, element):
    import json

    yield json.loads(element)

# This DoFn adds the event timestamp of messages into it json elements for further processing
class AssignTimestampFn(beam.DoFn):
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    import datetime

    timestamped_element = element
    timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
    timestamp = timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
    timestamped_element['timestamp_utc'] = timestamp_utc
    timestamped_element['timestamp'] = timestamp
    yield timestamped_element

# This class is a stateful dofn
# Input elements should be of form (session_id, {'event_type' : event}
# Where events can be requests or pageviews
# It computes on a per session basis the number of pageviews and the first pageview timestamp
# in its internal state
# Whenever a request comes in, it appends the internal state to the request and emits
# a extended request
# Whenever a pageview comes in, the internal state is updated but nothing is emitted
class CountPageviewsStateful(beam.DoFn):
  # The internal states
  NUM_PAGEVIEWS = userstate.CombiningValueStateSpec('num_pageviews', combine_fn=sum)
  FIRST_PAGEVIEW = userstate.ReadModifyWriteStateSpec('first_pageview', coder=beam.coders.VarIntCoder())

  def process(self,
              element,
              num_pageviews_state=beam.DoFn.StateParam(NUM_PAGEVIEWS),
              first_pageview_state=beam.DoFn.StateParam(FIRST_PAGEVIEW)
              ):
    import datetime

    # Extract element
    session_id = element[0]
    event_type, event = element[1]

    # Process different event types
    # Depending on event type, different actions are done
    if event_type == 'request':
        # This is a request
        request = event

        # First, the first pageview timestamp is extracted and the seconds since first timestamp are calculated
        first_pageview = first_pageview_state.read()
        if first_pageview is not None:
            seconds_since_first_pageview = (int(request['timestamp_utc'].timestamp()) - first_pageview)

            first_pageview_timestamp_utc = datetime.datetime.utcfromtimestamp(float(first_pageview))
            first_pageview_timestamp = first_pageview_timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
        else:
            seconds_since_first_pageview = -1
            first_pageview_timestamp = None

        # The calculated data is appended to the request
        request['num_pageviews'] = num_pageviews_state.read()
        request['first_pageview_timestamp'] = first_pageview_timestamp
        request['seconds_since_first_pageview'] = seconds_since_first_pageview
        
        # The pageview counter is reset
        num_pageviews_state.clear()
        
        # The request is returned
        yield (session_id, request)
    elif event_type == 'pageview':
        # This is a pageview
        pageview = event

        # Update first pageview state
        first_pageview = first_pageview_state.read()
        if first_pageview is None:
            first_pageview_state.write(int(pageview['timestamp_utc'].timestamp()))
        elif first_pageview > int(pageview['timestamp_utc'].timestamp()):
            first_pageview_state.write(int(pageview['timestamp_utc'].timestamp()))

        # Increase number of pageviews
        num_pageviews_state.add(1)
          
        # Do not return anything, pageviews are not further processed

# This DoFn logs the delay between the event time and the processing time
class LogTimeDelay(beam.DoFn):
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    import datetime
    import logging

    timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
    seconds_delay = (datetime.datetime.utcnow() - timestamp_utc).total_seconds()

    logging.warning('Delayed by %s seconds', seconds_delay)

    yield element

这似乎奏效了,让我在direct runner上平均延迟约1-2秒。在云数据流上,平均延迟约为0.5-1秒。总而言之,这似乎解决了问题的定义

不过,还有一些悬而未决的问题:

  • 我使用的是全局窗口,这意味着就我而言,内部状态将永远保持不变。也许会话窗口是正确的方式:当x秒钟内没有页面浏览/请求时,窗口关闭,内部状态免费
  • 处理延迟有点高,但可能我需要稍微调整pubsub部分
  • 我不知道这个解决方案比标准的beam方法增加了多少开销或内存消耗。我也没有测试高工作负载和并行化
 类似资料:
  • 我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。 不过,我发现没有办法在结果出来后执行管道中的另一个阶段。 举个例子 根据上述内容,我希望运行以下内容: 是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。

  • 我有两个流,希望将第二个流连接到窗口内的第一个流,因为我需要对与会话相关的两个流的连接进行一些计算(流的连接控制会话)。 实际上,当从留档读取时,(会话)窗口只允许在单个流上进行计算,而不允许在连接中进行计算。 我曾尝试使用会话窗口和协处理器函数的组合,但结果并不完全符合我的预期。 有没有办法合并Flink中与会话窗口相关的两个流?

  • 我写了一个小GUI程序与python在tkinter的窗口。我的窗口必须在全屏游戏窗口的前面。 此刻,我用这句话: 它适用于普通窗口(浏览器,浏览器,...),但如果我启动游戏到全屏模式,我的窗口隐藏在游戏后面。 为什么会发生这种情况?调用游戏可能类似于覆盖我的属性的? 我的问题还有别的解决办法吗?也许可以告诉windows,我的窗口应该在特定窗口(游戏窗口)的前面?

  • 我正在尝试加入Flink中的两种类型(比如A和B)。我想确认我的理解是否正确。事件的某些属性- 事件A立即流入flink,延迟几分钟(5-10分钟) 事件B以15-30分钟的轻微延迟流动 事件a和事件B之间存在1:1连接 我已将事件A的数据流配置为10分钟的BoundedAutofordernessTimestampExtractor,将事件B的数据流配置为30分钟。稍后,我使用表API进行时间窗

  • 问题内容: 我正在探索Hive中的窗口功能,并且能够理解所有UDF的功能。虽然,我无法理解我们与其他功能配合使用的分区和顺序。以下是与我计划构建的查询非常相似的结构。 只是试图了解两个关键字都涉及的后台过程。 感谢帮助:) 问题答案: 分析函数为数据集中每个分区的每一行分配一个等级。 子句确定行的分布方式(如果是配置单元,则在缩减程序之间)。 确定行在分区中的排序方式。 第一阶段由分配 ,数据集中

  • 窗口的创建与控制 进程:主进程​ 1 //在主进程中. 2 const {BrowserWindow} = require('electron') 3 ​ 4 //或者从渲染器进程中使用 `remote`. 5 // const {BrowserWindow} = require('electron').remote 6 ​ 7 let win = new BrowserWindow({width