Skip to content

Instantly share code, notes, and snippets.

@Yankzy
Last active May 17, 2025 21:50
Show Gist options
  • Select an option

  • Save Yankzy/9364bcf8026ff85eaa6a10fb8a5be778 to your computer and use it in GitHub Desktop.

Select an option

Save Yankzy/9364bcf8026ff85eaa6a10fb8a5be778 to your computer and use it in GitHub Desktop.
Delete Gmail Messages Older Than X Years Using Python & Gmail API

1. Set up your Google Cloud Project

  • Go to Google Cloud Console – Credentials page.
  • Select/create a project.
  • Click "+ CREATE CREDENTIALS > OAuth client ID".
  • Set Application type: Desktop app (recommended for local scripts).
  • Name it (e.g. "GmailCleanup").
  • Download the file as credentials.json.

2. Enable the Gmail API

  • Go to Gmail API page.
  • Click "Enable" if not already enabled for your project.

3. Set Redirect URI

  • When creating the OAuth Client ID, the correct redirect URIs for Desktop Apps are pre-filled.
  • If you use run_local_server(port=3005), also add http://localhost:3005/ to the "Authorized redirect URIs" in your OAuth client credentials properties (sometimes required).

4. Install Required Python Packages

pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib

5. Save the Script

Save this script as delete_gmail.py:

import os
import base64
import datetime
from email.mime.text import MIMEText
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from google.auth.transport.requests import Request

SCOPES = ['https://mail.google.com/']

class GmailHelper:
    def __init__(self, credentials_path='credentials.json', token_path='token.json', port=3005):
        self.credentials_path = credentials_path
        self.token_path = token_path
        self.port = port
        self.service = self._get_gmail_service()
    
    def _get_gmail_service(self):
        creds = None
        if os.path.exists(self.token_path):
            creds = Credentials.from_authorized_user_file(self.token_path, SCOPES)
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(self.credentials_path, SCOPES)
                creds = flow.run_local_server(port=self.port)
            with open(self.token_path, 'w') as token:
                token.write(creds.to_json())
        service = build('gmail', 'v1', credentials=creds)
        return service

    # 1. List recent emails
    def list_messages(self, user_id='me', max_results=10):
        results = self.service.users().messages().list(userId=user_id, maxResults=max_results).execute()
        return results.get('messages', [])

    # 2. Read email content
    def get_message(self, message_id, user_id='me', format='full'):
        return self.service.users().messages().get(userId=user_id, id=message_id, format=format).execute()

    # 3. Search emails
    def search_messages(self, query, user_id='me', max_results=10):
        results = self.service.users().messages().list(userId=user_id, q=query, maxResults=max_results).execute()
        return results.get('messages', [])

    # 4. Send email
    def send_email(self, to, subject, body, user_id='me'):
        message = MIMEText(body)
        message['to'] = to
        message['subject'] = subject
        raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
        return self.service.users().messages().send(userId=user_id, body={'raw': raw}).execute()

    # 5. Modify labels/stars
    def modify_labels(self, message_id, add_labels=None, remove_labels=None, user_id='me'):
        add_labels = add_labels if add_labels else []
        remove_labels = remove_labels if remove_labels else []
        body = {'addLabelIds': add_labels, 'removeLabelIds': remove_labels}
        return self.service.users().messages().modify(userId=user_id, id=message_id, body=body).execute()

    # 6. Download attachments (simple version)
    def get_attachments(self, message_id, user_id='me', download_dir='attachments'):
        msg = self.get_message(message_id, user_id, format="full")
        if not os.path.exists(download_dir):
            os.makedirs(download_dir)
        files = []
        for part in msg['payload'].get('parts', []):
            if part.get('filename'):
                att_id = part['body'].get('attachmentId')
                if att_id:
                    att = self.service.users().messages().attachments().get(userId=user_id, messageId=message_id, id=att_id).execute()
                    data = att['data']
                    file_data = base64.urlsafe_b64decode(data.encode("UTF-8"))
                    path = os.path.join(download_dir, part['filename'])
                    with open(path, "wb") as f:
                        f.write(file_data)
                    files.append(path)
        return files

    # 7. Trash email
    def trash_message(self, message_id, user_id='me'):
        return self.service.users().messages().trash(userId=user_id, id=message_id).execute()

    # 8. Permanently delete email
    def delete_message(self, message_id, user_id='me'):
        return self.service.users().messages().delete(userId=user_id, id=message_id).execute()

    # 9. Restore from trash
    def untrash_message(self, message_id, user_id='me'):
        return self.service.users().messages().untrash(userId=user_id, id=message_id).execute()

    # 10. Create label
    def create_label(self, label_name, user_id='me'):
        label = {'name': label_name}
        return self.service.users().labels().create(userId=user_id, body=label).execute()

    # 11. List labels
    def list_labels(self, user_id='me'):
        return self.service.users().labels().list(userId=user_id).execute().get('labels', [])

    # 12. Batch modify
    def batch_modify(self, message_ids, add_labels=None, remove_labels=None, user_id='me'):
        add_labels = add_labels if add_labels else []
        remove_labels = remove_labels if remove_labels else []
        body = {
            'ids': message_ids,
            'addLabelIds': add_labels,
            'removeLabelIds': remove_labels
        }
        return self.service.users().messages().batchModify(userId=user_id, body=body).execute()

    # 13. Batch delete
    def batch_delete(self, message_ids, user_id='me'):
        body = {'ids': message_ids}
        return self.service.users().messages().batchDelete(userId=user_id, body=body).execute()

    # 14. Delete old emails (by years)
    def delete_old_emails(self, years=10, user_id='me'):
        date_n_years_ago = (datetime.datetime.utcnow() - datetime.timedelta(days=years*365))
        query = f'before:{date_n_years_ago.strftime("%Y/%m/%d")}'
        print(f"Searching with query: {query}")
        messages = []
        response = self.service.users().messages().list(userId=user_id, q=query).execute()
        if 'messages' in response:
            messages.extend(response['messages'])
        # Paginate
        while 'nextPageToken' in response:
            page_token = response['nextPageToken']
            response = self.service.users().messages().list(userId=user_id, q=query, pageToken=page_token).execute()
            if 'messages' in response:
                messages.extend(response['messages'])
        print(f"Found {len(messages)} messages older than {years} years. Proceeding to delete.")
        for msg in messages:
            print(f"Deleting email with id: {msg['id']}")
            self.delete_message(msg['id'], user_id=user_id)

# Example usage
if __name__ == "__main__":
    gmail = GmailHelper(port=3005)
    
    # List 5 messages
    msgs = gmail.list_messages(max_results=5)
    print("Recent messages:", msgs)

    # For each message, print snippet and subject
    for msg in msgs:
        full = gmail.get_message(msg['id'])
        subject = None
        for header in full['payload']['headers']:
            if header['name'] == 'Subject':
                subject = header['value']
                break
        print(f"ID: {msg['id']} | Subject: {subject} | Snippet: {full.get('snippet', '')[:50]}")

    # Example: search, send, delete old, etc.
    # search = gmail.search_messages('from:boss@example.com')
    # gmail.send_email('to@somewhere.com', 'Test', 'This is a test.')
    # gmail.delete_old_emails(years=10)

## How to use:
- Instantiate the class: gmail = GmailHelper(port=3005)
- Call methods, e.g.:
    - gmail.list_messages()
    - gmail.get_message(message_id)
    - gmail.send_email('to@domain.com', 'Subject', 'Body')
    - gmail.delete_old_emails(years=10)

7. Gotchas & Tips

  • If you want a dry run first: print but don't call delete()—or just comment that line.
  • Always backup or check your Gmail Trash after deletion.
  • Don't forget to regularly update your credentials (token.json), especially if you move the script to a new system.

Summary

  • Set up Google Cloud → Enable Gmail API → Download credentials.json
  • Install dependencies
  • Use & run the script
  • Confirm deletions
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment