Source code for pileup

#!/usr/bin/python
# -*- coding: UTF-8 -*-

"""A module to study the pile-up of distributions"""

from __future__ import print_function

import math

import sys
import numpy as np
from scipy.optimize import fmin_l_bfgs_b, curve_fit
from scipy.spatial.distance import sqeuclidean

__author__ = 'Dih5'
__version__ = "0.1.0"


[docs]def poisson_lambda(rate): """Return the poisson characteristic parameter, given a rate of counting/pulse frequency""" return -np.log(1 - rate)
[docs]def count_to_particle_ratio(l): """ Return the expected value of the ratio of the number of particles arriving in a detector with the number of counts. This number is calculated assuming the number of counts per pulse in a detection is a random process conditioned by (number of counts > 0). Thus, the factor is given by :math:`\\frac{\\lambda}{1-e^{-\\lambda}}`. """ return l/(1-math.exp(-l))
def _exp_n(x, n): """Maclaurin series of order n of the exponential""" return np.sum([x ** j / np.math.factorial(j) for j in range(0, n + 1)]) def _mercator(x, n): """Mercator series of order n of the logarithm""" return np.sum([(-1) ** (j + 1) * x ** j / j for j in range(1, n + 1)]) def _pile_series(yy, l, n, bin_size=1.0): f = np.array(l) * yy # the piled-up function f_i = np.copy(f) # i-th convolution power of f n_fact = l # Hold lambda^i/i! # The first one is already added for i in range(2, n + 1): n_fact *= l / i f_i = np.convolve(f_i, yy) # Longer first to avoid swapping f_i /= sum(f_i) * bin_size f.resize(f_i.shape) f += f_i * np.array(n_fact) return f[:len(yy)] * np.array(1 / (np.exp(l) - 1)) # Make sure to cut added tails def _pile_fourier(yy, l, bin_size=1.0): four = bin_size * l * np.fft.fft(yy) # Discrete Fourier Transform of lambda*yy pile_factor = np.exp(l) - 1 # exp(lambda)-1 piled_four = list(map(lambda t: (np.exp(t) - 1) / pile_factor, four)) # Piled-up function in Fourier Space return np.real(np.fft.ifft(piled_four)) / bin_size def _pile_fourier_series(yy, l, n, bin_size=1.0): four = bin_size * l * np.fft.fft(yy) # Discrete Fourier Transform of lambda*yy pile_factor = np.exp(l) - 1 # exp(lambda)-1 piled_four = list(map(lambda t: (_exp_n(t, n) - 1) / pile_factor, four)) # Piled-up function in Fourier Space return np.real(np.fft.ifft(piled_four)) / bin_size def _pile_fourier_r(yy, l, bin_size=1.0): four = bin_size * l * np.fft.rfft(yy) # Discrete Fourier Transform of lambda*yy pile_factor = np.exp(l) - 1 # exp(lambda)-1 piled_four = list(map(lambda t: (np.exp(t) - 1) / pile_factor, four)) # Piled-up function in Fourier Space return np.fft.irfft(piled_four, len(yy)) / bin_size
[docs]def pile(yy, l, method='Fourier', series_order=10, bin_size=None, zero_pad=None): """ Calculate a piled spectrum. Args: yy (list of float): vector defining the original spectrum. l (float): poisson piling parameter. method (str): the method used to calculate the piling. Available methods include: * "series": A series expansion of the convolution exponential. * "fourier": "Exact" solution in the fourier domain. * "fourier_c": Same as fourier, using the fft instead of rfft. * "fourier_series": A series expansion in the fourier domain. series_order (int): the number of terms used in a series expansion method. bin_size(float): dE in the spectrum. If None, automatically chosen so it is normalized. zero_pad (int): Number of zeros to pad to the end of yy. Returns: (`numpy.ndarray`): The piled spectrum. """ if l == 0.0: # If pile-up is null, just copy the original spectrum return yy[:] if not bin_size: bin_size = 1 / sum(yy) if zero_pad: if isinstance(zero_pad, int): yy = np.pad(yy, (0, zero_pad), 'constant') else: raise TypeError("zero_pad must be an integer.") if not isinstance(method, str): print("Selected method is not a string. Using Fourier.", file=sys.stderr) r = _pile_fourier(yy, l, bin_size=bin_size) else: m = method.lower() if m == 'series': r = _pile_series(yy, l, series_order, bin_size=bin_size) elif m == 'fourier': r = _pile_fourier_r(yy, l, bin_size=bin_size) elif m == "fourierc" or m == "fourier_c" or m == "fourier-c": r = _pile_fourier(yy, l, bin_size=bin_size) elif m == 'fourier_series': r = _pile_fourier_series(yy, l, series_order, bin_size=bin_size) else: print("Unknown method selected. Using Fourier.", file=sys.stderr) r = _pile_fourier(yy, l, bin_size=bin_size) if zero_pad: return r[:-zero_pad] else: return r
def _depile_series(yy, l, n, bin_size=1.0): # BEWARE: This method fails to depile spectra... but the failure is mathematical! # Note this is analogous to the Mercator series, which only converges in (-1,1] f = np.array(np.exp(l) - 1) * yy # the depiled-up function f_i = np.copy(yy) # i-th convolution power of yy n_fact = 1 # (-1)^(i+1)*(exp(lambda)-1)^i # The first one is already added for i in range(2, n + 1): n_fact *= -1 * (np.exp(l) - 1) f_i = np.convolve(f_i, yy) # Longer first to avoid swapping f_i /= sum(f_i) * bin_size f.resize(f_i.shape) f += f_i * np.array(n_fact / i) return f / np.array(l) def _depile_fourier(yy, l, bin_size=1.0): four = bin_size * np.fft.fft(yy) # Discrete Fourier Transform of yy depile_factor = np.exp(l) - 1 # exp(lambda)-1 depiled_four = list(map(lambda t: np.log(1 + depile_factor * t) / l, four)) # Depiled function in Fourier Space return np.real(np.fft.ifft(depiled_four)) / bin_size def _depile_fourier_r(yy, l, bin_size=1.0): four = bin_size * np.fft.rfft(yy) # Discrete Fourier Transform of yy depile_factor = np.exp(l) - 1 # exp(lambda)-1 depiled_four = list(map(lambda t: np.log(1 + depile_factor * t) / l, four)) # Depiled function in Fourier Space return np.fft.irfft(depiled_four, len(yy)) / bin_size def _depile_fourier_series(yy, l, n, bin_size=1.0): four = bin_size * np.fft.fft(yy) # Discrete Fourier Transform of yy depile_factor = np.exp(l) - 1 # exp(lambda)-1 depiled_four = list( map(lambda t: (_mercator(depile_factor * t, n)) / l, four)) # Piled-up function in Fourier Space return np.real(np.fft.ifft(depiled_four)) / bin_size def _depile_nonparametric_fit(yy, l, depiled_0=None, bin_size=1.0): def distance_function(yy_tentative): return sqeuclidean(pile(yy_tentative, l, bin_size=bin_size), yy) if depiled_0 is None: depiled_0 = np.repeat(1 / len(yy), len(yy)) x, y, d = fmin_l_bfgs_b(distance_function, depiled_0, bounds=[(0, np.inf) for _ in range(len(yy))], approx_grad=True) return x def _depile_parametric_fit(yy, l, f, par_0=None, bin_size=1.0, fit_pars=None): xx = np.linspace(0, len(yy) * bin_size, len(yy)) def _piled_function(*args): return pile(f(*args), l) popt, pcov = curve_fit(_piled_function, np.linspace(0, len(yy) * bin_size, len(yy)), yy, p0=par_0) if fit_pars is not None: fit_pars.append(popt) fit_pars.append(pcov) return f(xx, *popt)
[docs]def depile(yy, l, method='Fourier', series_order=20, bin_size=None, zero_pad=None, f=None, par_0=None, fit_pars=None, depiled_0=None): """ Calculate a depiled spectrum. Args: yy (list of float): vector defining the piled spectrum. l (float): poisson piling parameter. method (str): the method used to calculate the depiling. Available methods include: * "series": A series expansion analogous to the Mercator series. **Might not converge**. * "fourier": "Exact" solution in the fourier domain. * "fourier_c": Same as fourier, using the fft instead of rfft. * "fourier_series": A series expansion in the fourier domain. * "parametric": Non-linear least squares fit a function using `scipy.optimize.curve_fit`. * "nonparametric": Non-linear least squares vector minimization using `scipy.optimize.fmin_l_bfgs_b`. series_order (int): the number of terms used in a series expansion method. f (callable): if "parametric" is used, the model function, which takes the independent variable as as the first argument and the parameters to fit as separate remaining arguments. par_0 (list of float): if "parametric" is used, the initial guess for the parameters. If None, then the initial values will all be 1 (if the number of parameters for the function can be determined using introspection, otherwise a ValueError is raised). fit_pars (list): If "parametric" is used, provide an empty list to recover the best-fit parameters and the covariance estimation. See `scipy.optimize.curve_fit`. depiled_0 (list of float): if "nonparametric" is used, the initial guess for the distribution. If None, the uniform distribution will be the starting distribution. bin_size (float): dE in the spectrum. If None, chosen so it is normalized. zero_pad (int): Number of zeros to pad to the end of yy. Returns: (`numpy.ndarray`): The depiled spectrum. Note: "parametric" and "nonparametric" methods might return non-global best-fits. Also, the time taken might be long, specially in "nonparametric". """ if l == 0.0: # If pile-up is null, just copy the original spectrum return yy[:] if not bin_size: bin_size = 1 / sum(yy) if zero_pad: if isinstance(zero_pad, int): yy = np.pad(yy, (0, zero_pad), 'constant') else: raise TypeError("zero_pad must be an integer.") if not isinstance(method, str): print("Selected method is not a string. Using Fourier.", file=sys.stderr) return _depile_fourier(yy, l) m = method.lower() if m == 'series': r = _depile_series(yy, l, series_order, bin_size=bin_size) elif m == 'fourier': r = _depile_fourier_r(yy, l, bin_size=bin_size) elif m == "fourierc" or m == "fourier_c" or m == "fourier-c": r = _depile_fourier(yy, l, bin_size=bin_size) elif m == 'fourier_series': r = _depile_fourier_series(yy, l, series_order, bin_size=bin_size) elif m == 'nonparametric': r = _depile_nonparametric_fit(yy, l, bin_size=bin_size, depiled_0=depiled_0) elif m == 'parametric': if f is None: raise TypeError("A function must be supplied to make a parametric fit.") r = _depile_parametric_fit(yy, l, f, par_0=par_0, bin_size=bin_size, fit_pars=fit_pars) else: print("Unknown method selected. Using Fourier.", file=sys.stderr) r = _depile_fourier(yy, l, bin_size=bin_size) if zero_pad: return r[:-zero_pad] else: return r
[docs]def piled_sample(l, f_sample, size=None): """ Sample from a distributions with an added poissonian pile-up""" # Sample once and add latter is faster than sampling each time event_counts = np.random.poisson(l, size) all_samples = f_sample(sum(event_counts)) cumulative = 0 for i in event_counts: if i == 0: continue yield sum(all_samples[cumulative:cumulative + i]) cumulative += i