当前位置: 首页 > 知识库问答 >
问题:

AWS lambda到Firehose的转换:Python

昌琪
2023-03-14

import base64
import json

def lambda_handler(event, context):
    output = []
    for record in event['Records']:
        # your own business logic.
        json_object = {"name": "this is a test"}
        output_record = {
            'recordId': record['eventID'], # is this the problem? I used sequenceNumber, it is not right. 
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['Records'])))
    return {'records': output}

{
    'Records': [
        {
            'kinesis': {
                'kinesisSchemaVersion': '1.0', 
                'partitionKey': '1', 
                'sequenceNumber': '49603262076998903856573762341186472148109820820203765762', 
                'data':'eyJwcm9wIjogIjc5IiwgInRpbWVzdGFtcCI6ICIxNTk2MzE0MjM0IiwgInRoaW5nX2lkIjogImFhLWJiIn0=', 
                'approximateArrivalTimestamp': 1596314234.567
            }, 
            'eventSource': 'aws:kinesis', 
            'eventVersion': '1.0', 
            'eventID': 'shardId-000000000000:49603262076998903856573762341186472148109820820203765762', 
            'eventName': 'aws:kinesis:record', 
            'invokeIdentityArn':'xxx', 
            'awsRegion': 'us-east-1', 
            'eventSourceARN': 'xxx'
        }
    ]
}

共有1个答案

卫成和
2023-03-14

这取决于你如何配置你的Kinesis、消防水管和Lambda管道。

如果您的Kinesis流触发一个Lambda将数据发送到Firehose,那么您将对Kinesis记录事件感兴趣。使用AWS Lambda和Amazon Kinesis结账。下面的示例事件

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}

另一个设置可能是Firehose轮询Kinesis流。此外,我们还可以灵活地为Firehose设置转换Lambda(Amazon Kinesis Data Firehose Data transformation)。在此设置中,示例事件如下所示(使用AWS Lambda和Amazon Kinesis Data Firehose)

{
  "invocationId": "invoked123",
  "deliveryStreamArn": "aws:lambda:events",
  "region": "us-west-2",
  "records": [
    {
      "data": "SGVsbG8gV29ybGQ=",
      "recordId": "record1",
      "approximateArrivalTimestamp": 1510772160000,
      "kinesisRecordMetadata": {
        "shardId": "shardId-000000000000",
        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
        "approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z",
        "sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
        "subsequenceNumber": ""
      }
    },
    {
      "data": "SGVsbG8gV29ybGQ=",
      "recordId": "record2",
      "approximateArrivalTimestamp": 151077216000,
      "kinesisRecordMetadata": {
        "shardId": "shardId-000000000001",
        "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
        "approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z",
        "sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
        "subsequenceNumber": ""
      }
    }
  ]
}
    null
 类似资料:
  • 我有一个用例,在将有效负载存储到S3之前,我需要对发送到我的Kinesis Firehose流的有效负载进行API调用。 流程为:运动数据流- 基本上,对于由我的Kinesis Firehose流使用的记录,我需要调用另一个后端服务来获取与该记录相关的其他数据,然后再存储到S3中,以便我们的EMR作业使用和编写查询。 我的问题是,有没有可能从Kinesis Firehose转换Lambda进行网络

  • 参考:https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies 这个例子有python版本吗?参考只有java等价物。我在https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html.发现了

  • 我正在使用Spring Boot开发一个应用程序,在这里我实现了身份验证和授权。这是我的角色实体 Rold id在BaseModel中。我想从数据库中获取所有角色,为此我在RoleRepository中编写了一个方法,如下所示 但这又让我犯了这个错误 org . spring framework . core . convert . converternotfoundexception:未找到能够

  • 问题内容: 什么是转换不同的方法来,我知道有一个选项,但想知道是什么其他可用于相同? 注意: 我没有进一步的要求,因此我不能在这个问题上添加更多内容,但是在此时,如果我能知道有哪些不同的方法可用于转换,那将非常棒。 更新: 建议的不同方法是: javax.xml.bind.Marshaller和javax.xml.bind.Unmarshaller XStream的 XMLBean jaxb ca

  • 问题内容: 我想将java.awt.color对象转换为字符串。 它打印java.awt.Color [r = 255,g = 0,b = 0] 现在,我想解析该字符串并解码颜色名称,例如蓝色等。 我在想有没有其他方法可以将Color直接转换为String。 提前致谢。 编辑:我想将此颜色值存储在数据库中。我应该将其存储为String还是RGB值? 问题答案: 否。至少是因为: 几个rgb组合具有

  • 我有pdf格式的数据,我想把它转换成文本。我想删除图像,页眉和页脚,而数据将只以多行表格的形式保留,你能建议转换它的最佳方式吗?我尝试了Tabula和apache tika,但结果并不理想。