Last active
March 5, 2024 23:28
-
-
Save ismisepaul/03acb1d5319b9cce5066fb10e71deea9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import copy | |
| import hashlib | |
| import json | |
| import os | |
| import sys | |
| from datetime import datetime, timezone, timedelta | |
| import elasticsearch | |
| import elastic_transport | |
| import requests | |
| from elasticsearch.helpers import scan | |
| from loguru import logger | |
| from requests.auth import HTTPBasicAuth | |
# Replace loguru's default DEBUG sink so only INFO and above reach stderr.
logger.remove()
logger.add(sys.stderr, level="INFO")
class GithubApi:
    """
    Thin wrapper over the GitHub REST API endpoints this script needs.

    Every request method returns the raw ``requests.Response``; non-200
    responses are logged but still returned so callers can inspect
    ``status_code`` themselves.
    """

    def __init__(self, github_url: str, username: str, token: str, proxies: dict = None, self_signed_cert: str = None):
        """
        :param github_url: base API URL, e.g. "https://api.github.com"
        :param username: GitHub login used for basic auth
        :param token: personal access token used as the basic-auth password
        :param proxies: optional requests-style proxies mapping
        :param self_signed_cert: optional CA bundle passed as requests' ``verify``
        """
        self.url = github_url
        logger.debug(f"Base URL: {self.url}")
        self.basic_auth = HTTPBasicAuth(username, token)
        logger.debug(f"Username: {username}")
        self.proxies = proxies
        self.self_signed_cert = self_signed_cert

    def _request_kwargs(self) -> dict:
        """
        Keyword arguments shared by every outgoing request (auth, proxy, TLS).

        The original implementation stored ``proxies``/``self_signed_cert`` but
        never used them; they are now forwarded to requests.
        """
        kwargs = {'auth': self.basic_auth}
        if self.proxies:
            kwargs['proxies'] = self.proxies
        if self.self_signed_cert:
            # assumes this is a CA-bundle path suitable for requests' `verify` — TODO confirm
            kwargs['verify'] = self.self_signed_cert
        return kwargs

    @staticmethod
    def _warn_on_error(response):
        """Log status and body for non-200 responses; always return the response."""
        if response.status_code != 200:
            logger.warning(response.status_code)
            logger.warning(response.content)
        return response

    def list_organization_rule_suites(self, organization, repository_name: str = None, time_period: str = "day",
                                      actor_name: str = None,
                                      rule_suite_result: str = "all", per_page=30, page=1):
        """List rule suites for an org (GET /orgs/{org}/rulesets/rule-suites)."""
        query_params = {
            'page': page,
            'per_page': per_page,
            'repository_name': repository_name,
            'time_period': time_period,
            'actor_name': actor_name,
            'rule_suite_result': rule_suite_result
        }
        url = self.url + f"/orgs/{organization}/rulesets/rule-suites"
        response = requests.get(url=url, params=query_params, **self._request_kwargs())
        logger.info(f"Got page {page} from {response.url}")
        return self._warn_on_error(response)

    def get_organization_rule_suite(self, organization, rule_suite_id):
        """Fetch one rule suite (GET /orgs/{org}/rulesets/rule-suites/{id})."""
        url = self.url + f"/orgs/{organization}/rulesets/rule-suites/{rule_suite_id}"
        response = requests.get(url=url, **self._request_kwargs())
        return self._warn_on_error(response)

    def get_commits(self, owner, repo, sha=None, path=None, author=None, committer=None, since=None, until=None,
                    per_page=30, page=1):
        """List commits of a repository (GET /repos/{owner}/{repo}/commits)."""
        url = self.url + f"/repos/{owner}/{repo}/commits"
        query_params = {
            'sha': sha,
            'path': path,
            'author': author,
            'committer': committer,
            'since': since,
            'until': until,
            'per_page': per_page,
            'page': page
        }
        response = requests.get(url=url, params=query_params, **self._request_kwargs())
        return self._warn_on_error(response)

    def get_commit(self, owner, repo, ref):
        """Fetch a raw git commit object (GET /repos/{owner}/{repo}/git/commits/{ref})."""
        url = self.url + f"/repos/{owner}/{repo}/git/commits/{ref}"
        response = requests.get(url=url, **self._request_kwargs())
        if response.status_code != 200:
            logger.warning(f"Could not get {url}")
        return self._warn_on_error(response)
