当前位置: 首页 > 知识库问答 >
问题:

DAG源在emitFromTraverser上返回false,处理器在开始处理之前等待源加载的所有元素

端木宏盛
2023-03-14

给定DAG的这个(simpized版本)

发出5个A类型项的S1源(从带有分区的数据库中读取)本地并行度=1

S2发出150k个B类型项的源代码(迭代器,从DB中批量读取100个,并进行分区)本地并行度=1

写入B1本地并行度=1的WR接收器

注意:为了给过滤器处理器赋予意义:在DAG中,有其他源流入同一个适配器AD,然后使用过滤器处理器进入其他路径。

S1-->AD

FB-->CL(到序数1,优先级为1)

CL-->WR

如果源S2有“很少”项要加载(即15K),则emitFromTraverser从不返回false。

    null
protected void init(Context context) throws Exception {
    super.init(context);
    this.iterator = new BQueryIterator(querySupplier, batchSize);
    this.traverser = Traversers.traverseIterator(this.iterator);
}

public boolean complete() {
    boolean result = emitFromTraverser(this.traverser);
    return result;
}
    null

谢了!

共有1个答案

充阳秋
2023-03-14

您遭受由优先级导致的死锁。您的DAG从AD分支,然后重新加入CL,但有一个优先级。

AD --+---- FA ----+-- CL
      \          /
       +-- FB --+

设置优先级会导致在处理来自高优先级边缘的所有项之前,不处理来自低优先级边缘的项。AD最终会被来自低优先级路径的反压阻塞,而CL不会处理该路径。因此,ad被阻塞,因为它不能发射到低优先级边缘,而CL被阻塞,因为它仍然在等待来自高优先级边缘的项,从而导致死锁。

在您的例子中,您可以通过创建2个ad顶点来解决这个问题,每个顶点处理来自其中一个源的项:

S1 --- AD1 ----+--- CL
              /
S2 --- AD2 --+
 类似资料:
  • 问题内容: 我希望通过外部服务为我的路由创建简单的身份验证检查。 我在路由对象上定义访问要求: 然后,我检查我是否有权参加活动。 实际上,它是可行的- 如果未通过身份验证的用户将其移至登录页面,如果已通过身份验证但该路由仅适用于匿名用户,则将其移至另一个页面,依此类推。 但是-这种重定向实际上是在加载控制器和模板之后发生的! 即使我未经身份验证,它也会导致我的控制器对我的REST API进行一些不

  • 我正在使用Spring Webflow R2DBC将一些数据插入数据库。 要求提供数据- 控制器 服务 道 主要问题是我不知道如何让它等待所有结果返回并添加到最终

  • 我使用和来获取网页请求的所有资源的(包括广告的URL,通常位于嵌套的iframes中)。但是,这可能不适用于和请求中的资源。阅读有关这方面的文档并查看其他答案,似乎可以使用等待元素出现。但是在我的情况下,我必须加载几个网站(几千个站点),可能没有任何公共元素可以等待。我一直在使用,因为秒似乎足够长的时间让所有嵌套的iframes最终出现。我想知道是否有人能证实这是否是正确的方法,是否有任何其他方法

  • 问题内容: 我有一个简单的功能,可以从Firebase加载数据。 当前,即使有要加载的数据,此函数也会始终返回。之所以这样做,是因为它永远不会执行执行完成块,在函数返回之前,它不会在数组中加载数组。我正在寻找一种使函数仅在调用完成块后才返回的方法,但不能将return放在完成块中。 问题答案: (关于此问题的变化经常出现在SO上。我永远找不到一个好的,全面的答案,因此下面尝试提供这样的答案) 你不

  • 我有以下兔子听者: 我需要将listener配置为在它处理一条消息后等待15分钟,然后再接收下一条消息。不需要在此方法中等待。我所需要的只是在处理完一条后不接收任何消息。可以通过来完成,但我不确定这是否是实现这一点的最佳方法。对于这种情况有没有rabbitmq的配置?

  • 问题内容: 我想将当前位置作为请求变量添加到Web视图中加载的URL,以便可以在页面上使用此信息而无需重新加载或额外请求。 我有一个带有shouldOverrideUrlLoading的WebviewClient,但是当使用loadUrl传递请求时,它不会接收请求。我读到可以使用shouldInterceptRequest截取所请求的URL,但是我的API级别为8,所以我真的不想仅为此更改它。 有