Skip to content

Instantly share code, notes, and snippets.

@ismisepaul
Last active March 5, 2024 23:28
Show Gist options
  • Select an option

  • Save ismisepaul/03acb1d5319b9cce5066fb10e71deea9 to your computer and use it in GitHub Desktop.

Select an option

Save ismisepaul/03acb1d5319b9cce5066fb10e71deea9 to your computer and use it in GitHub Desktop.
import copy
import hashlib
import json
import os
import sys
from datetime import datetime, timezone, timedelta
import elasticsearch
import elastic_transport
import requests
from elasticsearch.helpers import scan
from loguru import logger
from requests.auth import HTTPBasicAuth
# Replace loguru's default sink so only INFO and above reaches stderr.
logger.remove()
logger.add(sys.stderr, level="INFO")
class GithubApi:
    """Thin wrapper around the GitHub REST API using HTTP basic auth."""

    def __init__(self, github_url: str, username: str, token: str, proxies: dict = None, self_signed_cert: str = None):
        """
        :param github_url: base API URL, e.g. "https://api.github.com"
        :param username: GitHub login
        :param token: personal access token used as the basic-auth password
        :param proxies: optional requests-style proxies mapping
        :param self_signed_cert: optional path to a CA bundle for TLS verification
        """
        self.url = github_url
        logger.debug(f"Base URL: {self.url}")
        self.basic_auth = HTTPBasicAuth(username, token)
        logger.debug(f"Username: {username}")
        self.proxies = proxies
        self.self_signed_cert = self_signed_cert

    def _request_kwargs(self) -> dict:
        """Common keyword arguments for every requests call.

        Fix: proxies/self_signed_cert were stored by __init__ but never passed
        to requests, so configured proxy/TLS settings were silently ignored.
        """
        kwargs = {'auth': self.basic_auth}
        if self.proxies:
            kwargs['proxies'] = self.proxies
        if self.self_signed_cert:
            kwargs['verify'] = self.self_signed_cert
        return kwargs

    def list_organization_rule_suites(self, organization, repository_name: str = None, time_period: str = "day",
                                      actor_name: str = None,
                                      rule_suite_result: str = "all", per_page=30, page=1):
        """List rule suites for an organization (paginated).

        Returns the raw requests.Response; non-200 responses are logged and
        still returned so callers can inspect them.
        """
        query_params = {
            'page': page,
            'per_page': per_page,
            'repository_name': repository_name,
            'time_period': time_period,
            'actor_name': actor_name,
            'rule_suite_result': rule_suite_result
        }
        url = self.url + f"/orgs/{organization}/rulesets/rule-suites"
        response = requests.get(
            url=url,
            params=query_params,
            **self._request_kwargs()
        )
        logger.info(f"Got page {page} from {response.url}")
        if response.status_code == 200:
            return response
        else:
            logger.warning(response.status_code)
            logger.warning(response.content)
            return response

    def get_organization_rule_suite(self, organization, rule_suite_id):
        """Fetch a single rule suite by ID. Returns the raw requests.Response."""
        url = self.url + f"/orgs/{organization}/rulesets/rule-suites/{rule_suite_id}"
        response = requests.get(
            url=url,
            **self._request_kwargs()
        )
        if response.status_code == 200:
            return response
        else:
            logger.warning(response.status_code)
            logger.warning(response.content)
            return response

    def get_commits(self, owner, repo, sha=None, path=None, author=None, committer=None, since=None, until=None,
                    per_page=30, page=1):
        """List commits for a repository (paginated). Returns the raw requests.Response."""
        url = self.url + f"/repos/{owner}/{repo}/commits"
        query_params = {
            'sha': sha,
            'path': path,
            'author': author,
            'committer': committer,
            'since': since,
            'until': until,
            'per_page': per_page,
            'page': page
        }
        response = requests.get(
            url=url,
            params=query_params,
            **self._request_kwargs()
        )
        if response.status_code == 200:
            return response
        else:
            logger.warning(response.status_code)
            logger.warning(response.content)
            return response

    def get_commit(self, owner, repo, ref):
        """Fetch one git commit object by ref/SHA. Returns the raw requests.Response."""
        url = self.url + f"/repos/{owner}/{repo}/git/commits/{ref}"
        response = requests.get(
            url=url,
            **self._request_kwargs()
        )
        if response.status_code == 200:
            return response
        else:
            logger.warning(f"Could not get {url}")
            logger.warning(response.status_code)
            logger.warning(response.content)
            return response
