aboutsummaryrefslogtreecommitdiff
path: root/code/sunlab/sunflow
diff options
context:
space:
mode:
authorChristian C <cc@localhost>2024-11-11 12:29:32 -0800
committerChristian C <cc@localhost>2024-11-11 12:29:32 -0800
commitb85ee9d64a536937912544c7bbd5b98b635b7e8d (patch)
treecef7bc17d7b29f40fc6b1867d0ce0a742d5583d0 /code/sunlab/sunflow
Initial commit
Diffstat (limited to 'code/sunlab/sunflow')
-rw-r--r--code/sunlab/sunflow/__init__.py6
-rw-r--r--code/sunlab/sunflow/data/__init__.py1
-rw-r--r--code/sunlab/sunflow/data/utilities.py53
-rw-r--r--code/sunlab/sunflow/models/__init__.py7
-rw-r--r--code/sunlab/sunflow/models/adversarial_autoencoder.py344
-rw-r--r--code/sunlab/sunflow/models/autoencoder.py85
-rw-r--r--code/sunlab/sunflow/models/decoder.py127
-rw-r--r--code/sunlab/sunflow/models/discriminator.py132
-rw-r--r--code/sunlab/sunflow/models/encoder.py140
-rw-r--r--code/sunlab/sunflow/models/encoder_discriminator.py96
-rw-r--r--code/sunlab/sunflow/models/utilities.py93
-rw-r--r--code/sunlab/sunflow/plotting/__init__.py1
-rw-r--r--code/sunlab/sunflow/plotting/model_extensions.py289
13 files changed, 1374 insertions, 0 deletions
diff --git a/code/sunlab/sunflow/__init__.py b/code/sunlab/sunflow/__init__.py
new file mode 100644
index 0000000..6e0c959
--- /dev/null
+++ b/code/sunlab/sunflow/__init__.py
@@ -0,0 +1,6 @@
+from ..common import *
+
+from .models import *
+
+from .data import *
+from .plotting import *
diff --git a/code/sunlab/sunflow/data/__init__.py b/code/sunlab/sunflow/data/__init__.py
new file mode 100644
index 0000000..b9a32c0
--- /dev/null
+++ b/code/sunlab/sunflow/data/__init__.py
@@ -0,0 +1 @@
+from .utilities import *
diff --git a/code/sunlab/sunflow/data/utilities.py b/code/sunlab/sunflow/data/utilities.py
new file mode 100644
index 0000000..dcdc36e
--- /dev/null
+++ b/code/sunlab/sunflow/data/utilities.py
@@ -0,0 +1,53 @@
+from sunlab.common import ShapeDataset
+from sunlab.common import MaxAbsScaler
+
+
+def process_and_load_dataset(
+ dataset_file, model_folder, magnification=10, scaler=MaxAbsScaler
+):
+ """# Load a dataset and process a models' Latent Space on the Dataset"""
+ from ..models import load_aae
+ from sunlab.common import import_full_dataset
+
+ model = load_aae(model_folder, normalization_scaler=scaler)
+ dataset = import_full_dataset(
+ dataset_file, magnification=magnification, scaler=model.scaler
+ )
+ latent = model.encoder(dataset.dataset).numpy()
+ assert len(latent.shape) == 2, "Only 1D Latent Vectors Supported"
+ for dim in range(latent.shape[1]):
+ dataset.dataframe[f"Latent-{dim}"] = latent[:, dim]
+ return dataset
+
+
+def process_and_load_datasets(
+ dataset_file_list, model_folder, magnification=10, scaler=MaxAbsScaler
+):
+ from pandas import concat
+ from ..models import load_aae
+
+ dataframes = []
+ datasets = []
+ for dataset_file in dataset_file_list:
+ dataset = process_and_load_dataset(
+ dataset_file, model_folder, magnification, scaler
+ )
+ model = load_aae(model_folder, normalization_scaler=scaler)
+ dataframe = dataset.dataframe
+ for label in ["ActinEdge", "Filopodia", "Bleb", "Lamellipodia"]:
+ if label in dataframe.columns:
+ dataframe[label.lower()] = dataframe[label]
+ if label.lower() not in dataframe.columns:
+ dataframe[label.lower()] = 0
+ latent_columns = [f"Latent-{dim}" for dim in range(model.latent_size)]
+ datasets.append(dataset)
+ dataframes.append(
+ dataframe[
+ dataset.data_columns
+ + dataset.label_columns
+ + latent_columns
+ + ["Frames", "CellNum"]
+ + ["actinedge", "filopodia", "bleb", "lamellipodia"]
+ ]
+ )
+ return datasets, concat(dataframes)
diff --git a/code/sunlab/sunflow/models/__init__.py b/code/sunlab/sunflow/models/__init__.py
new file mode 100644
index 0000000..f111c0a
--- /dev/null
+++ b/code/sunlab/sunflow/models/__init__.py
@@ -0,0 +1,7 @@
+from .autoencoder import Autoencoder
+from .adversarial_autoencoder import AdversarialAutoencoder
+from sunlab.common.data.dataset import Dataset
+from sunlab.common.distribution.adversarial_distribution import AdversarialDistribution
+from sunlab.common.scaler.adversarial_scaler import AdversarialScaler
+from .utilities import create_aae, create_aae_and_dataset
+from .utilities import load_aae, load_aae_and_dataset
diff --git a/code/sunlab/sunflow/models/adversarial_autoencoder.py b/code/sunlab/sunflow/models/adversarial_autoencoder.py
new file mode 100644
index 0000000..4cbb2f8
--- /dev/null
+++ b/code/sunlab/sunflow/models/adversarial_autoencoder.py
@@ -0,0 +1,344 @@
+from sunlab.common.data.dataset import Dataset
+from sunlab.common.scaler.adversarial_scaler import AdversarialScaler
+from sunlab.common.distribution.adversarial_distribution import AdversarialDistribution
+from .encoder import Encoder
+from .decoder import Decoder
+from .discriminator import Discriminator
+from .encoder_discriminator import EncoderDiscriminator
+from .autoencoder import Autoencoder
+from tensorflow.keras import optimizers, metrics, losses
+import tensorflow as tf
+from numpy import ones, zeros, float32, NaN
+
+
+class AdversarialAutoencoder:
+ """# Adversarial Autoencoder
+ - distribution: The distribution used by the adversary to learn on"""
+
+ def __init__(
+ self,
+ model_base_directory,
+ distribution: AdversarialDistribution or None = None,
+ scaler: AdversarialScaler or None = None,
+ ):
+ """# Adversarial Autoencoder Model Initialization
+
+ - model_base_directory: The base folder directory where the model will
+ be saved/ loaded
+ - distribution: The distribution the adversary will use
+ - scaler: The scaling function the model will assume on the data"""
+ self.model_base_directory = model_base_directory
+ if distribution is not None:
+ self.distribution = distribution
+ else:
+ self.distribution = None
+ if scaler is not None:
+ self.scaler = scaler(self.model_base_directory)
+ else:
+ self.scaler = None
+
+ def init(
+ self,
+ data=None,
+ data_size=13,
+ autoencoder_layer_size=16,
+ adversary_layer_size=8,
+ latent_size=2,
+ autoencoder_depth=2,
+ dropout=0.0,
+ use_leaky_relu=False,
+ **kwargs,
+ ):
+ """# Initialize AAE model parameters
+ - data_size: int
+ - autoencoder_layer_size: int
+ - adversary_layer_size: int
+ - latent_size: int
+ - autoencoder_depth: int
+ - dropout: float
+ - use_leaky_relu: boolean"""
+ self.data_size = data_size
+ self.autoencoder_layer_size = autoencoder_layer_size
+ self.adversary_layer_size = adversary_layer_size
+ self.latent_size = latent_size
+ self.autoencoder_depth = autoencoder_depth
+ self.dropout = dropout
+ self.use_leaky_relu = use_leaky_relu
+ self.save_parameters()
+ self.encoder = Encoder(self.model_base_directory).init()
+ self.decoder = Decoder(self.model_base_directory).init()
+ self.autoencoder = Autoencoder(self.model_base_directory).init(
+ self.encoder, self.decoder
+ )
+ self.discriminator = Discriminator(self.model_base_directory).init()
+ self.encoder_discriminator = EncoderDiscriminator(
+ self.model_base_directory
+ ).init(self.encoder, self.discriminator)
+ if self.distribution is not None:
+ self.distribution = self.distribution(self.latent_size)
+ if (data is not None) and (self.scaler is not None):
+ self.scaler = self.scaler.init(data)
+ self.init_optimizers_and_metrics(**kwargs)
+ return self
+
+ def init_optimizers_and_metrics(
+ self,
+ optimizer=optimizers.Adam,
+ ae_metric=metrics.MeanAbsoluteError,
+ adv_metric=metrics.BinaryCrossentropy,
+ ae_lr=7e-4,
+ adv_lr=3e-4,
+ loss_fn=losses.BinaryCrossentropy,
+ **kwargs,
+ ):
+ """# Set the optimizer, loss function, and metrics"""
+ self.ae_optimizer = optimizer(learning_rate=ae_lr)
+ self.adv_optimizer = optimizer(learning_rate=adv_lr)
+ self.gan_optimizer = optimizer(learning_rate=adv_lr)
+ self.train_ae_metric = ae_metric()
+ self.val_ae_metric = ae_metric()
+ self.train_adv_metric = adv_metric()
+ self.val_adv_metric = adv_metric()
+ self.train_gan_metric = adv_metric()
+ self.val_gan_metric = adv_metric()
+ self.loss_fn = loss_fn()
+
+ def load(self):
+ """# Load the models from their respective files"""
+ self.load_parameters()
+ self.encoder = Encoder(self.model_base_directory).load()
+ self.decoder = Decoder(self.model_base_directory).load()
+ self.autoencoder = Autoencoder(self.model_base_directory).load()
+ self.discriminator = Discriminator(self.model_base_directory).load()
+ self.encoder_discriminator = EncoderDiscriminator(
+ self.model_base_directory
+ ).load()
+ if self.scaler is not None:
+ self.scaler = self.scaler.load()
+ return self
+
+ def save(self, overwrite=False):
+ """# Save each model in the AAE"""
+ self.encoder.save(overwrite=overwrite)
+ self.decoder.save(overwrite=overwrite)
+ self.autoencoder.save(overwrite=overwrite)
+ self.discriminator.save(overwrite=overwrite)
+ self.encoder_discriminator.save(overwrite=overwrite)
+ if self.scaler is not None:
+ self.scaler.save()
+
+ def save_parameters(self):
+ """# Save the AAE parameters in a file"""
+ from pickle import dump
+ from os import makedirs
+
+ makedirs(self.model_base_directory + "/portable/", exist_ok=True)
+ parameters = {
+ "data_size": self.data_size,
+ "autoencoder_layer_size": self.autoencoder_layer_size,
+ "adversary_layer_size": self.adversary_layer_size,
+ "latent_size": self.latent_size,
+ "autoencoder_depth": self.autoencoder_depth,
+ "dropout": self.dropout,
+ "use_leaky_relu": self.use_leaky_relu,
+ }
+ with open(
+ f"{self.model_base_directory}/portable/model_parameters.pkl", "wb"
+ ) as phandle:
+ dump(parameters, phandle)
+
+ def load_parameters(self):
+ """# Load the AAE parameters from a file"""
+ from pickle import load
+
+ with open(
+ f"{self.model_base_directory}/portable/model_parameters.pkl", "rb"
+ ) as phandle:
+ parameters = load(phandle)
+ self.data_size = parameters["data_size"]
+ self.autoencoder_layer_size = parameters["autoencoder_layer_size"]
+ self.adversary_layer_size = parameters["adversary_layer_size"]
+ self.latent_size = parameters["latent_size"]
+ self.autoencoder_depth = parameters["autoencoder_depth"]
+ self.dropout = parameters["dropout"]
+ self.use_leaky_relu = parameters["use_leaky_relu"]
+ return parameters
+
+ def summary(self):
+ """# Summarize each model in the AAE"""
+ self.encoder.summary()
+ self.decoder.summary()
+ self.autoencoder.summary()
+ self.discriminator.summary()
+ self.encoder_discriminator.summary()
+
+ @tf.function
+ def train_step(self, x, y):
+ """# Training Step
+
+ 1. Train the Autoencoder
+ 2. (If distribution is given) Train the discriminator
+ 3. (If the distribution is given) Train the encoder_discriminator"""
+ # Autoencoder Training
+ with tf.GradientTape() as tape:
+ decoded_vector = self.autoencoder(x, training=True)
+ ae_loss_value = self.loss_fn(y, decoded_vector)
+ grads = tape.gradient(ae_loss_value, self.autoencoder.model.trainable_weights)
+ self.ae_optimizer.apply_gradients(
+ zip(grads, self.autoencoder.model.trainable_weights)
+ )
+ self.train_ae_metric.update_state(y, decoded_vector)
+ if self.distribution is not None:
+ # Adversary Trainig
+ with tf.GradientTape() as tape:
+ latent_vector = self.encoder(x)
+ fakepred = self.distribution(x.shape[0])
+ discbatch_x = tf.concat([latent_vector, fakepred], axis=0)
+ discbatch_y = tf.concat([zeros(x.shape[0]), ones(x.shape[0])], axis=0)
+ adversary_vector = self.discriminator(discbatch_x, training=True)
+ adv_loss_value = self.loss_fn(discbatch_y, adversary_vector)
+ grads = tape.gradient(
+ adv_loss_value, self.discriminator.model.trainable_weights
+ )
+ self.adv_optimizer.apply_gradients(
+ zip(grads, self.discriminator.model.trainable_weights)
+ )
+ self.train_adv_metric.update_state(discbatch_y, adversary_vector)
+ # Gan Training
+ with tf.GradientTape() as tape:
+ gan_vector = self.encoder_discriminator(x, training=True)
+ adv_vector = tf.convert_to_tensor(ones((x.shape[0], 1), dtype=float32))
+ gan_loss_value = self.loss_fn(gan_vector, adv_vector)
+ grads = tape.gradient(gan_loss_value, self.encoder.model.trainable_weights)
+ self.gan_optimizer.apply_gradients(
+ zip(grads, self.encoder.model.trainable_weights)
+ )
+ self.train_gan_metric.update_state(adv_vector, gan_vector)
+ return (ae_loss_value, adv_loss_value, gan_loss_value)
+ return (ae_loss_value, None, None)
+
+ @tf.function
+ def test_step(self, x, y):
+ """# Test Step - On validation data
+
+ 1. Evaluate the Autoencoder
+ 2. (If distribution is given) Evaluate the discriminator
+ 3. (If the distribution is given) Evaluate the encoder_discriminator"""
+ val_decoded_vector = self.autoencoder(x, training=False)
+ self.val_ae_metric.update_state(y, val_decoded_vector)
+
+ if self.distribution is not None:
+ latent_vector = self.encoder(x)
+ fakepred = self.distribution(x.shape[0])
+ discbatch_x = tf.concat([latent_vector, fakepred], axis=0)
+ discbatch_y = tf.concat([zeros(x.shape[0]), ones(x.shape[0])], axis=0)
+ adversary_vector = self.discriminator(discbatch_x, training=False)
+ self.val_adv_metric.update_state(discbatch_y, adversary_vector)
+
+ gan_vector = self.encoder_discriminator(x, training=False)
+ self.val_gan_metric.update_state(ones(x.shape[0]), gan_vector)
+
+ # Garbage Collect at the end of each epoch
+ def on_epoch_end(self, _epoch, logs=None):
+ """# Cleanup environment to prevent memory leaks each epoch"""
+ import gc
+ from tensorflow.keras import backend as k
+
+ gc.collect()
+ k.clear_session()
+
+ def train(
+ self,
+ dataset: Dataset,
+ epoch_count: int = 1,
+ output=False,
+ output_freq=1,
+ fmt="%i[%.3f]: %.2e %.2e %.2e %.2e %.2e %.2e",
+ ):
+ """# Train the model on a dataset
+
+ - dataset: ataset = Dataset to train the model on, which as the
+ training and validation iterators set up
+ - epoch_count: int = The number of epochs to train
+ - output: boolean = Whether or not to output training information
+ - output_freq: int = The number of epochs between each output"""
+ from time import time
+ from numpy import array as narray
+
+ def fmtter(x):
+ return x if x is not None else -1
+
+ epoch_data = []
+ dataset.reset_iterators()
+
+ self.test_step(dataset.dataset, dataset.dataset)
+ val_ae = self.val_ae_metric.result()
+ val_adv = self.val_adv_metric.result()
+ val_gan = self.val_gan_metric.result()
+ self.val_ae_metric.reset_states()
+ self.val_adv_metric.reset_states()
+ self.val_gan_metric.reset_states()
+ print(
+ fmt
+ % (
+ 0,
+ NaN,
+ val_ae,
+ fmtter(val_adv),
+ fmtter(val_gan),
+ NaN,
+ NaN,
+ NaN,
+ )
+ )
+ for epoch in range(epoch_count):
+ start_time = time()
+
+ for step, (x_batch_train, y_batch_train) in enumerate(dataset.training):
+ ae_lv, adv_lv, gan_lv = self.train_step(x_batch_train, x_batch_train)
+
+ train_ae = self.train_ae_metric.result()
+ train_adv = self.train_adv_metric.result()
+ train_gan = self.train_gan_metric.result()
+ self.train_ae_metric.reset_states()
+ self.train_adv_metric.reset_states()
+ self.train_gan_metric.reset_states()
+
+ for step, (x_batch_val, y_batch_val) in enumerate(dataset.validation):
+ self.test_step(x_batch_val, x_batch_val)
+
+ val_ae = self.val_ae_metric.result()
+ val_adv = self.val_adv_metric.result()
+ val_gan = self.val_gan_metric.result()
+ self.val_ae_metric.reset_states()
+ self.val_adv_metric.reset_states()
+ self.val_gan_metric.reset_states()
+
+ epoch_data.append(
+ (
+ epoch,
+ train_ae,
+ val_ae,
+ fmtter(train_adv),
+ fmtter(val_adv),
+ fmtter(train_gan),
+ fmtter(val_gan),
+ )
+ )
+ if output and (epoch + 1) % output_freq == 0:
+ print(
+ fmt
+ % (
+ epoch + 1,
+ time() - start_time,
+ train_ae,
+ fmtter(train_adv),
+ fmtter(train_gan),
+ val_ae,
+ fmtter(val_adv),
+ fmtter(val_gan),
+ )
+ )
+ self.on_epoch_end(epoch)
+ dataset.reset_iterators()
+ return narray(epoch_data)
diff --git a/code/sunlab/sunflow/models/autoencoder.py b/code/sunlab/sunflow/models/autoencoder.py
new file mode 100644
index 0000000..473d00d
--- /dev/null
+++ b/code/sunlab/sunflow/models/autoencoder.py
@@ -0,0 +1,85 @@
+class Autoencoder:
+ """# Autoencoder Model
+
+ Constructs an encoder-decoder model"""
+
+ def __init__(self, model_base_directory):
+ """# Autoencoder Model Initialization
+
+ - model_base_directory: The base folder directory where the model will
+ be saved/ loaded"""
+ self.model_base_directory = model_base_directory
+
+ def init(self, encoder, decoder):
+ """# Initialize an Autoencoder
+
+ - encoder: The encoder to use
+ - decoder: The decoder to use"""
+ from tensorflow import keras
+
+ self.load_parameters()
+ self.model = keras.models.Sequential()
+ self.model.add(encoder.model)
+ self.model.add(decoder.model)
+ self.model._name = "Autoencoder"
+ return self
+
+ def load(self):
+ """# Load an existing Autoencoder"""
+ from os import listdir
+
+ if "autoencoder.keras" not in listdir(f"{self.model_base_directory}/portable/"):
+ return None
+ import tensorflow as tf
+
+ self.model = tf.keras.models.load_model(
+ f"{self.model_base_directory}/portable/autoencoder.keras", compile=False
+ )
+ self.model._name = "Autoencoder"
+ return self
+
+ def save(self, overwrite=False):
+ """# Save the current Autoencoder
+
+ - Overwrite: overwrite any existing autoencoder that has been saved"""
+ from os import listdir
+
+ if overwrite:
+ self.model.save(f"{self.model_base_directory}/portable/autoencoder.keras")
+ return True
+ if "autoencoder.keras" in listdir(f"{self.model_base_directory}/portable/"):
+ return False
+ self.model.save(f"{self.model_base_directory}/portable/autoencoder.keras")
+ return True
+
+ def load_parameters(self):
+ """# Load Autoencoder Model Parameters from File
+ The file needs to have the following parameters defined:
+ - data_size: int
+ - autoencoder_layer_size: int
+ - latent_size: int
+ - autoencoder_depth: int
+ - dropout: float (set to 0. if you don't want a dropout layer)
+ - use_leaky_relu: boolean"""
+ from pickle import load
+
+ with open(
+ f"{self.model_base_directory}/portable/model_parameters.pkl", "rb"
+ ) as phandle:
+ parameters = load(phandle)
+ self.data_size = parameters["data_size"]
+ self.layer_size = parameters["autoencoder_layer_size"]
+ self.latent_size = parameters["latent_size"]
+ self.depth = parameters["autoencoder_depth"]
+ self.dropout = parameters["dropout"]
+ self.use_leaky_relu = parameters["use_leaky_relu"]
+
+ def summary(self):
+ """# Returns the summary of the Autoencoder model"""
+ return self.model.summary()
+
+ def __call__(self, *args, **kwargs):
+ """# Callable
+
+ When calling the autoencoder class, return the model's output"""
+ return self.model(*args, **kwargs)
diff --git a/code/sunlab/sunflow/models/decoder.py b/code/sunlab/sunflow/models/decoder.py
new file mode 100644
index 0000000..40ea190
--- /dev/null
+++ b/code/sunlab/sunflow/models/decoder.py
@@ -0,0 +1,127 @@
+class Decoder:
+ """# Decoder Model
+
+ Constructs a decoder model with a certain depth of intermediate layers of
+ fixed size"""
+
+ def __init__(self, model_base_directory):
+ """# Decoder Model Initialization
+
+ - model_base_directory: The base folder directory where the model will
+ be saved/ loaded"""
+ self.model_base_directory = model_base_directory
+
+ def init(self):
+ """# Initialize a new Decoder
+
+ Expects a model parameters file to already exist in the initialization
+ base directory when initializing the model"""
+ from tensorflow import keras
+ from tensorflow.keras import layers
+
+ self.load_parameters()
+ assert self.depth >= 0, "Depth must be non-negative"
+ self.model = keras.models.Sequential()
+ if self.depth == 0:
+ self.model.add(
+ layers.Dense(
+ self.data_size,
+ input_shape=(self.latent_size,),
+ activation=None,
+ name="decoder_latent_vector",
+ )
+ )
+ else:
+ self.model.add(
+ layers.Dense(
+ self.layer_size,
+ input_shape=(self.latent_size,),
+ activation=None,
+ name="decoder_dense_1",
+ )
+ )
+ if self.use_leaky_relu:
+ self.model.add(layers.LeakyReLU())
+ else:
+ self.model.add(layers.ReLU())
+ if self.dropout > 0.0:
+ self.model.add(layers.Dropout(self.dropout))
+ for _d in range(1, self.depth):
+ self.model.add(
+ layers.Dense(
+ self.layer_size, activation=None, name=f"decoder_dense_{_d+1}"
+ )
+ )
+ if self.use_leaky_relu:
+ self.model.add(layers.LeakyReLU())
+ else:
+ self.model.add(layers.ReLU())
+ if self.dropout > 0.0:
+ self.model.add(layers.Dropout(self.dropout))
+ self.model.add(
+ layers.Dense(
+ self.data_size, activation=None, name="decoder_output_vector"
+ )
+ )
+ self.model._name = "Decoder"
+ return self
+
+ def load(self):
+ """# Load an existing Decoder"""
+ from os import listdir
+
+ if "decoder.keras" not in listdir(f"{self.model_base_directory}/portable/"):
+ return None
+ import tensorflow as tf
+
+ self.model = tf.keras.models.load_model(
+ f"{self.model_base_directory}/portable/decoder.keras", compile=False
+ )
+ self.model._name = "Decoder"
+ return self
+
+ def save(self, overwrite=False):
+ """# Save the current Decoder
+
+ - Overwrite: overwrite any existing decoder that has been saved"""
+ from os import listdir
+
+ if overwrite:
+ self.model.save(f"{self.model_base_directory}/portable/decoder.keras")
+ return True
+ if "decoder.keras" in listdir(f"{self.model_base_directory}/portable/"):
+ return False
+ self.model.save(f"{self.model_base_directory}/portable/decoder.keras")
+ return True
+
+ def load_parameters(self):
+ """# Load Decoder Model Parameters from File
+ The file needs to have the following parameters defined:
+ - data_size: int
+ - autoencoder_layer_size: int
+ - latent_size: int
+ - autoencoder_depth: int
+ - dropout: float (set to 0. if you don't want a dropout layer)
+ - use_leaky_relu: boolean"""
+ from pickle import load
+
+ with open(
+ f"{self.model_base_directory}/portable/model_parameters.pkl", "rb"
+ ) as phandle:
+ parameters = load(phandle)
+ self.data_size = parameters["data_size"]
+ self.layer_size = parameters["autoencoder_layer_size"]
+ self.latent_size = parameters["latent_size"]
+ self.depth = parameters["autoencoder_depth"]
+ self.dropout = parameters["dropout"]
+ self.use_leaky_relu = parameters["use_leaky_relu"]
+
+ def summary(self):
+ """# Returns the summary of the Decoder model"""
+ return self.model.summary()
+
+ def __call__(self, *args, **kwargs):
+ """# Callable
+
+ When calling the decoder class, return the model's output"""
+ return self.model(*args, **kwargs)
diff --git a/code/sunlab/sunflow/models/discriminator.py b/code/sunlab/sunflow/models/discriminator.py
new file mode 100644
index 0000000..38bed56
--- /dev/null
+++ b/code/sunlab/sunflow/models/discriminator.py
@@ -0,0 +1,132 @@
+class Discriminator:
+ """# Discriminator Model
+
+ Constructs a discriminator model with a certain depth of intermediate
+ layers of fixed size"""
+
+ def __init__(self, model_base_directory):
+ """# Discriminator Model Initialization
+
+ - model_base_directory: The base folder directory where the model will
+ be saved/ loaded"""
+ self.model_base_directory = model_base_directory
+
+ def init(self):
+ """# Initialize a new Discriminator
+
+ Expects a model parameters file to already exist in the initialization
+ base directory when initializing the model"""
+ from tensorflow import keras
+ from tensorflow.keras import layers
+
+ self.load_parameters()
+ assert self.depth >= 0, "Depth must be non-negative"
+ self.model = keras.models.Sequential()
+ if self.depth == 0:
+ self.model.add(
+ layers.Dense(
+ 1,
+ input_shape=(self.latent_size,),
+ activation=None,
+ name="discriminator_output_vector",
+ )
+ )
+ else:
+ self.model.add(
+ layers.Dense(
+ self.layer_size,
+ input_shape=(self.latent_size,),
+ activation=None,
+ name="discriminator_dense_1",
+ )
+ )
+ if self.use_leaky_relu:
+ self.model.add(layers.LeakyReLU())
+ else:
+ self.model.add(layers.ReLU())
+ if self.dropout > 0.0:
+ self.model.add(layers.Dropout(self.dropout))
+ for _d in range(1, self.depth):
+ self.model.add(
+ layers.Dense(
+ self.layer_size,
+ activation=None,
+ name=f"discriminator_dense_{_d+1}",
+ )
+ )
+ if self.use_leaky_relu:
+ self.model.add(layers.LeakyReLU())
+ else:
+ self.model.add(layers.ReLU())
+ if self.dropout > 0.0:
+ self.model.add(layers.Dropout(self.dropout))
+ self.model.add(
+ layers.Dense(
+ 1, activation="sigmoid", name="discriminator_output_vector"
+ )
+ )
+ self.model._name = "Discriminator"
+ return self
+
+ def load(self):
+ """# Load an existing Discriminator"""
+ from os import listdir
+
+ if "discriminator.keras" not in listdir(
+ f"{self.model_base_directory}/portable/"
+ ):
+ return None
+ import tensorflow as tf
+
+ self.model = tf.keras.models.load_model(
+ f"{self.model_base_directory}/portable/discriminator.keras", compile=False
+ )
+ self.model._name = "Discriminator"
+ return self
+
+ def save(self, overwrite=False):
+ """# Save the current Discriminator
+
+ - Overwrite: overwrite any existing discriminator that has been
+ saved"""
+ from os import listdir
+
+ if overwrite:
+ self.model.save(f"{self.model_base_directory}/portable/discriminator.keras")
+ return True
+ if "discriminator.keras" in listdir(f"{self.model_base_directory}/portable/"):
+ return False
+ self.model.save(f"{self.model_base_directory}/portable/discriminator.keras")
+ return True
+
+ def load_parameters(self):
+ """# Load Discriminator Model Parameters from File
+ The file needs to have the following parameters defined:
+ - data_size: int
+ - adversary_layer_size: int
+ - latent_size: int
+ - autoencoder_depth: int
+ - dropout: float (set to 0. if you don't want a dropout layer)
+ - use_leaky_relu: boolean"""
+ from pickle import load
+
+ with open(
+ f"{self.model_base_directory}/portable/model_parameters.pkl", "rb"
+ ) as phandle:
+ parameters = load(phandle)
+ self.data_size = parameters["data_size"]
+ self.layer_size = parameters["adversary_layer_size"]
+ self.latent_size = parameters["latent_size"]
+ self.depth = parameters["autoencoder_depth"]
+ self.dropout = parameters["dropout"]
+ self.use_leaky_relu = parameters["use_leaky_relu"]
+
+ def summary(self):
+ """# Returns the summary of the Discriminator model"""
+ return self.model.summary()
+
+ def __call__(self, *args, **kwargs):
+ """# Callable
+
+ When calling the discriminator class, return the model's output"""
+ return self.model(*args, **kwargs)
diff --git a/code/sunlab/sunflow/models/encoder.py b/code/sunlab/sunflow/models/encoder.py
new file mode 100644
index 0000000..22d1a9a
--- /dev/null
+++ b/code/sunlab/sunflow/models/encoder.py
@@ -0,0 +1,140 @@
+class Encoder:
+ """# Encoder Model
+
+ Constructs an encoder model with a certain depth of intermediate layers of
+ fixed size"""
+
+ def __init__(self, model_base_directory):
+ """# Encoder Model Initialization
+
+ - model_base_directory: The base folder directory where the model will
+ be saved/ loaded"""
+ self.model_base_directory = model_base_directory
+
+ def init(self):
+ """# Initialize a new Encoder
+
+ Expects a model parameters file to already exist in the initialization
+ base directory when initializing the model"""
+ from tensorflow import keras
+ from tensorflow.keras import layers
+
+ # Load in the model parameters
+ self.load_parameters()
+ assert self.depth >= 0, "Depth must be non-negative"
+
+ # Create the model
+ self.model = keras.models.Sequential()
+ # At zero depth, connect input and output layer directly
+ if self.depth == 0:
+ self.model.add(
+ layers.Dense(
+ self.latent_size,
+ input_shape=(self.data_size,),
+ activation=None,
+ name="encoder_latent_vector",
+ )
+ )
+ # Otherwise, add fixed-sized layers between them
+ else:
+ self.model.add(
+ layers.Dense(
+ self.layer_size,
+ input_shape=(self.data_size,),
+ activation=None,
+ name="encoder_dense_1",
+ )
+ )
+ # Use LeakyReLU if specified
+ if self.use_leaky_relu:
+ self.model.add(layers.LeakyReLU())
+ else:
+ self.model.add(layers.ReLU())
+ # Include a droput layer if specified
+ if self.dropout > 0.0:
+ self.model.add(layers.Dropout(self.dropout))
+ for _d in range(1, self.depth):
+ self.model.add(
+ layers.Dense(
+ self.layer_size, activation=None, name=f"encoder_dense_{_d+1}"
+ )
+ )
+ # Use LeakyReLU if specified
+ if self.use_leaky_relu:
+ self.model.add(layers.LeakyReLU())
+ else:
+ self.model.add(layers.ReLU())
+ # Include a droput layer if specified
+ if self.dropout > 0.0:
+ self.model.add(layers.Dropout(self.dropout))
+ self.model.add(
+ layers.Dense(
+ self.latent_size, activation=None, name="encoder_latent_vector"
+ )
+ )
+ self.model._name = "Encoder"
+ return self
+
+ def load(self):
+ """# Load an existing Encoder"""
+ from os import listdir
+
+ # If the encoder is not found, return None
+ if "encoder.keras" not in listdir(f"{self.model_base_directory}/portable/"):
+ return None
+ # Otherwise, load the encoder
+ # compile=False suppresses warnings about training
+ # If you want to train it, you will need to recompile it
+ import tensorflow as tf
+
+ self.model = tf.keras.models.load_model(
+ f"{self.model_base_directory}/portable/encoder.keras", compile=False
+ )
+ self.model._name = "Encoder"
+ return self
+
+ def save(self, overwrite=False):
+ """# Save the current Encoder
+
+ - Overwrite: overwrite any existing encoder that has been saved"""
+ from os import listdir
+
+ if overwrite:
+ self.model.save(f"{self.model_base_directory}/portable/encoder.keras")
+ return True
+ if "encoder.keras" in listdir(f"{self.model_base_directory}/portable/"):
+ return False
+ self.model.save(f"{self.model_base_directory}/portable/encoder.keras")
+ return True
+
+ def load_parameters(self):
+ """# Load Encoder Model Parameters from File
+ The file needs to have the following parameters defined:
+ - data_size: int
+ - autoencoder_layer_size: int
+ - latent_size: int
+ - autoencoder_depth: int
+ - dropout: float (set to 0. if you don't want a dropout layer)
+ - use_leaky_relu: boolean"""
+ from pickle import load
+
+ with open(
+ f"{self.model_base_directory}/portable/model_parameters.pkl", "rb"
+ ) as phandle:
+ parameters = load(phandle)
+ self.data_size = parameters["data_size"]
+ self.layer_size = parameters["autoencoder_layer_size"]
+ self.latent_size = parameters["latent_size"]
+ self.depth = parameters["autoencoder_depth"]
+ self.dropout = parameters["dropout"]
+ self.use_leaky_relu = parameters["use_leaky_relu"]
+
+ def summary(self):
+ """# Returns the summary of the Encoder model"""
+ return self.model.summary()
+
+ def __call__(self, *args, **kwargs):
+ """# Callable
+
+ When calling the encoder class, return the model's output"""
+ return self.model(*args, **kwargs)
diff --git a/code/sunlab/sunflow/models/encoder_discriminator.py b/code/sunlab/sunflow/models/encoder_discriminator.py
new file mode 100644
index 0000000..5efb6af
--- /dev/null
+++ b/code/sunlab/sunflow/models/encoder_discriminator.py
@@ -0,0 +1,96 @@
+class EncoderDiscriminator:
+ """# EncoderDiscriminator Model
+
+ Constructs an encoder-discriminator model"""
+
+ def __init__(self, model_base_directory):
+ """# EncoderDiscriminator Model Initialization
+
+ - model_base_directory: The base folder directory where the model will
+ be saved/ loaded"""
+ self.model_base_directory = model_base_directory
+
+ def init(self, encoder, discriminator):
+ """# Initialize a EncoderDiscriminator
+
+ - encoder: The encoder to use
+ - discriminator: The discriminator to use"""
+ from tensorflow import keras
+
+ self.load_parameters()
+ self.model = keras.models.Sequential()
+ self.model.add(encoder.model)
+ self.model.add(discriminator.model)
+ self.model._name = "EncoderDiscriminator"
+ return self
+
+ def load(self):
+ """# Load an existing EncoderDiscriminator"""
+ from os import listdir
+
+ if "encoder_discriminator.keras" not in listdir(
+ f"{self.model_base_directory}/portable/"
+ ):
+ return None
+ import tensorflow as tf
+
+ self.model = tf.keras.models.load_model(
+ f"{self.model_base_directory}/portable/encoder_discriminator" + ".keras",
+ compile=False,
+ )
+ self.model._name = "EncoderDiscriminator"
+ return self
+
+ def save(self, overwrite=False):
+ """# Save the current EncoderDiscriminator
+
+ - Overwrite: overwrite any existing encoder_discriminator that has been
+ saved"""
+ from os import listdir
+
+ if overwrite:
+ self.model.save(
+ f"{self.model_base_directory}/portable/encoder_discriminator" + ".keras"
+ )
+ return True
+ if "encoder_discriminator.keras" in listdir(
+ f"{self.model_base_directory}/portable/"
+ ):
+ return False
+ self.model.save(
+ f"{self.model_base_directory}/portable/encoder_discriminator" + ".keras"
+ )
+ return True
+
+ def load_parameters(self):
+ """# Load EncoderDiscriminator Model Parameters from File
+ The file needs to have the following parameters defined:
+ - data_size: int
+ - autoencoder_layer_size: int
+ - latent_size: int
+ - autoencoder_depth: int
+ - dropout: float (set to 0. if you don't want a dropout layer)
+ - use_leaky_relu: boolean"""
+ from pickle import load
+
+ with open(
+ f"{self.model_base_directory}/portable/model_parameters.pkl", "rb"
+ ) as phandle:
+ parameters = load(phandle)
+ self.data_size = parameters["data_size"]
+ self.layer_size = parameters["autoencoder_layer_size"]
+ self.latent_size = parameters["latent_size"]
+ self.depth = parameters["autoencoder_depth"]
+ self.dropout = parameters["dropout"]
+ self.use_leaky_relu = parameters["use_leaky_relu"]
+
+ def summary(self):
+ """# Returns the summary of the EncoderDiscriminator model"""
+ return self.model.summary()
+
+ def __call__(self, *args, **kwargs):
+ """# Callable
+
+ When calling the encoder_discriminator class, return the model's
+ output"""
+ return self.model(*args, **kwargs)
diff --git a/code/sunlab/sunflow/models/utilities.py b/code/sunlab/sunflow/models/utilities.py
new file mode 100644
index 0000000..ab0c2a6
--- /dev/null
+++ b/code/sunlab/sunflow/models/utilities.py
@@ -0,0 +1,93 @@
+# Higher-level functions
+
+from sunlab.common.distribution.adversarial_distribution import AdversarialDistribution
+from sunlab.common.scaler.adversarial_scaler import AdversarialScaler
+from sunlab.common.data.utilities import import_dataset
+from .adversarial_autoencoder import AdversarialAutoencoder
+
+
+def create_aae(
+ dataset_file_name,
+ model_directory,
+ normalization_scaler: AdversarialScaler,
+ distribution: AdversarialDistribution or None,
+ magnification=10,
+ latent_size=2,
+):
+ """# Create Adversarial Autoencoder
+
+ - dataset_file_name: str = Path to the dataset file
+ - model_directory: str = Path to save the model in
+ - normalization_scaler: AdversarialScaler = Data normalization Scaler Model
+ - distribution: AdversarialDistribution = Distribution for the Adversary
+ - magnification: int = The Magnification of the Dataset"""
+ dataset = import_dataset(dataset_file_name, magnification)
+ model = AdversarialAutoencoder(
+ model_directory, distribution, normalization_scaler
+ ).init(dataset.dataset, latent_size=latent_size)
+ return model
+
+
+def create_aae_and_dataset(
+ dataset_file_name,
+ model_directory,
+ normalization_scaler: AdversarialScaler,
+ distribution: AdversarialDistribution or None,
+ magnification=10,
+ batch_size=1024,
+ shuffle=True,
+ val_split=0.1,
+ latent_size=2,
+):
+ """# Create Adversarial Autoencoder and Load the Dataset
+
+ - dataset_file_name: str = Path to the dataset file
+ - model_directory: str = Path to save the model in
+ - normalization_scaler: AdversarialScaler = Data normalization Scaler Model
+ - distribution: AdversarialDistribution = Distribution for the Adversary
+ - magnification: int = The Magnification of the Dataset"""
+ model = create_aae(
+ dataset_file_name,
+ model_directory,
+ normalization_scaler,
+ distribution,
+ magnification=magnification,
+ latent_size=latent_size,
+ )
+ dataset = import_dataset(
+ dataset_file_name,
+ magnification,
+ batch_size=batch_size,
+ shuffle=shuffle,
+ val_split=val_split,
+ scaler=model.scaler,
+ )
+ return model, dataset
+
+
+def load_aae(model_directory, normalization_scaler: AdversarialScaler):
+ """# Load Adversarial Autoencoder
+
+ - model_directory: str = Path to save the model in
+ - normalization_scaler: AdversarialScaler = Data normalization Scaler Model
+ """
+ return AdversarialAutoencoder(model_directory, None, normalization_scaler).load()
+
+
+def load_aae_and_dataset(
+ dataset_file_name,
+ model_directory,
+ normalization_scaler: AdversarialScaler,
+ magnification=10,
+):
+ """# Load Adversarial Autoencoder
+
+ - dataset_file_name: str = Path to the dataset file
+ - model_directory: str = Path to save the model in
+ - normalization_scaler: AdversarialScaler = Data normalization Scaler Model
+ - magnification: int = The Magnification of the Dataset"""
+ model = load_aae(model_directory, normalization_scaler)
+ dataset = import_dataset(
+ dataset_file_name, magnification=magnification, scaler=model.scaler
+ )
+ return model, dataset
diff --git a/code/sunlab/sunflow/plotting/__init__.py b/code/sunlab/sunflow/plotting/__init__.py
new file mode 100644
index 0000000..36e00e6
--- /dev/null
+++ b/code/sunlab/sunflow/plotting/__init__.py
@@ -0,0 +1 @@
+from .model_extensions import *
diff --git a/code/sunlab/sunflow/plotting/model_extensions.py b/code/sunlab/sunflow/plotting/model_extensions.py
new file mode 100644
index 0000000..087f8d3
--- /dev/null
+++ b/code/sunlab/sunflow/plotting/model_extensions.py
@@ -0,0 +1,289 @@
+from matplotlib import pyplot as plt
+from sunlab.common.data.shape_dataset import ShapeDataset
+from sunlab.globals import DIR_ROOT
+
+
+def get_nonphysical_masks(
+ model,
+ xrange=[-1, 1],
+ yrange=[-1, 1],
+ bins=[500, 500],
+ equivdiameter_threshold=10,
+ solidity_threshold=0.1,
+ area_threshold=100,
+ perimeter_threshold=10,
+ area_max_threshold=7000,
+ perimeter_max_threshold=350,
+ area_min_threshold=100,
+ perimeter_min_threshold=5,
+ consistency_check=False,
+):
+ """# Generate the Nonphysical Masks in Grid for Model
+
+ Hard Constraints:
+ - Non-negative values
+ - Ratios no greater than 1
+
+ Soft Constraints:
+ - Area/ Perimeter Thresholds"""
+ import numpy as np
+
+ x = np.linspace(xrange[0], xrange[1], bins[0])
+ y = np.linspace(yrange[0], yrange[1], bins[1])
+ X, Y = np.meshgrid(x, y)
+ X, Y = X.reshape((bins[0], bins[1], 1)), Y.reshape((bins[0], bins[1], 1))
+ XY = np.concatenate([X.reshape((-1, 1)), Y.reshape((-1, 1))], axis=-1)
+ dec_v = model.decoder(XY).numpy().reshape((bins[0] * bins[1], 13))
+ lXY = model.scaler.scaler.inverse_transform(dec_v).reshape((bins[0], bins[1], 13))
+ # Hard Limits
+ non_negative_mask = np.all(lXY > 0, axis=-1)
+ solidity_mask = np.abs(lXY[:, :, 6]) <= 1
+ extent_upper_bound_mask = lXY[:, :, 7] <= 1
+ # Soft Extremas
+ area_max_mask = lXY[:, :, 4] < area_max_threshold
+ perimeter_max_mask = lXY[:, :, 9] < perimeter_max_threshold
+ area_min_mask = lXY[:, :, 4] > area_min_threshold
+ perimeter_min_mask = lXY[:, :, 9] > perimeter_min_threshold
+ # Self-Consistency
+ man_solidity_mask = np.abs(lXY[:, :, 0] / lXY[:, :, 4]) <= 1
+ equivalent_diameter_mask = (
+ np.abs(lXY[:, :, 5] - np.sqrt(4 * np.abs(lXY[:, :, 0]) / np.pi))
+ < equivdiameter_threshold
+ )
+ convex_area_mask = lXY[:, :, 0] < lXY[:, :, 4] + area_threshold
+ convex_perimeter_mask = lXY[:, :, 9] < lXY[:, :, 8] + perimeter_threshold
+ mask_info = {
+ "non-negative": non_negative_mask,
+ "solidity": solidity_mask,
+ "extent-max": extent_upper_bound_mask,
+ #
+ "area-max": area_max_mask,
+ "perimeter-max": perimeter_max_mask,
+ "area-min": area_min_mask,
+ "perimeter-min": perimeter_min_mask,
+ #
+ "computed-solidity": man_solidity_mask,
+ "equivalent-diameter": equivalent_diameter_mask,
+ "convex-area": convex_area_mask,
+ "convex-perimeter": convex_perimeter_mask,
+ }
+ if not consistency_check:
+ mask_info = {
+ "non-negative": non_negative_mask,
+ "solidity": solidity_mask,
+ "extent-max": extent_upper_bound_mask,
+ #
+ "area-max": area_max_mask,
+ "perimeter-max": perimeter_max_mask,
+ "area-min": area_min_mask,
+ "perimeter-min": perimeter_min_mask,
+ }
+ mask_list = [mask_info[key] for key in mask_info.keys()]
+ return np.all(mask_list, axis=0), X, Y, mask_info
+
+
+def excavate(input_2d_array):
+ """# Return Boundaries for Masked Array
+
+ Use X, Y directions only"""
+ from copy import deepcopy as dc
+ from numpy import nan_to_num, zeros_like, abs
+
+ data_2d_array = dc(input_2d_array)
+ data_2d_array = nan_to_num(data_2d_array, nan=20)
+ # X-Gradient
+ x_grad = zeros_like(data_2d_array)
+ x_grad[:-1, :] = data_2d_array[1:, :] - data_2d_array[:-1, :]
+ x_grad[(abs(x_grad) > 10)] = 10
+ x_grad[(abs(x_grad) < 10) & (abs(x_grad) > 0)] = 1
+ x_grad[x_grad == 1] = 0.5
+ x_grad[x_grad > 1] = 1
+ # Y-Gradient
+ y_grad = zeros_like(data_2d_array)
+ y_grad[:, :-1] = data_2d_array[:, 1:] - data_2d_array[:, :-1]
+ y_grad[(abs(y_grad) > 10)] = 10
+ y_grad[(abs(y_grad) < 10) & (abs(y_grad) > 0)] = 1
+ y_grad[y_grad == 1] = 0.5
+ y_grad[y_grad > 1] = 1
+ return x_grad, y_grad
+
+
+def excavate_extra(input_2d_array, N=1):
+ """# Return Boundaries for Masked Array
+
+ Use all 8 directions"""
+ from copy import deepcopy as dc
+ from numpy import nan_to_num, zeros_like, abs
+
+ data_2d_array = dc(input_2d_array)
+ data_2d_array = nan_to_num(data_2d_array, nan=20)
+ # X-Gradient
+ x_grad = zeros_like(data_2d_array)
+ x_grad[:-N, :] = data_2d_array[N:, :] - data_2d_array[:-N, :]
+ x_grad[(abs(x_grad) > 10)] = 10
+ x_grad[(abs(x_grad) < 10) & (abs(x_grad) > 0)] = 1
+ x_grad[x_grad == 1] = 0.5
+ x_grad[x_grad > 1] = 1
+ # Y-Gradient
+ y_grad = zeros_like(data_2d_array)
+ y_grad[:, :-N] = data_2d_array[:, N:] - data_2d_array[:, :-N]
+ y_grad[(abs(y_grad) > 10)] = 10
+ y_grad[(abs(y_grad) < 10) & (abs(y_grad) > 0)] = 1
+ y_grad[y_grad == 1] = 0.5
+ y_grad[y_grad > 1] = 1
+ # XY-Gradient
+ xy_grad = zeros_like(data_2d_array)
+ xy_grad[:-N, :-N] = data_2d_array[N:, N:] - data_2d_array[:-N, :-N]
+ xy_grad[(abs(xy_grad) > 10)] = 10
+ xy_grad[(abs(xy_grad) < 10) & (abs(xy_grad) > 0)] = 1
+ xy_grad[xy_grad == 1] = 0.5
+ xy_grad[xy_grad > 1] = 1
+ # X(-Y)-Gradient
+ yx_grad = zeros_like(data_2d_array)
+ yx_grad[:-N, :-N] = data_2d_array[N:, :-N] - data_2d_array[:-N, N:]
+ yx_grad[(abs(yx_grad) > 10)] = 10
+ yx_grad[(abs(yx_grad) < 10) & (abs(yx_grad) > 0)] = 1
+ yx_grad[yx_grad == 1] = 0.5
+ yx_grad[yx_grad > 1] = 1
+ xyn_grad = dc(yx_grad)
+ # (-X)Y-Gradient
+ xny_grad = zeros_like(data_2d_array)
+ xny_grad[:-N, :-N] = data_2d_array[:-N, N:] - data_2d_array[N:, :-N]
+ xny_grad[(abs(xy_grad) > 10)] = 10
+ xny_grad[(abs(xy_grad) < 10) & (abs(xy_grad) > 0)] = 1
+ xny_grad[xy_grad == 1] = 0.5
+ xny_grad[xy_grad > 1] = 1
+ # (-X)(-Y)-Gradient
+ xnyn_grad = zeros_like(data_2d_array)
+ xnyn_grad[:-N, :-N] = data_2d_array[:-N, :-N] - data_2d_array[N:, N:]
+ xnyn_grad[(abs(yx_grad) > 10)] = 10
+ xnyn_grad[(abs(yx_grad) < 10) & (abs(yx_grad) > 0)] = 1
+ xnyn_grad[yx_grad == 1] = 0.5
+ xnyn_grad[yx_grad > 1] = 1
+ return x_grad, y_grad, xy_grad, xyn_grad, xny_grad, xnyn_grad
+
+
+def excavate_outline(arr, thickness=1):
+ """# Generate Transparency Mask with NaNs"""
+ from numpy import sum, abs, NaN
+
+ outline = sum(abs(excavate_extra(arr, thickness)), axis=0)
+ outline[outline == 0] = NaN
+ outline[outline > 0] = 0
+ return outline
+
+
+def get_boundary_outline(
+ aae_model_object,
+ pixel_classification_file=None,
+ include_transition_regions=False,
+ border_thickness=3,
+ bin_count=800,
+ xrange=[-6.5, 6.5],
+ yrange=[-4.5, 4.5],
+ threshold=0.75,
+):
+ """# Get Boundary Outlines"""
+ from copy import deepcopy
+ import numpy as np
+
+ if pixel_classification_file is None:
+ pixel_classification_file = "../../extra_data/PhenotypePixels_65x45_800.npy"
+ base_classification = np.loadtxt(pixel_classification_file)
+ base_classification = base_classification.reshape((bin_count, bin_count, 4))
+ max_classification_probability = np.zeros((bin_count, bin_count, 1))
+ max_classification_probability[:, :, 0] = (
+ np.max(base_classification, axis=-1) < threshold
+ )
+ classes_with_include_transition_regions = np.concatenate(
+ [base_classification, max_classification_probability], axis=-1
+ )
+ if include_transition_regions:
+ phenotype_probabilities = deepcopy(
+ np.argsort(classes_with_include_transition_regions[:, :, :], axis=-1)[
+ :, :, -1
+ ]
+ ).astype(np.float32)
+ else:
+ phenotype_probabilities = deepcopy(
+ np.argsort(classes_with_include_transition_regions[:, :, :-1], axis=-1)[
+ :, :, -1
+ ]
+ ).astype(np.float32)
+ nonphysical_mask, _, _, _ = get_nonphysical_masks(
+ aae_model_object, xrange=xrange, yrange=yrange, bins=[bin_count, bin_count]
+ )
+ nonphysical_mask = nonphysical_mask.astype(np.float32)
+ nonphysical_mask[nonphysical_mask == 0] = np.NaN
+ nonphysical_mask[nonphysical_mask == 1] = 0
+ nonphysical_mask = nonphysical_mask.T
+ phenotype_regions = deepcopy(phenotype_probabilities.T + nonphysical_mask.T)
+ outline = excavate_outline(phenotype_regions, border_thickness)
+ return outline
+
+
+def apply_boundary(
+ model_loc=DIR_ROOT + "models/current_model/",
+ border_thickness=3,
+ include_transition_regions=False,
+ threshold=0.7,
+ alpha=1,
+ _plt=None,
+):
+ """# Apply Boundary to Plot
+
+ Use Pregenerated Boundary by Default for Speed"""
+ from ..models import load_aae
+ from sunlab.common.scaler import MaxAbsScaler
+ import numpy as np
+
+ if _plt is None:
+ _plt = plt
+ if (model_loc == model_loc) and (border_thickness == 3) and (threshold == 0.7):
+ XYM = np.load(DIR_ROOT + "extra_data/OutlineXYM.npy")
+ XY = XYM[:2, :, :]
+ if include_transition_regions:
+ outline = XYM[3, :, :]
+ else:
+ outline = XYM[2, :, :]
+ _plt.pcolor(XY[0, :, :], XY[1, :, :], outline, cmap="gray", alpha=alpha)
+ return
+ model = load_aae(model_loc, MaxAbsScaler)
+ bin_count = 800
+ xrange = [-6.5, 6.5]
+ yrange = [-4.5, 4.5]
+ rng = [xrange, yrange]
+ X = np.linspace(rng[0][0], rng[0][1], bin_count)
+ Y = np.linspace(rng[1][0], rng[1][1], bin_count)
+ XY = np.array(np.meshgrid(X, Y))
+ kwparams = {
+ "bin_count": bin_count,
+ "xrange": xrange,
+ "yrange": yrange,
+ }
+
+ include_tregions = include_transition_regions
+ outline = get_boundary_outline(
+ model,
+ border_thickness=border_thickness,
+ include_transition_regions=include_tregions,
+ threshold=threshold,
+ **kwparams
+ )
+ _plt.pcolor(XY[0, :, :], XY[1, :, :], outline, cmap="gray", alpha=alpha)
+
+
+plt.apply_boundary = apply_boundary
+
+
+def plot_shape_dataset(self, model, *args, **kwargs):
+ """# Plot Shape Dataset"""
+ if self.labels is None:
+ plt.scatter2d(model.encoder(self.dataset), *args, **kwargs)
+ else:
+ plt.scatter2d(model.encoder(self.dataset), self.labels, *args, **kwargs)
+
+
+ShapeDataset.plot = lambda model, *args, **kwargs: plot_shape_dataset(
+ model, *args, **kwargs
+)