Source code for mapsims.noise

from collections import defaultdict
import numpy as np
import healpy as hp
import logging

log = logging.getLogger("mapsims")

try:  # PySM >= 3.2.1
    import pysm3.units as u
except ImportError:
    import pysm.units as u

from so_models_v3 import SO_Noise_Calculator_Public_v3_1_1 as so_models

# pixell is optional and needed when CAR simulations are requested
try:
    import pixell
    import pixell.curvedsky
    import pixell.powspec
except:
    pixell = None

from .channel_utils import parse_channels
from .utils import RemoteData

sensitivity_modes = {"baseline": 1, "goal": 2}
one_over_f_modes = {"pessimistic": 0, "optimistic": 1}
default_mask_value = {"healpix": hp.UNSEEN, "car": np.nan}
_hitmap_version = "v0.2"


class BaseNoiseSimulator:
    def __init__(
        self,
        nside=None,
        shape=None,
        wcs=None,
        ell_max=None,
        return_uK_CMB=True,
        apply_beam_correction=False,
        apply_kludge_correction=True,
        homogeneous=False,
        no_power_below_ell=None,
        rolloff_ell=50,
        survey_efficiency=0.2,
        full_covariance=True,
        sky_fraction=None,
        cache_hitmaps=True,
        boolean_sky_fraction=False,
        channels_list=None,
        instrument_parameters=None,
    ):
        """An abstract base class for simulating noise maps

        Simulate the noise power spectrum in spherical harmonics domain and then generate a map
        in microK_CMB or microK_RJ (based on return_uK_CMB)

        The details of the noise properties need to be defined in the child class.

        Parameters
        ----------

        nside : int
            nside of HEALPix map. If None, uses
            rectangular pixel geometry specified through shape and wcs.
        shape : tuple of ints
            shape of ndmap array (see pixell.pixell.enmap). Must also specify wcs.
        wcs : astropy.wcs.wcs.WCS instance
            World Coordinate System for geometry of map (see pixell.pixell.enmap). Must
            also specify shape.
        ell_max : int
            Maximum ell for the angular power spectrum, if not provided set to 3 * nside when using healpix
            or 10000 * (1.0 / pixel_height_arcmin) when using CAR, corresponding roughly to the Nyquist
            frequency.
        return_uK_CMB : bool
            True, output is in microK_CMB, False output is in microK_RJ
        apply_beam_correction : bool
            Include the effect of the beam in the noise angular power spectrum
        apply_kludge_correction : bool
            If True, reduce the hitcount by a factor of 0.85 to account for not-uniformity in the scanning
        homogeneous : bool
            Set to True to generate full-sky maps with no hit-count variation, with noise curves
            corresponding to a survey that covers a sky fraction of sky_fraction (defaults to 1).
        no_power_below_ell : int
            The input spectra have significant power at low :math:`\\ell`,
            we can zero that power specifying an integer :math:`\\ell` value here.
            The power spectra at :math:`\\ell < \\ell_0` are set to zero.
        rolloff_ell : int
            Low ell power damping, see the docstring of
            `so_noise_models.so_models_v3.SO_Noise_Calculator_Public_v3_1_1.rolloff`
        full_covariance : bool
            Whether or not to include the intra-tube covariance between bands.
            If white noise (atmosphere=False) sims are requested, no
            covariance is included regardless of the value of full_covariance.
        survey_efficiency : float
            Fraction of calendar time that may be used to compute map depth.
        sky_fraction : optional,float
            If homogeneous is True, this sky_fraction is used for the noise curves.
        cache_hitmaps : bool
            If True, caches hitmaps.
        boolean_sky_fraction: bool
            If True, determines sky fraction based on fraction of hitmap that is zero. If False,
            determines sky_fraction from <Nhits>.
        channels_list: a list of channels or pass
        instrument_parameters : Path or str
            See the help of MapSims
        """
        if channels_list is None:
            channels_list = parse_channels(instrument_parameters=instrument_parameters)
        self.channels = {ch.tag: ch for ch in channels_list}
        self.tubes = defaultdict(list)
        for ch in channels_list:
            self.tubes[ch.tube].append(ch)

        self.channel_per_tube = len(self.tubes[channels_list[0].tube])

        if nside is None:
            assert shape is not None
            assert wcs is not None
            self.healpix = False
            self.shape = shape[-2:]
            self.wcs = wcs
            self._pixheight = np.abs(wcs.wcs.cdelt[0] * 60.0)
            self.ell_max = (
                ell_max if ell_max is not None else 10000 * (1.0 / self._pixheight)
            )
            self.pixarea_map = pixell.enmap.pixsizemap(self.shape, self.wcs)
            self.map_area = pixell.enmap.area(self.shape, self.wcs)
        else:
            assert wcs is None
            self.healpix = True
            self.nside = nside
            self.ell_max = ell_max if ell_max is not None else 3 * nside
            self.pixarea_map = hp.nside2pixarea(nside)
            self.map_area = 4.0 * np.pi

        self.rolloff_ell = rolloff_ell
        self.boolean_sky_fraction = boolean_sky_fraction
        self._sky_fraction = sky_fraction

        self.apply_beam_correction = apply_beam_correction
        self.apply_kludge_correction = apply_kludge_correction
        self.survey_efficiency = survey_efficiency
        if self.apply_kludge_correction:
            self.survey_efficiency *= 0.85
        self.full_covariance = full_covariance
        self.return_uK_CMB = return_uK_CMB
        self.no_power_below_ell = no_power_below_ell
        self.homogeneous = homogeneous

        self.hitmap_version = _hitmap_version
        self._cache = cache_hitmaps
        self._hmap_cache = {}

    def get_beam_fwhm(self, tube, band=None):
        """Get beam FWHMs in arcminutes corresponding to the tueb.
        This is useful if non-beam-deconvolved sims are requested and you want to
        know what beam to apply to your signal simulation.

        Parameters
        ----------

        tube : str
            Specify a tube (for SO: ST0-ST3, LT0-LT6) see the `tubes` attribute

        band : str,optional
            Optionally specify the band name within the tube to get just its
            white noise.


        Returns
        -------

        beam : tuple of floats
            The beam FWHM in arcminutes either as
            a tuple for the pair of bands in the tube, or just for the specific
            band requested.
            See the `band_id` attribute of the Channel class
            to identify which is the index of a Channel in the array.
        """

        survey = self.get_survey(tube)
        noise_indices = self.get_noise_indices(tube, band)
        return survey.get_beams()[noise_indices]

    def get_noise_indices(self, tube, band=None):
        """Gets indices in the so_noise_model package of a channel or the 2 channels of a tube"""
        if band is None:
            band_indices = [ch.noise_band_index for ch in self.tubes[tube]]
        else:
            band_indices = self.channels[tube + "_" + band].noise_band_index
        return band_indices

    def get_survey(self, tube):
        """Internal function to get the survey object
        from the SO noise model code.
        """
        raise AssertionError("Must be overriden: Implement in child class")

    def get_fullsky_noise_spectra(self, tube, ncurve_sky_fraction=1, return_corr=False):
        """Get the noise power spectra corresponding to the requested tube
        from the SO noise model code.

        See get_noise_properties to get spectra scaled with the proper hitmap

        See the `band_id` attribute of the Channel class
        to identify which is the index of a Channel in the returned arrays.

        Parameters
        ----------

        tube : str
            Specify a tube (for SO: ST0-ST3, LT0-LT6) see the `tubes` attribute

        ncurve_sky_fraction : float,optional
            The sky fraction to report to the noise simulator code.
            In the current implementation, the default is to pass
            a sky fraction of 1, and scale the result by
            the corresponding sky fraction determined from each
            band's hitmap.

        return_corr : bool
            If True, returns cross-correlation N_XY / sqrt(N_XX * N_YY) coeffient instead of
            cross-correlation power N_XY in the third row of the returned arrays. This is
            more convenient sometimes, e.g. when you need to scale the auto-correlation power by some factor.

        Returns
        -------

        ell : (nells,) ndarray
            Array of nells multipoles starting at ell=0 and spaced by delta_ell=1
            corresponding to the noise power spectra nells_T and nells_P

        nells_T : (3,nells) ndarray
            The first two rows contain the temperature auto-correlation of the noise power
            spectra of each band in the tube. The third row contains the correlation
            power between the two bands by default, but you can get
            the cross-correlation coefficient instead by setting return_corr=True.

        nells_P : (3,nells) ndarray
            Same as for nells_T but for polarization.

        """

        telescope = f"{tube[0]}A"  # get LA or SA from tube name
        survey = self.get_survey(tube)
        if telescope == "SA":
            ell, noise_ell_T, noise_ell_P = survey.get_noise_curves(
                ncurve_sky_fraction,
                self.ell_max,
                delta_ell=1,
                full_covar=True,  # we always obtain the full covariance and later remove correlations as necessary
                deconv_beam=self.apply_beam_correction,
                rolloff_ell=self.rolloff_ell,
            )
            # For SA, so_noise simulates only Polarization,
            # Assume that T is half
            if noise_ell_T is None:
                noise_ell_T = noise_ell_P / 2
        else:
            ell, noise_ell_T, noise_ell_P = survey.get_noise_curves(
                ncurve_sky_fraction,
                self.ell_max,
                delta_ell=1,
                full_covar=True,
                deconv_beam=self.apply_beam_correction,
                rolloff_ell=self.rolloff_ell,
            )

        assert ell[0] == 2  # make sure the noise code is still returning something
        # that starts at ell=2
        ls = np.arange(ell.size + 2)
        nells_T = np.zeros(
            (self.channel_per_tube * (self.channel_per_tube + 1) // 2, ell.size + 2)
        )
        nells_P = np.zeros(
            (self.channel_per_tube * (self.channel_per_tube + 1) // 2, ell.size + 2)
        )
        b_indices = [ch.noise_band_index for ch in self.tubes[tube]]
        for n_out, n_in in zip([nells_T, nells_P], [noise_ell_T, noise_ell_P]):
            for i, b1 in enumerate(b_indices):
                n_out[i, 2:] = n_in[b1][b1]
            # re-scaling if correlation coefficient is requested

            counter = 0
            for i, b1 in enumerate(b_indices):
                for b2 in b_indices[:i]:
                    scale = np.sqrt(n_in[b1][b1] * n_in[b2][b2]) if return_corr else 1
                    n_out[self.channel_per_tube + counter, 2:] = (
                        (n_in[b1][b2] / scale) if self.full_covariance else 0
                    )
                    counter += 1

            if self.no_power_below_ell is not None:
                n_out[:, ls < self.no_power_below_ell] = 0

        ell = ls
        return ell, nells_T, nells_P

    def get_noise_properties(
        self, tube, nsplits=1, hitmap=None, white_noise_rms=None, atmosphere=True
    ):
        """
        Get noise curves scaled with the hitmaps and the hitmaps themselves
        Or if the survey object has no hitmaps return the ivar_map and
        normalized noise curves.

        Parameters
        ----------
        see the docstring of simulate

        See the `band_id` attribute of the Channel class
        to identify which is the index of a Channel in the returned arrays.

        Returns
        -------
        ell : np.array
            Array of :math:`\\ell`
        ps_T, ps_P : np.array
            Tube noise spectra for T and P, one row per channel, the 3rd the crosscorrelation
        fsky : np.array
            Array of sky fractions computed as <normalized N_hits>
        wnoise_power : np.array
            White noise power (high-ell limit)
        weightsMap : np.array
            Array of the weightsMap (either hitmaps or ivar_map) for each channel
        """
        fsky, weightsMap = self._get_requested_hitmaps(tube, hitmap)
        if weightsMap is None:
            fsky = np.array([1.0] * self.channel_per_tube)
            wnoise_scale = 1.0
            # Use a presupplied inverse variance map
            weightsMap = self._load_inverse_variance_map(tube, output_units="uK_CMB")

            # Should you need to divide by the area? I think this depends on the units.
            # This inclusion means that you can use the ivar map generated by the SONoiseSimulator class consistently
            weightsMap /= self.pixarea_map
        else:
            wnoise_scale = self._get_wscale_factor(white_noise_rms, tube, fsky)

        wnoise_power = self.get_white_noise_power(tube, sky_fraction=1, units="sr")
        if wnoise_power is not None:
            # raise AssertionError(" Survey white noise level not specified. Cannot generate ivar_map")
            wnoise_power *= nsplits * fsky * wnoise_scale.flatten()

        if atmosphere:
            ell, ps_T, ps_P = self.get_fullsky_noise_spectra(
                tube, ncurve_sky_fraction=1, return_corr=True
            )
            ps_T[: self.channel_per_tube] = (
                ps_T[: self.channel_per_tube] * fsky[:, None] * nsplits * wnoise_scale
            )
            counter = 0
            for i in range(self.channel_per_tube):
                for j in range(i):
                    ps_T[self.channel_per_tube + counter] *= np.sqrt(
                        np.prod(ps_T[[i, j]], axis=0)
                    )
                    counter += 1

            ps_P[: self.channel_per_tube] = (
                ps_P[: self.channel_per_tube] * fsky[:, None] * nsplits * wnoise_scale
            )
            counter = 0
            for i in range(self.channel_per_tube):
                for j in range(i):
                    ps_P[self.channel_per_tube + counter] *= np.sqrt(
                        np.prod(ps_P[[i, j]], axis=0)
                    )
                    counter += 1

        else:
            if wnoise_power is None:
                raise AssertionError(
                    " Survey white noise level not specified. Cannot generate a white noise spectrum"
                )
            ell = np.arange(self.ell_max)
            ps_T = np.zeros(
                (self.channel_per_tube * (self.channel_per_tube + 1) // 2, ell.size)
            )
            ps_T[: self.channel_per_tube] = wnoise_power[:, None] * np.ones(
                (2, ell.size)
            )
            ps_P = 2.0 * ps_T
        return ell, ps_T, ps_P, fsky, wnoise_power, weightsMap

    def _validate_map(self, fmap):
        """Internal function to validate an externally provided map.
        It checks the healpix or CAR attributes against what the
        class was initialized with. It adds a leading dimension if
        necessary.
        """
        shape = fmap.shape
        if self.healpix:
            if len(shape) == 1:
                npix = shape[0]
            elif len(shape) == 2:
                assert shape[0] == 1 or shape[0] == 2
                npix = shape[1]
            else:
                raise ValueError
            assert npix == hp.nside2npix(self.nside)
            if len(shape) == 1:
                return fmap[None, :]
            else:
                return fmap
        else:
            assert pixell.wcsutils.is_compatible(fmap.wcs, self.wcs)
            if len(shape) == 2:
                ashape = shape
            elif len(shape) == 3:
                assert shape[0] == 1 or shape[0] == 2
                ashape = shape[-2:]
            else:
                raise ValueError
            assert [x == y for x, y in zip(ashape, self.shape)]
            if len(shape) == 2:
                return fmap[None, ...]
            else:
                return fmap

    def _load_map(self, fname, **kwargs):
        """Internal function to load a healpix or CAR map
        from disk and reproject it if necessary.
        """
        # If not a string try as a healpix or CAR map
        if not (isinstance(fname, str)):
            return self._validate_map(fname)

        # Check if in cache
        if self._cache:
            try:
                return self._hmap_cache[fname]
            except:
                pass

        # else load
        if self.healpix:
            hitmap = hp.ud_grade(
                hp.read_map(fname, dtype=np.float64),
                nside_out=self.nside,
            )
        else:

            hitmap = pixell.enmap.read_map(fname)
            if pixell.wcsutils.is_compatible(hitmap.wcs, self.wcs):
                hitmap = pixell.enmap.extract(hitmap, self.shape, self.wcs)
            else:
                log.warning(
                    "WCS of hitmap with nearest pixel-size is not compatible, so interpolating hitmap"
                )
                hitmap = pixell.enmap.project(hitmap, self.shape, self.wcs, order=0)

        # and then cache and return
        if self._cache:
            self._hmap_cache[fname] = hitmap
        return hitmap

    def _average(self, imap):
        # Internal function to calculate <imap> general to healpix and CAR
        if self.healpix:
            assert imap.ndim == 1
        else:
            assert imap.ndim == 2
        return (self.pixarea_map * imap).sum() / (self.map_area)

    def _process_hitmaps(self, hitmaps):
        """Internal function to process hitmaps and based on the
        desired scheme, obtain sky fractions from them.
        """
        nhitmaps = hitmaps.shape[0]
        assert nhitmaps == 1 or nhitmaps == 2
        if self.boolean_sky_fraction:
            raise NotImplementedError
        else:

            output_hitmaps = [(hitmaps[i] / hitmaps[i].max()) for i in range(nhitmaps)]
            # We define sky fraction as <Nhits>
            sky_fractions = [self._average(output_hitmaps[i]) for i in range(nhitmaps)]
        return output_hitmaps, sky_fractions

    def _get_hitmaps_names(self, tube=None):
        """Internal function to get the full name of the hitmaps
        Returns a lits of file names.
        Not implemented in base class
        """
        return None

    def get_hitmaps(self, tube=None, hitmap=None):
        """Get and process hitmaps and sky fractions for the provided tube or provided
        an external one.

        Parameters
        ----------

        tube : str
            Specify a tube (for SO: ST0-ST3, LT0-LT6) see the `tubes` attribute

        hitmap : string or map, optional
            Provide the path to a hitmap to override the default used for
            the tube. You could also provide the hitmap as an array
            directly.

        Returns
        -------

        hitmaps : ndarray or ndmap
            Processed hitmaps. See the `band_id` attribute of the Channel class
            to identify which is the index of a Channel in the array.

        sky_fractions : float
            The sky fraction covered by the survey determined from the hitmaps.

        """

        if hitmap is not None:
            return self._process_hitmaps(self._load_map(hitmap))

        # If the survey object has preloaded hitmaps. Use them. Otherwise load form files.
        survey = self.get_survey(tube)
        if hasattr(survey, "get_hitmaps") and survey.get_hitmaps() is not None:

            noise_indices = self.get_noise_indices(tube, None)
            hitmaps = survey.get_hitmaps()[noise_indices]
        else:
            hitmap_filenames = self._get_hitmaps_names(tube)
            if hitmap_filenames is None:
                return None, None
            hitmaps = []
            for hitmap_filename in hitmap_filenames:
                hitmaps.append(self._load_map(hitmap_filename))

        return self._process_hitmaps(np.asarray(hitmaps))

    def get_white_noise_power(self, tube, sky_fraction, band=None, units="sr"):
        """Get white noise power in uK^2-sr (units='sr') or
        uK^2-arcmin^2 (units='arcmin2') corresponding to the tube name tube.
        This is useful if you want to generate your own simulations that do not
        have the atmospheric component.

        Parameters
        ----------

        tube : str
            Specify a tube (for SO: ST0-ST3, LT0-LT6) see the `tubes` attribute

        sky_fraction : float
            The sky fraction covered by the survey.

        band : str,optional
            Optionally specify the band name within the tube to get just its
            white noise.

        units: str
            'sr' for white noise power in uK^2-steradian and 'arcmin2' for
            the same in uK^2-arcmin^2 units.

        Returns
        -------

        wnoise : tuple of floats
            The white noise variance in the requested units either as
            a tuple for the pair of bands in the tube, or just for the specific
            band requested.
            See the `band_id` attribute of the Channel class
            to identify which is the index of a Channel in the array.
        """
        survey = self.get_survey(tube)
        noise_indices = self.get_noise_indices(tube, band)
        white_noise = survey.get_white_noise(sky_fraction, units=units)
        if white_noise is None:
            return None
        return white_noise[noise_indices]

    def _load_inverse_variance_map(self, tube, output_units="uK_CMB", band=None):
        """Internal function to return a preloaded inverse var map or load one from a from file.
        By default this just returns None so an inv_var map is computed from a white noise level
        and a hits map
        """
        survey = self.get_survey(tube)
        noise_indices = self.get_noise_indices(tube, band)
        # If the survey has a set of preloaded invariance maps use them
        if hasattr(survey, "get_ivar_maps") and survey.get_ivar_maps() is not None:
            ret = np.array(survey.get_ivar_maps())[noise_indices]
        elif (
            hasattr(survey, "get_ivar_map_filenames")
            and survey.get_ivar_map_filenames() is not None
        ):

            ivar_map_filenames = survey.get_ivar_map_filenames()
            if ivar_map_filenames is None:
                return None
            ivar_map_filenames = [ivar_map_filenames[i] for i in noise_indices]
            ivar_maps = []
            for ivar_map_filename in ivar_map_filenames:
                ivar_maps.append(self._load_map(ivar_map_filename))
            ret = np.array(ivar_maps)
        else:
            return None

        for i in range(self.channel_per_tube):
            freq = self.tubes[tube][i].center_frequency
            unit_conv = (1 * u.uK_CMB).to_value(
                u.Unit(output_units), equivalencies=u.cmb_equivalencies(freq)
            )
            ret[i] /= unit_conv**2.0  # divide by square since the default is 1/uK^2
        return ret

    def get_inverse_variance(
        self, tube, output_units="uK_CMB", hitmap=None, white_noise_rms=None
    ):
        """Get the inverse noise variance in each pixel for the requested tube.
        In the noise model, all the splits and all the I,Q,U components have the
        same position dependence of the noise variance. Each split just has `nsplits`
        times the noise power (or `1/nsplits` the inverse noise variance) and the
        Q,U components have 2x times the noise power (or 1/2 times the inverse
        noise variance) of the intensity components. The inverse noise variance
        provided by this function is for the `nsplits=1` intensity component.
        Two maps are stored in the leading dimension, one for each of the
        two correlated arrays in the dichroic tube.


        Parameters
        ----------

        tube : str
            Specify a tube (for SO: ST0-ST3, LT0-LT6) see the `tubes` attribute
        output_units : str
            Output unit supported by PySM.units, e.g. uK_CMB or K_RJ
        hitmap : string or map, optional
            Provide the path to a hitmap to override the default used for
            the tube. You could also provide the hitmap as an array
            directly.
        white_noise_rms : float or tuple of floats, optional
            Optionally scale the simulation so that the small-scale limit white noise
            level is white_noise_rms in uK-arcmin (either a single number or
            a pair for the dichroic array).

        Returns
        -------

        ivar_map : ndarray or ndmap
            Numpy array with the HEALPix or CAR map of the inverse variance
            in each pixel. The default units are uK^(-2). This is an extensive
            quantity that depends on the size of pixels.
            See the `band_id` attribute of the Channel class
            to identify which is the index of a Channel in the array.
        """
        ret = self._load_inverse_variance_map(tube, output_units=output_units)
        if ret is not None:
            return ret

        fsky, hitmaps = self._get_requested_hitmaps(tube, hitmap)
        wnoise_scale = self._get_wscale_factor(white_noise_rms, tube, fsky)
        sel = np.s_[:, None] if self.healpix else np.s_[:, None, None]
        whiteNoise = self.get_white_noise_power(tube, sky_fraction=1, units="sr")
        if whiteNoise is None:
            raise AssertionError(
                " Survey white noise level not specified. Cannot generate ivar_map"
            )
        power = whiteNoise[sel] * fsky[sel] * wnoise_scale[:, 0][sel]
        """
        We now have the physical white noise power uK^2-sr
        and the hitmap
        ivar = hitmap * pixel_area * fsky / <hitmap> / power
        """
        avgNhits = np.asarray(
            [self._average(hitmaps[i]) for i in range(self.channel_per_tube)]
        )
        ret = hitmaps * self.pixarea_map * fsky[sel] / avgNhits[sel] / power
        # Convert to desired units
        for i in range(self.channel_per_tube):
            freq = self.tubes[tube][i].center_frequency
            unit_conv = (1 * u.uK_CMB).to_value(
                u.Unit(output_units), equivalencies=u.cmb_equivalencies(freq)
            )
            ret[i] /= unit_conv**2.0  # divide by square since the default is 1/uK^2
        return ret

    def _get_wscale_factor(self, white_noise_rms, tube, sky_fraction):
        """Internal function to re-scale white noise power
        to a new value corresponding to white noise RMS in uK-arcmin.
        """
        if white_noise_rms is None:
            return np.ones((self.channel_per_tube, 1))
        whiteNoise = self.get_white_noise_power(tube, sky_fraction=1, units="arcmin2")
        if whiteNoise is None:
            raise AssertionError(
                " Survey white noise level not specified. Cannot rescale white noise levels"
            )
        cnoise = np.sqrt(whiteNoise * sky_fraction)
        return (white_noise_rms / cnoise)[:, None]

    def _get_requested_hitmaps(self, tube, hitmap):
        if self.homogeneous and (hitmap is None):
            ones = (
                np.ones(hp.nside2npix(self.nside))
                if self.healpix
                else pixell.enmap.ones(self.shape, self.wcs)
            )
            hitmaps = (
                np.asarray([ones, ones])
                if self.full_covariance
                else ones.reshape((1, -1))
            )
            fsky = self._sky_fraction if self._sky_fraction is not None else 1
            sky_fractions = (
                np.asarray([fsky] * self.channel_per_tube)
                if self.full_covariance
                else np.asarray([fsky])
            )
        else:
            hitmaps, sky_fractions = self.get_hitmaps(tube, hitmap=hitmap)

        if hitmaps is None:
            return None, None

        if len(sky_fractions) == 1:
            assert hitmaps.shape[0] == 1
            fsky = np.asarray([sky_fractions[0]] * self.channel_per_tube)
            hitmaps = np.repeat(hitmaps, 2, axis=0)
        elif len(sky_fractions) == 2:
            assert len(hitmaps) == 2
            fsky = np.asarray(sky_fractions)
        else:
            raise ValueError
        return fsky, hitmaps

    def simulate(
        self,
        tube,
        output_units="uK_CMB",
        seed=None,
        nsplits=1,
        mask_value=None,
        atmosphere=True,
        hitmap=None,
        white_noise_rms=None,
    ):
        """Create a random realization of the noise power spectrum

        Parameters
        ----------

        tube : str
            Specify a tube (for SO: ST0-ST3, LT0-LT6) see the `tubes` attribute
        output_units : str
            Output unit supported by PySM.units, e.g. uK_CMB or K_RJ
        seed : integer or tuple of integers, optional
            Specify a seed. The seed is converted to a tuple if not already
            one and appended to (0,0,6,tube_id) to avoid collisions between
            tubes, with the signal sims and with ACT noise sims, where
            tube_id is the integer ID of the tube.
        nsplits : integer, optional
            Number of splits to generate. The splits will have independent noise
            realizations, with noise power scaled by a factor of nsplits, i.e. atmospheric
            noise is assumed to average down with observing time the same way
            the white noise does. By default, only one split (the coadd) is generated.
        mask_value : float, optional
            The value to set in masked (unobserved) regions. By default, it uses
            the value in default_mask_value, which for healpix is healpy.UNSEEN
            and for CAR is numpy.nan.
        atmosphere : bool, optional
            Whether to include the correlated 1/f from the noise model. This is
            True by default. If it is set to False, then a pure white noise map
            is generated from the white noise power in the noise model, and
            the covariance between arrays is ignored.
        hitmap : string or map, optional
            Provide the path to a hitmap to override the default used for
            the tube. You could also provide the hitmap as an array
            directly.
        white_noise_rms : float or tuple of floats, optional
            Optionally scale the simulation so that the small-scale limit white noise
            level is white_noise_rms in uK-arcmin (either a single number or
            a pair for the dichroic array).

        Returns
        -------

        output_map : ndarray or ndmap
            Numpy array with the HEALPix or CAR map realization of noise.
            The shape of the returned array is (2,3,nsplits,)+oshape, where
            oshape is (npix,) for HEALPix and (Ny,Nx) for CAR.
            The first dimension of size 2 corresponds to the two different
            bands within a dichroic tube.
            See the `band_id` attribute of the Channel class
            to identify which is the index of a Channel in the array.

            The second dimension corresponds to independent split realizations
            of the noise, e.g. it is 1 for full mission.

            The third dimension corresponds to the three polarization
            Stokes components I,Q,U

            The last dimension is the number of pixels
        """
        assert nsplits >= 1
        if mask_value is None:
            mask_value = (
                default_mask_value["healpix"]
                if self.healpix
                else default_mask_value["car"]
            )

        # This seed tuple prevents collisions with the signal sims
        # but we should eventually switch to centralized seed
        # tracking.
        if seed is not None:
            try:
                iter(seed)
            except:
                seed = (seed,)
            tube_id = self.tubes[tube][0].tube_id
            seed = (0, 0, 6, tube_id) + seed
            np.random.seed(seed)

        # In the third row we return the correlation coefficient P12/sqrt(P11*P22)
        # since that can be used straightforwardly when the auto-correlations are re-scaled.
        ell, ps_T, ps_P, fsky, wnoise_power, weightsMap = self.get_noise_properties(
            tube,
            nsplits=nsplits,
            hitmap=hitmap,
            white_noise_rms=white_noise_rms,
            atmosphere=atmosphere,
        )

        if not (atmosphere):
            if self.apply_beam_correction:
                raise NotImplementedError(
                    "Beam correction is not currently implemented for pure-white-noise sims."
                )
            # If no atmosphere is requested, we use a simpler/faster method
            # that generates white noise in real-space.
            if self.healpix:
                ashape = (hp.nside2npix(self.nside),)
                sel = np.s_[:, None, None, None]
                pmap = self.pixarea_map
            else:
                ashape = self.shape[-2:]
                sel = np.s_[:, None, None, None, None]
                pmap = pixell.enmap.enmap(self.pixarea_map, self.wcs)
            spowr = np.sqrt(wnoise_power[sel] / pmap)
            output_map = spowr * np.random.standard_normal(
                (self.channel_per_tube, nsplits, 3) + ashape
            )
            output_map[:, :, 1:, :] = output_map[:, :, 1:, :] * np.sqrt(2.0)
        else:
            if self.healpix:
                npix = hp.nside2npix(self.nside)
                output_map = np.zeros((self.channel_per_tube, nsplits, 3, npix))
                for i in range(nsplits):
                    for i_pol in range(3):
                        output_map[:, i, i_pol] = np.array(
                            hp.synfast(
                                ps_T if i_pol == 0 else ps_P,
                                nside=self.nside,
                                pol=False,
                                new=True,
                            )
                        )
            else:
                output_map = pixell.enmap.zeros((2, nsplits, 3) + self.shape, self.wcs)
                ps_T = pixell.powspec.sym_expand(np.asarray(ps_T), scheme="diag")
                ps_P = pixell.powspec.sym_expand(np.asarray(ps_P), scheme="diag")
                # TODO: These loops can probably be vectorized
                for i in range(nsplits):
                    for i_pol in range(3):
                        output_map[:, i, i_pol] = pixell.curvedsky.rand_map(
                            (self.channel_per_tube,) + self.shape,
                            self.wcs,
                            ps_T if i_pol == 0 else ps_P,
                            spin=0,
                        )

        for i in range(self.channel_per_tube):
            freq = self.tubes[tube][i].center_frequency
            if not (self.homogeneous):
                good = weightsMap[i] != 0
                # Normalize on the Effective sky fraction, see discussion in:
                # https://github.com/simonsobs/mapsims/pull/5#discussion_r244939311
                output_map[i, :, :, good] /= np.sqrt(
                    weightsMap[i][good][..., None, None]
                )
                output_map[i, :, :, np.logical_not(good)] = mask_value
            unit_conv = (1 * u.uK_CMB).to_value(
                u.Unit(output_units), equivalencies=u.cmb_equivalencies(freq)
            )
            output_map[i] *= unit_conv
        return output_map


class ExternalNoiseSimulator(BaseNoiseSimulator):
    def __init__(
        self,
        nside=None,
        shape=None,
        wcs=None,
        ell_max=None,
        return_uK_CMB=True,
        sensitivity_mode="baseline",
        apply_beam_correction=False,
        apply_kludge_correction=True,
        homogeneous=False,
        no_power_below_ell=None,
        rolloff_ell=50,
        survey_efficiency=0.2,
        full_covariance=True,
        channels_list=None,
        sky_fraction=None,
        cache_hitmaps=True,
        boolean_sky_fraction=False,
        survey=None,
    ):

        super(ExternalNoiseSimulator, self).__init__(
            nside=nside,
            shape=shape,
            wcs=wcs,
            ell_max=ell_max,
            return_uK_CMB=return_uK_CMB,
            apply_beam_correction=apply_beam_correction,
            apply_kludge_correction=apply_kludge_correction,
            homogeneous=homogeneous,
            no_power_below_ell=no_power_below_ell,
            rolloff_ell=rolloff_ell,
            survey_efficiency=survey_efficiency,
            full_covariance=full_covariance,
            sky_fraction=sky_fraction,
            cache_hitmaps=cache_hitmaps,
            boolean_sky_fraction=boolean_sky_fraction,
            channels_list=channels_list,
        )
        self._survey = survey

    def get_survey(self, tube):
        """Function to get the survey object
        return the inputted survey
        """

        return self._survey

    def _get_hitmaps_names(self, tube=None, band=None):
        """Internal function to get the full name of the hitmaps"""
        survey = self.get_survey(tube)
        noise_indices = self.get_noise_indices(tube, band)
        hitmap_names = survey.get_hitmap_filenames()
        if hitmap_names is None:
            return None
        return [hitmap_names[i] for i in list(noise_indices)]
        # return [ch.hitmap_name for ch in self.tubes[tube]]


[docs]class SONoiseSimulator(BaseNoiseSimulator): def __init__( self, nside=None, shape=None, wcs=None, ell_max=None, return_uK_CMB=True, sensitivity_mode="baseline", apply_beam_correction=False, apply_kludge_correction=True, homogeneous=False, no_power_below_ell=None, rolloff_ell=50, survey_efficiency=0.2, full_covariance=True, LA_years=5, LA_noise_model="SOLatV3point1", elevation=50, SA_years=5, SA_one_over_f_mode="pessimistic", sky_fraction=None, cache_hitmaps=True, boolean_sky_fraction=False, instrument_parameters=None, ): """Simulate noise maps for Simons Observatory Simulate the noise power spectrum in spherical harmonics domain and then generate a map in microK_CMB or microK_RJ (based on return_uK_CMB) In the constructor, this object calls the published 20180822 noise simulator and generates the expected noise power spectra for all channels. Then you need to call the `simulate` method with a channel identifier to create a simulated map. Parameters ---------- nside : int nside of HEALPix map. If None, uses rectangular pixel geometry specified through shape and wcs. shape : tuple of ints shape of ndmap array (see pixell.pixell.enmap). Must also specify wcs. wcs : astropy.wcs.wcs.WCS instance World Coordinate System for geometry of map (see pixell.pixell.enmap). Must also specify shape. ell_max : int Maximum ell for the angular power spectrum, if not provided set to 3 * nside when using healpix or 10000 * (1.0 / pixel_height_arcmin) when using CAR, corresponding roughly to the Nyquist frequency. return_uK_CMB : bool True, output is in microK_CMB, False output is in microK_RJ sensitivity_mode : str Value should be threshold, baseline or goal to use predefined sensitivities apply_beam_correction : bool Include the effect of the beam in the noise angular power spectrum apply_kludge_correction : bool If True, reduce the hitcount by a factor of 0.85 to account for not-uniformity in the scanning homogeneous : bool Set to True to generate full-sky maps with no hit-count variation, with noise curves corresponding to a survey that covers a sky fraction of sky_fraction (defaults to 1). no_power_below_ell : int The input spectra have significant power at low :math:`\\ell`, we can zero that power specifying an integer :math:`\\ell` value here. The power spectra at :math:`\\ell < \\ell_0` are set to zero. rolloff_ell : int Low ell power damping, see the docstring of `so_noise_models.so_models_v3.SO_Noise_Calculator_Public_v3_1_1.rolloff` survey_efficiency : float Fraction of calendar time that may be used to compute map depth. full_covariance : bool Whether or not to include the intra-tube covariance between bands. If white noise (atmosphere=False) sims are requested, no covariance is included regardless of the value of full_covariance. LA_years : int Total number of years for the Large Aperture telescopes survey LA_noise_model : str Noise model among the ones available in `so_noise_model`, "SOLatV3point1" is default, "SOLatV3" is the model released in 2018 which had a bug in the atmosphere contribution elevation : float Elevation of the scans in degrees, the V3.1.1 noise model includes elevation dependence for the LAT. This should reproduced original V3 results at the reference elevation of 50 degrees. SA_years : int Total number of years for the Small Aperture telescopes survey SA_one_over_f_mode : {"pessimistic", "optimistic", "none"} Correlated noise performance of the detectors on the Small Aperture telescopes sky_fraction : optional,float If homogeneous is True, this sky_fraction is used for the noise curves. cache_hitmaps : bool If True, caches hitmaps. boolean_sky_fraction: bool If True, determines sky fraction based on fraction of hitmap that is zero. If False, determines sky_fraction from <Nhits>. instrument_parameters : Path or str See the help of MapSims """ super(SONoiseSimulator, self).__init__( nside=nside, shape=shape, wcs=wcs, ell_max=ell_max, return_uK_CMB=return_uK_CMB, apply_beam_correction=apply_beam_correction, apply_kludge_correction=apply_kludge_correction, homogeneous=homogeneous, no_power_below_ell=no_power_below_ell, rolloff_ell=rolloff_ell, survey_efficiency=survey_efficiency, full_covariance=full_covariance, sky_fraction=sky_fraction, cache_hitmaps=cache_hitmaps, boolean_sky_fraction=boolean_sky_fraction, instrument_parameters=instrument_parameters, ) self.sensitivity_mode = sensitivity_modes[sensitivity_mode] self.LA_years = LA_years self.LA_noise_model = LA_noise_model self.elevation = elevation self.SA_years = SA_years self.SA_one_over_f_mode = one_over_f_modes[SA_one_over_f_mode] self.remote_data = RemoteData(healpix=self.healpix, version=self.hitmap_version)
[docs] def get_survey(self, tube): """Internal function to get the survey object from the SO noise model code. """ telescope = f"{tube[0]}A" # get LA or SA from tube name if telescope == "SA": if tube == "ST0": N_tubes = [0, 0, 1] # the UHF telescope elif tube == "ST1": N_tubes = [0, 1, 0] # the MF telescope elif tube == "ST2": N_tubes = [0, 0.6, 0] # the MF/LF telescope elif tube == "ST3": N_tubes = [0.4, 0, 0] # the MF/LF telescope else: raise ValueError with np.errstate(divide="ignore", invalid="ignore"): survey = so_models.SOSatV3point1( sensitivity_mode=self.sensitivity_mode, survey_efficiency=self.survey_efficiency, survey_years=self.SA_years, N_tubes=N_tubes, el=None, # SAT does not support noise elevation function one_over_f_mode=self.SA_one_over_f_mode, ) elif telescope == "LA": with np.errstate(divide="ignore", invalid="ignore"): survey = getattr(so_models, self.LA_noise_model)( sensitivity_mode=self.sensitivity_mode, survey_efficiency=self.survey_efficiency, survey_years=self.LA_years, N_tubes=[1, 1, 1], el=self.elevation, ) return survey
def _get_hitmaps_names(self, tube=None): """Internal function to get the full name of the hitmaps""" telescope = f"{tube[0]}A" # get LA or SA from tube name if not (self.healpix): npixheight = min( {"LA": [0.5, 2.0], "SA": [4.0, 12.0]}[telescope], key=lambda x: abs(x - self._pixheight), ) car_suffix = f"_CAR_{npixheight:.2f}_arcmin" else: car_suffix = "" bands = [ch.band for ch in self.tubes[tube]] rnames = [] for band in bands: rnames.append( f"{tube}_{band}_01_of_20.nominal_telescope_all_time_all_hmap{car_suffix}.fits.gz" ) hitmap_filenames = [self.remote_data.get(rname) for rname in rnames] return hitmap_filenames