#!/usr/bin/env python """MailBox class for processing IMAP email. (To use with Gmail: enable IMAP access in your Google account settings) usage with GMail: import mailbox with mailbox.MailBox(gmail_username, gmail_password) as mbox: print mbox.get_count() print mbox.print_msgs() for other IMAP servers, adjust settings as necessary. """ import imaplib import time import uuid from email import email IMAP_SERVER = 'imap.gmail.com' IMAP_PORT = '993' IMAP_USE_SSL = True class MailBox(object): def __init__(self, user, password): self.user = user self.password = password if IMAP_USE_SSL: self.imap = imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT) else: self.imap = imaplib.IMAP4(IMAP_SERVER, IMAP_PORT) def __enter__(self): self.imap.login(self.user, self.password) return self def __exit__(self, type, value, traceback): self.imap.close() self.imap.logout() def get_count(self): self.imap.select('Inbox') status, data = self.imap.search(None, 'ALL') return sum(1 for num in data[0].split()) def fetch_message(self, num): self.imap.select('Inbox') status, data = self.imap.fetch(str(num), '(RFC822)') email_msg = email.message_from_string(data[0][1]) return email_msg def delete_message(self, num): self.imap.select('Inbox') self.imap.store(num, '+FLAGS', r'\Deleted') self.imap.expunge() def delete_all(self): self.imap.select('Inbox') status, data = self.imap.search(None, 'ALL') for num in data[0].split(): self.imap.store(num, '+FLAGS', r'\Deleted') self.imap.expunge() def print_msgs(self): self.imap.select('Inbox') status, data = self.imap.search(None, 'ALL') for num in reversed(data[0].split()): status, data = self.imap.fetch(num, '(RFC822)') print 'Message %s\n%s\n' % (num, data[0][1]) def get_latest_email_sent_to(self, email_address, timeout=300, poll=1): start_time = time.time() while ((time.time() - start_time) < timeout): # It's no use continuing until we've successfully selected # the inbox. And if we don't select it on each iteration # before searching, we get intermittent failures. status, data = self.imap.select('Inbox') if status != 'OK': time.sleep(poll) continue status, data = self.imap.search(None, 'TO', email_address) data = [d for d in data if d is not None] if status == 'OK' and data: for num in reversed(data[0].split()): status, data = self.imap.fetch(num, '(RFC822)') email_msg = email.message_from_string(data[0][1]) return email_msg time.sleep(poll) raise AssertionError("No email sent to '%s' found in inbox " "after polling for %s seconds." % (email_address, timeout)) def delete_msgs_sent_to(self, email_address): self.imap.select('Inbox') status, data = self.imap.search(None, 'TO', email_address) if status == 'OK': for num in reversed(data[0].split()): status, data = self.imap.fetch(num, '(RFC822)') self.imap.store(num, '+FLAGS', r'\Deleted') self.imap.expunge() if __name__ == '__main__': # example: imap_username = 'youremail@gmail.com' imap_password = 'secret' with MailBox(imap_username, imap_password) as mbox: print mbox.get_count() print mbox.print_msgs()