from spacekit.preprocessor.encode import encode_target_data
from spacekit.preprocessor.transform import arrays_to_tensors, y_tensors, PowerX
from sklearn.model_selection import train_test_split
from spacekit.logger.log import Logger
[docs]
class Prep:
"""Base class for preprocessing data sets prior to training a machine learning model. This class can be used directly or subclassed for additional custom preprocessing. Existing subclasses for HST and JWST skopes are also available.
Parameters
----------
data : pandas.DataFrame
training dataset to be preprocessed
y_target : str, optional
target column name (dependent variable), by default "imgsize_gb"
X_cols : list, optional
feature column names (independent variables), by default []
tensors : bool, optional
convert model inputs into tensors, by default True
normalize : bool, optional
apply normalization, by default True
random : int, optional
random seed for train-test splits, by default None
tsize : float, optional
test size ratio, by default 0.2
encode_targets : bool, optional
encode target values (categorical classifiers), by default False
norm_params : dict, optional
normalization parameters (see apply_normalization for acceptable key-val pairs), by default None
"""
def __init__(
self,
data,
y_target,
X_cols=[],
tensors=True,
normalize=True,
random=None,
tsize=0.2,
encode_targets=True,
norm_params=None,
):
self.data = data
self.y_target = y_target
self.X_cols = self.check_input_cols(X_cols)
self.tensors = tensors
self.normalize = normalize
self.norm_params = norm_params
self.random = random
self.tsize = tsize
self.encode_targets = encode_targets
self.X = self.data[self.X_cols]
self.train_idx = None
self.test_idx = None
self.Tx = None
self.X_train = None
self.X_test = None
self.y_train = None
self.y_test = None
def check_input_cols(self, X_cols):
if len(X_cols) == 0:
X_cols = list(self.data.columns)
X_cols.remove(self.y_target)
return X_cols
def stratify_split(self, y_target=None, stratify=True):
if y_target is None:
y_target = self.y_target
y = self.data[y_target]
strat = y if stratify is True else None
train, test = train_test_split(self.X, test_size=self.tsize, stratify=strat, random_state=self.random)
self.train_idx, self.test_idx = train.index, test.index
self.data["split"] = "train"
self.data.loc[self.test_idx, "split"] = "test"
def get_X_y(self, group, y_target):
if group == "train":
X_train = self.data.loc[self.train_idx, self.X_cols]
y_train = self.data.loc[self.train_idx, y_target]
return X_train, y_train
elif group == "test":
X_test = self.data.loc[self.test_idx, self.X_cols]
y_test = self.data.loc[self.test_idx, y_target]
return X_test, y_test
else:
raise ValueError("group must be train or test.")
def get_X_train_test(self):
X_train = self.data.loc[self.train_idx, self.X_cols]
X_test = self.data.loc[self.test_idx, self.X_cols]
return X_train, X_test
def get_y_train_test(self, y_target):
y_train = self.data.loc[self.train_idx, y_target]
y_test = self.data.loc[self.test_idx, y_target]
return y_train, y_test
def get_test_index(self, target_col):
return self.data.loc[self.test_idx, target_col]
def set_normalization_params(self):
if self.norm_params is None:
self.norm_params = dict(T=PowerX, cols=[], ncols=[], rename=None, join=1, save_tx=True)
def _prep_data(self, y_target, stratify=True):
"""main calling function"""
if self.train_idx is None:
self.stratify_split(y_target=y_target, stratify=stratify)
self.X_train, self.y_train = self.get_X_y("train", y_target)
self.X_test, self.y_test = self.get_X_y("test", y_target)
# y_train encode, reshape
if self.encode_targets is True:
self.y_train, self.y_test = self.encode_y(self.y_train, self.y_test)
if self.normalize:
self.set_normalization_params()
self.apply_normalization(**self.norm_params)
if self.tensors is True:
train_test_data = [self.X_train, self.y_train, self.X_test, self.y_test]
self.X_train, self.y_train, self.X_test, self.y_test = arrays_to_tensors(*train_test_data)
def encode_y(self, y_train, y_test):
return encode_target_data(y_train, y_test)
def apply_normalization(
self,
T=PowerX,
cols=[],
ncols=[],
rename=None,
join=1,
save_tx=True,
save_as="tx_data.json",
):
if len(cols) == 0:
cols = self.X_cols
if len(ncols) == 0:
ncols = [i for i, c in enumerate(self.X_cols) if c in cols]
self.Tx = T(
self.X,
cols,
ncols=ncols,
save_tx=save_tx,
rename=rename,
join_data=join,
save_as=save_as,
)
self.X_train = T(
self.X_train,
cols,
ncols=ncols,
tx_data=self.Tx.tx_data,
rename=rename,
join_data=join,
).Xt
if self.X_test is not None:
self.X_test = T(
self.X_test,
cols,
ncols=ncols,
tx_data=self.Tx.tx_data,
rename=rename,
join_data=join,
).Xt
[docs]
class HstCalPrep(Prep):
def __init__(
self,
data,
y_target,
X_cols=[],
norm_cols=["n_files", "total_mb"],
rename_cols=["x_files", "x_size"],
tensors=True,
normalize=True,
random=None,
tsize=0.2,
encode_targets=True,
):
self.set_X_cols(X_cols)
super().__init__(
data,
y_target,
X_cols=self.X_cols,
tensors=tensors,
normalize=normalize,
random=random,
tsize=tsize,
encode_targets=encode_targets,
)
self.norm_cols = norm_cols
self.rename_cols = rename_cols
self.mem_bin = data["mem_bin"]
self.memory = data["memory"]
self.wallclock = data["wallclock"]
self.y_bin_train = None
self.y_bin_test = None
self.y_mem_train = None
self.y_mem_test = None
self.y_wall_train = None
self.y_wall_test = None
def set_X_cols(self, X_cols):
if len(X_cols) == 0:
self.X_cols = [
"n_files",
"total_mb",
"drizcorr",
"pctecorr",
"crsplit",
"subarray",
"detector",
"dtype",
"instr",
]
else:
self.X_cols = X_cols
def prep_data(self):
super().stratify_split(y_target="mem_bin", stratify=True)
self.X_train, self.X_test = super().get_X_train_test()
super().apply_normalization(T=PowerX, cols=self.norm_cols, rename=self.rename_cols, join=2)
self.prep_mem_bin()
self.prep_mem_reg()
self.prep_wall_reg()
[docs]
def prep_mem_bin(self):
"""main calling function"""
y_train, y_test = super().get_y_train_test("mem_bin")
y_train, y_test = self.encode_y(y_train, y_test)
self.y_bin_train, self.y_bin_test = y_tensors(y_train, y_test, reshape=True)
def prep_mem_reg(self):
y_train, y_test = super().get_y_train_test("memory")
self.y_mem_train, self.y_mem_test = y_tensors(y_train.values, y_test.values, reshape=True)
def prep_wall_reg(self):
y_train, y_test = super().get_y_train_test("wallclock")
self.y_wall_train, self.y_wall_test = y_tensors(y_train.values, y_test.values, reshape=True)
[docs]
class JwstCalPrep(Prep):
"""Class for preprocessing JWST calibration pipeline metadata prior to training neural networks for estimating memory footprint.
Parameters
----------
data : pandas.DataFrame
training dataset to be preprocessed
y_target : str, optional
target column name (dependent variable), by default "imgsize_gb"
X_cols : list, optional
feature column names (independent variables), by default []
norm_cols : list, optional
columns on which to apply normalization, by default []
exp_mode : str, optional
model training set (image, spec, tac, fgs), by default "image"
tensors : bool, optional
convert model inputs into tensors, by default True
normalize : bool, optional
apply normalization, by default True
random : int, optional
random seed for train-test splits, by default None
tsize : float, optional
test size ratio, by default 0.2
encode_targets : bool, optional
encode target values (categorical classifiers), by default False
"""
def __init__(
self,
data,
y_target="imgsize_gb",
X_cols=[],
norm_cols=[],
exp_mode="image",
tensors=True,
normalize=True,
random=None,
tsize=0.2,
encode_targets=False,
**log_kws,
):
self.exp_mode = exp_mode
self.set_X_cols(X_cols)
self.set_norm_cols(norm_cols=norm_cols)
self.__name__ = "JwstCalPrep"
self.log = Logger(self.__name__, **log_kws).spacekit_logger()
super().__init__(
data,
y_target,
X_cols=self.X_cols,
tensors=tensors,
normalize=normalize,
random=random,
tsize=tsize,
encode_targets=encode_targets,
)
self.target_data = data[self.y_target]
self.y_reg_train = None
self.y_reg_test = None
self.y_bin_train = None
self.y_bin_test = None
def set_X_cols(self, X_cols):
if len(X_cols) == 0:
self.X_cols = dict(
image=[
"instr",
"detector",
"visitype",
"filter",
"pupil",
"channel",
"subarray",
"bkgdtarg",
"nexposur",
"numdthpt",
"offset",
"max_offset",
"mean_offset",
"sigma_offset",
"err_offset",
"sigma1_mean",
"frac",
"targ_frac",
],
spec=[
"instr",
"detector",
"visitype",
"filter",
"pupil",
"grating",
"subarray",
"band",
"nexposur",
"numdthpt",
"targ_max_offset",
"offset",
"max_offset",
"mean_offset",
"sigma_offset",
"err_offset",
"sigma1_mean",
"frac",
],
fgs=[
"instr",
"detector",
"visitype",
"subarray",
"nexposur",
"numdthpt",
"crowdfld",
"gs_mag",
],
tac=[
"instr",
"detector",
"visitype",
"exp_type",
"tsovisit",
"filter",
"grating",
"subarray",
"nexposur",
"numdthpt",
"targ_max_offset",
"offset",
"max_offset",
"mean_offset",
"sigma_offset",
"err_offset",
"sigma1_mean",
"frac",
],
)[self.exp_mode]
else:
self.X_cols = X_cols
def set_norm_cols(self, norm_cols=[]):
if len(norm_cols) == 0:
norm_cols = dict(
image=[
"offset",
"max_offset",
"mean_offset",
"sigma_offset",
"err_offset",
"sigma1_mean",
],
spec=[
"targ_max_offset",
"offset",
"max_offset",
"mean_offset",
"sigma_offset",
"err_offset",
"sigma1_mean",
],
)[self.exp_mode]
self.norm_cols = [c for c in norm_cols if c in self.X_cols]
@property
def memory_classes(self):
return {0: [0, 12], 1: [12, 225], 2: [225, 950], 3: [950, 2000]}
[docs]
def classify_targets(self):
"""Creates temporary target class 'mem_bin' based on max RAM levels specified by
`memory_classes` property.
"""
y = self.y_target
for c, rng in self.memory_classes.items():
self.data.loc[(self.data[y] >= rng[0]) & (self.data[y] < rng[1]), "mem_bin"] = c
[docs]
def prep_data(self, existing_splits=False, stratify=False):
"""Splits data into training (X_train) and test (X_test) sets and applies a PowerTransform
normalization to each.
Parameters
----------
existing_splits : bool, optional
Split the data using values in 'split' column, by default False
stratify : bool, optional
Stratify splits according to target class distribution (mem_bin), by default False
"""
if existing_splits is True:
if "split" not in self.data.columns:
self.log.warning("'split' not found in data columns")
return
self.test_idx = self.data.loc[self.data.split == "test"].index
self.train_idx = self.data.loc[self.data.split == "train"].index
else:
y_target = self.y_target
if stratify is True:
y_target = "mem_bin"
self.classify_targets()
super().stratify_split(y_target=y_target, stratify=stratify)
self.X_train, self.X_test = super().get_X_train_test()
fname = f"tx_data-{self.exp_mode}.json"
super().apply_normalization(T=PowerX, cols=self.norm_cols, rename=None, join=1, save_as=fname)
self.X_train = self.X_train[self.X_cols]
self.X_test = self.X_test[self.X_cols]
[docs]
def prep_targets(self):
"""main calling function"""
y_train, y_test = super().get_y_train_test(self.y_target)
self.y_reg_train, self.y_reg_test = y_tensors(y_train.values, y_test.values, reshape=True)
# TODO
[docs]
class SvmPrep(Prep):
def __init__(
self,
data,
y_target="label",
X_cols=[],
tensors=True,
normalize=False,
random=None,
tsize=0.2,
encode_targets=False,
norm_params=None,
):
self.set_X_cols(X_cols)
super().__init(
data,
y_target,
X_cols=self.X_cols,
tensors=tensors,
normalize=normalize,
random=random,
tsize=tsize,
encode_targets=encode_targets,
norm_params=norm_params,
)
self.norm_cols = ["", ""]
self.label = data["label"]
self.y_train_labels = None
self.y_test_labels = None
def set_X_cols(self, X_cols):
if len(X_cols) == 0:
self.X_cols = [
"numexp",
"rms_ra",
"rms_dec",
"nmatches",
"point",
"segment",
"gaia",
"det",
"wcs",
"cat",
]
else:
self.X_cols = X_cols
def prep_data(self):
super()._prep_data(self.y_target, stratify=True)