30 Apr 2022 starting note.
Scripts usable with any email inbox supporting the IMAP protocol (=most).
IMAP is, along with POP, the standard protocol for managing email mailboxes.
In some cases, IMAP needs to be activated, eg with Gmail.
09 Sep 2022
For Google/Gmail: it is required to allow "Less secure apps" for the script to work:
https://myaccount.google.com/lesssecureapps
However, Google announced...
"To help keep your account secure, from May 30, 2022, Google no longer supports the use of third-party apps or devices which ask you to sign in to your Google Account using only your username and password.
This deadline does not apply to Google Workspace or Google Cloud Identity customers. The enforcement date for these customers will be announced on the Workspace blog at a later date."
=> I'm currently using Google Workspace, so all good for now, but I will need to rework the login logic in the future, or use a different library for it.
Mass delete emails
Logic: delete an email if any of the provided keywords is found in its To, From and/or Subject fields.
# default boilerplate removed for brevity, see https://notes.nicolasdeville.com/python/boilerplate/
####################

# Mass-delete script: walks the whole mailbox (most recent first) and deletes
# every message whose To, From or Subject field contains one of the keywords
# configured below.

EMAIL_ACCOUNT = os.getenv("EMAIL_ACCOUNT_ND")
PASSWORD = os.getenv("PASSWORD_ND")
EMAIL_SERVER = os.getenv("EMAIL_SERVER_ND")

# NOTE(review): this echoes the password in clear text; acceptable for a local
# sanity check only - remove before sharing logs or running unattended.
print(f"\n\n---\nCHECK CREDENTIALS:\n{EMAIL_ACCOUNT=}\n{PASSWORD=}\n---\n")

count = 0  # messages scanned (initialised here so the snippet does not depend on the omitted boilerplate)
count_errors = 0  # messages whose processing raised
count_deleted = 0  # messages actually deleted

delete_if_in_to = (  # needs to be a tuple
    'xxx',
    'yyy',
    'zzz',
)

delete_if_in_from = (  # needs to be a tuple
    'xxx',
    'yyy',
    'zzz',
)

delete_if_in_subject = (  # needs to be a tuple
    'xxx',
    'yyy',
    'zzz',
)

with MailBox(EMAIL_SERVER).login(EMAIL_ACCOUNT, PASSWORD) as mailbox:

    # for msg in mailbox.fetch(AND(from_='Mail Delivery'), mark_seen=False): # example with search string
    for msg in mailbox.fetch(mark_seen=False, reverse=True, bulk=True):  # get all emails from most recent without changing read status

        ### REFERENCE
        # criteria = 'ALL', message search criteria, query builder
        # charset = 'US-ASCII', indicates charset of the strings that appear in the search criteria. See rfc2978
        # limit = None, limit on the number of read emails, useful for actions with a large number of messages, like "move"
        # miss_no_uid = True, miss emails without uid
        # mark_seen = True, mark emails as seen on fetch
        # reverse = False, in order from the larger date to the smaller
        # headers_only = False, get only email headers (without text, html, attachments)
        # bulk = False, False - fetch each message separately per N commands - low memory consumption, slow; True - fetch all messages per 1 command - high memory consumption, fast

        count += 1
        print("\r" + str(count), end='')

        ### DELETE
        try:
            # A single combined test so that a message matching several fields
            # is deleted - and counted - exactly once.
            in_to = len(msg.to) > 0 and any(ele in msg.to[0] for ele in delete_if_in_to)
            in_from = any(ele in msg.from_ for ele in delete_if_in_from)
            # The subject rule must look at msg.subject (it previously re-tested msg.from_).
            in_subject = any(ele in msg.subject for ele in delete_if_in_subject)

            if in_to or in_from or in_subject:
                print(count)
                print(f"{msg.date_str=}")
                print(f"{msg.from_=}")
                print(f"{msg.to=}")
                print(f"{msg.subject=}")
                print(f"DELETING {msg.uid=}")
                mailbox.delete([msg.uid])
                count_deleted += 1
                print()

        except Exception as e:
            # msg.date_str replaces an undefined `date` name that turned every
            # handled error into a NameError.
            print(f"ERROR with {msg.date_str}-{msg.from_}: {e}")
            count_errors += 1
            continue
Output:
-------------------------------
Count Total Emails = 17703 emails found
count_errors = 0
Email DELETED = 209
-------------------------------
imapee_mail-nd-com.py finished in 0.9 minutes at 16:06:15.
Download all attachments from a mailbox
Save all attachments from all emails in a mailbox to a local folder, using the file name pattern date-from_email-filename:
import os  # needed for os.getenv below; was missing from the snippet

