Updating SPF records from DMARC reports automatically

painful process made easy with Python

12 Oct 2024

I recently figured out how to properly configure SPF records for a domain by using DMARC reports.

This is a quick guide on how to do it with Python, as it's cumbersome to do manually, and needs to be done regularly (at least in the days and weeks after the initial setup).

See Email Deliverability: SPF, DKIM & DMARC for more details on what SPF/DMARC/DKIM are & how to do the original setup.

The automated process goes as follows:

1) fetch all emails sent to my DMARC email address, and download the XML attachments to a dedicated folder
2) parse the XML files to extract the IP addresses
3) compare the IP addresses in the DMARC reports with the SPF records in the database
4) if any IP addresses are missing from the SPF records, update the SPF records with the new IP addresses

The last step is manual for now, until API access is available for the DNS providers.

But the Python script does everything else automatically, and opens the DNS config page for each, along with having the SPF value copied to clipboard, so the manual work is only a couple clicks.

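Note: this script and approach are meant to maximise email deliverability, not security: every source IP that appears in a DMARC report gets added to the SPF record, whether or not it is a legitimate sender, so review the proposed value before publishing it.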

Store all your DNS configuration locally

I created a table in my local SQLite database to store all the DNS configuration for each domain, including the SPF record values.

The schema for that table is as follows:

  {
    "id": int,
    "domain": str,
    "dkim_type": "TXT",
    "dkim_host": "spacemail._domainkey",
    "dkim_value": str,
    "dmarc_type": "TXT",
    "dmarc_host": "_DMARC",
    "dmarc_value": "v=DMARC1; p=none; rua=mailto:dedicated_dmarc_email_of_your_choice@mydomain.com; ruf=mailto:dedicated_dmarc_email_of_your_choice@mydomain.com; fo=1",
    "spf_type": "TXT",
    "spf_host": "@",
    "spf_value": "v=spf1 ip4:1.2.3.4 ip4:1.2.3.4 ip4:1.2.3.4 ip6:2a02:333:f444:2699::888 ~all", # example only, with scrambled IPs
    "provider": "spaceship",
    "dns_config_url": "https://www.spaceship.com/application/advanced-dns-application/manage/my_domain.com/", # where my_domain.com is the domain
    "updated": str,
  }

Fetch the DMARC XML files from the email inbox

Using IMAP to fetch the emails and download the attachments.

import os
from datetime import datetime
import time
from dotenv import load_dotenv
from imap_tools import MailBox, AND
import zipfile
import io
import pprint

def _safe_xml_name(name):
    """Return a flat file name for `name` ending in exactly one '.xml'.

    Archive member names may contain folders ("reports/a.xml") and gzip
    attachment names usually already carry the extension ("a.xml.gz" ->
    "a.xml"), so the directory part is dropped and the extension normalised
    instead of being appended blindly.
    """
    base = os.path.basename(name.replace('\\', '/'))
    if base.lower().endswith('.xml'):
        base = base[:-4]
    return f"{base}.xml"


def _extract_xml_payloads(filename, payload):
    """Return the (member_name, xml_bytes) pairs contained in one attachment.

    Args:
    - filename: Attachment file name; must end in '.zip' or '.gz'.
    - payload: Raw attachment bytes.

    Raises:
    - zipfile.BadZipFile: the '.zip' attachment is not a zip archive.
    - OSError / EOFError: the '.gz' attachment is not valid gzip data.
    """
    import gzip  # only needed here; kept local so the file's import block is untouched

    buffer = io.BytesIO(payload)
    if filename.lower().endswith('.zip'):
        with zipfile.ZipFile(buffer) as zf:
            return [
                (member, zf.read(member))
                for member in zf.namelist()
                if member.lower().endswith('.xml')
            ]
    with gzip.GzipFile(fileobj=buffer) as gz:
        # Strip the trailing ".gz"; _safe_xml_name() sorts out the extension.
        return [(filename[:-3], gz.read())]


def process_dmarc_reports():
    """
    Downloads the DMARC report attachments sent to the DMARC mailbox.

    Every message addressed to DMARC_ADDRESS is fetched over IMAP, its
    '.zip' / '.gz' attachments are unpacked, and each XML report is written
    to OUTPUT_DIR as "dmarc_report_<message number>_<report name>.xml".
    A message is deleted from the mailbox only when at least one XML file
    was saved and none of its archives failed to extract.

    Returns:
    - A dict with the counters 'total_emails', 'zip_files_processed',
      'xml_files_saved', 'errors', plus 'output_directory' and
      'runtime_minutes'.
    """
    load_dotenv()

    # Credentials come from the environment; the rest is hardcoded.
    EMAIL_ACCOUNT = os.getenv("EMAIL_ACCOUNT_ND")
    PASSWORD = os.getenv("PASSWORD_ND")
    EMAIL_SERVER = os.getenv("EMAIL_SERVER_ND")
    DMARC_ADDRESS = "dedicated_dmarc_email_of_your_choice@mydomain.com"
    OUTPUT_DIR = "/path/to/folder_with_dmarc_reports_as_xml_files"
    VERBOSE = True

    os.makedirs(OUTPUT_DIR, exist_ok=True)

    count = 0
    count_processed = 0
    count_errors = 0
    count_xml_saved = 0

    start_time = time.time()

    if VERBOSE:
        print(f"Starting DMARC report processing at {datetime.now().strftime('%H:%M:%S')}")

    with MailBox(EMAIL_SERVER).login(EMAIL_ACCOUNT, PASSWORD) as mailbox:
        if VERBOSE:
            print(f"\n\nProcessing emails for: {DMARC_ADDRESS}")
        for msg in mailbox.fetch(AND(to=DMARC_ADDRESS), mark_seen=False, bulk=True):
            count += 1
            if VERBOSE:
                print(f"\r    Processing email {count} - From: {msg.from_} - Subject: {msg.subject}", end='')

            # Broad catch on purpose: one bad message must not stop the run.
            try:
                xml_saved = False
                extraction_failed = False
                for att in msg.attachments:
                    att_name = att.filename or ''
                    if not att_name.lower().endswith(('.zip', '.gz')):
                        continue
                    count_processed += 1

                    try:
                        payloads = _extract_xml_payloads(att_name, att.payload)
                    except (zipfile.BadZipFile, OSError, EOFError):
                        extraction_failed = True
                        archive_kind = 'zip' if att_name.lower().endswith('.zip') else 'gzip'
                        print(f"\nError: {att.filename} is not a valid {archive_kind} file.")
                        continue

                    for member_name, xml_content in payloads:
                        xml_filename = f"dmarc_report_{count}_{_safe_xml_name(member_name)}"
                        xml_path = os.path.join(OUTPUT_DIR, xml_filename)
                        with open(xml_path, 'wb') as f:
                            f.write(xml_content)
                        count_xml_saved += 1
                        xml_saved = True

                # Keep the email when any archive could not be read, so the
                # report it carried is not lost.
                if xml_saved and not extraction_failed:
                    mailbox.delete([msg.uid])
                    if VERBOSE:
                        print(f"\n❌ ✅    Deleted email {msg.uid} after successful XML extraction")
            except Exception as e:
                if VERBOSE:
                    print(f"\nError processing email: {e}")
                count_errors += 1

    results = {
        'total_emails': count,
        'zip_files_processed': count_processed,
        'xml_files_saved': count_xml_saved,
        'errors': count_errors,
        'output_directory': OUTPUT_DIR,
        'runtime_minutes': round((time.time() - start_time)/60, 1)
    }

    if VERBOSE:
        pp = pprint.PrettyPrinter(indent=4)
        print("\n")
        print('-------------------------------')
        pp.pprint(results)
        print('-------------------------------')
        print(f"Finished at {datetime.now().strftime('%H:%M:%S')}.")

    return results

# Run the download step only when this file is executed as a script, so the
# function can also be imported from another script without running it.
if __name__ == "__main__":
    process_dmarc_reports()

Process the local DMARC XML files

The script above can be run separately, or imported into the script below at the start of it to only run a single Python file.

This script processes the DMARC XML files that have been downloaded locally, extracts the IP addresses, and compares them with the SPF records in the database.

If any IP addresses are missing, it builds the updated SPF record value, copies it to the clipboard, and opens the DNS config page, ready to paste the new value.
Once I confirm that the SPF record has been updated, it updates the record in my local database.

from datetime import datetime
import os
# Timestamp (minute precision) written to the "updated" column on DB changes.
ts_db = datetime.now().strftime('%Y-%m-%d %H:%M')

import xml.etree.ElementTree as ET
import re
import os
import sqlite3
import webbrowser
import pyperclip

from DB.tools import select_all_records, update_record, create_record, delete_record

from dotenv import load_dotenv
# Read the environment (.env) to find the SQLite database path.
load_dotenv()
DB_BTOB = os.getenv("DB_BTOB")

print()

# Load the stored DNS configuration once, keyed by domain:
#   dict_spf[domain] = (rowid, spf_value, provider, dns_config_url)
# sqlite3's connection context manager only commits / rolls back a
# transaction; it does not close the connection. Close it explicitly once
# the rows are in memory.
conn = sqlite3.connect(DB_BTOB)
try:
    cur = conn.cursor()
    cur.execute("""
        SELECT rowid, domain, spf_value, provider, dns_config_url
        FROM email_domains_configuration
    """)
    rows = cur.fetchall()
finally:
    conn.close()

dict_spf = {
    domain: (rowid, spf_value, provider, dns_config_url)
    for rowid, domain, spf_value, provider, dns_config_url in rows
}

def extract_spf_components(spf_string):
    """
    Splits an SPF record into its ip4, ip6 and include mechanisms.

    Each mechanism is returned whole, prefix included (e.g. "ip4:1.2.3.4").
    An optional CIDR prefix length is kept ("ip4:1.2.3.0/24"), so two
    records that differ only in the size of a network are not reported as
    identical.

    Args:
    - spf_string: The SPF record value.

    Returns:
    - A tuple (ip4_mechanisms, ip6_mechanisms, include_mechanisms) of sets.
    """
    ip4_addresses = re.findall(r'ip4:[\d\.]+(?:/\d+)?', spf_string)
    ip6_addresses = re.findall(r'ip6:[\da-fA-F:]+(?:/\d+)?', spf_string)
    includes = re.findall(r'include:[^\s]+', spf_string)

    return set(ip4_addresses), set(ip6_addresses), set(includes)

def check_if_same_spf(spf1, spf2):
    """
    Tells whether two SPF records list the same ip4, ip6 and include
    mechanisms, regardless of their order in the record.
    """
    # The helper returns a tuple of three sets; tuple equality compares them
    # pairwise, which is the same as checking each set in turn.
    first_components = extract_spf_components(spf1)
    second_components = extract_spf_components(spf2)
    return first_components == second_components


def extract_dmarc_domain(xml_file_path):
    """
    Extracts the domain from the DMARC XML report file.

    Args:
    - xml_file_path: Path to the DMARC XML file.

    Returns:
    - The domain that the DMARC report is for, with surrounding whitespace
      removed, or None when the report has no (or an empty)
      policy_published/domain element.

    Raises:
    - xml.etree.ElementTree.ParseError: the file is not well-formed XML.
    """
    # NOTE(security): reports arrive by email from third parties, and the
    # standard-library XML parsers are not hardened against maliciously
    # constructed documents. Only process mail from a dedicated mailbox.
    root = ET.parse(xml_file_path).getroot()

    # findtext() returns None when the element is missing, where
    # find(...).text would raise AttributeError.
    domain = root.findtext(".//policy_published/domain")
    if domain is None:
        return None
    return domain.strip() or None

def extract_ips_from_spf(spf_string):
    """
    Collects the bare IP addresses named by the ip4:/ip6: mechanisms of an
    SPF record.

    Returns a tuple (ipv4_addresses, ipv6_addresses) of sets of strings,
    without the "ip4:" / "ip6:" prefix.
    """
    v4_found = {
        hit.group(1)
        for hit in re.finditer(r'ip4:(\d+\.\d+\.\d+\.\d+)', spf_string)
    }
    v6_found = {
        hit.group(1)
        for hit in re.finditer(r'ip6:([0-9a-fA-F:]+)', spf_string)
    }
    return v4_found, v6_found

def extract_ips_from_dmarc(dmarc_xml):
    """
    Extracts the IP addresses from the DMARC XML report.

    Args:
    - dmarc_xml: The XML document, as str or bytes.

    Returns:
    - A tuple (ipv4_addresses, ipv6_addresses) of sets holding the
      row/source_ip value of every record. Records with a missing or empty
      source_ip are skipped.

    Raises:
    - xml.etree.ElementTree.ParseError: dmarc_xml is not well-formed XML.
    """
    root = ET.fromstring(dmarc_xml)

    ipv4_addresses = set()
    ipv6_addresses = set()

    for record in root.findall(".//record"):
        # findtext() yields None for a missing element; strip() removes the
        # whitespace that pretty-printed reports leave around the value.
        source_ip = (record.findtext("row/source_ip") or "").strip()
        if not source_ip:
            continue
        if ':' in source_ip:  # IPv6 address
            ipv6_addresses.add(source_ip)
        else:  # IPv4 address
            ipv4_addresses.add(source_ip)

    return ipv4_addresses, ipv6_addresses

def update_spf_if_needed(spf_string, dmarc_xml):
    """
    Takes an SPF string and a DMARC XML, checks if any IPs from the DMARC report
    are missing in the SPF string, and returns the updated SPF string if needed, else returns None.

    The missing addresses are inserted, sorted, right before the record's
    "all" mechanism whatever its qualifier (~all, -all, ?all, +all, all).
    When the record has no "all" mechanism they are appended at the end.
    """
    spf_ipv4, spf_ipv6 = extract_ips_from_spf(spf_string)
    dmarc_ipv4, dmarc_ipv6 = extract_ips_from_dmarc(dmarc_xml)

    # Sorted so the same inputs always produce the same record; set
    # iteration order for strings changes from one run to the next.
    missing_ipv4 = sorted(dmarc_ipv4 - spf_ipv4)
    missing_ipv6 = sorted(dmarc_ipv6 - spf_ipv6)

    if not missing_ipv4 and not missing_ipv6:
        return None  # No update needed

    additions = ' '.join(
        [f'ip4:{ip}' for ip in missing_ipv4]
        + [f'ip6:{ip}' for ip in missing_ipv6]
    )

    # Find "all" as a whole term with an optional qualifier. Matching only
    # the literal "~all" left "-all" records unchanged.
    all_term = re.search(r'(?<!\S)[+\-~?]?all(?!\S)', spf_string, flags=re.IGNORECASE)
    if all_term is None:
        return f'{spf_string.rstrip()} {additions}'

    cut = all_term.start()
    return f'{spf_string[:cut]}{additions} {spf_string[cut:]}'

def process_dmarc_reports():
    """
    Processes all DMARC XML reports in the specified directory.

    For every report whose domain is known in dict_spf, the source IPs are
    compared with the stored SPF record. Missing IPs are accumulated across
    all reports of the same domain. For each domain that needs a change,
    the new SPF value is copied to the clipboard, the DNS config page is
    opened, and the local database is updated once the user confirms.
    """
    directory = "/path/to/folder_with_dmarc_reports_as_xml_files"
    updated_spfs = {}

    # sorted() keeps the processing order stable between runs.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith('.xml'):
            continue
        file_path = os.path.join(directory, filename)

        # A single malformed report must not abort the whole run.
        try:
            domain = extract_dmarc_domain(file_path)
        except (ET.ParseError, AttributeError, OSError) as exc:
            print(f"Skipping unreadable DMARC report {filename}: {exc}")
            continue
        print(f"Processing DMARC report for domain: {domain}")

        # Get the current SPF record for the domain from dict_spf
        if domain not in dict_spf:
            continue
        rowid, spf_string, provider, dns_config_url = dict_spf[domain]  # Unpack 4 values
        if not spf_string:
            print(f"Skipping {domain}: no SPF value stored for this domain.")
            continue

        # Read the XML file content as bytes so the parser honours the
        # encoding declared in the document, not the locale default.
        with open(file_path, 'rb') as file:
            dmarc_xml = file.read()

        # Build on the value already computed from earlier reports of this
        # domain; starting from the stored record each time would keep only
        # the last report's IPs.
        if domain in updated_spfs:
            current_spf = updated_spfs[domain][1]
        else:
            current_spf = spf_string

        try:
            updated_spf = update_spf_if_needed(current_spf, dmarc_xml)
        except (ET.ParseError, AttributeError) as exc:
            print(f"Skipping unreadable DMARC report {filename}: {exc}")
            continue

        if updated_spf and not check_if_same_spf(current_spf, updated_spf):
            updated_spfs[domain] = (rowid, updated_spf, provider, dns_config_url)  # Include provider and dns_config_url

    # Update SPF records with user confirmation
    if updated_spfs:
        print("\n\n\n❌ SPF records that need to be updated:")
        for domain, (rowid, new_spf, provider, dns_config_url) in updated_spfs.items():
            original_spf = dict_spf[domain][1]
            print(f"\nDomain: {domain}")
            print(f"Row ID: {rowid}")
            print(f"Original SPF: {original_spf}")
            print(f">Updated SPF: {new_spf}")

            # The URL column may be empty for some domains.
            if dns_config_url:
                webbrowser.open(dns_config_url)

            # Copy new SPF record to clipboard
            pyperclip.copy(new_spf)
            print(f"\n\nℹ️  New SPF record for {domain} has been copied to clipboard.")

            provider_label = str(provider).upper()
            user_input = input(f"\n\n>>> Has this SPF record for {domain} been updated in {provider_label}? (y/n): ").strip().lower()
            if user_input == 'y':

                update_record(DB_BTOB, "email_domains_configuration", {
                    "rowid": rowid,
                    "spf_value": new_spf,
                    "updated": ts_db,
                    })

            else:
                print(f"Skipping update for {domain}.")

        print("\n\n✅ All SPF records have been processed.")
    else:
        print("\n\n✅ No SPF records need to be updated.")

process_dmarc_reports()

links

social