Skip to content

Instantly share code, notes, and snippets.

@collinmutembei
Created December 17, 2021 18:34
Show Gist options
  • Select an option

  • Save collinmutembei/16bbf96f4e81fe2445dffe96b9a718f5 to your computer and use it in GitHub Desktop.

Select an option

Save collinmutembei/16bbf96f4e81fe2445dffe96b9a718f5 to your computer and use it in GitHub Desktop.
Delete GitHub Deployments
import os
import requests
from requests import exceptions, request
class GitHubQuery:
    """Minimal client that POSTs a single GraphQL query to the GitHub API.

    The instance stores a query string and its variables; ``generator``
    re-sends that query every time it is advanced, reading ``self.query`` and
    ``self.variables`` at that moment so a caller can change the variables
    (e.g. a pagination cursor) between requests.
    """

    # Official GitHub GraphQL endpoint. The previous value pointed at a
    # third-party mirror host, which would have received the bearer token.
    BASE_URL = "https://api.github.com/graphql"
    ORGANIZATION = "org_name"
    # Seconds to wait for GitHub before giving up; without a timeout a stalled
    # connection would block the script forever.
    REQUEST_TIMEOUT = 30

    def __init__(
        self,
        github_token=None,
        query=None,
        variables=None,
        additional_headers=None,
    ):
        """Store the credentials and the query to send.

        Args:
            github_token: Token placed in the ``Authorization`` header.
            query: GraphQL document sent as the ``query`` field of the body.
            variables: Mapping sent as the ``variables`` field (default: empty).
            additional_headers: Extra HTTP headers merged over the defaults.
        """
        self.github_token = github_token
        self.query = query
        # Copy the mappings so that later changes on this instance can never
        # leak into a dict shared by the caller (e.g. a class-level constant).
        self.variables = dict(variables or {})
        self.additional_headers = dict(additional_headers or {})

    @property
    def headers(self):
        """Return the HTTP headers for a request.

        ``additional_headers`` take precedence over the default
        ``Authorization`` header when both define the same key.
        """
        return {
            "Authorization": f"token {self.github_token}",
            **self.additional_headers,
        }

    def generator(self):
        """Yield the decoded JSON body of one POST per iteration, forever.

        Each ``next()`` call sends a fresh request built from the current
        ``self.query`` and ``self.variables``; the generator never terminates
        on its own, so the caller decides when to stop.

        Raises:
            requests.exceptions.HTTPError: If GitHub answers with a 4xx/5xx
                status code.
        """
        while True:
            response = request(
                "post",
                self.BASE_URL,
                headers=self.headers,
                json=dict(query=self.query, variables=self.variables),
                timeout=self.REQUEST_TIMEOUT,
            )
            # Without this call an error status would go unnoticed and the
            # error payload would be handed to the caller as if it were data.
            response.raise_for_status()
            yield response.json()

    def iterator(self):
        """Hook for subclasses that collect paginated results.

        The base implementation does nothing and returns ``None``.
        """
class Deployments(GitHubQuery):
    """Collect the node IDs of every deployment of one repository.

    The repository is selected through the ``owner`` and ``name`` GraphQL
    variables; results are fetched 100 at a time and followed page by page
    using the ``endCursor`` / ``hasNextPage`` fields of ``pageInfo``.
    """

    DEPLOYMENTS_QUERY = """
    query GetDeployments ($owner: String!, $name: String!, $after: String) {
      repository(name: $name, owner: $owner) {
        deployments(first: 100, after: $after) {
          nodes {
            id
          }
          pageInfo {
            endCursor
            startCursor
            hasNextPage
          }
        }
      }
    }
    """
    # A null cursor asks for the first page.
    QUERY_VARIABLES = dict(after=None)
    # Preview media type sent with every request.
    # NOTE(review): confirm this Accept header is still required by GitHub.
    ADDITIONAL_HEADERS = dict(
        Accept="application/vnd.github.vixen-preview+json",
    )

    def __init__(self, github_token, owner=None, repo_name="repo_name"):
        """Prepare the deployments query for one repository.

        Args:
            github_token: Token used to authenticate against GitHub.
            owner: Account that owns the repository; defaults to the
                ``ORGANIZATION`` class attribute.
            repo_name: Name of the repository whose deployments are listed.
        """
        super().__init__(
            github_token=github_token,
            query=Deployments.DEPLOYMENTS_QUERY,
            variables={
                **Deployments.QUERY_VARIABLES,
                "owner": owner if owner is not None else self.ORGANIZATION,
                "name": repo_name,
            },
            additional_headers=Deployments.ADDITIONAL_HEADERS,
        )

    def iterator(self):
        """Return a list with the ``nodes`` entries of every result page.

        Returns:
            list: One ``{"id": ...}`` mapping per deployment, in page order.

        Raises:
            RuntimeError: If the response carries GraphQL ``errors``, has no
                ``data``, or the repository is null.
        """
        # Everything except the cursor stays the same from page to page.
        fixed_variables = {
            key: value for key, value in self.variables.items() if key != "after"
        }
        # Start from a null cursor: a placeholder string is not a valid cursor.
        self.variables = {**fixed_variables, "after": None}

        pages = self.generator()
        repo_deployments = []
        try:
            while True:
                # The generator reads self.variables when advanced, so the
                # cursor assigned below selects the next page.
                response = next(pages)
                if response.get("errors") or not response.get("data"):
                    raise RuntimeError(f"GitHub GraphQL query failed: {response}")
                repository = response["data"].get("repository")
                if repository is None:
                    raise RuntimeError(
                        "Repository not found or not accessible with this token"
                    )

                deployments = repository["deployments"]
                page_info = deployments["pageInfo"]
                repo_deployments.extend(deployments["nodes"])
                self.variables = {**fixed_variables, "after": page_info["endCursor"]}
                if not page_info["hasNextPage"]:
                    return repo_deployments
        finally:
            # The underlying generator is infinite; close it explicitly.
            pages.close()
if __name__ == "__main__":
    # Script entry point: list every deployment returned by Deployments and
    # send one deleteDeployment mutation per deployment ID, printing each
    # response body.
    DEPLOYMENTS_MUTATION = """
    mutation DeleteDeployment($deploymentId: ID!){
      deleteDeployment(input: { id: $deploymentId }) {
        clientMutationId
      }
    }
    """

    github_token = os.environ.get("GITHUB_TOKEN")
    if not github_token:
        # Fail fast instead of sending "Authorization: token None".
        raise SystemExit("GITHUB_TOKEN environment variable is not set")

    deployments_service = Deployments(github_token=github_token)
    # Reuse the headers of the listing client (Authorization + Accept) so the
    # two kinds of request cannot drift apart.
    request_headers = deployments_service.headers

    for deployment in deployments_service.iterator():
        resp = requests.post(
            GitHubQuery.BASE_URL,
            json=dict(
                query=DEPLOYMENTS_MUTATION,
                variables=dict(deploymentId=deployment["id"]),
            ),
            headers=request_headers,
            # Avoid hanging forever on a stalled connection.
            timeout=30,
        )
        print(resp.json())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment