#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import division
from __future__ import print_function
import os
import re
import pickle
from copy import copy
import sys
import inspect
import platform
import numpy as np
from time import time as now
from NumPyNet.layers.activation_layer import Activation_layer
from NumPyNet.layers.avgpool_layer import Avgpool_layer
from NumPyNet.layers.batchnorm_layer import BatchNorm_layer
from NumPyNet.layers.connected_layer import Connected_layer
from NumPyNet.layers.convolutional_layer import Convolutional_layer
from NumPyNet.layers.cost_layer import Cost_layer
from NumPyNet.layers.dropout_layer import Dropout_layer
from NumPyNet.layers.input_layer import Input_layer
from NumPyNet.layers.l1norm_layer import L1Norm_layer
from NumPyNet.layers.l2norm_layer import L2Norm_layer
from NumPyNet.layers.logistic_layer import Logistic_layer
from NumPyNet.layers.lstm_layer import LSTM_layer
from NumPyNet.layers.maxpool_layer import Maxpool_layer
from NumPyNet.layers.rnn_layer import RNN_layer
from NumPyNet.layers.route_layer import Route_layer
from NumPyNet.layers.shortcut_layer import Shortcut_layer
from NumPyNet.layers.shuffler_layer import Shuffler_layer
from NumPyNet.layers.simple_rnn_layer import SimpleRNN_layer
from NumPyNet.layers.softmax_layer import Softmax_layer
from NumPyNet.layers.upsample_layer import Upsample_layer
from NumPyNet.layers.yolo_layer import Yolo_layer
from NumPyNet.optimizer import Optimizer
from NumPyNet.parser import net_config
from NumPyNet.exception import DataVariableError
from NumPyNet.exception import LayerError
from NumPyNet.exception import MetricsError
from NumPyNet.exception import NetworkError
from NumPyNet.utils import _redirect_stdout
__author__ = ['Mattia Ceccarelli', 'Nico Curti']
__email__ = ['mattia.ceccarelli3@studio.unibo.it', 'nico.curti2@unibo.it']
CRLF = '\r\x1B[K' if platform.system() != 'Windows' else '\r'
class Network(object):
'''
Neural Network object
Parameters
----------
batch : int
Batch size
input_shape : tuple
Input dimensions
train : bool (default=True)
Turn on/off the parameters tuning
Notes
-----
.. warning::
    Up to now the trainable variable is unused, since the layers
    do not take it into account yet!
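  Examples
  --------
  A minimal construction sketch, using only calls defined in this module:
  >>> net = Network(batch=32, input_shape=(512, 512, 3))
  >>> net.summary()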
'''
LAYERS = {'activation' : Activation_layer,
'avgpool' : Avgpool_layer,
'batchnorm' : BatchNorm_layer,
'connected' : Connected_layer,
'convolutional' : Convolutional_layer,
'cost' : Cost_layer,
'dropout' : Dropout_layer,
'input' : Input_layer,
'l1norm' : L1Norm_layer,
'l2norm' : L2Norm_layer,
'logistic' : Logistic_layer,
'lstm' : LSTM_layer,
'maxpool' : Maxpool_layer,
'rnn' : RNN_layer,
'route' : Route_layer,
'shortcut' : Shortcut_layer,
'shuffler' : Shuffler_layer,
'simplernn' : SimpleRNN_layer,
'softmax' : Softmax_layer,
'upsample' : Upsample_layer,
'yolo' : Yolo_layer,
}
def __init__(self, batch, input_shape=None, train=True):
self.batch = batch
self.train = train
if input_shape is not None:
try:
self.w, self.h, self.c = input_shape
      except (ValueError, TypeError):
        raise ValueError('Network model : incorrect input_shape. Expected a 3D tuple (width, height, channels). Given {}'.format(input_shape))
self._net = [ Input_layer(input_shape=(self.batch, self.w, self.h, self.c)) ]
else:
self._net = []
self.metrics = None
self._fitted = False
  def add(self, layer):
'''
Add a new layer to the network model.
Layers are progressively appended to the tail of the model.
Parameters
----------
layer : Layer object
Layer object to append to the current architecture
Returns
-------
self
Notes
-----
.. note::
If the architecture is empty a default InputLayer is used
to start the model.
.. warning::
      The layer type must be one of the types stored in the
      LAYERS dict, otherwise a LayerError is raised.
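    Examples
    --------
    A sketch re-using the layer signature from the commented __main__ example
    at the bottom of this module; add returns self, so calls can be chained:
    >>> net = Network(batch=32, input_shape=(512, 512, 3))
    >>> net = net.add(Convolutional_layer(input_shape=(32, 512, 512, 3), filters=64, size=3, stride=1))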
'''
    try:
      type_layer = layer.__class__.__name__.lower().split('_layer')[0]
    except AttributeError:
      raise LayerError('Incorrect Layer type found. Given {}'.format(layer.__class__.__name__))
    if type_layer not in self.LAYERS.keys():
      raise LayerError('Incorrect Layer type found. Given {}'.format(type_layer))
if type_layer == 'input':
self._net.append(layer)
elif type_layer == 'route':
prev_layers = []
for idx in layer.input_layers:
        prev_layers.append(self._net[idx]) # the route needs the previous layers' info to be initialized
self._net.append(layer(prev_layers))
else:
self._net.append(layer(self._net[-1]))
return self
def __iter__(self):
'''
Start the iteration along the architecture
'''
self.layer_index = 0
return self
def __next__(self):
'''
Get the next layer
'''
if self.layer_index < self.num_layers:
self.layer_index += 1
return self._net[self.layer_index - 1]
else:
raise StopIteration
  def next(self):
'''
Get the next layer
Notes
-----
    Python 2 compatibility: its iterators require a next method alongside __next__
'''
return self.__next__()
  def summary(self):
'''
Print the network model summary
Returns
-------
None
'''
print('layer filters size input output')
for i, layer in enumerate(self._net):
print('{:>4d} {}'.format(i, layer), end='\n') # flush=True
  def load(self, cfg_filename, weights=None):
'''
    Load the network model from a configuration file in INI format
Parameters
----------
cfg_filename : str
Filename or path of the neural network configuration file in INI format
weights : str (default=None)
Filename of the weights
Returns
-------
self
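    Examples
    --------
    A sketch mirroring the __main__ section at the bottom of this module;
    the filename is hypothetical:
    >>> net = Network(batch=32)
    >>> net = net.load('cfg/yolov3.cfg')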
'''
model = net_config(cfg_filename)
self.batch = model.get('net0', 'batch', 1)
self.w = model.get('net0', 'width', 416)
self.h = model.get('net0', 'height', 416)
self.c = model.get('net0', 'channels', 3)
# TODO: add other network parameters
input_shape = (self.batch, self.w, self.h, self.c)
self._net = [ Input_layer(input_shape=input_shape) ]
print('layer filters size input output')
for i, layer in enumerate(model):
layer_t = re.split(r'\d+', layer)[0]
params = model.get_params(layer)
if layer_t == 'shortcut':
_from = model.get(layer, 'from', 0)
self._net.append( self.LAYERS[layer_t](input_shape=input_shape, **params)([self._net[-1], self._net[_from]]) )
elif layer_t == 'route':
        _layers = model.get(layer, 'layers', [])
        _layers = [_layers] if isinstance(_layers, int) else _layers # a single entry is parsed as a plain int
        self._net.append( self.LAYERS[layer_t](input_shape=input_shape, **params)([self._net[x] for x in _layers]) ) # the route needs the list of its input layers, as in add()
else:
self._net.append( self.LAYERS[layer_t](input_shape=input_shape, **params)(self._net[-1]) )
input_shape = self._net[-1].out_shape
print('{:>4d} {}'.format(i, self._net[-1]), end='\n') # flush=True
      sys.stdout.flush() # compatibility with python 2.7
      # if model.get(layer, 'batch_normalize', 0): # wrong: it would add a new layer, breaking the shortcut indices
      #   self._net.append( BatchNorm_layer()(self._net[-1]) )
      #   print('{:>4d} {}'.format(i, self._net[-1]), flush=True, end='\n')
if weights is not None:
self.load_weights(weights)
return self
  def load_weights(self, weights_filename):
'''
    Load the weights from a binary file
Parameters
----------
weights_filename : str
Filename of the input weights file
Returns
-------
self
Notes
-----
.. note::
The weights are read and set to each layer which has
the load_weights member function.
'''
with open(weights_filename, 'rb') as fp:
      # NOTE: the np.int / np.float aliases were removed in recent NumPy releases; the builtins give the same dtypes
      major, minor, revision = np.fromfile(fp, dtype=int, count=3)
      full_weights = np.fromfile(fp, dtype=float, count=-1)
pos = 0
for layer in self:
if hasattr(layer, 'load_weights'):
pos = layer.load_weights(full_weights, pos)
self._fitted = True
return self
  def save_weights(self, filename):
'''
Dump current network weights
Parameters
----------
filename : str
Filename of the output weights file
Returns
-------
self
Notes
-----
.. note::
The weights are extracted from each layer which has
the save_weights member function.
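    Examples
    --------
    A dump/restore round trip; the filename is hypothetical:
    >>> net = net.save_weights('model.weights')
    >>> net = net.load_weights('model.weights')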
'''
full_weights = []
for layer in self:
if hasattr(layer, 'save_weights'):
full_weights += layer.save_weights()
    full_weights = np.asarray(full_weights, dtype=float) # builtin dtypes replace the removed np.int / np.float aliases
    version = np.array([1, 0, 0], dtype=int)
with open(filename, 'wb') as fp:
version.tofile(fp, sep='')
full_weights.tofile(fp, sep='') # for binary format
return self
  def load_model(self, model_filename):
'''
Load network model object as pickle
Parameters
----------
model_filename : str
Filename or path of the model (binary) file
Returns
-------
self
Notes
-----
.. note::
The model loading is performed using pickle.
If the model was previously dumped with the save_model
function everything should be ok.
'''
with open(model_filename, 'rb') as fp:
tmp_dict = pickle.load(fp)
self.__dict__.clear()
self.__dict__.update(tmp_dict)
self._fitted = True
return self
  def save_model(self, model_filename):
'''
Dump the current network model as pickle
Parameters
----------
model_filename : str
Filename or path for the model dumping
Returns
-------
self
Notes
-----
.. note::
The model is dumped using pickle binary format.
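    Examples
    --------
    The same round-trip idea as save_weights, applied to the whole pickled
    model; the filename is hypothetical:
    >>> net = net.save_model('model.pkl')
    >>> net = net.load_model('model.pkl')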
'''
with open(model_filename, 'wb') as fp:
      pickle.dump(self.__dict__, fp, 2) # protocol 2 keeps Python 2 compatibility
return self
  def compile(self, optimizer=Optimizer(), metrics=None): # the default must be an Optimizer instance, not the class
'''
Compile the neural network model setting the optimizer
to each layer and the evaluation metrics
Parameters
----------
optimizer : Optimizer
Optimizer object to use during the training
metrics : list (default=None)
List of metrics functions to use for the model evaluation.
Notes
-----
.. note::
      The optimizer is copied into each layer object
      which requires a parameter optimization.
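    Examples
    --------
    A sketch with a hypothetical metric; metrics must expose the
    (y_true, y_pred) signature checked by _check_metrics:
    >>> def accuracy (y_true, y_pred):
    ...   return (y_true == y_pred).mean()
    >>> net.compile(optimizer=Optimizer(), metrics=[accuracy])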
'''
for layer in self:
if hasattr(layer, 'optimizer'):
layer.optimizer = copy(optimizer)
if isinstance(layer, RNN_layer):
layer.input_layer.optimizer = copy(optimizer)
layer.self_layer.optimizer = copy(optimizer)
layer.output_layer.optimizer = copy(optimizer)
if isinstance(layer, LSTM_layer):
layer.uf.optimizer = copy(optimizer)
layer.ui.optimizer = copy(optimizer)
layer.ug.optimizer = copy(optimizer)
layer.uo.optimizer = copy(optimizer)
layer.wf.optimizer = copy(optimizer)
layer.wi.optimizer = copy(optimizer)
layer.wg.optimizer = copy(optimizer)
layer.wo.optimizer = copy(optimizer)
if metrics is not None:
self._check_metrics(metrics)
def _check_metrics(self, metrics):
'''
Check the signature of the given metric functions.
The right signature must have only two required arguments (y_true, y_pred)
plus other possible arguments with default values.
    The checked functions are added to the list of network metric functions.
Parameters
----------
metrics : list
List of metrics as functions
Returns
-------
check : bool
True if everything is ok
'''
    # getfullargspec is available only in Python 3
    argspec = inspect.getfullargspec if sys.version_info.major >= 3 else inspect.getargspec
for func in metrics:
if not callable(func):
raise MetricsError('Metrics {} is not a callable object'.format(func.__name__))
infos = argspec(func)
num_defaults = len(infos.defaults) if infos.defaults else 0
      if len(infos.args) - num_defaults != 2:
        raise MetricsError('Metrics {0} is not a valid metrics function. '
                           'The required signature is only func (y_true, y_pred, **kwargs). '
                           'Try to use a partial to overcome this kind of issue.'.format(func.__name__))
self.metrics = metrics
return True
def _evaluate_metrics(self, y_true, y_pred):
'''
Evaluate the training metrics
Parameters
----------
y_true : 2d array-like
Ground truth (correct) labels expressed as image.
y_pred : 2d array-like
Predicted labels, as returned by the NN
Returns
-------
None
Notes
-----
.. note::
      The resulting metrics are simply printed to stdout.
'''
results = {func.__name__ : func(y_true, y_pred) for func in self.metrics}
print(' '.join(' {}: {:1.3f}'.format(k, v) for k, v in results.items()))
  def fit(self, X, y, max_iter=100, shuffle=True, verbose=True):
'''
Fit/training function
Parameters
----------
X : array-like
Input data
y : array-like
Ground truth or labels
max_iter : int (default=100)
Maximum number of iterations/epochs to perform
shuffle : bool (default=True)
Turn on/off the random shuffling of the data
verbose : bool (default=True)
Turn on/off the verbosity given by the training progress bar
Returns
-------
self
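    Examples
    --------
    A sketch with random arrays of hypothetical shape; the model is assumed
    to be compiled and to end with a cost-capable layer:
    >>> X = np.random.uniform(size=(320, 32, 32, 3))
    >>> y = np.random.uniform(size=(320, 10))
    >>> net = net.fit(X, y, max_iter=10)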
'''
num_data = X.shape[0]
begin = now()
self._fitted = True
indices = np.arange(0, num_data).astype('int64')
num_batches = num_data // self.batch
with _redirect_stdout(verbose):
      for epoch in range(max_iter):
        start = now()
        print('Epoch {:d}/{:d}'.format(epoch + 1, max_iter)) # flush=True
sys.stdout.flush() # compatibility with python 2.7
loss = 0.
seen = 0
if shuffle:
np.random.shuffle(indices)
        # itemsize avoids hard-coding the 8-byte width of the int64 indices
        batches = np.lib.stride_tricks.as_strided(indices, shape=(num_batches, self.batch), strides=(self.batch * indices.itemsize, indices.itemsize))
for i, idx in enumerate(batches):
_input = X[idx, ...]
_truth = y[idx, ...]
          self._forward(X=_input, truth=_truth, trainable=True) # the output is not needed here, only the loss
self._backward(X=_input, trainable=True)
loss += self._get_loss()
seen += len(idx)
done = int(50 * (i + 1) / len(batches))
print('{}{:>3d}/{:<3d} |{}{}| ({:1.1f} sec/iter) loss: {:3.3f}'.format( CRLF,
seen,
num_data,
r'█' * done,
'-' * (50 - done),
now() - start,
loss / seen
), end='') # flush=True
          sys.stdout.flush() # compatibility with python 2.7
start = now()
if self.metrics is not None:
y_pred = self.predict(X, truth=None, verbose=False)
self._evaluate_metrics(y, y_pred)
        print('\n', end='') # flush=True
        sys.stdout.flush() # compatibility with python 2.7
end = now()
print('Training on {:d} epochs took {:1.1f} sec'.format(max_iter, end - begin))
return self
  def fit_generator(self, Xy_generator, max_iter=100):
'''
Fit function using a train generator
Parameters
----------
Xy_generator : DataGenerator
Data generator object
max_iter : int (default=100)
Maximum number of iterations/epochs to perform
Returns
-------
self
References
----------
DataGenerator object in data.py
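    Examples
    --------
    A sketch, assuming a DataGenerator built as described in data.py
    (import path and constructor arguments are hypothetical):
    >>> from NumPyNet.data import DataGenerator
    >>> gen = DataGenerator(X, y, batch_size=32)
    >>> net = net.fit_generator(gen, max_iter=10)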
'''
Xy_generator.start()
for _ in range(max_iter):
grabbed = False
while not grabbed:
data, label, grabbed = Xy_generator.load_data()
self.fit(data, label, max_iter=1, shuffle=False) # data already shuffled
Xy_generator.stop()
self._fitted = True
return self
  def predict(self, X, truth=None, verbose=True):
'''
Predict the given input
Parameters
----------
X : array-like
Input data
truth : array-like (default=None)
Ground truth or labels
verbose : bool (default=True)
Turn on/off the verbosity given by the training progress bar
Returns
-------
output : array-like
Output of the model as numpy array
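    Examples
    --------
    A sketch with random test data of hypothetical shape:
    >>> X_test = np.random.uniform(size=(64, 32, 32, 3))
    >>> y_pred = net.predict(X_test)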
'''
if not self._fitted:
      raise NetworkError('This Network model instance is not fitted yet. Please use the "fit" function before calling "predict"')
num_data = len(X)
_truth = None
    batches = np.array_split(range(num_data), indices_or_sections=max(1, num_data // self.batch)) # the guard keeps at least one section when num_data < batch
begin = now()
start = begin
loss = 0.
seen = 0
output = []
with _redirect_stdout(verbose):
for i, idx in enumerate(batches):
_input = X[idx, ...]
if truth is not None:
_truth = truth[idx, ...]
predict = self._forward(X=_input, truth=_truth, trainable=False)
output.append(predict)
loss += self._get_loss()
seen += len(idx)
done = int(50 * (i + 1) / len(batches))
print('{}{:>3d}/{:<3d} |{}{}| ({:1.1f} sec/iter) loss: {:3.3f}'.format( CRLF,
seen,
num_data,
r'█' * done,
'-' * (50 - done),
now() - start,
loss / seen
), end='') # flush=True,
        sys.stdout.flush() # compatibility with python 2.7
start = now()
      print('\n', end='') # flush=True
      sys.stdout.flush() # compatibility with python 2.7
end = now()
print('Prediction on {:d} samples took {:1.1f} sec'.format(num_data, end - begin))
return np.concatenate(output)
  def evaluate(self, X, truth, verbose=False):
'''
Return output and loss of the model
Parameters
----------
X : array-like
Input data
truth : array-like
Ground truth or labels
verbose : bool (default=False)
Turn on/off the verbosity given by the training progress bar
Returns
-------
loss : float
The current loss of the model
output : array-like
Output of the model as numpy array
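    Examples
    --------
    A sketch with random arrays of hypothetical shape:
    >>> X_test = np.random.uniform(size=(64, 32, 32, 3))
    >>> y_test = np.random.uniform(size=(64, 10))
    >>> loss, y_pred = net.evaluate(X_test, y_test)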
'''
output = self.predict(X, truth=truth, verbose=verbose)
loss = self._get_loss() / len(X)
return (loss, output)
def _forward(self, X, truth=None, trainable=True):
'''
Forward function.
Apply the forward method on all layers
Parameters
----------
X : array-like
Input data
truth : array-like (default=None)
Ground truth or labels
trainable : bool (default=True)
Switch if train or not the model
Returns
-------
y : array-like
Output of the model
Notes
-----
.. warning::
      Up to now the trainable variable is unused, since the layers
      do not take it into account yet!
'''
# TODO: add trainable to forward and backward of each layer signature
y = X[:]
for layer in self:
      # dispatch the arguments according to the forward signature of the layer
      forward_args = layer.forward.__code__.co_varnames
if 'truth' in forward_args and truth is not None:
layer.forward(inpt=y[:], truth=truth)
elif 'network' in forward_args:
layer.forward(network=self)
else:
layer.forward(inpt=y[:])
y = layer.output[:]
return y
def _backward(self, X, trainable=True):
'''
BackPropagate the error
Parameters
----------
X : array-like
Input data
trainable : bool (default=True)
Switch if train or not the model
Returns
-------
self
Notes
-----
.. warning::
      Up to now the trainable variable is unused, since the layers
      do not take it into account yet!
'''
    for i in reversed(range(1, self.num_layers)):
      inpt = self._net[i - 1].output[:] # renamed to avoid shadowing the builtin input
      delta = self._net[i - 1].delta[:]
      # dispatch the arguments according to the backward signature of the layer
      backward_args = self._net[i].backward.__code__.co_varnames
      if 'inpt' in backward_args:
        self._net[i].backward(inpt=inpt[:], delta=delta[:])
      elif 'network' in backward_args:
        self._net[i].backward(delta=delta[:], network=self)
      else:
        self._net[i].backward(delta=delta[:])
if hasattr(self._net[i], 'update'):
self._net[i].update()
self._net[0].backward(delta=delta[:])
return self
def _get_loss(self):
'''
Extract the loss value as the last cost in the network model
Returns
-------
loss : float
        The value of the loss/cost stored in the last layer
        of the neural network able to compute a cost function.
Notes
-----
.. note::
      If no layer with cost-evaluation capability is found,
the return value is None.
'''
for i in reversed(range(1, self.num_layers)):
if hasattr(self._net[i], 'cost'):
return self._net[i].cost
return None
@property
def out_shape(self):
'''
    Get the output shape of the model, i.e. the output shape of its last layer (batch dimension excluded)
    '''
    return self._net[-1].out_shape[1:]
@property
def input_shape(self):
'''
Get the input shape
'''
return (self.w, self.h, self.c)
@property
def num_layers(self):
'''
Get the number of layers in the model
'''
return len(self._net)
def __getitem__(self, pos):
'''
Get the layer element
Parameters
----------
pos : int
Index of the layer in the neural network structure
'''
if pos < 0 or pos >= self.num_layers:
raise ValueError('Network model : layer out of range. The model has {:d} layers'.format(self.num_layers))
return self._net[pos]
if __name__ == '__main__':
batch = 32
  w, h, c = (512, 512, 3) # used by the commented layer examples below
config_filename = os.path.join(os.path.dirname(__file__), '..', 'cfg', 'yolov3.cfg')
net = Network(batch=batch)
net.load(config_filename)
print(net.input_shape)
#net.add(Input_layer(input_shape=(batch, w, h, c)))
#net.add(Convolutional_layer(input_shape=(batch, w, h, c), filters=64, size=3, stride=1))
#net.add(Convolutional_layer(input_shape=(batch, w, h, c), filters=16, size=3, stride=1))
net.summary()