Source code for kwarray.fast_rand

"""
Fast 32-bit random functions for numpy as of 2018. (More recent versions of
numpy may have these natively supported).
"""
import numpy as np


[docs] def uniform(low=0.0, high=1.0, size=None, dtype=np.float32, rng=np.random): """ Draws float32 samples from a uniform distribution. Samples are uniformly distributed over the half-open interval ``[low, high)`` (includes low, but excludes high). Args: low (float): Lower boundary of the output interval. All values generated will be greater than or equal to low. Defaults to 0. high (float): Upper boundary of the output interval. All values generated will be less than high. Default to 1. size (int | Tuple[int, ...] | None): Output shape. If the given shape is, e.g., ``(m, n, k)``, then ``m * n * k`` samples are drawn. If size is ``None`` (default), a single value is returned if ``low`` and ``high`` are both scalars. Otherwise, ``np.broadcast(low, high).size`` samples are drawn. dtype (type): either np.float32 or np.float64. Defaults to float32 rng (numpy.random.RandomState): underlying random state Returns: ndarray: uniformly distributed random numbers with chosen size and dtype Extended typing ``NDArray[Literal[size], Literal[dtype]]`` Benchmark: >>> from timerit import Timerit >>> import kwarray >>> size = (300, 300, 3) >>> for timer in Timerit(100, bestof=10, label='dtype=np.float32'): >>> rng = kwarray.ensure_rng(0) >>> with timer: >>> ours = standard_normal(size, rng=rng, dtype=np.float32) >>> # Timed best=4.705 ms, mean=4.75 ± 0.085 ms for dtype=np.float32 >>> for timer in Timerit(100, bestof=10, label='dtype=np.float64'): >>> rng = kwarray.ensure_rng(0) >>> with timer: >>> theirs = standard_normal(size, rng=rng, dtype=np.float64) >>> # Timed best=9.327 ms, mean=9.794 ± 0.4 ms for rng.np.float64 """ if dtype is np.float32: return uniform32(low, high, size, rng) elif dtype is np.float64: return rng.uniform(low, high, size) else: raise ValueError('dtype = {!r}'.format(dtype))
[docs] def standard_normal(size, mean=0, std=1, dtype=float, rng=np.random): """ Draw samples from a standard Normal distribution with a specified mean and standard deviation. Args: size (int | Tuple[int, ...]) : shape of the returned ndarray mean (float): mean of the normal distribution. defaults to 0 std (float): standard deviation of the normal distribution. defaults to 1. dtype (type): either np.float32 (default) or np.float64 rng (numpy.random.RandomState): underlying random state Returns: ndarray: normally distributed random numbers with chosen size and dtype Extended typing ``NDArray[Literal[size], Literal[dtype]]`` Benchmark: >>> from timerit import Timerit >>> import kwarray >>> size = (300, 300, 3) >>> for timer in Timerit(100, bestof=10, label='dtype=np.float32'): >>> rng = kwarray.ensure_rng(0) >>> with timer: >>> ours = standard_normal(size, rng=rng, dtype=np.float32) >>> # Timed best=4.705 ms, mean=4.75 ± 0.085 ms for dtype=np.float32 >>> for timer in Timerit(100, bestof=10, label='dtype=np.float64'): >>> rng = kwarray.ensure_rng(0) >>> with timer: >>> theirs = standard_normal(size, rng=rng, dtype=np.float64) >>> # Timed best=9.327 ms, mean=9.794 ± 0.4 ms for rng.np.float64 """ if dtype is np.float32: return standard_normal32(size, mean, std, rng) elif dtype is np.float64: return standard_normal64(size, mean, std, rng) else: raise ValueError('dtype = {!r}'.format(dtype))
[docs] def standard_normal32(size, mean=0, std=1, rng=np.random): """ Fast normally distributed random variables. Uses the Box–Muller transform [WikiBoxMuller]_. The difference between this function and :func:`numpy.random.standard_normal` is that we use float32 arrays in the backend instead of float64. Halving the amount of bits that need to be manipulated can significantly reduce the execution time, and 32-bit precision is often good enough. Args: size (int | Tuple[int, ...]) : shape of the returned ndarray mean (float, default=0): mean of the normal distribution std (float, default=1): standard deviation of the normal distribution rng (numpy.random.RandomState): underlying random state Returns: ndarray[Any, Float32]: normally distributed random numbers with chosen size. References: .. [WikiBoxMuller] https://en.wikipedia.org/wiki/Box%E2%80%93Muller_transform SeeAlso: * :func:`standard_normal` * :func:`standard_normal64` Example: >>> # xdoctest: +REQUIRES(module:scipy) >>> import scipy >>> import scipy.stats >>> pts = 1000 >>> # Our numbers are normally distributed with high probability >>> rng = np.random.RandomState(28041990) >>> ours_a = standard_normal32(pts, rng=rng) >>> ours_b = standard_normal32(pts, rng=rng) + 2 >>> ours = np.concatenate((ours_a, ours_b)) # numerical stability? >>> p = scipy.stats.normaltest(ours)[1] >>> print('Probability our data is non-normal is: {:.4g}'.format(p)) Probability our data is non-normal is: 1.573e-14 >>> rng = np.random.RandomState(28041990) >>> theirs_a = rng.standard_normal(pts) >>> theirs_b = rng.standard_normal(pts) + 2 >>> theirs = np.concatenate((theirs_a, theirs_b)) >>> p = scipy.stats.normaltest(theirs)[1] >>> print('Probability their data is non-normal is: {:.4g}'.format(p)) Probability their data is non-normal is: 3.272e-11 Example: >>> pts = 1000 >>> rng = np.random.RandomState(28041990) >>> ours = standard_normal32(pts, mean=10, std=3, rng=rng) >>> assert np.abs(ours.std() - 3.0) < 0.1 >>> assert np.abs(ours.mean() - 10.0) < 0.1 Example: >>> # Test an even and odd numbers of points >>> assert standard_normal32(3).shape == (3,) >>> assert standard_normal32(2).shape == (2,) >>> assert standard_normal32(1).shape == (1,) >>> assert standard_normal32(0).shape == (0,) >>> assert standard_normal32((3, 1)).shape == (3, 1) >>> assert standard_normal32((3, 0)).shape == (3, 0) """ # return rng.standard_normal(size) * std + mean # The integer and float dtype must have the same number of bits dtype = np.float32 int_dtype = np.uint32 MAX_INT = np.core.getlimits.iinfo(int_dtype).max # Preallocate output out = np.empty(size, dtype=dtype) total = out.size n = int(np.ceil(total / 2)) # Generate uniform-01 random numbers that we will transform u12 = rng.randint(1, MAX_INT - 1, size=n * 2, dtype=int_dtype).astype(dtype) np.divide(u12, dtype(MAX_INT), out=u12) u1 = u12[:n] u2 = u12[n:] # Box–Muller transform transforms two uniformly distributed values # into normally distributed random values. Because of the way it works # we generate two sets of uniform random values, and then flat_out = out.ravel() # The following logic is equivalent to: # R = std * np.sqrt(-2 * np.log(u1)) # Theta = 2 * np.pi * u2 # flat_out[0:n] = R * np.cos(Theta) + mean # flat_out[-n:] = R * np.sin(Theta) + mean np.log(u1, out=u1) np.multiply(-2 * (std ** 2), u1, out=u1) R = np.sqrt(u1, out=u1) Theta = np.multiply(2 * np.pi, u2, out=u2) out1 = flat_out[0:n] out2 = flat_out[-n:] np.cos(Theta, out=out1) np.sin(Theta, out=out2) np.multiply(R, out1, out=out1) np.multiply(R, out2, out=out2) if mean != 0: np.add(out, mean, out=out) return out
[docs] def standard_normal64(size, mean=0, std=1, rng=np.random): """ Simple wrapper around rng.standard_normal to make an API compatible with :func:`standard_normal32`. Args: size (int | Tuple[int, ...]) : shape of the returned ndarray mean (float): mean of the normal distribution. defaults to 0 std (float): standard deviation of the normal distribution. defaults to 1. rng (numpy.random.RandomState): underlying random state Returns: ndarray[Any, Float64]: normally distributed random numbers with chosen size. SeeAlso: * standard_normal * standard_normal32 Example: >>> pts = 1000 >>> rng = np.random.RandomState(28041994) >>> out = standard_normal64(pts, mean=10, std=3, rng=rng) >>> assert np.abs(out.std() - 3.0) < 0.1 >>> assert np.abs(out.mean() - 10.0) < 0.1 """ out = rng.standard_normal(size) if std != 1: np.multiply(out, std, out=out) if mean != 0: np.add(out, mean, out=out) return out
[docs] def uniform32(low=0.0, high=1.0, size=None, rng=np.random): """ Draws float32 samples from a uniform distribution. Samples are uniformly distributed over the half-open interval ``[low, high)`` (includes low, but excludes high). Args: low (float, default=0.0): Lower boundary of the output interval. All values generated will be greater than or equal to low. high (float, default=1.0): Upper boundary of the output interval. All values generated will be less than high. size (int | Tuple[int, ...] | None): Output shape. If the given shape is, e.g., ``(m, n, k)``, then ``m * n * k`` samples are drawn. If size is ``None`` (default), a single value is returned if ``low`` and ``high`` are both scalars. Otherwise, ``np.broadcast(low, high).size`` samples are drawn. Returns: ndarray[Any, Float32]: uniformly distributed random numbers with chosen size. Example: >>> rng = np.random.RandomState(0) >>> uniform32(low=0.0, high=1.0, size=None, rng=rng) 0.5488... >>> uniform32(low=0.0, high=1.0, size=2000, rng=rng).sum() 1004.94... >>> uniform32(low=-10, high=10.0, size=2000, rng=rng).sum() 202.44... Benchmark: >>> from timerit import Timerit >>> import kwarray >>> size = 512 * 512 >>> for timer in Timerit(100, bestof=10, label='theirs: dtype=np.float64'): >>> rng = kwarray.ensure_rng(0) >>> with timer: >>> theirs = rng.uniform(size=size) >>> for timer in Timerit(100, bestof=10, label='theirs: dtype=np.float32'): >>> rng = kwarray.ensure_rng(0) >>> with timer: >>> theirs = rng.rand(size).astype(np.float32) >>> for timer in Timerit(100, bestof=10, label='ours: dtype=np.float32'): >>> rng = kwarray.ensure_rng(0) >>> with timer: >>> ours = uniform32(size=size) """ if size is None: out = np.float32(rng.uniform(low, high)) else: total = size if isinstance(size, int) else np.prod(size) if total < 1764: # For smaller sizes simply casting to float32 is faster out = rng.uniform(low, high, size).astype(np.float32) else: # This is only faster for large sizes. # The cuttoff size seems to be (42 * 42). dtype = np.float32 int_dtype = np.uint32 MAX_INT = np.core.getlimits.iinfo(int_dtype).max # Generate uniform-01 random numbers that we will transform out = rng.randint(0, MAX_INT - 1, size=size, dtype=int_dtype).astype(dtype) if low == 0 and high == 1.0: np.divide(out, dtype(MAX_INT), out=out) else: extent = dtype(MAX_INT) / (high - low) np.divide(out, extent, out=out) np.add(out, low, out=out) return out