Multiprocessing with Python

get those cores working!

3 ways:

asyncio:

  • primarily for I/O-bound tasks (e.g. reading/writing to disk, network calls)
  • cooperative pausing/waiting: each task yields control while it waits (sketch below)
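
A minimal sketch of the cooperative style, assuming a made-up fetch coroutine standing in for real network or disk I/O:

import asyncio

async def fetch(name):
    # await hands control back to the event loop while this task "waits on I/O"
    await asyncio.sleep(1)  # stand-in for a network/disk wait
    return f"{name} done"

async def main():
    # the three waits overlap, so this takes ~1s instead of ~3s
    print(await asyncio.gather(fetch("a"), fetch("b"), fetch("c")))

asyncio.run(main())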

threading

  • non-cooperative (pre-emptive) pausing/waiting: the OS decides when to switch threads
  • good for I/O-bound tasks
  • good for running long tasks in the background (sketch below)
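
A minimal sketch of the background-task idea, with a made-up download function standing in for a blocking call:

import threading
import time

def download(name):
    time.sleep(1)  # stand-in for a blocking network call
    print(f"{name} finished")

threads = [threading.Thread(target=download, args=(n,)) for n in ("a", "b", "c")]
for t in threads:
    t.start()   # threads run in the background while the main thread continues
for t in threads:
    t.join()    # wait for all background work to finish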

Global Interpreter Lock (GIL) - only one thread can execute Python bytecode at a time, so threads don't speed up CPU-bound work.
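
A quick illustration of that point, using a made-up busy-loop: splitting a CPU-bound loop across two threads takes roughly as long as running it twice sequentially, because the GIL serialises bytecode execution on CPython.

import threading
import time

def count(n=10_000_000):
    while n:
        n -= 1

start = time.time()
t1, t2 = threading.Thread(target=count), threading.Thread(target=count)
t1.start(); t2.start()
t1.join(); t2.join()
# roughly the same wall-clock time as calling count() twice in a row
print(f"two threads: {time.time() - start:.2f}s")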

multiprocessing

primarily for CPU-bound tasks; each process gets its own interpreter (and its own GIL), so work can actually use multiple cores

code

import time
from multiprocessing import Pool
# from multiprocessing.dummy import Pool as ThreadPool  # drop-in thread-based alternative

def etl(filename):
    """Placeholder: pretend to process one file and report how long it took."""
    start = time.time()
    time.sleep(1)  # stand-in for real extract/transform/load work
    return filename, time.time() - start

filenames = ['file1.txt', 'file2.txt', 'file3.txt']

if __name__ == '__main__':  # guard needed when starting new processes
    with Pool(processes=3) as pool:  # one worker per file here; tune for the workload
        results = pool.imap(etl, filenames)

        for filename, duration in results:
            print(f"{filename} took {duration:.2f}s")

20 Feb 2025: multiprocessing cut a database update script's run time from 195s to 11s, with this code:

# ... existing code ...

# IMPORTS (script-specific)
import multiprocessing  # Change this import
# ... existing imports ...

# ... existing code ...

def process_file(filename):
    if not filename.endswith('.html'):
        return None

    file_path = os.path.join(processed_dir, filename)
    if file_path in existing_file_paths:
        return None

    try:
        linkedin_url = extract_linkedin_url(file_path)
        if linkedin_url:
            linkedin_handle = my_utils.linkedin_handle_from_url(linkedin_url)
            return (linkedin_handle, file_path)
    except Exception as e:
        print(f"❌ Error processing {filename}: {e}")
        os.remove(file_path)
        return 'deleted'
    return None

# Replace the ThreadPoolExecutor with a multiprocessing Pool
if __name__ == '__main__':  # This guard is important for multiprocessing
    # Use number of CPU cores minus 1 to avoid overloading
    num_processes = max(1, multiprocessing.cpu_count() - 1)

    with multiprocessing.Pool(processes=num_processes) as pool:
        results = list(tqdm(
            pool.imap(process_file, os.listdir(processed_dir)),
            total=count_total,
            desc="Processing LinkedIn profiles"
        ))

    count_deleted = sum(1 for r in results if r == 'deleted')
    linkedin_profiles = dict(r for r in results if r and r != 'deleted')

    # ... rest of the code ...

concurrent.futures

code

Working code:

from concurrent.futures import ThreadPoolExecutor, as_completed

def process_data_in_parallel(MY_DATA_LIST, workers=3):  # change workers as needed
    # MY_FUNCTION, test, update_record, DB_TWITTER and table are placeholders
    # defined elsewhere in the script
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(MY_FUNCTION, x): x for x in MY_DATA_LIST}

        for future in as_completed(futures):
            result = future.result()
            # Update the SQLite table immediately after each item finishes
            if not test:
                update_record(DB_TWITTER, table, result)
            else:
                print(f"ℹ️ Test mode: not updating SQLite table with {result}")
