Module threadbare.state
Expand source code
import copy
import contextlib
CLEANUP_KEY = "_cleanup"
class FreezeableDict(dict):
def __init__(self, *args, **kwargs):
dict.__init__(self, *args, **kwargs)
self.read_only = False
def update(self, new_dict):
if self.read_only:
raise ValueError(
"dictionary is locked attempting to `update` with %r" % new_dict
)
dict.update(self, new_dict)
def __setitem__(self, key, val):
# I suspect multiprocessing isn't copying the custom 'read_only' attribute back
# from the child process results. be aware of this weirdness
# print("self:", self.__dict__, "internal data:", self)
if hasattr(self, "read_only") and self.read_only:
raise ValueError(
"dictionary is locked attempting to `__setitem__` %r with %r"
% (key, val)
)
dict.__setitem__(self, key, val)
def read_only(d):
if hasattr(d, "read_only"):
d.read_only = True
def read_write(d):
if hasattr(d, "read_only"):
d.read_only = False
def initial_state():
"""returns a new, empty, locked, FreezeableDict instance that is used as the initial `state.ENV` value.
if you are thinking "it would be really convenient if 'some_setting' was 'some_value' by default",
see `set_defaults`."""
new_env = FreezeableDict()
read_only(new_env)
return new_env
ENV = initial_state()
DEPTH = 0 # used to determine how deeply nested we are
def set_defaults(defaults_dict=None):
"""re-initialises the `state.ENV` dictionary with the given defaults.
With no arguments the global state will be reverted to it's initial state (an empty FreezeableDict).
Use `state.set_defaults` BEFORE using ANY other `state.*` functions are called."""
global ENV, DEPTH
if DEPTH != 0:
msg = "refusing to set initial `threadbare.state.ENV` state within a `threadbare.state.settings` context manager."
raise EnvironmentError(msg)
new_env = FreezeableDict()
new_env.update(defaults_dict or {})
read_only(new_env)
ENV = new_env
def cleanup(old_state):
if CLEANUP_KEY in old_state:
for cleanup_fn in old_state[CLEANUP_KEY]:
cleanup_fn()
del old_state[CLEANUP_KEY]
def _add_cleanup(state, fn):
cleanup_fn_list = state.get(CLEANUP_KEY, [])
cleanup_fn_list.append(fn)
state[CLEANUP_KEY] = cleanup_fn_list
def add_cleanup(fn):
"add a function to a list of functions that are called after leaving the current scope of the context manager"
return _add_cleanup(ENV, fn)
@contextlib.contextmanager
def settings(**kwargs):
global DEPTH
state = ENV
if not isinstance(state, dict):
raise TypeError(
"state map must be a dictionary-like object, not %r" % type(state)
)
# deepcopy will attempt to pickle and unpickle all objects in state
# we can't guarantee what will live in state and if it's possible to pickle it or not
# the SSHClient is one such unserialisable object that has had to be subclassed
# another approach would be to relax guarantees that the environment is completely reverted
# call `read_write` here as `deepcopy` copies across attributes (like `read_only`) and
# then values using `__setitem__`, causing errors in FreezeableDict when 'set_defaults' used
read_write(state)
original_values = copy.deepcopy(state)
DEPTH += 1
state.update(kwargs)
# ensure child context processors don't clean up their parents
if CLEANUP_KEY in state:
state.update({CLEANUP_KEY: []})
try:
yield state
finally:
cleanup(state)
state.clear()
state.update(original_values)
DEPTH -= 1
if DEPTH == 0:
# we're leaving the top-most context decorator
# ensure state dictionary is marked as read-only
read_only(state)
Functions
def add_cleanup(fn)
-
add a function to a list of functions that are called after leaving the current scope of the context manager
Expand source code
def add_cleanup(fn): "add a function to a list of functions that are called after leaving the current scope of the context manager" return _add_cleanup(ENV, fn)
def cleanup(old_state)
-
Expand source code
def cleanup(old_state): if CLEANUP_KEY in old_state: for cleanup_fn in old_state[CLEANUP_KEY]: cleanup_fn() del old_state[CLEANUP_KEY]
def initial_state()
-
returns a new, empty, locked, FreezeableDict instance that is used as the initial
state.ENV
value.if you are thinking "it would be really convenient if 'some_setting' was 'some_value' by default", see
set_defaults()
.Expand source code
def initial_state(): """returns a new, empty, locked, FreezeableDict instance that is used as the initial `state.ENV` value. if you are thinking "it would be really convenient if 'some_setting' was 'some_value' by default", see `set_defaults`.""" new_env = FreezeableDict() read_only(new_env) return new_env
def read_only(d)
-
Expand source code
def read_only(d): if hasattr(d, "read_only"): d.read_only = True
def read_write(d)
-
Expand source code
def read_write(d): if hasattr(d, "read_only"): d.read_only = False
def set_defaults(defaults_dict=None)
-
re-initialises the
state.ENV
dictionary with the given defaults. With no arguments the global state will be reverted to it's initial state (an empty FreezeableDict).Use
state.set_defaults
BEFORE using ANY otherstate.*
functions are called.Expand source code
def set_defaults(defaults_dict=None): """re-initialises the `state.ENV` dictionary with the given defaults. With no arguments the global state will be reverted to it's initial state (an empty FreezeableDict). Use `state.set_defaults` BEFORE using ANY other `state.*` functions are called.""" global ENV, DEPTH if DEPTH != 0: msg = "refusing to set initial `threadbare.state.ENV` state within a `threadbare.state.settings` context manager." raise EnvironmentError(msg) new_env = FreezeableDict() new_env.update(defaults_dict or {}) read_only(new_env) ENV = new_env
def settings(**kwargs)
-
Expand source code
@contextlib.contextmanager def settings(**kwargs): global DEPTH state = ENV if not isinstance(state, dict): raise TypeError( "state map must be a dictionary-like object, not %r" % type(state) ) # deepcopy will attempt to pickle and unpickle all objects in state # we can't guarantee what will live in state and if it's possible to pickle it or not # the SSHClient is one such unserialisable object that has had to be subclassed # another approach would be to relax guarantees that the environment is completely reverted # call `read_write` here as `deepcopy` copies across attributes (like `read_only`) and # then values using `__setitem__`, causing errors in FreezeableDict when 'set_defaults' used read_write(state) original_values = copy.deepcopy(state) DEPTH += 1 state.update(kwargs) # ensure child context processors don't clean up their parents if CLEANUP_KEY in state: state.update({CLEANUP_KEY: []}) try: yield state finally: cleanup(state) state.clear() state.update(original_values) DEPTH -= 1 if DEPTH == 0: # we're leaving the top-most context decorator # ensure state dictionary is marked as read-only read_only(state)
Classes
class FreezeableDict (*args, **kwargs)
-
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Expand source code
class FreezeableDict(dict): def __init__(self, *args, **kwargs): dict.__init__(self, *args, **kwargs) self.read_only = False def update(self, new_dict): if self.read_only: raise ValueError( "dictionary is locked attempting to `update` with %r" % new_dict ) dict.update(self, new_dict) def __setitem__(self, key, val): # I suspect multiprocessing isn't copying the custom 'read_only' attribute back # from the child process results. be aware of this weirdness # print("self:", self.__dict__, "internal data:", self) if hasattr(self, "read_only") and self.read_only: raise ValueError( "dictionary is locked attempting to `__setitem__` %r with %r" % (key, val) ) dict.__setitem__(self, key, val)
Ancestors
- builtins.dict
Methods
def update(self, new_dict)
-
D.update([E, ]**F) -> None. Update D from dict/iterable E and F. If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]
Expand source code
def update(self, new_dict): if self.read_only: raise ValueError( "dictionary is locked attempting to `update` with %r" % new_dict ) dict.update(self, new_dict)