Skip to content

Instantly share code, notes, and snippets.

@ryanhanks-wf
Last active June 27, 2016 17:44
Show Gist options
  • Select an option

  • Save ryanhanks-wf/8474557e5dd361d9b2e81a88d6ff1b84 to your computer and use it in GitHub Desktop.

Select an option

Save ryanhanks-wf/8474557e5dd361d9b2e81a88d6ff1b84 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import argparse
import datetime
import time
import threading
from googleapiclient.discovery import build
from httplib2 import Http
from oauth2client.client import GoogleCredentials
VERSION_NAME_WHITELIST = ['ah-builtin-datastoreservice', 'ah-builtin-python-bundle']
BACKOFF_RETRY_TIME = 60
BACKOFF_RETRIES = 5
responses_list = []
configuration_list = []
def main():
user_opts = parse_user_args()
appspot = user_opts.appspot
if not user_opts.list_appspots and not user_opts.clean_appspots and not user_opts.preview_clean:
print ('No effect. Check --help for available options.')
exit()
credentials = GoogleCredentials.get_application_default()
http = credentials.authorize(Http())
credentials.refresh(http)
client = build('appengine', 'v1beta5', credentials=credentials, http=http)
service_ids = ['default',
'bigskyf1',
'bigskyf4',
'high-memory-backend-module',
'validation',
'validationf1']
services = get_appspot_services(client, appspot, service_ids)
versions = get_appspot_service_versions(client, appspot, service_ids)
delete_appspot_service_versions(client, appspot, services, versions, keep_most_recent=1)
def parse_user_args():
parser = argparse.ArgumentParser(description='List/Clean versions on a given appspot.')
parser.add_argument('-A', metavar='appspot', dest='appspot', help='the appspot to deploy to. (Required)',
required=True)
parser.add_argument('--list', dest='list_appspots', default=False, action='store_true',
help='list all the app versions. (defaults False)')
parser.add_argument('--clean', dest='clean_appspots', default=False, action='store_true',
help='clean all but the latest 5 app versions. (defaults False)')
parser.add_argument('--preview', dest='preview_clean', default=False, action='store_true',
help='shows a preview of what the cleaning operation would do. (defaults False)')
return parser.parse_args()
def get_appspot_services(client, appspot, service_ids):
batch_request = client.new_batch_http_request()
services = {}
def _callback(request_id, response, exception):
service_id = request_id
if exception:
raise exception
services[service_id] = response
for service_id in service_ids:
get_service_request = client.apps().services().get(appsId=appspot,
servicesId=service_id)
batch_request.add(get_service_request, callback=_callback, request_id=service_id)
batch_request.execute()
return services
def get_appspot_service_versions(client, appspot, service_ids):
batch_request = client.new_batch_http_request()
versions = {}
def _callback(request_id, response, exception):
service_id = request_id
if exception:
raise expection
versions[service_id] = response['versions']
for service_id in service_ids:
request = client.apps().services().versions().list(
appsId=appspot,
servicesId=service_id)
batch_request.add(request, callback=_callback, request_id=service_id)
batch_request.execute()
return versions
def delete_appspot_service_versions(client, appspot, services, versions, keep_most_recent=2):
for service_id, service in services.iteritems():
versions_to_delete = get_version_ids_to_delete(service, versions, keep_most_recent)
version_id = versions_to_delete[0]
request = client.apps().services().versions().delete(
appsId=appspot,
servicesId=service_id,
versionsId=version_id
)
response = request.execute()
operation_name = response['name']
operation_id = operation_name.split('/')[-1]
import time
request = client.apps().operations().get(appsId=appspot, operationsId=operation_id)
operation_status = request.execute()
while operations_status['done'] is not True:
time.sleep(1)
request = client.apps().operations().get(appsId=appspot, operationsId=operation_id)
operation_status = request.execute()
def get_version_ids_to_delete(service, versions, keep_most_recent):
service_id = service['id']
return [version['id'] for version in versions[service_id]]
if __name__ == '__main__':
main()
@spencerbywater-wf
Copy link

119: Filter out latest versions and in-use versions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment