From b85ee9d64a536937912544c7bbd5b98b635b7e8d Mon Sep 17 00:00:00 2001 From: Christian C Date: Mon, 11 Nov 2024 12:29:32 -0800 Subject: Initial commit --- code/sunlab/sunflow/models/__init__.py | 7 + .../sunflow/models/adversarial_autoencoder.py | 344 +++++++++++++++++++++ code/sunlab/sunflow/models/autoencoder.py | 85 +++++ code/sunlab/sunflow/models/decoder.py | 127 ++++++++ code/sunlab/sunflow/models/discriminator.py | 132 ++++++++ code/sunlab/sunflow/models/encoder.py | 140 +++++++++ .../sunlab/sunflow/models/encoder_discriminator.py | 96 ++++++ code/sunlab/sunflow/models/utilities.py | 93 ++++++ 8 files changed, 1024 insertions(+) create mode 100644 code/sunlab/sunflow/models/__init__.py create mode 100644 code/sunlab/sunflow/models/adversarial_autoencoder.py create mode 100644 code/sunlab/sunflow/models/autoencoder.py create mode 100644 code/sunlab/sunflow/models/decoder.py create mode 100644 code/sunlab/sunflow/models/discriminator.py create mode 100644 code/sunlab/sunflow/models/encoder.py create mode 100644 code/sunlab/sunflow/models/encoder_discriminator.py create mode 100644 code/sunlab/sunflow/models/utilities.py (limited to 'code/sunlab/sunflow/models') diff --git a/code/sunlab/sunflow/models/__init__.py b/code/sunlab/sunflow/models/__init__.py new file mode 100644 index 0000000..f111c0a --- /dev/null +++ b/code/sunlab/sunflow/models/__init__.py @@ -0,0 +1,7 @@ +from .autoencoder import Autoencoder +from .adversarial_autoencoder import AdversarialAutoencoder +from sunlab.common.data.dataset import Dataset +from sunlab.common.distribution.adversarial_distribution import AdversarialDistribution +from sunlab.common.scaler.adversarial_scaler import AdversarialScaler +from .utilities import create_aae, create_aae_and_dataset +from .utilities import load_aae, load_aae_and_dataset diff --git a/code/sunlab/sunflow/models/adversarial_autoencoder.py b/code/sunlab/sunflow/models/adversarial_autoencoder.py new file mode 100644 index 0000000..4cbb2f8 --- /dev/null +++ b/code/sunlab/sunflow/models/adversarial_autoencoder.py @@ -0,0 +1,344 @@ +from sunlab.common.data.dataset import Dataset +from sunlab.common.scaler.adversarial_scaler import AdversarialScaler +from sunlab.common.distribution.adversarial_distribution import AdversarialDistribution +from .encoder import Encoder +from .decoder import Decoder +from .discriminator import Discriminator +from .encoder_discriminator import EncoderDiscriminator +from .autoencoder import Autoencoder +from tensorflow.keras import optimizers, metrics, losses +import tensorflow as tf +from numpy import ones, zeros, float32, NaN + + +class AdversarialAutoencoder: + """# Adversarial Autoencoder + - distribution: The distribution used by the adversary to learn on""" + + def __init__( + self, + model_base_directory, + distribution: AdversarialDistribution or None = None, + scaler: AdversarialScaler or None = None, + ): + """# Adversarial Autoencoder Model Initialization + + - model_base_directory: The base folder directory where the model will + be saved/ loaded + - distribution: The distribution the adversary will use + - scaler: The scaling function the model will assume on the data""" + self.model_base_directory = model_base_directory + if distribution is not None: + self.distribution = distribution + else: + self.distribution = None + if scaler is not None: + self.scaler = scaler(self.model_base_directory) + else: + self.scaler = None + + def init( + self, + data=None, + data_size=13, + autoencoder_layer_size=16, + adversary_layer_size=8, + latent_size=2, + autoencoder_depth=2, + dropout=0.0, + use_leaky_relu=False, + **kwargs, + ): + """# Initialize AAE model parameters + - data_size: int + - autoencoder_layer_size: int + - adversary_layer_size: int + - latent_size: int + - autoencoder_depth: int + - dropout: float + - use_leaky_relu: boolean""" + self.data_size = data_size + self.autoencoder_layer_size = autoencoder_layer_size + self.adversary_layer_size = adversary_layer_size + self.latent_size = latent_size + self.autoencoder_depth = autoencoder_depth + self.dropout = dropout + self.use_leaky_relu = use_leaky_relu + self.save_parameters() + self.encoder = Encoder(self.model_base_directory).init() + self.decoder = Decoder(self.model_base_directory).init() + self.autoencoder = Autoencoder(self.model_base_directory).init( + self.encoder, self.decoder + ) + self.discriminator = Discriminator(self.model_base_directory).init() + self.encoder_discriminator = EncoderDiscriminator( + self.model_base_directory + ).init(self.encoder, self.discriminator) + if self.distribution is not None: + self.distribution = self.distribution(self.latent_size) + if (data is not None) and (self.scaler is not None): + self.scaler = self.scaler.init(data) + self.init_optimizers_and_metrics(**kwargs) + return self + + def init_optimizers_and_metrics( + self, + optimizer=optimizers.Adam, + ae_metric=metrics.MeanAbsoluteError, + adv_metric=metrics.BinaryCrossentropy, + ae_lr=7e-4, + adv_lr=3e-4, + loss_fn=losses.BinaryCrossentropy, + **kwargs, + ): + """# Set the optimizer, loss function, and metrics""" + self.ae_optimizer = optimizer(learning_rate=ae_lr) + self.adv_optimizer = optimizer(learning_rate=adv_lr) + self.gan_optimizer = optimizer(learning_rate=adv_lr) + self.train_ae_metric = ae_metric() + self.val_ae_metric = ae_metric() + self.train_adv_metric = adv_metric() + self.val_adv_metric = adv_metric() + self.train_gan_metric = adv_metric() + self.val_gan_metric = adv_metric() + self.loss_fn = loss_fn() + + def load(self): + """# Load the models from their respective files""" + self.load_parameters() + self.encoder = Encoder(self.model_base_directory).load() + self.decoder = Decoder(self.model_base_directory).load() + self.autoencoder = Autoencoder(self.model_base_directory).load() + self.discriminator = Discriminator(self.model_base_directory).load() + self.encoder_discriminator = EncoderDiscriminator( + self.model_base_directory + ).load() + if self.scaler is not None: + self.scaler = self.scaler.load() + return self + + def save(self, overwrite=False): + """# Save each model in the AAE""" + self.encoder.save(overwrite=overwrite) + self.decoder.save(overwrite=overwrite) + self.autoencoder.save(overwrite=overwrite) + self.discriminator.save(overwrite=overwrite) + self.encoder_discriminator.save(overwrite=overwrite) + if self.scaler is not None: + self.scaler.save() + + def save_parameters(self): + """# Save the AAE parameters in a file""" + from pickle import dump + from os import makedirs + + makedirs(self.model_base_directory + "/portable/", exist_ok=True) + parameters = { + "data_size": self.data_size, + "autoencoder_layer_size": self.autoencoder_layer_size, + "adversary_layer_size": self.adversary_layer_size, + "latent_size": self.latent_size, + "autoencoder_depth": self.autoencoder_depth, + "dropout": self.dropout, + "use_leaky_relu": self.use_leaky_relu, + } + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "wb" + ) as phandle: + dump(parameters, phandle) + + def load_parameters(self): + """# Load the AAE parameters from a file""" + from pickle import load + + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "rb" + ) as phandle: + parameters = load(phandle) + self.data_size = parameters["data_size"] + self.autoencoder_layer_size = parameters["autoencoder_layer_size"] + self.adversary_layer_size = parameters["adversary_layer_size"] + self.latent_size = parameters["latent_size"] + self.autoencoder_depth = parameters["autoencoder_depth"] + self.dropout = parameters["dropout"] + self.use_leaky_relu = parameters["use_leaky_relu"] + return parameters + + def summary(self): + """# Summarize each model in the AAE""" + self.encoder.summary() + self.decoder.summary() + self.autoencoder.summary() + self.discriminator.summary() + self.encoder_discriminator.summary() + + @tf.function + def train_step(self, x, y): + """# Training Step + + 1. Train the Autoencoder + 2. (If distribution is given) Train the discriminator + 3. (If the distribution is given) Train the encoder_discriminator""" + # Autoencoder Training + with tf.GradientTape() as tape: + decoded_vector = self.autoencoder(x, training=True) + ae_loss_value = self.loss_fn(y, decoded_vector) + grads = tape.gradient(ae_loss_value, self.autoencoder.model.trainable_weights) + self.ae_optimizer.apply_gradients( + zip(grads, self.autoencoder.model.trainable_weights) + ) + self.train_ae_metric.update_state(y, decoded_vector) + if self.distribution is not None: + # Adversary Trainig + with tf.GradientTape() as tape: + latent_vector = self.encoder(x) + fakepred = self.distribution(x.shape[0]) + discbatch_x = tf.concat([latent_vector, fakepred], axis=0) + discbatch_y = tf.concat([zeros(x.shape[0]), ones(x.shape[0])], axis=0) + adversary_vector = self.discriminator(discbatch_x, training=True) + adv_loss_value = self.loss_fn(discbatch_y, adversary_vector) + grads = tape.gradient( + adv_loss_value, self.discriminator.model.trainable_weights + ) + self.adv_optimizer.apply_gradients( + zip(grads, self.discriminator.model.trainable_weights) + ) + self.train_adv_metric.update_state(discbatch_y, adversary_vector) + # Gan Training + with tf.GradientTape() as tape: + gan_vector = self.encoder_discriminator(x, training=True) + adv_vector = tf.convert_to_tensor(ones((x.shape[0], 1), dtype=float32)) + gan_loss_value = self.loss_fn(gan_vector, adv_vector) + grads = tape.gradient(gan_loss_value, self.encoder.model.trainable_weights) + self.gan_optimizer.apply_gradients( + zip(grads, self.encoder.model.trainable_weights) + ) + self.train_gan_metric.update_state(adv_vector, gan_vector) + return (ae_loss_value, adv_loss_value, gan_loss_value) + return (ae_loss_value, None, None) + + @tf.function + def test_step(self, x, y): + """# Test Step - On validation data + + 1. Evaluate the Autoencoder + 2. (If distribution is given) Evaluate the discriminator + 3. (If the distribution is given) Evaluate the encoder_discriminator""" + val_decoded_vector = self.autoencoder(x, training=False) + self.val_ae_metric.update_state(y, val_decoded_vector) + + if self.distribution is not None: + latent_vector = self.encoder(x) + fakepred = self.distribution(x.shape[0]) + discbatch_x = tf.concat([latent_vector, fakepred], axis=0) + discbatch_y = tf.concat([zeros(x.shape[0]), ones(x.shape[0])], axis=0) + adversary_vector = self.discriminator(discbatch_x, training=False) + self.val_adv_metric.update_state(discbatch_y, adversary_vector) + + gan_vector = self.encoder_discriminator(x, training=False) + self.val_gan_metric.update_state(ones(x.shape[0]), gan_vector) + + # Garbage Collect at the end of each epoch + def on_epoch_end(self, _epoch, logs=None): + """# Cleanup environment to prevent memory leaks each epoch""" + import gc + from tensorflow.keras import backend as k + + gc.collect() + k.clear_session() + + def train( + self, + dataset: Dataset, + epoch_count: int = 1, + output=False, + output_freq=1, + fmt="%i[%.3f]: %.2e %.2e %.2e %.2e %.2e %.2e", + ): + """# Train the model on a dataset + + - dataset: ataset = Dataset to train the model on, which as the + training and validation iterators set up + - epoch_count: int = The number of epochs to train + - output: boolean = Whether or not to output training information + - output_freq: int = The number of epochs between each output""" + from time import time + from numpy import array as narray + + def fmtter(x): + return x if x is not None else -1 + + epoch_data = [] + dataset.reset_iterators() + + self.test_step(dataset.dataset, dataset.dataset) + val_ae = self.val_ae_metric.result() + val_adv = self.val_adv_metric.result() + val_gan = self.val_gan_metric.result() + self.val_ae_metric.reset_states() + self.val_adv_metric.reset_states() + self.val_gan_metric.reset_states() + print( + fmt + % ( + 0, + NaN, + val_ae, + fmtter(val_adv), + fmtter(val_gan), + NaN, + NaN, + NaN, + ) + ) + for epoch in range(epoch_count): + start_time = time() + + for step, (x_batch_train, y_batch_train) in enumerate(dataset.training): + ae_lv, adv_lv, gan_lv = self.train_step(x_batch_train, x_batch_train) + + train_ae = self.train_ae_metric.result() + train_adv = self.train_adv_metric.result() + train_gan = self.train_gan_metric.result() + self.train_ae_metric.reset_states() + self.train_adv_metric.reset_states() + self.train_gan_metric.reset_states() + + for step, (x_batch_val, y_batch_val) in enumerate(dataset.validation): + self.test_step(x_batch_val, x_batch_val) + + val_ae = self.val_ae_metric.result() + val_adv = self.val_adv_metric.result() + val_gan = self.val_gan_metric.result() + self.val_ae_metric.reset_states() + self.val_adv_metric.reset_states() + self.val_gan_metric.reset_states() + + epoch_data.append( + ( + epoch, + train_ae, + val_ae, + fmtter(train_adv), + fmtter(val_adv), + fmtter(train_gan), + fmtter(val_gan), + ) + ) + if output and (epoch + 1) % output_freq == 0: + print( + fmt + % ( + epoch + 1, + time() - start_time, + train_ae, + fmtter(train_adv), + fmtter(train_gan), + val_ae, + fmtter(val_adv), + fmtter(val_gan), + ) + ) + self.on_epoch_end(epoch) + dataset.reset_iterators() + return narray(epoch_data) diff --git a/code/sunlab/sunflow/models/autoencoder.py b/code/sunlab/sunflow/models/autoencoder.py new file mode 100644 index 0000000..473d00d --- /dev/null +++ b/code/sunlab/sunflow/models/autoencoder.py @@ -0,0 +1,85 @@ +class Autoencoder: + """# Autoencoder Model + + Constructs an encoder-decoder model""" + + def __init__(self, model_base_directory): + """# Autoencoder Model Initialization + + - model_base_directory: The base folder directory where the model will + be saved/ loaded""" + self.model_base_directory = model_base_directory + + def init(self, encoder, decoder): + """# Initialize an Autoencoder + + - encoder: The encoder to use + - decoder: The decoder to use""" + from tensorflow import keras + + self.load_parameters() + self.model = keras.models.Sequential() + self.model.add(encoder.model) + self.model.add(decoder.model) + self.model._name = "Autoencoder" + return self + + def load(self): + """# Load an existing Autoencoder""" + from os import listdir + + if "autoencoder.keras" not in listdir(f"{self.model_base_directory}/portable/"): + return None + import tensorflow as tf + + self.model = tf.keras.models.load_model( + f"{self.model_base_directory}/portable/autoencoder.keras", compile=False + ) + self.model._name = "Autoencoder" + return self + + def save(self, overwrite=False): + """# Save the current Autoencoder + + - Overwrite: overwrite any existing autoencoder that has been saved""" + from os import listdir + + if overwrite: + self.model.save(f"{self.model_base_directory}/portable/autoencoder.keras") + return True + if "autoencoder.keras" in listdir(f"{self.model_base_directory}/portable/"): + return False + self.model.save(f"{self.model_base_directory}/portable/autoencoder.keras") + return True + + def load_parameters(self): + """# Load Autoencoder Model Parameters from File + The file needs to have the following parameters defined: + - data_size: int + - autoencoder_layer_size: int + - latent_size: int + - autoencoder_depth: int + - dropout: float (set to 0. if you don't want a dropout layer) + - use_leaky_relu: boolean""" + from pickle import load + + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "rb" + ) as phandle: + parameters = load(phandle) + self.data_size = parameters["data_size"] + self.layer_size = parameters["autoencoder_layer_size"] + self.latent_size = parameters["latent_size"] + self.depth = parameters["autoencoder_depth"] + self.dropout = parameters["dropout"] + self.use_leaky_relu = parameters["use_leaky_relu"] + + def summary(self): + """# Returns the summary of the Autoencoder model""" + return self.model.summary() + + def __call__(self, *args, **kwargs): + """# Callable + + When calling the autoencoder class, return the model's output""" + return self.model(*args, **kwargs) diff --git a/code/sunlab/sunflow/models/decoder.py b/code/sunlab/sunflow/models/decoder.py new file mode 100644 index 0000000..40ea190 --- /dev/null +++ b/code/sunlab/sunflow/models/decoder.py @@ -0,0 +1,127 @@ +class Decoder: + """# Decoder Model + + Constructs a decoder model with a certain depth of intermediate layers of + fixed size""" + + def __init__(self, model_base_directory): + """# Decoder Model Initialization + + - model_base_directory: The base folder directory where the model will + be saved/ loaded""" + self.model_base_directory = model_base_directory + + def init(self): + """# Initialize a new Decoder + + Expects a model parameters file to already exist in the initialization + base directory when initializing the model""" + from tensorflow import keras + from tensorflow.keras import layers + + self.load_parameters() + assert self.depth >= 0, "Depth must be non-negative" + self.model = keras.models.Sequential() + if self.depth == 0: + self.model.add( + layers.Dense( + self.data_size, + input_shape=(self.latent_size,), + activation=None, + name="decoder_latent_vector", + ) + ) + else: + self.model.add( + layers.Dense( + self.layer_size, + input_shape=(self.latent_size,), + activation=None, + name="decoder_dense_1", + ) + ) + if self.use_leaky_relu: + self.model.add(layers.LeakyReLU()) + else: + self.model.add(layers.ReLU()) + if self.dropout > 0.0: + self.model.add(layers.Dropout(self.dropout)) + for _d in range(1, self.depth): + self.model.add( + layers.Dense( + self.layer_size, activation=None, name=f"decoder_dense_{_d+1}" + ) + ) + if self.use_leaky_relu: + self.model.add(layers.LeakyReLU()) + else: + self.model.add(layers.ReLU()) + if self.dropout > 0.0: + self.model.add(layers.Dropout(self.dropout)) + self.model.add( + layers.Dense( + self.data_size, activation=None, name="decoder_output_vector" + ) + ) + self.model._name = "Decoder" + return self + + def load(self): + """# Load an existing Decoder""" + from os import listdir + + if "decoder.keras" not in listdir(f"{self.model_base_directory}/portable/"): + return None + import tensorflow as tf + + self.model = tf.keras.models.load_model( + f"{self.model_base_directory}/portable/decoder.keras", compile=False + ) + self.model._name = "Decoder" + return self + + def save(self, overwrite=False): + """# Save the current Decoder + + - Overwrite: overwrite any existing decoder that has been saved""" + from os import listdir + + if overwrite: + self.model.save(f"{self.model_base_directory}/portable/decoder.keras") + return True + if "decoder.keras" in listdir(f"{self.model_base_directory}/portable/"): + return False + self.model.save(f"{self.model_base_directory}/portable/decoder.keras") + return True + + def load_parameters(self): + """# Load Decoder Model Parameters from File + The file needs to have the following parameters defined: + - data_size: int + - autoencoder_layer_size: int + - latent_size: int + - autoencoder_depth: int + - dropout: float (set to 0. if you don't want a dropout layer) + - use_leaky_relu: boolean""" + from pickle import load + + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "rb" + ) as phandle: + parameters = load(phandle) + self.data_size = parameters["data_size"] + self.layer_size = parameters["autoencoder_layer_size"] + self.latent_size = parameters["latent_size"] + self.depth = parameters["autoencoder_depth"] + self.dropout = parameters["dropout"] + self.use_leaky_relu = parameters["use_leaky_relu"] + + def summary(self): + """# Returns the summary of the Decoder model""" + return self.model.summary() + + def __call__(self, *args, **kwargs): + """# Callable + + When calling the decoder class, return the model's output""" + return self.model(*args, **kwargs) diff --git a/code/sunlab/sunflow/models/discriminator.py b/code/sunlab/sunflow/models/discriminator.py new file mode 100644 index 0000000..38bed56 --- /dev/null +++ b/code/sunlab/sunflow/models/discriminator.py @@ -0,0 +1,132 @@ +class Discriminator: + """# Discriminator Model + + Constructs a discriminator model with a certain depth of intermediate + layers of fixed size""" + + def __init__(self, model_base_directory): + """# Discriminator Model Initialization + + - model_base_directory: The base folder directory where the model will + be saved/ loaded""" + self.model_base_directory = model_base_directory + + def init(self): + """# Initialize a new Discriminator + + Expects a model parameters file to already exist in the initialization + base directory when initializing the model""" + from tensorflow import keras + from tensorflow.keras import layers + + self.load_parameters() + assert self.depth >= 0, "Depth must be non-negative" + self.model = keras.models.Sequential() + if self.depth == 0: + self.model.add( + layers.Dense( + 1, + input_shape=(self.latent_size,), + activation=None, + name="discriminator_output_vector", + ) + ) + else: + self.model.add( + layers.Dense( + self.layer_size, + input_shape=(self.latent_size,), + activation=None, + name="discriminator_dense_1", + ) + ) + if self.use_leaky_relu: + self.model.add(layers.LeakyReLU()) + else: + self.model.add(layers.ReLU()) + if self.dropout > 0.0: + self.model.add(layers.Dropout(self.dropout)) + for _d in range(1, self.depth): + self.model.add( + layers.Dense( + self.layer_size, + activation=None, + name=f"discriminator_dense_{_d+1}", + ) + ) + if self.use_leaky_relu: + self.model.add(layers.LeakyReLU()) + else: + self.model.add(layers.ReLU()) + if self.dropout > 0.0: + self.model.add(layers.Dropout(self.dropout)) + self.model.add( + layers.Dense( + 1, activation="sigmoid", name="discriminator_output_vector" + ) + ) + self.model._name = "Discriminator" + return self + + def load(self): + """# Load an existing Discriminator""" + from os import listdir + + if "discriminator.keras" not in listdir( + f"{self.model_base_directory}/portable/" + ): + return None + import tensorflow as tf + + self.model = tf.keras.models.load_model( + f"{self.model_base_directory}/portable/discriminator.keras", compile=False + ) + self.model._name = "Discriminator" + return self + + def save(self, overwrite=False): + """# Save the current Discriminator + + - Overwrite: overwrite any existing discriminator that has been + saved""" + from os import listdir + + if overwrite: + self.model.save(f"{self.model_base_directory}/portable/discriminator.keras") + return True + if "discriminator.keras" in listdir(f"{self.model_base_directory}/portable/"): + return False + self.model.save(f"{self.model_base_directory}/portable/discriminator.keras") + return True + + def load_parameters(self): + """# Load Discriminator Model Parameters from File + The file needs to have the following parameters defined: + - data_size: int + - adversary_layer_size: int + - latent_size: int + - autoencoder_depth: int + - dropout: float (set to 0. if you don't want a dropout layer) + - use_leaky_relu: boolean""" + from pickle import load + + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "rb" + ) as phandle: + parameters = load(phandle) + self.data_size = parameters["data_size"] + self.layer_size = parameters["adversary_layer_size"] + self.latent_size = parameters["latent_size"] + self.depth = parameters["autoencoder_depth"] + self.dropout = parameters["dropout"] + self.use_leaky_relu = parameters["use_leaky_relu"] + + def summary(self): + """# Returns the summary of the Discriminator model""" + return self.model.summary() + + def __call__(self, *args, **kwargs): + """# Callable + + When calling the discriminator class, return the model's output""" + return self.model(*args, **kwargs) diff --git a/code/sunlab/sunflow/models/encoder.py b/code/sunlab/sunflow/models/encoder.py new file mode 100644 index 0000000..22d1a9a --- /dev/null +++ b/code/sunlab/sunflow/models/encoder.py @@ -0,0 +1,140 @@ +class Encoder: + """# Encoder Model + + Constructs an encoder model with a certain depth of intermediate layers of + fixed size""" + + def __init__(self, model_base_directory): + """# Encoder Model Initialization + + - model_base_directory: The base folder directory where the model will + be saved/ loaded""" + self.model_base_directory = model_base_directory + + def init(self): + """# Initialize a new Encoder + + Expects a model parameters file to already exist in the initialization + base directory when initializing the model""" + from tensorflow import keras + from tensorflow.keras import layers + + # Load in the model parameters + self.load_parameters() + assert self.depth >= 0, "Depth must be non-negative" + + # Create the model + self.model = keras.models.Sequential() + # At zero depth, connect input and output layer directly + if self.depth == 0: + self.model.add( + layers.Dense( + self.latent_size, + input_shape=(self.data_size,), + activation=None, + name="encoder_latent_vector", + ) + ) + # Otherwise, add fixed-sized layers between them + else: + self.model.add( + layers.Dense( + self.layer_size, + input_shape=(self.data_size,), + activation=None, + name="encoder_dense_1", + ) + ) + # Use LeakyReLU if specified + if self.use_leaky_relu: + self.model.add(layers.LeakyReLU()) + else: + self.model.add(layers.ReLU()) + # Include a droput layer if specified + if self.dropout > 0.0: + self.model.add(layers.Dropout(self.dropout)) + for _d in range(1, self.depth): + self.model.add( + layers.Dense( + self.layer_size, activation=None, name=f"encoder_dense_{_d+1}" + ) + ) + # Use LeakyReLU if specified + if self.use_leaky_relu: + self.model.add(layers.LeakyReLU()) + else: + self.model.add(layers.ReLU()) + # Include a droput layer if specified + if self.dropout > 0.0: + self.model.add(layers.Dropout(self.dropout)) + self.model.add( + layers.Dense( + self.latent_size, activation=None, name="encoder_latent_vector" + ) + ) + self.model._name = "Encoder" + return self + + def load(self): + """# Load an existing Encoder""" + from os import listdir + + # If the encoder is not found, return None + if "encoder.keras" not in listdir(f"{self.model_base_directory}/portable/"): + return None + # Otherwise, load the encoder + # compile=False suppresses warnings about training + # If you want to train it, you will need to recompile it + import tensorflow as tf + + self.model = tf.keras.models.load_model( + f"{self.model_base_directory}/portable/encoder.keras", compile=False + ) + self.model._name = "Encoder" + return self + + def save(self, overwrite=False): + """# Save the current Encoder + + - Overwrite: overwrite any existing encoder that has been saved""" + from os import listdir + + if overwrite: + self.model.save(f"{self.model_base_directory}/portable/encoder.keras") + return True + if "encoder.keras" in listdir(f"{self.model_base_directory}/portable/"): + return False + self.model.save(f"{self.model_base_directory}/portable/encoder.keras") + return True + + def load_parameters(self): + """# Load Encoder Model Parameters from File + The file needs to have the following parameters defined: + - data_size: int + - autoencoder_layer_size: int + - latent_size: int + - autoencoder_depth: int + - dropout: float (set to 0. if you don't want a dropout layer) + - use_leaky_relu: boolean""" + from pickle import load + + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "rb" + ) as phandle: + parameters = load(phandle) + self.data_size = parameters["data_size"] + self.layer_size = parameters["autoencoder_layer_size"] + self.latent_size = parameters["latent_size"] + self.depth = parameters["autoencoder_depth"] + self.dropout = parameters["dropout"] + self.use_leaky_relu = parameters["use_leaky_relu"] + + def summary(self): + """# Returns the summary of the Encoder model""" + return self.model.summary() + + def __call__(self, *args, **kwargs): + """# Callable + + When calling the encoder class, return the model's output""" + return self.model(*args, **kwargs) diff --git a/code/sunlab/sunflow/models/encoder_discriminator.py b/code/sunlab/sunflow/models/encoder_discriminator.py new file mode 100644 index 0000000..5efb6af --- /dev/null +++ b/code/sunlab/sunflow/models/encoder_discriminator.py @@ -0,0 +1,96 @@ +class EncoderDiscriminator: + """# EncoderDiscriminator Model + + Constructs an encoder-discriminator model""" + + def __init__(self, model_base_directory): + """# EncoderDiscriminator Model Initialization + + - model_base_directory: The base folder directory where the model will + be saved/ loaded""" + self.model_base_directory = model_base_directory + + def init(self, encoder, discriminator): + """# Initialize a EncoderDiscriminator + + - encoder: The encoder to use + - discriminator: The discriminator to use""" + from tensorflow import keras + + self.load_parameters() + self.model = keras.models.Sequential() + self.model.add(encoder.model) + self.model.add(discriminator.model) + self.model._name = "EncoderDiscriminator" + return self + + def load(self): + """# Load an existing EncoderDiscriminator""" + from os import listdir + + if "encoder_discriminator.keras" not in listdir( + f"{self.model_base_directory}/portable/" + ): + return None + import tensorflow as tf + + self.model = tf.keras.models.load_model( + f"{self.model_base_directory}/portable/encoder_discriminator" + ".keras", + compile=False, + ) + self.model._name = "EncoderDiscriminator" + return self + + def save(self, overwrite=False): + """# Save the current EncoderDiscriminator + + - Overwrite: overwrite any existing encoder_discriminator that has been + saved""" + from os import listdir + + if overwrite: + self.model.save( + f"{self.model_base_directory}/portable/encoder_discriminator" + ".keras" + ) + return True + if "encoder_discriminator.keras" in listdir( + f"{self.model_base_directory}/portable/" + ): + return False + self.model.save( + f"{self.model_base_directory}/portable/encoder_discriminator" + ".keras" + ) + return True + + def load_parameters(self): + """# Load EncoderDiscriminator Model Parameters from File + The file needs to have the following parameters defined: + - data_size: int + - autoencoder_layer_size: int + - latent_size: int + - autoencoder_depth: int + - dropout: float (set to 0. if you don't want a dropout layer) + - use_leaky_relu: boolean""" + from pickle import load + + with open( + f"{self.model_base_directory}/portable/model_parameters.pkl", "rb" + ) as phandle: + parameters = load(phandle) + self.data_size = parameters["data_size"] + self.layer_size = parameters["autoencoder_layer_size"] + self.latent_size = parameters["latent_size"] + self.depth = parameters["autoencoder_depth"] + self.dropout = parameters["dropout"] + self.use_leaky_relu = parameters["use_leaky_relu"] + + def summary(self): + """# Returns the summary of the EncoderDiscriminator model""" + return self.model.summary() + + def __call__(self, *args, **kwargs): + """# Callable + + When calling the encoder_discriminator class, return the model's + output""" + return self.model(*args, **kwargs) diff --git a/code/sunlab/sunflow/models/utilities.py b/code/sunlab/sunflow/models/utilities.py new file mode 100644 index 0000000..ab0c2a6 --- /dev/null +++ b/code/sunlab/sunflow/models/utilities.py @@ -0,0 +1,93 @@ +# Higher-level functions + +from sunlab.common.distribution.adversarial_distribution import AdversarialDistribution +from sunlab.common.scaler.adversarial_scaler import AdversarialScaler +from sunlab.common.data.utilities import import_dataset +from .adversarial_autoencoder import AdversarialAutoencoder + + +def create_aae( + dataset_file_name, + model_directory, + normalization_scaler: AdversarialScaler, + distribution: AdversarialDistribution or None, + magnification=10, + latent_size=2, +): + """# Create Adversarial Autoencoder + + - dataset_file_name: str = Path to the dataset file + - model_directory: str = Path to save the model in + - normalization_scaler: AdversarialScaler = Data normalization Scaler Model + - distribution: AdversarialDistribution = Distribution for the Adversary + - magnification: int = The Magnification of the Dataset""" + dataset = import_dataset(dataset_file_name, magnification) + model = AdversarialAutoencoder( + model_directory, distribution, normalization_scaler + ).init(dataset.dataset, latent_size=latent_size) + return model + + +def create_aae_and_dataset( + dataset_file_name, + model_directory, + normalization_scaler: AdversarialScaler, + distribution: AdversarialDistribution or None, + magnification=10, + batch_size=1024, + shuffle=True, + val_split=0.1, + latent_size=2, +): + """# Create Adversarial Autoencoder and Load the Dataset + + - dataset_file_name: str = Path to the dataset file + - model_directory: str = Path to save the model in + - normalization_scaler: AdversarialScaler = Data normalization Scaler Model + - distribution: AdversarialDistribution = Distribution for the Adversary + - magnification: int = The Magnification of the Dataset""" + model = create_aae( + dataset_file_name, + model_directory, + normalization_scaler, + distribution, + magnification=magnification, + latent_size=latent_size, + ) + dataset = import_dataset( + dataset_file_name, + magnification, + batch_size=batch_size, + shuffle=shuffle, + val_split=val_split, + scaler=model.scaler, + ) + return model, dataset + + +def load_aae(model_directory, normalization_scaler: AdversarialScaler): + """# Load Adversarial Autoencoder + + - model_directory: str = Path to save the model in + - normalization_scaler: AdversarialScaler = Data normalization Scaler Model + """ + return AdversarialAutoencoder(model_directory, None, normalization_scaler).load() + + +def load_aae_and_dataset( + dataset_file_name, + model_directory, + normalization_scaler: AdversarialScaler, + magnification=10, +): + """# Load Adversarial Autoencoder + + - dataset_file_name: str = Path to the dataset file + - model_directory: str = Path to save the model in + - normalization_scaler: AdversarialScaler = Data normalization Scaler Model + - magnification: int = The Magnification of the Dataset""" + model = load_aae(model_directory, normalization_scaler) + dataset = import_dataset( + dataset_file_name, magnification=magnification, scaler=model.scaler + ) + return model, dataset -- cgit v1.2.1