"""JWST-specific Step and Pipeline base classes."""
import logging
import warnings
from copy import deepcopy
from functools import partial
from pathlib import Path
from stdatamodels.exceptions import ValidationWarning
from stdatamodels.jwst import datamodels
from stdatamodels.jwst.datamodels import JwstDataModel, read_metadata
from stpipe import Pipeline, crds_client
from stpipe import Step as _Step
from jwst import __version__, __version_commit__
from jwst.datamodels import ModelContainer, ModelLibrary
from jwst.lib import exposure_types
from jwst.lib.suffix import remove_suffix
from jwst.stpipe._cal_logs import _LOG_FORMATTER
log = logging.getLogger(__name__)
__all__ = ["JwstStep", "JwstPipeline"]
class JwstStep(_Step):
"""A JWST pipeline step (``jwst.stpipe.Step``)."""
spec = """
output_ext = string(default='.fits') # Output file type
""" # noqa: E501
_log_records_formatter = _LOG_FORMATTER
@classmethod
def _datamodels_open(cls, init, **kwargs):
    """
    Open the input as a JWST datamodel.

    Parameters
    ----------
    init : object
        Input to pass through to ``datamodels.open``.
    **kwargs
        Additional keyword arguments forwarded to ``datamodels.open``.

    Returns
    -------
    object
        The opened datamodel.
    """
    opened = datamodels.open(init, **kwargs)
    return opened
@classmethod
def _get_crds_parameters(cls, dataset):
    """
    Retrieve the CRDS selection parameters for the given dataset.

    If the input dataset is a filename, the parameters are obtained by
    lazy-loading its metadata so data arrays are never read.

    Parameters
    ----------
    dataset : str
        The name of the dataset.

    Returns
    -------
    dict
        A dictionary of CRDS parameters.
    str
        The name of the observatory.
    """
    crds_observatory = "jwst"

    # For list-like input, parameters come from the first model only;
    # this mirrors what stpipe already does internally for ModelContainer.
    if isinstance(dataset, (list, tuple, ModelContainer)):
        if len(dataset) == 0:
            raise ValueError(f"Input dataset {dataset} is empty")
        dataset = dataset[0]

    # Already-open inputs know how to report their own CRDS parameters.
    if isinstance(dataset, (ModelLibrary, JwstDataModel)):
        return (dataset.get_crds_parameters(), crds_observatory)

    # Anything else must be a path on disk.
    if isinstance(dataset, str):
        dataset = Path(dataset)
    if not isinstance(dataset, Path):
        raise TypeError(f"Cannot get CRDS parameters for {dataset} of type {type(dataset)}")

    # Associations are opened as a ModelLibrary, which supports lazy-loading.
    if dataset.suffix.lower() == ".json":
        library = ModelLibrary(dataset, asn_n_members=1, asn_exptypes=["science"])
        return (library.get_crds_parameters(), crds_observatory)

    # Plain files: lazy-load just the metadata via read_metadata.
    return (read_metadata(dataset, flatten=True), crds_observatory)
@staticmethod
def get_stpipe_loggers():
    """
    Get the names of loggers to configure.

    Returns
    -------
    loggers : tuple of str
        Tuple of log names to configure.
    """
    # Loggers of dependencies we want stpipe to configure, plus the
    # special "py.warnings" logger, which is the source of warning log
    # messages for Python warnings.
    logger_names = (
        "jwst",
        "stcal",
        "stdatamodels",
        "stpipe",
        "tweakwcs",
        "CRDS",
        "py.warnings",
    )
    return logger_names
def load_as_level2_asn(self, obj):
    """
    Load the specified object into a Level2 association.

    If necessary, ``Step.input_dir`` is prepended to all members.

    Parameters
    ----------
    obj : object
        Object to load as a Level2 association

    Returns
    -------
    association : object
        Association from ``jwst.associations.lib.rules_level2_base.DMSLevel2bBase``
    """
    # Imported here to prevent a circular import.
    from jwst.associations.lib.update_path import update_key_value
    from jwst.associations.load_as_asn import LoadAsLevel2Asn

    association = LoadAsLevel2Asn.load(obj, basename=self.output_file)
    # Prefix each member's expname with the step input directory if needed.
    update_key_value(association, "expname", (), mod_func=self.make_input_path)
    return association
def load_as_level3_asn(self, obj):
    """
    Load the specified object into a Level3 association.

    If necessary, ``Step.input_dir`` is prepended to all members.

    Parameters
    ----------
    obj : object
        Object to load as a Level3 association

    Returns
    -------
    association : object
        Association from ``jwst.associations.lib.rules_level3_base.DMS_Level3_Base``
    """
    # Imported here to prevent a circular import.
    from jwst.associations.lib.update_path import update_key_value
    from jwst.associations.load_as_asn import LoadAsAssociation

    association = LoadAsAssociation.load(obj)
    # Prefix each member's expname with the step input directory if needed.
    update_key_value(association, "expname", (), mod_func=self.make_input_path)
    return association
@staticmethod
def _ramp_type(input_data):
    """
    Select the ramp model class appropriate for the input data.

    Parameters
    ----------
    input_data : str or `~stdatamodels.jwst.datamodels.JwstDataModel`
        Input filename or datamodel.

    Returns
    -------
    ramp_type : class
        The ramp class appropriate to the data. May be
        `~stdatamodels.jwst.datamodels.RampModel`,
        `~stdatamodels.jwst.datamodels.SuperstripeRampModel`, or
        `~stdatamodels.jwst.datamodels.GuiderRawModel`.
    """
    if isinstance(input_data, datamodels.JwstDataModel):
        # Open model: read values directly from the metadata tree.
        exposure_type = str(input_data.meta.exposure.type).lower()
        n_superstripe = input_data.meta.subarray.num_superstripe
    else:
        # Filename: lazy-load only the metadata.
        meta = read_metadata(input_data, flatten=True)
        exposure_type = str(meta.get("meta.exposure.type", None)).lower()
        n_superstripe = meta.get("meta.subarray.num_superstripe", None)

    # Guider exposures take precedence over superstripe handling.
    if exposure_type in exposure_types.FGS_GUIDE_EXP_TYPES:
        return datamodels.GuiderRawModel
    if n_superstripe is not None and n_superstripe > 0:
        return datamodels.SuperstripeRampModel
    return datamodels.RampModel
