This repository has been archived on 2023-03-25. You can view files and clone it, but cannot push or open issues or pull requests.
mightyscape-1.1-deprecated/extensions/networkx/utils/decorators.py
2020-07-30 01:16:18 +02:00

466 lines
14 KiB
Python

import sys
from warnings import warn
from collections import defaultdict
from os.path import splitext
from contextlib import contextmanager
try:
from pathlib import Path
except ImportError:
# Use Path to indicate if pathlib exists (like numpy does)
Path = None
import networkx as nx
from decorator import decorator
from networkx.utils import is_string_like, create_random_state, \
create_py_random_state
__all__ = [
'not_implemented_for',
'open_file',
'nodes_or_number',
'preserve_random_state',
'random_state',
'np_random_state',
'py_random_state',
]
def not_implemented_for(*graph_types):
"""Decorator to mark algorithms as not implemented
Parameters
----------
graph_types : container of strings
Entries must be one of 'directed','undirected', 'multigraph', 'graph'.
Returns
-------
_require : function
The decorated function.
Raises
------
NetworkXNotImplemented
If any of the packages cannot be imported
Notes
-----
Multiple types are joined logically with "and".
For "or" use multiple @not_implemented_for() lines.
Examples
--------
Decorate functions like this::
@not_implemnted_for('directed')
def sp_function(G):
pass
@not_implemnted_for('directed','multigraph')
def sp_np_function(G):
pass
"""
@decorator
def _not_implemented_for(not_implement_for_func, *args, **kwargs):
graph = args[0]
terms = {'directed': graph.is_directed(),
'undirected': not graph.is_directed(),
'multigraph': graph.is_multigraph(),
'graph': not graph.is_multigraph()}
match = True
try:
for t in graph_types:
match = match and terms[t]
except KeyError:
raise KeyError('use one or more of ',
'directed, undirected, multigraph, graph')
if match:
msg = 'not implemented for %s type' % ' '.join(graph_types)
raise nx.NetworkXNotImplemented(msg)
else:
return not_implement_for_func(*args, **kwargs)
return _not_implemented_for
def _open_gz(path, mode):
import gzip
return gzip.open(path, mode=mode)
def _open_bz2(path, mode):
import bz2
return bz2.BZ2File(path, mode=mode)
# To handle new extensions, define a function accepting a `path` and `mode`.
# Then add the extension to _dispatch_dict.
_dispatch_dict = defaultdict(lambda: open)
_dispatch_dict['.gz'] = _open_gz
_dispatch_dict['.bz2'] = _open_bz2
_dispatch_dict['.gzip'] = _open_gz
def open_file(path_arg, mode='r'):
"""Decorator to ensure clean opening and closing of files.
Parameters
----------
path_arg : int
Location of the path argument in args. Even if the argument is a
named positional argument (with a default value), you must specify its
index as a positional argument.
mode : str
String for opening mode.
Returns
-------
_open_file : function
Function which cleanly executes the io.
Examples
--------
Decorate functions like this::
@open_file(0,'r')
def read_function(pathname):
pass
@open_file(1,'w')
def write_function(G,pathname):
pass
@open_file(1,'w')
def write_function(G, pathname='graph.dot')
pass
@open_file('path', 'w+')
def another_function(arg, **kwargs):
path = kwargs['path']
pass
"""
# Note that this decorator solves the problem when a path argument is
# specified as a string, but it does not handle the situation when the
# function wants to accept a default of None (and then handle it).
# Here is an example:
#
# @open_file('path')
# def some_function(arg1, arg2, path=None):
# if path is None:
# fobj = tempfile.NamedTemporaryFile(delete=False)
# close_fobj = True
# else:
# # `path` could have been a string or file object or something
# # similar. In any event, the decorator has given us a file object
# # and it will close it for us, if it should.
# fobj = path
# close_fobj = False
#
# try:
# fobj.write('blah')
# finally:
# if close_fobj:
# fobj.close()
#
# Normally, we'd want to use "with" to ensure that fobj gets closed.
# However, recall that the decorator will make `path` a file object for
# us, and using "with" would undesirably close that file object. Instead,
# you use a try block, as shown above. When we exit the function, fobj will
# be closed, if it should be, by the decorator.
@decorator
def _open_file(func_to_be_decorated, *args, **kwargs):
# Note that since we have used @decorator, *args, and **kwargs have
# already been resolved to match the function signature of func. This
# means default values have been propagated. For example, the function
# func(x, y, a=1, b=2, **kwargs) if called as func(0,1,b=5,c=10) would
# have args=(0,1,1,5) and kwargs={'c':10}.
# First we parse the arguments of the decorator. The path_arg could
# be an positional argument or a keyword argument. Even if it is
try:
# path_arg is a required positional argument
# This works precisely because we are using @decorator
path = args[path_arg]
except TypeError:
# path_arg is a keyword argument. It is "required" in the sense
# that it must exist, according to the decorator specification,
# It can exist in `kwargs` by a developer specified default value
# or it could have been explicitly set by the user.
try:
path = kwargs[path_arg]
except KeyError:
# Could not find the keyword. Thus, no default was specified
# in the function signature and the user did not provide it.
msg = 'Missing required keyword argument: {0}'
raise nx.NetworkXError(msg.format(path_arg))
else:
is_kwarg = True
except IndexError:
# A "required" argument was missing. This can only happen if
# the decorator of the function was incorrectly specified.
# So this probably is not a user error, but a developer error.
msg = "path_arg of open_file decorator is incorrect"
raise nx.NetworkXError(msg)
else:
is_kwarg = False
# Now we have the path_arg. There are two types of input to consider:
# 1) string representing a path that should be opened
# 2) an already opened file object
if is_string_like(path):
ext = splitext(path)[1]
fobj = _dispatch_dict[ext](path, mode=mode)
close_fobj = True
elif hasattr(path, 'read'):
# path is already a file-like object
fobj = path
close_fobj = False
elif Path is not None and isinstance(path, Path):
# path is a pathlib reference to a filename
fobj = _dispatch_dict[path.suffix](str(path), mode=mode)
close_fobj = True
else:
# could be None, in which case the algorithm will deal with it
fobj = path
close_fobj = False
# Insert file object into args or kwargs.
if is_kwarg:
new_args = args
kwargs[path_arg] = fobj
else:
# args is a tuple, so we must convert to list before modifying it.
new_args = list(args)
new_args[path_arg] = fobj
# Finally, we call the original function, making sure to close the fobj
try:
result = func_to_be_decorated(*new_args, **kwargs)
finally:
if close_fobj:
fobj.close()
return result
return _open_file
def nodes_or_number(which_args):
"""Decorator to allow number of nodes or container of nodes.
Parameters
----------
which_args : int or sequence of ints
Location of the node arguments in args. Even if the argument is a
named positional argument (with a default value), you must specify its
index as a positional argument.
If more than one node argument is allowed, can be a list of locations.
Returns
-------
_nodes_or_numbers : function
Function which replaces int args with ranges.
Examples
--------
Decorate functions like this::
@nodes_or_number(0)
def empty_graph(nodes):
pass
@nodes_or_number([0,1])
def grid_2d_graph(m1, m2, periodic=False):
pass
@nodes_or_number(1)
def full_rary_tree(r, n)
# r is a number. n can be a number of a list of nodes
pass
"""
@decorator
def _nodes_or_number(func_to_be_decorated, *args, **kw):
# form tuple of arg positions to be converted.
try:
iter_wa = iter(which_args)
except TypeError:
iter_wa = (which_args,)
# change each argument in turn
new_args = list(args)
for i in iter_wa:
n = args[i]
try:
nodes = list(range(n))
except TypeError:
nodes = tuple(n)
else:
if n < 0:
msg = "Negative number of nodes not valid: %i" % n
raise nx.NetworkXError(msg)
new_args[i] = (n, nodes)
return func_to_be_decorated(*new_args, **kw)
return _nodes_or_number
def preserve_random_state(func):
""" Decorator to preserve the numpy.random state during a function.
Parameters
----------
func : function
function around which to preserve the random state.
Returns
-------
wrapper : function
Function which wraps the input function by saving the state before
calling the function and restoring the function afterward.
Examples
--------
Decorate functions like this::
@preserve_random_state
def do_random_stuff(x, y):
return x + y * numpy.random.random()
Notes
-----
If numpy.random is not importable, the state is not saved or restored.
"""
try:
from numpy.random import get_state, seed, set_state
@contextmanager
def save_random_state():
state = get_state()
try:
yield
finally:
set_state(state)
def wrapper(*args, **kwargs):
with save_random_state():
seed(1234567890)
return func(*args, **kwargs)
wrapper.__name__ = func.__name__
return wrapper
except ImportError:
return func
def random_state(random_state_index):
"""Decorator to generate a numpy.random.RandomState instance.
Argument position `random_state_index` is processed by create_random_state.
The result is a numpy.random.RandomState instance.
Parameters
----------
random_state_index : int
Location of the random_state argument in args that is to be used to
generate the numpy.random.RandomState instance. Even if the argument is
a named positional argument (with a default value), you must specify
its index as a positional argument.
Returns
-------
_random_state : function
Function whose random_state keyword argument is a RandomState instance.
Examples
--------
Decorate functions like this::
@np_random_state(0)
def random_float(random_state=None):
return random_state.rand()
@np_random_state(1)
def random_array(dims, random_state=1):
return random_state.rand(*dims)
See Also
--------
py_random_state
"""
@decorator
def _random_state(func, *args, **kwargs):
# Parse the decorator arguments.
try:
random_state_arg = args[random_state_index]
except TypeError:
raise nx.NetworkXError("random_state_index must be an integer")
except IndexError:
raise nx.NetworkXError("random_state_index is incorrect")
# Create a numpy.random.RandomState instance
random_state = create_random_state(random_state_arg)
# args is a tuple, so we must convert to list before modifying it.
new_args = list(args)
new_args[random_state_index] = random_state
return func(*new_args, **kwargs)
return _random_state
np_random_state = random_state
def py_random_state(random_state_index):
"""Decorator to generate a random.Random instance (or equiv).
Argument position `random_state_index` processed by create_py_random_state.
The result is either a random.Random instance, or numpy.random.RandomState
instance with additional attributes to mimic basic methods of Random.
Parameters
----------
random_state_index : int
Location of the random_state argument in args that is to be used to
generate the numpy.random.RandomState instance. Even if the argument is
a named positional argument (with a default value), you must specify
its index as a positional argument.
Returns
-------
_random_state : function
Function whose random_state keyword argument is a RandomState instance.
Examples
--------
Decorate functions like this::
@py_random_state(0)
def random_float(random_state=None):
return random_state.rand()
@py_random_state(1)
def random_array(dims, random_state=1):
return random_state.rand(*dims)
See Also
--------
np_random_state
"""
@decorator
def _random_state(func, *args, **kwargs):
# Parse the decorator arguments.
try:
random_state_arg = args[random_state_index]
except TypeError:
raise nx.NetworkXError("random_state_index must be an integer")
except IndexError:
raise nx.NetworkXError("random_state_index is incorrect")
# Create a numpy.random.RandomState instance
random_state = create_py_random_state(random_state_arg)
# args is a tuple, so we must convert to list before modifying it.
new_args = list(args)
new_args[random_state_index] = random_state
return func(*new_args, **kwargs)
return _random_state