class GithubGraphQL:
    """Minimal client for the GitHub GraphQL API using HTTP basic auth."""

    def __init__(
            self, username: str, token: str, github_url: str = 'https://api.github.com/graphql',
            proxies: dict = None, self_signed_cert: str = None
    ):
        """
        :param username: GitHub login
        :param token: personal access token used as the basic-auth password
        :param github_url: GraphQL endpoint URL
        :param proxies: optional requests-style proxies mapping
        :param self_signed_cert: optional path to a CA bundle for TLS verification
        """
        self.url = github_url
        logger.debug(f"GraphQL URL: {self.url}")
        self.basic_auth = HTTPBasicAuth(username, token)
        logger.debug(f"Username: {username}")
        self.proxies = proxies
        self.self_signed_cert = self_signed_cert
        logger.debug("Initializing GithubGraphQLTeams")

    def get_permission_user_has_on_repository(self, github_org, github_username, repo_name):
        """Query the permission ``github_username`` has on ``repo_name``.

        Returns the raw requests.Response; callers parse the GraphQL payload.
        """
        # NOTE(review): the query is built by string interpolation. The values
        # are expected to be trusted configuration (org/repo/login), not
        # untrusted user input — interpolated quotes would break the query.
        graphql_query = f"""
        {{
          organization(login: "{github_org}") {{
            repository(name: "{repo_name}") {{
              collaborators(first: 100, query: "{github_username}") {{
                edges {{
                  permission
                  node {{
                    login
                  }}
                }}
              }}
            }}
          }}
        }}
        """
        # Fix: pass proxies/verify through — they were stored by __init__ but
        # previously never used, so proxy/TLS configuration had no effect.
        request_kwargs = {'auth': self.basic_auth}
        if self.proxies:
            request_kwargs['proxies'] = self.proxies
        if self.self_signed_cert:
            request_kwargs['verify'] = self.self_signed_cert
        response = requests.post(
            url=self.url,
            json={"query": graphql_query},
            **request_kwargs
        )
        if response.status_code == 200:
            return response
        else:
            logger.warning(response.status_code)
            logger.warning(response.content)
            return response
def compare_dates(datetime_str_to_compare: str, specified_date_str: str, specified_time_str: str) -> bool:
    """Return True when the specified UTC date/time is later than the given timestamp.

    :param datetime_str_to_compare: ISO-8601 timestamp (a trailing 'Z' is accepted)
    :param specified_date_str: date formatted "YYYY-MM-DD"
    :param specified_time_str: time formatted "HH:MM:SS" (interpreted as UTC)
    :return: True if the specified date/time is strictly greater, else False
    """
    # Python < 3.11 datetime.fromisoformat() rejects the 'Z' suffix that the
    # GitHub API uses for UTC timestamps; normalize it to an explicit offset.
    normalized = datetime_str_to_compare.replace("Z", "+00:00")
    datetime_to_compare = datetime.fromisoformat(normalized)
    specified_date = datetime.strptime(
        f"{specified_date_str} {specified_time_str}", "%Y-%m-%d %H:%M:%S"
    ).replace(tzinfo=timezone.utc)
    if specified_date > datetime_to_compare:
        logger.info(f"{specified_date} > {datetime_to_compare}")
        return True
    else:
        # "<=" rather than "<": the equal case also takes this branch.
        logger.info(f"{specified_date} <= {datetime_to_compare}")
        return False
def generate_unique_id(json_object):
    """Derive a deterministic Elasticsearch document ID for a JSON object.

    The object is serialized as canonical JSON (keys sorted) and the SHA-256
    hex digest of that serialization is returned, so identical documents
    always map to the same ID.

    :param json_object: any JSON-serializable object
    :return: 64-character hexadecimal SHA-256 digest string
    """
    canonical = json.dumps(json_object, sort_keys=True).encode('utf-8')
    return hashlib.sha256(canonical).hexdigest()
