Source code for jwst.photom.photom

import functools
import logging
import warnings

import numpy as np
from astropy import units as u
from stdatamodels.jwst import datamodels
from stdatamodels.jwst.datamodels import dqflags

from jwst.lib.dispaxis import get_dispersion_direction
from jwst.lib.pipe_utils import match_nans_and_flags
from jwst.lib.wcs_utils import get_wavelengths
from jwst.photom import time_dependence

log = logging.getLogger(__name__)

PHOT_TOL = 0.001  # relative tolerance between PIXAR_* keys

# Conversion factor from MJy/sr to uJy/arcsec^2
MJSR_TO_UJA2 = (u.megajansky / u.steradian).to(u.microjansky / u.arcsecond / u.arcsecond)

# Conversion factor from square arcseconds to steradians
A2_TO_SR = (np.pi / (180.0 * 3600.0)) ** 2

__all__ = ["MatchFitsTableRowError", "DataModelTypeError", "find_row", "DataSet"]


[docs] class MatchFitsTableRowError(Exception): """Class to handle error when matching FITS Table rows.""" def __init__(self, message): if message is None: message = "Expected to match one row in FITS table." super().__init__(message)
[docs] class DataModelTypeError(Exception): """Class to handle case of unexpected datamodel type.""" def __init__(self, message): if message is None: message = "Unexpected DataModel type." super().__init__(message)
[docs] def find_row(fits_table, match_fields): """ Find a row in a FITS table matching fields. Parameters ---------- fits_table : `~astropy.io.fits.fitsrec.FITS_rec` FITS table match_fields : dict ``{field_name: value}`` pair to use as a matching criteria. Returns ------- row : int or None FITS table row index, None if no match. Raises ------ Warning When a field name is not in the table. jwst.photom.photom.MatchFitsTableRowError When more than one row matches. """ def _normalize_strings(field): if isinstance(field[0], str): return np.array([s.upper() for s in field]) return field # item[1] is always converted to upper case in the `DataSet` initializer. results = [ _normalize_strings(fits_table.field(item[0])) == item[1] for item in match_fields.items() ] row = functools.reduce(np.logical_and, results).nonzero()[0] if len(row) > 1: raise MatchFitsTableRowError( f"Expected to find one matching row in table, found {len(row)}." ) if len(row) == 0: log.warning("Expected to find one matching row in table, found 0.") return None return row[0]
[docs] class DataSet: """ Input dataset to which the photom information will be applied. Store vital params, such as instrument, detector, filter, pupil, and exposure type. Parameters ---------- model : `~stdatamodels.jwst.datamodels.JwstDataModel` Input data model object. Updated in-place. inverse : bool Invert the math operations used to apply the corrections. source_type : str or None Force processing using the specified source type. apply_time_correction : bool Switch to apply/not apply a time correction, if available. """ def __init__( self, model, inverse=False, source_type=None, apply_time_correction=True, ): # Set up attributes necessary for calculation. self.band = None if model.meta.instrument.band is not None: self.band = model.meta.instrument.band.upper() self.instrument = model.meta.instrument.name.upper() self.detector = model.meta.instrument.detector.upper() self.exptype = model.meta.exposure.type.upper() self.filter = None if model.meta.instrument.filter is not None: self.filter = model.meta.instrument.filter.upper() self.grating = None if model.meta.instrument.grating is not None: self.grating = model.meta.instrument.grating.upper() self.order = None if ( model.meta.hasattr("wcsinfo") and model.meta.wcsinfo.hasattr("spectral_order") and model.meta.wcsinfo.spectral_order is not None ): self.order = model.meta.wcsinfo.spectral_order self.pupil = None if model.meta.instrument.pupil is not None: self.pupil = model.meta.instrument.pupil.upper() self.subarray = None if model.meta.subarray.name is not None: self.subarray = model.meta.subarray.name.upper() # Initialize other non-correction pars attributes. self.slitnum = -1 self.specnum = -1 self.integ_row = -1 self.inverse = inverse self.source_type = None self.apply_time_correction = apply_time_correction # For MultiSlitModels, only set a generic source_type value for the # entire datamodel if the user has set the source_type parameter. # Otherwise leave the generic source_type set to None, which will # force the use of per-slit source_types later in processing. if isinstance(model, datamodels.MultiSlitModel): if source_type is not None: self.source_type = source_type # For non-MultiSlitModel inputs, where there's only 1 target and # one source_type, use the user-provided source_type value, if it # exists. Otherwise, use the generic source_type value provided in # the input model (if it exists). else: if source_type is not None: self.source_type = source_type else: if model.meta.target.source_type is not None: self.source_type = model.meta.target.source_type.upper() # Store the input model for updating self.input = model # Let the user know what we're working with log.info("Using instrument: %s", self.instrument) log.info(" detector: %s", self.detector) log.info(" exp_type: %s", self.exptype) if self.filter is not None: log.info(" filter: %s", self.filter) if self.pupil is not None: log.info(" pupil: %s", self.pupil) if self.grating is not None: log.info(" grating: %s", self.grating) if self.band is not None: log.info(" band: %s", self.band)
[docs] def calc_nirspec(self, ftab, area_fname): """ Apply photometric calibration data to dataset and update conversion factor. For the NIRSPEC instrument, reference file matching is based on FILTER and GRATING, as well as SLIT name for the fixed-slits mode. The routine will find the corresponding information in the reference file, apply it to the data, and write the scalar conversion factor to the output model. All NIRSpec modes use wavelength-dependent flux calibration factors. Parameters ---------- ftab : `~stdatamodels.jwst.datamodels.NrsFsPhotomModel` or \ `~stdatamodels.jwst.datamodels.NrsMosPhotomModel` NIRSpec photom reference file data model. area_fname : str Pixel area map reference file name. """ # Get a time-dependent correction from the reference file if available mid_time = self.input.meta.exposure.mid_time correction_table = time_dependence.get_correction_table(ftab, mid_time) # Normal fixed-slit exposures get handled as a MultiSlitModel if self.exptype == "NRS_FIXEDSLIT": # We have to find and apply a separate set of flux cal # data for each of the fixed slits in the input for slit in self.input.slits: log.info(f"Working on slit {slit.name}") self.slitnum += 1 fields_to_match = { "filter": self.filter, "grating": self.grating, "slit": slit.name, } row = find_row(ftab.phot_table, fields_to_match) if row is None: continue self.photom_io(ftab.phot_table[row], time_correction=correction_table[row]) # Bright object fixed-slit exposures use a SlitModel elif self.exptype == "NRS_BRIGHTOBJ": # Bright object always uses S1600A1 slit slit_name = "S1600A1" log.info(f"Working on slit {slit_name}") fields_to_match = {"filter": self.filter, "grating": self.grating, "slit": slit_name} row = find_row(ftab.phot_table, fields_to_match) if row is None: return self.photom_io(ftab.phot_table[row], time_correction=correction_table[row]) # IFU and MSA exposures use one set of flux cal data else: fields_to_match = {"filter": self.filter, "grating": self.grating} row = find_row(ftab.phot_table, fields_to_match) if row is None: return # MSA (MOS) data if isinstance(self.input, datamodels.MultiSlitModel) and self.exptype == "NRS_MSASPEC": # Loop over the MSA slits, applying the same photom # ref data to all slits for slit in self.input.slits: log.info(f"Working on slit {slit.name}") self.slitnum += 1 self.photom_io(ftab.phot_table[row], time_correction=correction_table[row]) # IFU data else: tabdata = ftab.phot_table[row] # Get the scalar conversion factor from the PHOTMJ column; conv_factor = tabdata["photmj"] unit_is_surface_brightness = False # Convert to surface brightness units for all types of data if self.input.meta.photometry.pixelarea_steradians is None: log.warning("Pixel area is None, so can't convert flux to surface brightness!") else: log.debug("Converting conversion factor from flux to surface brightness") conv_factor /= self.input.meta.photometry.pixelarea_steradians unit_is_surface_brightness = True # Populate the photometry keywords log.info(f"PHOTMJSR value: {conv_factor:.6g}") self.input.meta.photometry.conversion_megajanskys = conv_factor self.input.meta.photometry.conversion_microjanskys = conv_factor * MJSR_TO_UJA2 # Get the length of the relative response arrays in this table row. # If the nelem column is not present, we'll use the entire wavelength # and relresponse arrays. try: nelem = tabdata["nelem"] except KeyError: nelem = None waves = tabdata["wavelength"] relresps = tabdata["relresponse"] if nelem is not None: waves = waves[:nelem] relresps = relresps[:nelem] # Convert wavelengths from meters to microns, if necessary microns_100 = 1.0e-4 # 100 microns, in meters if waves.max() > 0.0 and waves.max() < microns_100: waves *= 1.0e6 # Load the pixel area table for the IFU slices area_model = datamodels.open(area_fname) area_data = area_model.area_table # Compute 2D wavelength and pixel area arrays for the whole image wave2d, area2d, dqmap = self.calc_nrs_ifu_sens2d(area_data) # Compute relative sensitivity for each pixel based # on its wavelength sens2d = np.interp(wave2d, waves, relresps) sens2d *= tabdata["photmj"] # include the initial scalar conversion factor -> MJ # convert all data (both point source and extended) to surface brightness sens2d /= area2d * A2_TO_SR # divide by pixel area * A2_TO_SR -> MJy/sr # Reset NON_SCIENCE pixels to 1 in sens2d array and flag # them in the science data DQ array where_dq = np.bitwise_and(dqmap, dqflags.pixel["NON_SCIENCE"]) sens2d[where_dq > 0] = 1.0 self.input.dq = np.bitwise_or(self.input.dq, dqmap) # Multiply the science data and uncertainty arrays by the conversion factors sens2d_squared = sens2d * sens2d if not self.inverse: self.input.data *= sens2d self.input.err *= sens2d self.input.var_poisson *= sens2d_squared self.input.var_rnoise *= sens2d_squared else: self.input.data /= sens2d self.input.err /= sens2d self.input.var_poisson /= sens2d_squared self.input.var_rnoise /= sens2d_squared if self.input.var_flat is not None and np.size(self.input.var_flat) > 0: if not self.inverse: self.input.var_flat *= sens2d_squared else: self.input.var_flat /= sens2d_squared # Update BUNIT values for the science data and err if not self.inverse: if unit_is_surface_brightness: self.input.meta.bunit_data = "MJy/sr" self.input.meta.bunit_err = "MJy/sr" else: self.input.meta.bunit_data = "MJy" self.input.meta.bunit_err = "MJy" else: self.input.meta.bunit_data = "DN/s" self.input.meta.bunit_err = "DN/s" area_model.close()
[docs] def calc_niriss(self, ftab): """ Apply photometric calibration data to dataset and update conversion factor. For NIRISS matching is based on FILTER and PUPIL, as well as ORDER for spectroscopic modes. There may be multiple entries for a given ``FILTER + PUPIL`` combination, corresponding to different spectral orders. Data for all orders will be retrieved. The routine will find the corresponding information in the reference file and apply the conversions to the science arrays. If wavelength-dependent conversion factors exist, which will be the case for spectroscopic modes, they will be loaded and applied along with the scalar conversion. Parameters ---------- ftab : `~stdatamodels.jwst.datamodels.NisSossPhotomModel`, \ `~stdatamodels.jwst.datamodels.NisWfssPhotomModel`, or \ `~stdatamodels.jwst.datamodels.NisImgPhotomModel` NIRISS photom reference file data model. """ # Get a time-dependent correction from the reference file if available mid_time = self.input.meta.exposure.mid_time correction_table = time_dependence.get_correction_table(ftab, mid_time) # Handle MultiSlit models separately, which are used for NIRISS WFSS if isinstance(self.input, datamodels.MultiSlitModel): self.calc_wfss(ftab, correction_table, ["filter", "pupil", "order"]) elif isinstance(self.input, datamodels.CubeModel): raise DataModelTypeError( f"Unexpected input data model type for NIRISS: {str(self.input)}" ) elif self.exptype in ["NIS_SOSS"]: if isinstance(self.input, datamodels.ImageModel): raise DataModelTypeError( f"Unexpected input data model type for NIRISS: {str(self.input)}" ) for spec in self.input.spec: self.specnum += 1 self.order = spec.spectral_order fields_to_match = {"filter": self.filter, "pupil": self.pupil, "order": self.order} row = find_row(ftab.phot_table, fields_to_match) if row is None: return # Correct each integration for integ_row in range(len(spec.spec_table)): self.integ_row = integ_row self.photom_io( ftab.phot_table[row], order=self.order, time_correction=correction_table[row], ) else: fields_to_match = {"filter": self.filter, "pupil": self.pupil} row = find_row(ftab.phot_table, fields_to_match) if row is None: return self.photom_io(ftab.phot_table[row], time_correction=correction_table[row])
[docs] def calc_miri(self, ftab): """ Apply photometric calibration data to dataset and update conversion factor. For MIRI imaging and LRS modes, matching is based on FILTER and SUBARRAY. For MIRI WFSS the matching is based on FILTER and SUBARRAY. MIRI MRS uses dedicated photom reference files per ``CHANNEL + BAND``. For Imaging and LRS, the routine will find the corresponding row of information in the reference file, apply it, and store the scalar conversion factor in the output model PHOTMJSR keyword. If wavelength-dependent conversion values exist, which is the case for LRS mode, they will be included in the applied conversion. Parameters ---------- ftab : `~stdatamodels.jwst.datamodels.MirImgPhotomModel`, \ `~stdatamodels.jwst.datamodels.MirMrsPhotomModel`, or \ `~stdatamodels.jwst.datamodels.MirLrsPhotomModel` MIRI photom reference file data model. """ # Handle MultiSlit models and MIRI WFSS if isinstance(self.input, datamodels.MultiSlitModel) and self.exptype == "MIR_WFSS": # Get a time-dependent correction from the reference file if available mid_time = self.input.meta.exposure.mid_time correction_table = time_dependence.get_correction_table(ftab, mid_time) self.calc_wfss(ftab, correction_table, ["filter", "subarray"]) # Imaging detector elif self.detector == "MIRIMAGE": # Get the subarray value of the input data model log.info(" subarray: %s", self.subarray) fields_to_match = {"subarray": self.subarray, "filter": self.filter} row = find_row(ftab.phot_table, fields_to_match) if row is None: # Search again using subarray="GENERIC" for old ref files fields_to_match = {"subarray": "GENERIC", "filter": self.filter} row = find_row(ftab.phot_table, fields_to_match) if row is None: return # Get a time-dependent correction from the reference file if available mid_time = self.input.meta.exposure.mid_time correction_table = time_dependence.get_correction_table(ftab, mid_time) self.photom_io(ftab.phot_table[row], time_correction=correction_table[row]) # MRS detectors elif self.detector == "MIRIFUSHORT" or self.detector == "MIRIFULONG": # Make sure all NaNs have DO_NOT_USE flag set where_nan = np.isnan(ftab.data) ftab.dq[where_nan] = np.bitwise_or(ftab.dq[where_nan], dqflags.pixel["DO_NOT_USE"]) # Compute the combined 2D sensitivity factors sens2d = ftab.data # Multiply the science data and uncertainty arrays by the 2D # sensitivity factors sens2d_squared = sens2d * sens2d if not self.inverse: self.input.data *= sens2d self.input.err *= sens2d self.input.var_poisson *= sens2d_squared self.input.var_rnoise *= sens2d_squared else: self.input.data /= sens2d self.input.err /= sens2d self.input.var_poisson /= sens2d_squared self.input.var_rnoise /= sens2d_squared if self.input.var_flat is not None and np.size(self.input.var_flat) > 0: process_var_flat = True if not self.inverse: self.input.var_flat *= sens2d_squared else: self.input.var_flat /= sens2d_squared else: process_var_flat = False # Update the science dq self.input.dq = np.bitwise_or(self.input.dq, ftab.dq) # Check if reference file contains time dependent correction try: ftab.getarray_noinit("timecoeff_powerlaw_ch1") except AttributeError: # Old style ref file; skip the correction log.info( "Skipping MRS MIRI time correction. " "Extensions not found in the reference file." ) self.apply_time_correction = False if self.apply_time_correction: log.info("Applying MRS IFU time dependent correction.") mid_time = self.input.meta.exposure.mid_time correction = time_dependence.miri_mrs_time_correction( self.input, self.detector, ftab, mid_time ) inv_correction_sq = 1.0 / (correction * correction) self.input.data /= correction self.input.err /= correction self.input.var_poisson *= inv_correction_sq self.input.var_rnoise *= inv_correction_sq if process_var_flat: self.input.var_flat *= inv_correction_sq else: log.info("Not applying MRS IFU time dependent correction.") # Retrieve the scalar conversion factor from the reference data conv_factor = ftab.meta.photometry.conversion_megajanskys # Store the conversion factors in the meta data self.input.meta.photometry.conversion_megajanskys = conv_factor self.input.meta.photometry.conversion_microjanskys = conv_factor * MJSR_TO_UJA2 # Update BUNIT values for the science data and err if not self.inverse: self.input.meta.bunit_data = "MJy/sr" self.input.meta.bunit_err = "MJy/sr" else: self.input.meta.bunit_data = "DN/s" self.input.meta.bunit_err = "DN/s"
[docs] def calc_nircam(self, ftab): """ Apply photometric calibration data to dataset and update conversion factor. For NIRCAM, matching is based on FILTER and PUPIL. The routine will find the corresponding information in the reference file, apply the conversion factors, and store the scalar conversion factor in the output model. If wavelength-dependent conversion factors exist, they will be included in the calibration. For WFSS (grism) mode, the calibration information extracted from the reference file is applied to each slit instance in the science data. Parameters ---------- ftab : `~stdatamodels.jwst.datamodels.NrcImgPhotomModel` or \ `~stdatamodels.jwst.datamodels.NrcWfssPhotomModel` NIRCam photom reference file data model. """ # Get a time-dependent correction from the reference file if available mid_time = self.input.meta.exposure.mid_time correction_table = time_dependence.get_correction_table(ftab, mid_time) # Handle WFSS data separately from regular imaging if isinstance(self.input, datamodels.MultiSlitModel) and self.exptype == "NRC_WFSS": self.calc_wfss(ftab, correction_table, ["filter", "pupil", "order"]) elif self.exptype == "NRC_TSGRISM": fields_to_match = {"filter": self.filter, "pupil": self.pupil, "order": self.order} row = find_row(ftab.phot_table, fields_to_match) if row is None: return self.photom_io(ftab.phot_table[row], time_correction=correction_table[row]) else: # check for subarray in the phot_table: older files do not have it fields_to_match = {"filter": self.filter, "pupil": self.pupil} if "subarray" in ftab.phot_table.columns.names: log.info("Matching to subarray: %s", self.subarray) fields_to_match["subarray"] = self.subarray row = find_row(ftab.phot_table, fields_to_match) if row is None: return self.photom_io(ftab.phot_table[row], time_correction=correction_table[row])
[docs] def calc_fgs(self, ftab): """ Apply photometric calibration data to dataset and update conversion factor. For FGS, there is no matching required, because the instrument does not contain any filters or pupil wheel elements. The only mode is CLEAR. The routine will find the corresponding information in the reference file (which should have only a single row), apply it to the data, and write the conversion factor to the output model. Parameters ---------- ftab : `~stdatamodels.jwst.datamodels.FgsImgPhotomModel` FGS photom reference file data model. """ # Get a time-dependent correction from the reference file if available mid_time = self.input.meta.exposure.mid_time correction_table = time_dependence.get_correction_table(ftab, mid_time) # Read the first (and only) row in the reference file self.photom_io(ftab.phot_table[0], time_correction=correction_table[0])
[docs] def calc_nrs_ifu_sens2d(self, area_data): """ Create the 2-D wavelength and pixel area arrays. These are needed for constructing a NIRSpec IFU sensitivity map. Parameters ---------- area_data : ndarray Array of 1-D pixel area values for the IFU slices. Returns ------- wave2d : ndarray Array of 2-D wavelengths per pixel. area2d : ndarray Array of 2-D pixel area values. dqmap : ndarray Array of 2-D DQ flags per pixel. """ import gwcs from jwst.assign_wcs import nirspec # for NIRSpec IFU data microns_100 = 1.0e-4 # 100 microns, in meters # Create empty 2D arrays for the wavelengths and pixel areas wave2d = np.zeros_like(self.input.data) area2d = np.ones_like(self.input.data) # Create and initialize an array for the 2D dq map to be returned. # initialize all pixels to NON_SCIENCE, because operations below # only touch pixels within the bounding_box of each slice dqmap = np.zeros_like(self.input.dq) + dqflags.pixel["NON_SCIENCE"] # Get the list of WCSs for the IFU slices list_of_wcs = nirspec.nrs_ifu_wcs(self.input) # Loop over the slices for k, ifu_wcs in enumerate(list_of_wcs): # Construct array indexes for pixels in this slice x, y = gwcs.wcstools.grid_from_bounding_box( ifu_wcs.bounding_box, step=(1, 1), center=True ) log.debug(f"Slice {k}: {x[0][0]} {x[-1][-1]} {y[0][0]} {y[-1][-1]}") # Get the world coords for all pixels in this slice coords = ifu_wcs(x, y) dq = dqmap[y.astype(int), x.astype(int)] wl = coords[2] # pull out the valid wavelengths and reset other array to not include # nan values valid = ~np.isnan(wl) wl = wl[valid] dq = dq[valid] x = x[valid] y = y[valid] dq[:] = 0 if wl.max() < microns_100: log.info("Wavelengths in WCS table appear to be in meters") dqmap[y.astype(int), x.astype(int)] = dq # Insert the wavelength values for this slice into the # whole image array wave2d[y.astype(int), x.astype(int)] = wl # Insert the pixel area value for this slice into the # whole image array ar = np.ones_like(wl) ar[:] = area_data[np.where(area_data["slice_id"] == k)]["pixarea"][0] area2d[y.astype(int), x.astype(int)] = ar return wave2d, area2d, dqmap
[docs] def calc_wfss(self, ftab, correction_table, match_fields): """ Apply photometric calibration to all slits in a WFSS exposure. Iterates over each slit in the input `~stdatamodels.jwst.datamodels.MultiSlitModel`, looks up the row in the photom reference table matched by the attributes specified in ``match_fields``, and calls :meth:`photom_io` to apply the conversion. Parameters ---------- ftab : `~stdatamodels.jwst.datamodels.NrcWfssPhotomModel` or \ `~stdatamodels.jwst.datamodels.NisWfssPhotomModel` Photom reference file data model. correction_table : array-like Time-dependence correction values. match_fields : list of str List of field names to use for matching rows in the photom reference table. """ fields_to_match = {} for field in match_fields: value = getattr(self, field) fields_to_match[field] = value for slit in self.input.slits: log.info(f"Working on slit {slit.name}") # Increment slit number self.slitnum += 1 # Get the spectral order number for this slit if "order" in match_fields: order = slit.meta.wcsinfo.spectral_order fields_to_match["order"] = order row = find_row(ftab.phot_table, fields_to_match) if row is None: continue phot_unit = getattr(ftab, "phot_unit", None) self.photom_io( ftab.phot_table[row], time_correction=correction_table[row], phot_unit=phot_unit )
[docs] def photom_io(self, tabdata, order=None, time_correction=None, phot_unit=None): """ Combine photometric conversion factors and apply to the science dataset. Parameters ---------- tabdata : `~astropy.io.fits.FITS_rec` Single row of data from reference table. order : int Spectral order number. time_correction : float or None Multiplicative correction for time dependence, defined as the fractional amount of light recorded now divided by the light recorded on the zero-day MJD (t0). The scalar conversion factor will be divided by the correction value if provided, and if ``self.apply_time_correction`` is `True`. phot_unit : str or None Unit string for the photometric conversion factor from the reference file ``phot_unit`` attribute (e.g., ``"MJy Angstrom s / (DN sr)"``). When provided, it is used to compute a numeric conversion factor to the expected unit for the relevant observing mode. If `None`, no unit conversion is applied. Currently only implemented for WFSS data. """ # First get the scalar conversion factor. # For most modes, the scalar conversion factor in the photom reference # file is in units of (MJy / sr) / (DN / s), and the output from # the photom step will be in units of surface brightness, specifically # MJy / sr. For NIRSpec and NIRISS SOSS, however, the scalar # conversion factor is in units of MJy / (DN / s); for point-source # targets, the output will be in units of flux density, MJy. For an # extended source (or if the source type is unknown), the conversion # factor will be divided by the solid angle of a pixel, so the output # of the photom step will be in units of surface brightness, as for # other types of data. unit_is_surface_brightness = True # default try: conversion = tabdata["photmjsr"] # unit is MJy / sr except KeyError: conversion = tabdata["photmj"] # unit is MJy if isinstance(self.input, datamodels.MultiSlitModel): slit = self.input.slits[self.slitnum] if self.exptype in ["NRS_MSASPEC", "NRS_FIXEDSLIT"]: srctype = self.source_type if self.source_type else slit.source_type else: srctype = self.source_type if srctype is None or srctype.upper() != "POINT": if slit.meta.photometry.pixelarea_steradians is None: log.warning( "Pixel area is None, so can't convert flux to surface brightness!" ) else: log.debug("Converting conversion factor from flux to surface brightness") conversion /= slit.meta.photometry.pixelarea_steradians else: conversion_uniform = conversion / slit.meta.photometry.pixelarea_steradians unit_is_surface_brightness = False elif isinstance(self.input, datamodels.TSOMultiSpecModel): # output from extract1d should not require this area conversion unit_is_surface_brightness = False else: if self.source_type is None or self.source_type != "POINT": if self.input.meta.photometry.pixelarea_steradians is None: log.warning( "Pixel area is None, so can't convert flux to surface brightness!" ) else: log.debug("Converting conversion factor from flux to surface brightness") conversion /= self.input.meta.photometry.pixelarea_steradians else: pixel_area_steradians = self.input.meta.photometry.pixelarea_steradians conversion_uniform = conversion / pixel_area_steradians unit_is_surface_brightness = False # Apply the time-dependence correction if self.apply_time_correction and time_correction is not None and time_correction != 1.0: log.info(f"Multiplicative time dependence correction is {time_correction:.6g}") conversion /= time_correction # Store the conversion factor in the meta data log.info(f"PHOTMJSR value: {conversion:.6g}") if isinstance(self.input, datamodels.MultiSlitModel): self.input.slits[self.slitnum].meta.photometry.conversion_megajanskys = conversion self.input.slits[self.slitnum].meta.photometry.conversion_microjanskys = ( conversion * MJSR_TO_UJA2 ) elif isinstance(self.input, datamodels.TSOMultiSpecModel): # No place in the schema to store photometry info pass else: self.input.meta.photometry.conversion_megajanskys = conversion self.input.meta.photometry.conversion_microjanskys = conversion * MJSR_TO_UJA2 if phot_unit is not None: # expected_unit is the unit we decide is "standard" for photmj or photmjsr. # real reference files may use a different unit, passed in as phot_unit, but # phot_unit must be compatible with expected_unit for a given mode. expected_unit = None if self.exptype in ["NRC_WFSS", "NRC_TSGRISM", "NIS_WFSS", "MIR_WFSS"]: expected_unit = "MJy micron s / (DN sr)" if expected_unit is None: log.warning( f"phot_unit attribute found ({phot_unit}), but no expected unit defined for " f"exptype {self.exptype}. No unit conversion will be applied." ) factor = 1.0 else: factor = u.Unit(phot_unit).to(u.Unit(expected_unit)) conversion *= factor # If the photom reference file is for spectroscopic data, the table # in the reference file should contain a "wavelength" column (among # other columns). try: wl_test = tabdata["wavelength"] is_spectroscopic = True del wl_test except KeyError: is_spectroscopic = False # Get the length of the relative response arrays in this row. If the # nelem column is not present, we'll use the entire wavelength and # relresponse arrays. if is_spectroscopic: try: nelem = tabdata["nelem"] except KeyError: nelem = None else: nelem = None # For spectroscopic data, include the relative response array in # the flux conversion. no_cal = None if is_spectroscopic: waves = tabdata["wavelength"] relresps = tabdata["relresponse"] if nelem is not None: waves = waves[:nelem] relresps = relresps[:nelem] # Make sure waves and relresps are in increasing wavelength order if not np.all(np.diff(waves) > 0): index = np.argsort(waves) waves = waves[index].copy() relresps = relresps[index].copy() # Convert wavelengths from meters to microns, if necessary microns_100 = 1.0e-4 # 100 microns, in meters if waves.max() > 0.0 and waves.max() < microns_100: waves *= 1.0e6 # Compute a 2-D grid of conversion factors, as a function of wavelength if isinstance(self.input, datamodels.MultiSlitModel): slit = self.input.slits[self.slitnum] # The NIRSpec fixed-slit primary slit needs special handling if # it contains a point source if self.exptype.upper() == "NRS_FIXEDSLIT" and slit.source_type.upper() == "POINT": # First, compute 2D array of photom correction values using # uncorrected wavelengths, which is appropriate for a uniform source conversion_2d_uniform, no_cal = self.create_2d_conversion( slit, self.exptype, conversion_uniform, waves, relresps, order, use_wavecorr=False, ) slit.photom_uniform = conversion_2d_uniform # store the result # Now repeat the process using corrected wavelength values, # which is appropriate for a point source. This is the version of # the correction that will actually get applied to the data below. conversion, no_cal = self.create_2d_conversion( slit, self.exptype, conversion, waves, relresps, order, use_wavecorr=True ) slit.photom_point = conversion # store the result elif self.exptype in ["NRC_WFSS", "NRC_TSGRISM", "NIS_WFSS", "MIR_WFSS"]: log.info("Including spectral dispersion in 2-d flux calibration") conversion, no_cal = self.create_2d_conversion( slit, self.exptype, conversion, waves, relresps, order, include_dispersion=True, ) else: conversion, no_cal = self.create_2d_conversion( slit, self.exptype, conversion, waves, relresps, order ) elif isinstance(self.input, datamodels.TSOMultiSpecModel): # This input does not require a 2d conversion, but a 1d interpolation on the # input wavelength vector to find the relresponse. conversion, no_cal = self.create_1d_conversion( self.input.spec[self.specnum], conversion, waves, relresps, self.integ_row ) else: # NRC_TSGRISM data produces a SpecModel, which is handled here if self.exptype in ["NRC_WFSS", "NRC_TSGRISM", "NIS_WFSS"]: log.info("Including spectral dispersion in 2-d flux calibration") conversion, no_cal = self.create_2d_conversion( self.input, self.exptype, conversion, waves, relresps, order, include_dispersion=True, ) else: conversion, no_cal = self.create_2d_conversion( self.input, self.exptype, conversion, waves, relresps, order ) # Apply the conversion to the data and all uncertainty arrays if isinstance(self.input, datamodels.MultiSlitModel): slit = self.input.slits[self.slitnum] conversion_squared = conversion * conversion if not self.inverse: slit.data *= conversion slit.err *= conversion else: slit.data /= conversion slit.err /= conversion if slit.var_poisson is not None and np.size(slit.var_poisson) > 0: if not self.inverse: slit.var_poisson *= conversion_squared else: slit.var_poisson /= conversion_squared if slit.var_rnoise is not None and np.size(slit.var_rnoise) > 0: if not self.inverse: slit.var_rnoise *= conversion_squared else: slit.var_rnoise /= conversion_squared if slit.var_flat is not None and np.size(slit.var_flat) > 0: if not self.inverse: slit.var_flat *= conversion_squared else: slit.var_flat /= conversion_squared if no_cal is not None: slit.dq[..., no_cal] = np.bitwise_or( slit.dq[..., no_cal], dqflags.pixel["DO_NOT_USE"] ) if not self.inverse: if unit_is_surface_brightness: slit.meta.bunit_data = "MJy/sr" slit.meta.bunit_err = "MJy/sr" else: slit.meta.bunit_data = "MJy" slit.meta.bunit_err = "MJy" # Setting top model to None so they will not be written to FITs File. # Information on the units should only come from the individual slits. self.input.meta.bunit_data = None self.input.meta.bunit_err = None else: self.input.meta.bunit_data = "DN/s" self.input.meta.bunit_err = "DN/s" # Make sure output model has consistent NaN and DO_NOT_USE values match_nans_and_flags(slit) elif isinstance(self.input, datamodels.TSOMultiSpecModel): # Does this block need to address SB columns as well, or will # they (presumably) never be populated for SOSS? # It appears flux_error is the only error column populated? spec = self.input.spec[self.specnum] spec.spec_table.FLUX[self.integ_row] *= conversion spec.spec_table.FLUX_ERROR[self.integ_row] *= conversion spec.spec_table.FLUX_VAR_POISSON[self.integ_row] *= conversion**2.0 spec.spec_table.FLUX_VAR_RNOISE[self.integ_row] *= conversion**2.0 spec.spec_table.FLUX_VAR_FLAT[self.integ_row] *= conversion**2.0 spec.spec_table.BACKGROUND[self.integ_row] *= conversion spec.spec_table.BKGD_ERROR[self.integ_row] *= conversion spec.spec_table.BKGD_VAR_POISSON[self.integ_row] *= conversion**2.0 spec.spec_table.BKGD_VAR_RNOISE[self.integ_row] *= conversion**2.0 spec.spec_table.BKGD_VAR_FLAT[self.integ_row] *= conversion**2.0 spec.spec_table.columns["FLUX"].unit = "MJy" spec.spec_table.columns["FLUX_ERROR"].unit = "MJy" spec.spec_table.columns["FLUX_VAR_POISSON"].unit = "MJy^2" spec.spec_table.columns["FLUX_VAR_RNOISE"].unit = "MJy^2" spec.spec_table.columns["FLUX_VAR_FLAT"].unit = "MJy^2" spec.spec_table.columns["BACKGROUND"].unit = "MJy" spec.spec_table.columns["BKGD_ERROR"].unit = "MJy" spec.spec_table.columns["BKGD_VAR_POISSON"].unit = "MJy^2" spec.spec_table.columns["BKGD_VAR_RNOISE"].unit = "MJy^2" spec.spec_table.columns["BKGD_VAR_FLAT"].unit = "MJy^2" else: conversion_squared = conversion * conversion if not self.inverse: self.input.data *= conversion self.input.err *= conversion else: self.input.data /= conversion self.input.err /= conversion if self.input.var_poisson is not None and np.size(self.input.var_poisson) > 0: if not self.inverse: self.input.var_poisson *= conversion_squared else: self.input.var_poisson /= conversion_squared if self.input.var_rnoise is not None and np.size(self.input.var_rnoise) > 0: if not self.inverse: self.input.var_rnoise *= conversion_squared else: self.input.var_rnoise /= conversion_squared if self.input.var_flat is not None and np.size(self.input.var_flat) > 0: if not self.inverse: self.input.var_flat *= conversion_squared else: self.input.var_flat /= conversion_squared if no_cal is not None: self.input.dq[..., no_cal] = np.bitwise_or( self.input.dq[..., no_cal], dqflags.pixel["DO_NOT_USE"] ) if not self.inverse: if unit_is_surface_brightness: self.input.meta.bunit_data = "MJy/sr" self.input.meta.bunit_err = "MJy/sr" else: self.input.meta.bunit_data = "MJy" self.input.meta.bunit_err = "MJy" else: self.input.meta.bunit_data = "DN/s" self.input.meta.bunit_err = "DN/s" # Make sure output model has consistent NaN and DO_NOT_USE values match_nans_and_flags(self.input)
[docs] def create_2d_conversion( self, model, exptype, conversion, waves, relresps, order, use_wavecorr=None, include_dispersion=False, ): """ Create a 2D array of photometric conversion values. This array is based on wavelengths per pixel and response as a function of wavelength. Parameters ---------- model : `~stdatamodels.jwst.datamodels.JwstDataModel` Input data model containing the necessary wavelength information. exptype : str Exposure type of the input. conversion : float Initial scalar photometric conversion value. waves : ndarray 1D wavelength vector on which relative response values are sampled. relresps : ndarray 1D photometric response values, as a function of waves. order : int Spectral order number. use_wavecorr : bool or None Flag indicating whether or not to use corrected wavelengths. Typically only used for NIRSpec fixed-slit data. include_dispersion : bool or None Flag indicating whether the dispersion needs to be incorporated into the 2-D conversion factors. Returns ------- conversion : ndarray 2D array of computed photometric conversion values. no_cal : ndarray 2D mask indicating where no conversion is available. """ # Get the 2D wavelength array corresponding to the input # image pixel values wl_array = get_wavelengths(model, exptype, order, use_wavecorr) wl_array[np.isnan(wl_array)] = -1.0 # Interpolate the photometric response values onto the # 2D wavelength grid # waves is in microns, so wl_array must be in microns too conv_2d = np.interp(wl_array, waves, relresps, left=np.nan, right=np.nan) if include_dispersion: dispaxis = get_dispersion_direction(self.exptype, self.grating, self.filter, self.pupil) if dispaxis is not None: dispersion_array = self.get_dispersion_array(wl_array, dispaxis) conv_2d /= np.abs(dispersion_array) else: log.warning( "Unable to get dispersion direction, so cannot calculate dispersion array" ) conversion = conversion * conv_2d no_cal = np.isnan(conv_2d) conversion[no_cal] = 0.0 return conversion, no_cal
[docs] def get_dispersion_array(self, wavelength_array, dispaxis): """ Create an array of dispersion values from the wavelength array. Parameters ---------- wavelength_array : float 2-D array of wavelength values, assumed to be in microns. dispaxis : int Direction along which light is dispersed: * 1 = along rows * 2 = along columns Returns ------- dispersion_array : float 2-D array of dispersion values, in microns/pixel. """ nrows, ncols = wavelength_array.shape dispersion_array = np.zeros(wavelength_array.shape) if dispaxis == 1: for row in range(nrows): dispersion_array[row] = np.gradient(wavelength_array[row]) elif dispaxis == 2: for column in range(ncols): dispersion_array[:, column] = np.gradient(wavelength_array[:, column]) else: log.warning(f"Can't process data with DISPAXIS={dispaxis}") return dispersion_array
[docs] def create_1d_conversion(self, model, conversion, waves, relresps, integ_row): """ Resample the photometric conversion array. Create a 1D array of photometric conversion values based on wavelength array of input spectrum and response as a function of wavelength. Parameters ---------- model : `~stdatamodels.jwst.datamodels.JwstDataModel` Input data model containing the necessary wavelength information. conversion : float Initial scalar photometric conversion value. waves : ndarray 1D wavelength vector on which relative response values are sampled. relresps : ndarray 1D photometric response values, as a function of waves. integ_row : int Table row number for the spectrum for the current integration. Returns ------- conversion : ndarray 1D array of computed photometric conversion values. no_cal : ndarray 1D mask indicating where no conversion is available. """ # Get the 2D wavelength array corresponding to the input # image pixel values wl_array = model.spec_table["WAVELENGTH"][integ_row] flip_wl = False if np.nanargmax(wl_array) - np.nanargmin(wl_array) < 0: # Need monotonically increasing wavelengths for interp # Bool flag to flip fit if True flip_wl = True wl_array = wl_array[::-1] wl_array[np.isnan(wl_array)] = -1.0 if np.nanargmax(waves) - np.nanargmin(waves) < 0: # Need monotonically increasing wavelengths for interp # This shouldn't have effects external to method. waves = waves[::-1] relresps = relresps[::-1] # Interpolate the photometric response values onto the # 1D wavelength grid conv_1d = np.interp(wl_array, waves, relresps, left=np.nan, right=np.nan) if flip_wl: # If wl_array was flipped, flip the conversion before returning it. conv_1d = conv_1d[::-1] # Combine the scalar and 1D conversion factors conversion = conversion * conv_1d no_cal = np.isnan(conv_1d) conversion[no_cal] = 0.0 return conversion, no_cal
[docs] def pixarea_from_ftab(self, ftab): """ Get pixel area in steradians and arsec^2 from photom reference file. Read the pixel area values in the PIXAR_A2 and PIXAR_SR keys from the primary header of the photom reference file. Parameters ---------- ftab : `~stdatamodels.jwst.datamodels.JwstDataModel` A photom reference file data model. Returns ------- area_ster : float Pixel area in steradians. area_a2 : float Pixel area in arcsec^2. """ area_ster, area_a2 = None, None area_ster = ftab.meta.photometry.pixelarea_steradians log.info("Attempting to obtain PIXAR_SR and PIXAR_A2 values from PHOTOM reference file.") if area_ster is None: log.warning("The PIXAR_SR keyword is missing from %s", ftab.meta.filename) area_a2 = ftab.meta.photometry.pixelarea_arcsecsq if area_a2 is None: log.warning("The PIXAR_A2 keyword is missing from %s", ftab.meta.filename) if area_ster is not None and area_a2 is not None: log.info("Values for PIXAR_SR and PIXAR_A2 obtained from PHOTOM reference file.") return area_ster, area_a2
[docs] def save_area_info(self, ftab, area_fname): """ Populate area attributes in science dataset. Read the pixel area values in the PIXAR_A2 and PIXAR_SR keys from the primary header of the pixel area reference file or (for NIRSpec data) from the PIXAREA column in the selected row of the AREA table in the area reference file. Use that information to populate the pixel area keywords in the output product. Except for NIRSpec data, also copy the pixel area data array from the pixel area reference file to the area extension of the output product. Parameters ---------- ftab : `~stdatamodels.jwst.datamodels.JwstDataModel` A photom reference file data model. area_fname : str Pixel area reference file name. """ use_pixarea_rfile = False area_ster, area_a2 = None, None if area_fname is not None and area_fname != "N/A": use_pixarea_rfile = True # Load the pixel area reference file pix_area = datamodels.open(area_fname) if self.instrument != "NIRSPEC": if use_pixarea_rfile: # Copy the pixel area data array to the appropriate attribute # of the science data model if isinstance(self.input, datamodels.MultiSlitModel): # Note that this only copied to the first slit. self.input.slits[0].area = pix_area.data else: ystart = self.input.meta.subarray.ystart - 1 xstart = self.input.meta.subarray.xstart - 1 yend = ystart + self.input.meta.subarray.ysize xend = xstart + self.input.meta.subarray.xsize self.input.area = pix_area.data[ystart:yend, xstart:xend] log.info("Pixel area map copied to output.") # Load the average pixel area values from the AREA reference file header # Don't need to do this for NIRSpec, because pixel areas will be # copied using save_area_nirspec try: area_ster = pix_area.meta.photometry.pixelarea_steradians if area_ster is None: log.warning("The PIXAR_SR keyword is missing from %s", area_fname) area_a2 = pix_area.meta.photometry.pixelarea_arcsecsq if area_a2 is None: log.warning("The PIXAR_A2 keyword is missing from %s", area_fname) if area_ster is not None and area_a2 is not None: log.info( "Values for PIXAR_SR and PIXAR_A2 obtained from AREA reference file." ) # The area reference file might be older, try the photom reference file except (AttributeError, KeyError): area_ster, area_a2 = self.pixarea_from_ftab(ftab) pix_area.close() # The area reference file might be older, try the photom reference file else: area_ster, area_a2 = self.pixarea_from_ftab(ftab) # Copy the pixel area values to the output log.debug("PIXAR_SR = %s, PIXAR_A2 = %s", str(area_ster), str(area_a2)) if area_a2 is not None: area_a2 = float(area_a2) if area_ster is not None: area_ster = float(area_ster) # For MultiSlitModels, copy to each slit attribute if isinstance(self.input, datamodels.MultiSlitModel): for slit in self.input.slits: slit.meta.photometry.pixelarea_arcsecsq = area_a2 slit.meta.photometry.pixelarea_steradians = area_ster else: self.input.meta.photometry.pixelarea_arcsecsq = area_a2 self.input.meta.photometry.pixelarea_steradians = area_ster else: self.save_area_nirspec(pix_area) pix_area.close()
[docs] def save_area_nirspec(self, pix_area): """ Populate the pixel area attributes of science dataset. Read the pixel area value from the PIXAREA column in the selected row of the AREA table in the area reference file. Use that information to populate the pixel area keywords in the output product. Parameters ---------- pix_area : `~stdatamodels.jwst.datamodels.JwstDataModel` Pixel area reference file data model. """ exp_type = self.exptype pixarea = pix_area.area_table["pixarea"] if exp_type == "NRS_MSASPEC": quadrant = pix_area.area_table["quadrant"] shutter_x = pix_area.area_table["shutter_x"] shutter_y = pix_area.area_table["shutter_y"] n_failures = 0 for slit in self.input.slits: match_q = quadrant == slit.quadrant match_x = shutter_x == slit.xcen match_y = shutter_y == slit.ycen match = np.logical_and(match_q, np.logical_and(match_x, match_y)) n_matches = match.sum(dtype=np.int64) if n_matches != 1: n_failures += 1 slit.meta.photometry.pixelarea_arcsecsq = 1.0 slit.meta.photometry.pixelarea_steradians = 1.0 else: slit.meta.photometry.pixelarea_arcsecsq = float(pixarea[match].item()) slit.meta.photometry.pixelarea_steradians = ( slit.meta.photometry.pixelarea_arcsecsq * A2_TO_SR ) if n_failures > 0: log.warning( "%d failures out of %d, matching MSA data to area reference file", n_failures, len(self.input.slits), ) elif exp_type == "NRS_BRIGHTOBJ": slit_id = pix_area.area_table["slit_id"] nrows = len(slit_id) slit_name = self.input.name # "S1600A1" foundit = False for k in range(nrows): if slit_id[k] == slit_name: foundit = True self.input.meta.photometry.pixelarea_arcsecsq = float(pixarea[k]) self.input.meta.photometry.pixelarea_steradians = ( self.input.meta.photometry.pixelarea_arcsecsq * A2_TO_SR ) break if not foundit: log.warning("%s not found in pixel area table", slit_name) self.input.meta.photometry.pixelarea_arcsecsq = 1.0 self.input.meta.photometry.pixelarea_steradians = 1.0 elif exp_type in ["NRS_LAMP", "NRS_FIXEDSLIT"]: slit_id = pix_area.area_table["slit_id"] nrows = len(slit_id) for slit in self.input.slits: foundit = False for k in range(nrows): if slit_id[k] == slit.name: foundit = True slit.meta.photometry.pixelarea_arcsecsq = float(pixarea[k]) slit.meta.photometry.pixelarea_steradians = ( slit.meta.photometry.pixelarea_arcsecsq * A2_TO_SR ) break if not foundit: log.warning("%s not found in pixel area table", slit.name) slit.meta.photometry.pixelarea_arcsecsq = 1.0 slit.meta.photometry.pixelarea_steradians = 1.0 elif exp_type == "NRS_IFU": # There is a slice_id column for selecting a matching slice, but # we're just going to average the pixel area for all slices. pixel_area = np.nanmean(pixarea) self.input.meta.photometry.pixelarea_arcsecsq = pixel_area self.input.meta.photometry.pixelarea_steradians = ( self.input.meta.photometry.pixelarea_arcsecsq * A2_TO_SR ) else: log.warning( "EXP_TYPE of NIRSpec data is %s, which is not an " "expected value; pixel area keywords will be set to 1.0", exp_type, ) self.input.meta.photometry.pixelarea_arcsecsq = 1.0 self.input.meta.photometry.pixelarea_steradians = 1.0
[docs] def apply_photom(self, photom_fname, area_fname): """ Apply the photom calibration step. Open the reference file, retrieve the conversion factors from the reference file that are appropriate to the instrument mode. This can consist of both a scalar factor and an array of wavelength-dependent factors. The combination of both factors are applied to the input model. The scalar factor is also written to the PHOTMJSR and PHOTUJA2 keywords in the model. If a pixel area map reference file exists for the instrument mode, it is attached to the input model. Parameters ---------- photom_fname : str Photom reference file name. area_fname : str Pixel area map reference file name. Returns ------- output_model : `~stdatamodels.jwst.datamodels.JwstDataModel` Output data model with the flux calibrations applied. """ with datamodels.open(photom_fname, strict_validation=True) as ftab: # Make sure the file is valid. # This will raise a ValueError if time coefficient tables # are present and they do not match the photometry table. ftab.validate() # Load the pixel area reference file, if it exists, and attach the # reference data to the science model # SOSS data are in a TSOMultiSpecModel, which will not allow for # saving the area info. if self.exptype != "NIS_SOSS": self.save_area_info(ftab, area_fname) if self.instrument == "NIRISS": self.calc_niriss(ftab) elif self.instrument == "NIRSPEC": # Some NIRSpec flat variance values can overflow when multiplied by the # flux conversion factor. Photom values for some regions are also set to zero. with warnings.catch_warnings(): warnings.filterwarnings("ignore", "overflow encountered", RuntimeWarning) warnings.filterwarnings("ignore", "divide by zero", RuntimeWarning) warnings.filterwarnings("ignore", "invalid value", RuntimeWarning) self.calc_nirspec(ftab, area_fname) elif self.instrument == "NIRCAM": self.calc_nircam(ftab) elif self.instrument == "MIRI": # MIRI data may have NaNs in data or error arrays with warnings.catch_warnings(): warnings.filterwarnings("ignore", "invalid value", RuntimeWarning) self.calc_miri(ftab) elif self.instrument == "FGS": self.calc_fgs(ftab) else: raise RuntimeError(f"Instrument {self.instrument} is not recognized") return self.input