- Go to Google Cloud Console – Credentials page.
- Select/create a project.
- Click "+ CREATE CREDENTIALS > OAuth client ID".
- Set Application type: Desktop app (recommended for local scripts).
- Name it (e.g. "GmailCleanup").
- Download the file and save it as `credentials.json`.
- Go to Gmail API page.
- Click "Enable" if not already enabled for your project.
- When creating the OAuth Client ID, the correct redirect URIs for Desktop Apps are pre-filled.
- If you use `run_local_server(port=3005)`, also add `http://localhost:3005/` to the "Authorized redirect URIs" in your OAuth client credentials properties (sometimes required).
pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib

Save this script as delete_gmail.py:
import os
import base64
import datetime
from email.mime.text import MIMEText
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from google.auth.transport.requests import Request
SCOPES = ['https://mail.google.com/']
class GmailHelper:
    """Convenience wrapper around the Gmail API ``users.messages`` and ``users.labels`` resources.

    Constructing an instance authenticates immediately (see
    ``_get_gmail_service``) and stores the resulting API client on
    ``self.service``; every public method is a thin call into that client and
    returns the decoded API response.
    """

    def __init__(self, credentials_path='credentials.json', token_path='token.json', port=3005):
        """Authenticate and build the Gmail API client.

        Args:
            credentials_path: OAuth client-secrets file used for the first login.
            token_path: File where the authorized user token is cached.
            port: Local port used by the OAuth redirect server.
        """
        self.credentials_path = credentials_path
        self.token_path = token_path
        self.port = port
        self.service = self._get_gmail_service()

    def _get_gmail_service(self):
        """Return an authorized Gmail API client, logging in if necessary.

        A cached token at ``token_path`` is reused when valid, refreshed when
        expired and refreshable, and otherwise replaced by running the local
        OAuth flow. Whenever new credentials are obtained they are written
        back to ``token_path``.
        """
        creds = None
        if os.path.exists(self.token_path):
            creds = Credentials.from_authorized_user_file(self.token_path, SCOPES)
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(self.credentials_path, SCOPES)
                creds = flow.run_local_server(port=self.port)
            # Persist the (new or refreshed) token so the next run skips the login.
            with open(self.token_path, 'w', encoding='utf-8') as token:
                token.write(creds.to_json())
        return build('gmail', 'v1', credentials=creds)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _iter_messages(self, query, user_id='me'):
        """Yield every message stub matching *query*, following ``nextPageToken``."""
        params = {'userId': user_id, 'q': query}
        while True:
            response = self.service.users().messages().list(**params).execute()
            yield from response.get('messages', [])
            page_token = response.get('nextPageToken')
            if not page_token:
                break
            params['pageToken'] = page_token

    @staticmethod
    def _iter_parts(payload):
        """Yield every MIME part below *payload*, depth-first, in document order."""
        stack = list(reversed(payload.get('parts', [])))
        while stack:
            part = stack.pop()
            yield part
            stack.extend(reversed(part.get('parts', [])))

    @staticmethod
    def _safe_attachment_path(download_dir, filename):
        """Return a path for *filename* that is guaranteed to sit inside *download_dir*.

        Attachment names come from the sender, so directory components
        (``../``, absolute paths, Windows separators) are stripped.
        """
        name = os.path.basename(filename.replace('\\', '/'))
        if name in ('', '.', '..'):
            name = 'attachment'
        return os.path.join(download_dir, name)

    @staticmethod
    def _cutoff_date(years, today=None):
        """Return the calendar date *years* years before *today* (default: today in UTC).

        Whole years are subtracted on the calendar (Feb 29 maps to Feb 28 in a
        non-leap year); any fractional remainder is subtracted as
        ``remainder * 365`` days.

        Raises:
            ValueError: if *years* is negative (the cutoff would lie in the
                future and match every message).
        """
        if years < 0:
            raise ValueError("years must be non-negative")
        if today is None:
            today = datetime.datetime.now(datetime.timezone.utc).date()
        whole_years = int(years)
        try:
            cutoff = today.replace(year=today.year - whole_years)
        except ValueError:
            # Feb 29 has no counterpart in a non-leap target year.
            cutoff = today.replace(year=today.year - whole_years, day=28)
        remainder = years - whole_years
        if remainder:
            cutoff -= datetime.timedelta(days=remainder * 365)
        return cutoff

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    # 1. List recent emails
    def list_messages(self, user_id='me', max_results=10):
        """Return up to *max_results* message stubs from a single list call."""
        results = self.service.users().messages().list(
            userId=user_id, maxResults=max_results
        ).execute()
        return results.get('messages', [])

    # 2. Read email content
    def get_message(self, message_id, user_id='me', format='full'):
        """Fetch one message resource in the requested *format*."""
        return self.service.users().messages().get(
            userId=user_id, id=message_id, format=format
        ).execute()

    # 3. Search emails
    def search_messages(self, query, user_id='me', max_results=10):
        """Return up to *max_results* message stubs matching the search *query*."""
        results = self.service.users().messages().list(
            userId=user_id, q=query, maxResults=max_results
        ).execute()
        return results.get('messages', [])

    # 4. Send email
    def send_email(self, to, subject, body, user_id='me'):
        """Send a plain-text email and return the API response."""
        message = MIMEText(body)
        message['to'] = to
        message['subject'] = subject
        raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
        return self.service.users().messages().send(userId=user_id, body={'raw': raw}).execute()

    # 5. Modify labels/stars
    def modify_labels(self, message_id, add_labels=None, remove_labels=None, user_id='me'):
        """Add and/or remove label IDs on a single message."""
        body = {
            'addLabelIds': add_labels or [],
            'removeLabelIds': remove_labels or [],
        }
        return self.service.users().messages().modify(
            userId=user_id, id=message_id, body=body
        ).execute()

    # 6. Download attachments
    def get_attachments(self, message_id, user_id='me', download_dir='attachments'):
        """Download a message's attachments into *download_dir*.

        All MIME parts are inspected, including nested ones. Each part that
        has both a filename and an ``attachmentId`` is fetched and written
        under its sanitized base name.

        Returns:
            List of file paths written, in the order the parts were found.
        """
        msg = self.get_message(message_id, user_id, format="full")
        os.makedirs(download_dir, exist_ok=True)
        files = []
        for part in self._iter_parts(msg.get('payload', {})):
            filename = part.get('filename')
            att_id = part.get('body', {}).get('attachmentId')
            if not filename or not att_id:
                continue
            att = self.service.users().messages().attachments().get(
                userId=user_id, messageId=message_id, id=att_id
            ).execute()
            file_data = base64.urlsafe_b64decode(att['data'].encode("UTF-8"))
            path = self._safe_attachment_path(download_dir, filename)
            with open(path, "wb") as f:
                f.write(file_data)
            files.append(path)
        return files

    # 7. Trash email
    def trash_message(self, message_id, user_id='me'):
        """Move a message to Trash."""
        return self.service.users().messages().trash(userId=user_id, id=message_id).execute()

    # 8. Permanently delete email
    def delete_message(self, message_id, user_id='me'):
        """Permanently delete a message (use ``trash_message`` for a recoverable removal)."""
        return self.service.users().messages().delete(userId=user_id, id=message_id).execute()

    # 9. Restore from trash
    def untrash_message(self, message_id, user_id='me'):
        """Restore a message from Trash."""
        return self.service.users().messages().untrash(userId=user_id, id=message_id).execute()

    # 10. Create label
    def create_label(self, label_name, user_id='me'):
        """Create a label called *label_name* and return the API response."""
        return self.service.users().labels().create(
            userId=user_id, body={'name': label_name}
        ).execute()

    # 11. List labels
    def list_labels(self, user_id='me'):
        """Return the account's labels as a list (empty if there are none)."""
        return self.service.users().labels().list(userId=user_id).execute().get('labels', [])

    # 12. Batch modify
    def batch_modify(self, message_ids, add_labels=None, remove_labels=None, user_id='me'):
        """Add and/or remove label IDs on many messages in one request."""
        body = {
            'ids': message_ids,
            'addLabelIds': add_labels or [],
            'removeLabelIds': remove_labels or [],
        }
        return self.service.users().messages().batchModify(userId=user_id, body=body).execute()

    # 13. Batch delete
    def batch_delete(self, message_ids, user_id='me'):
        """Permanently delete many messages in one request."""
        return self.service.users().messages().batchDelete(
            userId=user_id, body={'ids': message_ids}
        ).execute()

    # 14. Delete old emails (by years)
    def delete_old_emails(self, years=10, user_id='me', dry_run=False):
        """Permanently delete every message older than *years* years.

        Args:
            years: Age threshold; must be non-negative.
            user_id: Mailbox to operate on.
            dry_run: When True, only report what would be deleted.

        Returns:
            Number of messages that matched the search.

        Raises:
            ValueError: if *years* is negative.
        """
        cutoff = self._cutoff_date(years)
        query = f'before:{cutoff.strftime("%Y/%m/%d")}'
        print(f"Searching with query: {query}")
        messages = list(self._iter_messages(query, user_id=user_id))
        if dry_run:
            print(f"Found {len(messages)} messages older than {years} years. Dry run: nothing will be deleted.")
            for msg in messages:
                print(f"Would delete email with id: {msg['id']}")
            return len(messages)
        print(f"Found {len(messages)} messages older than {years} years. Proceeding to delete.")
        for msg in messages:
            print(f"Deleting email with id: {msg['id']}")
            self.delete_message(msg['id'], user_id=user_id)
        return len(messages)
# Example usage
def header_value(message, name):
    """Return the value of the first header called *name* in a Gmail message, or None.

    The header name is matched case-insensitively, and a message without a
    ``payload`` or ``headers`` entry simply yields None.
    """
    wanted = name.lower()
    for header in message.get('payload', {}).get('headers', []):
        if header.get('name', '').lower() == wanted:
            return header.get('value')
    return None


def main():
    """List the five most recent messages and print each one's subject and snippet."""
    gmail = GmailHelper(port=3005)

    # List 5 messages
    msgs = gmail.list_messages(max_results=5)
    print("Recent messages:", msgs)

    # For each message, print snippet and subject
    for msg in msgs:
        full = gmail.get_message(msg['id'])
        subject = header_value(full, 'Subject')
        print(f"ID: {msg['id']} | Subject: {subject} | Snippet: {full.get('snippet', '')[:50]}")

    # Example: search, send, delete old, etc.
    # search = gmail.search_messages('from:boss@example.com')
    # gmail.send_email('to@somewhere.com', 'Test', 'This is a test.')
    # gmail.delete_old_emails(years=10)


if __name__ == "__main__":
    main()
## How to use:
- Instantiate the class: gmail = GmailHelper(port=3005)
- Call methods, e.g.:
- gmail.list_messages()
- gmail.get_message(message_id)
- gmail.send_email('to@domain.com', 'Subject', 'Body')
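- gmail.delete_old_emails(years=10)
- If you want a dry run first: print the message IDs but don't call `delete_message()` — or just comment out that line.
- Always back up your mail before deleting. `delete_message()` removes messages permanently and skips the Trash; use `trash_message()` instead if you want to be able to check your Gmail Trash afterwards.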
- Don't forget to regularly update your credentials (`token.json`), especially if you move the script to a new system.
- Set up Google Cloud → Enable Gmail API → Download `credentials.json`
- Install dependencies
- Use & run the script
- Confirm deletions