from torch.utils.tensorboard import SummaryWriter
from codecarbon import EmissionsTracker
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal

from .profilers import ProfilerPyJoules, MeasureHook, MeasureHookLoss

class GA:
    '''
    Energy profiler for PyTorch models.

    Hooks are registered on an attached model (optionally on each of its
    named children and on a loss module). The measurements collected by those
    hooks can be queried, exported to pandas/csv, plotted with matplotlib or
    written to tensorboard. A codecarbon emissions tracker can be started and
    stopped independently of the attached model.
    '''

    # Profiler identifiers that attach_model() knows how to build.
    _SUPPORTED_PROFILERS = ('pyjoules',)
    _PHASE_SUFFIXES = ('_forward', '_backward')

    def __init__(self):
        # name -> (forward_hook, backward_hook)
        self.hook_register = {}
        # Handles returned by torch when registering hooks; used to remove them.
        self.hook_handles = []
        self.named_layer = False
        self.model = None
        self.loss = None
        self.emission_tracker = None

    def attach_model(self, model, loss=None, named_layer=True, profiler='pyjoules', disable_measurements=None):
        '''
        Attach a model to the GA profiler. You can add also a loss function.
        The profiler will then track the energy consumption of the attached model.

        :param model: PyTorch model you want to track.
        :param loss: Loss function to be passed for tracking the consumption of the loss computation.
        :param named_layer: Set to False to not track individual named layers.
        :param profiler: Type of profiler used to track the energy consumption. Currently only 'pyjoules' is implemented.
        :param disable_measurements: Set which hardware components you are not interested in tracking between cpu, ram and gpu.
        :type model: torch.nn.module
        :type loss: torch.nn.module
        :type named_layer: boolean
        :type profiler: string
        :type disable_measurements: list
        :raises RuntimeError: If a model is already attached.
        :raises ValueError: If ``profiler`` is an unknown profiler name.
        '''
        if self.model is not None:
            raise RuntimeError('Already a model attached, use detach_model() before attaching the profiler to a new one.')
        self._check_profiler(profiler)

        # A fresh list per call: a mutable default would be shared between calls.
        disabled = [] if disable_measurements is None else disable_measurements

        # A model without parameters has no device to inspect; fall back to cpu:0
        # instead of leaking a bare StopIteration.
        first_param = next(model.parameters(), None)
        if first_param is None:
            device_type, device_idx = 'cpu', 0
        else:
            device_type = first_param.device.type
            device_idx = first_param.device.index or 0

        # State is only updated once nothing above can fail any more, and it is
        # always overwritten so that values from a previous attachment cannot leak.
        self.model = model
        self.named_layer = bool(named_layer)
        self.loss = loss

        if self.named_layer:
            for name, layer in model.named_children():
                self._register_layer_hook(name, layer, profiler, device_type, device_idx, disabled)
        self._register_layer_hook(model.__class__.__name__, model, profiler, device_type, device_idx, disabled)
        if loss is not None:
            self._register_layer_hook('loss', loss, profiler, device_type, device_idx, disabled)

    def _check_profiler(self, profiler):
        # Reject unknown profiler names early. Non-string objects are passed
        # through unchanged, exactly as before.
        if isinstance(profiler, str) and profiler not in self._SUPPORTED_PROFILERS:
            raise ValueError(f"Unknown profiler '{profiler}'. Supported profilers: {', '.join(self._SUPPORTED_PROFILERS)}.")

    def _register_layer_hook(self, name, layer, profiler, device_type, device_idx, disable_measurements):
        # Build the measurement hooks for one module and register them with torch.
        if profiler == 'pyjoules':
            profiler = ProfilerPyJoules(device=device_type, index=device_idx, disable=disable_measurements)

        # The entry named 'loss' uses MeasureHookLoss for its forward pass;
        # get_losses() reads from that forward hook.
        forward_cls = MeasureHookLoss if name == 'loss' else MeasureHook
        forward_hook = forward_cls(name, profiler)
        backward_hook = MeasureHook(name, profiler)
        self.hook_register[name] = (forward_hook, backward_hook)

        self.hook_handles.extend([
            layer.register_forward_pre_hook(forward_hook.pre_hook),
            layer.register_forward_hook(forward_hook.post_hook),
            layer.register_full_backward_pre_hook(backward_hook.pre_hook),
            layer.register_full_backward_hook(backward_hook.post_hook),
        ])

    def _check_model_attached(self):
        if self.model is None:
            raise RuntimeError('No model attached to profiler.')

    def detach_model(self):
        '''
        Detach the current model from the GA profiler.

        All registered hooks are removed and the state describing the
        attachment (model, loss, named layer flag) is cleared.

        :raises RuntimeError: If no model is attached.
        '''
        self._check_model_attached()
        for handle in self.hook_handles:
            handle.remove()
        self.hook_register = {}
        self.hook_handles = []
        self.model = None
        # Also forget the loss and the layer flag so that a later attachment
        # does not inherit them.
        self.loss = None
        self.named_layer = False

    def get_full_measurements(self):
        '''
        Get the full measurements collected by the profiler.

        :returns: The energy consumptions for each pass and model component.
        :rtype: dict
        :raises RuntimeError: If no model is attached.
        '''
        self._check_model_attached()
        return {
            name: {
                'forward': forward_hook.get_layer_measurements(),
                'backward': backward_hook.get_layer_measurements(),
            }
            for name, (forward_hook, backward_hook) in self.hook_register.items()
        }

    def get_mean_measurements(self):
        '''
        Get the mean of the measurements collected by the profiler.

        :returns: The mean energy consumption for each model component (0 when nothing was measured).
        :rtype: dict
        :raises RuntimeError: If no model is attached.
        '''
        measurements = self.get_full_measurements()
        for passes in measurements.values():
            for phase in ('forward', 'backward'):
                values = passes[phase]
                passes[phase] = sum(values) / len(values) if len(values) > 0 else 0
        return measurements

    def get_sum_measurements(self):
        '''
        Get the sum of the measurements collected by the profiler.

        :returns: The full energy consumption for each model component (0 when nothing was measured).
        :rtype: dict
        :raises RuntimeError: If no model is attached.
        '''
        measurements = self.get_full_measurements()
        for passes in measurements.values():
            for phase in ('forward', 'backward'):
                values = passes[phase]
                passes[phase] = sum(values) if len(values) > 0 else 0
        return measurements

    def get_losses(self):
        '''
        Get the loss values recorded by the forward hook of the attached loss.

        :returns: Whatever the loss hook's ``get_losses()`` returns.
        :raises RuntimeError: If no model or no loss is attached.
        '''
        self._check_model_attached()
        if self.loss is None or 'loss' not in self.hook_register:
            raise RuntimeError('Loss has not been attached to the profiler.')
        return self.hook_register['loss'][0].get_losses()

    def to_pandas(self):
        '''
        Convert the energy measurements into a pandas.DataFrame object.

        Each component contributes a ``<name>_forward`` and a
        ``<name>_backward`` column; shorter columns are padded with NaN.

        :returns: The dataframe of the energy consumption for each model component.
        :rtype: pandas.DataFrame
        :raises RuntimeError: If no model is attached.
        '''
        pd_dict = {}
        for name, passes in self.get_full_measurements().items():
            pd_dict[f'{name}_forward'] = passes['forward']
            pd_dict[f'{name}_backward'] = passes['backward']
        # Building row-wise and transposing tolerates columns of unequal length.
        return pd.DataFrame.from_dict(pd_dict, orient='index').T

    def to_csv(self, filename):
        '''
        Save the energy measurements into a csv file.

        :param filename: Name of the csv file. '.csv' is appended when missing.
        :type filename: string
        '''
        if not filename.endswith('.csv'):
            filename = filename + '.csv'
        self.to_pandas().to_csv(filename)

    @staticmethod
    def _drop_existing(df, names):
        # Return df without those of `names` that are actually columns of df.
        return df.drop(columns=[name for name in names if name in df.columns])

    @classmethod
    def _add_total_column(cls, df, name, drop_phases=False):
        # Add column `name` = forward + backward energy of that component,
        # optionally removing the per-phase columns afterwards (in place).
        df[name] = 0.0
        for suffix in cls._PHASE_SUFFIXES:
            phase_column = f'{name}{suffix}'
            if phase_column in df.columns:
                df[name] = df[name] + df[phase_column]
                if drop_phases:
                    df.drop(columns=[phase_column], inplace=True)

    @staticmethod
    def _savgol_window(rows_count, smoothing, polyorder=3):
        # Return an odd integer window length for the Savitzky-Golay filter that
        # grows with `smoothing` up to the number of rows, or None when there
        # are too few rows to fit a window larger than the polynomial order.
        window = int((rows_count - 4) * smoothing + 4)
        if window % 2 == 0:
            window += 1
        largest_odd = rows_count if rows_count % 2 else rows_count - 1
        window = min(window, largest_odd)
        return window if window > polyorder else None

    def visualize_data(self, layers='all', complete_model=True, loss=False, phase='total', kind='line', smoothing=0.3, figsize=None, filename=None):
        '''
        Generate a matplotlib plot for the energy measurements.

        :param layers: Pass in a list which named layers you want to display. Pass 'all' if you want to see all of them.
        :param complete_model: Set to False if you don't want to see the data for the complete model.
        :param loss: Set to True to display also the loss function data.
        :param phase: Select which phase to display between 'total', 'forward' or 'backward'.
        :param kind: Select which type of plot to generate between 'line', 'violin' or 'box'.
        :param smoothing: Only used for lineplots. Value between 0 and 1 used to add smoothing to the displayed data.
        :param figsize: Size of the figure generated.
        :param filename: Pass a filename if you want to save the image created.
        :type layers: list or string
        :type complete_model: boolean
        :type loss: boolean
        :type phase: string
        :type kind: string
        :type smoothing: float
        :type figsize: Tuple
        :type filename: string
        :returns: The matplotlib axes containing the plot, or None when there is no data.
        :rtype: Axes
        :raises ValueError: If ``kind`` is unknown or ``smoothing`` is greater than 1.
        '''
        # Columns containing NaN (fewer measurements than the others) are discarded.
        df = self.to_pandas().dropna(axis=1)
        modelname = self.model.__class__.__name__
        child_names = [name for name, _ in self.model.named_children()]

        hidden = []
        if not complete_model:
            hidden += [f'{modelname}{suffix}' for suffix in self._PHASE_SUFFIXES]
        if not loss:
            hidden += [f'loss{suffix}' for suffix in self._PHASE_SUFFIXES]
        if layers != 'all':
            for name in child_names:
                if name not in layers:
                    hidden += [f'{name}{suffix}' for suffix in self._PHASE_SUFFIXES]
        df = self._drop_existing(df, hidden)

        if phase == 'total':
            if self.named_layer:
                for name in child_names:
                    if layers == 'all' or name in layers:
                        self._add_total_column(df, name, drop_phases=True)
            if complete_model:
                self._add_total_column(df, modelname, drop_phases=True)
            if loss and self.loss is not None:
                self._add_total_column(df, 'loss', drop_phases=True)
        elif phase == 'forward':
            df = df.drop(columns=[column for column in df.columns if column.endswith('_backward')])
        elif phase == 'backward':
            df = df.drop(columns=[column for column in df.columns if column.endswith('_forward')])

        if len(df.index) == 0:
            return None

        # Validate before creating the figure so that a bad argument does not
        # leave an orphaned figure behind.
        if kind not in ('line', 'box', 'violin'):
            raise ValueError('kind parameter can only be line, box or violin.')
        if kind == 'line' and smoothing > 1:
            raise ValueError('smoothing can only be between 0 and 1.')

        fig, ax = plt.subplots(figsize=figsize)

        if kind == 'line':
            if smoothing > 0:
                # savgol_filter expects an integer window length larger than
                # the polynomial order; with too few rows the raw data is shown.
                window_length = self._savgol_window(len(df.index), smoothing)
                if window_length is not None:
                    for column in df:
                        df[column] = signal.savgol_filter(df[column].to_numpy(dtype=float), window_length, 3, mode="nearest")
            df.plot(xlabel='Iteration', ylabel='Energy (J)', ax=ax)
        elif kind == 'box':
            df.plot(kind='box', ax=ax)
            ax.set_ylabel('Energy (J)')
        else:
            ax.violinplot([df[column].tolist() for column in df])
            ax.set_xticks(list(range(1, len(df.columns) + 1)))
            ax.set_xticklabels(df.columns)
            ax.set_ylabel('Energy (J)')

        fig.tight_layout()
        if filename:
            fig.savefig(filename)
        return ax

    def start_tracker_emissions(self, save_to_file=False):
        '''
        Start a tracker for carbon emission. Implemented using codecarbon.

        :param save_to_file: Passed to codecarbon's EmissionsTracker to control whether the results are saved to a file.
        :type save_to_file: boolean
        :raises RuntimeError: If a tracker is already running.
        '''
        if self.emission_tracker is not None:
            raise RuntimeError('Emission tracker already running.')
        tracker = EmissionsTracker(save_to_file=save_to_file)
        tracker.start()
        # Only remember the tracker once it has actually been started.
        self.emission_tracker = tracker

    def stop_tracker_emissions(self):
        '''
        Stop the tracker for carbon emission.

        :returns: The value returned by the tracker's ``stop()``, i.e. the emissions since the tracker was started.
        :rtype: float
        :raises RuntimeError: If no tracker is running.
        '''
        if self.emission_tracker is None:
            raise RuntimeError('Emission tracker is not running. Use start_tracker_emissions().')
        emissions: float = self.emission_tracker.stop()
        self.emission_tracker = None
        return emissions

    @staticmethod
    def _chunk(values, size):
        # Split a 1-D array into complete rows of `size` items, discarding the
        # incomplete tail. The row count is explicit so that empty input works.
        rows = len(values) // size
        return values[:rows * size].reshape(rows, size)

    def set_tensorboard_stats(self, writer=None, experiment=0, named_layer=True, sample_size=500):
        '''
        Sets the tensorboard data that can be viewed through the tensorboard dashboard.

        :param writer: Optionally pass an already existing tensorboard writer. It is closed at the end.
        :param experiment: Set an identifier for the displayed data.
        :param named_layer: Indicate if the named layers should also be displayed in tensorboard.
        :param sample_size: Number of iterations grouped into one loss step. Used to compute the graph for energy per loss step.
        :type writer: torch.utils.tensorboard.SummaryWriter
        :type experiment: int
        :type named_layer: boolean
        :type sample_size: int
        :raises ValueError: If ``sample_size`` is smaller than 1.
        '''
        if sample_size < 1:
            raise ValueError('sample_size must be a positive integer.')

        df = self.to_pandas().dropna(axis=1)
        modelname = self.model.__class__.__name__
        child_names = [name for name, _ in self.model.named_children()]

        if named_layer and self.named_layer:
            for name in child_names:
                self._add_total_column(df, name)
        else:
            df = self._drop_existing(df, [f'{name}{suffix}' for name in child_names for suffix in self._PHASE_SUFFIXES])
        self._add_total_column(df, modelname)
        if self.loss is not None:
            self._add_total_column(df, 'loss')

        if len(df.index) == 0:
            return None

        # Created only now, so that nothing is opened when there is no data.
        if writer is None:
            writer = SummaryWriter(f'runs/experiment_{experiment}')

        fw_columns = [column for column in df.columns if column.endswith('_forward')]
        bw_columns = [column for column in df.columns if column.endswith('_backward')]
        t_columns = [column for column in df.columns if not column.endswith(self._PHASE_SUFFIXES)]

        # The energy-per-loss graphs need recorded losses; skip them otherwise.
        if self.loss is not None:
            loss_mean = self._chunk(np.array(self.get_losses()), sample_size).mean(axis=1)
            loss_mean_diff = -1 * np.diff(loss_mean)
            # The total energy of the attached model is stored under its class name.
            energy_sum = self._chunk(df[modelname].to_numpy(), sample_size).sum(axis=1)[1:]
            # Losses and energies may have been recorded a different number of times.
            steps = min(len(energy_sum), len(loss_mean_diff))
            with np.errstate(divide='ignore', invalid='ignore'):
                energy_loss_ratio = energy_sum[:steps] / loss_mean_diff[:steps]
                # No decrease (division by zero) or an increase of the loss is reported as 0.
                energy_loss_ratio[~np.isfinite(energy_loss_ratio) | (energy_loss_ratio < 0)] = 0
            for i in range(steps):
                writer.add_scalar('Energy per 1 Unit Loss Decrease', energy_loss_ratio[i], i)
                writer.add_scalar('Average Loss per Epoch', loss_mean[i], i)

        for i in df.index:
            row = df.iloc[i]
            writer.add_scalars('Energy Consumption', {key: float(row[key]) for key in t_columns}, i)
            writer.add_scalars('Forward Pass', {key: float(row[key]) for key in fw_columns}, i)
            if len(bw_columns) > 0:
                writer.add_scalars('Backward Pass', {key: float(row[key]) for key in bw_columns}, i)

        for layer in t_columns:
            writer.add_text(f'Total energy consumption: {layer}', f'{round(df[layer].sum(), 2)} J')

        writer.close()

    def reset(self, emission_tracker=True):
        '''
        Reset all the measurements.

        :param emission_tracker: Indicates if a running emissions tracker should be stopped as well.
        :type emission_tracker: boolean
        :raises RuntimeError: If no model is attached.
        '''
        self._check_model_attached()
        for forward_hook, backward_hook in self.hook_register.values():
            forward_hook.reset_layer_measurements()
            backward_hook.reset_layer_measurements()
        if emission_tracker and self.emission_tracker is not None:
            self.stop_tracker_emissions()