import os
import sys
import glob
import shutil
import pandas as pd
import numpy as np
from argparse import ArgumentParser
from spacekit.logger.log import Logger
from spacekit.extractor.scrape import JsonScraper
from spacekit.preprocessor import FALSEVALS
from spacekit.preprocessor.scrub import HstSvmScrubber, JwstCalScrubber
from spacekit.generator.draw import DrawMosaics
from spacekit.skopes.jwst.cal.config import KEYPAIR_DATA, L3_TYPES
from spacekit.analyzer.track import timer, record_metrics, xtimer
class SvmAlignmentIngest:
    """Class for ingesting and preprocessing HST single visit mosaic alignment classifier datasets

    Parameters
    ----------
    input_path : str or Path, optional
        path on local disk to the input data, by default None (current working directory)
    outpath : str or Path, optional
        path on local disk to save outputs, by default None (current working directory)
    """

    def __init__(self, input_path=None, outpath=None):
        self.input_path = os.getcwd() if input_path is None else input_path
        self.batch_out = os.getcwd() if outpath is None else outpath
        self.log_dir = None
        self.clean = True
        self.visit_data = []
        self.data_paths = []
        self.json_pattern = "*_total*_svm_*.json"
        self.crpt = 0
        self.draw = 1
        self.img_outputs = os.path.join(self.batch_out, "img")

    def start(self, func, ps="prep", visit=None, **args):
        """Call ``func(**args)`` and record its run time with ``record_metrics``.

        Parameters
        ----------
        func : callable
            function to run
        ps : str, optional
            process-step label passed to ``record_metrics``, by default "prep"
        visit : str, optional
            visit name passed to ``record_metrics``, by default None
        **args
            keyword arguments forwarded to ``func``
        """
        t0, start = timer()
        func(**args)
        wall, clock = timer(t0=t0, clock=start)
        record_metrics(self.log_dir, visit, wall, clock, ps=ps)

    def prep_svm_batch(self, batch_name="drz", drz_ver="3.4.1"):
        """Preprocess every visit found under `input_path` as a single batch.

        Parameters
        ----------
        batch_name : str, optional
            base name for the output files, by default "drz". None is treated as the default.
        drz_ver : str, optional
            version string whose dots are removed and appended to `batch_name`
            (e.g. "3.4.1" -> "drz_341"); falsy values append nothing, by default "3.4.1"

        Returns
        -------
        tuple
            (preprocessed dataframe, path to the saved data file) as returned by `run_preprocessing`
        """
        # callers (hst_svm_ingest / CLI) pass batch_name=None explicitly when it is not set
        if batch_name is None:
            batch_name = "drz"
        if drz_ver:
            batch_name += f"_{drz_ver.replace('.', '')}"
        return self.run_preprocessing(fname=batch_name)

    def prep_single_visit(self, visit_path):
        """Preprocess a single visit directory, once per detector found in its ``*total*.fits`` files.

        The visit directory is removed afterwards when `clean` is True. Any exception raised during
        preprocessing is printed and the process exits with status 1.

        Parameters
        ----------
        visit_path : str or Path
            path to the visit directory (its basename is used as the visit name)
        """
        # normpath strips a trailing separator so basename/dirname give the visit name and its parent
        visit_path = os.path.normpath(str(visit_path))
        visit = os.path.basename(visit_path)
        drz_files = glob.glob(os.path.join(visit_path, "*total*.fits"))
        if not drz_files:
            print(f"No *total*.fits files found in {visit_path}")
            return
        # field 4 of the underscore-separated filename is used as the detector name; dedupe, keep order
        dets = list(dict.fromkeys(os.path.basename(drz).split("_")[4] for drz in drz_files))
        try:
            # run_preprocessing searches <input_path>/<visit>, so point input_path at the parent directory
            self.input_path = os.path.dirname(visit_path)
            for det in dets:
                self.run_preprocessing(fname=f"{visit}_{det.lower()}_data", visit=visit)
            if self.clean is True:
                shutil.rmtree(visit_path)
        except Exception as e:
            print(e)
            sys.exit(1)

    def run_preprocessing(
        self,
        h5=None,
        fname="svm_data",
        visit=None,
    ):
        """Scrapes SVM data from raw files, preprocesses dataframe for MLP classifier and generates png images for image CNN.

        Input location, output location, JSON search pattern, corruption flag and drawing flag are taken from
        the instance attributes `input_path`, `batch_out`, `json_pattern`, `crpt` and `draw`.

        #TODO: if no JSON files found, look for results_*.csv file instead and preprocess via alternative method

        Parameters
        ----------
        h5 : str, optional
            load from existing hdf5 file instead of scraping JSON files, by default None
        fname : str, optional
            base filename to give the output files (directory and extension are stripped), by default "svm_data"
        visit : str, optional
            single visit name (e.g. "id8f34") matching a subdirectory of `input_path`; only this visit is
            searched and preprocessed (rather than all visits contained in `input_path`), by default None

        Returns
        -------
        tuple
            (preprocessed Pandas dataframe, path to the saved data file)
        """
        os.makedirs(self.batch_out, exist_ok=True)
        fname = os.path.basename(fname).split(".")[0]
        # 1: SCRAPE JSON FILES and make dataframe
        if h5 is None:
            search_path = os.path.join(self.input_path, visit) if visit else self.input_path
            patterns = self.json_pattern.split(",")
            jsc = JsonScraper(
                search_path=search_path,
                search_patterns=patterns,
                file_basename=fname,
                crpt=self.crpt,
                output_path=self.batch_out,
            )
            jsc.json_harvester()
        else:
            jsc = JsonScraper(h5_file=h5).load_h5_file()
        # 2: Scrape Fits Files and SCRUB DATAFRAME
        scrub = HstSvmScrubber(
            self.input_path,
            data=jsc.data,
            output_path=self.batch_out,
            output_file=fname,
            crpt=self.crpt,
        )
        scrub.preprocess_data()
        # 3: DRAW IMAGES
        if self.draw:
            img_outputs = os.path.join(self.batch_out, "img")
            mos = DrawMosaics(
                self.input_path,
                output_path=img_outputs,
                fname=scrub.data_path,
                pattern="",
                gen=3,
                size=(24, 24),
                crpt=self.crpt,
            )
            mos.generate_total_images()
        self.visit_data.append(scrub.df)
        self.data_paths.append(scrub.data_path)
        print(f"DATA PATH: {scrub.data_path}\n")
        print(scrub.df_visit)
        return scrub.df, scrub.data_path
