Source code for tensiometer.synthetic_probability.lr_schedulers

"""
This file contains learning rate schedulers for tensorflow optimization.
"""

###############################################################################
# initial imports and set-up:

import numpy as np

# tensorflow imports:
import tensorflow as tf
from tensorflow.keras.callbacks import Callback
from tensorflow.python.platform import tf_logging as logging

###############################################################################
# Exponential decay:


class ExponentialDecayAnnealer():

    def __init__(self, start, end, roll_off_step, steps):
        self.start = start
        self.end = end
        self.roll_off_step = float(roll_off_step)
        self.steps = float(steps)
        self.n = 0

    def step(self):
        # smooth roll-off from `start` to `end`: the value is start / 2 at
        # n = roll_off_step and reaches exactly `end` at n = steps.
        self.n += 1
        return self.start / (
            1. + (self.end / (self.start - self.end))**((self.n - self.roll_off_step) /
                                                        (self.roll_off_step - self.steps)))
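

# A minimal sanity check of the annealer endpoints (illustrative sketch with
# hypothetical numbers): the schedule returns start / 2 at n = roll_off_step
# and exactly `end` at n = steps.
def _check_exponential_annealer():
    annealer = ExponentialDecayAnnealer(start=1e-2, end=1e-5, roll_off_step=50, steps=100)
    rates = [annealer.step() for _ in range(100)]
    assert np.isclose(rates[49], 0.5e-2)  # 50th call: n = roll_off_step -> start / 2
    assert np.isclose(rates[99], 1e-5)    # 100th call: n = steps -> end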


class ExponentialDecayScheduler(Callback):

    def __init__(self, lr_max, lr_min, roll_off_step, steps):
        """
        Exponentially decaying learning rate.

        :param lr_max: maximum learning rate
        :param lr_min: minimum learning rate
        :param roll_off_step: step at which the scheduler starts rolling off
        :param steps: total number of steps
        """
        super(ExponentialDecayScheduler, self).__init__()
        self.step = 0
        self.Annealer = ExponentialDecayAnnealer(lr_max, lr_min, roll_off_step, steps)
        self.lrs = []

    def on_train_begin(self, logs=None):
        self.step = 0
        self.set_lr(self.Annealer.start)

    def on_train_batch_begin(self, batch, logs=None):
        self.lrs.append(self.get_lr())

    def on_train_batch_end(self, batch, logs=None):
        self.step += 1
        self.set_lr(self.Annealer.step())

    def get_lr(self):
        try:
            return tf.keras.backend.get_value(self.model.optimizer.lr)
        except AttributeError:
            return None

    def set_lr(self, lr):
        try:
            tf.keras.backend.set_value(self.model.optimizer.lr, lr)
        except AttributeError:
            pass  # ignore
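

# A minimal usage sketch (all names hypothetical): `model` is a compiled Keras
# model and `(x, y)` its training data. The callback drives the optimizer
# learning rate from lr_max down to lr_min over `steps` batches and records
# the per-batch values in `scheduler.lrs`.
def _example_exponential_scheduler(model, x, y):
    scheduler = ExponentialDecayScheduler(lr_max=1e-2, lr_min=1e-5,
                                          roll_off_step=500, steps=1000)
    model.fit(x, y, epochs=10, callbacks=[scheduler])
    return scheduler.lrs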


###############################################################################
# Power law decay:


class PowerLawDecayAnnealer():

    def __init__(self, start, end, power, steps):
        self.start = start
        self.end = end
        self.p = power
        self.steps = float(steps)
        self.n = 0

    def step(self):
        self.n += 1
        return self.start / (self.n / (self.steps * (self.end / self.start)**(1 / self.p)))**self.p
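

# A quick endpoint check (hypothetical numbers): by construction the power
# law reaches exactly `end` at n = steps.
def _check_power_law_annealer():
    annealer = PowerLawDecayAnnealer(start=1e-2, end=1e-5, power=2., steps=100)
    rates = [annealer.step() for _ in range(100)]
    assert np.isclose(rates[99], 1e-5)  # 100th call: n = steps -> end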


class PowerLawDecayScheduler(Callback):

    def __init__(self, lr_max, lr_min, power, steps):
        """
        Power law decaying learning rate.

        :param lr_max: maximum learning rate
        :param lr_min: minimum learning rate
        :param power: power law index
        :param steps: total number of steps
        """
        super(PowerLawDecayScheduler, self).__init__()
        self.step = 0
        self.Annealer = PowerLawDecayAnnealer(lr_max, lr_min, power, steps)
        self.lrs = []

    def on_train_begin(self, logs=None):
        self.step = 0
        self.set_lr(self.Annealer.start)

    def on_train_batch_begin(self, batch, logs=None):
        self.lrs.append(self.get_lr())

    def on_train_batch_end(self, batch, logs=None):
        self.step += 1
        self.set_lr(self.Annealer.step())

    def get_lr(self):
        try:
            return tf.keras.backend.get_value(self.model.optimizer.lr)
        except AttributeError:
            return None

    def set_lr(self, lr):
        try:
            tf.keras.backend.set_value(self.model.optimizer.lr, lr)
        except AttributeError:
            pass  # ignore


###############################################################################
# Step decay:


class StepDecayAnnealer():

    def __init__(self, start=None, change_every=None, steps=None,
                 steps_per_epoch=None, boundaries=None, values=None):
        self.start = start
        self.steps_per_epoch = steps_per_epoch
        if boundaries is not None:
            self.boundaries = boundaries
            self.values = values
        else:
            self.ch = change_every
            total_changes = int(steps / self.ch)
            self.end = self.start / (10**total_changes)
            self.steps = float(steps)
            self.boundaries = [self.ch * i for i in range(1, total_changes)]
            self.values = [self.start / (10**i) for i in range(0, total_changes)]
        self.n = 0

    def step(self):
        self.n += 1
        for i in range(len(self.boundaries)):
            if self.n < self.boundaries[i] * self.steps_per_epoch:
                return self.values[i]
        return self.values[-1]
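

# An illustrative sketch of the two construction modes (hypothetical numbers).
# In automatic mode the rate starts at `start` and drops by a factor of 10
# every `change_every` epochs; in explicit mode `boundaries` (in epochs) and
# `values` are given directly, with one more value than boundaries.
def _example_step_annealer():
    # automatic: boundaries [25, 50, 75], values [1e-2, 1e-3, 1e-4, 1e-5]
    auto = StepDecayAnnealer(start=1e-2, change_every=25, steps=100,
                             steps_per_epoch=10)
    # explicit:
    manual = StepDecayAnnealer(start=1e-2, steps_per_epoch=10,
                               boundaries=[30, 60], values=[1e-2, 1e-3, 1e-4])
    return auto, manual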


class StepDecayScheduler(Callback):

    def __init__(self, lr_max=None, change_every=None, steps=None,
                 steps_per_epoch=None, boundaries=None, values=None):
        """
        Learning rate step function changes.

        :param lr_max: initial learning rate
        :param change_every: interval between learning rate reductions, each
            by a factor of 10 (used when boundaries is not given)
        :param steps: total length of the schedule, in the same units as
            change_every
        :param steps_per_epoch: number of optimizer steps per epoch
        :param boundaries: optional explicit epoch boundaries at which the
            learning rate changes
        :param values: optional learning rate values for each interval, with
            one more value than boundaries
        """
        super(StepDecayScheduler, self).__init__()
        self.step = 0
        self.Annealer = StepDecayAnnealer(lr_max, change_every, steps, steps_per_epoch, boundaries, values)
        self.lrs = []

    def on_train_begin(self, logs=None):
        self.step = 0
        self.set_lr(self.Annealer.start)

    def on_train_batch_begin(self, batch, logs=None):
        self.lrs.append(self.get_lr())

    def on_train_batch_end(self, batch, logs=None):
        self.step += 1
        self.set_lr(self.Annealer.step())

    def get_lr(self):
        try:
            return tf.keras.backend.get_value(self.model.optimizer.lr)
        except AttributeError:
            return None

    def set_lr(self, lr):
        try:
            tf.keras.backend.set_value(self.model.optimizer.lr, lr)
        except AttributeError:
            pass  # ignore


###############################################################################
# Adaptive learning rate (loss slope) and early stopping:


class LRAdaptLossSlopeEarlyStop(Callback):

    def __init__(
            self,
            monitor="val_loss",
            factor=1. / np.sqrt(10.),
            patience=25,
            cooldown=10,
            verbose=0,
            min_lr=1e-5,
            threshold=0.0,
            **kwargs,
    ):
        """
        Adaptive reduction of learning rate when likelihood improvement stalls
        for a given number of epochs, with early stopping once the learning
        rate cannot be reduced below its minimum.

        :param monitor: quantity to monitor (default "val_loss")
        :param factor: multiplicative factor applied to the learning rate at
            each reduction (must be < 1)
        :param patience: number of epochs over which the loss slope is fitted
        :param cooldown: number of epochs to wait after a reduction before
            resuming monitoring
        :param verbose: verbosity level
        :param min_lr: lower bound on the learning rate
        :param threshold: slope above which the monitored loss is considered
            to be stalling or increasing
        """
        super().__init__()
        self.monitor = monitor
        if factor >= 1.0:
            raise ValueError(f"LRAdaptLossSlopeEarlyStop does not support a factor >= 1.0. Got {factor}")
        self.factor = factor
        self.min_lr = min_lr
        self.patience = patience
        self.verbose = verbose
        self.cooldown = cooldown
        self.threshold = threshold
        self._reset()

    def _reset(self):
        self.cooldown_counter = 0
        self.wait = 0
        self.last_losses = []

    def on_train_begin(self, logs=None):
        self._reset()

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        logs["lr"] = tf.keras.backend.get_value(self.model.optimizer.lr)
        current = logs.get(self.monitor)
        if current is None:
            logging.warning(
                "Learning rate reduction is conditioned on metric `%s` "
                "which is not available. Available metrics are: %s",
                self.monitor, ",".join(list(logs.keys())))
        else:
            if self.cooldown_counter > 0:
                self.cooldown_counter -= 1
                self.wait = 0
            else:
                self.last_losses.append(current)
                self.wait += 1
                if self.wait >= self.patience:
                    # fit a line to the monitored loss over the last `patience` epochs:
                    a = np.polyfit(np.arange(self.patience), self.last_losses[-self.patience:], 1)[0]
                    # test whether the loss is flat or going up:
                    if a > self.threshold:
                        old_lr = tf.keras.backend.get_value(self.model.optimizer.lr)
                        if old_lr > np.float32(self.min_lr):
                            new_lr = old_lr * self.factor
                            new_lr = max(new_lr, self.min_lr)
                            tf.keras.backend.set_value(self.model.optimizer.lr, new_lr)
                            if self.verbose > 0:
                                tf.print(
                                    f"\nEpoch {epoch + 1}: "
                                    "LRAdaptLossSlopeEarlyStop reducing "
                                    f"learning rate to {new_lr}.")
                            self.cooldown_counter = self.cooldown
                            self.wait = 0
                            self.last_losses = []
                        else:
                            self.model.stop_training = True
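

# A minimal usage sketch (hypothetical `model` and data): the callback fits a
# line to the last `patience` values of `monitor`; when the slope exceeds
# `threshold` the learning rate is multiplied by `factor`, and once the rate
# has already reached `min_lr` training is stopped.
def _example_adaptive_lr(model, x, y, x_val, y_val):
    callback = LRAdaptLossSlopeEarlyStop(monitor="val_loss", patience=25,
                                         cooldown=10, min_lr=1e-5, verbose=1)
    model.fit(x, y, validation_data=(x_val, y_val), epochs=500,
              callbacks=[callback])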