Multiprocessing and CPU Parallelism in Python
CPython's GIL (Global Interpreter Lock) prevents multiple threads from executing pure Python code in parallel. For CPU-intensive work, the standard solution is multiprocessing: separate processes, each with its own interpreter and its own GIL, so the work can use every CPU core.
When to use what
| Workload | Recommended solution |
|---|---|
| Concurrent I/O (network, disk) | asyncio or threading |
| CPU-intensive pure Python | multiprocessing |
| CPU-intensive NumPy/pandas | multiprocessing or numba |
| Mixed I/O + CPU | concurrent.futures (`ThreadPoolExecutor` for the I/O part, `ProcessPoolExecutor` for the CPU part — you choose the executor) |
Basic Process
import multiprocessing as mp
import os
import time
def cpu_task(name: str, n: int) -> int:
    """Sum the squares of 0..n-1, logging start and finish with this process's PID.

    Args:
        name: Label printed in the log lines.
        n: Number of terms to sum.

    Returns:
        The sum of k*k for k in range(n).
    """
    worker_pid = os.getpid()
    print(f"[PID {worker_pid}] Starting {name} n={n}")
    total = 0
    for k in range(n):
        total += k * k
    print(f"[PID {worker_pid}] {name} done: {total}")
    return total
if __name__ == '__main__':
    # The guard matters under the "spawn" start method (default on Windows and
    # macOS): each child re-imports this module, so unguarded process-creation
    # code would run again in every child.
    workers = [
        mp.Process(target=cpu_task, args=('A', 1_000_000)),
        mp.Process(target=cpu_task, args=('B', 2_000_000)),
    ]
    start = time.perf_counter()
    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
    elapsed = time.perf_counter() - start
    print(f"Parallel elapsed: {elapsed:.2f}s")
Pool.map — parallelize over a collection
import multiprocessing as mp
import time
def factorize(n: int) -> list[int]:
    """Return the prime factors of ``n`` in non-decreasing order (trial division).

    Values below 2 yield an empty list.
    """
    remaining = n
    found: list[int] = []
    divisor = 2
    while divisor * divisor <= remaining:
        if remaining % divisor == 0:
            # Keep the same divisor: it may divide the remainder again.
            found.append(divisor)
            remaining //= divisor
        else:
            divisor += 1
    # Whatever is left above 1 has no divisor <= its square root, so it is prime.
    if remaining > 1:
        found.append(remaining)
    return found
if __name__ == '__main__':
    numbers = list(range(10**12, 10**12 + 100))
    ncpus = mp.cpu_count()

    # Baseline: factorize everything in this process.
    started = time.perf_counter()
    seq_results = list(map(factorize, numbers))
    t_seq = time.perf_counter() - started

    # Same work spread over one worker per CPU (timing includes pool startup).
    started = time.perf_counter()
    with mp.Pool(processes=ncpus) as pool:
        par_results = pool.map(factorize, numbers)
    t_par = time.perf_counter() - started

    print(f"Sequential: {t_seq:.2f}s")
    print(f"Parallel ({ncpus} cores): {t_par:.2f}s")
    print(f"Speedup: {t_seq/t_par:.1f}x")
Pool with multiple arguments
import multiprocessing as mp
from functools import partial
def process_image(path: str, quality: int, fmt: str) -> dict:
    """Simulate converting one image and report the resulting filename.

    Sleeps 0.1 s to stand in for real work. ``quality`` is accepted so the
    signature looks realistic, but this stub does not use it.

    Returns:
        ``{'input': path, 'output': <basename of path with its suffix replaced by .fmt>}``
    """
    import pathlib
    import time

    time.sleep(0.1)
    converted_name = pathlib.Path(path).with_suffix(f'.{fmt}').name
    return {'input': path, 'output': converted_name}
if __name__ == '__main__':
    paths = [f'photo_{i:03d}.jpg' for i in range(20)]

    # Option 1: starmap unpacks each tuple into positional arguments.
    arg_tuples = [(path, 85, 'webp') for path in paths]
    with mp.Pool() as pool:
        results = pool.starmap(process_image, arg_tuples)

    # Option 2: partial() pins the shared keyword arguments, so plain map works.
    to_webp = partial(process_image, quality=85, fmt='webp')
    with mp.Pool() as pool:
        results2 = pool.map(to_webp, paths)

    print(f"Converted: {len(results)} images")
Pool.imap — streaming results
import multiprocessing as mp
def heavy_calc(n: int) -> float:
    """Return the sum of the square roots of 0, 1, ..., n-1 (integer 0 when n <= 0)."""
    roots = (pow(k, 0.5) for k in range(n))
    return sum(roots)
if __name__ == '__main__':
    numbers = range(0, 10_000_000, 100_000)
    with mp.Pool() as pool:
        # imap: lazy iterator that yields results in input order as they become
        # available, without waiting for the whole batch to finish.
        for i, result in enumerate(pool.imap(heavy_calc, numbers, chunksize=4)):
            print(f" [{i+1}] {result:.2f}")
        # imap_unordered: yields each result as soon as any worker finishes it,
        # in completion order. Like imap, it takes the function first, then
        # the iterable.
        received = 0
        for _ in pool.imap_unordered(heavy_calc, numbers, chunksize=4):
            received += 1
        print(f"imap_unordered delivered {received} results")
Inter-process communication — Queue and Pipe
import multiprocessing as mp
import time
# Queue — multiple producers/consumers
def producer(q: mp.Queue, pid: int, n: int):
    """Put ``n`` work items tagged with this producer's ``pid`` onto ``q``.

    Producers do not send the shutdown sentinel themselves. With several
    producers, a consumer that stopped on the first producer's sentinel would
    miss items the other producers were still sending. The coordinator sends
    one sentinel per consumer after every producer has finished.
    """
    for i in range(n):
        q.put({'pid': pid, 'i': i, 'data': i**2})
        time.sleep(0.01)


def consumer(q: mp.Queue, cid: int, results: mp.Queue):
    """Sum the ``'data'`` field of items taken from ``q`` until a ``None`` sentinel.

    When done, puts ``{'consumer': cid, 'total': <sum>}`` on ``results``.
    Each consumer consumes exactly one sentinel and does not re-queue it.
    """
    total = 0
    while (item := q.get()) is not None:
        total += item['data']
    results.put({'consumer': cid, 'total': total})


if __name__ == '__main__':
    work_q = mp.Queue(maxsize=50)
    result_q = mp.Queue()
    producers = [mp.Process(target=producer, args=(work_q, i, 10)) for i in range(3)]
    consumers = [mp.Process(target=consumer, args=(work_q, j, result_q)) for j in range(2)]
    for p in producers + consumers:
        p.start()
    for p in producers:
        p.join()
    # Every real item is now in the queue, so send exactly one poison pill
    # per consumer; each pill lands after all the work items.
    for _ in consumers:
        work_q.put(None)
    # Read one report per consumer with a blocking get() instead of polling
    # Queue.empty(), which the multiprocessing docs call unreliable. Read
    # before joining, because a process that has put items on a queue waits
    # for them to be flushed before it terminates.
    reports = [result_q.get() for _ in consumers]
    for p in consumers:
        p.join()
    for r in reports:
        print(f"Consumer {r['consumer']}: total={r['total']}")
# Pipe — bidirectional communication between exactly 2 processes
def pipe_worker(child_conn):
    """Reply to each number received on ``child_conn`` with its square.

    Stops when it receives the string ``'STOP'``, or when the other end of the
    pipe has been closed (``recv`` raises ``EOFError``). The connection is
    closed on every exit path.
    """
    try:
        while True:
            try:
                data = child_conn.recv()
            except EOFError:
                # Peer closed its end without sending 'STOP'; nothing more to do.
                break
            if data == 'STOP':
                break
            child_conn.send(data ** 2)
    finally:
        child_conn.close()
if __name__ == '__main__':
    parent_conn, child_conn = mp.Pipe(duplex=True)
    worker = mp.Process(target=pipe_worker, args=(child_conn,))
    worker.start()
    # Round-trip each number: send it, then block until its square comes back.
    for value in (5, 10, 15):
        parent_conn.send(value)
        squared = parent_conn.recv()
        print(f"{value}² = {squared}")
    parent_conn.send('STOP')
    worker.join()
Shared memory
import multiprocessing as mp
import ctypes
def increment(arr, start: int, end: int):
    """Add 1 to each element of ``arr`` with index in ``[start, end)``, in place."""
    for i in range(start, end):
        arr[i] += 1


def bump(v, lck):
    """Increment ``v.value`` 1000 times, holding ``lck`` for each update.

    ``v.value += 1`` is a read-modify-write, so without the lock concurrent
    processes could lose updates.

    Defined at module level, not under the ``__main__`` guard, so that child
    processes started with the "spawn" start method can import it by name.
    """
    for _ in range(1000):
        with lck:
            v.value += 1


def double_shared(name: str, shape, dtype_str: str):
    """Attach to the shared-memory block ``name`` and double its contents in place.

    The block is viewed as a NumPy array with the given ``shape`` and
    ``dtype_str``. Defined at module level for the same spawn reason as ``bump``.
    """
    from multiprocessing.shared_memory import SharedMemory
    import numpy as np
    shm2 = SharedMemory(name=name)
    arr2 = np.ndarray(shape, dtype=np.dtype(dtype_str), buffer=shm2.buf)
    arr2 *= 2
    # Drop the array view before unmapping: it points into shm2's memory.
    del arr2
    shm2.close()


if __name__ == '__main__':
    # Value: a single shared C int; a separate Lock guards the increments.
    counter = mp.Value(ctypes.c_int, 0)
    lock = mp.Lock()
    procs = [mp.Process(target=bump, args=(counter, lock)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(f"Counter: {counter.value}")  # 4000

    # Array: a typed C array; each process touches a disjoint 200-element slice.
    arr = mp.Array(ctypes.c_double, 1000)
    for i in range(1000):
        arr[i] = float(i)
    ps = [mp.Process(target=increment, args=(arr, i*200, (i+1)*200)) for i in range(5)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()
    print(f"arr[500] = {arr[500]}")  # 501.0

    # SharedMemory + NumPy (Python 3.8+)
    import numpy as np
    from multiprocessing.shared_memory import SharedMemory
    data = np.arange(1000, dtype=np.float64)
    shm = SharedMemory(create=True, size=data.nbytes)
    try:
        buf = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
        buf[:] = data[:]
        p = mp.Process(target=double_shared, args=(shm.name, data.shape, str(data.dtype)))
        p.start()
        p.join()
        print(f"buf[1] = {buf[1]}")  # 2.0
        del buf  # drop the view before unmapping the segment
    finally:
        # Always release and remove the segment, even if something above failed.
        shm.close()
        shm.unlink()
ProcessPoolExecutor — cleanest API
from concurrent.futures import ProcessPoolExecutor, as_completed
import math
import time
def is_prime(n: int) -> bool:
    """Return True if ``n`` is prime, using trial division by odd numbers up to sqrt(n)."""
    if n < 2:
        return False
    if n % 2 == 0:
        # 2 is the only even prime.
        return n == 2
    limit = math.isqrt(n)
    for divisor in range(3, limit + 1, 2):
        if n % divisor == 0:
            return False
    return True
if __name__ == '__main__':
    numbers = range(10**7, 10**7 + 10_000)
    with ProcessPoolExecutor() as ex:
        t0 = time.perf_counter()
        # ex.map yields one bool per input, in input order. Pair the bools back
        # up with their inputs to keep the primes themselves; filtering the
        # bools alone would only produce a list of True values.
        flags = ex.map(is_prime, numbers, chunksize=500)
        primes = [n for n, flag in zip(numbers, flags) if flag]
        print(f"Primes found: {len(primes)} in {time.perf_counter()-t0:.2f}s")
        print(f"First few: {primes[:5]}")

    # submit + as_completed: handle each result as soon as its future finishes.
    with ProcessPoolExecutor(max_workers=4) as ex:
        futures = {ex.submit(is_prime, n): n for n in range(10**7, 10**7 + 100)}
        for fut in as_completed(futures):
            n = futures[fut]
            if fut.result():
                print(f" Prime: {n}")
Pool initializer — shared resources per worker
import multiprocessing as mp
import sqlite3
db_conn = None  # per-worker SQLite connection; set by init_worker inside each worker process


def init_worker(db_path: str):
    """Pool initializer: open this worker's own SQLite connection to ``db_path``.

    Runs once in each worker process when it starts. Each worker opens its own
    connection because sqlite3 connections cannot be pickled and sent between
    processes.
    """
    global db_conn
    db_conn = sqlite3.connect(db_path)


def process_record(record_id: int) -> dict:
    """Fetch the ``records`` row whose ``id`` equals ``record_id``.

    Uses the connection opened by ``init_worker``. ``'row'`` is ``None`` when
    no row matches.

    Raises:
        RuntimeError: if called in a process where ``init_worker`` has not run.
    """
    # The global is only read here, so no ``global`` statement is needed.
    if db_conn is None:
        raise RuntimeError("init_worker() must run before process_record()")
    cur = db_conn.execute("SELECT * FROM records WHERE id=?", (record_id,))
    return {'id': record_id, 'row': cur.fetchone()}
if __name__ == '__main__':
    ids = list(range(1, 101))
    # NOTE: data.db must already contain a `records` table. sqlite3.connect
    # creates an empty database file if the path does not exist.
    pool_options = dict(
        processes=4,
        initializer=init_worker,
        initargs=('data.db',),
    )
    with mp.Pool(**pool_options) as pool:
        results = pool.map(process_record, ids)
    print(f"Processed: {len(results)} records")
Best practices
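- `if __name__ == '__main__':` is mandatory whenever the start method is `spawn` (the default on Windows and macOS). Without it, every worker process re-imports the main module and re-runs its top-level code, including the code that starts new processes, which fails with a `RuntimeError`. For the same reason, functions used as process targets must be defined at module level, not inside the guard.
- Prefer `Pool.map` over manual `Process` objects for parallelizing over collections — the pool manages the worker processes and collects the results automatically.
- `chunksize` in `pool.map`/`imap` (and `ProcessPoolExecutor.map`) reduces IPC overhead: larger chunks mean fewer messages, but worse load balancing.
- `ProcessPoolExecutor` from `concurrent.futures` is the most modern and Pythonic API. Prefer it unless you need Pool-specific features such as `imap`/`imap_unordered` or `starmap`. It has supported `initializer`/`initargs` since Python 3.7.
- Never share non-picklable objects (sockets, threading locks, DB connections) between processes. Create them inside each worker instead, for example with the Pool `initializer` pattern.
- Use `mp.Value` + `Lock` for atomic counters (a `Value` also carries its own lock, available via `get_lock()`), and `mp.Array` or `SharedMemory` for large arrays that need zero-copy sharing.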
Related conversions
Frequent conversions across the catalogue: