当前位置: 首页 > 知识库问答 >
问题:

AWS Lambda函数从kinesis流中无限读取记录

华星剑
2023-03-14

我有一个kinesis流,有一个碎片和一个用Python编写的lambda函数。我添加了kinesis流作为批量大小为5的事件源。我在kinesis中添加了数百条记录,lambda函数得到了正确的调用和执行。但是对于最后3条记录,lambda函数被无限地调用,即使函数返回是成功的。

from __future__ import print_function

import base64
import json
import urllib2
import json

print('Loading function')

def is_valid_url(url):
    try:
        urllib2.urlopen(url)
        print("Valid URL ...")
        return True         # URL Exist
    except ValueError, ex:
        print("URL is not formatted...")
        return False        # URL not well formatted
    except urllib2.URLError, ex:
        print("Invalid URL ...")
        return False        # URL don't seem to be alive


def lambda_handler(event, context):     
    for record in event['Records']:
        # Kinesis data is base64 encoded so decode here
        payload = base64.b64decode(record['kinesis']['data'])
        params = json.loads(payload)
        print("Decoded payload: " + payload  + " : " +     str(is_valid_url(params['url'])) + " : " + str(len(event['Records'])))
    return 'Successfully processed {} records.'.format(len(event['Records']))
  START RequestId: d6033244-1c43-40ea-8886-f38b8c48daa3 Version:   $LATEST 
  Loading function 
  Valid URL ... 
  Decoded payload: { "url": "https://google.com" }
  Valid URL ... 
  Decoded payload: { "url": "https://google.com" }
  Valid URL ... 
  Decoded payload: { "url": "https://google.com" }
  Valid URL ...  
  Decoded payload: { "url": "https://google.com" }
  END RequestId: d6033244-1c43-40ea-8886-f38b8c48daa3 
  REPORT RequestId: d6033244-1c43-40ea-8886-f38b8c48daa3    Duration: 3003.00 ms    Billed Duration: 3000 ms Memory Size: 128 MB    Max Memory Used: 10 MB   
  2016-03-04T17:32:01.030Z d6033244-1c43-40ea-8886-f38b8c48daa3 Task timed out after 3.00 seconds

共有1个答案

松和泰
2023-03-14

因为您的函数正在超时,Lambda将运行视为错误。Kinesis的错误处理策略是重试记录,直到它脱离裁剪范围(通常是24小时),因此您的函数将重试24小时,或者直到它没有超时。

根据你发布的内容,我不知道为什么你的函数会超时。一个快速的解决办法是简单地增加Lambda控制台上的超时值(在配置选项卡的高级下)

 类似资料:
  • 我正在为一个小项目尝试一些Java的套接字编程。我遇到了从外部进程读取无限InputStream的问题。程序进入无限循环。 我怀疑readLine()必须在EOF之前读取流。 我放了一些打印语句,我确信程序达到了while循环。 这是我的方法:

  • 我试图使用类似于https://github.com/aws-sample/amazon-kinesis-learning的Kinesis客户端库来使用Kinesis数据流。但在这个例子中,他们计划了这个过程。我想消费没有调度器传入的记录。 我不想使用DynamoDB,CloudWatch。期望一个简单的使用者使用流中的记录 有没有什么方法可以在没有调度程序的情况下使用java处理记录

  • 我是AWS的新手,希望得到一些指导。 我想处理最古老的未处理记录,但似乎无法正确获取参数。 当前架构 对于碎片迭代器: 我试过TRIM_HORIZON从一开始就给了我所有的记录。 我也试过LATEST,它只给了我一张最新的唱片。 不确定这些额外的细节是否有帮助,但。。。 我通过Lambda将自己的记录放在AWS控制台上 提前感谢!

  • 我想从Amazon Kinesis流中获取最新记录。我打算从中提取时间戳,并将其与消费者应用程序检查指向的最后一条记录的时间戳进行比较,以检查消费者是否落后。 我不能使用最新的shard迭代器类型。这是因为LATEST指向最近的记录之后,因此它不能用于访问最近的记录。 有没有简单的方法可以获得最新记录? 我正在考虑的一种方法是获取消费者最近处理的记录序列号的碎片迭代器,使用该碎片迭代器发出GetR

  • 我试图在lambda函数中的特定时间戳之后从动觉流中读取记录。我得到碎片,碎片迭代器,然后是数据<当我得到第一个迭代器时,我得到数据,并使用NextShardIterator(返回的数据中存在)递归地调用同一个函数。根据文档,当没有更多数据可读取且达到$latest时,NextShardIterator将返回null<但它从不返回null,函数不断被调用,最终我得到了配置吞吐量超过异常<我还尝试使

  • 我们如何从AWS的运动流中读回时间 使用AWS Kinesis stream,可以发送事件流,消费者应用程序可以读取事件。Kinesis Stream worker从最后一个检查点获取记录并将其传递给IRecordProcessor#processRecords 但是,如果我需要读取回溯到时间的记录,例如从2小时前开始处理记录,我如何配置我的运动工作者来获取这些记录?