3 ways to run work concurrently in Python:
asyncio
- primarily for I/O-bound tasks (e.g. reading/writing to disk, network, etc.)
- cooperative pausing/waiting (see the sketch after this list)
threading
- non-cooperative (preemptive) pausing/waiting
- good for I/O-bound tasks
- good for running long tasks in the background
- Global Interpreter Lock (GIL): only one thread executes Python bytecode at a time
multiprocessing
- primarily for CPU-bound tasks
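A minimal asyncio sketch of cooperative waiting. The fetch_page coroutine and the URLs are hypothetical placeholders, and asyncio.sleep stands in for real network I/O:

import asyncio

async def fetch_page(url):
    # Pretend network I/O: the await hands control back to the event loop
    await asyncio.sleep(1)
    return f"{url}: done"

async def main():
    urls = ["https://example.com/a", "https://example.com/b", "https://example.com/c"]
    # gather runs all three coroutines cooperatively, so total time is ~1s, not ~3s
    results = await asyncio.gather(*(fetch_page(u) for u in urls))
    for r in results:
        print(r)

asyncio.run(main())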
code
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool  # drop-in thread-based alternative

# etl(filename) is assumed to be defined elsewhere and to return (filename, duration)
filenames = ['file1.txt', 'file2.txt', 'file3.txt']

with Pool(processes=25) as pool:
    # imap yields results lazily, so consume them while the pool is still open
    results = pool.imap(etl, filenames)
    for filename, duration in results:
        print(f"{filename} took {duration:.2f}s")
20 Feb 2025 multiprocessing
Reduced a database update script's runtime from 195s to 11s with this code:
# ... existing code ...

# IMPORTS (script-specific)
import multiprocessing  # Change this import
# ... existing imports ...

# ... existing code ...

def process_file(filename):
    if not filename.endswith('.html'):
        return None
    file_path = os.path.join(processed_dir, filename)
    if file_path in existing_file_paths:
        return None
    try:
        linkedin_url = extract_linkedin_url(file_path)
        if linkedin_url:
            linkedin_handle = my_utils.linkedin_handle_from_url(linkedin_url)
            return (linkedin_handle, file_path)
    except Exception as e:
        print(f"❌ Error processing {filename}: {e}")
        os.remove(file_path)
        return 'deleted'
    return None

# Replace the ThreadPoolExecutor with a multiprocessing Pool
if __name__ == '__main__':  # This guard is important for multiprocessing
    # Use number of CPU cores minus 1 to avoid overloading
    num_processes = max(1, multiprocessing.cpu_count() - 1)
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = list(tqdm(
            pool.imap(process_file, os.listdir(processed_dir)),
            total=count_total,
            desc="Processing LinkedIn profiles"
        ))

    count_deleted = sum(1 for r in results if r == 'deleted')
    linkedin_profiles = dict(r for r in results if r and r != 'deleted')

# ... rest of the code ...
concurrent.futures
code
Working code:
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_data_in_parallel(MY_DATA_LIST, workers=3):  # change workers number as needed
    # MY_FUNCTION, test, update_record, DB_TWITTER and table are defined elsewhere in the script
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(MY_FUNCTION, x): x for x in MY_DATA_LIST}
        for future in as_completed(futures):
            result = future.result()
            # Update the SQLite table immediately after each worker finishes
            if not test:
                update_record(DB_TWITTER, table, result)
            else:
                print(f"ℹ️ Test mode: not updating SQLite table with {result}")