我读过很多关于在firehose中添加换行符的类似问题,但都是关于在源代码中添加换行符。问题是我没有访问源的权限,第三方正在将数据传输到我们的Kinesis实例,我无法将“\n”添加到源。
我尝试使用以下代码进行firehose数据转换:
'use strict';
console.log('Loading function');
exports.handler = (event, context, callback) => {
/* Process the list of records and transform them */
const output = [];
event.records.forEach((record) => {
const results = {
/* This transformation is the "identity" transformation, the data is left intact */
recordId: record.recordId,
result: record.data.event_type === 'alert' ? 'Dropped' : 'Ok',
data: record.data + '\n'
};
output.push(results);
});
console.log(`Processing completed. Successful records ${output.length}.`);
callback(null, { records: output });
};
但是,这条新线仍然丢失了。我也尝试了JSON。stringify(record.data)“\n”但是我得到了一个无效的输出结构错误。
下面是解决问题的代码
__Author__ = "Soumil Nitin Shah"
import json
import boto3
import base64
class MyHasher(object):
def __init__(self, key):
self.key = key
def get(self):
keys = str(self.key).encode("UTF-8")
keys = base64.b64encode(keys)
keys = keys.decode("UTF-8")
return keys
def lambda_handler(event, context):
output = []
for record in event['records']:
payload = base64.b64decode(record['data'])
"""Get the payload from event bridge and just get data attr """""
serialize_payload = str(json.loads(payload)) + "\n"
hasherHelper = MyHasher(key=serialize_payload)
hash = hasherHelper.get()
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': hash
}
print("output_record", output_record)
output.append(output_record)
return {'records': output}
kinesis firehose cloudwatch logs处理器blueprint lambda可以做到这一点(对cloudwatch日志进行了一些额外的处理)。
这是今天蓝图中的lambda代码:
/*
For processing data sent to Firehose by Cloudwatch Logs subscription filters.
Cloudwatch Logs sends to Firehose records that look like this:
{
"messageType": "DATA_MESSAGE",
"owner": "123456789012",
"logGroup": "log_group_name",
"logStream": "log_stream_name",
"subscriptionFilters": [
"subscription_filter_name"
],
"logEvents": [
{
"id": "01234567890123456789012345678901234567890123456789012345",
"timestamp": 1510109208016,
"message": "log message 1"
},
{
"id": "01234567890123456789012345678901234567890123456789012345",
"timestamp": 1510109208017,
"message": "log message 2"
}
...
]
}
The data is additionally compressed with GZIP.
The code below will:
1) Gunzip the data
2) Parse the json
3) Set the result to ProcessingFailed for any record whose messageType is not DATA_MESSAGE, thus redirecting them to the
processing error output. Such records do not contain any log events. You can modify the code to set the result to
Dropped instead to get rid of these records completely.
4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
transformations on the log events.
5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that
this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent
method.
6) Any additional records which exceed 6MB will be re-ingested back into Firehose.
*/
const zlib = require('zlib');
const AWS = require('aws-sdk');
/**
* logEvent has this format:
*
* {
* "id": "01234567890123456789012345678901234567890123456789012345",
* "timestamp": 1510109208016,
* "message": "log message 1"
* }
*
* The default implementation below just extracts the message and appends a newline to it.
*
* The result must be returned in a Promise.
*/
function transformLogEvent(logEvent) {
return Promise.resolve(`${logEvent.message}\n`);
}
function putRecordsToFirehoseStream(streamName, records, client, resolve, reject, attemptsMade, maxAttempts) {
client.putRecordBatch({
DeliveryStreamName: streamName,
Records: records,
}, (err, data) => {
const codes = [];
let failed = [];
let errMsg = err;
if (err) {
failed = records;
} else {
for (let i = 0; i < data.RequestResponses.length; i++) {
const code = data.RequestResponses[i].ErrorCode;
if (code) {
codes.push(code);
failed.push(records[i]);
}
}
errMsg = `Individual error codes: ${codes}`;
}
if (failed.length > 0) {
if (attemptsMade + 1 < maxAttempts) {
console.log('Some records failed while calling PutRecordBatch, retrying. %s', errMsg);
putRecordsToFirehoseStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
} else {
reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
}
} else {
resolve('');
}
});
}
function putRecordsToKinesisStream(streamName, records, client, resolve, reject, attemptsMade, maxAttempts) {
client.putRecords({
StreamName: streamName,
Records: records,
}, (err, data) => {
const codes = [];
let failed = [];
let errMsg = err;
if (err) {
failed = records;
} else {
for (let i = 0; i < data.Records.length; i++) {
const code = data.Records[i].ErrorCode;
if (code) {
codes.push(code);
failed.push(records[i]);
}
}
errMsg = `Individual error codes: ${codes}`;
}
if (failed.length > 0) {
if (attemptsMade + 1 < maxAttempts) {
console.log('Some records failed while calling PutRecords, retrying. %s', errMsg);
putRecordsToKinesisStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
} else {
reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
}
} else {
resolve('');
}
});
}
function createReingestionRecord(isSas, originalRecord) {
if (isSas) {
return {
Data: new Buffer(originalRecord.data, 'base64'),
PartitionKey: originalRecord.kinesisRecordMetadata.partitionKey,
};
} else {
return {
Data: new Buffer(originalRecord.data, 'base64'),
};
}
}
function getReingestionRecord(isSas, reIngestionRecord) {
if (isSas) {
return {
Data: reIngestionRecord.Data,
PartitionKey: reIngestionRecord.PartitionKey,
};
} else {
return {
Data: reIngestionRecord.Data,
};
}
}
exports.handler = (event, context, callback) => {
Promise.all(event.records.map(r => {
const buffer = new Buffer(r.data, 'base64');
const decompressed = zlib.gunzipSync(buffer);
const data = JSON.parse(decompressed);
// CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
// They do not contain actual data.
if (data.messageType === 'CONTROL_MESSAGE') {
return Promise.resolve({
recordId: r.recordId,
result: 'Dropped',
});
} else if (data.messageType === 'DATA_MESSAGE') {
const promises = data.logEvents.map(transformLogEvent);
return Promise.all(promises)
.then(transformed => {
const payload = transformed.reduce((a, v) => a + v, '');
const encoded = new Buffer(payload).toString('base64');
return {
recordId: r.recordId,
result: 'Ok',
data: encoded,
};
});
} else {
return Promise.resolve({
recordId: r.recordId,
result: 'ProcessingFailed',
});
}
})).then(recs => {
const isSas = Object.prototype.hasOwnProperty.call(event, 'sourceKinesisStreamArn');
const streamARN = isSas ? event.sourceKinesisStreamArn : event.deliveryStreamArn;
const region = streamARN.split(':')[3];
const streamName = streamARN.split('/')[1];
const result = { records: recs };
let recordsToReingest = [];
const putRecordBatches = [];
let totalRecordsToBeReingested = 0;
const inputDataByRecId = {};
event.records.forEach(r => inputDataByRecId[r.recordId] = createReingestionRecord(isSas, r));
let projectedSize = recs.filter(rec => rec.result === 'Ok')
.map(r => r.recordId.length + r.data.length)
.reduce((a, b) => a + b);
// 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
for (let idx = 0; idx < event.records.length && projectedSize > 6000000; idx++) {
const rec = result.records[idx];
if (rec.result === 'Ok') {
totalRecordsToBeReingested++;
recordsToReingest.push(getReingestionRecord(isSas, inputDataByRecId[rec.recordId]));
projectedSize -= rec.data.length;
delete rec.data;
result.records[idx].result = 'Dropped';
// split out the record batches into multiple groups, 500 records at max per group
if (recordsToReingest.length === 500) {
putRecordBatches.push(recordsToReingest);
recordsToReingest = [];
}
}
}
if (recordsToReingest.length > 0) {
// add the last batch
putRecordBatches.push(recordsToReingest);
}
if (putRecordBatches.length > 0) {
new Promise((resolve, reject) => {
let recordsReingestedSoFar = 0;
for (let idx = 0; idx < putRecordBatches.length; idx++) {
const recordBatch = putRecordBatches[idx];
if (isSas) {
const client = new AWS.Kinesis({ region: region });
putRecordsToKinesisStream(streamName, recordBatch, client, resolve, reject, 0, 20);
} else {
const client = new AWS.Firehose({ region: region });
putRecordsToFirehoseStream(streamName, recordBatch, client, resolve, reject, 0, 20);
}
recordsReingestedSoFar += recordBatch.length;
console.log('Reingested %s/%s records out of %s in to %s stream', recordsReingestedSoFar, totalRecordsToBeReingested, event.records.length, streamName);
}
}).then(
() => {
console.log('Reingested all %s records out of %s in to %s stream', totalRecordsToBeReingested, event.records.length, streamName);
callback(null, result);
},
failed => {
console.log('Failed to reingest records. %s', failed);
callback(failed, null);
});
} else {
console.log('No records needed to be reingested.');
callback(null, result);
}
}).catch(ex => {
console.log('Error: ', ex);
callback(ex, null);
});
};
尝试解码record.data添加新行,然后再次将其编码为base 64。
这是python,但想法是一样的
for record in event['records']:
payload = base64.b64decode(record['data'])
# Do custom processing on the payload here
payload = payload + '\n'
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(json.dumps(payload))
}
output.append(output_record)
return {'records': output}
来自@Matt Westlake的评论:
对于那些寻找节点答案的人来说
const data=JSON。parse(new Buffer.from(record.data,'base64')。toString('utf8');
和
新Buffer.from(JSON. stringify(data)'\n')
我是Lambda的新手,正在尝试模拟一个简单的函数来执行Kinesis Fireshose的PUT。 我试着浏览AWS文档,但找不到任何确切的参考来编写一个简单的python脚本,以从API获取并通过Firehose将JSON发送到S3。下面是我试图发布到Lambda的代码,但我想按计划将其发送到Firehose,而不是文件系统。
有没有办法查询AWS Kinesis Firehose的当前统计数据,比如 当前缓冲区中的记录数? 占用了多少缓冲区?
null 我如何解决这个问题?
对于我的应用程序日志,我计划将日志从我的内部服务器转发到AWS Kinesis Firehose。我正在尝试使用Logstash和log-stash-output-plugin https://github.com/samcday/logstash-output-kinesis 但我认为这个插件需要将日志数据转发到Kinesis数据流,然后我们可以创建一个管道到Kinesis消防水管。如果我们指向
我想使用terraform、Kinesis数据流和数据消防软管创建,并将它们连接(作为管道)。当我使用UI时,当我去消防软管时,我可以去源- 这是创建动觉流的代码(我从官方动觉文档中获取): 这是数据消防水带的代码: 那么我如何连接它们呢,是不是类似于${aws\u kinesis\u stream.test\u stream.arn}?或者类似的东西? 我使用了aws_kinesis_strea