def get_elasticsearch_connection(elasticsearch_host):
    """
    Create a connection to Elasticsearch and return a client to interact with the server
    :param elasticsearch_host: URL of the Elasticsearch server, e.g. "https://localhost:443"
    :return: elasticsearch client, or None when the connection times out or errors
    """
    logger.info(f"Connecting to Elasticsearch: {elasticsearch_host}")
    try:
        # NOTE(review): elastic_username / elastic_password are module-level
        # globals assigned in the __main__ block — this function must only be
        # called after they are set.
        es_client = elasticsearch.Elasticsearch(
            elasticsearch_host,
            basic_auth=(elastic_username, elastic_password),
            request_timeout=30
        )
        return es_client
    except elastic_transport.ConnectionTimeout as err:
        logger.error(err.errors)
        return None
    except elastic_transport.ConnectionError as err:
        logger.error(err.errors)
        return None
def get_elasticsearch_index_info(es_conn: elasticsearch.Elasticsearch, elasticsearch_index):
    """
    Get information on the index specified in the config.yml
    Exits the process (status 1) when the index is missing or the API call fails.
    :param es_conn: an elasticsearch client connection
    :param elasticsearch_index: the index in Elasticsearch
    """
    try:
        es_index = elasticsearch_index
        index_exists = es_conn.indices.exists(index=es_index)
        if index_exists:
            logger.info(f"Index Metadata: {es_conn.indices.get(index=elasticsearch_index)}")
        else:
            # Hard exit: the rest of the script assumes the index exists.
            logger.error(f"Specified Index {es_index} doesn't exist")
            sys.exit(1)
    except elasticsearch.ApiError as err:
        logger.error(err)
        sys.exit(1)
def get_elasticsearch_index_ids(es_conn: elasticsearch.Elasticsearch, es_index_name: str):
    """Return the ``_id`` of every document currently stored in an index.

    :param es_conn: an elasticsearch client connection
    :param es_index_name: name of the index to enumerate
    :return: list of document ID strings
    """
    logger.info(f"Getting document ids from {es_index_name}")
    match_all = {"query": {"match_all": {}}}
    ids = []
    # scan() streams every hit via the scroll API.
    for hit in scan(es_conn, query=match_all, index=es_index_name):
        ids.append(hit['_id'])
    return ids
def create_flat_json_object(original_object) -> list:
    """Split a rule-suite object into one object per rule evaluation.

    Each returned object is a copy of ``original_object`` whose
    ``rule_evaluations`` field holds exactly one evaluation, keeping the
    Elasticsearch documents flat (one evaluation per document).

    Fix: the input is no longer mutated — previously ``rule_evaluations``
    was popped off the caller's object as a hidden side effect.

    :param original_object: dict, typically a rule-suite API response
    :return: list of dicts, one per rule evaluation (empty when there are none)
    """
    rule_evaluations = original_object.get("rule_evaluations", [])
    # Work on a deep copy so the caller's object is left untouched.
    base_object = copy.deepcopy(original_object)
    base_object.pop("rule_evaluations", None)
    split_objects = []
    for rule_evaluation in rule_evaluations:
        new_object = copy.deepcopy(base_object)
        # Each flat object carries exactly one evaluation.
        new_object["rule_evaluations"] = [rule_evaluation]
        split_objects.append(new_object)
    return split_objects
def get_commit_meta(repo, sha):
    """Fetch author/committer metadata for a commit.

    Uses the module-level ``github_api`` client and ``github_org`` (set in the
    __main__ block).

    :param repo: repository name within the configured GitHub org
    :param sha: commit SHA to look up
    :return: dict of commit metadata, or None when the API call fails
    """
    result = github_api.get_commit(github_org, repo, sha)
    if result.status_code != 200:
        return None
    # Fix: parse the response body once instead of once per field.
    commit = result.json()
    # NOTE(review): assumes 'author' and 'committer' are always present in the
    # git-commit payload — a missing key would raise TypeError here.
    commit_meta = {
        'commit_author_name': commit.get('author')['name'],
        'commit_author_email': commit.get('author')['email'],
        'committer_name': commit.get('committer')['name'],
        'committer_email': commit.get('committer')['email'],
        'commit_message': commit.get('message')
    }
    return commit_meta
