"""
Functions relating to robust statistical methods for normalizing data.
"""
import numpy as np
import ubelt as ub
import math  # NOQA

try:
    from math import isclose
except Exception:
    from numpy import isclose


def find_robust_normalizers(data, params='auto'):
    """
    Finds robust normalization statistics for a set of scalar observations.

    The idea is to estimate "fence" parameters: minimum and maximum values
    where anything under / above these values are likely outliers. For
    non-linear normalization schemes we can also estimate a likely middle
    and extent of the data.

    Args:
        data (ndarray): a 1D numpy array where invalid data has already been
            removed.

        params (str | dict): normalization params.
            When passed as a dictionary valid params are:

            scaling (str):
                This is the "mode" that will be used in the final
                normalization. Currently has no impact on the estimated
                extrema. Defaults to 'linear'. Can also be 'sigmoid'.

            extrema (str):
                The method for determining what the extrema are.
                Can be "quantile" for strict quantile clipping.
                Can be "adaptive-quantile" for an IQR-like adjusted quantile
                method.
                Can be "tukey" or "IQR" for an exact IQR method.

            low (float): This is the low quantile for likely inliers.

            mid (float): This is the middle quantile for likely inliers.

            high (float): This is the high quantile for likely inliers.

            Can be specified as a concise string. The string "auto" defaults
            to: ``dict(extrema='adaptive-quantile', scaling='linear',
            low=0.01, mid=0.5, high=0.9)``. The string "tukey" defaults to:
            ``dict(extrema='tukey', scaling='linear')``.

    Returns:
        Dict[str, str | float]:
            normalization parameters that can be passed to
            :func:`kwarray.normalize` containing the keys:

            type (str): which is always 'normalize'

            mode (str): the value of ``params['scaling']``

            min_val (float): the determined "robust" minimum inlier value.

            max_val (float): the determined "robust" maximum inlier value.

            beta (float): the determined "robust" middle value for use in
                non-linear normalizers.

            alpha (float): the determined "robust" extent value for use in
                non-linear normalizers.

    Note:
        The defaults and methods of this function are subject to change.

    TODO:
        - [ ] No (or minimal) Magic Numbers! Use first principles to
              determine defaults.
        - [ ] Probably a lot of literature on the subject.
        - [ ] https://arxiv.org/pdf/1707.09752.pdf
        - [ ] https://www.tandfonline.com/doi/full/10.1080/02664763.2019.1671961
        - [ ] https://www.rips-irsp.com/articles/10.5334/irsp.289/
        - [ ] This function is not possible to get right in every case
              (probably can prove this with a NFL theorem); it might be
              useful to allow the user to specify a "model" which is
              specific to some domain.

    Example:
        >>> from kwarray.util_robust import *  # NOQA
        >>> data = np.random.rand(100)
        >>> norm_params1 = find_robust_normalizers(data, params='auto')
        >>> norm_params2 = find_robust_normalizers(data, params={'low': 0, 'high': 1.0})
        >>> norm_params3 = find_robust_normalizers(np.empty(0), params='auto')
        >>> print('norm_params1 = {}'.format(ub.urepr(norm_params1, nl=1)))
        >>> print('norm_params2 = {}'.format(ub.urepr(norm_params2, nl=1)))
        >>> print('norm_params3 = {}'.format(ub.urepr(norm_params3, nl=1)))

    Example:
        >>> # xdoctest: +REQUIRES(module:scipy)
        >>> from kwarray.util_robust import *  # NOQA
        >>> from kwarray.distributions import Mixture
        >>> import ubelt as ub
        >>> # A random mixture distribution for testing
        >>> data = Mixture.random(6).sample(3000)
    """
    if data.size == 0:
        normalizer = {
            'type': None,
            'min_val': np.nan,
            'max_val': np.nan,
        }
    else:
        # should center the desired distribution to visualize on zero
        # beta = np.median(imdata)
        default_params = {
            'extrema': 'adaptive-quantile',
            'scaling': 'linear',
            'low': 0.01,
            'mid': 0.5,
            'high': 0.9,
        }
        fence_extremes = None
        if isinstance(params, str):
            if params == 'auto':
                params = {}
            elif params == 'sigmoid':
                params = {'scaling': 'sigmoid'}
            elif params == 'linear':
                params = {'scaling': 'linear'}
            elif params in {'tukey', 'iqr'}:
                params = {
                    'extrema': 'tukey',
                }
            elif params == 'std':
                pass
            else:
                raise KeyError(params)

        # hack
        params = ub.dict_union(default_params, params)

        # TODO:
        # https://github.com/derekbeaton/OuRS
        # https://en.wikipedia.org/wiki/Feature_scaling

        if params['extrema'] in {'tukey', 'iqr'}:
            fence_extremes = _tukey_quantile_fence(data, clip=False)
        elif params['extrema'] in {'tukey-clip', 'iqr-clip'}:
            fence_extremes = _tukey_quantile_fence(data, clip=True)
        elif params['extrema'] == 'quantile':
            fence_extremes = _quantile_extreme_estimator(data, params)
        elif params['extrema'] in {'custom-quantile', 'adaptive-quantile'}:
            fence_extremes = _custom_quantile_extreme_estimator(data, params)
        else:
            raise KeyError(params['extrema'])

        min_val, mid_val, max_val = fence_extremes

        beta = mid_val
        # division factor
        # from scipy.special import logit
        # alpha = max(abs(old_min - beta), abs(old_max - beta)) / logit(0.998)
        # This chooses alpha such that the original min/max values will be
        # pushed towards -1 / +1.
        logit_998 = 6.212606095751518  # value of logit(0.998)
        alpha = max(abs(min_val - beta), abs(max_val - beta)) / logit_998

        normalizer = {
            'type': 'normalize',
            'mode': params['scaling'],
            'min_val': min_val,
            'max_val': max_val,
            'mid_val': mid_val,
            'beta': beta,
            'alpha': alpha,
        }
    return normalizer

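# Illustrative sketch (not part of the original module): shows how the
# string presets for ``params`` react to a single extreme outlier. The
# ``_demo_find_robust_normalizers`` name is hypothetical; only numpy and
# ``find_robust_normalizers`` from above are assumed.
def _demo_find_robust_normalizers():
    rng = np.random.RandomState(0)
    data = rng.rand(1000)
    data[0] = 1000.0  # inject one extreme outlier
    # The naive extrema are dominated by the outlier, but the robust
    # fences stay near the bulk of the data.
    print('naive min/max =', data.min(), data.max())
    for preset in ['auto', 'tukey']:
        normalizer = find_robust_normalizers(data, params=preset)
        print(preset, normalizer['min_val'], normalizer['max_val'])
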
def _tukey_quantile_fence(data, clip=False):
    """
    Computes the Tukey / IQR fences for the data.

    One might wonder where the 1.5 in the Tukey fence interval comes from --
    Paul Velleman, a statistician at Cornell University, was a student of
    John Tukey, who invented this test for outliers. He wondered the same
    thing. When he asked Tukey, "Why 1.5?", Tukey answered, "Because 1 is
    too small and 2 is too large." [OxfordShapeSpread]_.

    References:
        .. [OxfordShapeSpread] http://mathcenter.oxford.emory.edu/site/math117/shapeCenterAndSpread/
        .. [YTFindOutliers] https://www.youtube.com/watch?v=zY1WFMAA-ec
    """
    # Tukey method for outliers
    q1, q2, q3 = np.quantile(data, [0.25, 0.5, 0.75])
    iqr = q3 - q1
    fence_lower = q1 - 1.5 * iqr
    # Note: the upper fence extends from the third quartile, not the first.
    fence_upper = q3 + 1.5 * iqr
    if clip:
        # ensure fences are clipped to max/min values.
        min_ = data.min()
        max_ = data.max()
        fence_lower = max(fence_lower, min_)
        fence_upper = min(fence_upper, max_)
    return fence_lower, q2, fence_upper

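# Hedged worked example (not in the original source): for data with
# quartiles q1=25 and q3=75, iqr=50, so the Tukey fences are
# 25 - 1.5 * 50 = -50 and 75 + 1.5 * 50 = 150. The demo name below is
# hypothetical.
def _demo_tukey_fence():
    data = np.arange(101, dtype=float)  # q1=25, q2=50, q3=75
    lower, mid, upper = _tukey_quantile_fence(data, clip=False)
    assert (lower, mid, upper) == (-50.0, 50.0, 150.0)
    # with clip=True the fences cannot exceed the observed extrema
    lower, mid, upper = _tukey_quantile_fence(data, clip=True)
    assert (lower, mid, upper) == (0.0, 50.0, 100.0)
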
def _quantile_extreme_estimator(data, params):
    # Simple quantile
    quant_low = params['low']
    quant_mid = params['mid']
    quant_high = params['high']
    qvals = [quant_low, quant_mid, quant_high]
    quantile_vals = np.quantile(data, qvals)
    (quant_low_val, quant_mid_val, quant_high_val) = quantile_vals
    min_val = quant_low_val
    max_val = quant_high_val
    mid_val = quant_mid_val
    return (min_val, mid_val, max_val)

def _custom_quantile_extreme_estimator(data, params):
    # Quantile with postprocessing
    quant_low = params['low']
    quant_mid = params['mid']
    quant_high = params['high']
    qvals = [0, quant_low, quant_mid, quant_high, 1]
    quantile_vals = np.quantile(data, qvals)

    (quant_low_abs, quant_low_val, quant_mid_val, quant_high_val,
     quant_high_abs) = quantile_vals

    # TODO: we could implement a heuristic where we do a numerical inspection
    # of the intensity distribution. We could apply a normalization that is
    # known to work for data with that sort of histogram distribution.
    # This might involve fitting several parameterized distributions to the
    # data and choosing the one with the best fit. (check how many modes
    # there are).

    # inner_range = quant_high_val - quant_low_val
    # upper_inner_range = quant_high_val - quant_mid_val
    # upper_lower_range = quant_mid_val - quant_low_val

    # Compute amount of weight in each quantile
    quant_mid_amount = (quant_high_val - quant_low_val)
    quant_low_amount = (quant_mid_val - quant_low_val)
    quant_high_amount = (quant_high_val - quant_mid_val)

    if isclose(quant_mid_amount, 0):
        high_weight = 0.5
        low_weight = 0.5
    else:
        high_weight = quant_high_amount / quant_mid_amount
        low_weight = quant_low_amount / quant_mid_amount

    quant_high_residual = (1.0 - quant_high)
    quant_low_residual = (quant_low - 0.0)
    # todo: verify, having slight head fog, not 100% sure
    low_pad_val = quant_low_residual * (low_weight * quant_mid_amount)
    high_pad_val = quant_high_residual * (high_weight * quant_mid_amount)
    # Pad the fences outwards, but never beyond the absolute extrema
    # (symmetric with min_val, the high fence extends up and is capped).
    min_val = max(quant_low_abs, quant_low_val - low_pad_val)
    max_val = min(quant_high_abs, quant_high_val + high_pad_val)
    mid_val = quant_mid_val
    return (min_val, mid_val, max_val)

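# Hedged sketch (not part of the original module): contrasts the plain
# quantile estimator with the adaptive variant on the same data. The demo
# function name is hypothetical.
def _demo_quantile_estimators():
    rng = np.random.RandomState(42)
    data = rng.standard_normal(10000)
    params = {'low': 0.01, 'mid': 0.5, 'high': 0.9}
    strict = _quantile_extreme_estimator(data, params)
    adaptive = _custom_quantile_extreme_estimator(data, params)
    # The adaptive variant pads the strict quantile fences outwards
    # (towards the absolute extrema) in proportion to the probability mass
    # left outside the chosen quantiles.
    print('strict   =', strict)
    print('adaptive =', adaptive)
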
def robust_normalize(imdata, return_info=False, nodata=None, axis=None,
                     dtype=np.float32, params='auto', mask=None):
    """
    Normalize data intensities using heuristics to help put sensor data with
    extremely high or low contrast into a visible range.

    This function is designed with an emphasis on getting something that is
    reasonable for visualization.

    TODO:
        - [x] Move to kwarray and rename to robust_normalize?
        - [ ] Support for M-estimators?

    Args:
        imdata (ndarray): raw intensity data

        return_info (bool):
            if True, return information about the chosen normalization
            heuristic.

        params (str | dict):
            Can contain the keys: low, mid, high, scaling, and extrema,
            e.g. ``{'low': 0.1, 'mid': 0.8, 'high': 0.9, 'scaling': 'sigmoid'}``.
            See documentation in :func:`find_robust_normalizers`.

        axis (None | int):
            The axis to normalize over. If unspecified, normalize jointly.

        nodata (None | int):
            A value representing nodata to leave unchanged during
            normalization, for example 0.

        dtype (type): can be float32 or float64

        mask (ndarray | None):
            A mask indicating what pixels are valid and what pixels should
            be considered nodata. Mutually exclusive with the ``nodata``
            argument. A mask value of 1 indicates a VALID pixel. A mask
            value of 0 indicates an INVALID pixel. Note this is the opposite
            of a masked array.

    Returns:
        ndarray | Tuple[ndarray, Any]:
            a floating point array with values between 0 and 1.
            if return_info is specified, also returns extra data

    Note:
        This is effectively a combination of :func:`find_robust_normalizers`
        and :func:`normalize`.

    Example:
        >>> # xdoctest: +REQUIRES(module:scipy)
        >>> from kwarray.util_robust import *  # NOQA
        >>> from kwarray.distributions import Mixture
        >>> import ubelt as ub
        >>> # A random mixture distribution for testing
        >>> data = Mixture.random(6).sample(3000)
        >>> param_basis = {
        >>>     'scaling': ['linear', 'sigmoid'],
        >>>     'high': [0.6, 0.8, 0.9, 1.0],
        >>> }
        >>> param_grid = list(ub.named_product(param_basis))
        >>> param_grid += ['auto']
        >>> param_grid += ['tukey']
        >>> rows = []
        >>> rows.append({'key': 'orig', 'result': data})
        >>> for params in param_grid:
        >>>     key = ub.urepr(params, compact=1)
        >>>     result, info = robust_normalize(data, return_info=True, params=params)
        >>>     print('key = {}'.format(key))
        >>>     print('info = {}'.format(ub.urepr(info, nl=1)))
        >>>     rows.append({'key': key, 'info': info, 'result': result})
        >>> # xdoctest: +REQUIRES(--show)
        >>> import seaborn as sns
        >>> import kwplot
        >>> kwplot.autompl()
        >>> pnum_ = kwplot.PlotNums(nSubplots=len(rows))
        >>> for row in rows:
        >>>     ax = kwplot.figure(fnum=1, pnum=pnum_()).gca()
        >>>     sns.histplot(data=row['result'], kde=True, bins=128, ax=ax, stat='density')
        >>>     ax.set_title(row['key'])

    Example:
        >>> # xdoctest: +REQUIRES(module:kwimage)
        >>> from kwarray.util_robust import *  # NOQA
        >>> import ubelt as ub
        >>> import kwimage
        >>> import kwarray
        >>> s = 512
        >>> bit_depth = 11
        >>> dtype = np.uint16
        >>> max_val = int(2 ** bit_depth)
        >>> min_val = int(0)
        >>> rng = kwarray.ensure_rng(0)
        >>> background = np.random.randint(min_val, max_val, size=(s, s), dtype=dtype)
        >>> poly1 = kwimage.Polygon.random(rng=rng).scale(s / 2)
        >>> poly2 = kwimage.Polygon.random(rng=rng).scale(s / 2).translate(s / 2)
        >>> foreground = np.zeros_like(background, dtype=np.uint8)
        >>> foreground = poly1.fill(foreground, value=255)
        >>> foreground = poly2.fill(foreground, value=122)
        >>> foreground = (kwimage.ensure_float01(foreground) * max_val).astype(dtype)
        >>> imdata = background + foreground
        >>> normed, info = kwarray.robust_normalize(imdata, return_info=True)
        >>> print('info = {}'.format(ub.urepr(info, nl=1)))
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(imdata, pnum=(1, 2, 1), fnum=1)
        >>> kwplot.imshow(normed, pnum=(1, 2, 2), fnum=1)
    """
    if axis is not None:
        # Hack, normalize each channel individually. This could
        # be implemented more efficiently.
        reorg = imdata.swapaxes(0, axis)
        if return_info:
            infos_to_return = []
        if mask is None:
            parts = []
            for item in reorg:
                part = robust_normalize(item, nodata=nodata, params=params,
                                        dtype=dtype, axis=None,
                                        return_info=return_info)
                if return_info:
                    infos_to_return.append(part[1])
                    part = part[0]
                parts.append(part[None, :])
        else:
            reorg_mask = mask.swapaxes(0, axis)
            parts = []
            for item, item_mask in zip(reorg, reorg_mask):
                part = robust_normalize(item, nodata=nodata, params=params,
                                        dtype=dtype, axis=None,
                                        mask=item_mask,
                                        return_info=return_info)
                if return_info:
                    infos_to_return.append(part[1])
                    part = part[0]
                parts.append(part[None, :])
        recomb = np.concatenate(parts, axis=0)
        final = recomb.swapaxes(0, axis)
        if return_info:
            return final, infos_to_return
        else:
            return final

    is_masked = isinstance(imdata, np.ma.MaskedArray)
    if is_masked:
        if mask is None:
            mask = ~imdata.mask
        imdata = imdata.data

    if imdata.dtype.kind == 'f':
        if mask is None:
            mask = ~np.isnan(imdata)

    if mask is None:
        if nodata is not None:
            mask = imdata != nodata

    if mask is None:
        imdata_valid = imdata
    else:
        imdata_valid = imdata[mask]

    assert not np.any(np.isnan(imdata_valid))
    normalizer = find_robust_normalizers(imdata_valid, params=params)
    imdata_normalized = _apply_robust_normalizer(
        normalizer, imdata, imdata_valid, mask, dtype)

    if mask is not None:
        result = np.where(mask, imdata_normalized, imdata)
    else:
        result = imdata_normalized

    if is_masked:
        result = np.ma.MaskedArray(result, ~mask)

    if return_info:
        return result, normalizer
    else:
        return result

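# Minimal usage sketch (not part of the original source; the demo name is
# hypothetical): demonstrates joint vs per-axis normalization and the
# ``mask`` argument, using only functions defined in this module.
def _demo_robust_normalize():
    rng = np.random.RandomState(0)
    # two "channels" with very different scales
    imdata = np.stack([rng.rand(64, 64), rng.rand(64, 64) * 1000], axis=0)
    joint = robust_normalize(imdata)             # one normalizer for all data
    per_chan = robust_normalize(imdata, axis=0)  # one normalizer per channel
    # mask out a corner; masked pixels pass through unchanged
    mask = np.ones(imdata.shape, dtype=bool)
    mask[:, :8, :8] = False
    masked = robust_normalize(imdata, mask=mask)
    return joint, per_chan, masked
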
def _apply_robust_normalizer(normalizer, imdata, imdata_valid, mask, dtype,
                             copy=True):
    """
    TODO: abstract into a scikit-learn-style Normalizer class which can
    fit/predict different types of normalizers.
    """
    import kwarray
    if normalizer['type'] is None:
        imdata_normalized = imdata.astype(dtype, copy=copy)
    elif normalizer['type'] == 'normalize':
        # Note: we are using kwarray normalize, the one in kwimage is
        # deprecated
        imdata_valid_normalized = kwarray.normalize(
            imdata_valid.astype(dtype, copy=copy),
            mode=normalizer['mode'],
            beta=normalizer['beta'],
            alpha=normalizer['alpha'],
            min_val=normalizer.get('min_val', None),
            max_val=normalizer.get('max_val', None),
        )
        if mask is None:
            imdata_normalized = imdata_valid_normalized
        else:
            imdata_normalized = imdata.copy() if copy else imdata
            imdata_normalized[mask] = imdata_valid_normalized
    else:
        raise KeyError(normalizer['type'])
    return imdata_normalized

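# Hedged sketch (not part of the original module): the fit/apply split lets
# you estimate normalizers on one image and reuse them on another, e.g. for
# consistent visualization across video frames. The demo name is
# hypothetical; this assumes the kwarray package is importable.
def _demo_fit_once_apply_many():
    rng = np.random.RandomState(0)
    frame1 = rng.rand(32, 32) * 100
    frame2 = rng.rand(32, 32) * 100
    # fit robust parameters on the first frame only
    normalizer = find_robust_normalizers(frame1.ravel(), params='auto')
    # apply the same parameters to both frames (mask=None, all pixels valid)
    norm1 = _apply_robust_normalizer(normalizer, frame1, frame1, None, np.float32)
    norm2 = _apply_robust_normalizer(normalizer, frame2, frame2, None, np.float32)
    return norm1, norm2
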
def normalize(arr, mode='linear', alpha=None, beta=None, out=None,
              min_val=None, max_val=None):
    """
    Normalizes input values based on a specified scheme.

    The default behavior is a linear normalization between 0.0 and 1.0 based
    on the min/max values of the input. Parameters can be specified to
    achieve more general contrast stretching or signal rebalancing.
    Implements the linear and sigmoid normalization methods described in
    [WikiNorm]_.

    Args:
        arr (NDArray): array to normalize, usually an image

        out (NDArray | None): output array. Note, that we will create an
            internal floating point copy for integer computations.

        mode (str): either 'linear' or 'sigmoid'.

        alpha (float): Only used if mode=sigmoid. Division factor
            (pre-sigmoid). If unspecified computed as:
            ``max(abs(old_min - beta), abs(old_max - beta)) / 6.212606``.
            Note this parameter is sensitive to if the input is a float or
            uint8 image.

        beta (float): subtractive factor (pre-sigmoid). This should be the
            intensity of the most interesting bits of the image, i.e. bring
            them to the center (0) of the distribution. If unspecified,
            computed as ``old_max - old_min``. Note this parameter is
            sensitive to if the input is a float or uint8 image.

        min_val: inputs lower than this minimum value are clipped

        max_val: inputs higher than this maximum value are clipped.

    SeeAlso:
        :func:`find_robust_normalizers` - determine robust parameters for
            normalize to mitigate the effect of outliers.

        :func:`robust_normalize` - finds and applies robust normalization
            parameters

    References:
        .. [WikiNorm] https://en.wikipedia.org/wiki/Normalization_(image_processing)

    Example:
        >>> raw_f = np.random.rand(8, 8)
        >>> norm_f = normalize(raw_f)
        >>> raw_f = np.random.rand(8, 8) * 100
        >>> norm_f = normalize(raw_f)
        >>> assert isclose(norm_f.min(), 0)
        >>> assert isclose(norm_f.max(), 1)
        >>> raw_u = (np.random.rand(8, 8) * 255).astype(np.uint8)
        >>> norm_u = normalize(raw_u)

    Example:
        >>> # xdoctest: +REQUIRES(module:kwimage)
        >>> import kwimage
        >>> arr = kwimage.grab_test_image('lowcontrast')
        >>> arr = kwimage.ensure_float01(arr)
        >>> norms = {}
        >>> norms['arr'] = arr.copy()
        >>> norms['linear'] = normalize(arr, mode='linear')
        >>> # xdoctest: +REQUIRES(module:scipy)
        >>> norms['sigmoid'] = normalize(arr, mode='sigmoid')
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.figure(fnum=1, doclf=True)
        >>> pnum_ = kwplot.PlotNums(nSubplots=len(norms))
        >>> for key, img in norms.items():
        >>>     kwplot.imshow(img, pnum=pnum_(), title=key)

    Example:
        >>> # xdoctest: +REQUIRES(module:kwimage)
        >>> arr = np.array([np.inf])
        >>> normalize(arr, mode='linear')
        >>> # xdoctest: +REQUIRES(module:scipy)
        >>> normalize(arr, mode='sigmoid')

    Benchmark:
        >>> # Our method is faster than standard in-line implementations for
        >>> # uint8 and competitive with in-line float32, in addition to
        >>> # being more concise and configurable. In 3.11 all inplace
        >>> # variants are faster.
        >>> # xdoctest: +REQUIRES(module:kwimage)
        >>> import timerit
        >>> import kwimage
        >>> import kwarray
        >>> ti = timerit.Timerit(1000, bestof=10, verbose=2, unit='ms')
        >>> arr = kwimage.grab_test_image('lowcontrast', dsize=(512, 512))
        >>> #
        >>> arr = kwimage.ensure_float01(arr)
        >>> out = arr.copy()
        >>> for timer in ti.reset('inline_naive(float)'):
        >>>     with timer:
        >>>         (arr - arr.min()) / (arr.max() - arr.min())
        >>> #
        >>> for timer in ti.reset('inline_faster(float)'):
        >>>     with timer:
        >>>         max_ = arr.max()
        >>>         min_ = arr.min()
        >>>         result = (arr - min_) / (max_ - min_)
        >>> #
        >>> for timer in ti.reset('kwarray.normalize(float)'):
        >>>     with timer:
        >>>         kwarray.normalize(arr)
        >>> #
        >>> for timer in ti.reset('kwarray.normalize(float, inplace)'):
        >>>     with timer:
        >>>         kwarray.normalize(arr, out=out)
        >>> #
        >>> arr = kwimage.ensure_uint255(arr)
        >>> out = arr.copy()
        >>> for timer in ti.reset('inline_naive(uint8)'):
        >>>     with timer:
        >>>         (arr - arr.min()) / (arr.max() - arr.min())
        >>> #
        >>> for timer in ti.reset('inline_faster(uint8)'):
        >>>     with timer:
        >>>         max_ = arr.max()
        >>>         min_ = arr.min()
        >>>         result = (arr - min_) / (max_ - min_)
        >>> #
        >>> for timer in ti.reset('kwarray.normalize(uint8)'):
        >>>     with timer:
        >>>         kwarray.normalize(arr)
        >>> #
        >>> for timer in ti.reset('kwarray.normalize(uint8, inplace)'):
        >>>     with timer:
        >>>         kwarray.normalize(arr, out=out)
        >>> print('ti.rankings = {}'.format(ub.urepr(
        >>>     ti.rankings, nl=2, align=':', precision=5)))

    Ignore:
        globals().update(xdev.get_func_kwargs(normalize))
    """
    if out is None:
        out = arr.copy()

    # TODO:
    # - [ ] Parameterize new_min / new_max values
    #     - [ ] infer from datatype
    #     - [ ] explicitly given
    new_min = 0.0
    if arr.dtype.kind in ('i', 'u'):
        # Need a floating point workspace
        float_out = out.astype(np.float32)
        new_max = float(np.iinfo(arr.dtype).max)
    elif arr.dtype.kind == 'f':
        float_out = out
        new_max = 1.0
    else:
        raise TypeError(f'Normalize not implemented for {arr.dtype}')

    # TODO:
    # - [ ] Parameterize old_min / old_max strategies
    #     - [X] explicitly given min and max
    #     - [ ] raw-naive min and max inference
    #     - [ ] outlier-aware min and max inference (see util_robust)
    if min_val is not None:
        old_min = min_val
        np.maximum(float_out, min_val, out=float_out)
        # float_out[float_out < min_val] = min_val
    else:
        try:
            old_min = np.nanmin(float_out)
        except ValueError:
            old_min = 0

    if max_val is not None:
        old_max = max_val
        np.minimum(float_out, max_val, out=float_out)
        # float_out[float_out > max_val] = max_val
    else:
        try:
            old_max = np.nanmax(float_out)
        except ValueError:
            old_max = max(0, old_min)

    old_span = old_max - old_min
    new_span = new_max - new_min

    if mode == 'linear':
        # linear case
        # out = (arr - old_min) * (new_span / old_span) + new_min
        factor = 1.0 if old_span == 0 else (new_span / old_span)
        if old_min != 0:
            float_out -= old_min
    elif mode == 'sigmoid':
        # nonlinear case
        # out = new_span * sigmoid((arr - beta) / alpha) + new_min
        from scipy.special import expit as sigmoid
        if beta is None:
            # should center the desired distribution to visualize on zero
            beta = old_max - old_min
        if alpha is None:
            # division factor
            # from scipy.special import logit
            # alpha = max(abs(old_min - beta), abs(old_max - beta)) / logit(0.998)
            # This chooses alpha such that the original min/max values will
            # be pushed towards -1 / +1.
            alpha = max(abs(old_min - beta), abs(old_max - beta)) / 6.212606
        try:
            if isclose(alpha, 0):
                alpha = 1
        except TypeError:
            alpha = alpha + np.isclose(alpha, 0)
        energy = float_out
        energy -= beta
        energy /= alpha
        # Ideally the data of interest is roughly in the range (-6, +6)
        float_out = sigmoid(energy, out=float_out)
        factor = new_span
    else:
        raise KeyError(mode)

    # Stretch / shift to the desired output range
    if factor != 1:
        float_out *= factor

    if new_min != 0:
        float_out += new_min

    if float_out is not out:
        final_out = float_out.astype(out.dtype)
        out.ravel()[:] = final_out.ravel()[:]

    return out
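

# Hedged worked example (not in the original source; the demo name is
# hypothetical): for a float array, linear mode with explicit min_val and
# max_val reduces to clip-then-rescale, i.e.
# out = (clip(arr, lo, hi) - lo) / (hi - lo).
def _demo_normalize_equivalence():
    rng = np.random.RandomState(0)
    arr = rng.rand(100) * 10
    lo, hi = 2.0, 8.0
    result = normalize(arr, mode='linear', min_val=lo, max_val=hi)
    expected = (np.clip(arr, lo, hi) - lo) / (hi - lo)
    assert np.allclose(result, expected)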