class GithubGraphQL:
    """Minimal GitHub GraphQL client used to look up repository permissions."""

    def __init__(
            self, username: str, token: str, github_url: str = 'https://api.github.com/graphql',
            proxies: dict = None, self_signed_cert: str = None
    ):
        """
        :param username: GitHub login used for basic auth
        :param token: personal access token used as the basic-auth password
        :param github_url: GraphQL endpoint URL
        :param proxies: optional requests-style proxies mapping
        :param self_signed_cert: optional CA bundle passed as requests' ``verify``
        """
        self.url = github_url
        logger.debug(f"GraphQL URL: {self.url}")
        self.basic_auth = HTTPBasicAuth(username, token)
        logger.debug(f"Username: {username}")
        self.proxies = proxies
        self.self_signed_cert = self_signed_cert
        logger.debug("Initializing GithubGraphQLTeams")

    def get_permission_user_has_on_repository(self, github_org, github_username, repo_name):
        """
        Query the permission ``github_username`` has on ``repo_name`` in
        ``github_org``. Returns the raw ``requests.Response``; non-200
        responses are logged but still returned.
        """
        # Use GraphQL variables instead of f-string interpolation so values
        # containing quotes/braces cannot break (or inject into) the query.
        graphql_query = """
        query ($org: String!, $repo: String!, $user: String!) {
          organization(login: $org) {
            repository(name: $repo) {
              collaborators(first: 100, query: $user) {
                edges {
                  permission
                  node {
                    login
                  }
                }
              }
            }
          }
        }
        """
        # proxies/self_signed_cert were previously stored but never used;
        # forward them to requests when set.
        request_kwargs = {'auth': self.basic_auth}
        if self.proxies:
            request_kwargs['proxies'] = self.proxies
        if self.self_signed_cert:
            # assumes this is a CA-bundle path suitable for requests' `verify` — TODO confirm
            request_kwargs['verify'] = self.self_signed_cert
        response = requests.post(
            url=self.url,
            json={
                "query": graphql_query,
                "variables": {"org": github_org, "repo": repo_name, "user": github_username},
            },
            **request_kwargs,
        )
        if response.status_code != 200:
            logger.warning(response.status_code)
            logger.warning(response.content)
        return response
def compare_dates(datetime_str_to_compare: str, specified_date_str: str, specified_time_str: str) -> bool:
    """
    Return True when the specified date+time (interpreted as UTC) is strictly
    later than the ISO-8601 timestamp ``datetime_str_to_compare``.

    :param datetime_str_to_compare: ISO-8601 timestamp (e.g. from the GitHub API)
    :param specified_date_str: date in "%Y-%m-%d" form
    :param specified_time_str: time in "%H:%M:%S" form
    :return: True if specified date/time > datetime_str_to_compare, else False
    """
    # GitHub timestamps may end in "Z"; datetime.fromisoformat only accepts
    # that suffix on Python >= 3.11, so normalize it to an explicit offset.
    datetime_to_compare = datetime.fromisoformat(datetime_str_to_compare.replace("Z", "+00:00"))
    specified_date = datetime.strptime(
        f"{specified_date_str} {specified_time_str}", "%Y-%m-%d %H:%M:%S"
    ).replace(tzinfo=timezone.utc)
    if specified_date > datetime_to_compare:
        logger.info(f"{specified_date} > {datetime_to_compare}")
        return True
    # The original message claimed "<" even on equality; "<=" is accurate.
    logger.info(f"{specified_date} <= {datetime_to_compare}")
    return False
def generate_unique_id(json_object):
    """
    Create an Elasticsearch document ID by sha256'ing the full JSON object.

    Keys are serialized in sorted order so logically-equal objects always
    produce the same id regardless of insertion order.

    :param json_object: JSON-serializable object to fingerprint
    :return: hex-encoded sha256 digest string
    """
    canonical = json.dumps(json_object, sort_keys=True).encode('utf-8')
    return hashlib.sha256(canonical).hexdigest()
