#!/usr/bin/env python
"""
Python module for computing the line-of-sight integral over a
spherically symmetric distribution.
"""
__author__ = "Matthew Wood"
__date__ = "12/01/2011"
import numpy as np
from scipy.integrate import quad
from scipy.interpolate import interp1d, interp2d
from scipy.interpolate import UnivariateSpline
from dmsky.utils.units import Units
[docs]class LoSFn(object):
"""Integrand function (luminosity density) for LoS integration. The
parameter `alpha` introduces a change of variables:
x' = x^(1/alpha).
A value of alpha > 1 samples the integrand closer to x = 0
(distance of closest approach). The change of variables requires
the substitution:
dx = alpha * (x')^(alpha-1) dx'
"""
def __init__(self, dp, d, xi, alpha=3.0):
"""C'tor
Parameters
----------
dp : `DensityProfile`
The density profile to integrate
d : float
Distance to the halo center
xi: float
Offset angle in radians.
alpha: float
Rescaling exponent for line-of-sight coordinate.
"""
self.dp = dp
self.d2 = d**2
self.sinxi2 = np.sin(xi)**2
self.alpha = alpha
def __call__(self, xp, alpha=None):
""""Return the density along the Line-of-sight.
Parameters
----------
xp : `numpy.array`
Distance along the line of sight
alpha : float
Rescaling exponent for line-of-sight coordinate
Returns
-------
values : `numpy.array`
Return values, same shape as the input xp
"""
if alpha is None:
alpha = self.alpha
x = xp**alpha
r = np.sqrt(x**2 + self.d2 * self.sinxi2)
try:
if (x <= 0.).all():
raise ValueError("Tried to compute _rho at x=0")
except AttributeError:
if x <= 0.:
raise ValueError("Tried to compute _rho at x=0")
return self.func(r) * alpha * xp**(alpha - 1.0)
[docs] def func(self, r):
"""Function to compute the integrand
"""
return self.dp.rho(r)**2
[docs]class LoSAnnihilate(LoSFn):
"""Integrand function for LoS annihilation (J-factor)."""
[docs] def func(self, r):
"""Function to compute the line-of-sight integrand
"""
return self.dp.rho(r)**2
[docs]class LoSAnnihilate_Deriv(LoSFn):
"""Integrand function for LoS annihilation (J-factor)."""
def __init__(self, dp, d, xi, paramNames, alpha=3.0):
"""C'tor
Parameters
----------
dp : `DensityProfile`
The density profile to integrate
d : float
Distance to the halo center
xi: float
Offset angle in radians.
paramNames : list
Parameters to differentiate w.r.t.
alpha: float
Rescaling exponent for line-of-sight coordinate.
"""
super(LoSAnnihilate_Deriv, self).__init__(dp, d, xi, alpha)
self.__paramNames = paramNames
[docs] def func(self, r):
"""Function to compute the line-of-sight integrand
"""
return 2. * self.dp.rho(r) * self.dp.rho_deriv(r, self.__paramNames)
[docs]class LoSDecay(LoSFn):
"""Integrand function for LoS decay (D-factor)."""
[docs] def func(self, r):
"""Function to compute the line-of-sight integrand
"""
return self.dp.rho(r)
[docs]class LoSDecay_Deriv(LoSFn):
"""Integrand function for LoS decay (D-factor)."""
def __init__(self, dp, d, xi, paramNames, alpha=1.0):
"""C'tor
Parameters
----------
dp : `DensityProfile`
The density profile to integrate
d : float
Distance to the halo center
xi: float
Offset angle in radians.
paramNames : list
Parameters to differentiate w.r.t.
alpha: float
Rescaling exponent for line-of-sight coordinate.
"""
super(LoSDecay_Deriv, self).__init__(dp, d, xi, alpha)
self.__paramNames = paramNames
[docs] def func(self, r):
"""Function to compute the line-of-sight integrand
"""
return self.dp.rho(r) * self.dp.rho_deriv(r, self.__paramNames)
[docs]class LoSIntegral(object):
"""Slowest (and most accurate?) LoS integral. Uses
scipy.integrate.quad with a change of variables to better sample
the LoS close to the halo center.
"""
def __init__(self, density, dhalo, alpha=3.0, ann=True, derivPar=None):
"""C'tor
Parameters
----------
density : `DensityProfile`
The density profile to integrate
dhalo : float
Distance to the halo center
alpha: float
Rescaling exponent for line-of-sight coordinate.
ann: bool
True for annihilation, False for decay
derivPar : list
Parameters to differentiate w.r.t.
"""
self.dp = density
self.dhalo = dhalo
self.alpha = float(alpha)
self.ann = ann
self.derivPar = derivPar
@property
def rmax(self):
"""Return the maximum integration radius
"""
return self.dp.rmax
@property
def name(self):
"""Return the name of this profile
"""
return self.__class__.__name__
def _make_losfn(self, dhalo, psi):
"""Build a function that returns the line-of-sight integral
Parameters
----------
dhalo : float
Distance to the halo center
psi : float
Angular offsets from the halo center
Returns
-------
integrator : `LoSFn`
Object that computes the line of sight integral
"""
if self.ann:
if self.derivPar is None:
losfn = LoSAnnihilate(self.dp, dhalo, psi, self.alpha)
else:
losfn = LoSAnnihilate_Deriv(self.dp, dhalo, psi, [self.derivPar], self.alpha)
else:
if self.derivPar is None:
losfn = LoSDecay(self.dp, dhalo, psi, self.alpha)
else:
losfn = LoSDecay_Deriv(self.dp, dhalo, psi, [self.derivPar], self.alpha)
return losfn
def __call__(self, psi, dhalo=None, degrees=False):
"""Evaluate the LoS integral at the offset angle psi for a halo
located at the distance dhalo.
Parameters
----------
psi : `numpy.ndarray`
Array of offset angles (in radians by default)
dhalo : `numpy.ndarray`
Array of halo distances
degrees : bool
Flag to interpret `psi` in degrees instead of radians
Returns
-------
values : `numpy.array`
Return values, same shape as the input psi
"""
scalar = np.isscalar(psi)
if degrees:
psi = np.deg2rad(psi)
psi = np.atleast_1d(psi)
if dhalo is None:
if self.dhalo is None:
msg = "Halo distance must be specified"
raise Exception(msg)
dhalo = self.dhalo
dhalo = np.atleast_1d(dhalo)
if dhalo.size == 1:
dhalo = dhalo * np.ones_like(psi)
else:
if dhalo.size != psi.size:
msg = "Array sizes must match"
raise ValueError(msg)
v = self._integrate(psi, dhalo)
if scalar:
return v[0]
return v
def _integrate(self, psi, dhalo):
"""Evaluate the LoS integral at the offset angle psi for a halo
located at the distance dhalo.
Parameters
----------
psi : `numpy.ndarray`
Array of offset angles (in radians by default)
dhalo : `numpy.ndarray`
Array of halo distances
Returns
-------
values : `numpy.array`
Return values, same shape as the input psi
"""
# Arrays must be the same shape
assert psi.shape == dhalo.shape
# Output Array
v = np.zeros_like(psi)
# Closest approach to halo center
rmin = dhalo * np.sin(psi)
# Maximum extent of halo
rmax = self.rmax
for i, t in np.ndenumerate(psi):
s0, s1 = 0, 0
losfn = self._make_losfn(dhalo[i], psi[i])
# Closest approach to halo center
#rmin = dhalo[i]*np.sin(psi[i])
# If observer inside the halo...
if rmax > dhalo[i]:
if psi[i] < np.pi / 2.:
x0 = np.power(dhalo[i] * np.cos(psi[i]), 1. / self.alpha)
s0 = 2 * quad(losfn, 0.0, x0)[0]
x1 = np.power(np.sqrt(rmax**2 - rmin[i]**2), 1. / self.alpha)
s1 = quad(losfn, x0, x1)[0]
else:
x0 = np.power(np.abs(dhalo[i] * np.cos(psi[i])), 1. / self.alpha)
x1 = np.power(np.sqrt(rmax**2 - rmin[i]**2), 1. / self.alpha)
s1 = quad(losfn, x0, x1)[0]
# If observer outside the halo...
elif (rmax > rmin[i]) and (psi[i] < np.pi / 2.):
x0 = np.power(np.sqrt(rmax**2 - rmin[i]**2), 1. / self.alpha)
s0 = 2 * quad(losfn, 0.0, x0)[0]
v[i] = s0 + s1
return v
[docs] def angularIntegral(self, angle=None):
"""Compute the solid-angle integrated j-value
within a given radius
Parameters
----------
angle : `numpy.ndarray` or None
Maximum integration angle (in degrees)
If None, use the 'rmax' and 'dhalo' parameters.
Returns
-------
values : `numpy.array`
Return values, same shape as the input xp
"""
if angle is None:
angle = np.degrees(np.arctan2(self.rmax, self.dhalo))
angle = np.asarray(angle)
if angle.ndim == 0:
angle = np.array([angle])
integrand = lambda r: self(r) * 2 * np.pi * np.sin(r)
integral = []
for a in angle:
integral.append(quad(integrand, 1e-7, np.radians(a), full_output=True)[0])
integral = np.asarray(integral)
return integral
[docs]class LoSIntegralFast(LoSIntegral):
"""Vectorized version of LoSIntegral that performs midpoint
integration with a fixed number of steps.
"""
def __init__(self, density, dhalo, alpha=3.0, ann=True, nsteps=400, derivPar=None):
"""C'tor
Parameters
----------
density : `DensityProfile`
The density profile to integrate
dhalo : float
Distance to the halo center
alpha: float
Rescaling exponent for line-of-sight coordinate.
ann: bool
True for annihilation, False for decay
nsteps : int
Number of steps for
derivPar : list
Parameters to differentiate w.r.t.
"""
super(LoSIntegralFast, self).__init__(density, dhalo, alpha, ann, derivPar)
self.nsteps = nsteps
xedge = np.linspace(0, 1.0, self.nsteps + 1)
self.x = 0.5 * (xedge[1:] + xedge[:-1])
@property
def rmax(self):
"""Return the maximum integration radius
"""
if self.dp.rmax < np.inf:
return self.dp.rmax
return 1000 * self.dp.rs
def _integrate(self, psi, dhalo):
"""Internal function to do the LOS integral
Parameters
----------
psi : `numpy.ndarray`
Array of offset angles (in radians by default)
dhalo : `numpy.ndarray`
Array of halo distances
Returns
-------
values : `numpy.array`
Return values, same shape as the input psi
"""
# Arrays must be the same shape
assert psi.shape == dhalo.shape
v = np.zeros(psi.shape + (1,))
dhalo = np.ones(v.shape) * dhalo[..., np.newaxis]
psi = np.ones(v.shape) * psi[..., np.newaxis]
# Closest approach to halo center
rmin = dhalo * np.sin(psi)
losfn = self._make_losfn(dhalo, psi)
msk0 = self.rmax > dhalo
msk1 = self.rmax > rmin
# Distance between observer and point of closest approach
xlim0 = np.abs(dhalo * np.cos(psi))**(1. / self.alpha)
# Distance from point of closest approach to maximum
# integration radius
xlim1 = np.zeros(shape=psi.shape)
xlim1[msk1] = np.sqrt(self.rmax**2 - rmin[msk1]**2)**(1. / self.alpha)
# If observer inside the halo...
if np.any(msk0):
msk01 = msk0 & (psi < np.pi / 2.)
msk02 = msk0 & ~(psi < np.pi / 2.)
if np.any(msk01):
dx0 = xlim0 / float(self.nsteps)
dx1 = (xlim1 - xlim0) / float(self.nsteps)
x0 = self.x * xlim0
x1 = xlim0 + self.x * (xlim1 - xlim0)
s0 = 2 * np.apply_over_axes(np.sum, losfn(x0) * dx0, axes=[-1])
s1 = np.apply_over_axes(np.sum, losfn(x1) * dx1, axes=[-1])
v[msk01] = s0[msk01] + s1[msk01]
if np.any(msk02):
dx1 = (xlim1 - xlim0) / float(self.nsteps)
x1 = xlim0 + self.x * (xlim1 - xlim0)
s0 = np.apply_over_axes(np.sum, losfn(x1) * dx1, axes=[-1])
v[msk02] = s0[msk02]
# Observer outside the halo...
# Only calculate integral for psi < pi/2
msk11 = (~msk0 & msk1) & (psi < np.pi / 2.)
if np.any(msk11):
dx0 = xlim1 / float(self.nsteps)
x0 = xlim1 * self.x
s0 = 2 * np.apply_over_axes(np.sum, losfn(x0) * dx0, axes=[-1])
v[msk11] = s0[msk11]
return v.reshape(v.shape[:-1])
[docs]class LoSIntegralInterp(LoSIntegralFast):
""" Interpolate fast integral a for even faster look-up. """
def __init__(self, density, dhalo, alpha=3.0, ann=True, nsteps=400, derivPar=None):
"""C'tor
Parameters
----------
density : `DensityProfile`
The density profile to integrate
dhalo : float
Distance to the halo center
alpha: float
Rescaling exponent for line-of-sight coordinate.
ann: bool
True for annihilation, False for decay
nsteps : int
Number of steps for vectorization
derivPar : list
Parameters to differentiate w.r.t.
"""
super(LoSIntegralInterp, self).__init__(density, dhalo, alpha, ann, nsteps, derivPar)
try:
if (self.dhalo <= 0).any():
raise ValueError("dhalo == 0")
except AttributeError:
if self.dhalo <= 0:
raise ValueError("dhalo == 0")
self.func = self.create_func(self.dhalo)
[docs] def create_profile(self, dhalo, nsteps=None):
"""Create a spatial J-factor profile
Parameters
----------
dhalo : `numpy.ndarray`
Array of halo distances
nsteps : int
Number of steps for vectorization
Returns
-------
dhalo, psi : `numpy.meshgrid`
Array of halo distances and angular offsets
jval : `numpy.array`
Corresponding J-factors
"""
if not nsteps:
nsteps = self.nsteps
dhalo = np.unique(np.atleast_1d(dhalo))
if (dhalo <= 0.).any():
raise ValueError("Halo distance == 0")
psi = np.logspace(np.log10(1e-7), np.log10(np.pi), nsteps)
_dhalo, _psi = np.meshgrid(dhalo, psi)
_jval = super(LoSIntegralInterp, self)._integrate(_psi, _dhalo)
mask = np.isfinite(_jval) * (_jval > 0.)
if mask.sum() < 4:
raise ValueError('Failed to compute grid for density values. Is profile well defined')
return np.log10([_dhalo, _psi, _jval])
[docs] def create_func(self, dhalo):
"""Create the spline function
Parameters
----------
dhalo : `numpy.ndarray`
Array of halo distances
Returns
-------
func : function
A function that return J-factor as a function of psi and dhalo
"""
log_dhalo, log_psi, log_jval = self.create_profile(dhalo)
zeroval = -99
log_jval[np.where(log_jval == -np.inf)] = zeroval
if log_dhalo.shape[-1] == 1:
# print('interp1d')
# spline=UnivariateSpline(log_psi.flat,log_jval.flat,k=2,s=0)
#fn = lambda psi: 10**(spline(np.log10(psi)))
interp = interp1d(log_psi.flat, log_jval.flat, kind='linear')
def fn(psi, dhalo):
"""Function to compute the J-factor
"""
log_jval = interp(np.log10(psi))
log_jval[np.where(log_jval < zeroval + 1)] = -np.inf
return 10**log_jval
else:
# print('interp2d')
#spline = bisplrep(log_psi,log_dhalo,log_jval,s=0.0,kx=2,ky=2)
#fn = lambda psi,dhalo: 10**bisplev(np.log10(psi[:,0]),np.log10(dhalo[0,:]),spline)
interp = interp2d(log_psi, log_dhalo, log_jval, kind='linear')
def fn(psi, dhalo):
"""Function to compute the J-factor
"""
log_jval = interp(np.log10(psi[:, 0]), np.log10(dhalo[0, :])).T
log_jval[np.where(log_jval < zeroval + 1)] = -np.inf
return 10**log_jval
return fn
def _integrate(self, psi, dhalo):
"""Internal function to the the LOS integral
Parameters
----------
psi : `numpy.ndarray`
Array of offset angles (in radians by default)
dhalo : `numpy.ndarray`
Array of halo distances
Returns
-------
values : `numpy.array`
Return values, same shape as the input psi
"""
# Arrays must be the same shape
if psi.shape != dhalo.shape:
msg = "Shape of psi and dhalo must match"
raise ValueError(msg)
if psi.ndim > 1 and not (np.unique(psi) == psi[:, 0]).all():
msg = "np.unique(psi) != psi[:,0]"
raise ValueError(msg)
if dhalo.ndim > 1 and not (np.unique(dhalo) == dhalo[0, :]).all():
msg = "np.unique(dhalo) != dhalo[0,:]"
raise ValueError(msg)
# All halo distances within pre-existing interpolation range
if ((np.max(self.dhalo) >= dhalo) & (np.min(self.dhalo) <= dhalo)).all():
func = self.func
else:
func = self.create_func(dhalo)
v = func(psi, dhalo)
if v.shape != psi.shape:
msg = "Input and output shape do not match"
raise ValueError(msg)
return v
[docs]class LoSIntegralFile(LoSIntegralInterp):
"""Interpolate over a pre-generated file.
NOT IMPLEMENTED YET
"""
def __init__(self, dp, dist, filename, ann=True):
"""C'tor
Parameters
----------
dp : `DensityProfile`
The density profile to integrate
dist : float
Distance to the halo center
filename: str
File with tabulated results
ann: bool
True for annihilation, False for decay
"""
super(LoSIntegralFile, self).__init__(dp, dist, ann=ann)
self.filename = filename
[docs] def create_profile(self, dhalo, nsteps=300):
"""Build the profile values"""
log_psi, log_jval = np.loadtxt(self.filename, unpack=True)
return self.dhalo, log_psi, log_jval
[docs]class ROIIntegrator(object):
"""Class to integrate a J-factor over a region of interest
"""
def __init__(self, jspline, lat_cut, lon_cut, source_list=None):
""" C'tor
"""
self._jspline = jspline
self._lat_cut = lat_cut
self._lon_cut = lon_cut
nbin_thetagc = 720
thetagc_max = 180.
self._phi_edges = np.linspace(0., 360., 720 + 1)
self._theta_edges = np.linspace(0., thetagc_max, nbin_thetagc + 1)
self._sources = None
if not source_list is None:
source_list = np.loadtxt(opts.source_list, unpack=True, usecols=(1, 2))
self._sources = Vector3D.createLatLon(np.radians(source_list[0]),
np.radians(source_list[1]))
self.compute()
[docs] def compute(self):
"""Integrate the ROI
"""
yaxis = Vector3D(np.pi / 2. * np.array([0., 1., 0.]))
costh_edges = np.cos(np.radians(self._theta_edges))
costh_width = costh_edges[:-1] - costh_edges[1:]
phi = 0.5 * (self._phi_edges[:-1] + self._phi_edges[1:])
self._theta = 0.5 * (self._theta_edges[:-1] + self._theta_edges[1:])
self._jv = []
self._domega = []
for i0, th in enumerate(self._theta):
jtot = integrate(lambda t: self._jspline(8.5 * Units.kpc, t) * np.sin(t),
np.radians(self._theta_edges[i0]),
np.radians(self._theta_edges[i0 + 1]), 100)
# jval = jspline(np.radians(th))*costh_width[i0]
v = Vector3D.createThetaPhi(np.radians(th), np.radians(phi))
v.rotate(yaxis)
lat = np.degrees(v.lat())
lon = np.degrees(v.phi())
src_msk = len(lat) * [True]
if not self._sources is None:
for k in range(len(v.lat())):
p = Vector3D(v._x[:, k])
sep = np.degrees(p.separation(self._sources))
imin = np.argmin(sep)
minsep = sep[imin]
if minsep < 0.62:
src_msk[k] = False
msk = ((np.abs(lat) >= self._lat_cut) |
((np.abs(lat) <= self._lat_cut) & (np.abs(lon) < self._lon_cut)))
msk &= src_msk
dphi = 2. * np.pi * float(len(lat[msk])) / float(len(phi))
jtot *= dphi
# jsum += jtot
# domegasum += costh_width[i0]*dphi
self._jv.append(jtot)
self._domega.append(costh_width[i0] * dphi)
self._jv = np.array(self._jv)
self._jv_cum = np.cumsum(self._jv)
self._jv_cum_spline = UnivariateSpline(self._theta_edges[1:],
self._jv_cum,
s=0, k=1)
self._domega = np.array(self._domega)
self._domega_cum = np.cumsum(self._domega)
[docs] def eval(self, rgc, decay=False):
"""Evaluate the J-factor
"""
if decay:
units0 = Units.gev_cm2
units1 = (8.5 * Units.kpc * 0.4 * Units.gev_cm3)
else:
units0 = Units.gev2_cm5
units1 = (8.5 * Units.kpc * np.power(0.4 * Units.gev_cm3, 2))
rgc = [float(t) for t in rgc.split('/')]
if len(rgc) == 1:
jv = self._jv_cum_spline(rgc[0])
domega = np.cos(np.radians(rgc[0])) * 2 * np.pi / Units.deg2
else:
jv = self._jv_cum_spline(rgc[1]) - self._jv_cum_spline(rgc[0])
domega = -(np.cos(np.radians(rgc[1])) -
np.cos(np.radians(rgc[0]))) * 2 * np.pi / Units.deg2
print('%20.6g %20.6g %20.6g %20.6g' % (jv,
jv / units0,
jv / units1, domega))
[docs] def print_profile(self, decay=False):
"""Print the profile
"""
if decay:
units0 = Units.gev_cm2
units1 = (8.5 * Units.kpc * 0.4 * Units.gev_cm3)
else:
units0 = Units.gev2_cm5
units1 = (8.5 * Units.kpc * np.power(0.4 * Units.gev_cm3, 2))
for i, th in enumerate(self._theta_edges[1:]):
jv = self._jv_cum[i]
print('%10.2f %20.6g %20.6g %20.6g %20.6g' % (th, jv,
jv / units0,
jv / units1,
self._domega_cum[i]))