Module threadbare.operations
Expand source code
from functools import wraps, partial
from datetime import datetime
import tempfile
import contextlib
import subprocess
from threading import Timer
import getpass
import pssh.exceptions
import os, sys
from pssh.clients.native import SSHClient as PSSHClient
import gevent
import io
import logging
from . import state
from .common import (
PromptedException,
merge,
subdict,
rename,
cwd,
sudo_wrap_command,
cwd_wrap_command,
shell_wrap_command,
ensure,
)
LOG = logging.getLogger(__name__)
class SSHClient(PSSHClient):
def __deepcopy__(self, memo):
# do not copy.deepcopy ourselves or the pssh SSHClient object, just
# return a reference to the object (self)
# - https://docs.python.org/3/library/copy.html
return self
class NetworkError(Exception):
"generic 'died while doing something network-related' catch-all exception class."
pass
class WrappedNetworkError(NetworkError):
"groups several exceptions into a single WrappedNetworkError"
def __init__(self, exc):
self.wrapped = exc
def pem_key():
"""returns the first private key found in a list of common private keys.
if none of the keys exist, the default (first) key will be returned."""
id_list = ["id_rsa", "id_dsa", "identity", "id_ecdsa"]
id_list = [os.path.expanduser("~/.ssh/" + idstr) for idstr in id_list]
for id_path in id_list:
if os.path.isfile(id_path):
return id_path
LOG.debug("key not found: %s" % id_path)
default = id_list[0]
return default
def handle(base_kwargs, kwargs):
"""handles the merging of the base set of function keyword arguments and their possible overrides.
`base_kwargs` is a map of the function's keyword arguments and their defaults.
`kwargs` are the keyword arguments used when executing the function.
the keys from `base_kwargs` are used to determine which keys to extract from the `kwargs` and any
global settings.
returns a triple of (`global_kwargs`, `user_kwargs`, `final_kwargs`) where
`global_kwargs` is the subset of keyword arguments extracted from `state.env`,
`user_kwargs` is the subset of keyword arguments extracted from the given kwargs and
`final_kwargs` is the result of merging `base_kwargs` <- `global_kwargs` <- `user_kwargs`
'user' keyword arguments that are explicitly passed in take precedence over all others and
'global' keyword arguments take precedence over the function's defaults kwargs."""
key_list = base_kwargs.keys()
global_kwargs = subdict(state.ENV, key_list)
user_kwargs = subdict(kwargs, key_list)
final_kwargs = merge(base_kwargs, global_kwargs, user_kwargs)
return global_kwargs, user_kwargs, final_kwargs
# api
@contextlib.contextmanager
def lcd(local_dir):
"temporarily changes the local working directory"
ensure(os.path.isdir(local_dir), "not a directory: %s" % local_dir)
with state.settings():
current_dir = cwd()
state.add_cleanup(lambda: os.chdir(current_dir))
os.chdir(local_dir)
yield
@contextlib.contextmanager
def rcd(remote_working_dir):
"ensures all commands run are done from the given remote directory. if remote directory doesn't exist, command will not be run"
with state.settings(remote_working_dir=remote_working_dir):
yield
@contextlib.contextmanager
def hide(what=None):
"hides *all* output, regardless of `what` type of output is to be hidden."
with state.settings(quiet=True):
yield
def _ssh_default_settings():
"default settings for dealing with ssh."
return {
# current user. sensible default but probably not what you want
"user": getpass.getuser(),
"host_string": None,
# looks for the same ~4 possible keys as Fabric and ParallelSSH.
# uses the first one it finds or the most common if none found.
"key_filename": pem_key(),
"port": 22,
"use_shell": True,
"use_sudo": False,
"combine_stderr": True,
"quiet": False,
"remote_working_dir": None,
"timeout": None,
"warn_only": False, # https://github.com/mathiasertl/fabric/blob/master/fabric/state.py#L301-L305
"abort_exception": RuntimeError,
}
def _ssh_client(**kwargs):
"""returns an instance of pssh.clients.native.SSHClient
if within a state context, looks for a client already in use and returns that if found.
if not found, creates a new one and stores it for later use."""
# parameters we're interested in and their default values
base_kwargs = subdict(
_ssh_default_settings(), ["user", "host_string", "key_filename", "port"]
)
global_kwargs, user_kwargs, final_kwargs = handle(base_kwargs, kwargs)
final_kwargs["password"] = None # always private keys
rename(final_kwargs, [("key_filename", "pkey"), ("host_string", "host")])
# if we're not using global state, return the new client as-is
env = state.ENV
if env.read_only:
return SSHClient(**final_kwargs)
client_map_key = "ssh_client"
client_key = subdict(final_kwargs, ["user", "host", "pkey", "port", "timeout"])
client_key = tuple(sorted(client_key.items()))
# otherwise, check to see if a previous client is available for this host
client_map = env.get(client_map_key, {})
if client_key in client_map:
return client_map[client_key]
# if not, create a new one and store it in the state
# https://parallel-ssh.readthedocs.io/en/latest/native_single.html#pssh.clients.native.single.SSHClient
client = SSHClient(**final_kwargs)
# disconnect session when leaving context manager
state.add_cleanup(lambda: client.disconnect())
client_map[client_key] = client
env[client_map_key] = client_map
return client
def _execute(command, user, key_filename, host_string, port, use_pty, timeout):
"""creates an SSHClient object and executes given `command` with the given parameters."""
client = _ssh_client(
user=user, host_string=host_string, key_filename=key_filename, port=port
)
shell = False # handled ourselves
sudo = False # handled ourselves
user = None # user to sudo to
encoding = "utf-8" # used everywhere
# https://parallel-ssh.readthedocs.io/en/latest/native_single.html#pssh.clients.native.single.SSHClient.run_command
# https://github.com/ParallelSSH/parallel-ssh/blob/master/pssh/output.py
host_output = client.run_command(
command, sudo, user, use_pty, shell, encoding, timeout
)
host_string = host_output.host
stdout = host_output.stdout
stderr = host_output.stderr
def get_exit_code():
client.wait_finished(host_output)
return host_output.exit_code
return {
# defer executing as it consumes output entirely before returning. this
# removes our chance to display/transform output as it is streamed to us
"return_code": get_exit_code,
"command": command,
"stdout": stdout,
"stderr": stderr,
}
def _print_line(output_pipe, line, **kwargs):
"""writes the given `line` (string) to the given `output_pipe` (file-like object)
if `quiet` is True, `line` is *not* written to `output_pipe`.
if `discard_output` is True, `line` is *not* returned and output does *not* accumulate in memory."""
base_kwargs = {
"discard_output": False,
"quiet": False,
"line_template": "[{host}] {pipe}: {line}\n", # "1.2.3.4 err: Foo not found\n"
"display_prefix": True, # strips everything in `line_template` before "{line}"
"custom_pipe": None,
}
global_kwargs, user_kwargs, final_kwargs = handle(base_kwargs, kwargs)
if not final_kwargs["quiet"]:
# useful values that can be part of the template
pipe_type = "err" if output_pipe == sys.stderr else "out"
if final_kwargs["custom_pipe"]:
pipe_type = final_kwargs["custom_pipe"] # like "run"
dt = datetime.now()
template_kwargs = {
"line": line,
"year": dt.year,
"month": dt.month,
"day": dt.day,
"hour": dt.hour,
"minute": dt.minute,
"second": dt.second,
"ms": dt.microsecond,
"host": state.ENV.get("host_string", ""),
"pipe": pipe_type,
}
# render template and write to given pipe
template = final_kwargs["line_template"]
if not final_kwargs["display_prefix"]:
try:
template = template[template.index("{line}") :]
except ValueError: # "substring not found"
msg = "'display_prefix' option ignored: '{line}' not found in 'line_template' setting"
LOG.warning(msg)
output_pipe.write(template.format(**template_kwargs))
if not final_kwargs["discard_output"]:
return line # free of any formatting
def _process_output(output_pipe, result_buffer, **kwargs):
"calls `_print_line` on each result in `result_list`."
# always process the results as soon as we have them
# use `quiet=True` to hide the printing of output to stdout/stderr
# use `discard_output=True` to discard the results as soon as they are read.
# `stderr` results may be empty if `combine_stderr` in call to `remote` was `True`
new_results = [_print_line(output_pipe, line, **kwargs) for line in result_buffer]
output_pipe.flush()
if "discard_output" in kwargs and not kwargs["discard_output"]:
return new_results
def _print_running(command, output_pipe, **kwargs):
"""Prints the command to be run on a line of output prior to executing a command.
Obeys the formatting and rules of the context in which the command is being exected.
Deprecated. This is to mimic Fabric's command output until we're sure nothing depends on it.
It will be replaced with a standard LOG.info output eventually."""
keepers = ["display_running", "quiet", "discard_output", "line_template"]
kwargs = subdict(kwargs, keepers)
if kwargs["display_running"]:
if not isinstance(command, list):
command = [command]
command = " ".join(command)
return _print_line(output_pipe, command, custom_pipe="run", **kwargs)
def abort(result, err_msg, **kwargs):
"""raises a `RuntimeError` with the given `err_msg` and the given `result` attached to it's `.result` property.
issues a warning and returns the given `result` if `settings.warn_only` is `True`.
raises a SystemExit with a return code of `1` if `settings.abort_exception` is set to None."""
base_kwargs = {
"quiet": False,
"warn_only": False,
"display_aborts": True,
"abort_exception": RuntimeError,
}
global_kwargs, user_kwargs, final_kwargs = handle(base_kwargs, kwargs)
if final_kwargs["warn_only"]:
if not final_kwargs["quiet"]:
LOG.warning(err_msg)
return result
if final_kwargs["display_aborts"]:
if not final_kwargs["quiet"]:
LOG.error("Fatal error: %s" % err_msg)
abort_exc_klass = final_kwargs["abort_exception"]
if abort_exc_klass:
exc = abort_exc_klass(err_msg)
setattr(exc, "result", result)
raise exc
# https://docs.python.org/3/library/exceptions.html#SystemExit
# # https://github.com/mathiasertl/fabric/blob/master/fabric/utils.py#L30-L63
exc = SystemExit(1)
exc.message = err_msg
raise exc
# https://github.com/mathiasertl/fabric/blob/master/fabric/state.py#L338
# https://github.com/mathiasertl/fabric/blob/master/fabric/operations.py#L898-L901
# https://github.com/mathiasertl/fabric/blob/master/fabric/operations.py#L975
def remote(command, **kwargs):
"preprocesses given `command` and options before sending it to `_execute` to be executed on remote host"
# Fabric function signature for `run`
# shell=True # done
# pty=True # mutually exclusive with `combine_stderr` in pssh. not sure how Fabric/Paramiko is doing it
# combine_stderr=None # mutually exclusive with use_pty. 'True' in global env.
# quiet=False, # done
# warn_only=False # done
# stdout=None # done, stdout/stderr always available unless explicitly discarded. 'see discard_output'
# stderr=None # done, stderr not available when combine_stderr is `True`
# timeout=None # done
# shell_escape=None # ignored. shell commands are always escaped
# capture_buffer_size=None # correlates to `ssh2.channel.read` and the `size` parameter. Ignored.
# parameters we're interested in and their default values
base_kwargs = _ssh_default_settings()
base_kwargs.update({"display_running": True, "discard_output": False})
global_kwargs, user_kwargs, final_kwargs = handle(base_kwargs, kwargs)
# wrap the command up
# https://github.com/mathiasertl/fabric/blob/master/fabric/operations.py#L920-L925
if final_kwargs["remote_working_dir"]:
command = cwd_wrap_command(command, final_kwargs["remote_working_dir"])
if final_kwargs["use_shell"]:
command = shell_wrap_command(command)
if final_kwargs["use_sudo"]:
command = sudo_wrap_command(command)
# if use_pty is True, stdout and stderr are combined and stderr will yield nothing.
# - https://parallel-ssh.readthedocs.io/en/latest/advanced.html#combined-stdout-stderr
use_pty = final_kwargs["combine_stderr"]
# values `remote` specifically passes to `_execute`
execute_kwargs = {"command": command, "use_pty": use_pty}
execute_kwargs = merge(final_kwargs, execute_kwargs)
execute_kwargs = subdict(
execute_kwargs,
[
"command",
"user",
"key_filename",
"host_string",
"port",
"use_pty",
"timeout",
],
)
# TODO: validate `_execute`s args. `host_string` can't be None for example
# run command
_print_running(command, sys.stdout, **final_kwargs)
result = _execute(**execute_kwargs)
# handle stdout/stderr streams
output_kwargs = subdict(final_kwargs, ["quiet", "discard_output"])
stdout = _process_output(sys.stdout, result["stdout"], **output_kwargs)
stderr = _process_output(sys.stderr, result["stderr"], **output_kwargs)
# command must have finished before we have access to return code
return_code = result["return_code"]()
result.update(
{
"stdout": stdout,
"stderr": stderr,
"return_code": return_code,
"failed": return_code > 0,
"succeeded": return_code == 0,
}
)
if result["succeeded"]:
return result
err_msg = "remote() encountered an error (return code %s) while executing %r" % (
result["return_code"],
command,
)
# if `warn_only` is True this function may still return a result
return abort(result, err_msg, **final_kwargs)
# https://github.com/mathiasertl/fabric/blob/master/fabric/operations.py#L1100
def remote_sudo(command, **kwargs):
"exactly the same as `remote`, but the given command is run as the root user"
# user=None # ignore
# group=None # ignore
kwargs["use_sudo"] = True
return remote(command, **kwargs)
# https://github.com/mathiasertl/fabric/blob/master/fabric/contrib/files.py#L15
def remote_file_exists(path, **kwargs):
"returns True if given path exists on remote system"
# note: Fabric is doing something weird and clever here:
# - https://github.com/mathiasertl/fabric/blob/master/fabric/contrib/files.py#L474-L485
# but their examples don't work:
# $ /bin/sh
# sh-5.0$ foo="$(echo /usr/\*/share)"
# sh-5.0$ echo $foo
# /usr/*/share
# sh-5.0$ exit
# $ echo $SHELL
# $ /bin/bash
# $ foo="$(echo /usr/\*/share)"
# $ echo $foo
# /usr/*/share
# TODO: revisit
# update 2020/01: it does work, I just had no "/usr/[anything]/share" directories.
# this works for me:
# foo=$(echo /\*/share/)
# echo $foo
# /usr/share/
base_kwargs = {
"use_sudo": False,
}
global_kwargs, user_kwargs, final_kwargs = handle(base_kwargs, kwargs)
# do not raise an exception if remote file doesn't exist
final_kwargs["warn_only"] = True
remote_fn = remote_sudo if final_kwargs["use_sudo"] else remote
command = "test -e %s" % path
return remote_fn(command, **final_kwargs)["return_code"] == 0
# https://github.com/mathiasertl/fabric/blob/master/fabric/operations.py#L1157
def local(command, **kwargs):
"preprocesses given `command` and options before executing it locally using Python's `subprocess.Popen`"
base_kwargs = {
"use_sudo": False,
"use_shell": True,
"combine_stderr": True,
"capture": False,
"timeout": None,
"quiet": False,
"display_running": True,
"warn_only": False, # https://github.com/mathiasertl/fabric/blob/master/fabric/state.py#L301-L305
"abort_exception": RuntimeError,
}
global_kwargs, user_kwargs, final_kwargs = handle(base_kwargs, kwargs)
if final_kwargs["capture"]:
if final_kwargs["combine_stderr"]:
out_stream = subprocess.PIPE
err_stream = subprocess.STDOUT
else:
out_stream = subprocess.PIPE
err_stream = subprocess.PIPE
else:
if final_kwargs["quiet"]:
# we're not capturing and we've been told to be quiet
# send everything to /dev/null
out_stream = subprocess.DEVNULL
err_stream = subprocess.DEVNULL
else:
out_stream = None
err_stream = None
if not final_kwargs["use_shell"] and not isinstance(command, list):
raise ValueError("when shell=False, given command *must* be a list")
if final_kwargs["use_shell"]:
command = shell_wrap_command(command)
if final_kwargs["use_sudo"]:
if final_kwargs["use_shell"]:
command = sudo_wrap_command(command)
else:
# lsh@2020-04: is this a good enough sudo command?
# nothing uses local+noshell+sudo (at time of writing)
command = ["sudo", "--non-interactive"] + command
proc = subprocess.Popen(
command, shell=final_kwargs["use_shell"], stdout=out_stream, stderr=err_stream
)
_print_running(command, sys.stdout, **final_kwargs)
if final_kwargs["timeout"]:
timer = Timer(final_kwargs["timeout"], proc.kill)
try:
timer.start() # proximity matters
stdout, stderr = proc.communicate()
finally:
timer.cancel()
else:
stdout, stderr = proc.communicate()
# https://github.com/mathiasertl/fabric/blob/master/fabric/operations.py#L1240-L1244
result = {
"return_code": proc.returncode,
"failed": proc.returncode != 0,
"succeeded": proc.returncode == 0,
"command": command,
"stdout": (stdout or b"").decode("utf-8").splitlines(),
"stderr": (stderr or b"").decode("utf-8").splitlines(),
}
if result["succeeded"]:
return result
err_msg = "local() encountered an error (return code %s) while executing %r" % (
result["return_code"],
command,
)
# if `warn_only` is True this function may still return a result
return abort(result, err_msg, **final_kwargs)
def single_command(cmd_list):
"given a list of commands to run, returns a single command."
# `remote` and `local` will do any escaping as necessary
if cmd_list in [None, []]:
return None
return " && ".join(map(str, cmd_list))
def prompt(msg):
"""issues a prompt for input.
raises a `PromptedException` if `abort_on_prompts` in `state.ENV` is `True` or executing within
another process using `execute.parallel` where input can't be supplied.
if `abort_exception` is set in `state.ENV`, then that exception is raised instead"""
if state.ENV.get("abort_on_prompts", False):
abort_ex = state.ENV.get("abort_exception", PromptedException)
raise abort_ex("prompted with: %s" % (msg,))
print(msg)
try:
return raw_input("> ")
except NameError:
return input("> ")
#
# uploads and downloads
#
def execute_rsync_command(cmd):
"""executes given rsync `cmd`, catching rsync errors and improving any errors raised.
rsync commands can be generated with `_rsync_upload` and `_rsync_download` functions.
"""
try:
return local(cmd)
except Exception as uncaught_exc:
if hasattr(uncaught_exc, "result"):
# this is a threadbare error and we may be able to improve it
result = uncaught_exc.result
# taken straight from the `man` page, authored "28 Jan 2018"
error_map = {
1: "Syntax or usage error",
2: "Protocol incompatibility",
3: "Errors selecting input/output files, dirs",
4: "Requested action not supported: an attempt was made to manipulate 64-bit files on a platform that cannot support them; or an option was specified that is supported by the client and not by the server.",
5: "Error starting client-server protocol",
6: "Daemon unable to append to log-file",
10: "Error in socket I/O",
11: "Error in file I/O",
12: "Error in rsync protocol data stream",
13: "Errors with program diagnostics",
14: "Error in IPC code",
20: "Received SIGUSR1 or SIGINT",
21: "Some error returned by waitpid()",
22: "Error allocating core memory buffers",
23: "Partial transfer due to error",
24: "Partial transfer due to vanished source files",
25: "The --max-delete limit stopped deletions",
30: "Timeout in data send/receive",
35: "Timeout waiting for daemon connection",
}
if result["return_code"] in error_map:
raise NetworkError(
"rsync returned error %s: %s"
% (result["return_code"], error_map[result["return_code"]])
)
raise uncaught_exc
def _rsync_upload(local_path, remote_path, **kwargs):
"""generates an rsync command to copy `local_path` to `remote_path` using values in the current `state.ENV`.
does *not* execute command. see `rsync_upload` and `execute_rsync_command`."""
base_kwargs = subdict(
_ssh_default_settings(), ["user", "host_string", "key_filename", "port"]
)
global_kwargs, user_kwargs, final_kwargs = handle(base_kwargs, kwargs)
cmd = [
"rsync",
# '-i' is 'identity file'
# note: without 'StrictHostKeyChecking' we'll be given a prompt during testing. is this solvable?
"--rsh='ssh -i %s -p %s -o StrictHostKeyChecking=no'"
% (final_kwargs["key_filename"], final_kwargs["port"]),
local_path,
"%s@%s:%s" % (final_kwargs["user"], final_kwargs["host_string"], remote_path),
]
return " ".join(cmd)
def rsync_upload(local_path, remote_path, **kwargs):
"copies `local_path` to `remote_path` using values in the current `state.ENV`."
remote_dir = os.path.dirname(remote_path)
if not remote_file_exists(remote_dir):
remote("mkdir -p %r" % remote_dir)
return execute_rsync_command(_rsync_upload(local_path, remote_path, **kwargs))
def _rsync_download(remote_path, local_path, **kwargs):
"""generates an rsync command to copy `remote_path` to `local_path` using values in the current `state.ENV`.
does *not* execute command. see `rsync_download` and `execute_rsync_command`."""
base_kwargs = subdict(
_ssh_default_settings(), ["user", "host_string", "key_filename", "port"]
)
global_kwargs, user_kwargs, final_kwargs = handle(base_kwargs, kwargs)
cmd = [
"rsync",
# '-i' is 'identity file'
# without 'StrictHostKeyChecking' we'll be given a prompt during testing.
"--rsh='ssh -i %s -p %s -o StrictHostKeyChecking=no'"
% (final_kwargs["key_filename"], final_kwargs["port"]),
"%s@%s:%s" % (final_kwargs["user"], final_kwargs["host_string"], remote_path),
local_path,
]
return " ".join(cmd)
def rsync_download(remote_path, local_path, **kwargs):
"copies `remote_path` to `local_path` using values in the current `state.ENV`."
abs_local_path = os.path.abspath(os.path.expanduser(local_path))
abs_local_dir = os.path.dirname(abs_local_path)
if not os.path.exists(abs_local_dir):
# replicates behaviour of downloading via scp and sftp (via parallel-ssh)
local("mkdir -p %r" % (abs_local_dir,))
return execute_rsync_command(_rsync_download(remote_path, local_path, **kwargs))
def _transfer_fn(client, direction, **kwargs):
"""returns the `client` object's appropriate transfer *method* given a `direction`.
`direction` is either 'upload' or 'download'.
Also accepts the `transfer_protocol` keyword parameter that is either 'rsync' (default), 'scp' or 'sftp'."""
base_kwargs = {
"overwrite": True,
# sftp is *exceptionally* slow.
# Paramiko's implementation is faster than native SFTP but slower than SCP:
# - https://github.com/ParallelSSH/parallel-ssh/issues/177
# however, SCP is buggy and may randomly hang or complete without uploading anything.
# take slow and reliable over fast and buggy.
"transfer_protocol": "rsync", # "sftp", # "scp"
}
global_kwargs, user_kwargs, final_kwargs = handle(base_kwargs, kwargs)
def upload_fn(fn):
@wraps(fn)
def wrapper(local_file, remote_file):
if remote_file_exists(remote_file) and not final_kwargs["overwrite"]:
raise NetworkError(
"Remote file exists and 'overwrite' is set to 'False'. Refusing to write: %s"
% (remote_file,)
)
if final_kwargs["transfer_protocol"] == "rsync":
fn(local_file, remote_file)
else:
# https://github.com/ParallelSSH/parallel-ssh/blob/8b7bb4bcb94d913c3b7da77db592f84486c53b90/pssh/clients/native/parallel.py#L524
g = fn(local_file, remote_file)
if g:
gevent.joinall(g, raise_error=True)
# lsh@2020-04, local testing didn't reveal anything but small files uploaded via SCP SCP during CI
# were either missing or had empty bodies. SFTP seemed to be fine.
# This sanity check seems to fix the issue (lending more credence to my theory it's an unflushed buffer somewhere),
# when waiting 3 seconds between upload of file and check of file was still failing.
ensure(
remote_file_exists(remote_file, **kwargs),
"failed to upload file, remote file does not exist: %s"
% (remote_file,),
)
return wrapper
def download_fn(fn):
@wraps(fn)
def wrapper(remote_file, local_file):
if not final_kwargs["overwrite"] and os.path.exists(local_file):
raise NetworkError(
"Local file exists and 'overwrite' is set to 'False'. Refusing to write: %s"
% (local_file,)
)
if final_kwargs["transfer_protocol"] == "rsync":
fn(remote_file, local_file)
else:
# https://github.com/ParallelSSH/parallel-ssh/blob/d812ff32d828009ddb94f458fe43920c22df4c0e/pssh/clients/native/single.py#L558
g = fn(remote_file, local_file)
if g:
gevent.joinall(g, raise_error=True)
return wrapper
upload_backends = {
"sftp": partial(client.copy_file, recurse=True),
"scp": partial(client.scp_send, recurse=True),
"rsync": rsync_upload,
}
download_backends = {
"sftp": client.copy_remote_file,
"scp": client.scp_recv,
"rsync": rsync_download,
}
direction_map = {"upload": upload_backends, "download": download_backends}
ensure(
direction in direction_map,
"you can 'upload' or 'download' but not %r" % (direction,),
)
backend_map = direction_map[direction]
transfer_protocol = final_kwargs["transfer_protocol"]
ensure(
transfer_protocol in backend_map,
"unhandled transfer protocol %r; supported protocols: %s"
% (transfer_protocol, ", ".join(backend_map.keys())),
)
transfer_fn = direction_map[direction][transfer_protocol]
direction_wrapper_map = {"upload": upload_fn, "download": download_fn}
wrapper_fn = direction_wrapper_map[direction]
return wrapper_fn(transfer_fn)
# https://github.com/mathiasertl/fabric/blob/master/fabric/operations.py#L419
# use_sudo hack: https://github.com/mathiasertl/fabric/blob/master/fabric/operations.py#L453-L458
def _download_as_root_hack(remote_path, local_path, **kwargs):
"""as root, creates a temporary copy of the file that can be downloaded by a
regular user and then removes the temporary file.
warning: don't try to download anything huge `with_sudo` as the file is duplicated.
warning: the privileged file will be available in /tmp until the download is complete"""
if not remote_file_exists(remote_path, use_sudo=True, **kwargs):
raise EnvironmentError("remote file does not exist: %s" % (remote_path,))
client = _ssh_client(**kwargs)
cmd = single_command(
[
# create a temporary file with the suffix '-threadbare'
'tempfile=$(mktemp --suffix "-threadbare")',
# copy the target file to this temporary file
'cp "%s" "$tempfile"' % remote_path,
# ensure it's readable by the user doing the downloading
'chmod +r "$tempfile"',
# emit the name of the temporary file so we can find it to download it
'echo "$tempfile"',
]
)
result = remote_sudo(cmd, **kwargs)
remote_tempfile = result["stdout"][-1]
remote_path = remote_tempfile
transfer_fn = _transfer_fn(client, "download", **kwargs)
try:
transfer_fn(remote_tempfile, local_path)
return local_path
except (pssh.exceptions.SFTPError, pssh.exceptions.SCPError) as exc:
# permissions or network issues may cause these
raise WrappedNetworkError(exc)
finally:
remote_sudo('rm -f "%s"' % remote_tempfile, **kwargs)
# https://github.com/mathiasertl/fabric/blob/master/fabric/operations.py#L419
# use_sudo hack: https://github.com/mathiasertl/fabric/blob/master/fabric/operations.py#L453-L458
def download(remote_path, local_path, use_sudo=False, **kwargs):
"""downloads file at `remote_path` to `local_path`, overwriting the local path if it exists.
avoid `use_sudo` if at all possible"""
with state.settings(quiet=True):
if remote_path.endswith("/"):
raise ValueError("directory downloads are not supported")
# do not raise an exception if remote path is a directory
result = remote(
'test -d "%s"' % remote_path, use_sudo=use_sudo, warn_only=True, quiet=True
)
remote_path_is_dir = result["succeeded"]
if remote_path_is_dir:
raise ValueError("directory downloads are not supported")
temp_file, data_buffer = None, None
if hasattr(local_path, "read"):
# given a file-like object to download file into.
# 1. write the remote file to local temporary file
# 2. read temporary file into the given buffer
# 3. delete the temporary file
data_buffer = local_path
temp_file, local_path = tempfile.mkstemp(suffix="-threadbare")
if not os.path.isabs(local_path):
local_path = os.path.abspath(local_path)
if os.path.isdir(local_path):
local_path = os.path.join(local_path, os.path.basename(remote_path))
if use_sudo:
local_path = _download_as_root_hack(remote_path, local_path, **kwargs)
else:
if not remote_file_exists(remote_path, **kwargs):
raise EnvironmentError(
"remote file does not exist: %s" % (remote_path,)
)
client = _ssh_client(**kwargs)
transfer_fn = _transfer_fn(client, "download", **kwargs)
try:
transfer_fn(remote_path, local_path)
except (pssh.exceptions.SFTPError, pssh.exceptions.SCPError) as exc:
# permissions or network issues may cause these
raise WrappedNetworkError(exc)
if temp_file:
flags = "r" if isinstance(data_buffer, io.StringIO) else "rb"
with open(local_path, flags) as fh:
data = fh.read()
data_buffer.write(data)
# deletes the *temporary file*. `temp_file` is a file descriptor
os.unlink(local_path)
return data_buffer
return local_path
def _upload_as_root_hack(local_path, remote_path, **kwargs):
"""uploads file at `local_path` to a remote temporary file then moves the file to `remote_path` as root.
does not alter any permissions or attributes on the file"""
client = _ssh_client(**kwargs)
cmd = single_command(
[
# create a temporary file with the suffix '-threadbare'
'tempfile=$(mktemp --suffix "-threadbare")',
'echo "$tempfile"',
]
)
result = remote(cmd, **kwargs)
remote_temp_path = result["stdout"][-1]
ensure(
remote_file_exists(remote_temp_path, **kwargs),
"remote temporary file %r (%s) does not exist"
% (remote_temp_path, remote_path),
)
transfer_fn = _transfer_fn(client, "upload", **kwargs)
try:
transfer_fn(local_path, remote_temp_path)
move_file_into_place = 'mv "%s" "%s"' % (remote_temp_path, remote_path)
remote_sudo(move_file_into_place, **kwargs)
ensure(
remote_file_exists(remote_path, use_sudo=True, **kwargs),
"remote path does not exist: %s" % (remote_path),
)
except (pssh.exceptions.SFTPError, pssh.exceptions.SCPError) as exc:
# permissions or network issues may cause these
raise WrappedNetworkError(exc)
def _write_bytes_to_temporary_file(local_path):
"""if `local_path` is a file-like object, write the contents to an *actual* file and
return a pair of new local filename and a function that removes the temporary file when called."""
if hasattr(local_path, "read"):
# `local_path` is a file-like object
local_bytes = local_path
local_bytes.seek(0) # reset internal pointer
temp_file, local_path = tempfile.mkstemp(suffix="-threadbare")
with os.fdopen(temp_file, "wb") as fh:
data = local_bytes.getvalue()
# data may be a string or it may be bytes.
# if it's a string we assume it's a UTF-8 string.
if isinstance(data, str):
data = bytes(data, "utf-8")
fh.write(data)
cleanup = lambda: os.unlink(local_path)
return local_path, cleanup
return local_path, None
def upload(local_path, remote_path, use_sudo=False, **kwargs):
"uploads file at `local_path` to the given `remote_path`, overwriting anything that may be at that path"
# todo: this setting is dubious, don't count on it hanging around
with state.settings(quiet=True):
# bytes handling
local_path, cleanup_fn = _write_bytes_to_temporary_file(local_path)
if cleanup_fn:
state.add_cleanup(cleanup_fn)
if os.path.isdir(local_path):
raise ValueError("folders cannot be uploaded")
if use_sudo:
return _upload_as_root_hack(local_path, remote_path, **kwargs)
if not os.path.exists(local_path):
raise EnvironmentError("local file does not exist: %s" % (local_path,))
client = _ssh_client(**kwargs)
try:
transfer_fn = _transfer_fn(client, "upload", **kwargs)
transfer_fn(local_path, remote_path)
except (pssh.exceptions.SFTPError, pssh.exceptions.SCPError) as exc:
# permissions or network issues may cause these
raise WrappedNetworkError(exc)
Functions
def abort(result, err_msg, **kwargs)
-
raises a
RuntimeError
with the givenerr_msg
and the givenresult
attached to it's.result
property. issues a warning and returns the givenresult
ifsettings.warn_only
isTrue
. raises a SystemExit with a return code of1
ifsettings.abort_exception
is set to None.Expand source code
def abort(result, err_msg, **kwargs): """raises a `RuntimeError` with the given `err_msg` and the given `result` attached to it's `.result` property. issues a warning and returns the given `result` if `settings.warn_only` is `True`. raises a SystemExit with a return code of `1` if `settings.abort_exception` is set to None.""" base_kwargs = { "quiet": False, "warn_only": False, "display_aborts": True, "abort_exception": RuntimeError, } global_kwargs, user_kwargs, final_kwargs = handle(base_kwargs, kwargs) if final_kwargs["warn_only"]: if not final_kwargs["quiet"]: LOG.warning(err_msg) return result if final_kwargs["display_aborts"]: if not final_kwargs["quiet"]: LOG.error("Fatal error: %s" % err_msg) abort_exc_klass = final_kwargs["abort_exception"] if abort_exc_klass: exc = abort_exc_klass(err_msg) setattr(exc, "result", result) raise exc # https://docs.python.org/3/library/exceptions.html#SystemExit # # https://github.com/mathiasertl/fabric/blob/master/fabric/utils.py#L30-L63 exc = SystemExit(1) exc.message = err_msg raise exc
def download(remote_path, local_path, use_sudo=False, **kwargs)
-
downloads file at
remote_path
tolocal_path
, overwriting the local path if it exists. avoiduse_sudo
if at all possibleExpand source code
def download(remote_path, local_path, use_sudo=False, **kwargs): """downloads file at `remote_path` to `local_path`, overwriting the local path if it exists. avoid `use_sudo` if at all possible""" with state.settings(quiet=True): if remote_path.endswith("/"): raise ValueError("directory downloads are not supported") # do not raise an exception if remote path is a directory result = remote( 'test -d "%s"' % remote_path, use_sudo=use_sudo, warn_only=True, quiet=True ) remote_path_is_dir = result["succeeded"] if remote_path_is_dir: raise ValueError("directory downloads are not supported") temp_file, data_buffer = None, None if hasattr(local_path, "read"): # given a file-like object to download file into. # 1. write the remote file to local temporary file # 2. read temporary file into the given buffer # 3. delete the temporary file data_buffer = local_path temp_file, local_path = tempfile.mkstemp(suffix="-threadbare") if not os.path.isabs(local_path): local_path = os.path.abspath(local_path) if os.path.isdir(local_path): local_path = os.path.join(local_path, os.path.basename(remote_path)) if use_sudo: local_path = _download_as_root_hack(remote_path, local_path, **kwargs) else: if not remote_file_exists(remote_path, **kwargs): raise EnvironmentError( "remote file does not exist: %s" % (remote_path,) ) client = _ssh_client(**kwargs) transfer_fn = _transfer_fn(client, "download", **kwargs) try: transfer_fn(remote_path, local_path) except (pssh.exceptions.SFTPError, pssh.exceptions.SCPError) as exc: # permissions or network issues may cause these raise WrappedNetworkError(exc) if temp_file: flags = "r" if isinstance(data_buffer, io.StringIO) else "rb" with open(local_path, flags) as fh: data = fh.read() data_buffer.write(data) # deletes the *temporary file*. `temp_file` is a file descriptor os.unlink(local_path) return data_buffer return local_path
def execute_rsync_command(cmd)
-
executes given rsync
cmd
, catching rsync errors and improving any errors raised. rsync commands can be generated with_rsync_upload
and_rsync_download
functions.Expand source code
def execute_rsync_command(cmd): """executes given rsync `cmd`, catching rsync errors and improving any errors raised. rsync commands can be generated with `_rsync_upload` and `_rsync_download` functions. """ try: return local(cmd) except Exception as uncaught_exc: if hasattr(uncaught_exc, "result"): # this is a threadbare error and we may be able to improve it result = uncaught_exc.result # taken straight from the `man` page, authored "28 Jan 2018" error_map = { 1: "Syntax or usage error", 2: "Protocol incompatibility", 3: "Errors selecting input/output files, dirs", 4: "Requested action not supported: an attempt was made to manipulate 64-bit files on a platform that cannot support them; or an option was specified that is supported by the client and not by the server.", 5: "Error starting client-server protocol", 6: "Daemon unable to append to log-file", 10: "Error in socket I/O", 11: "Error in file I/O", 12: "Error in rsync protocol data stream", 13: "Errors with program diagnostics", 14: "Error in IPC code", 20: "Received SIGUSR1 or SIGINT", 21: "Some error returned by waitpid()", 22: "Error allocating core memory buffers", 23: "Partial transfer due to error", 24: "Partial transfer due to vanished source files", 25: "The --max-delete limit stopped deletions", 30: "Timeout in data send/receive", 35: "Timeout waiting for daemon connection", } if result["return_code"] in error_map: raise NetworkError( "rsync returned error %s: %s" % (result["return_code"], error_map[result["return_code"]]) ) raise uncaught_exc
def handle(base_kwargs, kwargs)
-
handles the merging of the base set of function keyword arguments and their possible overrides.
base_kwargs
is a map of the function's keyword arguments and their defaults.kwargs
are the keyword arguments used when executing the function.the keys from
base_kwargs
are used to determine which keys to extract from thekwargs
and any global settings.returns a triple of (
global_kwargs
,user_kwargs
,final_kwargs
) whereglobal_kwargs
is the subset of keyword arguments extracted fromstate.env
,user_kwargs
is the subset of keyword arguments extracted from the given kwargs andfinal_kwargs
is the result of mergingbase_kwargs
<-global_kwargs
<-user_kwargs
'user' keyword arguments that are explicitly passed in take precedence over all others and 'global' keyword arguments take precedence over the function's defaults kwargs.
Expand source code
def handle(base_kwargs, kwargs): """handles the merging of the base set of function keyword arguments and their possible overrides. `base_kwargs` is a map of the function's keyword arguments and their defaults. `kwargs` are the keyword arguments used when executing the function. the keys from `base_kwargs` are used to determine which keys to extract from the `kwargs` and any global settings. returns a triple of (`global_kwargs`, `user_kwargs`, `final_kwargs`) where `global_kwargs` is the subset of keyword arguments extracted from `state.env`, `user_kwargs` is the subset of keyword arguments extracted from the given kwargs and `final_kwargs` is the result of merging `base_kwargs` <- `global_kwargs` <- `user_kwargs` 'user' keyword arguments that are explicitly passed in take precedence over all others and 'global' keyword arguments take precedence over the function's defaults kwargs.""" key_list = base_kwargs.keys() global_kwargs = subdict(state.ENV, key_list) user_kwargs = subdict(kwargs, key_list) final_kwargs = merge(base_kwargs, global_kwargs, user_kwargs) return global_kwargs, user_kwargs, final_kwargs
def hide(what=None)
-
hides all output, regardless of
what
type of output is to be hidden.Expand source code
@contextlib.contextmanager def hide(what=None): "hides *all* output, regardless of `what` type of output is to be hidden." with state.settings(quiet=True): yield
def lcd(local_dir)
-
temporarily changes the local working directory
Expand source code
@contextlib.contextmanager def lcd(local_dir): "temporarily changes the local working directory" ensure(os.path.isdir(local_dir), "not a directory: %s" % local_dir) with state.settings(): current_dir = cwd() state.add_cleanup(lambda: os.chdir(current_dir)) os.chdir(local_dir) yield
def local(command, **kwargs)
-
preprocesses given
command
and options before executing it locally using Python'ssubprocess.Popen
Expand source code
def local(command, **kwargs): "preprocesses given `command` and options before executing it locally using Python's `subprocess.Popen`" base_kwargs = { "use_sudo": False, "use_shell": True, "combine_stderr": True, "capture": False, "timeout": None, "quiet": False, "display_running": True, "warn_only": False, # https://github.com/mathiasertl/fabric/blob/master/fabric/state.py#L301-L305 "abort_exception": RuntimeError, } global_kwargs, user_kwargs, final_kwargs = handle(base_kwargs, kwargs) if final_kwargs["capture"]: if final_kwargs["combine_stderr"]: out_stream = subprocess.PIPE err_stream = subprocess.STDOUT else: out_stream = subprocess.PIPE err_stream = subprocess.PIPE else: if final_kwargs["quiet"]: # we're not capturing and we've been told to be quiet # send everything to /dev/null out_stream = subprocess.DEVNULL err_stream = subprocess.DEVNULL else: out_stream = None err_stream = None if not final_kwargs["use_shell"] and not isinstance(command, list): raise ValueError("when shell=False, given command *must* be a list") if final_kwargs["use_shell"]: command = shell_wrap_command(command) if final_kwargs["use_sudo"]: if final_kwargs["use_shell"]: command = sudo_wrap_command(command) else: # lsh@2020-04: is this a good enough sudo command? # nothing uses local+noshell+sudo (at time of writing) command = ["sudo", "--non-interactive"] + command proc = subprocess.Popen( command, shell=final_kwargs["use_shell"], stdout=out_stream, stderr=err_stream ) _print_running(command, sys.stdout, **final_kwargs) if final_kwargs["timeout"]: timer = Timer(final_kwargs["timeout"], proc.kill) try: timer.start() # proximity matters stdout, stderr = proc.communicate() finally: timer.cancel() else: stdout, stderr = proc.communicate() # https://github.com/mathiasertl/fabric/blob/master/fabric/operations.py#L1240-L1244 result = { "return_code": proc.returncode, "failed": proc.returncode != 0, "succeeded": proc.returncode == 0, "command": command, "stdout": (stdout or b"").decode("utf-8").splitlines(), "stderr": (stderr or b"").decode("utf-8").splitlines(), } if result["succeeded"]: return result err_msg = "local() encountered an error (return code %s) while executing %r" % ( result["return_code"], command, ) # if `warn_only` is True this function may still return a result return abort(result, err_msg, **final_kwargs)
def pem_key()
-
returns the first private key found in a list of common private keys. if none of the keys exist, the default (first) key will be returned.
Expand source code
def pem_key(): """returns the first private key found in a list of common private keys. if none of the keys exist, the default (first) key will be returned.""" id_list = ["id_rsa", "id_dsa", "identity", "id_ecdsa"] id_list = [os.path.expanduser("~/.ssh/" + idstr) for idstr in id_list] for id_path in id_list: if os.path.isfile(id_path): return id_path LOG.debug("key not found: %s" % id_path) default = id_list[0] return default
def prompt(msg)
-
issues a prompt for input. raises a
PromptedException
ifabort_on_prompts
instate.ENV
isTrue
or executing within another process usingexecute.parallel
where input can't be supplied. ifabort_exception
is set instate.ENV
, then that exception is raised insteadExpand source code
def prompt(msg): """issues a prompt for input. raises a `PromptedException` if `abort_on_prompts` in `state.ENV` is `True` or executing within another process using `execute.parallel` where input can't be supplied. if `abort_exception` is set in `state.ENV`, then that exception is raised instead""" if state.ENV.get("abort_on_prompts", False): abort_ex = state.ENV.get("abort_exception", PromptedException) raise abort_ex("prompted with: %s" % (msg,)) print(msg) try: return raw_input("> ") except NameError: return input("> ")
def rcd(remote_working_dir)
-
ensures all commands run are done from the given remote directory. if remote directory doesn't exist, command will not be run
Expand source code
@contextlib.contextmanager def rcd(remote_working_dir): "ensures all commands run are done from the given remote directory. if remote directory doesn't exist, command will not be run" with state.settings(remote_working_dir=remote_working_dir): yield
def remote(command, **kwargs)
-
preprocesses given
command
and options before sending it to_execute
to be executed on remote hostExpand source code
def remote(command, **kwargs): "preprocesses given `command` and options before sending it to `_execute` to be executed on remote host" # Fabric function signature for `run` # shell=True # done # pty=True # mutually exclusive with `combine_stderr` in pssh. not sure how Fabric/Paramiko is doing it # combine_stderr=None # mutually exclusive with use_pty. 'True' in global env. # quiet=False, # done # warn_only=False # done # stdout=None # done, stdout/stderr always available unless explicitly discarded. 'see discard_output' # stderr=None # done, stderr not available when combine_stderr is `True` # timeout=None # done # shell_escape=None # ignored. shell commands are always escaped # capture_buffer_size=None # correlates to `ssh2.channel.read` and the `size` parameter. Ignored. # parameters we're interested in and their default values base_kwargs = _ssh_default_settings() base_kwargs.update({"display_running": True, "discard_output": False}) global_kwargs, user_kwargs, final_kwargs = handle(base_kwargs, kwargs) # wrap the command up # https://github.com/mathiasertl/fabric/blob/master/fabric/operations.py#L920-L925 if final_kwargs["remote_working_dir"]: command = cwd_wrap_command(command, final_kwargs["remote_working_dir"]) if final_kwargs["use_shell"]: command = shell_wrap_command(command) if final_kwargs["use_sudo"]: command = sudo_wrap_command(command) # if use_pty is True, stdout and stderr are combined and stderr will yield nothing. # - https://parallel-ssh.readthedocs.io/en/latest/advanced.html#combined-stdout-stderr use_pty = final_kwargs["combine_stderr"] # values `remote` specifically passes to `_execute` execute_kwargs = {"command": command, "use_pty": use_pty} execute_kwargs = merge(final_kwargs, execute_kwargs) execute_kwargs = subdict( execute_kwargs, [ "command", "user", "key_filename", "host_string", "port", "use_pty", "timeout", ], ) # TODO: validate `_execute`s args. `host_string` can't be None for example # run command _print_running(command, sys.stdout, **final_kwargs) result = _execute(**execute_kwargs) # handle stdout/stderr streams output_kwargs = subdict(final_kwargs, ["quiet", "discard_output"]) stdout = _process_output(sys.stdout, result["stdout"], **output_kwargs) stderr = _process_output(sys.stderr, result["stderr"], **output_kwargs) # command must have finished before we have access to return code return_code = result["return_code"]() result.update( { "stdout": stdout, "stderr": stderr, "return_code": return_code, "failed": return_code > 0, "succeeded": return_code == 0, } ) if result["succeeded"]: return result err_msg = "remote() encountered an error (return code %s) while executing %r" % ( result["return_code"], command, ) # if `warn_only` is True this function may still return a result return abort(result, err_msg, **final_kwargs)
def remote_file_exists(path, **kwargs)
-
returns True if given path exists on remote system
Expand source code
def remote_file_exists(path, **kwargs): "returns True if given path exists on remote system" # note: Fabric is doing something weird and clever here: # - https://github.com/mathiasertl/fabric/blob/master/fabric/contrib/files.py#L474-L485 # but their examples don't work: # $ /bin/sh # sh-5.0$ foo="$(echo /usr/\*/share)" # sh-5.0$ echo $foo # /usr/*/share # sh-5.0$ exit # $ echo $SHELL # $ /bin/bash # $ foo="$(echo /usr/\*/share)" # $ echo $foo # /usr/*/share # TODO: revisit # update 2020/01: it does work, I just had no "/usr/[anything]/share" directories. # this works for me: # foo=$(echo /\*/share/) # echo $foo # /usr/share/ base_kwargs = { "use_sudo": False, } global_kwargs, user_kwargs, final_kwargs = handle(base_kwargs, kwargs) # do not raise an exception if remote file doesn't exist final_kwargs["warn_only"] = True remote_fn = remote_sudo if final_kwargs["use_sudo"] else remote command = "test -e %s" % path return remote_fn(command, **final_kwargs)["return_code"] == 0
def remote_sudo(command, **kwargs)
-
exactly the same as
remote()
, but the given command is run as the root userExpand source code
def remote_sudo(command, **kwargs): "exactly the same as `remote`, but the given command is run as the root user" # user=None # ignore # group=None # ignore kwargs["use_sudo"] = True return remote(command, **kwargs)
def rsync_download(remote_path, local_path, **kwargs)
-
copies
remote_path
tolocal_path
using values in the currentstate.ENV
.Expand source code
def rsync_download(remote_path, local_path, **kwargs): "copies `remote_path` to `local_path` using values in the current `state.ENV`." abs_local_path = os.path.abspath(os.path.expanduser(local_path)) abs_local_dir = os.path.dirname(abs_local_path) if not os.path.exists(abs_local_dir): # replicates behaviour of downloading via scp and sftp (via parallel-ssh) local("mkdir -p %r" % (abs_local_dir,)) return execute_rsync_command(_rsync_download(remote_path, local_path, **kwargs))
def rsync_upload(local_path, remote_path, **kwargs)
-
copies
local_path
toremote_path
using values in the currentstate.ENV
.Expand source code
def rsync_upload(local_path, remote_path, **kwargs): "copies `local_path` to `remote_path` using values in the current `state.ENV`." remote_dir = os.path.dirname(remote_path) if not remote_file_exists(remote_dir): remote("mkdir -p %r" % remote_dir) return execute_rsync_command(_rsync_upload(local_path, remote_path, **kwargs))
def single_command(cmd_list)
-
given a list of commands to run, returns a single command.
Expand source code
def single_command(cmd_list): "given a list of commands to run, returns a single command." # `remote` and `local` will do any escaping as necessary if cmd_list in [None, []]: return None return " && ".join(map(str, cmd_list))
def upload(local_path, remote_path, use_sudo=False, **kwargs)
-
uploads file at
local_path
to the givenremote_path
, overwriting anything that may be at that pathExpand source code
def upload(local_path, remote_path, use_sudo=False, **kwargs): "uploads file at `local_path` to the given `remote_path`, overwriting anything that may be at that path" # todo: this setting is dubious, don't count on it hanging around with state.settings(quiet=True): # bytes handling local_path, cleanup_fn = _write_bytes_to_temporary_file(local_path) if cleanup_fn: state.add_cleanup(cleanup_fn) if os.path.isdir(local_path): raise ValueError("folders cannot be uploaded") if use_sudo: return _upload_as_root_hack(local_path, remote_path, **kwargs) if not os.path.exists(local_path): raise EnvironmentError("local file does not exist: %s" % (local_path,)) client = _ssh_client(**kwargs) try: transfer_fn = _transfer_fn(client, "upload", **kwargs) transfer_fn(local_path, remote_path) except (pssh.exceptions.SFTPError, pssh.exceptions.SCPError) as exc: # permissions or network issues may cause these raise WrappedNetworkError(exc)
Classes
class NetworkError (*args, **kwargs)
-
generic 'died while doing something network-related' catch-all exception class.
Expand source code
class NetworkError(Exception): "generic 'died while doing something network-related' catch-all exception class." pass
Ancestors
- builtins.Exception
- builtins.BaseException
Subclasses
class SSHClient (host, user=None, password=None, port=None, pkey=None, alias=None, num_retries=3, retry_delay=5, allow_agent=True, timeout=None, forward_ssh_agent=False, proxy_host=None, proxy_port=None, proxy_pkey=None, proxy_user=None, proxy_password=None, keepalive_seconds=60, identity_auth=True, ipv6_only=False)
-
ssh2-python (libssh2) based non-blocking SSH client.
:param host: Host name or IP to connect to. :type host: str :param user: User to connect as. Defaults to logged in user. :type user: str :param password: Password to use for password authentication. :type password: str :param alias: Use an alias for this host. :type alias: str :param port: SSH port to connect to. Defaults to SSH default (22) :type port: int :param pkey: Private key file path to use for authentication. Path must be either absolute path or relative to user home directory like
~/<path>
. Bytes type input is used as private key data for authentication. :type pkey: str or bytes :param num_retries: (Optional) Number of connection and authentication attempts before the client gives up. Defaults to 3. :type num_retries: int :param retry_delay: Number of seconds to wait between retries. Defaults to :py:class:pssh.constants.RETRY_DELAY
:type retry_delay: int or float :param timeout: SSH session timeout setting in seconds. This controls timeout setting of authenticated SSH sessions. :type timeout: int or float :param allow_agent: (Optional) set to False to disable connecting to the system's SSH agent :type allow_agent: bool :param identity_auth: (Optional) set to False to disable attempting to authenticate with default identity files frompssh.clients.base.single.BaseSSHClient.IDENTITIES
:type identity_auth: bool :param forward_ssh_agent: Unused - agent forwarding not implemented. :type forward_ssh_agent: bool :param proxy_host: Connect to target host via given proxy host. :type proxy_host: str :param proxy_port: Port to use for proxy connection. Defaults to self.port :type proxy_port: int :param keepalive_seconds: Interval of keep alive messages being sent to server. Set to0
orFalse
to disable. :type keepalive_seconds: int :param ipv6_only: Choose IPv6 addresses only if multiple are available for the host or raise NoIPv6AddressFoundError otherwise. Note this will disable connecting to an IPv4 address if an IP address is provided instead. :type ipv6_only: bool:raises: :py:class:
pssh.exceptions.PKeyFileError
on errors finding provided private key.Expand source code
class SSHClient(PSSHClient): def __deepcopy__(self, memo): # do not copy.deepcopy ourselves or the pssh SSHClient object, just # return a reference to the object (self) # - https://docs.python.org/3/library/copy.html return self
Ancestors
- pssh.clients.native.single.SSHClient
- pssh.clients.base.single.BaseSSHClient
class WrappedNetworkError (exc)
-
groups several exceptions into a single WrappedNetworkError
Expand source code
class WrappedNetworkError(NetworkError): "groups several exceptions into a single WrappedNetworkError" def __init__(self, exc): self.wrapped = exc
Ancestors
- NetworkError
- builtins.Exception
- builtins.BaseException