def get_elasticsearch_connection(elasticsearch_host, username=None, password=None):
    """
    Create a connection to Elasticsearch and return a client to interact with
    the server.

    :param elasticsearch_host: URL of the Elasticsearch host
    :param username: basic-auth user; defaults to the module-level
        ``elastic_username`` set in ``__main__`` (kept for backward
        compatibility with existing call sites that pass only the host)
    :param password: basic-auth password; defaults to the module-level
        ``elastic_password``
    :return: elasticsearch client, or None when the connection attempt fails
    """
    logger.info(f"Connecting to Elasticsearch: {elasticsearch_host}")
    if username is None:
        username = elastic_username
    if password is None:
        password = elastic_password
    try:
        es_client = elasticsearch.Elasticsearch(
            elasticsearch_host,
            basic_auth=(username, password),
            request_timeout=30
        )
        return es_client
    # NOTE(review): the client usually connects lazily, so these may only
    # trigger on first use elsewhere — confirm against the client version.
    except (elastic_transport.ConnectionTimeout, elastic_transport.ConnectionError) as err:
        logger.error(err.errors)
        return None
def get_elasticsearch_index_info(es_conn: elasticsearch.Elasticsearch, elasticsearch_index):
    """
    Log metadata for the given index, exiting the process when the index is
    missing or the API call fails.

    :param es_conn: an Elasticsearch client connection
    :param elasticsearch_index: name of the index to inspect
    """
    try:
        # Removed the redundant `es_index = elasticsearch_index` alias; the
        # docstring previously documented a non-existent `es_client` param.
        if es_conn.indices.exists(index=elasticsearch_index):
            logger.info(f"Index Metadata: {es_conn.indices.get(index=elasticsearch_index)}")
        else:
            logger.error(f"Specified Index {elasticsearch_index} doesn't exist")
            sys.exit(1)
    except elasticsearch.ApiError as err:
        logger.error(err)
        sys.exit(1)
def get_elasticsearch_index_ids(es_conn: elasticsearch.Elasticsearch, es_index_name: str):
    """
    Return the ``_id`` of every document in the given index.

    :param es_conn: an Elasticsearch client connection
    :param es_index_name: name of the index to scan
    :return: list of document id strings
    """
    logger.info(f"Getting document ids from {es_index_name}")
    documents = scan(es_conn, query={"query": {"match_all": {}}}, index=es_index_name)
    return [document['_id'] for document in documents]
def create_flat_json_object(original_object) -> list:
    """
    Split one rule-suite object into one object per rule evaluation.

    Each returned dict is a deep copy of ``original_object`` whose
    "rule_evaluations" list holds exactly one evaluation, so every evaluation
    can be indexed as its own flat Elasticsearch document.

    :param original_object: rule-suite dict, optionally with "rule_evaluations"
    :return: list of per-evaluation dicts (empty when there are no evaluations)
    """
    rule_evaluations = original_object.get("rule_evaluations", [])
    # Build the base without "rule_evaluations" instead of popping it off the
    # caller's dict — the original implementation mutated its input argument.
    base_object = {k: v for k, v in original_object.items() if k != "rule_evaluations"}
    split_objects = []
    for rule_evaluation in rule_evaluations:
        new_object = copy.deepcopy(base_object)
        # Attach exactly one evaluation to each copy.
        new_object["rule_evaluations"] = [rule_evaluation]
        split_objects.append(new_object)
    return split_objects
def get_commit_meta(repo, sha):
    """
    Fetch author/committer metadata for a commit via the git-commits API.

    Uses the module-level ``github_api`` client and ``github_org``.

    :param repo: repository name within the organization
    :param sha: commit SHA to look up
    :return: dict of commit metadata, or None when the API call fails
    """
    result = github_api.get_commit(github_org, repo, sha)
    if result.status_code != 200:
        return None
    commit = result.json()  # parse the body once instead of once per field
    # Git commit objects should always carry author/committer blocks, but fall
    # back to {} so a missing block yields None fields instead of a TypeError.
    author = commit.get('author') or {}
    committer = commit.get('committer') or {}
    return {
        'commit_author_name': author.get('name'),
        'commit_author_email': author.get('email'),
        'committer_name': committer.get('name'),
        'committer_email': committer.get('email'),
        'commit_message': commit.get('message'),
    }
