当前位置: 首页 > 知识库问答 >
问题:

Apache Beam未从订阅中删除无效元素

公良飞尘
2023-03-14

刚意识到我的管道是错误的,当涉及到错误事件时,它们会不断被处理,并且从未从订阅中删除。

基本上,我有一个简单的管道,其中包含一个触发器,可以将这些事件提取到文件中。

在其中一个阶段中,它会处理通过PubSub接收到的消息的有效负载,并将其转发到下一个阶段。然而,在某些情况下,这将失败。

        pipeline
        .apply("Read PubSub Events",
            PubsubIO.readMessagesWithAttributes().fromSubscription(options.getSubscription()))
        .apply("Map to MyClass",
            ParDo.of(new PubSubMessageToMyClass())) // Exception thrown in this stage.
        .apply("Apply Timestamps", WithTimestamps.of(new SetTimestampFn()).withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
        ...
        );

当错误发生时,我会在管道中一遍又一遍地看到同一事件,就像它永远不会结束处理一样。

是否有任何方法可以明确告诉Apache Beam使给定消息无效并防止进一步失败的处理?

共有1个答案

越星晖
2023-03-14

数据流处理任意捆绑包中的元素,并在该捆绑包中的任何元素出现错误时重试整个捆绑包。在批处理模式下运行时,包含失败项的捆绑包将重试4次。当单个捆绑包发生4次故障时,管道将完全失效。在流模式下运行时,包含失败项目的捆绑包将无限期重试,这可能会导致管道永久暂停。

考虑通过添加异常处理程序来防止代码中的错误。例如,如果您想删除在ParDo中完成的某些自定义输入验证失败的元素,请在ParDo中使用try/catch块来处理异常并删除元素。

 类似资料:
  • 刚开始玩推送通知,我设法处理了所有的订阅过程,我正在数据库中保存endpoint和密钥。我的问题是,如果有的话,我应该遵循什么策略来删除数据库中的旧字幕详细信息?。所以,如果有人允许通知,他们撤销了权限,我怎么知道是谁从数据库中删除了详细信息?。因为如果用户取消订阅,我只会从pushManager获得空订阅。

  • 我登录到luis.ai,它显示下面的错误信息。我的订阅上有所有者角色或参与者角色。如何修复错误?谢谢。 401:由于订阅密钥无效或API终结点错误,访问被拒绝。确保为活动订阅提供有效密钥,并为您的资源使用正确的区域APIendpoint。

  • 问题内容: 以下列表具有一些重复的子列表,其子元素的顺序不同: 如何删除重复项并保留看到的第一个实例,以获得: 我尝试过了: 但是,我不知道这是否是处理大型列表的最快方法,而且我的尝试没有按预期进行。关于如何有效删除它们的任何想法? 问题答案: 这个有点棘手。您想从冻结的计数器中删除字典,但是计数器在Python中不可哈希。为了使渐进复杂度稍有下降,可以使用已排序的元组代替冻结计数器: 单线的相同

  • 问题内容: Python具有实现堆数据结构的模块,并且它支持一些基本操作(推送,弹出)。 如何从O(log n)中的堆中删除第i个元素?甚至可以使用还是必须使用另一个模块? 请注意,文档底部有一个示例:http : //docs.python.org/library/heapq.html ,它建议了一种可能的方法-这不是我想要的。我希望删除元素,而不仅仅是标记为已删除。 问题答案: 您可以很容易地

  • 问题内容: 当我尝试使用迭代器从CopyOnWriteArrayList删除元素时,出现异常。我注意到它已记录在案 不支持对迭代器本身进行元素更改操作(删除,设置和添加)。这些方法抛出UnsupportedOperationException。 (来自http://download.oracle.com/javase/6/docs/api/java/util/concurrent/CopyOnWr

  • 我使用SockJS和StompJS,当我在浏览器中打开我的应用程序时,有时它会在连接到websocket之前尝试订阅一些主题。我希望主题订阅等待应用程序连接到websocket。 这就是我实现此代码的原因,我将其称为: 因此,我只在连接状态为时才订阅该主题,并且只有在客户端首次成功连接时才会调用该主题。 我想稍后从主题中取消订阅,所以我需要内部订阅返回的对象,我还需要内部订阅的消息。 我所实现的很