concurrent.futures — Manage Pools of Concurrent Tasks
8:58 PM
Using map() with a Basic Thread Pool
#futures_thread_pool_map.py
from concurrent import futures
import threading
import time
def task(n):
    """Simulate a unit of work by sleeping, then return the time slept.

    A message tagged with the current thread's name is printed before and
    after the sleep so the interleaving of worker threads is visible.

    Args:
        n: Work size; the function sleeps for ``n / 10`` seconds.

    Returns:
        ``n / 10``, the number of seconds slept.
    """
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10
# Two worker threads share the five tasks.
pool = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
# map() hands back an iterator over the results, not the values themselves.
pending = pool.map(task, range(5, 0, -1))
print('main: unprocessed results {}'.format(pending))
print('main: waiting for real results')
# Draining the iterator blocks until each result is available.
real_results = [value for value in pending]
print('main: results: {}'.format(real_results))
#output
main: starting
ThreadPoolExecutor-0_0: sleeping 5
ThreadPoolExecutor-0_1: sleeping 4
main: unprocessed results <generator object Executor.map.<locals>.result_iterator at 0x103e12780>
main: waiting for real results
ThreadPoolExecutor-0_1: done with 4
ThreadPoolExecutor-0_1: sleeping 3
ThreadPoolExecutor-0_0: done with 5
ThreadPoolExecutor-0_0: sleeping 2
ThreadPoolExecutor-0_0: done with 2
ThreadPoolExecutor-0_0: sleeping 1
ThreadPoolExecutor-0_1: done with 3
ThreadPoolExecutor-0_0: done with 1
main: results: [0.5, 0.4, 0.3, 0.2, 0.1]
Scheduling Individual Tasks
#futures_thread_pool_submit.py
from concurrent import futures
import threading
import time
def task(n):
    """Sleep for ``n / 10`` seconds and return that duration.

    Progress messages carry the name of the thread running the task, which
    shows which pool worker picked it up.

    Args:
        n: Work size; the sleep lasts ``n / 10`` seconds.

    Returns:
        ``n / 10``.
    """
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10
pool = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
# submit() schedules a single call and returns its Future immediately.
future = pool.submit(task, 5)
print('main: future: {}'.format(future))
print('main: waiting for results')
# result() blocks until the task has finished.
result = future.result()
print('main: result: {}'.format(result))
print('main: future after result: {}'.format(future))
#output
main: starting
ThreadPoolExecutor-0_0: sleeping 5
main: future: <Future at 0x... state=running>
main: waiting for results
ThreadPoolExecutor-0_0: done with 5
main: result: 0.5
main: future after result: <Future at 0x... state=finished returned float>
Waiting for Tasks in Any Order
#futures_as_completed.py
from concurrent import futures
import random
import time
def task(n):
    """Sleep for a random fraction of a second, then report the input.

    The random delay makes the tasks finish in an unpredictable order.

    Args:
        n: Value identifying the task.

    Returns:
        A ``(n, n / 10)`` tuple.
    """
    time.sleep(random.random())
    return (n, n / 10)
ex = futures.ThreadPoolExecutor(max_workers=5)
print('main: starting')

# Submit every task up front and keep the futures so they can be awaited.
wait_for = [
    ex.submit(task, i)
    for i in range(5, 0, -1)
]

# as_completed() yields each future as it finishes, regardless of the
# order in which the tasks were submitted.
for f in futures.as_completed(wait_for):
    print('main: result: {}'.format(f.result()))
#output
main: starting
main: result: (1, 0.1)
main: result: (5, 0.5)
main: result: (3, 0.3)
main: result: (2, 0.2)
main: result: (4, 0.4)
Future Callbacks
#futures_future_callback.py
from concurrent import futures
import time
def task(n):
    """Sleep for half a second and return ``n / 10``.

    Args:
        n: Value identifying the task; it prefixes the progress messages.

    Returns:
        ``n / 10``.
    """
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10
def done(fn):
    """Report the outcome of a future: canceled, failed, or succeeded.

    Args:
        fn: The future to report on. It must carry an ``arg`` attribute,
            which is used to label the printed message.
    """
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        # exception() returns None when the task completed without raising.
        error = fn.exception()
        if error:
            print('{}: error returned: {}'.format(
                fn.arg, error))
        else:
            result = fn.result()
            print('{}: value returned: {}'.format(
                fn.arg, result))
if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    f = ex.submit(task, 5)
    # The callback receives only the future, so stash the task's argument
    # on the future itself for the callback to read.
    f.arg = 5
    f.add_done_callback(done)
    result = f.result()
#output
main: starting
5: sleeping
5: done
5: value returned: 0.5
Canceling Tasks
#futures_future_callback_cancel.py
from concurrent import futures
import time
def task(n):
    """Sleep for half a second and return ``n / 10``.

    Args:
        n: Value identifying the task; it prefixes the progress messages.

    Returns:
        ``n / 10``.
    """
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10
def done(fn):
    """Print whether a future was canceled or ran to completion.

    Nothing is printed for a future that is neither canceled nor done.

    Args:
        fn: The future to report on. It must carry an ``arg`` attribute,
            which is used to label the printed message.
    """
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        print('{}: not canceled'.format(fn.arg))
if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    tasks = []

    # Queue more tasks than there are workers so that some are still
    # waiting to start when cancellation is attempted.
    for i in range(10, 0, -1):
        print('main: submitting {}'.format(i))
        f = ex.submit(task, i)
        f.arg = i
        f.add_done_callback(done)
        tasks.append((i, f))

    # Walk the list backwards, starting with the most recently submitted
    # tasks. cancel() returns False when the task could not be canceled.
    for i, t in reversed(tasks):
        if not t.cancel():
            print('main: did not cancel {}'.format(i))

    ex.shutdown()
#output
main: starting
main: submitting 10
10: sleeping
main: submitting 9
9: sleeping
main: submitting 8
main: submitting 7
main: submitting 6
main: submitting 5
main: submitting 4
main: submitting 3
main: submitting 2
main: submitting 1
1: canceled
2: canceled
3: canceled
4: canceled
5: canceled
6: canceled
7: canceled
8: canceled
main: did not cancel 9
main: did not cancel 10
10: done
10: not canceled
9: done
9: not canceled
Exceptions in Tasks
#futures_future_exception.py
from concurrent import futures
def task(n):
    """Announce the task and then always fail.

    Args:
        n: Value identifying the task; included in the error message.

    Raises:
        ValueError: Always, with the message
            ``'the value <n> is no good'``.
    """
    print('{}: starting'.format(n))
    raise ValueError('the value {} is no good'.format(n))
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)

# exception() hands back the exception raised by the task instead of
# raising it in the caller.
error = f.exception()
print('main: error: {}'.format(error))

# result() re-raises the task's exception in the caller.
try:
    result = f.result()
except ValueError as e:
    print('main: saw error "{}" when accessing result'.format(e))
#output
main: starting
5: starting
main: error: the value 5 is no good
main: saw error "the value 5 is no good" when accessing result
Context Manager
#futures_context_manager.py
from concurrent import futures
def task(n):
    """Print ``n``; the task does no other work."""
    print(n)
with futures.ThreadPoolExecutor(max_workers=2) as ex:
    print('main: starting')
    ex.submit(task, 1)
    ex.submit(task, 2)
    ex.submit(task, 3)
    ex.submit(task, 4)

# Leaving the with block shuts the executor down and waits for the
# submitted tasks, so this line prints only after all of them have run.
print('main: done')
#output
main: starting
1
2
3
4
main: done
Process Pools
#futures_process_pool_map.py
from concurrent import futures
import os
def task(n):
    """Return ``n`` paired with the id of the process running the task.

    Args:
        n: Value identifying the task.

    Returns:
        A ``(n, pid)`` tuple, where ``pid`` comes from ``os.getpid()``.
    """
    return (n, os.getpid())
# Worker processes may re-import this module (for example under the
# "spawn" start method), so the pool must only be created when the file
# is run as the main script.
if __name__ == '__main__':
    ex = futures.ProcessPoolExecutor(max_workers=2)
    results = ex.map(task, range(5, 0, -1))
    for n, pid in results:
        print('ran task {} in process {}'.format(n, pid))
#output
ran task 5 in process 40854
ran task 4 in process 40854
ran task 3 in process 40854
ran task 2 in process 40854
ran task 1 in process 40854
Process Pool Executor
#futures_process_pool_broken.py
from concurrent import futures
import os
import signal
# Worker processes may re-import this module (for example under the
# "spawn" start method), so the pool must only be created when the file
# is run as the main script.
if __name__ == '__main__':
    with futures.ProcessPoolExecutor(max_workers=2) as ex:
        print('getting the pid for one worker')
        f1 = ex.submit(os.getpid)
        pid1 = f1.result()

        # Kill the worker from outside the pool to simulate a crash.
        print('killing process {}'.format(pid1))
        os.kill(pid1, signal.SIGHUP)

        print('submitting another task')
        f2 = ex.submit(os.getpid)
        try:
            pid2 = f2.result()
        except futures.process.BrokenProcessPool as e:
            print('could not start new tasks: {}'.format(e))
#output
getting the pid for one worker
killing process 40858
submitting another task
could not start new tasks: A process in the process pool was
terminated abruptly while the future was running or pending.
source: https://pymotw.com/3/concurrent.futures/