def concat_prepped(dpath):
    """Combine the per-visit preprocessed csv files in `dpath` into ``preprocessed.csv``.

    Files matching ``??????_*data.csv`` are read with their ``index`` column as the index, stacked
    row-wise in sorted filename order, and written to ``{dpath}/preprocessed.csv`` with the index
    stored back into an ``index`` column.

    Parameters
    ----------
    dpath : str or Path
        directory containing the per-visit csv files

    Returns
    -------
    pandas.DataFrame or None
        the combined dataframe, or None when no matching files exist (nothing is written then)
    """
    datafiles = sorted(glob.glob(f"{dpath}/??????_*data.csv"))
    if not datafiles:
        print(f"No preprocessed data files found in {dpath}")
        return None
    # one concat over all frames instead of repeated pairwise concatenation (avoids quadratic copying)
    df = pd.concat([pd.read_csv(fpath, index_col="index") for fpath in datafiles], axis=0)
    if "index" not in df.columns:
        df["index"] = df.index
    df.to_csv(f"{dpath}/preprocessed.csv", index=False)
    return df
def concat_raw(dpath):
    """Combine the per-visit raw csv files in `dpath` into ``raw_combined.csv``.

    Files matching ``raw_*_data.csv`` are read with their ``index`` column as the index, stacked
    row-wise in sorted filename order, and written to ``{dpath}/raw_combined.csv`` with the index
    stored back into an ``index`` column.

    Parameters
    ----------
    dpath : str or Path
        directory containing the raw csv files

    Returns
    -------
    pandas.DataFrame or None
        the combined dataframe, or None when no matching files exist (nothing is written then)
    """
    rawfiles = sorted(glob.glob(f"{dpath}/raw_*_data.csv"))
    if not rawfiles:
        print(f"No raw data files found in {dpath}")
        return None
    # one concat over all frames instead of repeated pairwise concatenation (avoids quadratic copying)
    df = pd.concat([pd.read_csv(raw, index_col="index") for raw in rawfiles], axis=0)
    if "index" not in df.columns:
        df["index"] = df.index
    df.to_csv(f"{dpath}/raw_combined.csv", index=False)
    return df
def final_cleanup(df, dpath):
    """Delete intermediate per-visit files from `dpath`.

    Three disjoint groups are considered:
    - preprocessed csv files (``*_data.csv`` excluding ``raw_*_data.csv``)
    - hdf5 files (``*_data.h5``)
    - raw csv files (``raw_*_data.csv``)

    A group is deleted only when its file count equals ``len(df)``; otherwise it is left in place
    and a message is printed.

    Parameters
    ----------
    df : sized (e.g. pandas.DataFrame)
        combined dataframe whose length is compared against each group's file count
    dpath : str or Path
        directory holding the intermediate files
    """
    print("Cleaning up...")
    rawfiles = glob.glob(f"{dpath}/raw_*_data.csv")
    raw_set = set(rawfiles)
    # "*_data.csv" also matches the raw_* files; exclude them so no file belongs to two groups
    # (otherwise the csv group's count is inflated and the raw group would remove deleted files)
    csvfiles = [f for f in glob.glob(f"{dpath}/*_data.csv") if f not in raw_set]
    h5files = glob.glob(f"{dpath}/*_data.h5")
    for grp in (csvfiles, h5files, rawfiles):
        if len(df) == len(grp):
            for f in grp:
                os.remove(f)
            print(f"Cleaned up {len(grp)} files")
        else:
            print(f"{len(df)} in DF does not match {len(grp)} in filegroup. Skipping cleanup")
