Created
September 29, 2020 15:57
-
-
Save cloudchopshop/e352606a0e6efeddcc14e78280220ddd to your computer and use it in GitHub Desktop.
Export ELB logs stored in S3 to an Azure Sentinel / Log Analytics workspace, using a Lambda function triggered when a new file arrives in S3.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from io import StringIO, BytesIO | |
| import boto3 | |
| import gzip | |
| import re, sys, io, os | |
| import json | |
| import datetime | |
| import hashlib | |
| import hmac | |
| import base64 | |
| import urllib3 | |
# Credentials for the log workspace, taken from the process environment.
# Each value is None when the corresponding variable is not set.
shared_key = os.getenv("shared_key")
customer_id = os.getenv("customer_id")

# Module-level S3 client used to download the log objects.
client = boto3.client("s3")
# Name of the event type being submitted; sent as the "Log-Type" header.
log_type = 'WebMonitorTest'

# Names given, in order, to the leading capture groups of `regex`.
fields = [
    "type", "timestamp", "alb",
    "client_ip", "client_port",
    "backend_ip", "backend_port",
    "request_processing_time",
    "backend_processing_time",
    "response_processing_time",
    "alb_status_code", "backend_status_code",
    "received_bytes", "sent_bytes",
    "request_verb", "request_url", "request_proto",
    "user_agent",
    "ssl_cipher", "ssl_protocol",
    "target_group_arn",
    "trace_id", "domain_name", "chosen_cert_arn",
    "matched_rule_priority", "request_creation_time",
    "actions_executed", "redirect_url",
]

# REFERENCE: https://docs.aws.amazon.com/athena/latest/ug/application-load-balancer-logs.html#create-alb-table
# The pattern is written as adjacent raw-string pieces (one per group of
# related fields); Python joins them into a single pattern string.
regex = (
    r"([^ ]*) ([^ ]*) ([^ ]*) "                 # type, timestamp, alb
    r"([^ ]*):([0-9]*) "                        # client_ip:client_port
    r"([^ ]*)[:-]([0-9]*) "                     # backend_ip, backend_port
    r"([-.0-9]*) ([-.0-9]*) ([-.0-9]*) "        # request/backend/response processing time
    r"(|[-0-9]*) (-|[-0-9]*) "                  # alb_status_code, backend_status_code
    r"([-0-9]*) ([-0-9]*) "                     # received_bytes, sent_bytes
    r"\"([^ ]*) ([^ ]*) (- |[^ ]*)\" "          # "request_verb request_url request_proto"
    r"\"([^\"]*)\" "                            # user_agent
    r"([A-Z0-9-]+) ([A-Za-z0-9.-]*) "           # ssl_cipher, ssl_protocol
    r"([^ ]*) "                                 # target_group_arn
    r"\"([^\"]*)\" \"([^\"]*)\" \"([^\"]*)\" "  # trace_id, domain_name, chosen_cert_arn
    r"([-.0-9]*) ([^ ]*) "                      # matched_rule_priority, request_creation_time
    r"\"([^\"]*)\" \"([^\"]*)\" "               # actions_executed, redirect_url
    r"(.*)"                                     # rest of the line; has no entry in `fields`
)
| ##################### | |
| ######Functions###### | |
| ##################### | |
def build_json():
    """Download the gzipped log object and return its parsed lines as JSON.

    The S3 object is located through the module-level ``bucket`` and ``key``
    globals (assigned by ``lambda_handler``). Every line is matched against
    ``regex`` and the leading capture groups are mapped onto the names in
    ``fields``; lines that do not match are skipped.

    Returns:
        str: A JSON array, indented by 4 spaces, holding one object per
        matched line. All values are strings.
    """
    response = client.get_object(Bucket=bucket, Key=key)
    content = response['Body'].read()

    records = []
    # The context manager closes the gzip stream even if parsing raises.
    with gzip.GzipFile(fileobj=io.BytesIO(content), mode='r') as fh:
        for raw_line in fh:
            # A stray non-ASCII byte should not abort the whole file, so
            # undecodable bytes are replaced instead of raising.
            line = raw_line.decode('utf-8', errors='replace')
            matches = re.search(regex, line)
            if not matches:
                continue
            # Build the record as a dict rather than formatting JSON text by
            # hand: captured values containing backslashes would otherwise
            # produce invalid JSON. zip() stops at the shorter input, so the
            # final catch-all group (which has no name in `fields`) is dropped.
            records.append(dict(zip(fields, matches.groups())))

    return json.dumps(records, indent=4)
