from __future__ import print_function # Python2 compat
from functools import (wraps, update_wrapper, WRAPPER_UPDATES,
WRAPPER_ASSIGNMENTS)
from collections.abc import Mapping, Iterable
import inspect
import sys
import logging
logger = logging.getLogger(__name__)
[docs]class Try(object):
''' Try catch helper for cases when you want to ignore certain exceptions
Parameters
----------
*ignore_exceptions : Exception
Exception classes to be ignored. Default is all.
'''
def __init__(self, default_ignore=Exception, *other_ignore):
self.ignore = (default_ignore,) + other_ignore
def __enter__(self):
pass
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
if exc_type is None:
return
if issubclass(exc_type, self.ignore):
return True
[docs]class ArgvContext:
''' Context to temporarily change the ``sys.argv`` variable
Parameters
----------
*args : str
Arguments to replace ``sys.argv``, starting with ``argv[0]``
'''
def __init__(self, *args):
self.args = args
def __enter__(self):
self.old_args = sys.argv
sys.argv = list(self.args)
def __exit__(self, *args):
sys.argv = self.old_args
[docs]def reloadModules(pattern='.*', skipPattern='^IPython'):
''' Reload modules that match pattern regular expression (string or re)
Parameters
----------
pattern : str or re.Pattern
The regular expression pattern of modules that will be reloaded.
skipPattern : str or re.Pattern
The regular expression pattern of modules that will not be reloaded.
'''
from types import ModuleType
import os
import re
pattern = re.compile(pattern)
skipPattern = re.compile(skipPattern)
modules = sys.modules.keys()
#In case something is loaded in the background, it will create a
#"dictionary changed size during iteration" error
for m in modules:
if isinstance(sys.modules[m], ModuleType) and \
m not in sys.builtin_module_names and \
'(built-in)' not in str(sys.modules[m]) and \
pattern.search(m) and \
not skipPattern.search(m):
with Try():
reload(sys.modules[m])
[docs]def is_string_like(obj):
""" Check whether obj behaves like a string.
Copied from numpy
Parameters
----------
obj : object
Object being tested for string like behavior
Returns
-------
bool
True if object behaves like a string. False otherwise.
"""
try:
obj + ''
except (TypeError, ValueError):
return False
return True
[docs]def get_file(fid, mode='rb'):
''' Helper function to take either a filename or fid
Parameters
----------
fid : str
File object or filename
mode : :class:`str`, optional
Optional, file mode to open file if filename supplied
Default is 'rb'
Returns
-------
:term:`file-like object`
The opened file object
'''
if is_string_like(fid):
fid = open(fid, mode)
return fid
[docs]def static(**kwargs):
''' Decorator for easily defining static variables
Parameters
----------
**kwargs
Arbitrary keyword arguments.
Example::
@static(count=0)
def test(a, b):
test.count += 1
print(a+b, test.count)
'''
# Note: don't need functools.wraps, since I'm returning the same func
def decorate(func):
for k in kwargs:
setattr(func, k, kwargs[k])
return func
return decorate
[docs]def update_wrapper_class(wrapper, wrapped):
'''functools.update_wrapper for classes
Version of ``functools.update_wrapper`` that works when the wrapper is a
class
Parameters
----------
wrapper : :term:`class`
The class to be updated
wrapped : :term:`class`
The original function/class
Returns
-------
:term:`class`
A subclass of ``wrapper`` that has the updated attributes. If ``wrapped``
was a function, ``wrapper`` is still a class.
'''
__dict__ = dict(getattr(wrapper, '__dict__'))
# Init is a special case, that if set before __new__ is done, then the
# wrapped's init is called insted of OptionalArgumentDecorator
__dict__.update({k:v for (k,v) in \
getattr(wrapped, '__dict__', {}).items() if k != '__init__'})
__dict__['__wrapped__'] = wrapped
Wrapper = type("Wrapper", (wrapper,), __dict__)
if sys.version_info.major == 2:
update_wrapper(Wrapper, wrapped,
assigned = (x for x in WRAPPER_ASSIGNMENTS if x != '__doc__'),
updated = (x for x in WRAPPER_UPDATES if x != '__dict__'))
else:
update_wrapper(Wrapper, wrapped,
updated = (x for x in WRAPPER_UPDATES if x != '__dict__'))
return Wrapper
[docs]class OptionalArgumentDecorator(object):
''' Decorator for easily defining a decorator class that may take arguments
Write a decorator class as normal, that would always take arguments, and
make sure they all have default values. Then decorate that decorator with
this decorator and both ``@decorator`` and ``@decorator()`` notations will
work.
See Also
--------
BasicDecorator : Simple decorator with optional arguments
'''
def __new__(cls, *args, **kwargs):
WrappedClass = _meta_generate_class(cls, *args, **kwargs)
return super(OptionalArgumentDecorator, cls).__new__(
update_wrapper_class(cls, WrappedClass))
def __call__(self, *args, **kwargs):
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
return self.__wrapped__()(args[0])
else:
return self.__wrapped__(*args, **kwargs)
[docs]class _BasicDecorator(object):
''' A basic decorator class that does not take arguments
Parameters
----------
fun : :term:`function`
The function that gets wrapped
Attributes
----------
fun : :term:`function`
The function that is being wrapped
Examples
--------
Usage::
class MyDecorAdd1(_BasicDecorator):
def __call__(self, *args, **kwargs):
result = self.fun(*args, **kwargs)
res
@MyDecorAdd1
def fun(a, b=2):
return a+b
'''
fun = None
def __new__(cls, *args, **kwargs):
WrappedClass = _meta_generate_class(cls, *args, **kwargs)
# In python3, the use of "_BasicDecorator" can be "__class__" instead, but
# must not be "cls", that won't work
return super(_BasicDecorator, cls).__new__(
update_wrapper_class(cls, WrappedClass))
def __init__(self, fun):
''' No need to rewrite this in the child class
'''
self.fun = fun
update_wrapper(self, fun)
[docs] def __call__(self, *args, **kwargs):
'''Re-write this. Do need to call ``super().__call__``
The general idea of this class is you re-write the ``__call__`` method to
do what you want, and call ``self.fun`` and return the result. This can
be accomplished by ``return super().__call__(*args, **kwargs)``, but more
often then not, you will want the result of ``self.fun``, and will call
``result = self.fun(*args, **kwargs)`` yourself, and then ``return result``
'''
#pre wrap code
result = self.fun(*args, **kwargs)
#postwrap code
return result
[docs]class _BasicArgumentDecorator(object):
''' A basic decorator class that takes arguments
It's best to define __init__ with a proper signature when inheriting
'''
[docs] def __call__(self, fun):
''' No need to rewrite this
Parameters
----------
fun : :term:`function`
The Function
'''
@wraps(fun)
def wrapped(*args, **kwargs):
return self.__inner_call__(*args, **kwargs)
self.fun = fun
return wrapped
[docs] def __inner_call__(self, *args, **kwargs):
'''re-write THIS. No need for super().__inner_call__
Parameters
----------
*args
Variable length argument list.
**kwargs
Arbitrary keyword arguments.
'''
#pre wrap code
result = self.fun(*args, **kwargs)
#postwrap code
return result
# Decorated methods do not show up in sphinx unless we use functools.wraps
@OptionalArgumentDecorator
class BasicDecorator(_BasicArgumentDecorator):
''' A basic decorator class that can optionally take arguments
It's best to define __init__ with a proper signature when inheriting
Define __inner_call__(self, *args, **kwargs) to add your wrapper magic
Usage: Define a new class that inherits from OptionalArgumentDecorator.
There is logic to support inheritance as long as the classes are
decorated by OptionalArgumentDecorator ONLY. Any additional decorators
will probably break the inheritance logic. If this is needed, than
inherit from _BasicArgumentDecorator instead and Add the
@OptionalArgumentDecorator decorator yourself, and don't inherit from
that.
Example::
class MyDecor(BasicDecorator):
def __init__(self, name='Default'):
self.name = name
def __inner_call__(self, first_arg, *args, **kwargs):
result = self.fun(first_arg, *args, **kwargs)
print(self.name, first_arg, result)
return result
@MyDecor
def test1(x, y):
return x+y
@MyDecor('not default')
def test2(x, y):
return x+y
test1(11,22)
test2(10,2)
'''
class WarningDecorator(BasicDecorator):
def __init__(self, message='Warning', output_stream=sys.stderr):
self.message = message
self.output_stream = output_stream
def __inner_call__(self, *args, **kwargs):
print(self.message, file=self.output_stream)
return self.fun(*args, **kwargs)
# Fix of https://gist.github.com/MacHu-GWU/0170849f693aa5f8d129aa03fc358305
def __is__method(cls, attribute, kind, exceptions):
value = getattr(cls, attribute)
if attribute in exceptions:
return True
if inspect.isclass(cls):
for cls in inspect.getmro(cls):
if inspect.isroutine(value): # No else, no builtin statics?
if attribute in cls.__dict__:
if isinstance(cls.__dict__[attribute], kind):
return True
else:
return False
else: # class instance
if attribute in cls.__dict__:
value = cls.__dict__[attribute]
elif attribute in type(cls).__dict__:
value = type(cls).__dict__[attribute]
else:
return False
if isinstance(value, kind):
return True
else:
return False
return False
[docs]def is_static_method(cls, attribute):
''' Returns whether the attribute refers to a staticmethod or not
Parameters
----------
cls : object
The class/instance being checked
attribute : str
The name of the function to be checked
Returns
-------
bool
True if the attribute is a static function, else false if anything
else
'''
return __is__method(cls, attribute, staticmethod,
# https://docs.python.org/3.7/reference/datamodel.html#object.__new__
('__new__',))
[docs]def is_class_method(cls, attribute):
return __is__method(cls, attribute, classmethod,
# https://docs.python.org/3/library/abc.html#abc.ABCMeta.__subclasshook__
('__subclasshook__',))
# REVIEW When we no longer support python2, this function should be updated to
# use the newer inspect.signature, Signature.bind() and
# BoundArguments.apply_defaults() instead of inspect.getfullargspec
[docs]def args_to_kwargs(function, args=tuple(), kwargs={}):
'''returns a single dict of all the args and kwargs
Should handle: functions, classes (their __init__), bound and unbound
versions of methods, class methods, and static methods. Furthermore, if a
class instance has a __call__ method, this is used.
It does not call the function.
Parameters
----------
function : :term:`function`
The Function
args : tuple
kwargs : dict
Returns
-------
dict
The returned dictionary has the keywords that would be received in a
real function call. Leftover args are put into the key ARGS, and
leftover KWARGS are placed in the key KWARGS. While everything
should behave exactly as python would, certain failure situations are
not reproduced, for exampled it does not raise exception if you
declare the same parameter in both /*/args and /**/kwargs)
On python3, ``args_to_kwargs_unbound`` must be used for unbound class
methods
Based on:
https://github.com/merriam/dectools/blob/master/dectools/dectools.py
'''
return args_to_kwargs_unbound(function, None, args, kwargs)
[docs]def args_to_kwargs_unbound(function, attribute=None, args=tuple(), kwargs={}):
# Clean up the inputs
pop_first = False
args=tuple(args)
if attribute:
parent = function
function = getattr(function, attribute)
else:
parent = None
if inspect.isclass(function):
function = function.__init__
args = (None,)+args #Dummy for self
pop_first = True
# Check for bound function or class function, both treated the same
elif inspect.ismethod(function):
if hasattr(function, '__self__'):
#if it has a __self__, then it is a class/normal method (not static)
args = (None,)+args #Dummy for self/cls
pop_first = True
# Check to see if it is a class instance with __call__. Only class
# instanaces can have bound methods in python3
elif hasattr(function, '__call__') and inspect.ismethod(function.__call__):
function = function.__call__
args = (None,)+args
pop_first = True
# This is how to handle unbound functions
elif parent:
if not is_static_method(parent, attribute):
# Must be class/normal method
args = (None,)+args
pop_first = True
############
# Parse args
############
kwonly_args_names = []
kwonly_defaults = None
annotations = {}
try:
args_names, extra_args_name, extra_kwargs_name, defaults, \
kwonly_args_names, kwonly_defaults, annotations = \
inspect.getfullargspec(function)
except:
args_names, extra_args_name, extra_kwargs_name, defaults = \
inspect.getargspec(function)
# assign basic args
params = dict(zip(args_names, args)) # zip stops at shorter sequence
if extra_args_name:
params[ARGS] = args[len(args_names):]
elif len(args_names) < len(args):
logger.warning("args_to_kwargs: Too many positional arguments specified")
# assign kwargs given
if extra_kwargs_name:
params[KWARGS] = {}
for keyword, value in kwargs.items():
if keyword in args_names + kwonly_args_names:
params[keyword] = value
else:
if extra_kwargs_name:
params[KWARGS][keyword] = value
else:
logger.warning("args_to_kwargs: Unspecified keyword argument '%s' "
"used", keyword)
params[keyword] = value
# assign defaults
if defaults:
for pos, value in enumerate(defaults):
if args_names[-len(defaults) + pos] not in params:
params[args_names[-len(defaults) + pos]] = value
# assign keyword only defaults
if kwonly_defaults:
for key, value in kwonly_defaults.items():
if key not in params:
params[key] = value
# check we did it correctly. Each param and only params are set
if set(params.keys()) != (set(args_names)|set(kwonly_args_names)|
set([KWARGS if extra_kwargs_name else None,
ARGS if extra_args_name else None]) -
set([None])):
#Remove None, since if *args/**kwargs isn't used, it will have the value None
#And that is not used
logger.warning("args_to_kwargs: Number of named arguments used does not "
"equal arguments parsed. This can mean you are "
"missing required arguments")
if pop_first:
params.pop(args_names[0])
return params
# def args_to_kwargs_easy(function, *args, **kwargs):
[docs]def args_to_kwargs_easy(*args, **kwargs):
'''
Parameters
----------
*args : tuple
Variable length argument list.
**kwargs : dict
Arbitrary keyword arguments.
Returns
-------
dict
'''
return args_to_kwargs(args[0], args[1:], kwargs)
# def args_to_kwargs_unbound_easy(function, attribute, *args, **kwargs):
[docs]def args_to_kwargs_unbound_easy(*args, **kwargs):
'''
Parameters
----------
*args : tuple
Variable length argument list.
**kwargs : dict
Arbitrary keyword arguments.
Returns
-------
dict
'''
return args_to_kwargs_unbound(args[0], args[1], args[2:], kwargs)
[docs]def command_list_to_string(cmd):
'''
Parameters
----------
cmd : list
The Command List
Returns
-------
str
The Command List as a string
'''
try:
from shlex import quote
except:
from pipes import quote
return ' '.join([quote(x) for x in cmd])
[docs]def nested_update(dict_, *args, **kwargs):
''' Updated a dictionary in a nested fashion
Parameters
----------
dict_ : dict
The dict to be updated
*args : tuple
Same arguments as dict.update
**kwargs : dict
Same arguments as dict.update
'''
# patch iterables
def patch_it(v):
# Handle Mappings
return type(v)(type(dict_)(item)
if isinstance(item, Mapping)
and not isinstance(item, type(dict_))
# Handle Iterables
else patch_it(item)
if isinstance(item, Iterable)
and not isinstance(item, str)
# Handle Everything else
else item
# Loop through v items
for item in v)
# Don't use dict comprehension (I forget why, readability?) or constructor
# here (infinite recursion)!
for key, value in dict(*args, **kwargs).items():
if isinstance(value, Mapping):
if key in dict_ and not isinstance(dict_[key], Mapping):
dict_[key] = type(dict_)()
dict_[key] = nested_update(dict_.get(key, type(dict_)()), value)
elif isinstance(value, Iterable) and not isinstance(value, str):
if key in dict_ and not isinstance(dict_[key], Mapping):
dict_[key] = type(value)()
dict_[key] = patch_it(value)
else:
dict_[key] = value
return dict_
[docs]def nested_in_dict(dict1, dict2):
''' Checks to see if dict1 is in dict2
Parameters
----------
dict1 : dict
Subset dictionary
dict2 : dict
Superset dictionary
'''
try:
items = dict1.iteritems()
except:
items = dict1.items()
for key, value1 in items:
if key not in dict2:
return False
if isinstance(value1, Mapping):
if not nested_in_dict(value1, dict2[key]):
return False
elif dict2[key] != value1:
return False
return True
[docs]def nested_patch(obj, condition, patch, _spare_key = None):
''' Patch strings in a nested python dict
Will patch values in mapping and iterable containers recursively. This
includes (but is not limited to) ``set``, ``list``, ``dict``, ``tuple``,
etc... Only iterates through values, not keys.
When the condition is met for a given key,value pair, then the patch function
is used to replace the value.
Parameters
----------
obj: :term:`mapping` or :term:`iterable` or object
The python object to be patched. Typically a dict, but can be a list,
etc... or even a normal object, but that kind of defeats the purpose
condition: :term:`function`
The condition function to decide if each value should be patched.
``condition`` takes two arguments, ``(key, value)``
patch: :term:`function`
Callable that should return a patched version of the value. ``patch``
takes two arguments, ``(key, value)``
Returns
-------
object
Returns a patched version of the object. This should not be though of as
a deep-copy of the original object, as unpatched values will still be the
same python objects, not copies.
Example
-------
::
patterns = ['_file', '_dir', '_path',
'_files', '_dirs', '_paths']
condition = lambda key, value: isinstance(key, str) and \\
any(key.endswith(pattern) for pattern in patterns)
def patch_value(value, volume_map):
for vol_from, vol_to in volume_map:
if isinstance(value, str) and value.startswith(vol_from):
return value.replace(vol_from, vol_to, 1)
return value
volume_map = [['/tmp', '/temp'],
['/tmp/home', '/nope'],
['/home', '/Home']]
patch = lambda key, value: patch_value(value, reversed(volume_map))
z = {'test': '/tmp',
'foo_file': '/tmp',
'foo': 15,
17: 'bar',
'foo_dir': ['/tmp', '/home'],
'foo_files': 15,
'stuff': {
'this_path': '/home',
'a': {
'b': {
'c': {
'e': [{'b_path': '/home'}, {'c_dir': '/tmp'}],
'd': {
'q_path': (('/home', '/opt'), ('/tmp', '/tmp/home/foo/bar')),
'q_orig': (('/home', '/opt'), ('/tmp', '/tmp/home/foo/bar')),
'a_path': ('/home', '/opt', '/tmp', '/tmp/home/foo/bar')
}
}
}
}
}
}
z2 = nested_patch(z, condition, patch)
'''
# Handle mapping
if isinstance(obj, Mapping):
# Muttable mappings could be in-place patched, but for symmetry and DRY,
# just return it like everything else, plus this would work on an immutable
# mapping should one ever be used?
return type(obj)((key,nested_patch(value, condition, patch, key)) \
for key,value in obj.items())
# Handle iterables
elif not isinstance(obj, str) and isinstance(obj, Iterable):
return type(obj)(nested_patch(val, condition, patch, _spare_key) \
for val in obj)
# Handle everything else
else:
if condition(_spare_key, obj):
try:
return patch(_spare_key, obj)
except BaseException as e:
logger.error(f'Exception occurred while patching f{_spare_key}')
raise e
return obj
[docs]def nested_patch_inplace(obj, condition, patch, _spare_key = None):
''' Destructive inplace version of :func:`vsi.tools.python.nested_patch`
'''
# Handle mapping
if isinstance(obj, Mapping):
for key, value in obj.items():
if isinstance(value, Mapping):
nested_patch_inplace(value, condition, patch) # Inplace
# This would work but adds extra recursions and tests
# else:
# obj[key] = nested_patch_inplace(value, condition, patch, key)
elif not isinstance(value, str) and isinstance(value, Iterable):
obj[key] = nested_patch_inplace(value, condition, patch, key)
elif condition(key, value):
obj[key] = patch(key, value)
return obj
# Handle iterable
elif not isinstance(obj, str) and isinstance(obj, Iterable):
return type(obj)(nested_patch_inplace(val, condition, patch, _spare_key) for val in obj)
# Handle everything else
else:
if condition(_spare_key, obj):
try:
return patch(_spare_key, obj)
except BaseException as e:
logger.error(f'Exception occurred while patching f{_spare_key}')
raise e
return obj
[docs]def unwrap_wraps(func):
'''
Unwraps a wrapped function
Finds the originally wrapped function, using the :func:`functools.wraps`
pattern of storing in ``__wrapped__``
'''
is_method = inspect.ismethod(func)
try:
while func.__wrapped__:
func = func.__wrapped__
# It only takes one to be a method
if inspect.ismethod(func):
is_method = True
except AttributeError:
pass
return (func, is_method)