class JwstCalIngest:
    """Loads raw JWST Calibration Pipeline metadata from local disk (`input_path`)
    and runs initial ML preprocessing steps necessary prior to model training. The resulting
    dataframes will be "ingested" into any pre-existing training sets located in `outpath`.
    This outpath acts as the primary database containing several "tables" (dataframes stored
    in .csv files). This class is designed to run on single or multiple files at a time
    (limit specificity using `pfx`).

    Input file naming convention: YYYY-MM-DD_%d.csv (%d = day of year) ex: 2024-02-21_052.csv
    Alternate formats currently not supported because filenames are used to store date info.

    Examples:
    - To ingest multiple files from November 2023, set `pfx="2023-11"`.
    - To ingest only one file from January 3, 2024, set `pfx="2024-01-03"`.
    - You can also pass in a wildcard: `pfx="*_3"` would search for all data collected on days 300-366 of any
      year, while `pfx="2023*_3"` would do the same but only for the year 2023.

    The contents of raw metadata files are expected to contain:
    - 1) columns consistent with Fits header keyword-values used in JWST Cal model training (see `spacekit.skopes.jwst.cal.config`)
    - 2) rows of Level 1/1b exposures (inputs/features) along with Level 3 products
    - 3) imagesize (memory footprint) for each L3 product (outputs/target)

    Parameters
    ----------
    input_path : str (path), optional
        directory path to csv files on local disk, by default None (current working directory)
    pfx : str, optional
        filename start pattern (e.g. "2023" or "*-12-"), by default ""
    outpath : str (path), optional
        directory path to save (and/or update) preprocessed files on local disk, by default None (same as `input_path`)
    save_l1 : bool, optional
        save matched level 1 input data to separate file, by default False
    **log_kws
        keyword arguments passed through to `Logger`
    """

    def __init__(self, input_path=None, pfx="", outpath=None, save_l1=False, **log_kws):
        # str() lets Path-like values through; trailing "/" is stripped because paths are built with "/"
        self.input_path = str(input_path).rstrip("/") if input_path is not None else os.getcwd()
        self.pfx = "" if pfx is None else pfx
        self.save_l1 = save_l1
        self.set_outpath(value=outpath)
        self.exp_types = ["IMAGE", "SPEC", "TAC", "FGS"]
        self.files = []
        self.idxcol = "Dataset"
        self.dag = "DagNodeName"
        self.df = None
        self.l1_dags = []
        self.l3_dags = []
        self.data = {}
        self.raw = {}
        self.product_matches = None
        self.exmatches = {}
        self.rem = {}
        # L3 products still lacking matched L1 inputs after `extrapolate`; None when there are none
        self.l3 = None
        self.param_cols = ["pid", "OBSERVTN", "FILTER", "GRATING", "PUPIL", "SUBARRAY", "FXD_SLIT", "EXP_TYPE", "BAND"]
        self.scrb = None
        self.__name__ = "JwstCalIngest"
        self.log = Logger(self.__name__, **log_kws).spacekit_logger()

    @property
    def float_cols(self):
        """list of str: columns recast to float by `recast_dtypes`."""
        return self._float_cols()

    def _float_cols(self):
        return ["CRVAL1", "CRVAL2", "RA_REF", "DEC_REF", "GS_RA", "GS_DEC", "GS_MAG", "TARG_RA", "TARG_DEC"]

    def set_outpath(self, value=None):
        """Initialize class variables relating to file paths on local disk where ingested data will be stored.
        If nothing is passed into the `value` kwarg, the default base path for outputs will be the same as inputs.

        Parameters
        ----------
        value : str or Path, optional
            custom path to a directory where output files will be saved, by default None
        """
        if value is None:
            value = self.input_path
        self.outpath = str(value).rstrip("/")
        os.makedirs(self.outpath, exist_ok=True)
        self.ingest_file = os.path.join(self.outpath, "ingest.csv")
        self.trainpath = self.outpath + "/train-{}.csv"
        self.rempath = self.outpath + "/rem-{}.csv"
        self.rawpath = self.outpath + "/raw-{}.csv"

    @xtimer
    def run_ingest(self):
        """Main calling function to run the entire ingest script.

        Exits with status 1 (after all data has been saved) when L3 products remain that could not be
        matched to any L1 inputs.
        """
        self.ingest_data()
        if len(self.files) == 0:
            return
        self.initial_scrub()
        self.load_priors()
        self.scrub_exposures()
        self.extrapolate()
        self.save_ingest_data()
        self.save_training_sets()
        if self.l3 is not None:
            self.log.error(f"Houston, we have problem: {len(self.l3)} disconnected L3 product(s) floating in space")
            sys.exit(1)

    def extrapolate(self):
        """Match scrubbed L1 exposure groups to their L3 products and finalize the per-exp_type data.

        Runs `match_product_groups` for every exp_type produced by `scrub_exposures`, then
        `drop_unmatched`, `convert_imagesize_units` and `update_repro`. L3 products that are still
        without a `pname` afterwards (excluding multi-match alternates recorded in `exmatches`) are stored
        in `self.l3`, or None if there are none.

        NOTE(review): this orchestration was reconstructed because `run_ingest` depends on it; confirm the
        step order against the upstream implementation.
        """
        for exp_type in self.exp_types:
            if exp_type in self.data:
                self.match_product_groups(exp_type)
        self.drop_unmatched()
        self.convert_imagesize_units()
        self.update_repro()
        l3_rows = self.df[self.dag].isin(self.l3_dags)
        if "pname" in self.df.columns:
            l3_rows &= self.df["pname"].isna()
        # alternates from multi-match groups are intentionally left without a pname
        alternates = {name for groups in self.exmatches.values() for names in groups.values() for name in names}
        orphans = self.df.loc[l3_rows & ~self.df["dname"].isin(alternates)]
        self.l3 = orphans if len(orphans) > 0 else None

    def ingest_data(self):
        """Loads all relevant files to be ingested into a single dataframe, adding columns for date, year and day of year (`doy`)
        based on the file names to demarcate the file from which each dataset originated. Additionally, only observations relating to
        jwst calibration levels 1 and 3 are kept, while the rest are dropped.
        """
        if len(self.files) == 0:
            self.read_files()
        for f in self.files:
            df = pd.read_csv(f, index_col=self.idxcol)
            df = self.drop_level2(df)
            # file name format: YYYY-MM-DD_DOY.csv
            filedate, day = os.path.basename(f).split("_")
            df["date"] = filedate
            df["year"] = filedate.split("-")[0]
            df["doy"] = int(day.split(".")[0])
            if self.df is None:
                self.df = df
            else:
                self.df = pd.concat([self.df, df], axis=0)
        if self.df is not None:
            self.log.info(f"{len(self.df)} datasets loaded from {len(self.files)} file(s)")

    def read_files(self):
        """Collects a list of filenames to be ingested from local disk according to the glob pattern
        combining `input_path` and `pfx` ending with `.csv`. A warning is issued if no files matching the pattern
        are found. The list of files are stored in the class attribute `files`.
        """
        pattern = f"{self.input_path}/{self.pfx}*.csv"
        files = sorted(glob.glob(pattern))
        self.files = [f for f in files if f not in self.files]
        if len(self.files) < 1:
            self.log.warning(f"No files found using pattern: {pattern}")
        else:
            self.log.debug(f"Files ready for ingest: {self.files}")

    def drop_level2(self, df):
        """Determines which `dag` column values relate to Level 1 and Level 3 according to their names,
        then drops any rows from the DataFrame that do not match these values. Note: starting on 6/13/2025,
        a change in the data collection process added a new `dag` value 'ESTIMATE_LEVEL_3_MEMORY' which
        is unrelated to the actual processing of a dataset on its designated server node and therefore rows matching
        this value are also removed.

        Parameters
        ----------
        df : pandas.DataFrame
            dataframe to search and modify

        Returns
        -------
        pandas.DataFrame
            dataframe with only L1 and L3 datasets
        """
        alldags = sorted(list(df[self.dag].value_counts().index))
        l1_dags = [d for d in alldags if "1" in d]
        l3_dags = [d for d in alldags if "3" in d and "MEMORY" not in d]
        dags_l1_l3 = l1_dags + l3_dags
        df = df.loc[df[self.dag].isin(dags_l1_l3)]
        self.l1_dags.extend([l for l in l1_dags if l not in self.l1_dags])
        self.l3_dags.extend([l for l in l3_dags if l not in self.l3_dags])
        return df

    def initial_scrub(self):
        """Initial preprocessing renames and adds several columns, sets the df index to Dataset, recasts datatypes,
        and drops the following:
        - older duplicates and exposure types known to be unrelated to Level 3 processing
        - redundant MIRI IFU products (only 1 channel per dataset is kept)
        - mosaics (estimates for L3 datasets used to create a mosaic accurately reflect compute requirements)
        """
        if self.df is None:
            return
        self.df["dname"] = self.df.index
        self.df["dname"] = self.df["dname"].apply(lambda x: self.strip_file_suffix(x))
        self.df.rename({"ImageSize": "imagesize", self.dag: "dag"}, axis=1, inplace=True)
        self.dag = "dag"
        self.df["pid"] = self.df["dname"].apply(lambda x: self.extract_pid(x))
        self.df = self.recast_dtypes(self.df)
        n0 = len(self.df)
        # keep the most recent (and, within a date, the largest) record per dataset name
        self.df = self.df.sort_values(by=["date", "imagesize"]).drop_duplicates(subset="dname", keep="last")
        self.log.info(f"Dropped {n0 - len(self.df)} duplicates.")
        nonsci = self.df.loc[~self.df["EXP_TYPE"].isin(L3_TYPES)]
        self.df.drop(nonsci.index, axis=0, inplace=True)
        self.log.info(f"Dropped {len(nonsci)} non-L3 exposure types")
        self.set_params()
        self.reduce_mirifu_channels()
        self.drop_mosaics()
        self.df["Dataset"] = self.df["dname"]
        self.df.set_index("Dataset", inplace=True)

    def reduce_mirifu_channels(self):
        """Append channel info to `params` string; drop MIRI IFU L3 products from channels 2,4 (keep only 1, 3).
        Channels 1-2 use the same input exposures, and the same goes for channels 3-4.

        NOTE: The memory footprint for each L3 product is the same regardless of channel or subchannel ('band'),
        so the inclusion of L3 products from both channels 1 and 3 is likely to be redundant for ML training purposes.
        The resulting metadata features will show some variability between ch1/2 and 3/4 L3 products because the input exposures
        are distinct. Further analysis is needed to determine if such variability simply adds noise to the training set, and a
        decision should be made at training time whether or not to include both channels. Adjustments to inference preprocessing
        may need to be made so that the model simply ignores channel/subchannel altogether and treats the entire group of inputs
        as pertaining to a single L3 product (jw_PID_OBS_TRG_miri_). The memory footprint estimates for each individual
        channel/subchannel combination can be inferred from a single inference output and applied to all relevant 'subproducts'.
        """
        for d, c in dict(zip(["MIRIFUSHORT", "MIRIFULONG"], ["-12", "-34"])).items():
            self.df.loc[self.df["DETECTOR"] == d, "params"] = self.df.loc[self.df["DETECTOR"] == d].params.values + c
        self.mm = self.df.loc[
            (self.df["EXP_TYPE"] == "MIR_MRS") & (self.df[self.dag].isin(self.l3_dags)) & (self.df["CHANNEL"].isin(["2", "4"]))
        ]
        if len(self.mm) > 0:
            drops = self.mm.index
            self.log.info(f"Ignoring MIRI IFU channels 2,4 for {len(drops) / 2} L3 products")
            self.df.drop(drops, axis=0, inplace=True)

    def load_and_recast(self, dpath, idxcol=None):
        """Loads in a dataframe from file on local disk generated by a prior ingest and recasts data types as needed
        for certain columns where that information is lost during a save.

        Parameters
        ----------
        dpath : str or Path
            path on local disk where file is stored
        idxcol : str, optional
            custom index column name, by default None (uses `idxcol` attribute)

        Returns
        -------
        pandas.DataFrame or None
            df loaded with columns recasted as necessary; None if `dpath` does not exist
        """
        if not os.path.exists(dpath):
            self.log.warning(f"File does not exist at specified path: {dpath}")
            return
        idxcol = self.idxcol if idxcol is None else idxcol
        df = pd.read_csv(dpath, index_col=idxcol)
        return self.recast_dtypes(df)

    def recast_dtypes(self, df):
        """When loading a saved dataframe, some datatypes need to be recast appropriately
        in order to be able to edit existing / insert new values.

        Parameters
        ----------
        df : pandas.DataFrame
            dataframe to be recast

        Returns
        -------
        pandas.DataFrame
            recasted dataframe
        """
        df["OBSERVTN"] = df["OBSERVTN"].apply(lambda x: self.validate_obs(x))
        df["PROGRAM"] = df["PROGRAM"].apply(lambda x: "{:0>5}".format(x))
        for col in self.float_cols:
            df[col] = df[col].apply(lambda x: self.convert_to_float(x))
        df["year"] = df["year"].astype("int64")
        df["date"] = pd.to_datetime(df["date"], yearfirst=True)
        df["TARG_RA"] = df["TARG_RA"].apply(lambda x: np.round(x, 8))
        # fine-grained matching param without affecting original values
        df["targra"] = df["TARG_RA"].apply(lambda x: np.round(x, 6))
        return df

    def load_priors(self):
        """Loads previously ingested but unmatched datasets from 'ingest.csv' file located in
        `outpath` on local disk. Checks the `params` column and extracts any that match
        the current ingest dataframe in order to attempt a new match. This is necessary for some
        datasets which take multiple days to complete processing. Extracted rows are removed from
        'ingest.csv' only after they were successfully merged into the current dataframe.
        """
        if not os.path.exists(self.ingest_file):
            self.log.debug("Prior data not found -- skipping.")
            return
        l3params = self.df.loc[self.df.dag.isin(self.l3_dags)].params.unique()
        self.log.info("Checking prior data")
        di = self.load_and_recast(self.ingest_file)
        di = di.sort_values(by="date").drop_duplicates(subset="dname", keep="last")
        ds = di.loc[di.params.isin(l3params)].copy()
        if len(ds) > 0:
            self.log.info(f"Prior data loaded successfully: {len(ds)} exposures added.")
        try:
            self.df = pd.concat([self.df, ds], axis=0)
        except Exception as e:
            self.log.error(str(e))
            # leave ingest.csv untouched so the prior rows are not lost
            return
        self.update_dags()
        self.df = self.df.sort_values(by="date").drop_duplicates(subset="dname", keep="last")
        di.drop(ds.index, axis=0, inplace=True)
        di[self.idxcol] = di.index
        di.to_csv(self.ingest_file, index=False)

    def update_dags(self):
        """Update the lists of l1 and l3 dag values once a collection of datasets from multiple files are ingested
        (including priors loaded from ingest.csv).
        """
        alldags = sorted(list(self.df[self.dag].value_counts().index))
        self.l1_dags = [d for d in alldags if "1" in d]
        self.l3_dags = [d for d in alldags if "3" in d and "MEMORY" not in d]

    def set_params(self):
        """Creates a new dataframe column containing a concatenated string of keywords that uniquely identify a group of
        related L1 inputs and their L3 output. This is used (in combination with other columns such as targ_ra/dec) to match
        L1 exposures with their L3 product. WFSC params are generated separately.
        """
        wftypes = ["PRIME_WFSC_SENSING_ONLY", "PRIME_WFSC_ROUTINE", "PRIME_WFSC_SENSING_CONTROL"]
        wfcols = ["pid", "OBSERVTN", "FILTER", "PUPIL", "DETECTOR"]
        wfsc = self.df.loc[self.df["VISITYPE"].isin(wftypes)].copy()
        self.df.drop(wfsc.index, axis=0, inplace=True)
        if len(self.df) > 0:
            params = ["-".join(str(y) for y in x if str(y) not in FALSEVALS) for x in self.df[self.param_cols].values]
            self.df["params"] = pd.Series(params, index=self.df.index)
        if len(wfsc) > 0:
            wfparams = ["-".join(str(y) for y in x if str(y) not in FALSEVALS) for x in wfsc[wfcols].values]
            wfsc["params"] = pd.Series(wfparams, index=wfsc.index)
            self.df = pd.concat([self.df, wfsc], axis=0)

    @staticmethod
    def save_kwargs(path):
        """Return `to_csv` kwargs: write with header if `path` is new, otherwise append without header."""
        if not os.path.exists(path):
            return dict(index=False)
        return dict(mode="a", index=False, header=False)

    @staticmethod
    def strip_file_suffix(x):
        """Drop the last underscore-separated token from names ending in "fits"."""
        if x.endswith("fits"):
            x = "_".join(x.split("_")[:-1])
        return x

    @staticmethod
    def extract_pid(x):
        """Return characters 2-6 of a dataset name as an int (non-str values are returned unchanged)."""
        if not isinstance(x, str):
            return x
        pid = x[2:7]
        if pid[0] == "0":
            pid = pid[1:]
        return int(pid)

    @staticmethod
    def validate_obs(x):
        """Left-pad an observation number with zeros to 3 characters."""
        return "{:0>3}".format(x)

    @staticmethod
    def convert_to_float(x):
        """Convert `x` to float, mapping the string "NONE" to NaN."""
        if x != "NONE":
            return float(x)
        else:
            return np.nan

    @staticmethod
    def mark_mosaics(x):
        """Identify mosaic L3 products based on the dataset's name format.

        Parameters
        ----------
        x : str
            Dataset name

        Returns
        -------
        bool
            True if the second "-"-separated token of the name starts with "c" (mosaic), otherwise False
        """
        tokens = x.split("-")
        if len(tokens) < 2:
            return False
        # startswith avoids an IndexError on an empty token (e.g. "a--b")
        return tokens[1].startswith("c")

    def drop_mosaics(self):
        """Separate mosaic L3 products and save to `mosaics.csv` on local disk."""
        self.df["mosaic"] = self.df["dname"].apply(lambda x: self.mark_mosaics(x))
        mosaics = self.df.loc[self.df["mosaic"]].copy()
        if len(mosaics) > 0:
            mosaics.drop("mosaic", axis=1, inplace=True)
            mpath = f"{self.outpath}/mosaics.csv"
            mosaics[self.idxcol] = mosaics.index
            mosaics.to_csv(mpath, **self.save_kwargs(mpath))
            self.log.info(f"Mosaic data saved to: {mpath}")
            self.log.info(f"Dropping {len(mosaics.index)} mosaics from ingest data")
            self.df.drop(mosaics.index, axis=0, inplace=True)
        self.df.drop("mosaic", axis=1, inplace=True)

    def scrub_exposures(self):
        """Preprocess the L1 input exposures through the JWST Scrubber. See JwstCalScrubber for details."""
        self.scrb = JwstCalScrubber(
            self.input_path, data=self.df.loc[self.df[self.dag].isin(self.l1_dags)], encoding_pairs=KEYPAIR_DATA, mode="df"
        )
        for exp_type in self.exp_types:
            inputs = self.scrb.scrub_inputs(exp_type=exp_type)
            if inputs is not None:
                inputs["dname"] = inputs.index
                self.data[exp_type] = inputs
        (self.img, self.spec, self.tac, self.fgs) = self.get_unencoded()
        self.raw = dict(zip(["IMAGE", "SPEC", "TAC", "FGS"], [self.img, self.spec, self.tac, self.fgs]))

    def get_unencoded(self):
        """Retrieve the raw (unencoded) L3 products generated by the JWST Scrubber using preprocessed L1 exposure groups.

        Returns
        -------
        iterator of pandas.DataFrame
            Four dataframes (IMAGE, SPEC, TAC, FGS order) built from the scrubber's `imgpix`, `specpix`,
            `tacpix` and `fgspix` dictionaries (one row per key).
        """
        data = [self.scrb.imgpix, self.scrb.specpix, self.scrb.tacpix, self.scrb.fgspix]
        return map(lambda x: pd.DataFrame.from_dict(x, orient="index"), data)

    def match_query(self, info, extra_param=None):
        """Queries the dataframe for L3 products matching the shared metadata attributes for a group of L1 input exposures.
        If a value is passed into the `extra_param` kwarg, the query is further restricted to include products with a value
        matching this additional parameter. If this initial query returns 0 results, a second broader query without the
        additional param is automatically run. By default, the query attempts to find L3 products within the dataframe whose
        `params` column value matches that of the L1 inputs' `params` column.

        Parameters
        ----------
        info : pandas.Series
            Metadata pertaining to the L1 input exposures associated with a single L3 product.
        extra_param : str, optional
            Column name to match against an additional parameter value within the dataframe, by default None

        Returns
        -------
        pandas.DataFrame
            L3 products matching the specified metadata (and query parameters if requested).
        """
        if extra_param:
            l3 = self.df.loc[
                (self.df["params"] == info["params"])
                & (self.df[self.dag].isin(self.l3_dags))
                & (self.df[extra_param] == info[extra_param])
            ]
            if len(l3) == 0:  # drop extra search param
                l3 = self.match_query(info)
        else:
            l3 = self.df.loc[(self.df["params"] == info["params"]) & (self.df[self.dag].isin(self.l3_dags))]
        return l3

    def match_product_groups(self, exp_type):
        """Matching L3 product with its associated L1 input exposures.
        1. If TARGNAME: match using params (PID-OBS-OPTELEM-SUBARRAY-EXP_TYPE) + TARGNAME
        2. Elif fixed target: match using params + targra (TARG_RA rounded to 6 decimals)
        3. Else: match params + gs_mag

        Parameters
        ----------
        exp_type : str
            model-based 'exp_type' grouping: IMAGE, SPEC, TAC, or FGS
        """
        self.exmatches[exp_type] = {}
        for k, v in self.scrb.expdata.get(exp_type, {}).items():
            exposures = list(v.keys())
            self.df.loc[self.df.dname.isin(exposures), "expmode"] = exp_type
            info = self.df.loc[exposures[0]]
            qp = "TARGNAME"
            if info[qp] == "NONE" or isinstance(info[qp], float):
                # TARG_RA rounded to 6 decimals for PTF
                qp = "targra" if info["VISITYPE"] == "PRIME_TARGETED_FIXED" else "GS_MAG"
            l3 = self.match_query(info, extra_param=qp)
            if len(l3) == 0:
                self.log.debug(f"No matching products identified: {k}")
                continue
            if len(l3) > 1 and qp == "TARGNAME" and info["VISITYPE"] == "PRIME_TARGETED_FIXED":
                # narrow the TARGNAME candidates by targra instead of re-querying (which would drop TARGNAME)
                narrowed = l3.loc[l3["targra"] == info["targra"]]
                if len(narrowed) > 0:
                    l3 = narrowed
            if len(l3) > 1:  # FALLBACK
                # check if miri ifu (l3 products identical for each band)
                self.log.warning(f"MULTI MATCH ELIMINATION: {k}")
                pnames = sorted(list(l3.index))
                self.exmatches[exp_type][info["params"]] = pnames
                self.df.loc[self.df.dname.isin(pnames), "expmode"] = exp_type
                l3 = l3.loc[l3["dname"] == pnames[0]]
            pname = l3.iloc[0]["dname"]
            imagesize = l3.iloc[0]["imagesize"]
            self.data[exp_type].loc[k, "pname"] = pname
            self.data[exp_type].loc[k, "imagesize"] = imagesize
            self.data[exp_type].loc[k, "date"] = l3.iloc[0]["date"]
            self.df.loc[pname, "pname"] = pname
            self.df.loc[self.df.dname.isin(exposures), "pname"] = pname
            self.df.loc[self.df.pname == pname, "expmode"] = exp_type

    def drop_unmatched(self):
        """Store any unmatched inputs into the `self.rem` attribute then remove them from the training set. Reports a log
        of the percentage of L3 products successfully matched during this ingest run (anything less than 100% indicates an error).
        """
        # `expmode` only exists once match_product_groups has tagged at least one exposure group
        has_expmode = "expmode" in self.df.columns
        for exp in list(self.data.keys()):
            extracols = [c for c in ["imagesize", "date", "pname"] if c in self.data[exp].columns]
            self.raw[exp] = pd.concat([self.raw[exp], self.data[exp][extracols]], axis=1)
            try:
                if "imagesize" in self.raw[exp].columns:
                    self.rem[exp] = self.raw[exp].loc[self.raw[exp]["imagesize"].isna()].copy()
                else:
                    self.rem[exp] = self.raw[exp].copy()
                if has_expmode:
                    n = self.df.loc[(self.df.dag.isin(self.l3_dags)) & (self.df.expmode == exp)]["expmode"].size
                else:
                    n = 0
                self.data[exp].drop(self.rem[exp].index, axis=0, inplace=True)
                self.raw[exp].drop(self.rem[exp].index, axis=0, inplace=True)
                if n > 0:
                    self.log.info(f"[{exp}] L3 matched: {len(self.data[exp])} | {np.round((len(self.data[exp]) / n) * 100)}%")
            except KeyError:
                continue

    def convert_imagesize_units(self, data=None):
        """Divide the `imagesize` (memory footprint) column by 10**6 and store the result in a new `imgsize_gb` column
        (documented as gigabytes -- assumes `imagesize` is in kilobytes; confirm). If the `data` kwarg is None, this is
        applied to each exp_type in `self.data` and copied into the raw (unencoded) versions in `self.raw`. Otherwise the
        conversion is made to the dataframe passed into the `data` kwarg, which is returned.

        Parameters
        ----------
        data : pandas.DataFrame, optional
            Apply the unit conversion to a particular dataframe instead of the default `self.data`, by default None

        Returns
        -------
        pd.DataFrame or None
            `data` with the additional `imgsize_gb` column when `data` is given, otherwise None.
        """
        if data is not None:
            data["imgsize_gb"] = data["imagesize"].apply(lambda x: x / 10**6)
            return data
        for exp in self.exp_types:
            try:
                if "imagesize" in self.data[exp].columns:
                    self.data[exp]["imgsize_gb"] = self.data[exp]["imagesize"].apply(lambda x: x / 10**6)
                    self.raw[exp]["imgsize_gb"] = self.data[exp]["imgsize_gb"]
            except KeyError:
                continue

    def update_repro(self):
        """Sometimes an L3 product is reprocessed and will not have any matching L1 inputs.
        Updates the imagesize and date attributes of the previous record (if found) with that of the new one.
        """
        if "pname" in self.df.columns:
            unmatched = self.df["pname"].isna()
        else:
            # nothing was matched during this run, so every L3 product is a repro candidate
            unmatched = pd.Series(True, index=self.df.index)
        l3 = self.df.loc[unmatched & self.df.dag.isin(self.l3_dags)]
        if len(l3) == 0:
            return
        self.log.info(f"Identified {len(l3)} potential reprocessed products eligible for update")
        dp = self.load_and_recast(f"{self.outpath}/training.csv")
        if dp is None:
            self.log.warning("Could not update repro data - file not found.")
            return
        dp = dp.sort_values(by="date").drop_duplicates(subset="pname", keep="last")
        updates = {}
        notrepro = []
        pnames = list(l3.index)
        for pname in pnames:
            try:
                expmode = dp.loc[pname]["expmode"]
                if expmode not in updates:
                    updates[expmode] = dict()
                updates[expmode][pname] = dict(
                    imagesize=l3.loc[pname].imagesize, doy=l3.loc[pname].doy, date=l3.loc[pname].date, year=l3.loc[pname].year
                )
            except KeyError:
                notrepro.append(pname)
                continue
        for exp_type, repro_data in updates.items():
            data = pd.read_csv(self.trainpath.format(exp_type.lower()), index_col=self.idxcol)
            for name, revised in repro_data.items():
                for k, v in revised.items():
                    dp.loc[name, k] = v
                data.loc[data.pname == name, "imagesize"] = revised["imagesize"]
                data.loc[data.pname == name, "date"] = revised["date"]
            data = self.convert_imagesize_units(data=data)
            data[self.idxcol] = data.index
            data.to_csv(self.trainpath.format(exp_type.lower()), index=False)
            self.log.info(f"Updated {len(repro_data)} reprocessed {exp_type} products.")
        dp[self.idxcol] = dp.index
        dp.to_csv(f"{self.outpath}/training.csv", index=False)
        l3 = l3.loc[~l3.dname.isin(notrepro)]
        if len(l3) > 0:
            self.df.drop(l3.index, axis=0, inplace=True)
            self.log.info(f"Training file updated and {len(l3)} L3 repro products removed from dataframe.")
        else:
            self.log.warning("0 repro candidates matched.")

    def save_training_sets(self):
        """Adds preprocessed ML training data for each model type to its respective file on local disk: `train-{exp_type}.csv`.
        The raw (unencoded) versions are also saved to local disk as `raw-{exp_type}.csv`.
        Any remaining L1 inputs that did not have a matching L3 product are saved to `rem-{exp-type}.csv` primarily for debugging purposes.
        """
        for exp in self.exp_types:
            if exp in self.data.keys() and len(self.data[exp]) > 0:
                fpath = self.trainpath.format(exp.lower())
                self.data[exp][self.idxcol] = self.data[exp]["pname"]
                self.data[exp].to_csv(fpath, **self.save_kwargs(fpath))
                self.log.info(f"{exp} training data saved to: {fpath}")
                wpath = self.rawpath.format(exp.lower())
                self.raw[exp][self.idxcol] = self.raw[exp]["pname"]
                self.raw[exp].to_csv(wpath, **self.save_kwargs(wpath))
            if exp in self.rem.keys() and len(self.rem[exp]) > 0:
                rpath = self.rempath.format(exp.lower())
                self.rem[exp][self.idxcol] = self.rem[exp].index
                # overwritten on every run (debug output only)
                self.rem[exp].to_csv(rpath, index=False)
                self.log.info(f"Remaining {exp} data saved to: {rpath}")

    def save_ingest_data(self):
        """Adds unmatched rows into 'ingest.csv', matched L3 products to 'training.csv'.
        If `save_l1` attribute is True, matched L1 input exposures are saved to a separate file 'level1.csv'.
        """
        self.df[self.idxcol] = self.df.index
        if "pname" not in self.df.columns:
            di = self.df.loc[self.df.dag.isin(self.l1_dags)]
        else:
            di = self.df.loc[self.df.pname.isna()].copy()
            di.drop(["pname"], axis=1, inplace=True)
        di.to_csv(self.ingest_file, **self.save_kwargs(self.ingest_file))
        self.log.info(f"Remaining Ingest data saved to: {self.ingest_file}")
        dp = self.df.drop(di.index, axis=0)
        if len(dp) > 0:
            if self.save_l1 is True:
                l1 = dp.loc[dp.dag.isin(self.l1_dags)]
                l1_path = f"{self.outpath}/level1.csv"
                l1.to_csv(l1_path, **self.save_kwargs(l1_path))
                self.log.info(f"{len(l1)} L1 products added to: {l1_path}")
            dp = dp.loc[dp.dag.isin(self.l3_dags)]
            ppath = f"{self.outpath}/training.csv"
            dp.to_csv(ppath, **self.save_kwargs(ppath))
            self.log.info(f"{len(dp)} L3 products added to: {ppath}")
def hst_svm_ingest(**kwargs):
    """Main calling function for running HST SVM Alignment Data Ingest.

    Keyword Arguments
    -----------------
    visit_path : str, optional
        preprocess only this visit directory; when given, batch mode is skipped
    batch_name : str, optional
        base name for batch outputs; None (the CLI default) falls back to `prep_svm_batch`'s default
    drz_ver : str, optional
        version string passed through to `prep_svm_batch`
    **kwargs
        remaining keywords (e.g. `input_path`, `outpath`) are passed to `SvmAlignmentIngest`
    """
    visit_path = kwargs.pop("visit_path", None)
    batch_name = kwargs.pop("batch_name", None)
    drz_ver = kwargs.pop("drz_ver", None)
    svi = SvmAlignmentIngest(**kwargs)
    if visit_path is not None:
        svi.prep_single_visit(visit_path)
        return
    batch_kws = {"drz_ver": drz_ver}
    # forwarding batch_name=None would override the method's default name
    if batch_name is not None:
        batch_kws["batch_name"] = batch_name
    svi.prep_svm_batch(**batch_kws)
def jwst_cal_ingest(**kwargs):
    """Main calling function for running JWST Calibration Data Ingest.

    Every keyword argument is forwarded unchanged to `JwstCalIngest`, whose `run_ingest`
    method is then executed.
    """
    ingestor = JwstCalIngest(**kwargs)
    ingestor.run_ingest()