# Build the API signature
def build_signature(customer_id, shared_key, date, content_length, method, content_type, resource):
    """Return the ``SharedKey`` Authorization header value for a request.

    The string that is signed consists of the method, content length,
    content type, the ``x-ms-date`` header and the resource path, joined by
    newlines. It is signed with HMAC-SHA256 using the base64-decoded
    ``shared_key``, and the digest is base64-encoded.

    Args:
        customer_id: Identifier placed before the colon in the header value.
        shared_key: Base64-encoded signing key.
        date: Date string; must equal the ``x-ms-date`` header that is sent.
        content_length: Length of the request body (converted with ``str``).
        method: HTTP method, e.g. ``'POST'``.
        content_type: Value of the request's content-type header.
        resource: Resource path, e.g. ``'/api/logs'``.

    Returns:
        str: ``"SharedKey <customer_id>:<base64 signature>"``.

    Raises:
        binascii.Error: If ``shared_key`` is not valid base64.
    """
    x_headers = 'x-ms-date:' + date
    string_to_hash = "\n".join(
        [method, str(content_length), content_type, x_headers, resource]
    )
    bytes_to_hash = string_to_hash.encode("utf-8")
    decoded_key = base64.b64decode(shared_key)
    digest = hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()
    encoded_hash = base64.b64encode(digest).decode()
    # The header value is a credential, so it is deliberately not printed
    # or logged here.
    return "SharedKey {}:{}".format(customer_id, encoded_hash)
# Build and send a request to the POST API
def post_data(customer_id, shared_key, body, log_type):
    """POST ``body`` to the workspace's ``/api/logs`` endpoint.

    Args:
        customer_id: Workspace (customer) ID; used as the host-name prefix
            and in the request signature.
        shared_key: Base64-encoded key handed to ``build_signature``.
        body: JSON payload, as ``str`` or ``bytes``.
        log_type: Value sent in the ``Log-Type`` header.

    Returns:
        int: The HTTP status code of the response.
    """
    method = 'POST'
    content_type = 'application/json'
    resource = '/api/logs'
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    rfc1123date = now_utc.strftime('%a, %d %b %Y %H:%M:%S GMT')
    # Sign the number of BYTES actually sent; len() of a str counts
    # characters, which differs as soon as the payload is not pure ASCII.
    payload = body.encode('utf-8') if isinstance(body, str) else body
    content_length = len(payload)
    signature = build_signature(customer_id, shared_key, rfc1123date, content_length, method, content_type, resource)
    uri = 'https://' + customer_id + '.ods.opinsights.azure.com' + resource + '?api-version=2016-04-01'
    headers = {
        'content-type': content_type,
        'Authorization': signature,
        'Log-Type': log_type,
        'x-ms-date': rfc1123date
    }
    http = urllib3.PoolManager()
    response = http.request(method, uri, body=payload, headers=headers)
    if 200 <= response.status <= 299:
        print('Accepted')
    else:
        # urllib3 responses expose `.status`; `.status_code` is the
        # `requests` attribute and would raise AttributeError here.
        print("Response code: {}".format(response.status))
    return response.status
def lambda_handler(event, context):
    """Lambda entry point: forward every S3 object named in ``event``.

    For each entry in ``event['Records']`` the module-level ``bucket`` and
    ``key`` globals are set (``build_json`` reads them), the object is parsed
    by ``build_json`` and the result is sent with ``post_data``.

    Args:
        event: S3 notification event containing a ``Records`` list.
        context: Lambda context object (unused).

    Returns:
        dict: A fixed response with ``statusCode`` 200.
    """
    # Imported locally so this function does not depend on the file's
    # top-level import list.
    from urllib.parse import unquote_plus

    global bucket
    global key
    # A single notification may carry several records; handle all of them
    # rather than only the first.
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # Object keys arrive URL-encoded in S3 events (e.g. spaces as '+');
        # decode before using the key with get_object.
        key = unquote_plus(record['s3']['object']['key'])
        post_data(customer_id, shared_key, build_json(), log_type)
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment