Module threadbare.execute
Expand source code
import traceback
import copy
from multiprocessing import Process, Queue
import time
from .common import first
from . import state
import logging
LOG = logging.getLogger(__name__)
def serial(func, pool_size=None):
    """Wraps `func` in a pass-through function that carries a `pool_size` attribute.

    the wrapper forwards all positional and keyword arguments to `func` and returns its result.
    when `pool_size` is None (default), executor decides how many instances of `func` to execute (1, probably).
    if set and executor is given a set of values to use instead, `pool_size` is ignored.

    returns the wrapper function."""
    # local import so this block does not depend on the module's import list
    from functools import wraps

    # `updated=()` copies the identity of `func` (`__name__`, `__doc__`, ...) onto the wrapper
    # but *not* the contents of `func.__dict__`, so flags already set on `func`
    # (a previous `parallel` or `pool_size`) are never inherited by the new wrapper.
    @wraps(func, updated=())
    def inner(*args, **kwargs):
        return func(*args, **kwargs)

    inner.pool_size = pool_size
    return inner
def parallel(func, pool_size=None):
    """Wraps `func` using `serial` and flags the wrapper to run in parallel, instead of sequentially.

    `pool_size` is handed to `serial` unchanged. returns the flagged wrapper."""
    runner = serial(func, pool_size)
    # the flag is what forces `func` to be run in parallel to the main process
    runner.parallel = True
    return runner
def _parallel_execution_worker_wrapper(env, worker_func, name, queue):
"""this function is executed in another process. it wraps the given `worker_func`, initialising the `state.ENV` of
the new process and adds its results to the given `queue`"""
assert isinstance(env, dict), "given environment must be a dictionary"
# Fabric nukes the child process's `env` dictionary
# -
# note: not possible to service stdin when multiprocessing
env["abort_on_prompts"] = True
# we don't care what the parent process had when Python copied across it's state to
# execute this `worker_func` in parallel. reset it now. the process is destroyed upon leaving.
state.DEPTH = 0
result = worker_func()
queue.put({"name": name, "result": result})
except BaseException as unhandled_exception:
# "Note that exit handlers and finally clauses, etc., will not be executed."
# -
queue.put({"name": name, "result": unhandled_exception})
def process_status(running_p):
    """Summarises the state of the process object `running_p` as a dictionary.

    `running_p` must provide the attributes `pid`, `name` and `exitcode` and the method `is_alive()`.

    returns a dictionary with the keys 'pid', 'name', 'exitcode', 'alive', 'killed' and 'kill-signal'.
    'killed' is True only when the exit code is negative, in which case 'kill-signal' is
    the exit code with its sign flipped. otherwise 'killed' is False and 'kill-signal' is None."""
    # read once so every field derived from the exit code agrees with the reported value
    exitcode = running_p.exitcode
    result = {
        "pid": running_p.pid,
        "name": running_p.name,
        "exitcode": exitcode,
        "alive": running_p.is_alive(),
        "killed": False,
        "kill-signal": None,
    }
    # a negative exit code -N is how a termination by signal N is reported
    if exitcode is not None and exitcode < 0:
        result["killed"] = True
        result["kill-signal"] = -exitcode
    return result
def _parallel_execution(env, func, param_key, param_values, return_process_pool=False):
"executes the given function in parallel to main process. blocks until processes are complete"
results_q = Queue()
kwargs = {
# 'env': ..., # each process will get a new state dictionary
"worker_func": func,
# 'name': ..., # a name is assigned on process start
"queue": results_q,
pool_size = getattr(func, "pool_size", None)
pool_size = pool_size if pool_size is not None else 1
pool_values = param_values or range(0, pool_size)
pool = []
for idx, nth_val in enumerate(pool_values):
kwargs["name"] = "process--" + str(idx + 1) # process--1, process--2
new_env = {} if not env else copy.deepcopy(env)
# ssh clients are not shared between processes
if "ssh_client" in new_env:
del new_env["ssh_client"]
if param_key:
new_env[param_key] = nth_val
new_env["parallel"] = True
# new_env['linewise'] = True # not set until needed
kwargs["env"] = new_env
p = Process(
if return_process_pool:
# don't poll for results, don't wait to finish, just return the list of running processes
return results_q, pool
result_list = [results_q.get(block=True) for _ in range(len(pool))]
# there is a slight delay between a result appearing and the process exiting
result_map = {} # {process-name: process-results, ...}
# all processes are done, they have yielded results and we can finish up now.
# there is a case where a worker has yielded results but the process hasn't ended.
# to solve this we terminate the process and issue a warning.
for idx, process in enumerate(pool):
status = process_status(process)
if status["alive"]:
"process is still alive despite worker having completed. terminating process: %s"
# this should report that the process *was* killed, but the return code should remain the same.
status = process_status(process)
result = process_status(process)
result_map[result["name"]] = status
# all processes are complete
# marry the results to their process results using their 'name'
for job_result in result_list:
job_name = job_result["name"]
result_map[job_name]["result"] = job_result["result"]
# sort the results, drop the process name
return [b for a, b in sorted(result_map.items(), key=first)]
def _serial_execution(func, param_key, param_values):
"executes the given function serially"
result_list = []
if param_key and param_values:
for x in param_values:
with state.settings(**{param_key: x}):
# pretty boring :(
# I could set '_idx' or something in `state.ENV` I suppose ..
for _ in range(0, getattr(func, "pool_size", 1)):
return result_list
def execute(func, param_key=None, param_values=None, raise_unhandled_errors=True):
    """inspects a given function and then executes it either serially or in another process using Python's `multiprocessing` module.

    `param_key` and `param_values` control the number of processes spawned and the name of the parameter passed to the function.

    For example:

        execute(somefunc, param_key='host', param_values=['', '', 'localhost'])

    will ensure that `somefunc` has the (local) state property 'host' with a value of one of the above when executed.

    `param_key` and `param_values` are optional, but if one is specified then so must the other.
    `param_key` must be a string and `param_values` a list, tuple or set, otherwise a `ValueError` is raised.

    parent process blocks until all child processes have completed.

    returns a list. for a function flagged as parallel (see `parallel`) it holds the 'result' of each execution,
    for any other function it is whatever `_serial_execution` returns.

    when `raise_unhandled_errors` is `True` (default), the first parallel result that is an exception will be re-raised.
    `raise_unhandled_errors` is not consulted for serial execution."""

    # in Fabric, `execute` is a guard-type function that ensures the function and the function's environment is
    # correct before passing it to `_execute` that does the actual magic.
    # Fabric's custom 'JobQueue' adds complexity but can be avoided.

    if (param_key and param_values is None) or (param_key is None and param_values):
        raise ValueError(
            "either a `param_key` AND `param_values` are provided OR neither are provided"
        )

    # `isinstance` rather than an exact `type` match so subclasses of the accepted containers pass too
    if param_values is not None and not isinstance(param_values, (list, tuple, set)):
        raise ValueError(
            "given value for `param_values` must be an iterable type, not %r"
            % type(param_values)
        )

    if param_key is not None and not isinstance(param_key, str):
        raise ValueError(
            "given value for `param_key` must be a valid function parameter key"
        )

    # only functions carrying a truthy `parallel` attribute leave the main process
    if not getattr(func, "parallel", False):
        return _serial_execution(func, param_key, param_values)

    result_payload_list = _parallel_execution(state.ENV, func, param_key, param_values)
    response = []
    for result_payload in result_payload_list:
        result = result_payload["result"]
        # a worker's unhandled exception arrives here as its 'result'
        if raise_unhandled_errors and isinstance(result, BaseException):
            raise result
        response.append(result)
    return response
def execute_with_hosts(func, hosts=None, raise_unhandled_errors=True):
    """convenience wrapper around `execute`. calls `execute` on given `func` for each host in `hosts`.

    when `hosts` is not given (or is empty) the 'hosts' value of `state.ENV` is used instead.
    The host is available within the worker function's `env` as `host_string`.
    `raise_unhandled_errors` is passed through to `execute`.

    raises an `AssertionError` when the resulting host list is not a non-empty list.
    returns a dictionary of the results of `execute`, keyed by host."""
    host_list = hosts or state.ENV.get("hosts") or []
    # raised explicitly rather than with an `assert` statement so the check still runs under `python -O`
    if not (isinstance(host_list, list) and host_list):
        raise AssertionError("'hosts' must be a non-empty list")

    # Fabric may know about many hosts ('all_hosts') but only be acting upon a subset of them ('hosts')
    # it says 'for informational purposes only' and nothing we use depends on it, so I'm disabling for now
    # env['all_hosts'] = env['hosts']

    results = execute(
        func,
        param_key="host_string",
        param_values=host_list,
        raise_unhandled_errors=raise_unhandled_errors,
    )
    # results are ordered so we can do this
    return dict(zip(host_list, results))  # {host: result, ...}
def execute(func, param_key=None, param_values=None, raise_unhandled_errors=True)
inspects a given function and then executes it either serially or in another process using Python's `multiprocessing` module.
`param_key` and `param_values` control the number of processes spawned and the name of the parameter passed to the function. For example:
execute(somefunc, param_key='host', param_values=['', '', 'localhost'])
will ensure that `somefunc` has the (local) state property 'host' with a value of one of the above when executed.
`param_key` and `param_values` are optional, but if one is specified then so must the other.
parent process blocks until all child processes have completed. returns a map of execution data with the return values of the individual executions available under 'result'.
when `raise_unhandled_errors` is `True` (default), the first result that is an exception will be re-raised.
Expand source code
def execute(func, param_key=None, param_values=None, raise_unhandled_errors=True):
    """inspects a given function and then executes it either serially or in another process using Python's `multiprocessing` module.

    `param_key` and `param_values` control the number of processes spawned and the name of the parameter passed to the function.

    For example:

        execute(somefunc, param_key='host', param_values=['', '', 'localhost'])

    will ensure that `somefunc` has the (local) state property 'host' with a value of one of the above when executed.

    `param_key` and `param_values` are optional, but if one is specified then so must the other.
    `param_key` must be a string and `param_values` a list, tuple or set, otherwise a `ValueError` is raised.

    parent process blocks until all child processes have completed.

    returns a list. for a function flagged as parallel (see `parallel`) it holds the 'result' of each execution,
    for any other function it is whatever `_serial_execution` returns.

    when `raise_unhandled_errors` is `True` (default), the first parallel result that is an exception will be re-raised.
    `raise_unhandled_errors` is not consulted for serial execution."""

    # in Fabric, `execute` is a guard-type function that ensures the function and the function's environment is
    # correct before passing it to `_execute` that does the actual magic.
    # Fabric's custom 'JobQueue' adds complexity but can be avoided.

    if (param_key and param_values is None) or (param_key is None and param_values):
        raise ValueError(
            "either a `param_key` AND `param_values` are provided OR neither are provided"
        )

    # `isinstance` rather than an exact `type` match so subclasses of the accepted containers pass too
    if param_values is not None and not isinstance(param_values, (list, tuple, set)):
        raise ValueError(
            "given value for `param_values` must be an iterable type, not %r"
            % type(param_values)
        )

    if param_key is not None and not isinstance(param_key, str):
        raise ValueError(
            "given value for `param_key` must be a valid function parameter key"
        )

    # only functions carrying a truthy `parallel` attribute leave the main process
    if not getattr(func, "parallel", False):
        return _serial_execution(func, param_key, param_values)

    result_payload_list = _parallel_execution(state.ENV, func, param_key, param_values)
    response = []
    for result_payload in result_payload_list:
        result = result_payload["result"]
        # a worker's unhandled exception arrives here as its 'result'
        if raise_unhandled_errors and isinstance(result, BaseException):
            raise result
        response.append(result)
    return response
def execute_with_hosts(func, hosts=None, raise_unhandled_errors=True)
convenience wrapper around `execute`. calls `execute()` on given `func` for each host in `hosts`. The host is available within the worker function's `env` as `host_string`.
Expand source code
def execute_with_hosts(func, hosts=None, raise_unhandled_errors=True):
    """convenience wrapper around `execute`. calls `execute` on given `func` for each host in `hosts`.

    when `hosts` is not given (or is empty) the 'hosts' value of `state.ENV` is used instead.
    The host is available within the worker function's `env` as `host_string`.
    `raise_unhandled_errors` is passed through to `execute`.

    raises an `AssertionError` when the resulting host list is not a non-empty list.
    returns a dictionary of the results of `execute`, keyed by host."""
    host_list = hosts or state.ENV.get("hosts") or []
    # raised explicitly rather than with an `assert` statement so the check still runs under `python -O`
    if not (isinstance(host_list, list) and host_list):
        raise AssertionError("'hosts' must be a non-empty list")

    # Fabric may know about many hosts ('all_hosts') but only be acting upon a subset of them ('hosts')
    # it says 'for informational purposes only' and nothing we use depends on it, so I'm disabling for now
    # env['all_hosts'] = env['hosts']

    results = execute(
        func,
        param_key="host_string",
        param_values=host_list,
        raise_unhandled_errors=raise_unhandled_errors,
    )
    # results are ordered so we can do this
    return dict(zip(host_list, results))  # {host: result, ...}
def parallel(func, pool_size=None)
Forces the wrapped function to run in parallel, instead of sequentially.
Expand source code
def parallel(func, pool_size=None):
    """Wraps `func` using `serial` and flags the wrapper to run in parallel, instead of sequentially.

    `pool_size` is handed to `serial` unchanged. returns the flagged wrapper."""
    runner = serial(func, pool_size)
    # the flag is what forces `func` to be run in parallel to the main process
    runner.parallel = True
    return runner
def process_status(running_p)
Expand source code
def process_status(running_p):
    """Summarises the state of the process object `running_p` as a dictionary.

    `running_p` must provide the attributes `pid`, `name` and `exitcode` and the method `is_alive()`.

    returns a dictionary with the keys 'pid', 'name', 'exitcode', 'alive', 'killed' and 'kill-signal'.
    'killed' is True only when the exit code is negative, in which case 'kill-signal' is
    the exit code with its sign flipped. otherwise 'killed' is False and 'kill-signal' is None."""
    # read once so every field derived from the exit code agrees with the reported value
    exitcode = running_p.exitcode
    result = {
        "pid": running_p.pid,
        "name": running_p.name,
        "exitcode": exitcode,
        "alive": running_p.is_alive(),
        "killed": False,
        "kill-signal": None,
    }
    # a negative exit code -N is how a termination by signal N is reported
    if exitcode is not None and exitcode < 0:
        result["killed"] = True
        result["kill-signal"] = -exitcode
    return result
def serial(func, pool_size=None)
Forces the given function to run `pool_size` times. when pool_size is None (default), executor decides how many instances of `func` to execute (1, probably). if set and executor is given a set of values to use instead, `pool_size` is ignored
Expand source code
def serial(func, pool_size=None):
    """Wraps `func` in a pass-through function that carries a `pool_size` attribute.

    the wrapper forwards all positional and keyword arguments to `func` and returns its result.
    when `pool_size` is None (default), executor decides how many instances of `func` to execute (1, probably).
    if set and executor is given a set of values to use instead, `pool_size` is ignored.

    returns the wrapper function."""
    # local import so this block does not depend on the module's import list
    from functools import wraps

    # `updated=()` copies the identity of `func` (`__name__`, `__doc__`, ...) onto the wrapper
    # but *not* the contents of `func.__dict__`, so flags already set on `func`
    # (a previous `parallel` or `pool_size`) are never inherited by the new wrapper.
    @wraps(func, updated=())
    def inner(*args, **kwargs):
        return func(*args, **kwargs)

    inner.pool_size = pool_size
    return inner