# Module for calculating pathloss correction for science data sets
import logging
import math
import numpy as np
import stdatamodels.jwst.datamodels as datamodels
from gwcs import wcstools
from stcal.alignment.util import compute_scale
from jwst.assign_wcs import nirspec
from jwst.lib.pipe_utils import match_nans_and_flags
from jwst.lib.wcs_utils import get_wavelengths
log = logging.getLogger(__name__)
# There are 30 slices in the NIRSpec IFU, numbered from 0 to 29
NIRSPEC_IFU_SLICES = np.arange(30)
__all__ = [
"get_center",
"shutter_above_is_closed",
"shutter_below_is_closed",
"get_aperture_from_model",
"calculate_pathloss_vector",
"calculate_two_shutter_uniform_pathloss",
"do_correction",
"interpolate_onto_grid",
"is_pointsource",
"do_correction_mos",
"do_correction_fixedslit",
"do_correction_ifu",
"do_correction_lrs",
"do_correction_soss",
]
[docs]
def get_center(exp_type, input_model, offsets=False):
"""
Get the center of the target in the aperture.
``(0.0, 0.0)`` is the aperture center. Coordinates go
from -0.5 to 0.5.
Parameters
----------
exp_type : str
Keyword value
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Science data to be corrected
offsets : bool
Only applies for MIRI LRS fixed-slit; if `True`, the offsets
will be returned as well in the form of ``(imx, imy)``
Returns
-------
xcenter : float
X-coordinate center of the target in the aperture
ycenter : float
Y-coordinate center of the target in the aperture
imx : float, optional
X-location relative to LRS aperture reference point
imy : float, optional
Y-location relative to LRS aperture reference point
"""
if exp_type not in [
"NRS_MSASPEC",
"NRS_FIXEDSLIT",
"NRS_BRIGHTOBJ",
"NRS_IFU",
"MIR_LRS-FIXEDSLIT",
]:
log.warning(f"get_center not implemented for exp_type {exp_type}")
log.warning("Using (0.0, 0.0)")
return 0.0, 0.0
if exp_type == "NRS_IFU":
# Currently assume IFU sources are centered
return 0.0, 0.0
# MSA centering is specified in the MultiSlit model
# "input_model" treated as a slit object
# can't use getattr here because of bug where calling getattr will auto-populate
# schema-defined attributes
if input_model.hasattr("source_xpos"):
xcenter = input_model.source_xpos
else:
xcenter = None
if input_model.hasattr("source_ypos"):
ycenter = input_model.source_ypos
else:
ycenter = None
if exp_type in ["NRS_MSASPEC", "NRS_FIXEDSLIT", "NRS_BRIGHTOBJ"]:
if xcenter is not None and ycenter is not None:
log.info(f"Using source_xpos, source_ypos = {xcenter, ycenter}")
return xcenter, ycenter
log.warning("Unable to get source center from model source_xpos, source_ypos")
log.warning("Using 0.0, 0.0")
xcenter = 0.0
ycenter = 0.0
return xcenter, ycenter
# only MIR_LRS-FIXEDSLIT left. Get center from target ra, dec
# get slit reference point from wcs object
det_to_sky = input_model.meta.wcs.get_transform("detector", "world")
sky_to_det = input_model.meta.wcs.get_transform("world", "detector")
imx = -det_to_sky.offset_1 # aperture ref point from specwcs
imy = -det_to_sky.offset_2
if xcenter is None or ycenter is None:
# compute location of target on detector
_ref_ra, _ref_dec, ref_wave = det_to_sky(imx, imy)
xcenter, ycenter = sky_to_det(
input_model.meta.target.ra, input_model.meta.target.dec, ref_wave
)
log.info(f"LRS target location from RA/Dec = {xcenter, ycenter}")
else:
log.info(f"LRS target location from source_xpos, source_ypos = {xcenter, ycenter}")
# compute location relative to LRS aperture reference point
xcenter -= imx
ycenter -= imy
if offsets:
return xcenter, ycenter, imx, imy
return xcenter, ycenter
[docs]
def shutter_above_is_closed(shutter_state):
"""
Check if the shutter above the target shutter is closed.
Parameters
----------
shutter_state : str
String that describes the shutter state.
Returns
-------
result : bool
`True` if the shutter above the target shutter is closed.
"""
ref_loc = shutter_state.find("x")
nshutters = len(shutter_state)
if ref_loc == nshutters - 1 or shutter_state[ref_loc + 1] == "0":
return True
else:
return False
[docs]
def shutter_below_is_closed(shutter_state):
"""
Check if the shutter below the target shutter is closed.
Parameters
----------
shutter_state : str
String that describes the shutter state.
Returns
-------
result : bool
`True` if the shutter below the target shutter is closed.
"""
ref_loc = shutter_state.find("x")
if ref_loc == 0 or shutter_state[ref_loc - 1] == "0":
return True
else:
return False
[docs]
def get_aperture_from_model(input_model, match):
"""
Determine the correct aperture in the model to use.
Based on the value of the 'match' parameter:
* For MSA, match is the shutter state string.
* For fixed slit, match is the name of the slit.
Parameters
----------
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Science data to be corrected
match : str
Aperture name or shutter state
Returns
-------
aperture : str or None
Aperture name
"""
if input_model.meta.exposure.type == "NRS_MSASPEC":
# Currently there are only 2 apertures in the MSA pathloss reference file: 1x1 and 1x3
# Only return the 1x1 aperture if the reference shutter has closed shutters above and below
if shutter_below_is_closed(match) and shutter_above_is_closed(match):
matchsize = 1
else:
matchsize = 3
for aperture in input_model.apertures:
# Only return the aperture
if aperture.shutters == matchsize:
return aperture
elif input_model.meta.exposure.type in ["NRS_FIXEDSLIT", "NRS_BRIGHTOBJ", "NIS_SOSS"]:
for aperture in input_model.apertures:
log.debug(aperture.name)
if aperture.name == match:
return aperture
else:
log.warning(f"Unable to get aperture from exp_type {input_model.meta.exposure.type}")
# If nothing matches, return None
return None
[docs]
def calculate_pathloss_vector(pathloss_refdata, pathloss_wcs, xcenter, ycenter, calc_wave=True):
"""
Calculate the pathloss vectors from the pathloss model.
Use the coordinates of the center of the target to interpolate the
pathloss value as a function of wavelength at that location
Parameters
----------
pathloss_refdata : ndarray
The input pathloss data array
pathloss_wcs : `~gwcs.wcs.WCS`
The pathloss datamodel's WCS attribute
xcenter : float
The x-center of the target (-0.5 to 0.5)
ycenter : float
The y-center of the target (-0.5 to 0.5)
calc_wave : bool
Calculate a wavelength vector from the reference file
Returns
-------
wavelength : ndarray
The 1-D wavelength array
pathloss : ndarray
The corresponding 1-D pathloss array
is_inside_slitlet : bool
`True` if the object source position is inside the slitlet,
otherwise `False`
"""
is_inside_slitlet = True
if len(pathloss_refdata.shape) == 3:
wavesize, nrows, ncols = pathloss_refdata.shape
else:
wavesize = pathloss_refdata.shape[0]
wavelength = np.zeros(wavesize, dtype=np.float32)
pathloss_vector = np.zeros(wavesize, dtype=np.float32)
# uniformsource.data is 1-d. We just return it, along with
# a vector of wavelengths calculated using the WCS.
# Uniform source is always inside the slitlet
if len(pathloss_refdata.shape) == 1:
crpix1 = pathloss_wcs.crpix1
crval1 = pathloss_wcs.crval1
cdelt1 = pathloss_wcs.cdelt1
for i in np.arange(wavesize):
wavelength[i] = crval1 + (float(i + 1) - crpix1) * cdelt1
return wavelength, pathloss_refdata, True
# pointsource.data is 3-d, so we have to extract a wavelength vector
# at the specified location. We do this using bilinear interpolation
else:
# If requested, calculate a wavelength vector from the ref file
# WCS info
if calc_wave:
crpix3 = pathloss_wcs.crpix3
crval3 = pathloss_wcs.crval3
cdelt3 = pathloss_wcs.cdelt3
for i in np.arange(wavesize):
wavelength[i] = crval3 + (float(i + 1) - crpix3) * cdelt3
# Calculate python index of object center
crpix1 = pathloss_wcs.crpix1
crval1 = pathloss_wcs.crval1
cdelt1 = pathloss_wcs.cdelt1
crpix2 = pathloss_wcs.crpix2
crval2 = pathloss_wcs.crval2
cdelt2 = pathloss_wcs.cdelt2
object_colindex = crpix1 + (xcenter - crval1) / cdelt1 - 1
object_rowindex = crpix2 + (ycenter - crval2) / cdelt2 - 1
# check whether target is inside slit boundaries
if (
object_colindex < 0
or object_colindex >= (ncols - 1)
or object_rowindex < 0
or object_rowindex >= (nrows - 1)
):
is_inside_slitlet = False
else:
# Do bilinear interpolation to get the array of
# path loss vs wavelength
dx1 = object_colindex - int(object_colindex)
dx2 = 1.0 - dx1
dy1 = object_rowindex - int(object_rowindex)
dy2 = 1.0 - dy1
a11 = dx1 * dy1
a12 = dx1 * dy2
a21 = dx2 * dy1
a22 = dx2 * dy2
j, i = int(object_colindex), int(object_rowindex)
pathloss_vector = (
a22 * pathloss_refdata[:, i, j]
+ a21 * pathloss_refdata[:, i + 1, j]
+ a12 * pathloss_refdata[:, i, j + 1]
+ a11 * pathloss_refdata[:, i + 1, j + 1]
)
return wavelength, pathloss_vector, is_inside_slitlet
[docs]
def do_correction(
input_model,
pathloss_model=None,
inverse=False,
source_type=None,
user_slit_loc=None,
return_corrections=True,
):
"""
Execute all tasks for Path Loss Correction.
Parameters
----------
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
Science data to be corrected. Updated in place.
pathloss_model : `~stdatamodels.jwst.datamodels.MirLrsPathlossModel`, \
`~stdatamodels.jwst.datamodels.PathlossModel`, or None, optional
Pathloss correction data
inverse : bool, optional
Invert the math operations used to apply the pathloss correction.
source_type : str or None, optional
Force processing using the specified source type.
user_slit_loc : float, optional
User-provided slit location in units of arcsec, where ``(0, 0)``
is the center and the edges are +/-0.255 arcsec.
return_corrections : bool, optional
If `True`, a model containing the applied corrections is returned.
Returns
-------
input_model : `~stdatamodels.jwst.datamodels.JwstDataModel`
The corrected science data with pathloss extensions added.
corrections : `~stdatamodels.jwst.datamodels.JwstDataModel`, optional
A model of the correction arrays, returned if ``return_corrections``
is `True`.
"""
if not pathloss_model:
raise RuntimeError("A PathlossModel must be specified.")
exp_type = input_model.meta.exposure.type
log.info(f"Input exposure type is {exp_type}")
corrections = None
if exp_type == "NRS_MSASPEC":
corrections = do_correction_mos(input_model, pathloss_model, inverse, source_type)
elif exp_type in ["NRS_FIXEDSLIT", "NRS_BRIGHTOBJ"]:
corrections = do_correction_fixedslit(input_model, pathloss_model, inverse, source_type)
elif exp_type == "NRS_IFU":
corrections = do_correction_ifu(input_model, pathloss_model, inverse, source_type)
elif exp_type in ["MIR_LRS-FIXEDSLIT", "MIR_WFSS"]:
# only apply correction to LRS fixed-slit if target is point source
if is_pointsource(input_model.meta.target.source_type):
do_correction_lrs(input_model, pathloss_model, user_slit_loc)
else:
log.warning("Not a point source; skipping correction for LRS.")
input_model.meta.cal_step.pathloss = "SKIPPED"
elif exp_type == "NIS_SOSS":
if inverse:
log.warning("Use of inversion with NIS_SOSS is not implemented. Skipping")
input_model.meta.cal_step.pathloss = "SKIPPED"
elif source_type is not None:
log.warning("Forcing of source type with NIS_SOSS is not implemented. Skipping")
input_model.meta.cal_step.pathloss = "SKIPPED"
else:
do_correction_soss(input_model, pathloss_model)
if return_corrections:
return input_model, corrections
else:
return input_model
[docs]
def interpolate_onto_grid(wavelength_grid, wavelength_vector, pathloss_vector):
"""
Interpolate pathloss value onto grid.
Get the value of pathloss by interpolating each non-NaN element of
``wavelength_grid`` into ``pathloss_vector`` using the index lookup of
``wavelength_vector``. Pixels with wavelengths outside the range of the
reference file should have a correction of NaN.
Parameters
----------
wavelength_grid : ndarray
The 2-D grid of wavelengths for each science data pixel
wavelength_vector : ndarray
The 1-D vector of wavelengths
pathloss_vector : ndarray
Corresponding 1-D vector of pathloss values
Returns
-------
pathloss_grid : ndarray
Grid of pathloss corrections for each non-NaN pixel
"""
# Need to set the pathloss correction of pixels whose wavelength is outside
# the wavelength range of the reference file to NaN. This trick will accomplish
# that while still allowing the use of array linear interpolation
#
# Pad out the wavelength and pathloss vectors by adding another element
# at the beginning and end, and put NaN in the new first and last elements
# of the extended pathloss vector
extended_pathloss_vector = np.zeros(len(pathloss_vector) + 2)
extended_pathloss_vector[1:-1] = pathloss_vector
extended_pathloss_vector[0] = np.nan
extended_pathloss_vector[-1] = np.nan
extended_wavelength_vector = np.zeros(len(wavelength_vector) + 2)
extended_wavelength_vector[1:-1] = wavelength_vector
extended_wavelength_vector[0] = wavelength_vector[0] - 0.1
extended_wavelength_vector[-1] = wavelength_vector[-1] + 0.1
# Find the indices in the original wavelength array that correspond
# to the lower of 2 adjacent wavelength values spanning the wavelength
# values in the wavelength grid. NaNs and values > max wavelength will
# return an index to an element 1 past the array, values below min wavelength
# will return 0
upper_indices = np.searchsorted(wavelength_vector, wavelength_grid)
# Move these indices so they correspond to the extended arrays
lower_indices = upper_indices
upper_indices = upper_indices + 1
# Now we can just proceed without worrying about values outside the wavelength
# array
numerator = wavelength_grid - extended_wavelength_vector[lower_indices]
denominator = (
extended_wavelength_vector[upper_indices] - extended_wavelength_vector[lower_indices]
)
fraction = numerator / denominator
pathloss_grid = wavelength_grid * 0.0
pathloss_grid = extended_pathloss_vector[lower_indices] + fraction * (
extended_pathloss_vector[upper_indices] - extended_pathloss_vector[lower_indices]
)
return pathloss_grid
[docs]
def is_pointsource(srctype):
"""
Check whether input is a point source.
Parameters
----------
srctype : str
Determined type of source.
Returns
-------
result : bool
`True` if ``srctype`` is "POINT"
"""
if srctype is None:
return False
elif srctype.upper() == "POINT":
return True
else:
return False
[docs]
def do_correction_mos(data, pathloss, inverse=False, source_type=None):
"""
Path loss correction for NIRSpec MOS.
Data are modified in-place.
Parameters
----------
data : `~stdatamodels.jwst.datamodels.JwstDataModel`
The NIRSpec MOS data to be corrected.
pathloss : `~stdatamodels.jwst.datamodels.PathlossModel` or None
The pathloss reference data.
inverse : bool
Invert the math operations used to apply the pathloss correction.
source_type : str or None
Force processing using the specified source type.
Returns
-------
corrections : `~stdatamodels.jwst.datamodels.MultiSlitModel`
The pathloss corrections applied.
"""
exp_type = data.meta.exposure.type
# Loop over all MOS slitlets
corrections = datamodels.MultiSlitModel()
correction_found = False
for slit_number, slit in enumerate(data.slits):
log.info(f"Working on slit {slit_number}")
correction = _corrections_for_mos(slit, pathloss, exp_type, source_type)
corrections.slits.append(correction)
# Apply the correction
if not correction:
log.warning(f"No correction provided for slit {slit_number}. Skipping")
continue
correction_found = True
if not inverse:
slit.data /= correction.data
slit.err /= correction.data
slit.var_poisson /= correction.data**2
slit.var_rnoise /= correction.data**2
if slit.var_flat is not None and np.size(slit.var_flat) > 0:
slit.var_flat /= correction.data**2
else:
slit.data *= correction.data
slit.err *= correction.data
slit.var_poisson *= correction.data**2
slit.var_rnoise *= correction.data**2
if slit.var_flat is not None and np.size(slit.var_flat) > 0:
slit.var_flat *= correction.data**2
slit.pathloss_point = correction.pathloss_point
slit.pathloss_uniform = correction.pathloss_uniform
slit.pathloss_correction_type = correction.pathloss_correction_type
# Make sure all NaNs and flags match up in the output slit model
match_nans_and_flags(slit)
# Set step status to complete
if correction_found:
data.meta.cal_step.pathloss = "COMPLETE"
else:
data.meta.cal_step.pathloss = "SKIPPED"
return corrections
[docs]
def do_correction_fixedslit(data, pathloss, inverse=False, source_type=None):
"""
Path loss correction for NIRSpec fixed-slit modes.
Data are modified in-place.
Parameters
----------
data : `~stdatamodels.jwst.datamodels.JwstDataModel`
The NIRSpec fixed-slit data to be corrected.
pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel`
The pathloss reference data.
inverse : bool
Invert the math operations used to apply the pathloss correction.
source_type : str or None
Force processing using the specified source type.
Returns
-------
corrections : `~stdatamodels.jwst.datamodels.MultiSlitModel`
The pathloss corrections applied.
"""
exp_type = data.meta.exposure.type
# Loop over all slits contained in the input
corrections = datamodels.MultiSlitModel()
correction_found = False
for slit_number, slit in enumerate(data.slits):
log.info(f"Working on slit {slit.name}")
correction = _corrections_for_fixedslit(slit, pathloss, exp_type, source_type)
corrections.slits.append(correction)
# Apply the correction
if not correction:
log.warning(f"No correction provided for slit {slit_number}. Skipping")
continue
correction_found = True
if not inverse:
slit.data /= correction.data
slit.err /= correction.data
if slit.var_poisson is not None:
slit.var_poisson /= correction.data**2
if slit.var_rnoise is not None:
slit.var_rnoise /= correction.data**2
if slit.var_flat is not None:
slit.var_flat /= correction.data**2
else:
slit.data *= correction.data
slit.err *= correction.data
if slit.var_poisson is not None:
slit.var_poisson *= correction.data**2
if slit.var_rnoise is not None:
slit.var_rnoise *= correction.data**2
if slit.var_flat is not None:
slit.var_flat *= correction.data**2
slit.pathloss_point = correction.pathloss_point
slit.pathloss_uniform = correction.pathloss_uniform
slit.pathloss_correction_type = correction.pathloss_correction_type
# Make sure all NaNs and flags match up in the output slit model
match_nans_and_flags(slit)
# Set step status to complete
if correction_found:
data.meta.cal_step.pathloss = "COMPLETE"
else:
data.meta.cal_step.pathloss = "SKIPPED"
return corrections
[docs]
def do_correction_ifu(data, pathloss, inverse=False, source_type=None):
"""
Path loss correction for NIRSpec IFU.
Data are modified in-place.
Parameters
----------
data : `~stdatamodels.jwst.datamodels.JwstDataModel`
The NIRSpec IFU data to be corrected.
pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel`
The pathloss reference data.
inverse : bool
Invert the math operations used to apply the pathloss correction.
source_type : str or None
Force processing using the specified source type.
Returns
-------
corrections : `~stdatamodels.jwst.datamodels.JwstDataModel`
The pathloss corrections applied.
"""
correction = _corrections_for_ifu(data, pathloss, source_type)
# For IFU, the pathloss correction type is not stored in the model.
# Just log it here.
log.info(f"Correction type used: {correction.pathloss_correction_type}")
if not inverse:
data.data /= correction.data
data.err /= correction.data
data.var_poisson /= correction.data**2
data.var_rnoise /= correction.data**2
if data.var_flat is not None and np.size(data.var_flat) > 0:
data.var_flat /= correction.data**2
else:
data.data *= correction.data
data.err *= correction.data
data.var_poisson *= correction.data**2
data.var_rnoise *= correction.data**2
if data.var_flat is not None and np.size(data.var_flat) > 0:
data.var_flat *= correction.data**2
data.pathloss_point = correction.pathloss_point
data.pathloss_uniform = correction.pathloss_uniform
# This might be useful to other steps
data.wavelength = correction.wavelength
# Make sure all NaNs and flags match up in the output data model
match_nans_and_flags(data)
# Set the step status to complete
data.meta.cal_step.pathloss = "COMPLETE"
return correction
[docs]
def do_correction_lrs(data, pathloss, user_slit_loc):
"""
Path loss correction for MIRI LRS fixed-slit.
Data are modified in-place.
Parameters
----------
data : `~stdatamodels.jwst.datamodels.JwstDataModel`
The MIRI LRS fixed-slit data to be corrected.
pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel`
The pathloss reference data.
user_slit_loc : float
User-provided slit location in units of arcsec, where ``(0, 0)``
is the center and the edges are +/-0.255 arcsec.
"""
correction = _corrections_for_lrs(data, pathloss, user_slit_loc)
# LRS correction is multiplicative
data.data *= correction.data
data.err *= correction.data
data.var_poisson *= correction.data**2
data.var_rnoise *= correction.data**2
if data.var_flat is not None and np.size(data.var_flat) > 0:
data.var_flat *= correction.data**2
data.pathloss_point = correction.pathloss_point
# This might be useful to other steps
data.wavelength = correction.wavelength
# Make sure all NaNs and flags match up in the output data model
match_nans_and_flags(data)
# Set the step status to complete
data.meta.cal_step.pathloss = "COMPLETE"
return
[docs]
def do_correction_soss(data, pathloss):
"""
Path loss correction for NIRISS SOSS.
NIRISS SOSS pathloss correction is basically a correction for the
flux from the 2nd and 3rd order dispersion that falls outside the
subarray aperture. The correction depends on the pupil wheel position
and column number (or wavelength). The simple option is to do the
correction by column number, then the only interpolation needed is a
1-D interpolation into the pupil wheel position dimension.
Data are modified in-place.
Parameters
----------
data : `~stdatamodels.jwst.datamodels.JwstDataModel`
The NIRISS SOSS data to be corrected.
pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel`
The pathloss reference data.
"""
# Omit correction if this is a TSO observation
if data.meta.visit.tsovisit:
log.warning("NIRISS SOSS TSO observations skip the pathloss step")
data.meta.cal_step.pathloss = "SKIPPED"
return
# Get the pupil wheel position
pupil_wheel_position = data.meta.instrument.pupil_position
if pupil_wheel_position is None:
log.warning(
f"Unable to get pupil wheel position from PWCPOS keyword for {data.meta.filename}"
)
log.warning("Pathloss correction skipped")
data.meta.cal_step.pathloss = "SKIPPED"
return
# Get the aperture from the reference file that matches the subarray
subarray = data.meta.subarray.name
aperture = get_aperture_from_model(pathloss, subarray)
if aperture is None:
log.warning(f"Unable to get Aperture from reference file for subarray {subarray}")
log.warning("Pathloss correction skipped")
data.meta.cal_step.pathloss = "SKIPPED"
return
else:
log.info(f"Aperture {aperture.name} selected from reference file")
# Set up pathloss correction array
pathloss_array = aperture.pointsource_data[0]
nrows, ncols = pathloss_array.shape
_, data_ncols = data.data.shape
correction = np.ones(data_ncols, dtype=np.float32)
crpix1 = aperture.pointsource_wcs.crpix1
crval1 = aperture.pointsource_wcs.crval1
cdelt1 = aperture.pointsource_wcs.cdelt1
pupil_wheel_index = crpix1 + (pupil_wheel_position - crval1) / cdelt1 - 1
if pupil_wheel_index < 0 or pupil_wheel_index > (ncols - 2):
log.warning("Pupil Wheel position outside reference file coverage")
log.warning("Setting pathloss correction to 1.0")
else:
ix = int(pupil_wheel_index)
dx = pupil_wheel_index - ix
crpix2 = aperture.pointsource_wcs.crpix2
crval2 = aperture.pointsource_wcs.crval2
cdelt2 = aperture.pointsource_wcs.cdelt2
for row in range(data_ncols):
row_1indexed = row + 1
refrow_index = math.floor(crpix2 + (row_1indexed - crval2) / cdelt2 - 0.5)
if refrow_index < 0 or refrow_index > (nrows - 1):
correction[row] = 1.0
else:
correction[row] = (1.0 - dx) * pathloss_array[
refrow_index, ix
] + dx * pathloss_array[refrow_index, ix + 1]
# Create and apply the 2D correction
pathloss_2d = np.broadcast_to(correction, data.data.shape)
data.data /= pathloss_2d
data.err /= pathloss_2d
data.var_poisson /= pathloss_2d**2
data.var_rnoise /= pathloss_2d**2
if data.var_flat is not None and np.size(data.var_flat) > 0:
data.var_flat /= pathloss_2d**2
data.pathloss_point = pathloss_2d
# Make sure all NaNs and flags match up in the output data model
match_nans_and_flags(data)
# Set step status to complete
data.meta.cal_step.pathloss = "COMPLETE"
def _corrections_for_mos(slit, pathloss, exp_type, source_type=None):
"""
Calculate the correction arrays for MOS slit.
Parameters
----------
slit : `~stdatamodels.jwst.datamodels.SlitModel`
The slit being operated on.
pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel`
The pathloss reference data
exp_type : str
Exposure type
source_type : str or None
Force processing using the specified source type.
Returns
-------
correction : `~stdatamodels.jwst.datamodels.SlitModel`
The correction arrays
"""
correction = None
size = slit.data.size
# Only work on slits with data.size > 0
if size <= 0:
log.warning(f"Slit has data size = {size}")
return correction
# Get centering
xcenter, ycenter = get_center(exp_type, slit)
# Calculate the 1-d wavelength and pathloss vectors
# for the source position
# Get the aperture from the reference file that matches the slit
slitlength = len(slit.shutter_state)
aperture = get_aperture_from_model(pathloss, slit.shutter_state)
if aperture is None:
log.warning(f"Cannot find matching pathloss model for slit with {slitlength} shutters")
return correction
log.info(f"Shutter state = {slit.shutter_state}, using {aperture.name} entry in ref file")
two_shutters = False
if slitlength == 2:
two_shutters = True
if shutter_below_is_closed(slit.shutter_state) and not shutter_above_is_closed(
slit.shutter_state
):
ycenter = ycenter - 1.0
log.info("Shutter below fiducial is closed, using lower region of pathloss array")
if not shutter_below_is_closed(slit.shutter_state) and shutter_above_is_closed(
slit.shutter_state
):
ycenter = ycenter + 1.0
log.info("Shutter above fiducial is closed, using upper region of pathloss array")
(wavelength_pointsource, pathloss_pointsource_vector, is_inside_slitlet) = (
calculate_pathloss_vector(
aperture.pointsource_data, aperture.pointsource_wcs, xcenter, ycenter
)
)
if two_shutters:
(wavelength_uniformsource, pathloss_uniform_vector) = (
calculate_two_shutter_uniform_pathloss(pathloss)
)
else:
(wavelength_uniformsource, pathloss_uniform_vector, _) = calculate_pathloss_vector(
aperture.uniform_data, aperture.uniform_wcs, xcenter, ycenter
)
# This should only happen if the 2 shutter uniform pathloss calculation has an error
if wavelength_uniformsource is None or pathloss_uniform_vector is None:
log.warning("Unable to calculate 2 shutter uniform pathloss.")
log.warning("Using 3 shutter aperture.")
(wavelength_uniformsource, pathloss_uniform_vector, _) = calculate_pathloss_vector(
aperture.uniform_data, aperture.uniform_wcs, xcenter, ycenter
)
if not is_inside_slitlet:
log.warning("Source is outside slit.")
return correction
# Wavelengths in the reference file are in meters,
# need them to be in microns
wavelength_pointsource *= 1.0e6
wavelength_uniformsource *= 1.0e6
wavelength_array = slit.wavelength
# Compute the point source pathloss 2D correction
pathloss_2d_ps = interpolate_onto_grid(
wavelength_array, wavelength_pointsource, pathloss_pointsource_vector
)
# Compute the uniform source pathloss 2D correction
pathloss_2d_un = interpolate_onto_grid(
wavelength_array, wavelength_uniformsource, pathloss_uniform_vector
)
# Use the appropriate correction for this slit
if is_pointsource(source_type or slit.source_type):
pathloss_2d = pathloss_2d_ps
correction_type = "POINT"
else:
pathloss_2d = pathloss_2d_un
correction_type = "UNIFORM"
# Save the corrections. The `data` portion is the correction used.
# The individual ones will be saved in the respective attributes.
correction = datamodels.SlitModel(data=pathloss_2d)
correction.pathloss_point = pathloss_2d_ps
correction.pathloss_uniform = pathloss_2d_un
correction.pathloss_correction_type = correction_type
return correction
def _corrections_for_fixedslit(slit, pathloss, exp_type, source_type):
"""
Calculate the correction arrays for fixed-slit.
Parameters
----------
slit : `~stdatamodels.jwst.datamodels.SlitModel`
The slit being operated on.
pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel`
The pathloss reference data
exp_type : str
Exposure type
source_type : str or None
Force processing using the specified source type.
Returns
-------
correction : `~stdatamodels.jwst.datamodels.SlitModel`
The correction arrays
"""
correction = None
# Get centering
xcenter, ycenter = get_center(exp_type, slit)
# Calculate the 1-d wavelength and pathloss vectors for the source position
# Get the aperture from the reference file that matches the slit
aperture = get_aperture_from_model(pathloss, slit.name)
if aperture is None:
log.warning(f"Cannot find matching pathloss model for {slit.name}")
log.warning("Skipping pathloss correction for this slit")
return correction
log.info(f"Using aperture {aperture.name}")
(wavelength_pointsource, pathloss_pointsource_vector, is_inside_slit) = (
calculate_pathloss_vector(
aperture.pointsource_data, aperture.pointsource_wcs, xcenter, ycenter
)
)
(wavelength_uniformsource, pathloss_uniform_vector, _) = calculate_pathloss_vector(
aperture.uniform_data, aperture.uniform_wcs, xcenter, ycenter
)
if not is_inside_slit:
log.warning(f"Source is outside slit. Skipping pathloss correction for slit {slit.name}")
return correction
# Wavelengths in the reference file are in meters,
# need them to be in microns
wavelength_pointsource *= 1.0e6
wavelength_uniformsource *= 1.0e6
# Use the appropriate correction for this slit
if is_pointsource(source_type or slit.source_type):
# calculate the point source corrected wavelengths and uncorrected wavelengths
# for the slit
wavelength_array_corr = get_wavelengths(slit, use_wavecorr=True)
wavelength_array_uncorr = get_wavelengths(slit, use_wavecorr=False)
# Compute the point source pathloss 2D correction
pathloss_2d_ps = interpolate_onto_grid(
wavelength_array_corr, wavelength_pointsource, pathloss_pointsource_vector
)
# Compute the uniform source pathloss 2D correction
pathloss_2d_un = interpolate_onto_grid(
wavelength_array_uncorr, wavelength_uniformsource, pathloss_uniform_vector
)
pathloss_2d = pathloss_2d_ps
correction_type = "POINT"
else:
wavelength_array = slit.wavelength
# Compute the point source pathloss 2D correction
pathloss_2d_ps = interpolate_onto_grid(
wavelength_array, wavelength_pointsource, pathloss_pointsource_vector
)
# Compute the uniform source pathloss 2D correction
pathloss_2d_un = interpolate_onto_grid(
wavelength_array, wavelength_uniformsource, pathloss_uniform_vector
)
pathloss_2d = pathloss_2d_un
correction_type = "UNIFORM"
# Save the corrections. The `data` portion is the correction used.
# The individual ones will be saved in the respective attributes.
correction = datamodels.SlitModel(data=pathloss_2d)
correction.pathloss_point = pathloss_2d_ps
correction.pathloss_uniform = pathloss_2d_un
correction.pathloss_correction_type = correction_type
return correction
def _corrections_for_ifu(data, pathloss, source_type):
"""
Calculate the correction arrays for IFU.
Parameters
----------
data : `~stdatamodels.jwst.datamodels.SlitModel`
The data being operated on.
pathloss : `~stdatamodels.jwst.datamodels.JwstDataModel`
The pathloss reference data
source_type : str or None
Force processing using the specified source type.
Returns
-------
correction : `~stdatamodels.jwst.datamodels.SlitModel`
The correction arrays
"""
# IFU targets are always inside slit
# Get centering
xcenter, ycenter = get_center(data.meta.exposure.type, None)
# Calculate the 1-d wavelength and pathloss vectors for the source position
aperture = pathloss.apertures[0]
(wavelength_pointsource, pathloss_pointsource_vector, _) = calculate_pathloss_vector(
aperture.pointsource_data, aperture.pointsource_wcs, xcenter, ycenter
)
(wavelength_uniformsource, pathloss_uniform_vector, _) = calculate_pathloss_vector(
aperture.uniform_data, aperture.uniform_wcs, xcenter, ycenter
)
# Wavelengths in the reference file are in meters;
# need them to be in microns
wavelength_pointsource *= 1.0e6
wavelength_uniformsource *= 1.0e6
# Create the 2-d wavelength arrays, initialize with NaNs
wavelength_array = np.zeros(data.shape, dtype=np.float32)
wavelength_array.fill(np.nan)
for this_slice in NIRSPEC_IFU_SLICES:
slice_wcs = nirspec.nrs_wcs_set_input(data, this_slice)
x, y = wcstools.grid_from_bounding_box(slice_wcs.bounding_box)
ra, dec, wavelength = slice_wcs(x, y)
valid = ~np.isnan(wavelength)
x = x[valid]
y = y[valid]
wavelength_array[y.astype(int), x.astype(int)] = wavelength[valid]
# Compute the point source pathloss 2D correction
pathloss_2d_ps = interpolate_onto_grid(
wavelength_array, wavelength_pointsource, pathloss_pointsource_vector
)
# Compute the uniform source pathloss 2D correction
pathloss_2d_un = interpolate_onto_grid(
wavelength_array, wavelength_uniformsource, pathloss_uniform_vector
)
# Use the appropriate correction for the source type
if is_pointsource(source_type or data.meta.target.source_type):
pathloss_2d = pathloss_2d_ps
correction_type = "POINT"
else:
pathloss_2d = pathloss_2d_un
correction_type = "UNIFORM"
# Save the corrections. The `data` portion is the correction used.
# The individual ones will be saved in the respective attributes.
correction = type(data)(data=pathloss_2d)
correction.pathloss_point = pathloss_2d_ps
correction.pathloss_uniform = pathloss_2d_un
correction.wavelength = wavelength_array
correction.pathloss_correction_type = correction_type
return correction
def _corrections_for_lrs(data, pathloss, user_slit_loc):
"""
Calculate the correction arrays for MIRI LRS slit.
Parameters
----------
data : `~stdatamodels.jwst.datamodels.JwstDataModel`
The LRS data being operated on.
pathloss : `~stdatamodels.jwst.datamodels.MirLrsPathlossModel`
The pathloss reference data
user_slit_loc : float
User-provided slit location in units of arcsec, where ``(0, 0)``
is the center and the edges are +/-0.255 arcsec.
Returns
-------
correction : `~stdatamodels.jwst.datamodels.JwstDataModel`
The correction arrays
"""
# Get location of target
xcenter, ycenter, offset_1, offset_2 = get_center(data.meta.exposure.type, data, offsets=True)
log.info(f"Target location relative to aperture center = ({xcenter:.3f}, {ycenter:.3f})")
# Get 1-d wavelength vector from reference file data
wavelength_vector = pathloss.pathloss_table["wavelength"]
# Calculate the 1-d pathloss vector for the source position
pathloss_data = pathloss.pathloss_table["pathloss"]
pathloss_wcs = pathloss.meta.wcsinfo
if user_slit_loc is None:
_, pathloss_vector, is_inside_slit = calculate_pathloss_vector(
pathloss_data, pathloss_wcs, xcenter, ycenter, calc_wave=False
)
else:
log.info(f"Correcting location from provided target center offset: {user_slit_loc} arcsec")
# The slit is oriented with the long axis (the spatial
# axis) horizontal, so the edges in the dispersion direction (the
# narrow axis) would be negative down and positive up. Because the
# slit is only 510 mas across, the edges would be at about
# +/-0.255 arcsec. Hence, the xcenter coordinate remains the same.
ra, dec, wav = data.meta.wcs(offset_1, offset_2)
location = (ra, dec, wav)
# Compute the spatial pixel scale from the WCS. Assume pixels are square.
scale_degrees = compute_scale(
data.meta.wcs, location, disp_axis=data.meta.wcsinfo.dispersion_direction
)
scale_arcsec = scale_degrees * 3600.0
user_slit_loc_pix = user_slit_loc / scale_arcsec
yusr_recenter = ycenter + user_slit_loc_pix
log.info(f"New target location = ({xcenter:.3f}, {yusr_recenter:.3f})")
_, pathloss_vector, is_inside_slit = calculate_pathloss_vector(
pathloss_data, pathloss_wcs, xcenter, yusr_recenter, calc_wave=False
)
if not is_inside_slit:
log.info("Source is outside slit. Correction defaulting to center of the slit.")
xcenter, ycenter = 0.0, 0.0
_, pathloss_vector, is_inside_slit = calculate_pathloss_vector(
pathloss_data, pathloss_wcs, xcenter, ycenter, calc_wave=False
)
# Populate 2-D wavelength array from WCS info
wavelength_array = get_wavelengths(data)
# MIRI LRS pathloss reference file data are in reverse order,
# so flip them here
wavelength_vector = wavelength_vector[::-1]
pathloss_vector = pathloss_vector[::-1]
# Compute the point source pathloss 2D correction
pathloss_2d = interpolate_onto_grid(wavelength_array, wavelength_vector, pathloss_vector)
# Save the corrections. The `data` portion is the correction used.
# The individual ones will be saved in the respective attributes.
correction = datamodels.ImageModel(data=pathloss_2d)
correction.pathloss_point = pathloss_2d
correction.wavelength = wavelength_array
return correction