# simplenn/simplenn.py
from copy import deepcopy
from typing import Iterable, List
from abc import abstractmethod, ABC
import numpy as np
from numpy.random import rand
import progressbar
def round_u(x: float) -> int:
    """Round a non-negative value to the nearest integer (halves round up)."""
    return int(x + 0.5)
def reversed_enumerate(x: Iterable):
    """Yield (index, item) pairs of `x` from the last element to the first."""
    x = list(x)
    i = len(x)
    while i > 0:
        i -= 1
        yield i, x[i]
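# Example: list(reversed_enumerate('abc')) == [(2, 'c'), (1, 'b'), (0, 'a')],
# i.e. the pairs of enumerate() visited in reverse order.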
class ActivationFuncGenerator(ABC):
@abstractmethod
def call(self, z: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray: pass
@abstractmethod
def df_dz(self, z: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray: pass
class LossFuncGenerator(ABC):
@abstractmethod
def call(self, p: float | np.float64 | np.ndarray, y: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
pass
@abstractmethod
def dl_dp(self, p: float | np.float64 | np.ndarray, y: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
"""
:param p: predicted value
:param y: target value
:return: a positive number
"""
pass
class Sigmoid(ActivationFuncGenerator):
def call(self, z: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
return 1 / (1 + np.exp(-z))
    def df_dz(self, z: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
        # numpy broadcasts elementwise, so one expression covers scalars and arrays
        return np.exp(-z) / np.power(1 + np.exp(-z), 2)
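# A minimal sanity check for Sigmoid.df_dz, assuming nothing beyond numpy:
# the analytic derivative should match a central finite difference.
# The helper name, sample point and tolerances are illustrative.
def _check_sigmoid_derivative(z: float = 0.3, eps: float = 1e-6) -> bool:
    s = Sigmoid()
    numeric = (s.call(z + eps) - s.call(z - eps)) / (2 * eps)
    return bool(np.isclose(s.df_dz(z), numeric, atol=1e-6))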
class CrossEntropy(LossFuncGenerator):
def call(self, p: float | np.float64, y: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
return - y * np.log(p) - (1 - y) * np.log(1 - p)
    def dl_dp(self, p: float | np.float64 | np.ndarray, y: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
        if isinstance(p, np.ndarray):
            # numpy broadcasts elementwise over arrays
            return -y / p + (1 - y) / (1 - p)
        return np.float64(-y / p + (1 - y) / (1 - p))
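# Hedged check for CrossEntropy.dl_dp: for l(p, y) = -y*log(p) - (1-y)*log(1-p)
# the derivative is -y/p + (1-y)/(1-p), which is negative when y = 1 (raising p
# lowers the loss). The helper name and sample values are illustrative.
def _check_cross_entropy_derivative(p: float = 0.7, y: float = 1.0, eps: float = 1e-7) -> bool:
    ce = CrossEntropy()
    numeric = (ce.call(p + eps, y) - ce.call(p - eps, y)) / (2 * eps)
    return bool(np.isclose(ce.dl_dp(p, y), numeric, atol=1e-5))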
class Dense:
def __init__(self, units: int, activation_func: ActivationFuncGenerator, params: int):
self.units, self.activation_func = units, activation_func
self.params = params
self.w = rand(units, params) / 10.0
self.b = rand(units) / 10.0
    def predict(self, x: np.ndarray, no_activation: bool = False) -> np.ndarray:
        """Return w @ x + b, activated unless `no_activation` is set."""
        assert x.shape == (self.params,)
        z = np.matmul(self.w, x) + self.b
        return z if no_activation else self.activation_func.call(z)
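# Usage sketch for Dense (illustrative numbers): a layer with 3 units and
# 4 inputs holds w of shape (3, 4) and b of shape (3,), so
#     Dense(3, Sigmoid(), 4).predict(np.ones(4)).shape == (3,)
# and no_activation=True returns the raw pre-activation w @ x + b instead.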
class Model:
def __init__(self, units: np.ndarray, input_params: int, activation_func: ActivationFuncGenerator,
loss_func: LossFuncGenerator):
self.layers: list[Dense] = []
self.input_params = input_params
self.activation_func = activation_func
self.loss_func = loss_func
f = input_params
for u in units:
self.layers.append(Dense(u, activation_func, f))
f = u
self.total_params = sum([layer.w.size for layer in self.layers])
self.total_units = sum([layer.w.shape[0] for layer in self.layers])
        if units[-1] != 1:
            print('WARNING: output layer has more than one unit; '
                  'train() and evaluate() assume a single scalar output')
    def get_w_tensor(self):
        # dtype=object: the layers' weight matrices generally differ in shape,
        # so they cannot form a regular numeric tensor
        return np.array([d.w for d in self.layers], dtype=object)
def get_w_sum(self):
return sum([np.sum(layer.w) for layer in self.layers])
def new_cache(self):
r = []
for layer in self.layers:
n = np.zeros(layer.w.shape)
r.append(n)
return np.array(r, dtype=object)
def predict(self, feature: np.ndarray) -> np.ndarray:
assert feature.shape == (self.input_params,)
o = feature
for layer in self.layers:
o = layer.predict(o)
return o
    def predict_verbose(self, feature: np.ndarray) -> tuple[list[np.ndarray], list[np.ndarray]]:
        """
        Get every pre-activation `z` and activation `a` in the predicting process.
        :param feature: feature
        :return: a tuple comprising two lists `z` and `a` of size [layers, units]
        """
        assert feature.shape == (self.input_params,)
        z = []
        a = []
        o = feature
        for layer in self.layers:
            zi = layer.predict(o, no_activation=True)  # compute z once per layer
            ai = layer.activation_func.call(zi)
            z.append(zi)
            a.append(ai)
            o = ai
        return z, a
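    # Illustrative shapes: for a model with layers of 4 and 1 units,
    # predict_verbose returns z and a as [array of shape (4,), array of
    # shape (1,)], with a[i] == activation(z[i]) and a[-1] the model output.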
    def get_loss(self, x: np.ndarray, y: np.ndarray) -> np.float64:
        """Mean loss of the model over the whole data set."""
        total = np.float64(0)
        for i in range(x.shape[0]):
            total += self.loss_func.call(self.predict(x[i])[0], y[i])
        return total / x.shape[0]
def evaluate(self, x: np.ndarray, y: np.ndarray) -> np.float64:
pbar = progressbar.ProgressBar(
widgets=['Verifying ', progressbar.Percentage('%(percentage)3d%%'), ' ', progressbar.Bar('#'), ' ',
progressbar.Timer(), ' ', progressbar.ETA(), ' '], maxval=100)
correct, incorrect = 0, 0
for i in range(y.size):
pbar.update(min((i + 1) / y.size * 100.0, 100))
if round_u(float(self.predict(x[i]))) == y[i]:
correct += 1
else:
incorrect += 1
pbar.finish()
        loss = self.get_loss(x, y)
        print(f'\nTotal: {y.size}, Correct: {correct}, Incorrect: {incorrect}')
        print(f'Rate: {correct / y.size * 100.0:.4f}%, Loss: {loss}')
        return loss
def train(self, x: np.ndarray, y: np.ndarray, epoch_count: int, alpha: float):
"""
Train the model using given data set.
:param x: array of size (m, n)
:param y: array of size (n, )
:param epoch_count: epoch count
:param alpha: learning rate
:return: the model itself
"""
assert x.shape[1] == self.input_params
        print('Start training ...')
        # `z` and `a` are cached per sample via predict_verbose, so they are not
        # recomputed for every weight update
for e in range(epoch_count):
print(f'Epoch {e + 1}/{epoch_count}, loss {self.get_loss(x, y)}')
pbar = progressbar.ProgressBar(
widgets=[progressbar.Percentage('%(percentage)3.4f%%'), ' ', progressbar.Bar('#'), ' ',
progressbar.Timer(), ' ', progressbar.ETA(), ' '], maxval=100)
c = 0
cc = self.total_params * len(x)
            for i in range(len(x)):
                # fresh copy per sample so the updates don't alias self.layers
                new_layers = deepcopy(self.layers)
                feature, target = x[i], y[i]
f = len(self.layers) - 1
z, a = self.predict_verbose(feature)
                prediction = float(a[-1][0])  # reuse the cached forward pass
                factor = alpha * self.loss_func.dl_dp(prediction, target)
cache = self.new_cache()
for l, layer in reversed_enumerate(self.layers):
for j in range(layer.units):
pbar.update(c / cc * 100.0)
for k in range(layer.params):
c += 1
new_layers[l].w[j, k] -= factor * self.pd(z, a, j, k, l, 0, f, feature, cache)
self.layers = new_layers
pbar.finish()
return self
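    # Usage note (illustrative): model.train(x, y, epoch_count=1, alpha=0.1)
    # performs per-weight gradient descent via the recursive pd() below; it is
    # much slower than the layer-vectorized train_2().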
    def pd(self, z: List[np.ndarray], a: List[np.ndarray], t_unit: int, t_param: int, t_layer: int,
           c_unit: int, c_layer: int, features: np.ndarray, cache: np.ndarray) -> np.float64:
        """Partial derivative of unit (c_layer, c_unit)'s activation with respect
        to weight w[t_unit, t_param] of layer t_layer, computed recursively."""
        result = self.activation_func.df_dz(z[c_layer][c_unit])
        if c_layer == t_layer and t_unit == c_unit:
            if c_layer == 0:
                result *= features[t_param]
            else:
                result *= a[c_layer - 1][t_param]
        elif c_layer == t_layer and t_unit != c_unit:
            result = 0
        else:
            total_params = self.layers[c_layer - 1].units
            r = 0
            for i in range(total_params):
                r += self.layers[c_layer].w[c_unit, i] \
                     * self.pd(z, a, t_unit, t_param, t_layer, i, c_layer - 1, features, cache)
            result *= r
        # note: `cache` is filled for inspection but is not consulted here
        cache[t_layer][t_unit, t_param] = result
        return result
def train_ng(self, x: np.ndarray, y: np.ndarray, epoch_count: int, alpha: float):
assert x.shape[1] == self.input_params
for e in range(epoch_count):
print(f'\nEpoch {e + 1}/{epoch_count}, loss {self.get_loss(x, y)}')
pbar = progressbar.ProgressBar(
widgets=[progressbar.Percentage('%(percentage)3.4f%%'), ' ', progressbar.Bar('#'), ' ',
progressbar.Timer(), ' ', progressbar.ETA(), ' '], maxval=100)
            c = 0
            cc = self.total_units * len(x)
for i in range(len(x)):
dj_da = [[0] * layer.units for layer in self.layers]
dj_dw = self.new_cache()
dj_db = [[0] * layer.units for layer in self.layers]
layers = deepcopy(self.layers)
feature, target = x[i], y[i]
z, a = self.predict_verbose(feature)
prediction = float(self.predict(feature))
dj_da[-1][0] = self.loss_func.dl_dp(prediction, target)
tp = self.loss_func.dl_dp(prediction, target)
for m in range(layers[-1].params):
dj_dw[-1][0, m] = tp * self.activation_func.df_dz(z[-1][0]) * a[-2][m]
layers[-1].w[0, m] -= alpha * dj_dw[-1][0, m]
dj_db[-1][0] = tp * self.activation_func.df_dz(z[-1][0])
layers[-1].b[0] -= alpha * dj_db[-1][0]
j = len(layers) - 2
for _, layer in reversed_enumerate(layers[1:-1]):
for k in range(layers[j].units):
c += 1
pbar.update(min(c / cc * 100.0, 100))
for l in range(layers[j + 1].units):
tp = dj_da[j + 1][l] * self.activation_func.df_dz(z[j + 1][l]) * self.layers[j + 1].w[l, k]
dj_da[j][k] += tp
for m in range(layers[j].params):
dj_dw[j][k, m] += tp * self.activation_func.df_dz(z[j][k]) * a[j - 1][m]
dj_db[j][k] += tp * self.activation_func.df_dz(z[j][k])
for m in range(layers[j].params):
layers[j].w[k, m] -= alpha * dj_dw[j][k, m]
layers[j].b[k] -= alpha * dj_db[j][k]
j -= 1
for k in range(layers[0].units):
c += 1
                    pbar.update(min(c / cc * 100.0, 100))
for l in range(layers[1].units):
tp = dj_da[1][l] * self.activation_func.df_dz(z[1][l]) * self.layers[1].w[l, k]
dj_da[0][k] += tp
for m in range(layers[0].params):
dj_dw[0][k, m] += tp * self.activation_func.df_dz(z[0][k]) * feature[m]
                        dj_db[0][k] += tp * self.activation_func.df_dz(z[0][k])
                    for m in range(layers[0].params):
                        # bug fix: apply the update to the weights, not to dj_dw itself
                        layers[0].w[k, m] -= alpha * dj_dw[0][k, m]
                    layers[0].b[k] -= alpha * dj_db[0][k]
self.layers = deepcopy(layers)
pbar.finish()
        return self
    def train_2(self, x: np.ndarray, y: np.ndarray, epoch_count: int, alpha: float, lambda_: float):
        """
        Layer-vectorized backpropagation with L2 regularization.
        :param lambda_: L2 regularization strength
        :return: the model itself
        """
        print('Start training ...')
try:
for e in range(epoch_count):
print(f'\nEpoch {e + 1}/{epoch_count}, loss {self.get_loss(x, y)}')
pbar = progressbar.ProgressBar(
widgets=[progressbar.Percentage('%(percentage)3.4f%%'), ' ', progressbar.Bar('#'), ' ',
progressbar.Timer(), ' ', progressbar.ETA(), ' '], maxval=100)
c = 0
cc = x.shape[0] * len(self.layers)
for i in range(x.shape[0]):
layers = deepcopy(self.layers)
feature, target = x[i], y[i]
z, a = self.predict_verbose(feature)
prediction = float(self.predict(feature))
dj_dz = [np.zeros((layer.units,)) for layer in self.layers]
dj_db = [np.zeros((layer.units,)) for layer in self.layers]
dj_dw = [np.zeros((layer.units, layer.params)) for layer in self.layers]
for l in range(len(self.layers) - 1, -1, -1):
c += 1
pbar.update(c / cc * 100.0)
if l == len(self.layers) - 1:
dj_dz[l] = self.loss_func.dl_dp(prediction, target) \
* self.activation_func.df_dz(z[-1])
else:
dj_dz[l] = np.dot(dj_dz[l + 1], self.layers[l + 1].w) * self.activation_func.df_dz(z[l])
dj_db[l] = dj_dz[l]
layers[l].b -= alpha * dj_db[l]
if l == 0:
dj_dw[l] = np.matmul(dj_dz[l].reshape(dj_dz[l].shape[0], 1),
feature.reshape(1, feature.shape[0]))
else:
dj_dw[l] = np.matmul(dj_dz[l].reshape(dj_dz[l].shape[0], 1),
a[l - 1].reshape(1, a[l-1].shape[0]))
layers[l].w -= alpha * (dj_dw[l] + lambda_ * self.layers[l].w / x.shape[0]) # L2 regularization
self.layers = layers
pbar.finish()
        except KeyboardInterrupt:
            print('\nTraining process interrupted. Data since the last epoch will be lost.\n')
        return self
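# A minimal end-to-end usage sketch. The toy dataset, layer sizes and
# hyperparameters below are illustrative, not tuned.
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    x_demo = rng.random((200, 2))
    # label 1 when the two features sum to more than 1, else 0
    y_demo = (x_demo.sum(axis=1) > 1.0).astype(np.float64)
    model = Model(np.array([4, 1]), input_params=2,
                  activation_func=Sigmoid(), loss_func=CrossEntropy())
    model.train_2(x_demo, y_demo, epoch_count=5, alpha=0.5, lambda_=0.01)
    model.evaluate(x_demo, y_demo)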