def upload_ruleset_insight_to_elasticsearch(ruleset_insights: list, page_num, stop_at_date: str = None,
                                            stop_at_date_include_time: str = None, stop_when_dup_doc_exists=False):
    """
    Index one Elasticsearch document per rule evaluation for each rule-suite insight.

    Relies on the module-level globals ``github_api``, ``github_org``, ``es``
    and ``es_index_name`` being initialized (done in the __main__ block).

    :param ruleset_insights: list of rule-suite summaries from the list endpoint
    :param page_num: page number, used only for logging
    :param stop_at_date: optional "YYYY-MM-DD" cutoff; the process exits (status 0)
        once an insight's pushed_at is older than this date/time
    :param stop_at_date_include_time: "HH:MM:SS" time paired with stop_at_date
    :param stop_when_dup_doc_exists: when True, return False on the first
        document whose ID already exists in the index
    :return: True when every insight was indexed, False when a duplicate was hit
    """
    logger.info("Uploading to Elasticsearch")
    index_document_ids = None
    count = 0
    if stop_when_dup_doc_exists:
        # Snapshot of existing document IDs, fetched once for duplicate detection.
        index_document_ids = get_elasticsearch_index_ids(es, es_index_name)
    for ruleset_insight in ruleset_insights:
        # The list endpoint only returns a summary; fetch the full rule suite.
        rs_result = github_api.get_organization_rule_suite(github_org, ruleset_insight.get('id'))
        rs_result_json = rs_result.json()
        # One flat object per rule evaluation.
        ruleset_meta = create_flat_json_object(rs_result_json)
        for meta_data in ruleset_meta:
            commit_meta = get_commit_meta(meta_data.get('repository_name'), meta_data.get('after_sha'))
            if commit_meta:
                combined_meta = {**meta_data, **commit_meta}
            else:
                # Commit lookup failed: index the ruleset metadata alone.
                combined_meta = meta_data
            repo_permission = get_permission_user_has_on_repository(github_org, meta_data.get('actor_name'), meta_data.get('repository_name'))
            if repo_permission:
                combined_meta['permission'] = repo_permission
            if stop_at_date:
                datetime_from_api = rs_result_json.get('pushed_at')
                is_date_greater = compare_dates(
                    datetime_str_to_compare=datetime_from_api, specified_date_str=stop_at_date,
                    specified_time_str=stop_at_date_include_time
                )
                if is_date_greater:
                    # The cutoff is newer than this insight; everything after it
                    # is older still, so stop the whole process.
                    logger.info(f"Datetime {stop_at_date} {stop_at_date_include_time} has been reached")
                    sys.exit(0)
            logger.debug(combined_meta)
            # Deterministic ID: identical documents map to the same ID.
            document_id = generate_unique_id(combined_meta)
            if index_document_ids:  # check if document already exists
                if document_id in index_document_ids:
                    logger.warning("Document already exists in Elasticsearch")
                    logger.debug(combined_meta)
                    return False
            es_result = es.index(index=es_index_name, body=combined_meta, id=document_id)
            count += 1
            logger.info(f"{count} documents indexed from page {page_num}")
            logger.debug(es_result)
    return True
def get_ruleset_insights(
        stop_at_date: str = None, stop_at_date_include_time: str = None, stop_when_dup_doc_exists=False,
        page_num=0, per_page=100
):
    """
    Page through the organization's rule-suite insights and upload them to
    Elasticsearch, starting at ``page_num`` and continuing until the uploader
    reports a duplicate document (or exits the process at the stop date).

    Relies on the module-level globals ``github_api``, ``github_org`` and
    ``datetime_file`` (set in the __main__ block).

    :param stop_at_date: optional "YYYY-MM-DD" cutoff passed to the uploader
    :param stop_at_date_include_time: "HH:MM:SS" time paired with stop_at_date
    :param stop_when_dup_doc_exists: stop paging once a duplicate document is seen
    :param page_num: page to start from
    :param per_page: results per page requested from the API
    """
    is_new_doc = True  # the document doesn't exist in Elasticsearch
    while is_new_doc:
        list_organization_rule_suites = github_api.list_organization_rule_suites(
            organization=github_org, per_page=per_page, page=page_num, time_period="month"
        )
        if list_organization_rule_suites.status_code != 200:
            logger.error(list_organization_rule_suites.content)
            logger.error(f"Failed at page {page_num}")
            sys.exit(1)
        ruleset_insights = list_organization_rule_suites.json()
        if page_num == 0:
            # Checkpoint: persist the newest pushed_at minus 5 minutes so the
            # NEXT run can use it as its stop date.
            # NOTE(review): assumes page 0 is non-empty — an empty result list
            # would raise IndexError here; confirm before relying on it.
            dt = datetime.fromisoformat(ruleset_insights[0].get('pushed_at'))
            dt_minus_five = dt - timedelta(minutes=5)
            with open(datetime_file, "w") as dt_file:
                dt_file.write(dt_minus_five.isoformat())
            logger.info("Wrote datetime file")
        upload_finish = upload_ruleset_insight_to_elasticsearch(
            ruleset_insights=ruleset_insights, page_num=page_num, stop_when_dup_doc_exists=stop_when_dup_doc_exists,
            stop_at_date=stop_at_date, stop_at_date_include_time=stop_at_date_include_time
        )
        page_num += 1
        if not upload_finish:
            # A duplicate document was found; everything older is already indexed.
            is_new_doc = False
    logger.info("Finished getting ruleset result")
