Source code for mapsims.runner

import importlib
import logging
import os
import os.path
from astropy.table import Table
from astropy.utils import data
import healpy as hp
import numpy as np

log = logging.getLogger("mapsims")

try:  # PySM >= 3.2.1
    import pysm3.units as u
    import pysm3 as pysm
except ImportError:
    import pysm.units as u
    import pysm
import toml
from .version import __version__

# pixell is optional and needed when CAR simulations are requested
try:
    import pixell
    import pixell.curvedsky
    import pixell.powspec
except:
    pixell = None

from .utils import merge_dict

import socket

on_cori_login = socket.gethostname().startswith("cori")

try:
    if "DISABLE_MPI" in os.environ or on_cori_login:
        raise ImportError
    from mpi4py import MPI

    COMM_WORLD = MPI.COMM_WORLD
except ImportError:
    COMM_WORLD = None

from .channel_utils import parse_channels

PYSM_COMPONENTS = {
    comp[0]: comp for comp in ["synchrotron", "dust", "freefree", "cmb", "ame"]
}
default_output_filename_template = "mapsims_{tag}_{telescope}_{band}_nside{nside}_{split}_of_{nsplits}_{pixelization}.fits"


[docs]def get_default_so_resolution(ch, field="NSIDE"): "Load the default Simons Observatory resolution" default_resolution = Table.read( data.get_pkg_data_filename("data/so_default_resolution.csv") ) default_resolution.add_index("channel") first_ch = ch if not isinstance(ch, tuple) else ch[0] output = default_resolution.loc[first_ch.telescope + "_" + str(first_ch.band)][ field ] if field == "CAR_resol": output *= u.arcmin return output
def get_map_shape(ch, nside=None, car_resolution=None, car=False, healpix=True): """Get map shape (and WCS for CAR) If N_side or car_resolution is None, get the default value from the instrument model file Parameters ---------- ch : string Channel tag, e.g. SA_LF2 nside : int Desired healpix N_side car_resolution : astropy.Quantity CAR pixels resolution with angle unit car : bool Set to True for CAR healpix : bool Set to True for HEALPix Returns ------- nside : int N_side, either return the input or default None for CAR healpix_shape : tuple of int or None (npix,) for HEALPix car_shape : tuple of int or None (Nx, Ny) for CAR car_wcs : astropy.WCS or None CAR map WCS """ if car: if car_resolution is None: car_resolution = ch.car_resol car_shape, car_wcs = pixell.enmap.fullsky_geometry( res=car_resolution.to_value(u.radian), variant="fejer1" ) else: car_shape, car_wcs = None, None if healpix: if nside is None: nside = ch.nside healpix_shape = (hp.nside2npix(nside),) else: healpix_shape = None return nside, car_resolution, healpix_shape, car_shape, car_wcs def function_accepts_argument(func, arg): """Check if a function or class accepts argument arg Parameters ---------- func : Python function or Class Input Python function or Class to check arg : str keyword or positional argument to check Returns ------- accepts_argument : bool True if function/class constructor accepts argument """ if not hasattr(func, "__code__"): func = func.__init__ return arg in func.__code__.co_varnames def command_line_script(args=None): import argparse parser = argparse.ArgumentParser( description="Execute map based simulations for Simons Observatory" ) parser.add_argument("config", type=str, help="Configuration file", nargs="+") parser.add_argument("--nside", type=int, required=False, help="NSIDE") parser.add_argument( "--verbose", required=False, action="store_true", help="Set logging to INFO" ) parser.add_argument( "--num", type=int, required=False, help="Simulation number, generally used as seed", ) parser.add_argument( "--nsplits", type=int, required=False, help="Number of noise splits" ) parser.add_argument( "--channels", type=str, help="Channels e.g. all, 'LT1_UHF1,LT0_UHF1', 'tube:LT1', see docstring of MapSim", required=False, ) res = parser.parse_args(args) override = { key: getattr(res, key) for key in ["nside", "channels", "num", "nsplits"] if getattr(res, key) is not None } logging.basicConfig(format="%(asctime)s:%(levelname)s:%(name)s:%(message)s") if res.verbose: for each in [log, logging.getLogger("pysm3")]: each.setLevel(logging.INFO) log.info("Parsing configuration from %s", ", ".join(res.config)) simulator = from_config(res.config, override=override) simulator.execute(write_outputs=True) def import_class_from_string(class_string): module_name, class_name = class_string.rsplit(".", 1) return getattr(importlib.import_module(module_name), class_name)
[docs]def from_config(config_file, override=None): if isinstance(config_file, str): config_file = [config_file] config = toml.load(config_file[0]) for conf in config_file[1:]: merge_dict(config, toml.load(conf)) if override is not None: config.update(override) pysm_components_string = None output_reference_frame = None nside = config.get("nside", None) modeling_nside = config.get("modeling_nside", None) lmax_over_nside = config.get("lmax_over_nside", None) car = config.get("car", False) healpix = config.get("healpix", True) # parse_channels will throw error if instrument_parameters is missing or None channels = parse_channels( config["channels"], config.get("instrument_parameters", None) ) car_resolution = config.get("car_resolution_arcmin", None) if car_resolution is not None: car_resolution = car_resolution * u.arcmin nside, car_resolution, healpix_shape, car_shape, car_wcs = get_map_shape( ch=channels[0], nside=nside, car_resolution=car_resolution, car=car, healpix=healpix, ) shape = healpix_shape if healpix else car_shape wcs = car_wcs output_reference_frame = config.pop("output_reference_frame", None) components = {} for component_type in ["pysm_components", "other_components"]: components[component_type] = {} if component_type in config: component_type_config = config[component_type] if component_type == "pysm_components": pysm_components_string = component_type_config.pop( "pysm_components_string", None ) for comp_name in component_type_config: comp_config = component_type_config[comp_name] comp_class = import_class_from_string(comp_config.pop("class")) log.info("Creating component %s", comp_class) if ( function_accepts_argument(comp_class, "instrument_parameters") and "instrument_parameters" not in comp_config ): comp_config["instrument_parameters"] = config[ "instrument_parameters" ] if ( function_accepts_argument(comp_class, "num") and "num" in config and "num" not in comp_config ): # If a component has an argument "num" and we provide a configuration # "num" to MapSims, we pass it to all the class. # it can be overridden by the actual component config # This is used for example by `SOStandalonePrecomputedCMB` comp_config["num"] = config["num"] if function_accepts_argument(comp_class, "shape") and shape is not None: comp_config["shape"] = car_shape comp_config["wcs"] = car_wcs components[component_type][comp_name] = comp_class( nside=modeling_nside, **comp_config ) map_sim = MapSim( channels=config["channels"], nside=nside, modeling_nside=modeling_nside, lmax_over_nside=lmax_over_nside, car=car, car_resolution=car_resolution, num=config["num"], nsplits=config.get("nsplits", 1), unit=config["unit"], tag=config["tag"], output_folder=config.get("output_folder", "output"), output_filename_template=config.get( "output_filename_template", default_output_filename_template ), pysm_components_string=pysm_components_string, pysm_custom_components=components["pysm_components"], output_reference_frame=output_reference_frame, other_components=components["other_components"], instrument_parameters=config["instrument_parameters"], ) return map_sim
[docs]class MapSim: def __init__( self, channels, nside=None, modeling_nside=None, lmax_over_nside=None, car=False, healpix=True, car_resolution=None, num=0, nsplits=1, unit="uK_CMB", output_folder="output", tag="sky", output_filename_template=default_output_filename_template, pysm_components_string=None, output_reference_frame=None, pysm_custom_components=None, other_components=None, instrument_parameters=None, ): """Run map based simulations MapSim executes PySM for each of the input channels with a sky defined by default PySM components in `pysm_components_string` and custom components in `pysm_custom_components` and rotates in Alm space to the reference frame `output_reference_frame`. Then for each of the channels specified, smoothes the map with the channel beam and finally adds the map generated by `other_components`, for example noise maps, and writes the outputs to disk. Parameters ---------- channels : str If "all", all channels are included. Otherwise a list of channel tags: LT1_UHF1 and LT0_UHF1 = "LT1_UHF1,LT0_UHF1" Otherwise, provide a string with: * a key, e.g. tube or telescope * : * comma separated list of desider values e.g. all SAT channels = "telescope:SA" LT1 and LT2 tubes = "tube:LT1,LT2" modeling_nside : int Run PySM at higher Nside to increase the accuracy of the results, it is recommended to set modeling Nside twice of nside If None, modeling is done using `nside` nside : int output HEALPix Nside, if None, automatically pick the default resolution of the first channel, see https://github.com/simonsobs/mapsims/tree/master/mapsims/data/so_default_resolution.csv lmax_over_nside : float used to compute Ell_max used in the smoothing process car : bool True for CAR output healpix : bool True for HEALPix output car_resolution : astropy.Quantity CAR pixels resolution with angle unit unit : str Unit of output maps output_folder : str Relative or absolute path to output folder, string template with {nside} and {tag} fields num : int Realization number, generally used as seed, default is 0, automatically padded to 4 digits nsplits : int Number of noise splits, see the documentation of :py:class:`SONoiseSimulator` tag : str String to describe the current simulation, for example its content, which is used into string interpolation for `output_folder` and `output_filename_template` output_filename_template : str String template with {telescope} {channel} {nside} {tag} fields pysm_components_string : str Comma separated string of PySM components, i.e. "s1,d4,a2" output_reference_frame : str The requested output reference frame G for Galactic, C for Equatorial or E for Ecliptic, set to None to apply no rotation pysm_custom_components : dict Dictionary of other components executed through PySM other_components : dict Dictionary of component name, component class pairs, the output of these are **not** rotated, they should already be in the same reference frame specified in output_reference_frame. instrument_parameters : ascii file in IPAC table format path or str A string specifies an instrument parameters file included in the package `data/` folder A path or a string containing a path to an externally provided IPAC table file with the expected format. Instrument parameters in IPAC ascii format, one channel per row, with columns (with units): tag, band, center_frequency, fwhm It also assumes that in the same folder there are IPAC table files named bandpass_{tag}.tbl with columns: bandpass_frequency, bandpass_weight """ self.channels = parse_channels( instrument_parameters=instrument_parameters, filter=channels ) self.car = car self.car_resolution = car_resolution self.healpix = healpix self.pixelizations = [] if healpix: self.pixelizations.append("healpix") if car: self.pixelizations.append("car") self.shape = {} ( self.nside, self.car_resolution, self.shape["healpix"], self.shape["car"], self.car_wcs, ) = get_map_shape( ch=self.channels[0], nside=nside, car_resolution=car_resolution, car=self.car, healpix=self.healpix, ) self.modeling_nside = ( modeling_nside if modeling_nside is not None else min(8192, max(2048, 2 * nside)) ) assert lmax_over_nside is not None, "Need to provide lmax_over_nside" self.lmax = int(min(8192 * 2, min(self.modeling_nside, self.nside) * lmax_over_nside)) log.info( "Nside: %d, Modeling Nside: %d, Ellmax: %d", self.nside, self.modeling_nside, self.lmax, ) self.unit = unit self.num = num self.nsplits = nsplits self.pysm_components_string = pysm_components_string self.pysm_custom_components = pysm_custom_components self.run_pysm = not ( (pysm_components_string is None) and (pysm_custom_components is None or len(pysm_custom_components) == 0) ) self.other_components = other_components self.tag = tag try: self.output_folder = output_folder.format( nside=self.nside, tag=self.tag, num=self.num ) except AttributeError: self.output_folder = output_folder if not os.path.exists(self.output_folder): os.makedirs(self.output_folder) self.output_filename_template = output_filename_template self.rot = None self.output_reference_frame = output_reference_frame
[docs] def execute(self, write_outputs=False): """Run map simulations Execute simulations for all channels and write to disk the maps, and return their filenames (relative to output folder) unless `write_outputs` is False, then return them. """ if self.run_pysm: sky_config = [] preset_strings = [] if self.pysm_components_string is not None: for model in self.pysm_components_string.split(","): preset_strings.append(model) input_reference_frame = "G" log.info("Initializing the PySM Sky object") self.pysm_sky = pysm.Sky( nside=self.modeling_nside, preset_strings=preset_strings, component_objects=sky_config, output_unit=u.Unit(self.unit), ) if self.pysm_custom_components is not None: for comp_name, comp in self.pysm_custom_components.items(): self.pysm_sky.components.append(comp) output = {} # ch can be single channel or tuple of 2 channels (tube dichroic) for ch in self.channels: log.info("Processing channel %s", str(ch)) if not isinstance(ch, tuple): ch = [ch] output_map = [] for p in self.pixelizations: output_map_shape = (len(ch), 3) + self.shape[p] output_map.append(np.zeros(output_map_shape, dtype=np.float64)) if self.run_pysm: for ch_index, each in enumerate(ch): log.info("Bandpass integration for %s", str(each)) bandpass_integrated_map = self.pysm_sky.get_emission( *each.bandpass ).value beam_width_arcmin = each.beam # smoothing and coordinate rotation with 1 spherical harmonics transform log.info("Smoothing and coord-transform for %s", str(each)) smoothed_maps = pysm.apply_smoothing_and_coord_transform( bandpass_integrated_map, fwhm=beam_width_arcmin, lmax=self.lmax, return_healpix=self.healpix, return_car=self.car, output_nside=self.nside, output_car_resol=self.car_resolution, rot=None if input_reference_frame == self.output_reference_frame else hp.Rotator( coord=( input_reference_frame, self.output_reference_frame, ) ), map_dist=None if COMM_WORLD is None else pysm.MapDistribution( nside=self.nside, smoothing_lmax=self.lmax, mpi_comm=COMM_WORLD, ), ) if len(self.pixelizations) == 1: smoothed_maps = [smoothed_maps] # turn UNSEEN into NaN if self.healpix: smoothed_maps[0][hp.mask_bad(smoothed_maps[0])] = np.nan for pix_index, smoothed_map in enumerate(smoothed_maps): output_map[pix_index][ch_index] += smoothed_map for pix_index, p in enumerate(self.pixelizations): output_map[pix_index] = output_map[pix_index].reshape( (len(ch), 1, 3) + self.shape[p] ) if self.other_components is not None: for comp in self.other_components.values(): log.info("Additional component %s for %s", str(comp), str(ch)) assert ( len(self.pixelizations) == 1 ), "Other components do not support multiple pixelizations" kwargs = dict(tube=ch[0].tube, output_units=self.unit) if function_accepts_argument(comp.simulate, "ch"): kwargs.pop("tube") kwargs["ch"] = ch if function_accepts_argument(comp.simulate, "nsplits"): kwargs["nsplits"] = self.nsplits if function_accepts_argument(comp.simulate, "seed"): kwargs["seed"] = self.num component_map = comp.simulate(**kwargs) if self.nsplits == 1: for p in self.pixelizations: component_map = component_map.reshape( (len(ch), 1, 3) + self.shape[p] ) component_map[hp.mask_bad(component_map)] = np.nan output_map[0] += component_map for ch_index, each in enumerate(ch): if write_outputs: output[each.tag] = [] for split in range(self.nsplits): for pix_index, p in enumerate(self.pixelizations): filename = self.output_filename_template.format( telescope=each.telescope if each.tube is None else each.tube, band=each.band, nside=self.nside, tag=self.tag, num=self.num, nsplits=self.nsplits, split=split + 1, pixelization=p, ) output[each.tag].append(filename) log.info("Writing output map " + filename) each_split_channel_map = output_map[pix_index][ch_index][ split ] extra_metadata = dict( telescop=each.telescope if each.tube is None else each.tube, band=each.band, tag=self.tag, num=self.num, ell_max=self.lmax, nsplits=self.nsplits, split=split + 1, pysmvers=pysm.__version__, mapsimsv=__version__, ) if p == "car": extra_metadata["units"] = self.unit log.info("Writing output map") pixell.enmap.write_map( os.path.join(self.output_folder, filename), each_split_channel_map, extra=extra_metadata, ) elif p == "healpix": log.info("Replace NaN with UNSEEN") each_split_channel_map[ np.isnan(each_split_channel_map) ] = hp.UNSEEN log.info("Reorder ring to nest") each_split_channel_map = hp.reorder( each_split_channel_map, r2n=True ) log.info("Writing output map") hp.write_map( os.path.join(self.output_folder, filename), each_split_channel_map, coord=self.output_reference_frame, column_units=self.unit, dtype=np.float32, overwrite=True, extra_header=[ (k, v) for k, v in extra_metadata.items() ], nest=True, ) else: output[each.tag] = [] for pix_index, p in enumerate(self.pixelizations): channel_map = output_map[pix_index][ch_index] if p == "healpix": channel_map[np.isnan(channel_map)] = hp.UNSEEN if self.nsplits == 1: channel_map = channel_map[0] output[each.tag].append(channel_map) if len(output[each.tag]) == 1: output[each.tag] = output[each.tag][0] return output