from dotenv import load_dotenv
from imap_tools import MailBox, AND, MailMessageFlags

load_dotenv()

# Attachment-dump script: walks the whole mailbox (most recent first, read
# status untouched) and writes every attachment to `save_to` as
# <yymmdd>-<from address>-<original filename>.

EMAIL_ACCOUNT = os.getenv("EMAIL_ACCOUNT")
PASSWORD = os.getenv("PASSWORD")
EMAIL_SERVER = os.getenv("EMAIL_SERVER")

# NOTE(review): this echoes the password in clear text; acceptable for a local
# sanity check only.
print(f"\n\n---\nCHECK CREDENTIALS:\n{EMAIL_ACCOUNT=}\n{PASSWORD=}\n---\n")

save_to = '/path/to/local/folder'

count = 0  # messages scanned
count_errors = 0  # attachments that could not be written
count_attach = 0  # attachments seen
list_errors = []  # labels of the attachments that failed, printed at the end

with MailBox(EMAIL_SERVER).login(EMAIL_ACCOUNT, PASSWORD) as mailbox:

    for msg in mailbox.fetch(mark_seen=False, reverse=True, bulk=False):  # get all emails from most recent without changing read status

        count += 1
        date = msg.date.strftime('%y%m%d')

        for att in msg.attachments:
            count_attach += 1
            print(count, date, att.filename, att.content_type)

            label = f"{date}-{msg.from_}-{att.filename}"
            # Sender and attachment name come from the email itself: neutralise
            # path separators so the file always lands directly inside save_to
            # (no failed writes on "a/b.pdf", no "../" escapes).
            safe_name = label.replace('/', '_').replace('\\', '_')

            try:
                with open(f'{save_to}/{safe_name}', 'wb') as f:
                    f.write(att.payload)
            except Exception as e:
                # Best effort: remember the failure and carry on with the next attachment.
                print(f"ERROR with {label}: {e}")
                list_errors.append(label)
                count_errors += 1

print()
for error in list_errors:
    print(error)
10 Sep 2022
Mass "Mark as Read" emails / Out Of Office
For example, after sending an email campaign, I need to mass "mark as read" the Out Of Office and other Undeliverable emails that match specific criteria (leaving earlier, unrelated emails unread).
The native web UI of email providers is usually too limited to filter properly. This script mass-marks emails as read based on specific keywords in the From, To, Subject or Body.
# default boilerplate removed for brevity, see https://notes.nicolasdeville.com/python/boilerplate/
####################

# Out-of-office run: marks as read every message whose subject matches one of
# the auto-reply keywords below.

# List of domains to exclude from email address collection
blacklist_domains = grist_XX.get_blacklist_domains()
print(f"{len(blacklist_domains)} domains blacklisted for XX")

EMAIL_ACCOUNT = os.getenv("EMAIL_ACCOUNT_XX")
PASSWORD = os.getenv("PASSWORD_XX")
# NOTE(review): this echoes the password in clear text; acceptable for a local
# sanity check only.
print(f"\n\nCHECK CREDENTIALS:\n{EMAIL_ACCOUNT=}\n{PASSWORD=}\n")

####################

from imap_tools import MailBox, AND, MailMessageFlags

run = 'out_of_office'  # out_of_office / clean_mail_delivery / sweep

# Not used by this run itself; kept at module level, presumably for the other
# runs of the larger script - TODO confirm.
list_remove = [
    '?',
    'png',
    'reply',
    'DAEMON',
    'Daemon',
    'jpg@',
    'xml@',
]

count_mark_as_read = 0

original_stdout = sys.stdout

# NOTE(review): `count` is incremented below but not initialised in this
# snippet; presumably set by the omitted boilerplate - TODO confirm.

with MailBox('imap.gmail.com').login(EMAIL_ACCOUNT, PASSWORD) as mailbox:

    # mailbox.folder.set('[Gmail]/Spam')

    if run == 'out_of_office':

        print(f"Starting out_of_office run...\n")

        ooo_keywords_subject = [
            'Automatic reply',
            # 'ponse automatique:',  # failed under the default US-ASCII search charset ('\xe9' not encodable)
            'automatique:',  # ASCII-only fallback for "Réponse automatique:"
            'Autorespond',
            'Automatische Antwort',
            'OOO',
            'Out-of-Office',
            # 'Automatyczna odpowied',  # failed under the default US-ASCII search charset ('\u017a' not encodable)
            # 'Automatyczna odpowiedź'.encode("utf-8"),
            'Automatyczna odpowiedź',
            'Automatyczna',
            # 'Respuesta autom',  # failed under the default US-ASCII search charset ('\xe1' not encodable)
            'Respuesta',
            'Autoresponder',
            'Risposta Automatica',
            'Automatisk svar',
            'Automatisch antwoord',
            'AUTO:',
            'On Maternity Leave',
            'Out of the office',
            'Out of office',
            'Out Of Office',
            'Risposta automatica',
            'Pregnancy leave',
            'Thank you for your email',
            'Autosvar',
        ]
        ## NOTE: the "'ascii' codec can't encode character" errors came from the search
        ## criteria being encoded with fetch()'s default charset ('US-ASCII'). Passing
        ## charset='UTF-8' below lets accented keywords be searched as-is; the ASCII-only
        ## fallbacks above are kept so that nothing previously matched is lost.
        ## Troubleshooting notes: https://notes.nicolasdeville.com/python/unicode/

        for keyword in ooo_keywords_subject:

            print(f"running {keyword}")

            try:
                # mark_seen=True is the core of this script, combined with AND(subject=keyword)
                # which applies it only to emails whose subject matches the keyword.
                for msg in mailbox.fetch(AND(subject=keyword), charset='UTF-8', mark_seen=True):
                    try:
                        count += 1
                        print(f'\n{run}---------------{keyword}')
                        print(f"run {keyword}: #{count}")
                        print('msg.uid', type(msg.uid), '------', msg.uid)
                        print('msg.from_', type(msg.from_), '------', msg.from_)
                        print('msg.subject', type(msg.subject), '------', msg.subject)
                    except Exception as e:
                        print(f"ERROR with message uid {msg.uid}: {e}")
            except Exception as e:
                # A failing search for one keyword must not stop the remaining keywords.
                print(f"ERROR with {keyword}: {e}")
########################################################################################################
# default boilerplate removed for brevity, see https://notes.nicolasdeville.com/python/boilerplate/
Collect undeliverable emails and mark as read
When running campaigns, a number of emails will come back as undeliverable, no matter how good the email cleaning and validation was.
12 Sep 2022
Here is my script to retrieve all emails returned as undeliverable from my Inbox, and mark all email notifications as read.
This generates a log file (.txt) for each run with all emails, as a backup. The data (i.e. the list of emails) can be processed subsequently - e.g. to update the "Do Not Email" flag in a system (Hubspot, Grist like below, etc.).
# default boilerplate removed for brevity, see https://notes.nicolasdeville.com/python/boilerplate/
####################

# Undeliverable-mail run: searches the mailbox for bounce notifications (by
# From keyword, then by Subject keyword), marks them as read, harvests every
# email address found in the sender and in the html/text bodies, logs the
# retained addresses to a per-run text file and flags the known ones as
# "Do Not Email" (dne) in Grist.

# List of domains to exclude from email address collection
blacklist_domains = grist_XX.get_blacklist_domains()
print(f"{len(blacklist_domains)} domains blacklisted for XX")

EMAIL_ACCOUNT = os.getenv("EMAIL_ACCOUNT_XX")
PASSWORD = os.getenv("PASSWORD_XX")
# NOTE(review): this echoes the password in clear text; acceptable for a local
# sanity check only.
print(f"\n\nCHECK CREDENTIALS:\n{EMAIL_ACCOUNT=}\n{PASSWORD=}\n")

####################

new_emails_in_csv = []

from imap_tools import MailBox, AND, MailMessageFlags
import re

run = 'clean_mail_delivery'  # out_of_office / clean_mail_delivery / sweep # used as part of the larger script, to switch function

EMAIL_REGEX1 = r"""(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])"""
# EMAIL_REGEX2 = r"[a-z0-9\.\-+_]+@[a-z0-9\.\-+_]+\.[a-z]+"

set_emails_dne = set()  # email address strings collected during this run (to be flagged "Do Not Email")

# Substrings that disqualify an address (auto-reply senders, image names, ...).
list_remove = [
    '?',
    'png',
    'reply',
    'DAEMON',
    'Daemon',
]

count_mark_as_read = 0

original_stdout = sys.stdout  # no longer swapped by this run; kept for the rest of the larger script

# NOTE(review): `count`, `ts_file`, `my_utils` and `dict_existing_emails` are
# not defined in this snippet; presumably they come from the omitted
# boilerplate / the larger script - TODO confirm.


def _record_dne(address, log_file):
    """Keep `address` unless its domain is blacklisted or it contains a `list_remove` fragment.

    A retained address is added to `set_emails_dne` and written to `log_file`
    (one address per line) as a backup.
    """
    domain = my_utils.domain_from_email(address)
    if domain in blacklist_domains:
        return
    if any(element in address for element in list_remove):
        return
    set_emails_dne.add(address)
    ### write to file as backup
    print(address, file=log_file)


