Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save justin-morgan/5105b308a9f8dcd222f9 to your computer and use it in GitHub Desktop.

Select an option

Save justin-morgan/5105b308a9f8dcd222f9 to your computer and use it in GitHub Desktop.
A short Lambda Function the can be sent CloudWatch Logs (in the case Flow Logs) and send them to Kinesis Firehose for storage in S3.
import boto3
import logging
import json
import gzip
from StringIO import StringIO
logger = logging.getLogger()
logger.setLevel(logging.INFO)
client = boto3.client('firehose')
def lambda_handler(event, context):
#capture the CloudWatch log data
outEvent = str(event['awslogs']['data'])
#decode and unzip the log data
outEvent = gzip.GzipFile(fileobj=StringIO(outEvent.decode('base64','strict'))).read()
#convert the log data from JSON into a dictionary
cleanEvent = json.loads(outEvent)
#initiate a list
s = []
#set the name of the Kinesis Firehose Stream
firehoseName = 'FlowLogTest'
#loop through the events line by line
for t in cleanEvent['logEvents']:
#Transform the data and store it in the "Data" field.
p={
#Fields in FlowLogs - [version, accountid, interfaceid, srcaddr, dstaddr, srcport, dstport, protocol, packets, bytes, start, stop, action, logstatus]
'Data': str(t['extractedFields']['start']) + "," + str(t['extractedFields']['dstaddr']) + "," + str(t['extractedFields']['srcaddr']) + "," + str(t['extractedFields']['packets'])+"\n"
}
#write the data to our list
s.insert(len(s),p)
#limit of 500 records per batch. Break it up if you have to.
if len(s) > 499:
#send the response to Firehose in bulk
SendToFireHose(firehoseName, s)
#Empty the list
s = []
#when done, send the response to Firehose in bulk
if len(s) > 0:
SendToFireHose(firehoseName, s)
#function to send record to Kinesis Firehose
def SendToFireHose(streamName, records):
response = client.put_record_batch(
DeliveryStreamName = streamName,
Records=records
)
#log the number of data points written to Kinesis
print "Wrote the following records to Firehose: " + str(len(records))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment