"""
Currently just defines "stats_dict", which is a nice way to gather multiple
numeric statistics (e.g. max, min, median, mode, arithmetic-mean,
geometric-mean, standard-deviation, etc...) about data in an array.
"""
import warnings
import collections
import numpy as np
import ubelt as ub
import sys
# Maybe name or alias or use in kwarray.describe?
[docs]
def stats_dict(inputs, axis=None, nan=False, sum=False, extreme=True,
n_extreme=False, median=False, shape=True, size=False,
quantile='auto'):
"""
Describe statistics about an input array
Args:
inputs (ArrayLike): set of values to get statistics of
axis (int): if ``inputs`` is ndarray then this specifies the axis
nan (bool): report number of nan items (TODO: rename to skipna)
sum (bool): report sum of values
extreme (bool): report min and max values
n_extreme (bool): report extreme value frequencies
median (bool): report median
size (bool): report array size
shape (bool): report array shape
quantile (str | bool | List[float]):
defaults to 'auto'. Can also be a list of quantiles to compute.
if truthy computes quantiles.
Returns:
collections.OrderedDict:
dictionary of common numpy statistics
(min, max, mean, std, nMin, nMax, shape)
SeeAlso:
:func:`scipy.stats.describe`
:func:`pandas.DataFrame.describe`
Example:
>>> # xdoctest: +IGNORE_WHITESPACE
>>> from kwarray.util_averages import * # NOQA
>>> axis = 0
>>> rng = np.random.RandomState(0)
>>> inputs = rng.rand(10, 2).astype(np.float32)
>>> stats = stats_dict(inputs, axis=axis, nan=False, median=True)
>>> import ubelt as ub # NOQA
>>> result = str(ub.urepr(stats, nl=1, precision=4, with_dtype=True))
>>> print(result)
{
'mean': np.array([[0.5206, 0.6425]], dtype=np.float32),
'std': np.array([[0.2854, 0.2517]], dtype=np.float32),
'min': np.array([[0.0202, 0.0871]], dtype=np.float32),
'max': np.array([[0.9637, 0.9256]], dtype=np.float32),
'q_0.25': np.array([0.4271, 0.5329], dtype=np.float64),
'q_0.50': np.array([0.5584, 0.6805], dtype=np.float64),
'q_0.75': np.array([0.7343, 0.8607], dtype=np.float64),
'med': np.array([0.5584, 0.6805], dtype=np.float32),
'shape': (10, 2),
}
Example:
>>> # xdoctest: +IGNORE_WHITESPACE
>>> from kwarray.util_averages import * # NOQA
>>> axis = 0
>>> rng = np.random.RandomState(0)
>>> inputs = rng.randint(0, 42, size=100).astype(np.float32)
>>> inputs[4] = np.nan
>>> stats = stats_dict(inputs, axis=axis, nan=True, quantile='auto')
>>> import ubelt as ub # NOQA
>>> result = str(ub.urepr(stats, nl=0, precision=1, strkeys=True))
>>> print(result)
Example:
>>> import kwarray
>>> import ubelt as ub
>>> rng = kwarray.ensure_rng(0)
>>> orig_inputs = rng.rand(1, 1, 2, 3)
>>> param_grid = ub.named_product({
>>> #'axis': (None, 0, (0, 1), -1),
>>> 'axis': [None],
>>> 'percent_nan': [0, 0.5, 1.0],
>>> 'nan': [True, False],
>>> 'sum': [1],
>>> 'extreme': [True],
>>> 'n_extreme': [True],
>>> 'median': [1],
>>> 'size': [1],
>>> 'shape': [1],
>>> 'quantile': ['auto'],
>>> })
>>> for params in param_grid:
>>> kwargs = params.copy()
>>> percent_nan = kwargs.pop('percent_nan', 0)
>>> if percent_nan:
>>> inputs = orig_inputs.copy()
>>> inputs[rng.rand(*inputs.shape) < percent_nan] = np.nan
>>> else:
>>> inputs = orig_inputs
>>> stats = kwarray.stats_dict(inputs, **kwargs)
>>> print('---')
>>> print('params = {}'.format(ub.urepr(params, nl=1)))
>>> print('stats = {}'.format(ub.urepr(stats, nl=1)))
Ignore:
import kwarray
inputs = np.random.rand(3, 2, 1)
stats = kwarray.stats_dict(inputs, axis=2, nan=True, quantile='auto')
"""
stats = collections.OrderedDict([])
# only use torch if its already in use
torch = sys.modules.get('torch', None)
# Ensure input is in numpy format
if isinstance(inputs, np.ndarray):
nparr = inputs
elif isinstance(inputs, list):
nparr = np.array(inputs)
elif torch is not None and isinstance(inputs, torch.Tensor):
nparr = inputs.data.cpu().numpy()
else:
nparr = np.array(list(inputs))
# Check to make sure stats are feasible
if len(nparr) == 0:
stats['empty_list'] = True
if size:
stats['size'] = 0
else:
keepdims = (axis is not None)
if nan:
# invalid_mask = np.isnan(nparr)
# valid_mask = ~invalid_mask
# valid_values = nparr[~valid_mask]
min_val = np.nanmin(nparr, axis=axis, keepdims=keepdims)
max_val = np.nanmax(nparr, axis=axis, keepdims=keepdims)
mean_ = np.nanmean(nparr, axis=axis, keepdims=keepdims)
std_ = np.nanstd(nparr, axis=axis, keepdims=keepdims)
else:
min_val = nparr.min(axis=axis, keepdims=keepdims)
max_val = nparr.max(axis=axis, keepdims=keepdims)
mean_ = nparr.mean(axis=axis, keepdims=keepdims)
std_ = nparr.std(axis=axis, keepdims=keepdims)
# Note:
# the first central moment is 0
# the first raw moment is the mean
# the second central moment is the variance
# the third central moment is the skweness * var ** 3
# the fourth central moment is the kurtosis * var ** 4
# the third standardized moment is the skweness
# the fourth standardized moment is the kurtosis
if quantile:
# if not ub.iterable(quantile):
if quantile is True:
quantile == 'auto'
if quantile == 'auto':
quantile = [0.25, 0.50, 0.75]
if True:
stats['mean'] = np.float32(mean_)
stats['std'] = np.float32(std_)
if extreme:
stats['min'] = np.float32(min_val)
stats['max'] = np.float32(max_val)
if quantile:
if quantile == 'auto':
quantile = [0.25, 0.50, 0.75]
quant_values = np.quantile(nparr, quantile, axis=axis)
quant_keys = ['q_{:0.2f}'.format(q) for q in quantile]
for k, v in zip(quant_keys, quant_values):
stats[k] = v
if n_extreme:
# number of entries with min/max val
nMin = np.sum(nparr == min_val, axis=axis, keepdims=keepdims)
nMax = np.sum(nparr == max_val, axis=axis, keepdims=keepdims)
nMin = nMin.astype(int)
nMax = nMax.astype(int)
if nMax.size == 1:
nMax = nMax.ravel()[0]
if nMin.size == 1:
nMin = nMin.ravel()[0]
stats['nMin'] = nMin
stats['nMax'] = nMax
if median:
stats['med'] = np.nanmedian(nparr, axis=axis)
if nan:
stats['num_nan'] = np.isnan(nparr).sum()
if sum:
sumfunc = np.nansum if nan else np.sum
stats['sum'] = sumfunc(nparr, axis=axis)
if size:
stats['size'] = nparr.size
if shape:
stats['shape'] = nparr.shape
return stats
[docs]
def _gmean(a, axis=0, dtype=None, clobber=False):
"""
Compute the geometric mean along the specified axis.
Modification of the scikit-learn method to be more memory efficient
Example
>>> rng = np.random.RandomState(0)
>>> C, H, W = 8, 32, 32
>>> axis = 0
>>> a = [rng.rand(C, H, W).astype(np.float16),
>>> rng.rand(C, H, W).astype(np.float16)]
"""
if isinstance(a, np.ndarray):
if clobber:
# NOTE: we reuse (a), we clobber the input array!
log_a = np.log(a, out=a)
else:
log_a = np.log(a)
else:
if dtype is None:
# if not an ndarray object attempt to convert it
log_a = np.log(np.array(a, dtype=dtype))
else:
# Must change the default dtype allowing array type
# Note: that this will use memory, but there isn't anything we can
# do here.
if isinstance(a, np.ma.MaskedArray):
a_ = np.ma.asarray(a, dtype=dtype)
else:
a_ = np.asarray(a, dtype=dtype)
# We can reuse `a_` because it was a temp var
log_a = np.log(a_, out=a_)
# attempt to reuse memory when computing mean
mem = log_a[axis]
mean_log_a = log_a.mean(axis=axis, out=mem)
# And reuse memory again when computing the final result
result = np.exp(mean_log_a, out=mean_log_a)
return result
[docs]
class NoSupportError(RuntimeError):
...
[docs]
class RunningStats(ub.NiceRepr):
"""
Track mean, std, min, and max values over time with constant memory.
Dynamically records per-element array statistics and can summarized them
per-element, across channels, or globally.
TODO:
- [ ] This may need a few API tweaks and good documentation
Example:
>>> import kwarray
>>> run = kwarray.RunningStats()
>>> ch1 = np.array([[0, 1], [3, 4]])
>>> ch2 = np.zeros((2, 2))
>>> img = np.dstack([ch1, ch2])
>>> run.update(np.dstack([ch1, ch2]))
>>> run.update(np.dstack([ch1 + 1, ch2]))
>>> run.update(np.dstack([ch1 + 2, ch2]))
>>> # No marginalization
>>> print('current-ave = ' + ub.urepr(run.summarize(axis=ub.NoParam), nl=2, precision=3))
>>> # Average over channels (keeps spatial dims separate)
>>> print('chann-ave(k=1) = ' + ub.urepr(run.summarize(axis=0), nl=2, precision=3))
>>> print('chann-ave(k=0) = ' + ub.urepr(run.summarize(axis=0, keepdims=0), nl=2, precision=3))
>>> # Average over spatial dims (keeps channels separate)
>>> print('spatial-ave(k=1) = ' + ub.urepr(run.summarize(axis=(1, 2)), nl=2, precision=3))
>>> print('spatial-ave(k=0) = ' + ub.urepr(run.summarize(axis=(1, 2), keepdims=0), nl=2, precision=3))
>>> # Average over all dims
>>> print('alldim-ave(k=1) = ' + ub.urepr(run.summarize(axis=None), nl=2, precision=3))
>>> print('alldim-ave(k=0) = ' + ub.urepr(run.summarize(axis=None, keepdims=0), nl=2, precision=3))
"""
def __init__(run, nan_policy='omit', check_weights=True, **kwargs):
"""
Args:
nan_policy (str): indicates how we will handle nan values
* if "omit" - set weights of nan items to zero.
* if "propogate" - propogate nans.
* if "raise" - then raise a ValueError if nans are given.
check_weights (bool):
if True, we check the weights for zeros (which can also
implicitly occur when data has nans). Disabling this check will
result in faster computation, but it is your responsibility to
ensure all data passed to update is valid.
"""
if len(kwargs):
if 'nan_behavior' in kwargs:
nan_behavior = kwargs.pop('nan_behavior', None)
if nan_behavior == 'ignore':
nan_behavior = 'omit'
nan_policy = nan_behavior
ub.schedule_deprecation(
'kwarray', 'nan_policy', 'use nan_policy instead',
deprecate='0.6.10', error='0.7.0', remove='0.8.0')
if len(kwargs):
raise ValueError(f'Unsupported arguments: {list(kwargs.keys())}')
run.raw_max = -np.inf
run.raw_min = np.inf
run.raw_total = 0
run.raw_squares = 0
run.n = 0
run.nan_policy = nan_policy
run.check_weights = check_weights
if run.nan_policy not in {'omit', 'propogate'}:
if run.nan_policy == 'omit':
if run.check_weights:
raise ValueError(
'To prevent foot-shooting, check weights must be '
'initialized to True when nan_policy is omit'
)
raise KeyError(run.nan_policy)
def __nice__(self):
return '{}'.format(self.shape)
@property
def shape(run):
try:
return run.raw_total.shape
except Exception:
return None
[docs]
def _update_from_other(run, other):
"""
Combine this runner with another one.
"""
run.raw_max = np.maximum(run.raw_max, other.raw_max)
run.raw_min = np.minimum(run.raw_min, other.raw_min)
run.raw_total = run.raw_total + other.raw_total
run.raw_squares = run.raw_squares + other.raw_squares
run.n = run.n + other.n
[docs]
def update_many(run, data, weights=1):
"""
Assumes first data axis represents multiple observations
Example:
>>> import kwarray
>>> rng = kwarray.ensure_rng(0)
>>> run = kwarray.RunningStats()
>>> data = rng.randn(1, 2, 3)
>>> run.update_many(data)
>>> print(run.current())
>>> data = rng.randn(2, 2, 3)
>>> run.update_many(data)
>>> print(run.current())
>>> data = rng.randn(3, 2, 3)
>>> run.update_many(data)
>>> print(run.current())
>>> run.update_many(1000)
>>> print(run.current())
>>> assert np.all(run.current()['n'] == 7)
Example:
>>> import kwarray
>>> rng = kwarray.ensure_rng(0)
>>> run = kwarray.RunningStats()
>>> data = rng.randn(1, 2, 3)
>>> run.update_many(data.ravel())
>>> print(run.current())
>>> data = rng.randn(2, 2, 3)
>>> run.update_many(data.ravel())
>>> print(run.current())
>>> data = rng.randn(3, 2, 3)
>>> run.update_many(data.ravel())
>>> print(run.current())
>>> run.update_many(1000)
>>> print(run.current())
>>> assert np.all(run.current()['n'] == 37)
"""
data = np.asarray(data)
if run.nan_policy == 'omit':
weights = weights * (~np.isnan(data)).astype(float)
elif run.nan_policy == 'propogate':
...
elif run.nan_policy == 'raise':
if np.any(np.isnan(data)):
raise ValueError('nan policy is raise')
else:
raise AssertionError('should not be here')
has_ignore_items = False
if ub.iterable(weights):
ignore_flags = (weights == 0)
has_ignore_items = np.any(ignore_flags)
if has_ignore_items:
data = data.copy()
# Replace the bad value with somehting sensible for each operation.
data[ignore_flags] = 0
# Multiply data by weights
w_data = data * weights
run.raw_total += w_data.sum(axis=0)
run.raw_squares += (w_data ** 2).sum(axis=0)
data[ignore_flags] = -np.inf
run.raw_max = np.maximum(run.raw_max, data.max(axis=0))
data[ignore_flags] = +np.inf
run.raw_min = np.minimum(run.raw_min, data.min(axis=0))
else:
w_data = data * weights
run.raw_total += w_data.sum(axis=0)
run.raw_squares += (w_data ** 2).sum(axis=0)
run.raw_max = np.maximum(run.raw_max, data.max(axis=0))
run.raw_min = np.minimum(run.raw_min, data.min(axis=0))
run.n += weights.sum(axis=0)
[docs]
def update(run, data, weights=1):
"""
Updates statistics across all data dimensions on a per-element basis
Example:
>>> import kwarray
>>> data = np.full((7, 5), fill_value=1.3)
>>> weights = np.ones((7, 5), dtype=np.float32)
>>> run = kwarray.RunningStats()
>>> run.update(data, weights=1)
>>> run.update(data, weights=weights)
>>> rng = np.random
>>> weights[rng.rand(*weights.shape) > 0.5] = 0
>>> run.update(data, weights=weights)
Example:
>>> import kwarray
>>> run = kwarray.RunningStats()
>>> data = np.array([[1, np.nan, np.nan], [0, np.nan, 1.]])
>>> run.update(data)
>>> print('current = {}'.format(ub.urepr(run.current(), nl=1)))
>>> print('summary(axis=None) = {}'.format(ub.urepr(run.summarize(), nl=1)))
>>> print('summary(axis=1) = {}'.format(ub.urepr(run.summarize(axis=1), nl=1)))
>>> print('summary(axis=0) = {}'.format(ub.urepr(run.summarize(axis=0), nl=1)))
>>> data = np.array([[2, 0, 1], [0, 1, np.nan]])
>>> run.update(data)
>>> data = np.array([[3, 1, 1], [0, 1, np.nan]])
>>> run.update(data)
>>> data = np.array([[4, 1, 1], [0, 1, 1.]])
>>> run.update(data)
>>> print('----')
>>> print('current = {}'.format(ub.urepr(run.current(), nl=1)))
>>> print('summary(axis=None) = {}'.format(ub.urepr(run.summarize(), nl=1)))
>>> print('summary(axis=1) = {}'.format(ub.urepr(run.summarize(axis=1), nl=1)))
>>> print('summary(axis=0) = {}'.format(ub.urepr(run.summarize(axis=0), nl=1)))
"""
if run.nan_policy == 'omit':
weights = weights * (~np.isnan(data)).astype(float)
elif run.nan_policy == 'propogate':
...
elif run.nan_policy == 'raise':
if np.any(np.isnan(data)):
raise ValueError('nan policy is raise')
else:
raise AssertionError('should not be here')
has_ignore_items = False
if ub.iterable(weights):
ignore_flags = (weights == 0)
has_ignore_items = np.any(ignore_flags)
if has_ignore_items:
data = data.copy()
# Replace the bad value with somehting sensible for each operation.
data[ignore_flags] = 0
# Multiply data by weights
w_data = data * weights
run.raw_total += w_data
run.raw_squares += w_data ** 2
data[ignore_flags] = -np.inf
run.raw_max = np.maximum(run.raw_max, data)
data[ignore_flags] = +np.inf
run.raw_min = np.minimum(run.raw_min, data)
else:
w_data = data * weights
run.raw_total += w_data
run.raw_squares += w_data ** 2
run.raw_max = np.maximum(run.raw_max, data)
run.raw_min = np.minimum(run.raw_min, data)
run.n += weights
[docs]
def _sumsq_std(run, total, squares, n):
"""
Sum of squares method to compute standard deviation
"""
numer = (n * squares - total ** 2)
# FIXME: this isn't exactly correct when we have fractional weights.
# Integer weights should be ok. I suppose it really is
# what "type" of weights they are (see numpy weighted quantile
# discussion)
denom = (n * (n - 1.0))
std = np.sqrt(numer / denom)
return std
[docs]
def summarize(run, axis=None, keepdims=True):
"""
Compute summary statistics across a one or more dimension
Args:
axis (int | List[int] | None | NoParamType):
axis or axes to summarize over,
if None, all axes are summarized.
if ub.NoParam, no axes are summarized the current result is
returned.
keepdims (bool, default=True):
if False removes the dimensions that are summarized over
Returns:
Dict: containing minimum, maximum, mean, std, etc..
Raises:
NoSupportError : if update was never called with valid data
Example:
>>> # Test to make sure summarize works across different shapes
>>> base = np.array([1, 1, 1, 1, 0, 0, 0, 1])
>>> run0 = RunningStats()
>>> for _ in range(3):
>>> run0.update(base.reshape(8, 1))
>>> run1 = RunningStats()
>>> for _ in range(3):
>>> run1.update(base.reshape(4, 2))
>>> run2 = RunningStats()
>>> for _ in range(3):
>>> run2.update(base.reshape(2, 2, 2))
>>> #
>>> # Summarizing over everything should be exactly the same
>>> s0N = run0.summarize(axis=None, keepdims=0)
>>> s1N = run1.summarize(axis=None, keepdims=0)
>>> s2N = run2.summarize(axis=None, keepdims=0)
>>> #assert ub.util_indexable.indexable_allclose(s0N, s1N, rel_tol=0.0, abs_tol=0.0)
>>> #assert ub.util_indexable.indexable_allclose(s1N, s2N, rel_tol=0.0, abs_tol=0.0)
>>> assert s0N['mean'] == 0.625
"""
with np.errstate(divide='ignore'):
with warnings.catch_warnings():
warnings.filterwarnings('ignore', 'invalid value encountered', category=RuntimeWarning)
if axis is ub.NoParam:
total = run.raw_total
squares = run.raw_squares
maxi = run.raw_max
mini = run.raw_min
n = run.n
info = ub.odict([
('n', n),
('max', maxi),
('min', mini),
('total', total),
('squares', squares),
('mean', total / n),
('std', run._sumsq_std(total, squares, n)),
])
return info
else:
if np.all(run.n <= 0):
raise NoSupportError('No statistics have been accumulated')
total = run.raw_total.sum(axis=axis, keepdims=keepdims)
squares = run.raw_squares.sum(axis=axis, keepdims=keepdims)
maxi = run.raw_max.max(axis=axis, keepdims=keepdims)
mini = run.raw_min.min(axis=axis, keepdims=keepdims)
if not hasattr(run.raw_total, 'shape'):
n = run.n
elif axis is None:
n = (run.n * np.ones_like(run.raw_total)).sum(axis=axis, keepdims=keepdims)
else:
n = (run.n * np.ones_like(run.raw_total)).sum(axis=axis, keepdims=keepdims)
info = ub.odict([
('n', n),
('max', maxi),
('min', mini),
('total', total),
('squares', squares),
('mean', total / n),
('std', run._sumsq_std(total, squares, n)),
])
return info
[docs]
def current(run):
"""
Returns current staticis on a per-element basis
(not summarized over any axis)
"""
info = run.summarize(axis=ub.NoParam)
return info
[docs]
def _combine_mean_stds(means, stds, nums=None, axis=None, keepdims=False,
bessel=True):
r"""
Args:
means (array): means[i] is the mean of the ith entry to combine
stds (array): stds[i] is the std of the ith entry to combine
nums (array | None):
nums[i] is the number of samples in the ith entry to combine.
if None, assumes sample sizes are infinite.
axis (int | Tuple[int] | None):
axis to combine the statistics over
keepdims (bool):
if True return arrays with the same number of dimensions they were
given in.
bessel (int):
Set to 1 to enables bessel correction to unbias the combined std
estimate. Only disable if you have the true population means, or
you think you know what you are doing.
References:
https://stats.stackexchange.com/questions/55999/is-it-possible-to-find-the-combined-standard-deviation
Sympy:
>>> # xdoctest: +REQUIRES(env:SHOW_SYMPY)
>>> # What about the case where we don't know population size of the
>>> # estimates. We could treat it as a fixed number, or perhaps take the
>>> # limit as n -> infinity.
>>> import sympy
>>> import sympy as sym
>>> from sympy import symbols, sqrt, limit, IndexedBase, summation
>>> from sympy import Indexed, Idx, symbols
>>> means = IndexedBase('m')
>>> stds = IndexedBase('s')
>>> nums = IndexedBase('n')
>>> i = symbols('i', cls=Idx)
>>> k = symbols('k', cls=Idx)
>>> #
>>> combo_mean = symbols('C')
>>> #
>>> bessel = 1
>>> total = summation(nums[i], (i, 1, k))
>>> combo_mean_expr = summation(nums[i] * means[i], (i, 1, k)) / total
>>> p1 = summation((nums[i] - bessel) * stds[i], (i, 1, k))
>>> p2 = summation(nums[i] * ((means[i] - combo_mean) ** 2), (i, 1, k))
>>> #
>>> combo_std_expr = sqrt((p1 + p2) / (total - bessel))
>>> print('------------------------------------')
>>> print('General Combined Mean / Std Formulas')
>>> print('C = combined mean')
>>> print('S = combined std')
>>> print('------------------------------------')
>>> print(ub.hzcat(['C = ', sym.pretty(combo_mean_expr, use_unicode=True, use_unicode_sqrt_char=True)]))
>>> print(ub.hzcat(['S = ', sym.pretty(combo_std_expr, use_unicode=True, use_unicode_sqrt_char=True)]))
>>> print('')
>>> print('---------')
>>> print('Now assuming all sample sizes are the same constant value N')
>>> print('---------')
>>> # Now assume all n[i] = N (i.e. a constant value)
>>> N = symbols('N')
>>> combo_mean_const_n_expr = combo_mean_expr.copy().xreplace({nums[i]: N})
>>> combo_std_const_n_expr = combo_std_expr.copy().xreplace({nums[i]: N})
>>> p1_const_n = p1.copy().xreplace({nums[i]: N})
>>> p2_const_n = p2.copy().xreplace({nums[i]: N})
>>> total_const_n = total.copy().xreplace({nums[i]: N})
>>> #
>>> print(ub.hzcat(['C = ', sym.pretty(combo_mean_const_n_expr, use_unicode=True, use_unicode_sqrt_char=True)]))
>>> print(ub.hzcat(['S = ', sym.pretty(combo_std_const_n_expr, use_unicode=True, use_unicode_sqrt_char=True)]))
>>> #
>>> print('')
>>> print('---------')
>>> print('Take the limit as N -> infinity')
>>> print('---------')
>>> #
>>> # Limit doesnt directly but we can break it into parts
>>> lim_C = limit(combo_mean_const_n_expr, N, float('inf'))
>>> lim_p1 = limit(p1_const_n / (total_const_n - bessel), N, float('inf'))
>>> lim_p2 = limit(p2_const_n / (total_const_n - bessel), N, float('inf'))
>>> lim_expr = sym.sqrt(lim_p1 + lim_p2)
>>> print(ub.hzcat(['lim(C, N->inf) = ', sym.pretty(lim_C)]))
>>> print(ub.hzcat(['lim(S, N->inf) = ', sym.pretty(lim_expr)]))
Ignore:
# lim0_p1 = limit(p1 / (total - 1), n, 0)
# lim0_p2 = limit(p2 / (total - 1), n, 0)
# lim0_expr = sym.sqrt(lim0_p1 + lim0_p2)
# print(sym.pretty(lim0_expr))
kcase = combo_std_expr.subs({k: 2})
print(sym.pretty(kcase))
print(sym.pretty(sympy.simplify(kcase)))
limit(kcase, N, float('inf'))
limit(combo_std_expr, N, float('inf'))
Example:
>>> from kwarray.util_averages import * # NOQA
>>> from kwarray.util_averages import _combine_mean_stds
>>> means = np.array([1.2, 3.2, 4.1])
>>> stds = np.array([4.2, 0.2, 2.1])
>>> nums = np.array([10, 100, 10])
>>> _combine_mean_stds(means, stds, nums)
>>> means = np.array([1, 2, 3])
>>> stds = np.array([1, 2, 3])
>>> #
>>> nums = np.array([1, 1, 1]) / 3
>>> print(_combine_mean_stds(means, stds, nums, bessel=True), '- .3 B')
>>> print(_combine_mean_stds(means, stds, nums, bessel=False), '- .3')
>>> nums = np.array([1, 1, 1])
>>> print(_combine_mean_stds(means, stds, nums, bessel=True), '- 1 B')
>>> print(_combine_mean_stds(means, stds, nums, bessel=False), '- 1')
>>> nums = np.array([10, 10, 10])
>>> print(_combine_mean_stds(means, stds, nums, bessel=True), '- 10 B')
>>> print(_combine_mean_stds(means, stds, nums, bessel=False), '- 10')
>>> nums = np.array([1000, 1000, 1000])
>>> print(_combine_mean_stds(means, stds, nums, bessel=True), '- 1000 B')
>>> print(_combine_mean_stds(means, stds, nums, bessel=False), '- 1000')
>>> #
>>> nums = None
>>> print(_combine_mean_stds(means, stds, nums, bessel=True), '- inf B')
>>> print(_combine_mean_stds(means, stds, nums, bessel=False), '- inf')
Example:
>>> from kwarray.util_averages import * # NOQA
>>> from kwarray.util_averages import _combine_mean_stds
>>> means = np.stack([np.array([1.2, 3.2, 4.1])] * 100, axis=0)
>>> stds = np.stack([np.array([4.2, 0.2, 2.1])] * 100, axis=0)
>>> nums = np.stack([np.array([10, 100, 10])] * 100, axis=0)
>>> cm1, cs1, _ = _combine_mean_stds(means, stds, nums, axis=None)
>>> print('combo_mean = {}'.format(ub.urepr(cm1, nl=1)))
>>> print('combo_std = {}'.format(ub.urepr(cs1, nl=1)))
>>> means = np.stack([np.array([1.2, 3.2, 4.1])] * 1, axis=0)
>>> stds = np.stack([np.array([4.2, 0.2, 2.1])] * 1, axis=0)
>>> nums = np.stack([np.array([10, 100, 10])] * 1, axis=0)
>>> cm2, cs2, _ = _combine_mean_stds(means, stds, nums, axis=None)
>>> print('combo_mean = {}'.format(ub.urepr(cm2, nl=1)))
>>> print('combo_std = {}'.format(ub.urepr(cs2, nl=1)))
>>> means = np.stack([np.array([1.2, 3.2, 4.1])] * 5, axis=0)
>>> stds = np.stack([np.array([4.2, 0.2, 2.1])] * 5, axis=0)
>>> nums = np.stack([np.array([10, 100, 10])] * 5, axis=0)
>>> cm3, cs3, combo_num = _combine_mean_stds(means, stds, nums, axis=1)
>>> print('combo_mean = {}'.format(ub.urepr(cm3, nl=1)))
>>> print('combo_std = {}'.format(ub.urepr(cs3, nl=1)))
>>> assert np.allclose(cm1, cm2) and np.allclose(cm2, cm3)
>>> assert not np.allclose(cs1, cs2)
>>> assert np.allclose(cs2, cs3)
Example:
>>> from kwarray.util_averages import * # NOQA
>>> from kwarray.util_averages import _combine_mean_stds
>>> means = np.random.rand(2, 3, 5, 7)
>>> stds = np.random.rand(2, 3, 5, 7)
>>> nums = (np.random.rand(2, 3, 5, 7) * 10) + 1
>>> cm, cs, cn = _combine_mean_stds(means, stds, nums, axis=1, keepdims=1)
>>> assert cm.shape == cs.shape == cn.shape
>>> print(f'cm.shape={cm.shape}')
>>> cm, cs, cn = _combine_mean_stds(means, stds, nums, axis=(0, 2), keepdims=1)
>>> assert cm.shape == cs.shape == cn.shape
>>> print(f'cm.shape={cm.shape}')
>>> cm, cs, cn = _combine_mean_stds(means, stds, nums, axis=(1, 3), keepdims=1)
>>> assert cm.shape == cs.shape == cn.shape
>>> print(f'cm.shape={cm.shape}')
>>> cm, cs, cn = _combine_mean_stds(means, stds, nums, axis=None)
>>> assert cm.shape == cs.shape == cn.shape
>>> print(f'cm.shape={cm.shape}')
cm.shape=(2, 1, 5, 7)
cm.shape=(1, 3, 1, 7)
cm.shape=(2, 1, 5, 1)
cm.shape=()
"""
if nums is None:
# Assume the limit as nums -> infinite
combo_num = None
combo_mean = np.average(means, weights=None, axis=axis)
combo_mean = _postprocess_keepdims(means, combo_mean, axis)
numer_p1 = stds.sum(axis=axis, keepdims=1)
numer_p2 = (((means - combo_mean) ** 2)).sum(axis=axis, keepdims=1)
numer = numer_p1 + numer_p2
denom = len(stds)
combo_std = np.sqrt(numer / denom)
else:
combo_num = nums.sum(axis=axis, keepdims=1)
weights = nums / combo_num
combo_mean = np.average(means, weights=weights, axis=axis)
combo_mean = _postprocess_keepdims(means, combo_mean, axis)
numer_p1 = (np.maximum(nums - bessel, 0) * stds).sum(axis=axis, keepdims=1)
numer_p2 = (nums * ((means - combo_mean) ** 2)).sum(axis=axis, keepdims=1)
numer = numer_p1 + numer_p2
denom = np.maximum(combo_num - bessel, 0)
combo_std = np.sqrt(numer / denom)
if not keepdims:
indexer = _no_keepdim_indexer(combo_mean, axis)
combo_mean = combo_mean[indexer]
combo_std = combo_std[indexer]
if combo_num is not None:
combo_num = combo_num[indexer]
return combo_mean, combo_std, combo_num
[docs]
def _no_keepdim_indexer(result, axis):
"""
Computes an indexer to postprocess a result with keepdims=True
that will modify the result as if keepdims=False
"""
if axis is None:
indexer = [0] * len(result.shape)
else:
indexer = [slice(None)] * len(result.shape)
if isinstance(axis, (list, tuple)):
for a in axis:
indexer[a] = 0
else:
indexer[axis] = 0
indexer = tuple(indexer)
return indexer
[docs]
def _postprocess_keepdims(original, result, axis):
"""
Can update the result of a function that does not support keepdims to look
as if keepdims was supported.
"""
# Newer versions of numpy have keepdims on more functions
if axis is not None:
expander = [slice(None)] * len(original.shape)
if isinstance(axis, (list, tuple)):
for a in axis:
expander[a] = None
else:
expander[axis] = None
result = result[tuple(expander)]
else:
expander = [None] * len(original.shape)
result = np.array(result)[tuple(expander)]
return result
if __name__ == '__main__':
"""
CommandLine:
python ~/code/kwarray/kwarray/util_averages.py
"""
import xdoctest
xdoctest.doctest_module(__file__)