from torch.utils.tensorboard import SummaryWriter
from codecarbon import EmissionsTracker
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
from .profilers import ProfilerPyJoules, MeasureHook, MeasureHookLoss
class GA:
    '''
    Energy profiler for PyTorch models.

    Measurement hooks are registered on an attached model (optionally on each
    of its named children and on a loss module). The collected values can be
    read back as dictionaries, exported to pandas/csv, plotted with matplotlib
    or written to tensorboard. A codecarbon emissions tracker can be started
    and stopped independently of the hooks.
    '''

    def __init__(self):
        # Maps a component name to its (forward_hook, backward_hook) pair.
        self.hook_register = {}
        # Removable handles returned by the hook registration calls.
        self.hook_handles = []
        # True while the named children of the attached model are tracked.
        self.named_layer = False
        self.model = None
        self.loss = None
        self.emission_tracker = None

    def attach_model(self, model, loss=None, named_layer=True, profiler='pyjoules',
                     disable_measurements=None):
        '''
        Attach a model to the GA profiler. You can add also a loss function.
        The profiler will then track the energy consumption of the attached model.

        :param model: PyTorch model you want to track.
        :param loss: Loss function to be passed for tracking the consumption of the loss computation.
        :param named_layer: Set to False to not track individual named layers.
        :param profiler: Type of profiler used to track the energy consumption. Currently only 'pyjoules' is implemented.
        :param disable_measurements: Set which hardware components you are not interested in tracking between cpu, ram and gpu. Defaults to an empty list.
        :type model: torch.nn.module
        :type loss: torch.nn.module
        :type named_layer: boolean
        :type profiler: string
        :type disable_measurements: list
        :raises RuntimeError: If a model is already attached.
        '''
        if self.model is not None:
            raise RuntimeError('Already a model attached, use detach_model() before attaching the profiler to a new one.')
        # A fresh list per call: a shared mutable default would leak state
        # between independent attach_model() calls.
        if disable_measurements is None:
            disable_measurements = []

        # Resolve the device before touching any state so that a failure here
        # does not leave the profiler half-attached.
        first_parameter = next(model.parameters(), None)
        if first_parameter is None:
            # A model without parameters carries no device information;
            # fall back to the CPU.
            device_type, device_idx = 'cpu', 0
        else:
            device = first_parameter.device
            device_type = device.type
            # device.index is None when no explicit index is set.
            device_idx = device.index if device.index else 0

        self.model = model
        self.named_layer = bool(named_layer)
        if named_layer:
            for name, layer in model.named_children():
                self._register_layer_hook(name, layer, profiler, device_type, device_idx, disable_measurements)
        # The complete model is always tracked under its class name.
        self._register_layer_hook(model.__class__.__name__, model, profiler, device_type, device_idx, disable_measurements)
        if loss is not None:
            self.loss = loss
            self._register_layer_hook('loss', self.loss, profiler, device_type, device_idx, disable_measurements)

    def _register_layer_hook(self, name, layer, profiler, device_type, device_idx, disable_measurements):
        '''
        Register forward and backward measurement hooks on a single module.

        The hook pair is stored in ``hook_register`` under ``name`` and the
        four handles are appended to ``hook_handles``. When ``profiler`` is the
        string 'pyjoules' a ProfilerPyJoules instance is created for this
        module; any other value is handed to the hooks unchanged.
        '''
        if profiler == 'pyjoules':
            profiler = ProfilerPyJoules(device=device_type, index=device_idx, disable=disable_measurements)
        # The loss uses a dedicated forward hook type; get_losses() reads from it.
        forward_hook_type = MeasureHookLoss if name == 'loss' else MeasureHook
        forward_hook = forward_hook_type(name, profiler)
        backward_hook = MeasureHook(name, profiler)
        self.hook_register[name] = (forward_hook, backward_hook)
        self.hook_handles.extend([
            layer.register_forward_pre_hook(forward_hook.pre_hook),
            layer.register_forward_hook(forward_hook.post_hook),
            layer.register_full_backward_pre_hook(backward_hook.pre_hook),
            layer.register_full_backward_hook(backward_hook.post_hook),
        ])

    def _check_model_attached(self):
        '''
        Raise a RuntimeError if no model is currently attached.
        '''
        if self.model is None:
            raise RuntimeError('No model attached to profiler.')

    def detach_model(self):
        '''
        Detach the current model from the GA profiler.

        All registered hooks are removed and the collected measurements are
        discarded together with the reference to the model and the loss.

        :raises RuntimeError: If no model is attached.
        '''
        self._check_model_attached()
        for handle in self.hook_handles:
            handle.remove()
        self.hook_register = {}
        self.hook_handles = []
        self.model = None
        # Also forget the loss and the named-layer flag, otherwise a later
        # attach_model() without a loss would still report the old one.
        self.loss = None
        self.named_layer = False

    def get_full_measurements(self):
        '''
        Get the full measurements collected by the profiler.

        :returns: The energy consumptions for each pass and model component.
        :rtype: dict
        :raises RuntimeError: If no model is attached.
        '''
        self._check_model_attached()
        return {
            name: {
                'forward': forward_hook.get_layer_measurements(),
                'backward': backward_hook.get_layer_measurements(),
            }
            for name, (forward_hook, backward_hook) in self.hook_register.items()
        }

    def get_mean_measurements(self):
        '''
        Get the mean of the measurements collected by the profiler.

        A pass without any measurement is reported as 0.

        :returns: The mean energy consumption for each model component.
        :rtype: dict
        :raises RuntimeError: If no model is attached.
        '''
        measurements = self.get_full_measurements()
        for ly_dict in measurements.values():
            for phase in ('forward', 'backward'):
                values = ly_dict[phase]
                ly_dict[phase] = sum(values) / len(values) if len(values) > 0 else 0
        return measurements

    def get_sum_measurements(self):
        '''
        Get the sum of the measurements collected by the profiler.

        A pass without any measurement is reported as 0.

        :returns: The full energy consumption for each model component.
        :rtype: dict
        :raises RuntimeError: If no model is attached.
        '''
        measurements = self.get_full_measurements()
        for ly_dict in measurements.values():
            for phase in ('forward', 'backward'):
                values = ly_dict[phase]
                ly_dict[phase] = sum(values) if len(values) > 0 else 0
        return measurements

    def get_losses(self):
        '''
        Get the loss values recorded by the forward hook of the attached loss.

        :returns: Whatever the loss forward hook's get_losses() returns.
        :raises RuntimeError: If no model or no loss is attached.
        '''
        self._check_model_attached()
        if self.loss is None:
            raise RuntimeError('Loss has not been attached to the profiler.')
        return self.hook_register['loss'][0].get_losses()

    def to_pandas(self):
        '''
        Convert the energy measurements into a pandas.DataFrame object.

        Every tracked component contributes a '<name>_forward' and a
        '<name>_backward' column; shorter columns are padded with NaN.

        :returns: The dataframe of the energy consumption for each model component.
        :rtype: pandas.DataFrame
        :raises RuntimeError: If no model is attached.
        '''
        pd_dict = {}
        for column, passes in self.get_full_measurements().items():
            pd_dict[f'{column}_forward'] = passes['forward']
            pd_dict[f'{column}_backward'] = passes['backward']
        # orient='index' followed by a transpose tolerates lists of unequal length.
        return pd.DataFrame.from_dict(pd_dict, orient='index').T

    def to_csv(self, filename):
        '''
        Save the energy measurements into a csv file.

        :param filename: Name of the csv file. '.csv' is appended when missing.
        :type filename: string
        '''
        if not filename.endswith('.csv'):
            filename = filename + '.csv'
        self.to_pandas().to_csv(filename)

    @staticmethod
    def _add_total_column(df, name, drop_phases=False):
        '''
        Add to df a column `name` holding the sum of the '<name>_forward' and
        '<name>_backward' columns that are present, optionally dropping them.
        '''
        df[name] = 0.0
        for suffix in ('_forward', '_backward'):
            phase_column = f'{name}{suffix}'
            if phase_column in df.columns:
                df[name] = df[name] + df[phase_column]
                if drop_phases:
                    df.drop(labels=[phase_column], axis=1, inplace=True)

    @staticmethod
    def _smoothing_window(rows_count, smoothing, polyorder=3):
        '''
        Compute the Savitzky-Golay window length for the given smoothing factor.

        The result is an odd integer strictly greater than polyorder, as
        expected by scipy.signal.savgol_filter.
        '''
        window_length = int((rows_count - 4) * smoothing) + 4
        window_length = max(window_length, polyorder + 1)
        if window_length % 2 == 0:
            window_length += 1
        return window_length

    def visualize_data(self, layers='all', complete_model=True, loss=False, phase='total',
                       kind='line', smoothing=0.3, figsize=None, filename=None):
        '''
        Generate a matplotlib plot for the energy measurements.

        :param layers: Pass in a list which named layers you want to display. Pass 'all' if you want to see all of them.
        :param complete_model: Set to False if you don't want to see the data for the complete model.
        :param loss: Set to True to display also the loss function data.
        :param phase: Select which phase to display between 'total', 'forward' or 'backward'.
        :param kind: Select which type of plot to generate between 'line', 'violin' or 'box'.
        :param smoothing: Only used for lineplots. Value between 0 and 1 used to add smoothing to the displayed data.
        :param figsize: Size of the figure generated.
        :param filename: Pass a filename if you want to save the image created.
        :type layers: list or string
        :type complete_model: boolean
        :type loss: boolean
        :type phase: string
        :type kind: string
        :type smoothing: float
        :type figsize: Tuple
        :type filename: string
        :returns: The matplotlib axes containing the plot, or None when there are no measurements.
        :rtype: Axes
        :raises ValueError: If kind is unknown or smoothing is greater than 1.
        :raises RuntimeError: If no model is attached.
        '''
        df = self.to_pandas()
        # Columns containing NaN (fewer measurements than the longest one) are not plotted.
        df = df.dropna(axis=1)
        modelname = self.model.__class__.__name__
        child_names = [name for name, _ in self.model.named_children()]

        # Collect the columns the caller asked to hide, then drop them in one go.
        hidden = set()
        if not complete_model:
            hidden.update((f'{modelname}_forward', f'{modelname}_backward'))
        if not loss:
            hidden.update(('loss_forward', 'loss_backward'))
        if layers != 'all':
            for name in child_names:
                if name not in layers:
                    hidden.update((f'{name}_forward', f'{name}_backward'))
        df = df.drop(columns=[column for column in df.columns if column in hidden])

        if phase == 'total':
            if self.named_layer:
                for name in child_names:
                    if layers == 'all' or name in layers:
                        self._add_total_column(df, name, drop_phases=True)
            if complete_model:
                self._add_total_column(df, modelname, drop_phases=True)
            if loss and self.loss is not None:
                self._add_total_column(df, 'loss', drop_phases=True)
        elif phase == 'forward':
            df = df.drop(columns=[column for column in df.columns if column.endswith('_backward')])
        elif phase == 'backward':
            df = df.drop(columns=[column for column in df.columns if column.endswith('_forward')])

        if len(df.index) == 0:
            return None

        # Validate before creating the figure so that an invalid argument
        # does not leave an unused figure open.
        if kind not in ('line', 'box', 'violin'):
            raise ValueError('kind parameter can only be line, box or violin.')
        if kind == 'line' and smoothing > 1:
            raise ValueError('smoothing can only be between 0 and 1.')

        fig, ax = plt.subplots(figsize=figsize)
        if kind == 'line':
            if smoothing > 0:
                window_length = self._smoothing_window(len(df.index), smoothing)
                for column in df:
                    df[column] = signal.savgol_filter(df[column], window_length, 3, mode="nearest")
            df.plot(xlabel='Iteration', ylabel='Energy (J)', ax=ax)
        elif kind == 'box':
            df.plot(kind='box', ax=ax)
            ax.set_ylabel('Energy (J)')
        else:
            ax.violinplot([df[column].tolist() for column in df])
            ax.set_xticks(list(range(1, len(df.columns) + 1)))
            ax.set_xticklabels(df.columns)
            ax.set_ylabel('Energy (J)')
        fig.tight_layout()
        if filename:
            fig.savefig(filename)
        return ax

    def start_tracker_emissions(self, save_to_file=False):
        '''
        Start a tracker for carbon emission. Implemented using codecarbon.

        :param save_to_file: Save the tracking results in a file called 'emissions'.
        :type save_to_file: boolean
        :raises RuntimeError: If a tracker is already running.
        '''
        if self.emission_tracker is not None:
            raise RuntimeError('Emission tracker already running.')
        self.emission_tracker = EmissionsTracker(save_to_file=save_to_file)
        self.emission_tracker.start()

    def stop_tracker_emissions(self):
        '''
        Stop the tracker for carbon emission.

        :returns: The total emissions produced by the model since the tracker was started.
        :rtype: float
        :raises RuntimeError: If no tracker is running.
        '''
        if self.emission_tracker is None:
            raise RuntimeError('Emission tracker is not running. Use start_tracker_emissions().')
        emissions: float = self.emission_tracker.stop()
        self.emission_tracker = None
        return emissions

    def set_tensorboard_stats(self, writer=None, experiment=0, named_layer=True, sample_size=500):
        '''
        Sets the tensorboard data that can be viewed through the tensorboard dashboard.

        Nothing is written (and None is returned) when there are no
        measurements. The energy per loss step graph is only written when a
        loss has been attached. The writer is closed once the data is written.

        :param writer: Optionally pass an already existing tensorboard writer.
        :param experiment: Set an identifier for the displayed data.
        :param named_layer: Indicate if the named layers should also be displayed in tensorboard.
        :param sample_size: Size of the sample size used to define a loss step. Used to compute the graph for energy per loss step.
        :type writer: torch.utils.tensorboard.SummaryWriter
        :type experiment: int
        :type named_layer: boolean
        :type sample_size: int
        :raises ValueError: If sample_size is smaller than 1.
        :raises RuntimeError: If no model is attached.
        '''
        if sample_size < 1:
            raise ValueError('sample_size must be a positive integer.')
        df = self.to_pandas()
        df = df.dropna(axis=1)
        if len(df.index) == 0:
            return None
        # Create the writer only once there is data, so an early return does
        # not leave an empty, unclosed run behind.
        if writer is None:
            writer = SummaryWriter(f'runs/experiment_{experiment}')

        child_names = [name for name, _ in self.model.named_children()]
        if named_layer and self.named_layer:
            for name in child_names:
                self._add_total_column(df, name)
        else:
            layer_columns = set()
            for name in child_names:
                layer_columns.update((f'{name}_forward', f'{name}_backward'))
            df = df.drop(columns=[column for column in df.columns if column in layer_columns])
        modelname = self.model.__class__.__name__
        self._add_total_column(df, modelname)
        if self.loss is not None:
            self._add_total_column(df, 'loss')

        fw_columns = [column for column in df.columns if column.endswith('_forward')]
        bw_columns = [column for column in df.columns if column.endswith('_backward')]
        t_columns = [column for column in df.columns
                     if not column.endswith('_forward') and not column.endswith('_backward')]

        if self.loss is not None:
            loss_column = np.array(self.get_losses())
            # Use the column of the attached model, whatever its class is called.
            energy_column = df[modelname].to_numpy()
            # Keep only whole groups of sample_size entries available in both series.
            usable = (min(len(loss_column), len(energy_column)) // sample_size) * sample_size
            loss_mean = np.mean(loss_column[:usable].reshape(-1, sample_size), axis=1)
            # Positive values mean the loss decreased from one step to the next.
            loss_mean_diff = -1 * np.diff(loss_mean)
            # The first group has no previous loss to compare against.
            energy_sum = np.sum(energy_column[:usable].reshape(-1, sample_size), axis=1)[1:]
            energy_loss_ratio = energy_sum / loss_mean_diff
            # Steps where the loss increased are reported as 0.
            energy_loss_ratio[energy_loss_ratio < 0] = 0
            for i, ratio in enumerate(energy_loss_ratio):
                writer.add_scalar('Energy per 1 Unit Loss Decrease', ratio, i)
                writer.add_scalar('Average Loss per Epoch', loss_mean[i], i)

        for i in range(len(df.index)):
            row = df.iloc[i]
            writer.add_scalars('Energy Consumption', {key: float(row[key]) for key in t_columns}, i)
            writer.add_scalars('Forward Pass', {key: float(row[key]) for key in fw_columns}, i)
            if len(bw_columns) > 0:
                writer.add_scalars('Backward Pass', {key: float(row[key]) for key in bw_columns}, i)
        for layer in t_columns:
            writer.add_text(f'Total energy consumption: {layer}', f'{round(df[layer].sum(), 2)} J')
        writer.close()

    def reset(self, emission_tracker=True):
        '''
        Reset all the measurements.

        :param emission_tracker: Indicates if the emissions tracker needs to be reset as well.
        :type emission_tracker: boolean
        :raises RuntimeError: If no model is attached.
        '''
        self._check_model_attached()
        for forward_hook, backward_hook in self.hook_register.values():
            forward_hook.reset_layer_measurements()
            backward_hook.reset_layer_measurements()
        if emission_tracker and self.emission_tracker is not None:
            # Stopping discards the running tracker; a new one must be started explicitly.
            self.stop_tracker_emissions()