def upload_ruleset_insight_to_elasticsearch(ruleset_insights: list, page_num, stop_at_date: str = None,
                                            stop_at_date_include_time: str = None, stop_when_dup_doc_exists=False):
    """
    Enrich each rule-suite insight with commit and permission metadata and
    index the resulting documents into Elasticsearch.

    Relies on the module-level globals ``github_api``, ``github_org``, ``es``
    and ``es_index_name`` wired up in ``__main__``.

    :param ruleset_insights: items from the GitHub rule-suites list endpoint
    :param page_num: page number, used for logging only
    :param stop_at_date: "%Y-%m-%d" date; the process exits (sys.exit(0)) once
        an insight pushed before this date/time is encountered
    :param stop_at_date_include_time: "%H:%M:%S" time paired with stop_at_date
    :param stop_when_dup_doc_exists: return False on the first document whose
        id already exists in the index
    :return: False when a duplicate document was found, True otherwise
        (including when ``ruleset_insights`` is empty)
    """
    logger.info("Uploading to Elasticsearch")
    index_document_ids = None
    count = 0
    if stop_when_dup_doc_exists:
        # Fetch all existing ids up front so duplicates can be checked locally.
        index_document_ids = get_elasticsearch_index_ids(es, es_index_name)
    for ruleset_insight in ruleset_insights:
        # List items only carry an id; fetch the full rule-suite detail.
        rs_result = github_api.get_organization_rule_suite(github_org, ruleset_insight.get('id'))
        rs_result_json = rs_result.json()
        # One flat document per rule evaluation.
        ruleset_meta = create_flat_json_object(rs_result_json)
        for meta_data in ruleset_meta:
            commit_meta = get_commit_meta(meta_data.get('repository_name'), meta_data.get('after_sha'))
            if commit_meta:
                combined_meta = {**meta_data, **commit_meta}
            else:
                # Commit lookup failed; index the bare rule-suite metadata.
                combined_meta = meta_data
            repo_permission = get_permission_user_has_on_repository(github_org, meta_data.get('actor_name'), meta_data.get('repository_name'))
            if repo_permission:
                combined_meta['permission'] = repo_permission
            if stop_at_date:
                datetime_from_api = rs_result_json.get('pushed_at')
                is_date_greater = compare_dates(
                    datetime_str_to_compare=datetime_from_api, specified_date_str=stop_at_date,
                    specified_time_str=stop_at_date_include_time
                )
                if is_date_greater:
                    # NOTE(review): exits the whole process, not just this upload.
                    logger.info(f"Datetime {stop_at_date} {stop_at_date_include_time} has been reached")
                    sys.exit(0)
            logger.debug(combined_meta)
            # Content-hash id makes indexing idempotent for identical documents.
            document_id = generate_unique_id(combined_meta)
            if index_document_ids:  # check if document already exists
                if document_id in index_document_ids:
                    logger.warning("Document already exists in Elasticsearch")
                    logger.debug(combined_meta)
                    return False
            es_result = es.index(index=es_index_name, body=combined_meta, id=document_id)
            count += 1
            logger.info(f"{count} documents indexed from page {page_num}")
            logger.debug(es_result)
    return True