def get_permission_user_has_on_repository(github_org, github_username, repo_name):
    """Look up the permission a user has on a repository via GraphQL.

    Uses the module-level ``github_graphql`` client (set in the __main__ block).

    :param github_org: organization login
    :param github_username: user login to look up
    :param repo_name: repository name within the organization
    :return: permission string from the first matching collaborator edge,
        or None when it cannot be determined
    """
    response = github_graphql.get_permission_user_has_on_repository(github_org, github_username, repo_name).json()
    try:
        return response.get('data')['organization']['repository']['collaborators']['edges'][0]['permission']
    except (IndexError, KeyError, TypeError):
        # Fix: previously only IndexError was caught. A GraphQL error payload
        # ("data": null, or a missing org/repo) raises TypeError/KeyError on
        # the chained lookups and would crash the run instead of returning None.
        logger.warning(f"Could not get permission for {github_username} in the repo {repo_name} for the org {github_org}")
        return None
if __name__ == "__main__":
    # --- Static configuration ---------------------------------------------
    ES_URL = "https://localhost:443"
    elastic_username = "ingest_admin"
    es_index_name = "org-rulesets"
    github_org = 'elastic'
    datetime_file = 'datetime.txt'
    github_login_username = 'ismisepaul'
    # Secrets come from the environment so they never live in source control.
    github_token = os.getenv('GH_ADMIN_TOKEN')
    elastic_password = os.getenv('ELASTIC_BRIGID_INGEST_ADMIN_PASSWORD')
    if not elastic_password:
        logger.error("Elastic Password not accessible in environment variable")
        sys.exit(1)
    if not github_token:
        logger.error("GitHub Token not accessible in environment variable")
        sys.exit(1)
    es = get_elasticsearch_connection(ES_URL)
    # Fix: get_elasticsearch_connection returns None on connection failure;
    # previously that None was passed straight into the index check and crashed.
    if es is None:
        sys.exit(1)
    get_elasticsearch_index_info(es, es_index_name)
    github_api = GithubApi(
        github_url="https://api.github.com",
        username=github_login_username,
        token=github_token
    )
    github_graphql = GithubGraphQL(
        username=github_login_username,
        token=github_token
    )
    # The stop date is the checkpoint written by the previous run. Fix: on the
    # very first run the file does not exist yet (it is only created during
    # paging), so fall back to running without a stop date instead of crashing.
    stop_at_date = None
    stop_at_date_include_time = None
    try:
        with open(datetime_file, "r") as dt_from_file:
            dt_in_file = datetime.fromisoformat(dt_from_file.read())
        # Extract date and time components
        stop_at_date = dt_in_file.strftime("%Y-%m-%d")
        stop_at_date_include_time = dt_in_file.strftime("%H:%M:%S")
        logger.info(f"Date: {stop_at_date} and time: {stop_at_date_include_time}")
    except FileNotFoundError:
        logger.warning(f"{datetime_file} not found; running without a stop date")
    get_ruleset_insights(stop_at_date=stop_at_date, stop_at_date_include_time=stop_at_date_include_time, page_num=0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment