Sending CloudWatch log group streams to OpenSearch via Kinesis and Kinesis Firehose


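When Kinesis Firehose delivers CloudWatch (CW) logs that were streamed into a Kinesis stream to ES, you need one of the Lambda blueprints kinesis-firehose-cloudwatch-logs-processor or kinesis-firehose-cloudwatch-logs-processor-python. The main difficulty is that ES accepts only JSON. If the CW log is not JSON you have to convert it yourself, and because the logs arrive in batches, each batch has to be split into individual events.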

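Take the MySQL General Log as an example: its format is not JSON and cannot be changed when the log is generated, so the conversion has to happen in the Firehose data transformation step.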


RDS MySQL General Log --> CloudWatch Log Group --> Kinesis --> Kinesis Firehose (Transform records: Lambda) --> OpenSearch (ElasticSearch)

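1. Send the RDS log to CW Logs. This step is configured on the RDS side: modify the instance so that it publishes the log to CW.
(1) Configure a custom parameter group and set general_log = 1 and log_output = FILE
(2) Publish the log to CW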
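
Both sub-steps can also be scripted. The following is a minimal sketch with boto3, not the procedure the original setup used: the parameter group and instance names are placeholders you would pass in, and it assumes both parameters can be applied immediately.

```python
def general_log_parameters():
    """Parameter changes from step (1): enable the general log and write it to a file."""
    return [
        {"ParameterName": "general_log", "ParameterValue": "1", "ApplyMethod": "immediate"},
        {"ParameterName": "log_output", "ParameterValue": "FILE", "ApplyMethod": "immediate"},
    ]


def enable_general_log_export(parameter_group, instance_id):
    """Apply the parameters, then publish the general log to CloudWatch Logs (step 2).

    parameter_group and instance_id are hypothetical names supplied by the caller.
    """
    import boto3  # imported lazily so general_log_parameters() works without AWS access

    rds = boto3.client("rds")
    rds.modify_db_parameter_group(
        DBParameterGroupName=parameter_group,
        Parameters=general_log_parameters(),
    )
    rds.modify_db_instance(
        DBInstanceIdentifier=instance_id,
        CloudwatchLogsExportConfiguration={"EnableLogTypes": ["general"]},
        ApplyImmediately=True,
    )
```

The instance must already use the custom parameter group for the parameter change to take effect.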

2. First create a Kinesis Data Stream. Then find the corresponding log group in CW Logs and create a Kinesis subscription filter that sends the log to that Kinesis Data Stream.
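
As a rough sketch of step 2 with boto3: the stream name, filter name, and shard count below are illustrative assumptions, and `role_arn` is assumed to be a role that CloudWatch Logs can assume to write into the stream. The log group name follows the `/aws/rds/instance/<instance>/general` layout that RDS uses for the general log.

```python
def rds_general_log_group(instance_id):
    """Log group that RDS publishes the general log of an instance to."""
    return f"/aws/rds/instance/{instance_id}/general"


def subscription_filter_args(log_group, stream_arn, role_arn,
                             filter_name="general-log-to-kinesis"):
    """Arguments for put_subscription_filter; filter_name is a made-up example."""
    return {
        "logGroupName": log_group,
        "filterName": filter_name,
        "filterPattern": "",  # an empty pattern forwards every log event
        "destinationArn": stream_arn,
        "roleArn": role_arn,
    }


def create_stream_and_subscription(stream_name, instance_id, role_arn):
    """Create the data stream, wait until it exists, then subscribe the log group."""
    import boto3  # imported lazily so the helpers above work without AWS access

    kinesis = boto3.client("kinesis")
    kinesis.create_stream(StreamName=stream_name, ShardCount=1)
    kinesis.get_waiter("stream_exists").wait(StreamName=stream_name)
    summary = kinesis.describe_stream_summary(StreamName=stream_name)
    stream_arn = summary["StreamDescriptionSummary"]["StreamARN"]

    logs = boto3.client("logs")
    logs.put_subscription_filter(
        **subscription_filter_args(rds_general_log_group(instance_id), stream_arn, role_arn)
    )
    return stream_arn
```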

3. Create a delivery stream (Kinesis Firehose). Choose the Kinesis Data Stream as the Source and OpenSearch as the Destination.

* Rotating the index hourly or daily is recommended, so that no single index grows too large.

* The role needs a trust relationship with both Firehose and Lambda.
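
For illustration, a trust policy covering both services could be built like this. It is a sketch of the trust relationship only; the permissions the role needs on the stream, the domain, and the function are not shown.

```python
import json


def trust_policy(services=("firehose.amazonaws.com", "lambda.amazonaws.com")):
    """IAM trust policy that lets the listed AWS services assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": list(services)},
                "Action": "sts:AssumeRole",
            }
        ],
    }


print(json.dumps(trust_policy(), indent=2))
```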

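Data transformation (Transform records) must be enabled. For the Lambda function, choose Kinesis Data Firehose Cloudwatch Logs Processor, or create and configure the function in the Lambda console.
The Kinesis Firehose console can only create the NodeJS version of the Lambda function. Creating the function from the blueprint in the Lambda console lets you choose the Python version.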
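
The console choices in step 3 correspond roughly to the request below. This is a hedged sketch: every ARN, the index name `mysql-general-log`, and the use of one role for all parts are assumptions made for the example, and the S3 bucket is the backup location Firehose requires for this destination.

```python
def delivery_stream_args(name, stream_arn, role_arn, domain_arn, bucket_arn, lambda_arn,
                         index_name="mysql-general-log", rotation="OneDay"):
    """Build keyword arguments for the Firehose create_delivery_stream call."""
    if rotation not in ("OneHour", "OneDay"):
        raise ValueError("this sketch only covers hourly or daily index rotation")
    return {
        "DeliveryStreamName": name,
        "DeliveryStreamType": "KinesisStreamAsSource",
        "KinesisStreamSourceConfiguration": {
            "KinesisStreamARN": stream_arn,
            "RoleARN": role_arn,
        },
        "AmazonopensearchserviceDestinationConfiguration": {
            "RoleARN": role_arn,
            "DomainARN": domain_arn,
            "IndexName": index_name,
            "IndexRotationPeriod": rotation,
            "S3Configuration": {"RoleARN": role_arn, "BucketARN": bucket_arn},
            # "Transform records": run every batch through the Lambda function
            "ProcessingConfiguration": {
                "Enabled": True,
                "Processors": [
                    {
                        "Type": "Lambda",
                        "Parameters": [
                            {"ParameterName": "LambdaArn", "ParameterValue": lambda_arn}
                        ],
                    }
                ],
            },
        },
    }
```

The result would be passed as `boto3.client("firehose").create_delivery_stream(**delivery_stream_args(...))`.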

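Create the Lambda function. Note that it needs CloudWatch permissions, otherwise you cannot read its logs.
The code needed to ship the General Log follows. On the way to ES, each log line is split into four fields: Time, PID, Perform, and Statement.
General Log: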
2022-01-15T08:44:45.161343Z     9566 Execute    select * from information_schema.rds_events_threads_waits_current where (type <> 'BACKGROUND' or name = 'thread/sql/slave_sql') and command <> 'Sleep'

Time: 2022-01-15T08:44:45.161343Z    
PID: 9566
Perform: Execute
Statement: select * from information_schema.rds_events_threads_waits_current where (type <> 'BACKGROUND' or name = 'thread/sql/slave_sql') and command <> 'Sleep'
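
The split shown above can be tried on its own before wiring it into the Lambda function. The helper name `split_general_log_line` is made up for this sketch, and the pattern assumes the four columns are separated by runs of whitespace.

```python
import re

# timestamp, thread ID, command, then the rest of the line as the statement
LINE_PATTERN = re.compile(
    r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}Z)\s+(\d+)\s+(\w+)\s+(.*)',
    re.DOTALL,
)


def split_general_log_line(line):
    """Return the four fields of a general log line, or None if it does not match."""
    match = LINE_PATTERN.match(line)
    if match is None:
        return None
    return {
        "Time": match[1],
        "PID": match[2],
        "Perform": match[3],
        "Statement": match[4],
    }


sample = ("2022-01-15T08:44:45.161343Z     9566 Execute    "
          "select * from information_schema.rds_events_threads_waits_current "
          "where (type <> 'BACKGROUND' or name = 'thread/sql/slave_sql') and command <> 'Sleep'")
print(split_general_log_line(sample))
```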

# Copyright 2014, Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Licensed under the Amazon Software License (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#  http://aws.amazon.com/asl/
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

"""
For processing data sent to Firehose by Cloudwatch Logs subscription filters.

Cloudwatch Logs sends to Firehose records that look like this:

{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "log_group_name",
  "logStream": "log_stream_name",
  "subscriptionFilters": [
    "subscription_filter_name"
  ],
  "logEvents": [
    {
      "id": "01234567890123456789012345678901234567890123456789012345",
      "timestamp": 1510109208016,
      "message": "log message 1"
    },
    {
      "id": "01234567890123456789012345678901234567890123456789012345",
      "timestamp": 1510109208017,
      "message": "log message 2"
    }
  ]
}

The data is additionally compressed with GZIP.

NOTE: It is suggested to test the cloudwatch logs processor lambda function in a pre-production environment to ensure
the 6000000 limit meets your requirements. If your data contains a sizable number of records that are classified as
Dropped/ProcessingFailed, then it is suggested to lower the 6000000 limit within the function to a smaller value
(eg: 5000000) in order to confine to the 6MB (6291456 bytes) payload limit imposed by lambda. You can find Lambda
quotas at https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html

The code below will:

1) Gunzip the data
2) Parse the json
3) Set the result to ProcessingFailed for any record whose messageType is not DATA_MESSAGE, thus redirecting them to the
   processing error output. Such records do not contain any log events. You can modify the code to set the result to
   Dropped instead to get rid of these records completely.
4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
   each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
   transformations on the log events.
5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that
   this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent
   method.
6) Any individual record exceeding 6,000,000 bytes in size after decompression and encoding is marked as
   ProcessingFailed within the function. The original compressed record will be backed up to the S3 bucket
   configured on the Firehose.
7) Any additional records which exceed 6MB will be re-ingested back into Firehose.
8) The retry count for intermittent failures during re-ingestion is set to 20 attempts. If you wish to retry fewer
   times for intermittent failures you can lower this value.
"""

import base64
import json
import gzip
from io import BytesIO
import boto3
import re

# Timestamp, thread ID, command and statement of one general log line.
# \s+ tolerates the tab and space padding between the columns.
GENERAL_LOG_PATTERN = re.compile(
    r'([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{6}Z)\s+(\d+)\s+(\w+)\s+(.*)',
    re.DOTALL)

def transformLogEvent(log_event):
    """Transform each log event.

    Splits a MySQL general log line into its Time, PID, Perform and Statement fields.

    Args:
    log_event (dict): The original log event. Structure is {"id": str, "timestamp": long, "message": str}

    Returns:
    dict: The transformed log event.
    """
    log = log_event['message']
    # split log
    #2022-02-04T07:46:01.385765Z	15578 Execute	select * from information_schema.rds_events_threads_waits_current where (type <> 'BACKGROUND' or name = 'thread/sql/slave_sql') and command <> 'Sleep'
    match = GENERAL_LOG_PATTERN.match(log)
    if match is None:
        # A line that does not follow the expected layout is kept whole in
        # Statement instead of raising a TypeError on the failed match.
        return_logevent = {'Time': None, 'PID': None, 'Perform': None, 'Statement': log}
    else:
        return_logevent = {
            'Time': match[1],
            'PID': match[2],
            'Perform': match[3],
            'Statement': match[4]
        }
    print(return_logevent)
    return return_logevent

def processRecords(records):
    print('AAA Records :  ', records)
    for r in records:
        print('AAA r: ', r)
        data = base64.b64decode(r['data'])
        striodata = BytesIO(data)
        with gzip.GzipFile(fileobj=striodata, mode='r') as f:
            data = json.loads(f.read())

        recId = r['recordId']
        """
        CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
        They do not contain actual data.
        """
        if data['messageType'] == 'CONTROL_MESSAGE':
            yield {
                'result': 'Dropped',
                'recordId': recId
            }
        elif data['messageType'] == 'DATA_MESSAGE':
            # Emit one output record per log event so that each event becomes its own JSON document.
            # NOTE: Firehose documents that a returned recordId must match the recordId it passed in,
            # so check that the derived IDs below behave as intended in your setup.
            for i, e in enumerate(data['logEvents']):
                eachData = transformLogEvent(e)
                # b64encode(...).decode() gives a plain str without line breaks, which can be
                # serialized in the Lambda response (encodebytes returns bytes with newlines).
                encodedData = base64.b64encode(
                    json.dumps(eachData, ensure_ascii=False).encode('utf-8')).decode('utf-8')
                if len(encodedData) <= 6000000:
                    print('encodeData < 6000000 : ', encodedData)
                    yield {
                        'data': encodedData,
                        'result': 'Ok',
                        'recordId': int(recId) + i
                    }
                else:
                    yield {
                        'result': 'ProcessingFailed',
                        'recordId': int(recId) + i
                    }
        else:
            yield {
                'result': 'ProcessingFailed',
                'recordId': recId
            }

def putRecordsToFirehoseStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_record_batch throws for whatever reason, response['xx'] will error out, adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_record_batch(DeliveryStreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_record_batch succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedPutCount'] > 0:
        for idx, res in enumerate(response['RequestResponses']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue

            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])

        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecordBatch to Firehose stream, retrying. %s' % (errMsg))
            putRecordsToFirehoseStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))

def putRecordsToKinesisStream(streamName, records, client, attemptsMade, maxAttempts):
    failedRecords = []
    codes = []
    errMsg = ''
    # if put_records throws for whatever reason, response['xx'] will error out, adding a check for a valid
    # response will prevent this
    response = None
    try:
        response = client.put_records(StreamName=streamName, Records=records)
    except Exception as e:
        failedRecords = records
        errMsg = str(e)

    # if there are no failedRecords (put_records succeeded), iterate over the response to gather results
    if not failedRecords and response and response['FailedRecordCount'] > 0:
        for idx, res in enumerate(response['Records']):
            # (if the result does not have a key 'ErrorCode' OR if it does and is empty) => we do not need to re-ingest
            if 'ErrorCode' not in res or not res['ErrorCode']:
                continue

            codes.append(res['ErrorCode'])
            failedRecords.append(records[idx])

        errMsg = 'Individual error codes: ' + ','.join(codes)

    if len(failedRecords) > 0:
        if attemptsMade + 1 < maxAttempts:
            print('Some records failed while calling PutRecords to Kinesis stream, retrying. %s' % (errMsg))
            putRecordsToKinesisStream(streamName, failedRecords, client, attemptsMade + 1, maxAttempts)
        else:
            raise RuntimeError('Could not put records after %s attempts. %s' % (str(maxAttempts), errMsg))

def createReingestionRecord(isSas, originalRecord):
    # isSas is True when the delivery stream reads from a Kinesis stream; such records carry a partition key
    if isSas:
        return {'data': base64.b64decode(originalRecord['data']), 'partitionKey': originalRecord['kinesisRecordMetadata']['partitionKey']}
    else:
        return {'data': base64.b64decode(originalRecord['data'])}

def getReingestionRecord(isSas, reIngestionRecord):
    if isSas:
        return {'Data': reIngestionRecord['data'], 'PartitionKey': reIngestionRecord['partitionKey']}
    else:
        return {'Data': reIngestionRecord['data']}

def lambda_handler(event, context):
    print('AAAAA event --> ', event)
    isSas = 'sourceKinesisStreamArn' in event
    streamARN = event['sourceKinesisStreamArn'] if isSas else event['deliveryStreamArn']
    region = streamARN.split(':')[3]
    streamName = streamARN.split('/')[1]
    records = list(processRecords(event['records']))
    projectedSize = 0
    dataByRecordId = {rec['recordId']: createReingestionRecord(isSas, rec) for rec in event['records']}
    putRecordBatches = []
    recordsToReingest = []
    totalRecordsToBeReingested = 0

    for idx, rec in enumerate(records):
        # only records that were transformed successfully count towards the response size
        if rec['result'] != 'Ok':
            continue
        projectedSize += len(rec['data']) + len(str(rec['recordId']))
        # 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
        if projectedSize > 6000000:
            totalRecordsToBeReingested += 1
            recordsToReingest.append(
                getReingestionRecord(isSas, dataByRecordId[rec['recordId']])
            )
            records[idx]['result'] = 'Dropped'
            del(records[idx]['data'])

        # split out the record batches into multiple groups, 500 records at max per group
        if len(recordsToReingest) == 500:
            putRecordBatches.append(recordsToReingest)
            recordsToReingest = []

    print('recordsToReingest: ', recordsToReingest)
    if len(recordsToReingest) > 0:
        # add the last batch
        putRecordBatches.append(recordsToReingest)

    # iterate and call putRecordBatch for each group
    recordsReingestedSoFar = 0
    if len(putRecordBatches) > 0:
        client = boto3.client('kinesis', region_name=region) if isSas else boto3.client('firehose', region_name=region)
        for recordBatch in putRecordBatches:
            if isSas:
                putRecordsToKinesisStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            else:
                putRecordsToFirehoseStream(streamName, recordBatch, client, attemptsMade=0, maxAttempts=20)
            recordsReingestedSoFar += len(recordBatch)
            print('Reingested %d/%d records out of %d' % (recordsReingestedSoFar, totalRecordsToBeReingested, len(event['records'])))
    else:
        print('No records to be reingested')

    print('-----> return records: ', records)
    return {"records": records}
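
To try the function without deploying it, you can build a test event by hand. The sketch below packs log messages the way CloudWatch Logs delivers them (JSON, gzip, base64) and unpacks them again. The ARN, record ID, and helper names are invented for the example.

```python
import base64
import gzip
import json


def make_firehose_event(messages, record_id="1"):
    """Build a Firehose-style event whose single record wraps the given log messages."""
    payload = {
        "messageType": "DATA_MESSAGE",
        "owner": "123456789012",
        "logGroup": "log_group_name",
        "logStream": "log_stream_name",
        "subscriptionFilters": ["subscription_filter_name"],
        "logEvents": [
            {"id": str(i), "timestamp": 1510109208016 + i, "message": message}
            for i, message in enumerate(messages)
        ],
    }
    compressed = gzip.compress(json.dumps(payload).encode("utf-8"))
    return {
        # hypothetical ARN; the handler only reads the region and the stream name from it
        "deliveryStreamArn": "arn:aws:firehose:us-east-1:123456789012:deliverystream/example",
        "records": [{"recordId": record_id, "data": base64.b64encode(compressed).decode("utf-8")}],
    }


def decode_record(record):
    """Reverse of the packing above: base64-decode, gunzip, parse the JSON."""
    return json.loads(gzip.decompress(base64.b64decode(record["data"])))


event = make_firehose_event(["2022-01-15T08:44:45.161343Z\t 9566 Execute\tselect 1"])
print(decode_record(event["records"][0])["logEvents"][0]["message"])
```

Passing `event` to `lambda_handler(event, None)` then exercises the transformation locally, as long as nothing needs to be re-ingested.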