def prepare_output(
    self,
    init,
    make_copy=None,
    open_models=True,
    open_as_type=None,
    open_as_ramp=False,
    **kwargs,
):
    """
    Open the input data as a model, making a copy if necessary.

    If the input data is a filename or path, it is opened
    and the open model is returned.

    If it is a list of models, it is opened as a ModelContainer.
    In this case, or if the input is a simple datamodel or a
    ModelContainer, a deep copy of the model/container is returned,
    in order to avoid modifying the input models.

    If the input is a ModelLibrary, it is simply returned, in order
    to avoid making unnecessary copies for performance-critical
    use cases.

    All copies are skipped if this step has a parent (i.e. it is
    called as part of a pipeline).

    Set make_copy explicitly to True or False to override the above
    behavior.

    Parameters
    ----------
    init : str, list, JwstDataModel, ModelContainer, or ModelLibrary
        Input data to open.
    make_copy : bool or None, optional
        If True, a copy of the input will always be made.
        If False, a copy will never be made. If None, a copy is
        conditionally made, depending on the input and whether the
        step is called in a standalone context.
    open_models : bool, optional
        If True and the input is a filename or list of filenames,
        then datamodels.open will be called to open the input.
        If False, the input is returned as is.
    open_as_type : class or None, optional
        If provided, the input will be opened as the specified class
        before returning. Intended for use with simple datamodel input
        only: container types and associations should be handled directly
        in the calling code.
    open_as_ramp : bool, optional
        If True, the input will be opened as one of several ramp-type models,
        depending on the metadata in the file. If the exposure type indicates
        a guider, the input is opened as `~stdatamodels.jwst.datamodels.GuiderRawModel`.
        If the input has superstripes, it is opened as
        `~stdatamodels.jwst.datamodels.SuperstripeRampModel`. Otherwise,
        it is opened as `~stdatamodels.jwst.datamodels.RampModel`. Ignored
        if ``open_as_type`` is not None.
    **kwargs
        Additional keyword arguments to pass to datamodels.open. Used
        only if the input is a str or list.

    Returns
    -------
    model : JwstDataModel or ModelContainer or ModelLibrary
        The opened datamodel(s).

    Raises
    ------
    TypeError
        If make_copy=True and the input is a type that cannot be copied.
    """
    # Check whether input contains datamodels; only datamodel inputs
    # are candidates for the conditional deep copy at the end.
    copy_needed = False
    if isinstance(init, list):
        is_datamodel = [isinstance(m, datamodels.JwstDataModel) for m in init]
        if any(is_datamodel):
            # Make the list into a ModelContainer, since it contains models
            init = ModelContainer(init)
            copy_needed = True
    elif isinstance(init, (datamodels.JwstDataModel, ModelContainer)):
        copy_needed = True

    # Input might be a filename or path.
    # In that case, open it if desired.
    if not isinstance(init, (datamodels.JwstDataModel, ModelLibrary, ModelContainer)):
        if open_models:
            if open_as_type is not None:
                # It is assumed the provided class is appropriate for the input.
                input_models = open_as_type(init, **kwargs)
            elif open_as_ramp:
                # Choose Guider/Superstripe/plain ramp from the file metadata.
                ramp_type = self._ramp_type(init)
                input_models = ramp_type(init, **kwargs)
            else:
                input_models = datamodels.open(init, **kwargs)
        else:
            # Return the filename or path -
            # the calling code will handle opening it as needed.
            input_models = init
    elif isinstance(init, datamodels.JwstDataModel):
        # Simple data model: update the datamodel type if needed
        if open_as_type is not None and type(init) is not open_as_type:
            # This will make a shallow copy.
            input_models = open_as_type(init, **kwargs)
        elif open_as_ramp:
            ramp_type = self._ramp_type(init)
            if type(init) is ramp_type:
                # Already the right ramp class; no conversion needed.
                input_models = init
            else:
                # Try to convert; may raise ValidationWarning
                try:
                    with warnings.catch_warnings():
                        # Escalate the datatype-incompatibility warning to an
                        # error so an invalid conversion is caught here rather
                        # than producing a silently corrupted model.
                        warnings.filterwarnings(
                            "error",
                            message=r"(?s:.*)Array datatype .* not compatible",
                            category=ValidationWarning,
                        )
                        input_models = ramp_type(init, **kwargs)
                except ValidationWarning as err:
                    log.error(err)
                    # Inform the user and raise a clearer error message
                    msg = (
                        "Input data model is not compatible. "
                        f"The file should be opened as a {ramp_type.__name__} "
                        "before calling the step."
                    )
                    log.error(msg)
                    raise TypeError(msg) from None
        else:
            # Otherwise use the init model directly
            input_models = init
    else:
        # ModelContainer or ModelLibrary: use the init model directly.
        input_models = init

    # Make a deep copy if needed.  By default, copy only when the input
    # held open models and this step is running standalone (no parent).
    if make_copy is None:
        make_copy = copy_needed and self.parent is None
    if make_copy:
        try:
            input_models = input_models.copy()
        except AttributeError:
            # This should only happen if make_copy is explicitly set to
            # True and the input is a string or a ModelLibrary.
            raise TypeError(
                f"Copy is not possible for input type {type(input_models)}"
            ) from None

    return input_models
def add_asn_id_to_output_name(self, models):
    """
    Set up output path name to include the association ID.

    The input models are checked for an ASN ID in either a ModelContainer
    ``asn_table`` attribute or a ModelLibrary ``asn`` attribute.
    If not found, the current step and its parents are searched
    for an ``asn_id`` attribute.

    If an ASN ID is found, the ``_make_output_path`` function is updated
    to include it in output filenames.
    If no ASN ID is found, ``_make_output_path`` is updated to pass
    ``asn_id=None``. This will override any previously passed ASN IDs, so
    that no ASN ID appears in the output filename.

    Parameters
    ----------
    models : `~stdatamodels.jwst.datamodels.JwstDataModel`, \
            `~jwst.datamodels.container.ModelContainer`, or \
            `~jwst.datamodels.library.ModelLibrary`
        The model or models to search for an ASN ID.

    Returns
    -------
    asn_id : str or None
        The ASN ID, as found in the models or step.
    """
    # First look for an association ID on the input models themselves.
    asn_id = None
    try:
        if isinstance(models, ModelLibrary):
            asn_id = models.asn["asn_id"]
        elif isinstance(models, ModelContainer):
            asn_id = models.asn_table["asn_id"]
    except (AttributeError, KeyError):
        asn_id = None

    if asn_id is None:
        # Fall back to the step hierarchy; returns None if not found.
        asn_id = self.search_attr("asn_id")

    # Bind the (possibly None) ASN ID into the output-path factory.
    make_path = self.search_attr("_make_output_path", parent_first=True)
    self._make_output_path = partial(make_path, asn_id=asn_id)
    return asn_id
