Created
December 17, 2021 18:34
-
-
Save collinmutembei/16bbf96f4e81fe2445dffe96b9a718f5 to your computer and use it in GitHub Desktop.
Revisions
-
collinmutembei created this gist
Dec 17, 2021 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,119 @@ import os import requests from requests import exceptions, request class GitHubQuery: BASE_URL = "https://api.github.com/graphql" ORGANIZATION = "org_name" def __init__( self, github_token=None, query=None, variables=None, additional_headers=None ): self.github_token = github_token self.query = query self.variables = variables or dict() self.additional_headers = additional_headers or dict() @property def headers(self): default_headers = dict( Authorization=f"token {self.github_token}", ) return { **default_headers, **self.additional_headers } def generator(self): while(True): try: yield request( 'post', GitHubQuery.BASE_URL, headers=self.headers, json=dict(query=self.query, variables=self.variables) ).json() except exceptions.HTTPError as http_err: raise http_err except Exception as err: raise err def iterator(self): pass class Deployments(GitHubQuery): DEPLOYMENTS_QUERY = """ query GetDeployments ($after: String) { repository(name: "repo_name", owner: "org_name") { deployments(first: 100, after: $after) { nodes { id } pageInfo { endCursor startCursor hasNextPage } } } } """ QUERY_VARIABLES = dict(after='') ADDITIONAL_HEADERS = dict( Accept="application/vnd.github.vixen-preview+json", ) def __init__(self, github_token): super().__init__( github_token=github_token, query=Deployments.DEPLOYMENTS_QUERY, variables=Deployments.QUERY_VARIABLES, additional_headers=Deployments.ADDITIONAL_HEADERS ) def iterator(self): self.variables = dict(after="initial_cursor") generator = self.generator() hasNextPage = True repo_deployments = [] while(hasNextPage): response = next(generator) endCursor = response["data"]["repository"]["deployments"]["pageInfo"]["endCursor"] self.variables = dict(after=endCursor) repo_deployments.extend(response["data"]["repository"]["deployments"]["nodes"]) hasNextPage = response["data"]["repository"]["deployments"]["pageInfo"]["hasNextPage"] return repo_deployments if __name__ == "__main__": DEPLOYMENTS_MUTATION = """ mutation DeleteDeployment($deploymentId: ID!){ deleteDeployment(input: { id: $deploymentId }) { clientMutationId } } """ deployments_service = Deployments(github_token=os.environ.get("GITHUB_TOKEN")) all_deployments = deployments_service.iterator() for deployments in all_deployments: resp = requests.post( GitHubQuery.BASE_URL, json=dict( query=DEPLOYMENTS_MUTATION, variables=dict(deploymentId=deployments["id"]) ), headers=dict( Accept="application/vnd.github.vixen-preview+json", Authorization=f"token {os.environ.get('GITHUB_TOKEN')}" ) ) print(resp.json())