import base64
import json
def lambda_handler(event, context):
output = []
for record in event['Records']:
# your own business logic.
json_object = {"name": "this is a test"}
output_record = {
'recordId': record['eventID'], # is this the problem? I used sequenceNumber, it is not right.
'result': 'Ok',
'data': base64.b64encode(json.dumps(json_object).encode('utf-8')).decode('utf-8')
}
output.append(output_record)
print('Successfully processed {} records.'.format(len(event['Records'])))
return {'records': output}
{
'Records': [
{
'kinesis': {
'kinesisSchemaVersion': '1.0',
'partitionKey': '1',
'sequenceNumber': '49603262076998903856573762341186472148109820820203765762',
'data':'eyJwcm9wIjogIjc5IiwgInRpbWVzdGFtcCI6ICIxNTk2MzE0MjM0IiwgInRoaW5nX2lkIjogImFhLWJiIn0=',
'approximateArrivalTimestamp': 1596314234.567
},
'eventSource': 'aws:kinesis',
'eventVersion': '1.0',
'eventID': 'shardId-000000000000:49603262076998903856573762341186472148109820820203765762',
'eventName': 'aws:kinesis:record',
'invokeIdentityArn':'xxx',
'awsRegion': 'us-east-1',
'eventSourceARN': 'xxx'
}
]
}
这取决于你如何配置你的Kinesis、消防水管和Lambda管道。
如果您的Kinesis流触发一个Lambda将数据发送到Firehose,那么您将对Kinesis记录事件感兴趣。使用AWS Lambda和Amazon Kinesis结账。下面的示例事件
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1545084650.987
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
"data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
"approximateArrivalTimestamp": 1545084711.166
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
]
}
另一个设置可能是Firehose轮询Kinesis流。此外,我们还可以灵活地为Firehose设置转换Lambda(Amazon Kinesis Data Firehose Data transformation)。在此设置中,示例事件如下所示(使用AWS Lambda和Amazon Kinesis Data Firehose)
{
"invocationId": "invoked123",
"deliveryStreamArn": "aws:lambda:events",
"region": "us-west-2",
"records": [
{
"data": "SGVsbG8gV29ybGQ=",
"recordId": "record1",
"approximateArrivalTimestamp": 1510772160000,
"kinesisRecordMetadata": {
"shardId": "shardId-000000000000",
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
"approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z",
"sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
"subsequenceNumber": ""
}
},
{
"data": "SGVsbG8gV29ybGQ=",
"recordId": "record2",
"approximateArrivalTimestamp": 151077216000,
"kinesisRecordMetadata": {
"shardId": "shardId-000000000001",
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
"approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z",
"sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
"subsequenceNumber": ""
}
}
]
}
我有一个用例,在将有效负载存储到S3之前,我需要对发送到我的Kinesis Firehose流的有效负载进行API调用。 流程为:运动数据流- 基本上,对于由我的Kinesis Firehose流使用的记录,我需要调用另一个后端服务来获取与该记录相关的其他数据,然后再存储到S3中,以便我们的EMR作业使用和编写查询。 我的问题是,有没有可能从Kinesis Firehose转换Lambda进行网络
参考:https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies 这个例子有python版本吗?参考只有java等价物。我在https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html.发现了
我正在使用Spring Boot开发一个应用程序,在这里我实现了身份验证和授权。这是我的角色实体 Rold id在BaseModel中。我想从数据库中获取所有角色,为此我在RoleRepository中编写了一个方法,如下所示 但这又让我犯了这个错误 org . spring framework . core . convert . converternotfoundexception:未找到能够
问题内容: 什么是转换不同的方法来,我知道有一个选项,但想知道是什么其他可用于相同? 注意: 我没有进一步的要求,因此我不能在这个问题上添加更多内容,但是在此时,如果我能知道有哪些不同的方法可用于转换,那将非常棒。 更新: 建议的不同方法是: javax.xml.bind.Marshaller和javax.xml.bind.Unmarshaller XStream的 XMLBean jaxb ca
问题内容: 我想将java.awt.color对象转换为字符串。 它打印java.awt.Color [r = 255,g = 0,b = 0] 现在,我想解析该字符串并解码颜色名称,例如蓝色等。 我在想有没有其他方法可以将Color直接转换为String。 提前致谢。 编辑:我想将此颜色值存储在数据库中。我应该将其存储为String还是RGB值? 问题答案: 否。至少是因为: 几个rgb组合具有
我有pdf格式的数据,我想把它转换成文本。我想删除图像,页眉和页脚,而数据将只以多行表格的形式保留,你能建议转换它的最佳方式吗?我尝试了Tabula和apache tika,但结果并不理想。