Module threadbare.execute
import traceback
import copy
from multiprocessing import Process, Queue
import time
from .common import first
from . import state
import logging
LOG = logging.getLogger(__name__)
# https://github.com/mathiasertl/fabric/blob/master/fabric/decorators.py#L148-L161
def serial(func, pool_size=None):
    """Forces the given function to run `pool_size` times.
    when `pool_size` is None (default), the executor decides how many instances of `func` to execute (1, probably).
    if `pool_size` is set but the executor is also given a set of values to use, it is ignored."""

    def inner(*args, **kwargs):
        return func(*args, **kwargs)

    inner.pool_size = pool_size
    return inner
# https://github.com/mathiasertl/fabric/blob/master/fabric/decorators.py#L164-L194
def parallel(func, pool_size=None):
    """Forces the wrapped function to run in parallel, instead of sequentially."""
    wrapped_func = serial(func, pool_size)
    # `func` *must* be forced to run in parallel to main process
    wrapped_func.parallel = True
    return wrapped_func
def _parallel_execution_worker_wrapper(env, worker_func, name, queue):
    """this function is executed in another process. it wraps the given `worker_func`, initialising the `state.ENV` of
    the new process and adds its results to the given `queue`"""
    try:
        assert isinstance(env, dict), "given environment must be a dictionary"
        # Fabric nukes the child process's `env` dictionary
        # - https://github.com/mathiasertl/fabric/blob/master/fabric/tasks.py#L229-L237
        # note: not possible to service stdin when multiprocessing
        env["abort_on_prompts"] = True
        # we don't care what the parent process had when Python copied across its state to
        # execute this `worker_func` in parallel. reset it now. the process is destroyed upon leaving.
        state.DEPTH = 0
        state.set_defaults(env)
        result = worker_func()
        queue.put({"name": name, "result": result})
    except BaseException as unhandled_exception:
        traceback.print_exc()
        # "Note that exit handlers and finally clauses, etc., will not be executed."
        # - https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.terminate
        queue.put({"name": name, "result": unhandled_exception})
def process_status(running_p):
    # https://docs.python.org/2/library/multiprocessing.html#process-and-exceptions
    result = {
        "pid": running_p.pid,
        "name": running_p.name,
        "exitcode": running_p.exitcode,
        "alive": running_p.is_alive(),
        "killed": False,
        "kill-signal": None,
    }
    if running_p.exitcode is not None and running_p.exitcode < 0:
        result["killed"] = True
        result["kill-signal"] = -running_p.exitcode
    return result
def _parallel_execution(env, func, param_key, param_values, return_process_pool=False):
    "executes the given function in parallel to main process. blocks until processes are complete"
    results_q = Queue()
    kwargs = {
        # 'env': ..., # each process will get a new state dictionary
        "worker_func": func,
        # 'name': ..., # a name is assigned on process start
        "queue": results_q,
    }
    pool_size = getattr(func, "pool_size", None)
    pool_size = pool_size if pool_size is not None else 1
    pool_values = param_values or range(0, pool_size)
    pool = []
    for idx, nth_val in enumerate(pool_values):
        kwargs["name"] = "process--" + str(idx + 1)  # process--1, process--2
        new_env = {} if not env else copy.deepcopy(env)
        # ssh clients are not shared between processes
        if "ssh_client" in new_env:
            del new_env["ssh_client"]
        if param_key:
            new_env[param_key] = nth_val
        new_env["parallel"] = True
        # https://github.com/mathiasertl/fabric/blob/master/fabric/tasks.py#L223-L227
        # new_env['linewise'] = True # not set until needed
        kwargs["env"] = new_env
        p = Process(
            name=kwargs["name"],
            target=_parallel_execution_worker_wrapper,
            kwargs=kwargs,
        )
        p.start()
        pool.append(p)
    if return_process_pool:
        # don't poll for results, don't wait to finish, just return the list of running processes
        return results_q, pool
    result_list = [results_q.get(block=True) for _ in range(len(pool))]
    # there is a slight delay between a result appearing and the process exiting
    time.sleep(0.1)
    results_q.close()
    result_map = {}  # {process-name: process-results, ...}
    # all processes are done, they have yielded results and we can finish up now.
    # there is a case where a worker has yielded results but the process hasn't ended.
    # to solve this we terminate the process and issue a warning.
    for idx, process in enumerate(pool):
        status = process_status(process)
        if status["alive"]:
            LOG.warning(
                "process is still alive despite worker having completed. terminating process: %s"
                % process.name
            )
            process.terminate()
            # this should report that the process *was* killed, but the return code should remain the same.
            status = process_status(process)
        result = process_status(process)
        result_map[result["name"]] = status
    # all processes are complete
    # marry the results to their process results using their 'name'
    for job_result in result_list:
        job_name = job_result["name"]
        result_map[job_name]["result"] = job_result["result"]
    # sort the results, drop the process name
    return [b for a, b in sorted(result_map.items(), key=first)]
def _serial_execution(func, param_key, param_values):
    "executes the given function serially"
    result_list = []
    if param_key and param_values:
        for x in param_values:
            with state.settings(**{param_key: x}):
                result_list.append(func())
    else:
        # pretty boring :(
        # I could set '_idx' or something in `state.ENV` I suppose ..
        for _ in range(0, getattr(func, "pool_size", 1)):
            result_list.append(func())
    return result_list
def execute(func, param_key=None, param_values=None, raise_unhandled_errors=True):
    """inspects a given function and then executes it either serially or in another process using Python's `multiprocessing` module.
    `param_key` and `param_values` control the number of processes spawned and the name of the parameter passed to the function.
    For example:
        execute(somefunc, param_key='host', param_values=['127.0.0.1', '127.0.1.1', 'localhost'])
    will ensure that `somefunc` has the (local) state property 'host' with a value of one of the above when executed.
    `param_key` and `param_values` are optional, but if one is specified then so must the other.
    the parent process blocks until all child processes have completed.
    returns a list of results, one per execution.
    when `raise_unhandled_errors` is `True` (default), the first result that is an exception will be re-raised."""
    # in Fabric, `execute` is a guard-type function that ensures the function and the function's environment is
    # correct before passing it to `_execute` that does the actual magic.
    # `execute`: https://github.com/mathiasertl/fabric/blob/master/fabric/tasks.py#L372-L401
    # `_execute`: https://github.com/mathiasertl/fabric/blob/master/fabric/tasks.py#L213-L277
    # Fabric's custom 'JobQueue' adds complexity but can be avoided:
    # https://github.com/mathiasertl/fabric/blob/master/fabric/job_queue.py
    if (param_key and param_values is None) or (param_key is None and param_values):
        raise ValueError(
            "either a `param_key` AND `param_values` are provided OR neither are provided"
        )
    if param_values is not None and type(param_values) not in [list, tuple, set]:
        raise ValueError(
            "given value for `param_values` must be an iterable type, not %r"
            % type(param_values)
        )
    if param_key is not None and not isinstance(param_key, str):
        raise ValueError(
            "given value for `param_key` must be a valid function parameter key"
        )
    if hasattr(func, "parallel") and func.parallel:
        result_payload_list = _parallel_execution(
            state.ENV, func, param_key, param_values
        )
        response = []
        for result_payload in result_payload_list:
            if (
                isinstance(result_payload["result"], BaseException)
                and raise_unhandled_errors
            ):
                unhandled_error = result_payload["result"]
                raise unhandled_error
            response.append(result_payload["result"])
        return response
    return _serial_execution(func, param_key, param_values)
def execute_with_hosts(func, hosts=None, raise_unhandled_errors=True):
    """convenience wrapper around `execute`. calls `execute` on given `func` for each host in `hosts`.
    The host is available within the worker function's `env` as `host_string`."""
    host_list = hosts or state.ENV.get("hosts") or []
    assert isinstance(host_list, list) and host_list, "'hosts' must be a non-empty list"
    # Fabric may know about many hosts ('all_hosts') but only be acting upon a subset of them ('hosts')
    # - https://github.com/mathiasertl/fabric/blob/master/sites/docs/usage/env.rst#all_hosts
    # set here:
    # - https://github.com/mathiasertl/fabric/blob/master/fabric/tasks.py#L352
    # in elife/builder we use a map of host information:
    # - https://github.com/elifesciences/builder/blob/master/src/buildercore/core.py#L326-L327
    # - https://github.com/elifesciences/builder/blob/master/src/buildercore/core.py#L386
    # it says 'for informational purposes only' and nothing we use depends on it, so I'm disabling for now
    # env['all_hosts'] = env['hosts']
    results = execute(
        func,
        param_key="host_string",
        param_values=host_list,
        raise_unhandled_errors=raise_unhandled_errors,
    )
    # results are ordered so we can do this
    return dict(zip(host_list, results))  # {'192.168.0.1': [], '192.169.0.3': []}
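A minimal usage sketch of the parallel path implemented above. The worker name (`fetch`) and the host values are hypothetical, and it assumes `state.ENV` is the dict-like per-process environment that `state.set_defaults` populates in `_parallel_execution_worker_wrapper`:

from threadbare import state
from threadbare.execute import execute, parallel

def fetch():
    # hypothetical worker: each child process gets its own copy of the
    # environment, with `param_key` ('host') bound to one of `param_values`.
    return "fetched from %s" % state.ENV["host"]

if __name__ == "__main__":
    # `parallel` marks `fetch` so `execute` spawns one process per value.
    results = execute(
        parallel(fetch),
        param_key="host",
        param_values=["127.0.0.1", "localhost"],
    )
    print(results)  # e.g. ['fetched from 127.0.0.1', 'fetched from localhost']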
Functions
def execute(func, param_key=None, param_values=None, raise_unhandled_errors=True)
-
inspects a given function and then executes it either serially or in another process using Python's `multiprocessing` module.
`param_key` and `param_values` control the number of processes spawned and the name of the parameter passed to the function.
For example:
execute(somefunc, param_key='host', param_values=['127.0.0.1', '127.0.1.1', 'localhost'])
will ensure that `somefunc` has the (local) state property 'host' with a value of one of the above when executed.
`param_key` and `param_values` are optional, but if one is specified then so must the other.
the parent process blocks until all child processes have completed.
returns a list of results, one per execution.
when `raise_unhandled_errors` is `True` (default), the first result that is an exception will be re-raised.
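A short sketch (hypothetical worker name) of the error-handling behaviour on the parallel path: with `raise_unhandled_errors=False`, a worker's unhandled exception is returned in its place in the results list rather than being re-raised:

from threadbare.execute import execute, parallel

def may_fail():
    # hypothetical worker that always fails
    raise RuntimeError("boom")

if __name__ == "__main__":
    results = execute(parallel(may_fail), raise_unhandled_errors=False)
    # the exception object is returned in place of a result, e.g. [RuntimeError('boom')]
    print(results)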
def execute_with_hosts(func, hosts=None, raise_unhandled_errors=True)
-
convenience wrapper around `execute`. calls `execute` on the given `func` for each host in `hosts`.
The host is available within the worker function's `env` as `host_string`.
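A hypothetical usage sketch: the worker (here called `check`) reads the current host from the per-process environment, assumed to be accessible as `state.ENV["host_string"]`, and the return value maps each host to its result:

from threadbare import state
from threadbare.execute import execute_with_hosts, parallel

def check():
    # hypothetical worker; 'host_string' is set by `execute_with_hosts`
    return "checked %s" % state.ENV["host_string"]

if __name__ == "__main__":
    result_map = execute_with_hosts(parallel(check), hosts=["192.168.0.1", "192.168.0.2"])
    print(result_map)  # {'192.168.0.1': 'checked 192.168.0.1', '192.168.0.2': 'checked 192.168.0.2'}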
def parallel(func, pool_size=None)
-
Forces the wrapped function to run in parallel, instead of sequentially.
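For example (hypothetical `task` function), marking a function as parallel and letting `pool_size` decide how many processes `execute` spawns when no `param_values` are given:

from threadbare.execute import execute, parallel

def task():
    return "done"

if __name__ == "__main__":
    results = execute(parallel(task, pool_size=3))
    print(results)  # ['done', 'done', 'done']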
def process_status(running_p)
-
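A small sketch, not from the library, showing how a terminated child is reported: a negative `exitcode` is translated into the 'killed' and 'kill-signal' fields (POSIX only; the `sleepy` worker is hypothetical):

import time
import signal
from multiprocessing import Process
from threadbare.execute import process_status

def sleepy():
    # hypothetical child that sleeps long enough to be terminated
    time.sleep(60)

if __name__ == "__main__":
    p = Process(target=sleepy)
    p.start()
    p.terminate()  # sends SIGTERM on POSIX
    p.join()
    status = process_status(p)
    print(status["killed"])                         # True
    print(status["kill-signal"] == signal.SIGTERM)  # True (signal 15)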
def serial(func, pool_size=None)
-
Forces the given function to run `pool_size` times.
when `pool_size` is None (default), the executor decides how many instances of `func` to execute (1, probably).
if `pool_size` is set but the executor is also given a set of values to use, it is ignored.
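A minimal sketch (hypothetical `ping` function): wrapping with `serial` and an explicit `pool_size` makes `execute` call the function that many times, one after the other:

from threadbare.execute import execute, serial

def ping():
    return "pong"

if __name__ == "__main__":
    results = execute(serial(ping, pool_size=2))
    print(results)  # ['pong', 'pong']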