def _record_addresses_in(body, log_file):
    """Run `_record_dne` on every address that EMAIL_REGEX1 finds in the lower-cased `body`."""
    for re_match in re.finditer(EMAIL_REGEX1, body.lower()):
        new_email = re_match.group()
        print('new_email', type(new_email), '------', new_email)
        _record_dne(new_email, log_file)


def _harvest_bounce(msg, keyword, position, log_file):
    """Print the details of one bounce message and collect its sender and body addresses."""
    print(f'\n{run}---------------{keyword}')
    print(position)
    print('msg.uid', type(msg.uid), '------', msg.uid)
    print('msg.from_', type(msg.from_), '------', msg.from_)
    print('msg.to', type(msg.to), '------', msg.to)

    _record_dne(msg.from_values.email, log_file)

    print('msg.subject', type(msg.subject), '------', msg.subject)

    print("Emails from body:")
    _record_addresses_in(msg.html, log_file)
    _record_addresses_in(msg.text, log_file)


with MailBox('imap.gmail.com').login(EMAIL_ACCOUNT, PASSWORD) as mailbox:

    # mailbox.folder.set('[Gmail]/Spam')

    if run == 'clean_mail_delivery':

        with open(f"log/{ts_file}_{run}_log.txt", 'w') as f:

            print(f"Starting clean_mail_delivery run...\n")

            delivery_keywords_from = [
                'Microsoft Outlook',
                'Postmaster',
                'Mail Delivery',
                # the duplicate 'Microsoft Outlook' entry was dropped: it only re-ran the same search
            ]

            delivery_keywords_subject = [
                'Delivery Status Notification',
                'Unzustellbar',
                'No longer with',
                'Mail Delivery',
                'Microsoft Outlook',
                'Undeliverable',
                'Email Delivery Failure',
            ]

            # Same processing for both passes; only the search criteria differ.
            ### keywords in From, then keywords in Subject line
            searches = [(keyword, AND(from_=keyword)) for keyword in delivery_keywords_from]
            searches += [(keyword, AND(subject=keyword)) for keyword in delivery_keywords_subject]

            for keyword, criteria in searches:
                try:
                    for msg in mailbox.fetch(criteria, mark_seen=True):
                        count += 1
                        try:
                            _harvest_bounce(msg, keyword, count, f)
                        except Exception as e:
                            # One malformed message must not abort the rest of this keyword.
                            print(f"--- ERROR with {msg.uid}: {e}")
                except Exception as e:
                    # The search itself failed: no message is bound here, so report the keyword.
                    print(f"--- ERROR with {keyword}: {e}")

        ### Update Grist with DNE
        list_to_update = [
            {'id': dict_existing_emails[email], 'dne': True}
            for email in set_emails_dne
            if email in dict_existing_emails
        ]

        if list_to_update:
            grist_XX.Contacts.update_records('Master', list_to_update)
            print(f"{len(list_to_update)} updated with DNE in XX Contacts")

        ### Print set_emails_dne
        print("set_emails_dne:")
        for email in set_emails_dne:
            print(email)
        print()
        print('set_emails_dne: ', len(set_emails_dne))
########################################################################################################
# default boilerplate removed for brevity, see https://notes.nicolasdeville.com/python/boilerplate/
Sweep mailbox for new email addresses not in CRM
13 Sep 2022
From all emails, incl. Out Of Office messages, in the From/Cc fields or email body.
Good way to fish out contacts not in database/CRM.
# default boilerplate removed for brevity, see https://notes.nicolasdeville.com/python/boilerplate/
####################

# Sweep run: scans every message of the mailbox (read status untouched),
# extracts the email addresses found in the text and html bodies, and appends
# to a per-run CSV those that are not yet in the Grist contacts table, not on
# a blacklisted domain and not matching a `list_remove` fragment.

# List of domains to exclude from email address collection
blacklist_domains = grist_XX.get_blacklist_domains()
print(f"{len(blacklist_domains)} domains blacklisted for XX")

EMAIL_ACCOUNT = os.getenv("EMAIL_ACCOUNT_XX")
PASSWORD = os.getenv("PASSWORD_XX")
# NOTE(review): this echoes the password in clear text; acceptable for a local
# sanity check only.
print(f"\n\nCHECK CREDENTIALS:\n{EMAIL_ACCOUNT=}\n{PASSWORD=}\n")

####################

###
new_emails_in_csv = []  # addresses already written to the CSV during this run

from imap_tools import MailBox, AND, MailMessageFlags
import re

run = 'sweep'  # out_of_office / clean_mail_delivery / sweep

if run == 'sweep':
    # Dict of existing XX emails: {email: Grist record id}
    dict_existing_emails = {}
    data_contacts_XX = grist_XX.Contacts.fetch_table('Master')
    for contact in data_contacts_XX:
        dict_existing_emails[contact.email] = contact.id

EMAIL_REGEX1 = r"""(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])"""
# EMAIL_REGEX2 = r"[a-z0-9\.\-+_]+@[a-z0-9\.\-+_]+\.[a-z]+"

set_new_emails = set()

# for sweeping emails:
new_emails_csv = f'data/{ts_file}-new_emails.csv'  # timestamp allows to have one file per run (avoid overwrites, enables analysis/troubleshooting). Can be removed to overwrite the same file at every run.

# Substrings that disqualify an address (auto-reply senders, image names, ...).
list_remove = [
    '?',
    'png',
    'reply',
    'DAEMON',
    'Daemon',
    'jpg@',
    'xml@',
]

count_mark_as_read = 0

original_stdout = sys.stdout

# NOTE(review): `count`, `ts_file`, `my_utils`, `csv` and `datetime` are not
# defined in this snippet; presumably they come from the omitted boilerplate -
# TODO confirm.


def _consider_address(new_email):
    """Append `new_email` to the CSV if it passes every filter, printing why when it is skipped.

    Filters, in order: already written during this run (silent), already in
    `dict_existing_emails`, domain in `blacklist_domains`, contains a
    `list_remove` fragment.
    """
    print('new_email', type(new_email), '------', new_email)
    domain_new_email = my_utils.domain_from_email(new_email)

    if new_email in new_emails_in_csv:
        return
    if new_email in dict_existing_emails:
        print(f"{new_email} already in XX Contacts")
        return
    if domain_new_email in blacklist_domains:
        print(f"{new_email}'s domain in XX blaclist")
        return
    if any(element in new_email for element in list_remove):
        print(f"{new_email} is a generic email")
        return

    set_new_emails.add(new_email)

    # Appended row by row so that the file is complete up to the last
    # processed address even if the run is interrupted.
    with open(new_emails_csv, 'a', newline='', encoding='utf-8') as i:
        writer = csv.writer(i, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
        writer.writerow([new_email])

    # Avoid duplicates in same run
    new_emails_in_csv.append(new_email)


with MailBox('imap.gmail.com').login(EMAIL_ACCOUNT, PASSWORD) as mailbox:

    # mailbox.folder.set('[Gmail]/Spam')

    if run == 'sweep':

        print(f"Starting sweep run...\n")

        try:
            for msg in mailbox.fetch(mark_seen=False):
                try:
                    count += 1
                    print(f'\n{run}---------------{datetime.now().strftime("%y%m%d-%H%M")}')
                    print(count)
                    print(f'msg.uid {type(msg.uid)}\t{msg.uid}')
                    # label fixed: this line prints the sender, it used to be labelled "msg.uid"
                    print(f'msg.from_ {type(msg.from_)}\t{msg.from_}')
                    print('msg.subject', type(msg.subject), '------', msg.subject)

                    # BODY
                    # print('msg.text', type(msg.text), '------', msg.text)
                    # print('msg.html', type(msg.html), '------', msg.html)

                    # EMAILS FROM TEXT
                    print("Emails from body:")
                    for re_match in re.finditer(EMAIL_REGEX1, msg.text.lower()):
                        _consider_address(re_match.group())

                    # EMAILS FROM HTML
                    for re_match in re.finditer(EMAIL_REGEX1, msg.html.lower()):
                        _consider_address(re_match.group())

                except Exception as e:
                    # One malformed message must not stop the sweep.
                    print(f"ERROR with {msg.uid} from {msg.from_}: {e}")
                    continue
        except Exception as e:
            print(f"ERROR: {e}")

        # # Mark email as Read
        # mailbox.flag(msg.uid, MailMessageFlags.SEEN, True)
        # count_mark_as_read += 1

        ## Batch mark as read/unread /// Does not work
        # mailbox.flag(mailbox.fetch(AND(from_="Milsom")), MailMessageFlags.SEEN, False)

        ### Print set_new_emails
        # print("set_new_emails:")
        # for email in set_new_emails:
        #     print(email)
        # print()
        # print('set_new_emails: ', len(set_new_emails))
########################################################################################################
# default boilerplate removed for brevity, see https://notes.nicolasdeville.com/python/boilerplate/
Alternative libraries
Imapbox
Dump IMAP inbox to a local folder in a regular backupable format: HTML, PDF, JSON and attachments.