if __name__ == "__main__":
parser = ArgumentParser(prog="spacekit.preprocessor.ingest")
subparsers = parser.add_subparsers(title="skope", help="application skope")
parser_jcal = subparsers.add_parser(
"jcal",
add_help=False,
parents=[parser],
help="jwst calibration training data",
usage="spacekit.preprocessor.ingest skope [options]",
)
parser_hsvm = subparsers.add_parser(
"hsvm",
add_help=False,
parents=[parser],
help="hst svm alignment training data",
usage="spacekit.preprocessor.ingest skope [options]",
)
for subparser in [parser_jcal, parser_hsvm]:
subparser.add_argument("-i", "--input_path", default=os.getcwd(), type=str, help="data filepath to be ingested")
subparser.add_argument("-o", "--outpath", type=str, default=None, help="path to save ingested data on local disk")
parser_jcal.add_argument("-p", "--pfx", type=str, default=None, help="file name prefix to limit search on local disk")
parser_jcal.add_argument("-s", "--save_l1", type=bool, default=False, help="save matched level 1 input data to separate file")
parser_jcal.set_defaults(func=jwst_cal_ingest)
parser_hsvm.add_argument("-b", "--batch_name", type=str, default=None)
parser_hsvm.add_argument("-d", "--drz_ver", type=str, default="")
parser_hsvm.add_argument("-z", "--visit_path", type=str, default=None)
parser_hsvm.set_defaults(func=hst_svm_ingest)
kwargs = {**vars(parser.parse_args())}
func = kwargs.pop("func")
func(**kwargs)