def finalize_result(self, result, reference_files_used):
    """
    Update the result with the software version and reference files used.

    Parameters
    ----------
    result : `~stdatamodels.jwst.datamodels.JwstDataModel`
        The output data model to be updated.
    reference_files_used : list of tuple
        The names and file paths of reference files used.
    """
    if isinstance(result, JwstDataModel):
        # Stamp the calibration software version information.
        result.meta.calibration_software_revision = __version_commit__ or "RELEASE"
        result.meta.calibration_software_version = __version__

        if len(reference_files_used) > 0:
            # Record each reference file under its meta.ref_file entry.
            for reference_name, reference_path in reference_files_used:
                if hasattr(result.meta.ref_file, reference_name):
                    getattr(result.meta.ref_file, reference_name).name = reference_path
            result.meta.ref_file.crds.sw_version = crds_client.get_svn_version()
            result.meta.ref_file.crds.context_used = crds_client.get_context_used(
                result.crds_observatory
            )
            if self.parent is None:
                log.info(f"Results used CRDS context: {result.meta.ref_file.crds.context_used}")

        if self.class_alias:
            if not hasattr(result, "cal_logs"):
                result.cal_logs = {}
            if self.parent is None or not self.parent.class_alias:
                # Standalone step (or parent with no alias): store this step's log.
                setattr(result.cal_logs, self.class_alias, self._log_records)
            else:  # Capture step log as pipeline log
                setattr(
                    result.cal_logs,
                    self.parent.class_alias,
                    deepcopy(self.parent._log_records),  # noqa: SLF001
                )
def remove_suffix(self, name):
    """
    Remove the suffix if a known suffix is already in name.

    Parameters
    ----------
    name : str
        The name to remove the suffix from.

    Returns
    -------
    name : str
        The name with the suffix removed.
    """
    # Delegates to the module-level jwst.lib.suffix.remove_suffix helper.
    stripped = remove_suffix(name)
    return stripped
def run(self, *args):
    """
    Run the step.

    Parameters
    ----------
    *args
        Arguments passed to `stpipe.Step.run`.

    Returns
    -------
    result : Any
        The step output
    """
    outcome = super().run(*args)
    if not self.parent:
        # Only the top-most step reports the package version.
        log.info(f"Results used jwst version: {__version__}")
    return outcome
class JwstPipeline(Pipeline, JwstStep):
    """
    A JWST pipeline (``jwst.stpipe.Pipeline``).

    JwstPipeline needs to inherit from Pipeline, but also
    be a subclass of JwstStep so that it will pass checks
    when constructing a pipeline using JwstStep class methods.
    """

    def finalize_result(self, result, _reference_files_used):
        """
        Update the result with the software version and reference files used.

        Parameters
        ----------
        result : `~stdatamodels.jwst.datamodels.JwstDataModel`
            The output data model to be updated.
        _reference_files_used : list of tuple
            The names and file paths of reference files used.
        """
        if not isinstance(result, JwstDataModel):
            return
        # Report the CRDS context once, at the pipeline level.
        log.info(
            "Results used CRDS context: "
            f"{crds_client.get_context_used(result.crds_observatory)}"
        )
        if self.class_alias:
            # Attach the pipeline's accumulated log records to the result.
            if not hasattr(result, "cal_logs"):
                result.cal_logs = {}
            setattr(result.cal_logs, self.class_alias, self._log_records)