当前位置: 首页 > 知识库问答 >
问题:

当lambda中的事件失败时,kinesis如何保持偏移量并再次推送记录

柳珂
2023-03-14

我是AWS lambda和Kinesis的新手。请帮助解决以下问题

我有一个运动流作为lambda的源,目标也是运动。我有以下疑问。系统不想丢失记录。

如果任何记录在lambda中处理失败,它如何再次拉入lambda?它如何保存未处理的记录?kinesis如何跟踪偏移以处理下一条记录?

请更新。

共有1个答案

穆文斌
2023-03-14

AWS Lambda文档中关于将Lambda与Kinesis结合使用的内容:

如果函数返回错误,Lambda将重试批处理,直到处理成功或数据过期。在问题解决之前,不会处理碎片中的任何数据。为了避免停滞的碎片和潜在的数据丢失,请确保在代码中处理和记录处理错误。

在这种情况下,还要考虑运动的保留期:

保留期是将数据记录添加到流后可访问的时间长度。流的保留期设置为创建后24小时的默认值。您可以将保留期延长至168小时(7天)

正如第一句话中提到的,AWS将在保留期到期后删除事件。这对您来说意味着:

a) 注意Lambda函数正确处理错误。

b) 如果保留所有记录很重要,还可以将它们存储在持久性存储器中,例如DynamoDB。

除此之外,您还应该阅读有关重复Lambda执行的内容。有一篇很好的博客文章解释了如何实现幂等实现。阅读这里的另一个StackOverflow问题

 类似资料:
  • 问题内容: 我在Oracle数据库中有一个交易表。我正在尝试为涉及多种交易类型的交付系统收集一份报告。实际上,“请求”类型可以是四个子类型之一(在此示例中为“ A”,“ B”,“ C”和“ D”),而“传递”类型可以是四个不同子类型之一类型(“ PULL”,“ PICKUP”,“ MAIL”)。从“请求”到“交付”之间可以有1到5个事务,并且“交付”类型中的许多也是中间事务。 我需要的是像这样的报

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我已经实现了下面链接中的代码,用于从事件中心接收事件。但是假设有10个事件,每5个事件检查一次。现在程序在读取第7个事件时异常退出,如果我再次重启事件处理器主机,那么事件(1,2,3,4,6)将被重新读取。请建议我如何再次避免重读和阅读第7次事件?任何例子都值得欣赏。谢了。 https://github.com/Azure/azure-event-hubs/blob/master/samples/

  • 我正试着把我的头绕在Kafka的交易上,而且只绕了一次。 我已经创建了一个事务性消费者,我想确保阅读和处理某个主题的所有消息。如果事务失败,消息因此丢失,Kafka仍会提交偏移量。 更正式地说,如果流处理应用程序使用消息A并生成消息B,使得B=F(A),那么恰好一次处理意味着当且仅当成功生成B时才认为A被消耗,反之亦然。来源 基于此,我假设消息A没有被消费,因此将再次被重新处理。但这条信息将如何重

  • 我是Kafka的新手,一直在尝试实现一个消费者。下面是我的场景 启动消费者应用程序 产生来自生产者的消息。这些消息被消费者消费 停止消费者并再次生成消息。当我启动消费者时,在消费者被停止时发布的消息不会被读取 虽然会消耗消息,但它会消耗发布到主题的所有消息。我想只消耗那些在消费者关闭时发布的消息。

  • 在提交之前,我创建工作流来测试我的Python应用程序。问题是,如果测试失败,无论如何都会推送提交。如果测试不成功,我如何添加一个条件来避免推送? 下面是工作流文件. yml的结构。 `名称:Python应用程序on:push:branchs:[master]pull\u请求:branchs:[master] 工作:建造: 测试失败截图