def get_ruleset_insights(
        stop_at_date: str = None, stop_at_date_include_time: str = None, stop_when_dup_doc_exists=False,
        page_num=0, per_page=100
):
    """
    Page through the organization's rule-suite insights (time_period="month")
    and upload each page to Elasticsearch until a stop condition is hit.

    Relies on the module-level globals ``github_api``, ``github_org`` and
    ``datetime_file`` wired up in ``__main__``.

    :param stop_at_date: "%Y-%m-%d" date forwarded to the uploader's stop check
    :param stop_at_date_include_time: "%H:%M:%S" time paired with stop_at_date
    :param stop_when_dup_doc_exists: stop paging on the first duplicate document
    :param page_num: page to start from
    :param per_page: page size requested from the API
    """
    # NOTE(review): page_num starts at 0 but GitHub pagination is 1-based, so
    # pages 0 and 1 may return the same results — confirm against the API.
    # Also: if the uploader never returns False and no stop date is hit, this
    # loop only ends via sys.exit inside the callees.
    is_new_doc = True  # the document doesn't exist in Elasticsearch
    while is_new_doc:
        list_organization_rule_suites = github_api.list_organization_rule_suites(
            organization=github_org, per_page=per_page, page=page_num, time_period="month"
        )
        if list_organization_rule_suites.status_code != 200:
            logger.error(list_organization_rule_suites.content)
            logger.error(f"Failed at page {page_num}")
            sys.exit(1)
        ruleset_insights = list_organization_rule_suites.json()
        if page_num == 0:
            # Persist (newest pushed_at - 5 min) as the watermark for next run.
            # NOTE(review): raises IndexError when the first page is empty —
            # confirm that is acceptable.
            dt = datetime.fromisoformat(ruleset_insights[0].get('pushed_at'))
            dt_minus_five = dt - timedelta(minutes=5)
            with open(datetime_file, "w") as dt_file:
                dt_file.write(dt_minus_five.isoformat())
            logger.info("Wrote datetime file")
        upload_finish = upload_ruleset_insight_to_elasticsearch(
            ruleset_insights=ruleset_insights, page_num=page_num, stop_when_dup_doc_exists=stop_when_dup_doc_exists,
            stop_at_date=stop_at_date, stop_at_date_include_time=stop_at_date_include_time
        )
        page_num += 1
        if not upload_finish:
            # A duplicate document was seen — everything newer is already indexed.
            is_new_doc = False
    logger.info("Finished getting ruleset result")
def get_permission_user_has_on_repository(github_org, github_username, repo_name):
    """
    Return the permission string the user has on the repository, or None when
    the lookup fails. Uses the module-level ``github_graphql`` client.

    :param github_org: organization login
    :param github_username: user login to look up
    :param repo_name: repository name within the organization
    :return: permission string (first collaborator edge), or None
    """
    response = github_graphql.get_permission_user_has_on_repository(github_org, github_username, repo_name).json()
    try:
        return response['data']['organization']['repository']['collaborators']['edges'][0]['permission']
    # KeyError/TypeError cover GraphQL error payloads where `data` is null or
    # a nested object is missing; the original caught only IndexError and let
    # those escape.
    except (IndexError, KeyError, TypeError):
        logger.warning(f"Could not get permission for {github_username} in the repo {repo_name} for the org {github_org}")
        return None
if __name__ == "__main__":
    # --- static configuration ------------------------------------------------
    ES_URL = "https://localhost:443"
    elastic_username = "ingest_admin"
    es_index_name = "org-rulesets"
    github_org = 'elastic'
    datetime_file = 'datetime.txt'  # stores the pushed_at watermark between runs
    github_login_username = 'ismisepaul'
    # Secrets come from the environment only; fail fast when they are missing.
    github_token = os.getenv('GH_ADMIN_TOKEN')
    elastic_password = os.getenv('ELASTIC_BRIGID_INGEST_ADMIN_PASSWORD')
    if not elastic_password:
        logger.error("Elastic Password not accessible in environment variable")
        sys.exit(1)
    if not github_token:
        logger.error("GitHub Token not accessible in environment variable")
        sys.exit(1)
    # These module-level names are read by the functions defined above.
    es = get_elasticsearch_connection(ES_URL)
    get_elasticsearch_index_info(es, es_index_name)
    github_api = GithubApi(
        github_url="https://api.github.com",
        username=github_login_username,
        token=github_token
    )
    github_graphql = GithubGraphQL(
        username=github_login_username,
        token=github_token
    )
    # Read the stop watermark written by a previous run.
    # NOTE(review): raises FileNotFoundError on a first run with no watermark
    # file — confirm the file is seeded out of band.
    with open(datetime_file, "r") as dt_from_file:
        dt_in_file = datetime.fromisoformat(dt_from_file.read())
    # Extract date and time components
    stop_at_date = dt_in_file.strftime("%Y-%m-%d")
    stop_at_date_include_time = dt_in_file.strftime("%H:%M:%S")
    logger.info(f"Date: {stop_at_date} and time: {stop_at_date_include_time}")
    get_ruleset_insights(stop_at_date=stop_at_date, stop_at_date_include_time=stop_at_date_include_time, page_num=0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment