Source code for dmsky.targets

#!/usr/bin/env python
"""
Module for target objects. A target carries the physical information
about a target (ra, dec, distance, density profile, etc.) and
interfaces with the `jcalc` module to calculate the l.o.s. integral at
a given sky position. Classes inherit from the `Target` baseclass and
can be created with the `factory` function.
"""
import sys
import copy
from os.path import abspath, dirname, join
from collections import OrderedDict as odict

import numpy as np

from dmsky.jcalc import LoSIntegral, LoSIntegralFast, LoSIntegralInterp
from dmsky.utils import coords
from dmsky.utils.units import Units
from dmsky.utils.tools import update_dict, merge_dict, item_version
from dmsky.library import ObjectLibrary
from dmsky.density import factory as density_factory
from dmsky.density import DensityProfile
from dmsky.priors import PriorFunctor
from dmsky.priors import factory as prior_factory

from pymodeler.model import Model
from pymodeler.parameter import Property, Derived


[docs]class Target(Model): """A DM search target """ _params = odict([('title', Property(dtype=str, required=True, help='Human-readable name')), ('name', Property(dtype=str, required=True, help='Machine-readable name')), ('abbr', Property(dtype=str, required=True, help='Title abbreviation')), ('profile', Property(dtype=dict, required=True, help='Density Profile (see `jcalc`)')), ('version', Property(dtype=str, default=None, help='Which version of this target?')), ('ver_key', Property(dtype=str, default=None, help='Short key for the version?')), ('nickname', Property(dtype=str, default=None, help='Do we need this?')), ('altnames', Property(dtype=list, default=[], help='Alternative names')), ('ra', Property(dtype=float, format='%.3f', default=0.0, unit='deg', help='Right Ascension')), ('dec', Property(dtype=float, format='%.3f', default=0.0, unit='deg', help='Declination')), ('distance', Property(dtype=float, format='%.1f', default=0.0, unit='kpc', help='Distance')), ('dsigma', Property(dtype=float, format='%.1f', default=0.0, unit='kpc', help='Distance Uncertainty')), ('major_axis', Property(dtype=float, format='%.3f', default=np.nan, unit='arcmin', help='Major axis')), ('ellipticity', Property(dtype=float, format='%.3f', default=np.nan, unit=None, help='Ellipticity (1 - b/a)')), ('abs_mag', Property(dtype=float, format='%.3f', default=np.nan, unit='mag', help='Absolute magnitude (Mv)')), ('references', Property(dtype=list, default=[], help='Literature references')), ('color', Property(dtype=str, default='k', help='Plotting color')), ('mode', Property(dtype=str, default='interp', help='L.o.S. Integration mode')), ('j_rad_file', Property(dtype=str, default=None, help='File with J factor radial profile')), ('d_rad_file', Property(dtype=str, default=None, help='File with D factor radial profile')), ('j_map_file', Property(dtype=str, default=None, help='File with J factor map')), ('d_map_file', Property(dtype=str, default=None, help='File with D factor map')), ('j_prior_def', Property(dtype=dict, help='Details of J-factor prior distribution')), ('d_prior_def', Property(dtype=dict, help='Details of D-factor prior distribution')), ('density', Derived(dtype=DensityProfile, help='Density profile object')), ('proftype', Derived(dtype=str, help='Profile type (see `jcalc`)')), ('prof_par', Derived(dtype=np.ndarray, help='Profile parameters')), ('prof_err', Derived(dtype=np.ndarray, help='Profile uncertainties')), ('glat', Derived(dtype=float, format='%.3f', unit='deg', help='Galactic Longitude')), ('glon', Derived(dtype=float, format='%.3f', unit='deg', help='Galactic Latitude')), ('rad_max', Derived(dtype=float, format='%.1f', unit='kpc', help='Maximum integration radius')), ('psi_max', Derived(dtype=float, format='%.3f', unit='deg', help='Maximum integration angle')), ('j_integ', Derived(dtype=float, format='%.2e', unit='GeV2 / cm5', help='Integrated J factor')), ('j_sigma', Derived(dtype=float, format='%.2e', unit='GeV2 / cm5', help='Uncertainty on integ. J factor')), ('d_integ', Derived(dtype=float, format='%.2e', unit='GeV / cm2', help='Integrated D factor')), ('d_sigma', Derived(dtype=float, format='%.2e', unit='GeV / cm2', help='Uncertainty on integ. D factor')), ('j_profile', Derived(dtype=LoSIntegral, help='J factor profile')), ('j_derivs', Derived(dtype=dict, help='J factor profile derivatives')), ('j_prior', Derived(dtype=PriorFunctor, help='J factor likelihood prior distribution')), ('d_profile', Derived(dtype=LoSIntegral, help='D factor profile')), ('d_derivs', Derived(dtype=dict, help='D factor profile derivatives')), ('d_prior', Derived(dtype=PriorFunctor, help='D factor likelihood prior distribution'))]) def _density(self): """Create the `DensityProfile` object for this target """ prof_copy = self.profile.copy() ptype = prof_copy.pop('type', "NFW") return density_factory(ptype, **prof_copy) def _proftype(self): """Get the type of `DensityProfile` """ return self.profile.get('type') def _prof_par(self): """Get the profile parameters""" return self.density.param_values() def _prof_err(self): """Get the errors on the profile parameters""" return self.density.param_errors() def _glat(self): """Return the galactic latitude""" return coords.cel2gal(self.ra, self.dec)[1] def _glon(self): """Return the galactic longitude""" return coords.cel2gal(self.ra, self.dec)[0] def _rad_max(self): """Return the maximum integration radius""" units = self.getp('rad_max').unit return Units.convert_to(self.density.rmax, units) def _psi_max(self): """Return the maximum integration angle""" rmax = self.rad_max dist_kpc = self.distance if rmax > dist_kpc: return 180. return np.degrees(np.arcsin(rmax / dist_kpc)) def _j_integ(self): """Compute the integrated J-factor """ jprof = self.j_profile units = self.getp('j_integ').unit return Units.convert_to(jprof.angularIntegral(self.psi_max)[0], units) def _j_sigma(self): """Compute the uncertaintiy on the integrated J-factor """ jd = self.j_derivs den = self.density dv = np.matrix(np.zeros((len(den.deriv_params)))) for i, pname in enumerate(den.deriv_params): dv[0, i] = jd[pname].angularIntegral(self.psi_max)[0] return np.sqrt((dv * self.density.covar * dv.T)[0, 0]) def _d_integ(self): """Compute the integrated D-factor """ dprof = self.d_profile units = self.getp('d_integ').unit return Units.convert_to(dprof.angularIntegral(self.psi_max)[0], units) def _d_sigma(self): """Compute the uncertaintiy on the integrated D-factor """ try: dd = self.d_derivs except ValueError: return None den = self.density dv = np.matrix(np.zeros((len(den.deriv_params)))) for i, pname in enumerate(den.deriv_params): dv[0, i] = dd[pname].angularIntegral(self.psi_max)[0] return np.sqrt((dv * self.density.covar * dv.T)[0, 0]) def _density_integral(self, ann=True, derivPar=None): """Return a functor that calculates the LoS integral for various cases Parameters ---------- ann : bool build the functor for annihilation (i.e., integrate density^2 instead of density) derivPar : str build the functor for the derivative w.r.t. this parameter Returns ------- los : `LoSIntegral` Object that caculates line-of-sight integrals """ if self.distance == 0.: raise ValueError("Requested integration for LoSIntegral with distance = 0. %s " % self) try: if self.mode == 'interp': return LoSIntegralInterp(self.density, self.distance * Units.kpc, ann=ann, derivPar=derivPar) elif self.mode == 'fast': return LoSIntegralFast(self.density, self.distance * Units.kpc, ann=ann, derivPar=derivPar) return LoSIntegral(self.density, self.distance * Units.kpc, ann=ann, derivPar=derivPar) except ValueError as err: msg = str(err) msg += " for source %s: %s" % (self.name, str(self.profile)) raise ValueError(msg) def _j_profile(self): """Return an object that compute the J-factor at any direction Returns ------- los : `LoSIntegral` Object that caculates line-of-sight integrals for J-factors """ return self._density_integral(ann=True, derivPar=None) def _j_derivs(self): """Return an dict of objects that compute derivatives of the J-factor w.r.t. the profile parameters at any direction """ retDict = {} for pname in self.density.deriv_params: retDict[pname] = self._density_integral(ann=True, derivPar=pname) return retDict def _d_profile(self): """Return an object that compute the D-factor at any direction Returns ------- los : `LoSIntegral` Object that caculates line-of-sight integrals for D-factors """ return self._density_integral(ann=False, derivPar=None) def _d_derivs(self): """Return an dict of objects that compute derivatives of the D-factor w.r.t. the profile parameters at any direction """ retDict = {} for pname in self.density.deriv_params: retDict[pname] = self._density_integral(ann=False, derivPar=pname) return retDict def _j_prior(self): """Return the Prior on the J-factor Returns ------- prior : `PriorFunctor` Object that compute Prior value """ prior_copy = self.j_prior_def.copy() prior_type = prior_copy.pop('functype', 'lgauss_like') the_prior = prior_factory(prior_type, **prior_copy) return the_prior def _d_prior(self): """Return the Prior on the D-factor Returns ------- prior : `PriorFunctor` Object that compute Prior value """ prior_copy = self.d_prior_def.copy() prior_type = prior_copy.pop('functype', 'lgauss_like') the_prior = prior_factory(prior_type, **prior_copy) return the_prior def __str__(self, indent=0): """String represetation, includes name, ra, dec, distance and density """ ret = '{0:>{1}}'.format('', indent) ret += self.__class__.__name__ for k in ['name', 'ra', 'dec', 'distance', 'density']: v = getattr(self, k) ret += '\n %-15s: %s' % (k, v) return ret
[docs] def jvalue(self, ra, dec): """Return the J-factor in any direction Parameters ---------- ra : `numpy.ndarray` Right-accension (in degrees) dec : `numpy.ndarray` Declination (in degrees) Returns ------- values : `numpy.array` Return values, same shape as the input ra, dec """ sep = coords.angsep(self.ra, self.dec, ra, dec) return self.j_profile(np.radians(sep))
[docs] def jsigma(self, ra, dec): """Return the uncertainty of J in any direction Parameters ---------- ra : `numpy.ndarray` Right-accension (in degrees) dec : `numpy.ndarray` Declination (in degrees) Returns ------- values : `numpy.array` Return values, same shape as the input ra, dec """ raise RuntimeError('jsigma computation not implemented')
[docs] def dvalue(self, ra, dec): """Return the D-factor in any direction Parameters ---------- ra : `numpy.ndarray` Right-accension (in degrees) dec : `numpy.ndarray` Declination (in degrees) Returns ------- values : `numpy.array` Return values, same shape as the input ra, dec """ sep = coords.angsep(self.ra, self.dec, ra, dec) return self.d_profile(np.radians(sep))
[docs] def dsigma(self, ra, dec): """Return the uncertainty of J in any direction Parameters ---------- ra : `numpy.ndarray` Right-accension (in degrees) dec : `numpy.ndarray` Declination (in degrees) Returns ------- values : `numpy.array` Return values, same shape as the input ra, dec """ raise RuntimeError('dsigma computation not implemented')
def _create_map(self, func, npix=150, subsample=4, coordsys='CEL', projection='AIT'): """Create a spatial map of a function Parameters ---------- func : function Must take ra, dec as inputs the function we are mapping, npix : int Number of pixels along one axis of output map subsample : int Number of subsamples to take per pixel coordsys : str Coordniate system: 'GAL' or 'CEL' projection : str Map projection Returns ------- image : `numpy.ndarray` Image data pix : `numpy.ndarray` Pixel coordinatates wcs : `WCS.wcs` WCS object for map """ from dmsky.utils.wcs import create_image_wcs, get_pixel_skydirs from astropy.coordinates import SkyCoord import astropy.units as u skydir = SkyCoord(self.ra * u.deg, self.dec * u.deg) cdelt = 2 * self.psi_max / npix subnpix = npix * subsample subcdelt = cdelt / subsample subimage, wcs = create_image_wcs(skydir, subcdelt, subnpix, coordsys, projection) pix = get_pixel_skydirs(subimage.shape, wcs) subimage = func(pix.ra, pix.dec).reshape(subimage.shape) # Take the mean of the subsampled pixels if subsample > 1: image, wcs = create_image_wcs(skydir, cdelt, npix, coordsys, projection) pix = get_pixel_skydirs(image.shape, wcs) image = (subimage.reshape(npix, subsample, npix, subsample)).mean(axis=3).mean(axis=1) else: image = subimage return image, pix, wcs
[docs] def create_jmap(self, npix=150, subsample=4, coordsys='CEL', projection='AIT'): """Create a J-factor map Parameters ---------- npix : int Number of pixels along one axis of output map subsample : int Number of subsamples to take per pixel coordsys : str Coordniate system: 'GAL' or 'CEL' projection : str Map projection Returns ------- image : `numpy.ndarray` Image data pix : `numpy.ndarray` Pixel coordinatates wcs : `WCS.wcs` WCS object for map """ return self._create_map(self.jvalue, npix, subsample, coordsys, projection)
[docs] def create_dmap(self, npix=150, subsample=4, coordsys='CEL', projection='AIT'): """Create a D-factor map Parameters ---------- npix : int Number of pixels along one axis of output map subsample : int Number of subsamples to take per pixel coordsys : str Coordniate system: 'GAL' or 'CEL' projection : str Map projection Returns ------- image : `numpy.ndarray` Image data pix : `numpy.ndarray` Pixel coordinatates wcs : `WCS.wcs` WCS object for map """ return self._create_map(self.dvalue, npix, subsample, coordsys, projection)
[docs] def write_j_rad_file(self, j_rad_file=None, npts=50, minpsi=1e-4): """Write a text file with the J-fractor radial profile Parameters ---------- j_rad_file : str Filename to write to npts : int Number of angles to write minpsi : float Value for smallest angle to write """ if j_rad_file is None: j_rad_file = self.j_rad_file self._write_radial_profile(j_rad_file, self.j_profile, npts, minpsi)
[docs] def write_d_rad_file(self, d_rad_file=None, npts=50, minpsi=1e-4): """Write a text file with the D-fractor radial profile Parameters ---------- d_rad_file : str Filename to write to npts : int Number of angles to write minpsi : float Value for smallest angle to write """ if d_rad_file is None: d_rad_file = self.d_rad_file self._write_radial_profile(d_rad_file, self.d_profile, npts, minpsi)
def _write_radial_profile(self, filepath, profile, npts=50, minpsi=1e-4): """ Write radial profile to a text file The Fermi-LAT science tools expects a file two columns: Column 1: Angular offset in degrees Column 2: Profile value Column 2 should be normalized so that the angular integral over the sphere is 1. """ psi_vals = np.zeros((npts)) prof_vals = np.zeros((npts)) psi_vals[0:-1] = np.logspace(np.log10(minpsi), np.log10(self.psi_max), npts - 1) psi_vals[-1] = 180. prof_vals[0:-1] = profile(np.radians(psi_vals[0:-1])) x_vals = np.radians(psi_vals) y_vals = 2. * np.pi * x_vals * prof_vals # Trapezoid summation norm = ((x_vals[1:] - x_vals[0:-1]) * (y_vals[1:] + y_vals[0:-1]) / 2.).sum() prof_vals /= norm if filepath is None: fout = sys.stdout else: # Protect against writing out nans, as those will cause problems latter. if not np.isfinite(prof_vals).all(): raise ValueError('One of more values in the radial profile %s is not finite.' % filepath) fout = open(filepath, 'w!') for psi, prof in zip(psi_vals, prof_vals): fout.write("%0.3e %0.3e\n" % (psi, prof))
[docs] def write_jmap_wcs(self, filename, npix=150, clobber=False, map_kwargs=None, file_kwargs=None): """Write the J-factor to a template map. """ from dmsky.utils.wcs import create_image_hdu if map_kwargs is None: map_kwargs = {} if file_kwargs is None: file_kwargs = {} image, pix, wcs = self.create_jmap(npix=npix, **map_kwargs) # This assumes square pixels. norm = np.sum(image) * np.radians(wcs.wcs.cdelt[0])**2 norm_comment = "[%s] Normalization factor." % (self.getp('j_integ').unit) try: normerr = self.j_sigma normerr_comment = "[%s] Normalization uncertainty." % (self.getp('j_sigma').unit) except RuntimeError: normerr = None # Create the HDU hdu = create_image_hdu(image / norm, wcs) hdu.header.set('NORM', value=norm, comment=norm_comment) if normerr is not None: hdu.header.set('NORMERR', value=normerr, comment=normerr_comment) self.setp('j_map_file', value=filename) return hdu.writeto(filename, clobber=clobber, **file_kwargs)
[docs] def write_jmap_hpx(self, filename): """Write the J-factor to a template map. """ from dmsky.skymap import Skymap themap = Skymap([self], ann=True) themap.write_fits(filename) return themap
write_jmap = write_jmap_wcs
[docs] def write_dmap_wcs(self, filename, npix=150, clobber=False, map_kwargs=None, file_kwargs=None): """Write the D-factor to a template map. """ from dmsky.utils.wcs import create_image_hdu if map_kwargs is None: map_kwargs = {} if file_kwargs is None: file_kwargs = {} image, pix, wcs = self.create_dmap(npix=npix, **map_kwargs) # This assumes square pixels. norm = np.sum(image) * np.radians(wcs.wcs.cdelt[0])**2 norm_comment = "[%s] Normalization factor." % (self.getp('d_integ').unit) try: normerr = self.d_sigma normerr_comment = "[%s] Normalization uncertainty." % (self.getp('d_sigma').unit) except RuntimeError: normerr = None # Create the HDU hdu = create_image_hdu(image / norm, wcs) hdu.header.set('NORM', value=norm, comment=norm_comment) if normerr is not None: hdu.header.set('NORMERR', value=normerr, comment=normerr_comment) self.setp('d_map_file', value=filename) return hdu.writeto(filename, clobber=clobber, **file_kwargs)
[docs] def write_dmap_hpx(self, filename): """Write the D-factor to a template map. """ from dmsky.skymap import Skymap themap = Skymap([self], ann=False) themap.write_fits(filename)
write_dmap = write_dmap_wcs
[docs]class Galactic(Target): """Class to add specifics for Galactic DM targets""" pass
[docs]class Dwarf(Target): """Class to add specifics for Dwarf Galaxy DM targets"""
[docs] def j_photo(self, a=18.17, b=0.23): """Photometric J-factor from Eq 14 of Pace & Strigari (2019) [1802.06811v2] J = 10^{a} * (Lv / 1e4 Lsun)^{b} * (D/100 kpc)^-2 * (rhalf/100 pc)^-0.5 For Pace & Strigari (2019): a = 18.17, b = 0.23 For Pace & Strigari 1802.06811v1: a = 17.93, b = 0.32 Parameters ---------- a : normalization exponent b : luminosity scaling Returns ------- J : photometric J-factor within 0.5 deg """ Lv = 10**( (self.abs_mag - 4.83) / -2.5 ) # Lsun rhalf = self.major_axis * np.sqrt(1 - self.ellipticity) rhalf_physical = np.tan( np.radians(rhalf/60.) ) * self.distance * 1000 # pc photo_j = 10**(18.17) * (Lv / 1e4)**0.23 * (self.distance/100.)**-2 * (rhalf_physical/100.)**(-0.5) return photo_j
[docs]class Galaxy(Target): """Class to add specifics for Galaxy DM targets""" pass
[docs]class Cluster(Target): """Class to add specifics for Galaxy Cluster DM targets""" pass
[docs]class Isotropic(Target): """Class to add specifics for Isotropic DM targets""" pass
def factory(ttype, **kwargs): """Factory function to tuild a `Target` objects """ import dmsky.factory return dmsky.factory.factory(ttype, module=__name__, **kwargs)
[docs]class TargetLibrary(ObjectLibrary): """A top-level object, keeping track of all the `Target` objects that we have created """ _suffix = 'targets' _defaults = ( ('path', join(dirname(abspath(__file__)), 'data', _suffix)), )
[docs] def get_target_dict(self, name, version=None, default='default', **kwargs): """Step through the various levels of dependencies to get the full dictionary for a target. target: version -> ... -> target: default -> default: type Parameters ---------- name : str target name version : str version for target parameters default : str name of default parameters to use kwargs : dict keyword arguments passed to target dict Returns ------- dict : dictionary of target parameters """ n, v = item_version(name) if version is not None and v is not None: msg = "Version specified twice: %s, %s" % (name, version) raise ValueError(msg) if v is not None: version = v if version is None: version = default name = n # Start with the target:version requested try: ret = self.library[name][version] except KeyError as e: msg = "Couldn't find name=%s, version=%s"%(name,version) raise KeyError(msg) # Walk down the chain until we either return 'None' or the # 'default' version ret['version'] = version while (version is not None) and (version != default): version = ret.get('base', default) ret = merge_dict(self.library[name][version], ret) kwargs['name'] = name # And finally, overwrite with kwargs update_dict(ret, kwargs) return copy.deepcopy(ret)
[docs] def create_target(self, name, version=None, default='default', **kwargs): """Create a `Target` Parameters ---------- name : str A name for the `Target` version : str Key that specifies which set of parameters to used for this target default : str Name of default parameter dictionary to use Returns ------- target : `Target` The newly created Target """ kw = self.get_target_dict(name, version, default, **kwargs) ttype = kw.pop('type') return factory(ttype, **kw)