Source code for kwarray.util_random

"""
Handle and interchange between different random number generators (numpy,
python, torch, ...). Also defines useful random iterator functions and
:func:`ensure_rng`.


Random Number Generator Patterns
--------------------------------

If you need a seeded random number generator kwarray.ensure_rng is helpful with
that: :func:`kwarray.util_random.ensure_rng`

If the input is a number it returns a seeded random number generator. If it is
None it returns whatever the system level RNG is. If the input is an existing
RNG it returns it without changing it. It also has the ability to switch
between Python's random module RNG and numpy's np.random RNG (it can translate
the internal state between the two).


When I write randomized functions / class, a coding pattern I like is to
have a default keyword argument ``rng=None``. Then kwarray.ensure_rng coerces
whatever the input is into a :func:`random.Random` or
:func:`numpy.random.RandomState` object.

.. code:: python

    def some_random_function(*args, rng=None):
        rng = kwarray.ensure_rng(rng)

Then if this random function calls any other random function, it passes the
coerced rng to all other subfunctions. This ensures that seeding the RNG at
the top level produces a completely deterministic process.

For a more involved example

.. code:: python

    import pandas as pd
    import numpy as np
    import kwarray

    def random_subfunc1(rng=None):
        rng = kwarray.ensure_rng(rng, api='python')
        value: float = rng.betavariate(3, 2.3)
        return value


    def random_subfunc2(rng=None):
        rng = kwarray.ensure_rng(rng, api='numpy')
        arr: np.ndarray = rng.choice([1, 2, 3, 4], size=3, replace=0)
        return arr


    def random_method(rng=None):
        value = random_subfunc1(rng=rng)
        arr = random_subfunc2(rng=rng)
        final = (arr * value).sum()
        return final

    def demo():
        results = []
        num = 10
        for _ in range(num):
            rng = np.random.RandomState(3)
            row = {}
            row['system'] = random_method(None)
            row['seeded'] = random_method(0)
            row['exiting'] = random_method(rng)
            results.append(row)

        df = pd.DataFrame(results)
        print(df)


This results in:

.. code::

         system    seeded   exiting
    0  3.642700  6.902354  4.869275
    1  3.127890  6.902354  4.869275
    2  4.317397  6.902354  4.869275
    3  3.382259  6.902354  4.869275
    4  1.999498  6.902354  4.869275
    5  5.293688  6.902354  4.869275
    6  2.984741  6.902354  4.869275
    7  6.455160  6.902354  4.869275
    8  5.161900  6.902354  4.869275
    9  2.810358  6.902354  4.869275
"""
import numpy as np
import random
import itertools as it

# Integer seeds are reduced modulo this value (2**32 - 1) before being handed
# to the python / numpy / torch seeding routines in this module.
_SEED_MAX = int(2 ** 32 - 1)


# Free-form design note kept as a string; nothing in the visible source reads
# this variable.
__todo = """
Make a Coercable[RandomState] type that is

Coercable[numpy.random.RandomState] = Union[int, float, None, numpy.random.RandomState, random.Random]
Coercable[random.Random] = Union[int, float, None, numpy.random.RandomState, random.Random]



"""


def seed_global(seed, offset=0):
    """
    Seed the global random states of python, numpy, and torch.

    Torch is optional: when it cannot be imported only the python and numpy
    global states are seeded.

    Args:
        seed (int): base seed value.

        offset (int): spacing between the seeds handed to each successive
            global state (python gets ``seed``, numpy ``seed + offset``,
            torch ``seed + 2 * offset`` and torch-cuda ``seed + 3 * offset``).
            Defaults to 0, in which case every state gets the same seed.
    """
    random.seed(seed % _SEED_MAX)
    np.random.seed((seed + offset) % _SEED_MAX)

    try:
        import torch
    except ImportError:
        # torch is not installed, so there is nothing left to seed
        return

    torch.random.manual_seed((seed + 2 * offset) % _SEED_MAX)
    torch.cuda.manual_seed_all((seed + 3 * offset) % _SEED_MAX)
def shuffle(items, rng=None):
    """
    Shuffle a sequence in place and hand the very same object back.

    Args:
        items (list | ndarray): data to shuffle

        rng (int | float | None | numpy.random.RandomState | random.Random):
            seed or random number generator; coerced with :func:`ensure_rng`.

    Returns:
        list: the input object itself, returned only for convenience

    Example:
        >>> list1 = [1, 2, 3, 4, 5, 6]
        >>> list2 = shuffle(list(list1), rng=1)
        >>> assert list1 != list2
        >>> result = str(list2)
        >>> print(result)
        [3, 2, 5, 1, 4, 6]
    """
    generator = ensure_rng(rng)
    generator.shuffle(items)
    return items
def random_combinations(items, size, num=None, rng=None):
    """
    Yields ``num`` combinations of length ``size`` from items in random order

    Combinations are distinguished by the *positions* of the chosen items
    (the same convention as :func:`itertools.combinations`), so repeated
    values in ``items`` give rise to combinations that compare equal.

    Args:
        items (List): pool of items to choose from

        size (int): Number of items in each combination

        num (int | None): Number of combinations to generate.
            If None, generate them all.

        rng (int | float | None | numpy.random.RandomState | random.Random):
            seed or random number generator. Defaults to the global state
            of the python random module.

    Yields:
        Tuple: a random combination of ``items`` of length ``size``.

    Example:
        >>> # xdoctest: +REQUIRES(module:scipy)
        >>> import ubelt as ub
        >>> items = list(range(10))
        >>> size = 3
        >>> num = 5
        >>> rng = 0
        >>> # xdoctest: +IGNORE_WANT
        >>> combos = list(random_combinations(items, size, num, rng))
        >>> print('combos = {}'.format(ub.urepr(combos, nl=1)))
        combos = [
            (0, 6, 9),
            (4, 7, 8),
            (4, 6, 7),
            (2, 3, 5),
            (1, 2, 4),
        ]

    Example:
        >>> # xdoctest: +REQUIRES(module:scipy)
        >>> import ubelt as ub
        >>> items = list(zip(range(10), range(10)))
        >>> # xdoctest: +IGNORE_WANT
        >>> combos = list(random_combinations(items, 3, num=5, rng=0))
        >>> print('combos = {}'.format(ub.urepr(combos, nl=1)))
        combos = [
            ((0, 0), (6, 6), (9, 9)),
            ((4, 4), (7, 7), (8, 8)),
            ((4, 4), (6, 6), (7, 7)),
            ((2, 2), (3, 3), (5, 5)),
            ((1, 1), (2, 2), (4, 4)),
        ]
    """
    import sys
    rng = ensure_rng(rng, api='python')
    num_ = np.inf if num is None else num
    # Ensure we dont request more than is possible
    if sys.version_info[0:2] >= (3, 8):
        import math
        n_max = math.comb(len(items), size)
    else:
        try:
            import scipy.special
            n_max = int(scipy.special.comb(len(items), size))
        except ImportError:
            # https://stackoverflow.com/questions/26560726/python-binomial-coefficient
            from math import factorial as fac
            a = len(items)
            b = size
            n_max = int(fac(a) // fac(b) // fac(a - b))
    num_ = min(n_max, num_)
    if num is not None and num_ > n_max // 2:
        # If num is too big just generate all combinations and shuffle them
        combos = list(it.combinations(items, size))
        rng.shuffle(combos)
        for combo in combos[:num]:
            yield combo
    else:
        # Otherwise yield randomly until we get something we havent seen.
        items = list(items)
        positions = range(len(items))
        # Track which *index* combinations were already produced. ``n_max``
        # counts combinations by position, so deduplicating by value could
        # never reach ``num_`` when ``items`` holds repeated values (the loop
        # would spin forever), and it also required hashable items.
        # Sampling from ``range(len(items))`` draws the same positions that
        # sampling ``items`` directly would, so seeded results are unchanged
        # for pools of unique items.
        seen = set()
        while len(seen) < num_:
            key = tuple(sorted(rng.sample(positions, size)))
            if key not in seen:
                seen.add(key)
                # As before, each yielded combination is sorted by value
                yield tuple(sorted(items[idx] for idx in key))
def random_product(items, num=None, rng=None):
    """
    Yields ``num`` items from the cartesian product of items in a random order.

    Args:
        items (List[Sequence]):
            items to get cartesian product of packed in a list or tuple.
            (note this deviates from api of :func:`itertools.product`)

        num (int | None):
            maximum number of items to generate. If None generate them all

        rng (int | float | None | numpy.random.RandomState | random.Random):
            Seed or random number generator. Defaults to the global state
            of the python random module.

    Yields:
        Tuple: a random item in the cartesian product

    Example:
        >>> import ubelt as ub
        >>> items = [(1, 2, 3), (4, 5, 6, 7)]
        >>> rng = 0
        >>> # xdoctest: +IGNORE_WANT
        >>> products = list(random_product(items, rng=0))
        >>> print(ub.urepr(products, nl=0))
        [(3, 4), (1, 7), (3, 6), (2, 7),... (1, 6), (2, 5), (2, 4)]
        >>> products = list(random_product(items, num=3, rng=0))
        >>> print(ub.urepr(products, nl=0))
        [(3, 4), (1, 7), (3, 6)]

    Example:
        >>> # xdoctest: +REQUIRES(--profile)
        >>> rng = ensure_rng(0)
        >>> items = [np.array([15, 14]), np.array([27, 26]),
        >>>          np.array([21, 22]), np.array([32, 31])]
        >>> num = 2
        >>> for _ in range(100):
        >>>     list(random_product(items, num=num, rng=rng))
    """
    try:
        if not isinstance(items, (list, tuple)):
            raise TypeError
        idx_cards = np.array([len(g) for g in items], dtype=np.uint32)
    except (TypeError, AttributeError):
        # ``items`` (or one of its groups) is a plain iterable without a
        # length; materialize everything so it can be measured and indexed.
        items = [list(g) for g in items]
        idx_cards = np.array([len(g) for g in items], dtype=np.uint32)

    ndims = len(items)
    max_num = np.multiply.reduce(idx_cards.astype(np.float32))
    requested = num  # remember what the caller asked for
    if num is None:
        num = max_num
    else:
        num = min(num, max_num)

    # TODO: make this more efficient when num is large
    if max_num > 100 and num > max_num // 2:
        # A large fraction of the product was requested: enumerate it all,
        # shuffle, and hand out a prefix.
        rng = ensure_rng(rng, 'python')
        products = shuffle(list(it.product(*items)), rng=rng)
        if requested is not None and requested < len(products):
            # Honor ``num``; previously this branch always yielded the
            # entire product even when fewer items were requested.
            products = products[:int(requested)]
        for prod in products:
            yield prod
    else:
        rng = ensure_rng(rng, 'numpy')
        # Need to use least-common-multiple so the mod of all idxs
        # are equally likely. With no groups at all there is nothing to
        # reduce, and any positive bound works for a size-0 draw.
        card_lcm = np.lcm.reduce(idx_cards) if ndims else 1
        seen = set()
        while len(seen) < num:
            idxs = rng.randint(0, card_lcm, size=ndims, dtype=idx_cards.dtype)
            idxs %= idx_cards
            idxs = tuple(idxs.tolist())
            if idxs not in seen:
                seen.add(idxs)
                prod = tuple(g[x] for g, x in zip(items, idxs))
                yield prod
def _npstate_to_pystate(npstate):
    """
    Translate a NumPy ``RandomState`` state tuple into the state tuple
    accepted by ``random.Random.setstate``. Derived from [SO44313620]_.

    Args:
        npstate (tuple): 5-tuple as returned by
            ``numpy.random.RandomState.get_state``.

    Returns:
        tuple: ``(3, keys + (pos,), cached_gaussian_or_None)``

    References:
        .. [SO44313620] https://stackoverflow.com/questions/44313620/convert-randomstate

    Example:
        >>> py_rng = random.Random(0)
        >>> np_rng = np.random.RandomState(seed=0)
        >>> npstate = np_rng.get_state()
        >>> pystate = _npstate_to_pystate(npstate)
        >>> py_rng.setstate(pystate)
        >>> assert np_rng.rand() == py_rng.random()
    """
    _, mt_keys, mt_pos, gauss_flag, gauss_value = npstate
    # Python stores the key words and the position in one flat tuple of ints
    internal = tuple(int(word) for word in mt_keys) + (int(mt_pos),)
    if not gauss_flag:
        # Python marks "no cached gaussian" with None
        gauss_value = None
    return (3, internal, gauss_value)
def _pystate_to_npstate(pystate):
    """
    Translate a Python ``random.Random`` state tuple into the state tuple
    accepted by ``numpy.random.RandomState.set_state``. Derived from
    [SO44313620]_.

    Args:
        pystate (tuple): 3-tuple as returned by ``random.Random.getstate``.

    Returns:
        tuple: ``('MT19937', keys, pos, has_gauss, cached_gaussian)``

    References:
        .. [SO44313620] https://stackoverflow.com/questions/44313620/convert-randomstate

    Example:
        >>> py_rng = random.Random(0)
        >>> np_rng = np.random.RandomState(seed=0)
        >>> pystate = py_rng.getstate()
        >>> npstate = _pystate_to_npstate(pystate)
        >>> np_rng.set_state(npstate)
        >>> assert np_rng.rand() == py_rng.random()
    """
    _, packed, gauss_next = pystate
    # The last entry of the flat tuple is the position, the rest are key words
    mt_pos = packed[-1]
    mt_keys = np.array(packed[:-1], dtype=np.uint32)
    if gauss_next is None:
        # NumPy wants an explicit flag plus a placeholder float
        gauss_flag, gauss_value = False, 0.0
    else:
        gauss_flag, gauss_value = True, gauss_next
    return ('MT19937', mt_keys, mt_pos, gauss_flag, gauss_value)
def _coerce_rng_type(rng):
    """
    Internal helper that normalizes the accepted ``rng`` inputs.

    * ``None`` and existing ``random.Random`` /
      ``numpy.random.RandomState`` objects pass through untouched.
    * The ``random`` and ``numpy.random`` modules are replaced by their
      underlying global generator objects.
    * Floats and integers (python or numpy) are turned into a python int.

    Args:
        rng (int | float | None | numpy.random.RandomState | random.Random | module):
            value to normalize

    Returns:
        int | None | numpy.random.RandomState | random.Random

    Raises:
        TypeError: if the input is none of the supported kinds
    """
    if rng is None:
        return rng
    if isinstance(rng, (random.Random, np.random.RandomState)):
        return rng
    if rng is random:
        return rng._inst
    if rng is np.random:
        return np.random.mtrand._rand
    if isinstance(rng, (float, np.floating)):
        # Fold the exact numerator / denominator of the float into one int
        numer, denom = float(rng).as_integer_ratio()
        if denom == 1:
            return numer
        width = max(numer.bit_length(), denom.bit_length())
        return (denom << width) | numer
    if isinstance(rng, (int, np.integer)):
        return int(rng)
    raise TypeError(
        'Cannot coerce {!r} to a random object'.format(type(rng)))
def ensure_rng(rng=None, api='numpy'):
    """
    Coerces input into a random number generator.

    Use this to make sure your code runs on a controlled internal random
    state that is independent of other modules.

    * ``None`` maps to the global random state of the requested api.
    * A numeric value is used as the seed of a new random state.
    * A generator that already matches the requested api is returned as-is.
    * A generator of the other api is used to build a new generator of the
      requested api that starts from the same internal state.

    Args:
        rng (int | float | None | numpy.random.RandomState | random.Random):
            if None, then defaults to the global rng. Otherwise this can
            be an integer or a RandomState class. Defaults to the global
            random.

        api (str): specify the type of random number generator to use. This
            can either be 'numpy' for a :class:`numpy.random.RandomState`
            object or 'python' for a :class:`random.Random` object.
            Defaults to numpy.

    Returns:
        (numpy.random.RandomState | random.Random) :
            rng - either a numpy or python random number generator,
            depending on the setting of ``api``.

    Raises:
        TypeError: if ``rng`` cannot be coerced
        KeyError: if ``api`` is neither 'numpy' nor 'python'

    Example:
        >>> rng = ensure_rng(None)
        >>> ensure_rng(0).randint(0, 1000)
        684
        >>> ensure_rng(np.random.RandomState(1)).randint(0, 1000)
        37

    Example:
        >>> num = 4
        >>> print('--- Python as PYTHON ---')
        >>> py_rng = random.Random(0)
        >>> pp_nums = [py_rng.random() for _ in range(num)]
        >>> print(pp_nums)
        >>> print('--- Numpy as PYTHON ---')
        >>> np_rng = ensure_rng(random.Random(0), api='numpy')
        >>> np_nums = [np_rng.rand() for _ in range(num)]
        >>> print(np_nums)
        >>> print('--- Numpy as NUMPY---')
        >>> np_rng = np.random.RandomState(seed=0)
        >>> nn_nums = [np_rng.rand() for _ in range(num)]
        >>> print(nn_nums)
        >>> print('--- Python as NUMPY---')
        >>> py_rng = ensure_rng(np.random.RandomState(seed=0), api='python')
        >>> pn_nums = [py_rng.random() for _ in range(num)]
        >>> print(pn_nums)
        >>> assert np_nums == pp_nums
        >>> assert pn_nums == nn_nums

    Example:
        >>> # Test that random modules can be coerced
        >>> import random
        >>> import numpy as np
        >>> ensure_rng(random, api='python')
        >>> ensure_rng(random, api='numpy')
        >>> ensure_rng(np.random, api='python')
        >>> ensure_rng(np.random, api='numpy')

    Ignore:
        >>> np.random.seed(0)
        >>> np.random.randint(0, 10000)
        2732
        >>> np.random.seed(0)
        >>> np.random.mtrand._rand.randint(0, 10000)
        2732
        >>> np.random.seed(0)
        >>> ensure_rng(None).randint(0, 10000)
        2732
        >>> np.random.randint(0, 10000)
        9845
        >>> ensure_rng(None).randint(0, 10000)
        3264
    """
    rng = _coerce_rng_type(rng)

    if api == 'numpy':
        if rng is None:
            # This is the underlying random state of the np.random module.
            # (RandomState(seed=None) is avoided on purpose because it
            # seeds using dev/urandom.)
            return np.random.mtrand._rand
        if isinstance(rng, int):
            return np.random.RandomState(seed=rng % _SEED_MAX)
        if isinstance(rng, random.Random):
            # Convert python to numpy random state
            translated = _pystate_to_npstate(rng.getstate())
            np_rng = np.random.RandomState(seed=0)
            np_rng.set_state(translated)
            return np_rng
        return rng

    if api == 'python':
        if rng is None:
            # This is the underlying random state of the random module
            return random._inst
        if isinstance(rng, int):
            return random.Random(rng % _SEED_MAX)
        if isinstance(rng, np.random.RandomState):
            # Convert numpy to python random state
            translated = _npstate_to_pystate(rng.get_state())
            py_rng = random.Random(0)
            py_rng.setstate(translated)
            return py_rng
        return rng

    raise KeyError('unknown rng api={}'.format(api))
# Script entry point: when this file is executed directly, pass it to
# xdoctest's ``doctest_module``.
if __name__ == '__main__':
    """ CommandLine: xdoctest -m kwarray.util_random """
    import xdoctest
    xdoctest.doctest_module(__file__)