# Source listing metadata: 311 lines, 14 KiB, Python.
import math
from abc import abstractmethod, ABC
from copy import deepcopy
from typing import Iterable, List

import numpy as np
import progressbar
from numpy.random import rand
|
||
|
|
||
|
|
||
|
def round_u(x: float) -> int:
    """Round ``x`` to the nearest integer, rounding halves up.

    ``int(x + 0.5)`` truncates toward zero, which is only a correct rounding
    for non-negative ``x`` (e.g. it maps -0.6 to 0).  Flooring ``x + 0.5``
    gives the same answer for non-negative inputs and the correct one for
    negative inputs.

    :param x: value to round
    :return: nearest integer, with ``n + 0.5`` rounded to ``n + 1``
    """
    return math.floor(x + 0.5)
|
||
|
|
||
|
|
||
|
def reversed_enumerate(x: Iterable):
    """Yield ``(index, item)`` pairs of ``x``, from the last item to the first.

    The iterable is materialised into a list first, so generators and other
    one-shot iterables are accepted; indices refer to positions in that list.

    :param x: any iterable
    :return: generator of ``(index, item)`` tuples in descending index order
    """
    items = list(x)
    for idx in range(len(items) - 1, -1, -1):
        yield idx, items[idx]
|
||
|
|
||
|
|
||
|
class ActivationFuncGenerator(ABC):
|
||
|
@abstractmethod
|
||
|
def call(self, z: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray: pass
|
||
|
|
||
|
@abstractmethod
|
||
|
def df_dz(self, z: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray: pass
|
||
|
|
||
|
|
||
|
class LossFuncGenerator(ABC):
|
||
|
@abstractmethod
|
||
|
def call(self, p: float | np.float64 | np.ndarray, y: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
|
||
|
pass
|
||
|
|
||
|
@abstractmethod
|
||
|
def dl_dp(self, p: float | np.float64 | np.ndarray, y: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
|
||
|
"""
|
||
|
:param p: predicted value
|
||
|
:param y: target value
|
||
|
:return: a positive number
|
||
|
"""
|
||
|
pass
|
||
|
|
||
|
|
||
|
class Sigmoid(ActivationFuncGenerator):
|
||
|
def call(self, z: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
|
||
|
return 1 / (1 + np.exp(-z))
|
||
|
|
||
|
def df_dz(self, z: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
|
||
|
if isinstance(z, np.ndarray):
|
||
|
return np.array([np.exp(-z[i]) / np.power((1 + np.exp(-z[i])), 2) for i in range(z.shape[0])])
|
||
|
return np.exp(-z) / np.power((1 + np.exp(-z)), 2)
|
||
|
|
||
|
|
||
|
class CrossEntropy(LossFuncGenerator):
|
||
|
def call(self, p: float | np.float64, y: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
|
||
|
return - y * np.log(p) - (1 - y) * np.log(1 - p)
|
||
|
|
||
|
def dl_dp(self, p: float | np.float64, y: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
|
||
|
if isinstance(p, np.ndarray) and isinstance(y, np.ndarray):
|
||
|
return np.array([np.float64(- y[i] / p[i] + (1 - y[i]) / (1 - p[i])) for i in range(p.shape[0])])
|
||
|
else:
|
||
|
return np.float64(- y / p + (1 - y) / (1 - p))
|
||
|
|
||
|
def __init__(self):
|
||
|
pass
|
||
|
|
||
|
|
||
|
class Dense:
    """Fully connected layer holding a weight matrix ``w`` and bias vector ``b``."""

    def __init__(self, units: int, activation_func: ActivationFuncGenerator, params: int):
        """
        :param units: number of units (rows of ``w``, length of ``b``)
        :param activation_func: activation applied to the layer's pre-activation
        :param params: number of inputs per unit (columns of ``w``)
        """
        self.units = units
        self.activation_func = activation_func
        self.params = params
        # `rand` samples uniformly from [0, 1); dividing by 10 keeps the
        # initial weights and biases in [0, 0.1).
        self.w = rand(units, params) / 10.0
        self.b = rand(units) / 10.0

    def predict(self, x: np.ndarray, no_activation: bool = False) -> np.ndarray:
        """
        Compute the layer output for a single input vector.

        :param x: input vector of shape ``(params,)``
        :param no_activation: when true, return the pre-activation ``w @ x + b``
        :return: vector of shape ``(units,)``
        """
        assert x.shape == (self.params,)
        pre_activation = np.matmul(self.w, x) + self.b
        if no_activation:
            return pre_activation
        return self.activation_func.call(pre_activation)
|
||
|
|
||
|
|
||
|
class Model:
|
||
|
def __init__(self, units: np.ndarray, input_params: int, activation_func: ActivationFuncGenerator,
|
||
|
loss_func: LossFuncGenerator):
|
||
|
self.layers: list[Dense] = []
|
||
|
self.input_params = input_params
|
||
|
self.activation_func = activation_func
|
||
|
self.loss_func = loss_func
|
||
|
f = input_params
|
||
|
for u in units:
|
||
|
self.layers.append(Dense(u, activation_func, f))
|
||
|
f = u
|
||
|
self.total_params = sum([layer.w.size for layer in self.layers])
|
||
|
self.total_units = sum([layer.w.shape[0] for layer in self.layers])
|
||
|
if units[-1] != 1:
|
||
|
print('WARNING: Not a predicting model')
|
||
|
|
||
|
def get_w_tensor(self):
|
||
|
return np.array([d.w for d in self.layers])
|
||
|
|
||
|
def get_w_sum(self):
|
||
|
return sum([np.sum(layer.w) for layer in self.layers])
|
||
|
|
||
|
def new_cache(self):
|
||
|
r = []
|
||
|
for layer in self.layers:
|
||
|
n = np.zeros(layer.w.shape)
|
||
|
r.append(n)
|
||
|
return np.array(r, dtype=object)
|
||
|
|
||
|
def predict(self, feature: np.ndarray) -> np.ndarray:
|
||
|
assert feature.shape == (self.input_params,)
|
||
|
o = feature
|
||
|
for layer in self.layers:
|
||
|
o = layer.predict(o)
|
||
|
return o
|
||
|
|
||
|
def predict_verbose(self, feature: np.ndarray) -> tuple[list[np.ndarray], list[np.ndarray]]:
|
||
|
"""
|
||
|
get every `z` and `a` in the predicting process.
|
||
|
:param feature: feature
|
||
|
:return: a tuple comprising two lists `z` and `a` of size [layers, units]
|
||
|
"""
|
||
|
assert feature.shape == (self.input_params,)
|
||
|
z = []
|
||
|
a = []
|
||
|
o = feature
|
||
|
for layer in self.layers:
|
||
|
z.append(layer.predict(o, no_activation=True))
|
||
|
a.append(layer.predict(o))
|
||
|
o = layer.predict(o)
|
||
|
return z, a
|
||
|
|
||
|
def get_loss(self, x: np.ndarray, y: np.ndarray) -> np.float64:
|
||
|
l = np.float64(0)
|
||
|
for i in range(x.shape[0]):
|
||
|
l += self.loss_func.call(self.predict(x[i])[0], y[i])
|
||
|
l /= x.shape[0]
|
||
|
return l
|
||
|
|
||
|
def evaluate(self, x: np.ndarray, y: np.ndarray) -> np.float64:
|
||
|
pbar = progressbar.ProgressBar(
|
||
|
widgets=['Verifying ', progressbar.Percentage('%(percentage)3d%%'), ' ', progressbar.Bar('#'), ' ',
|
||
|
progressbar.Timer(), ' ', progressbar.ETA(), ' '], maxval=100)
|
||
|
correct, incorrect = 0, 0
|
||
|
for i in range(y.size):
|
||
|
pbar.update(min((i + 1) / y.size * 100.0, 100))
|
||
|
if round_u(float(self.predict(x[i]))) == y[i]:
|
||
|
correct += 1
|
||
|
else:
|
||
|
incorrect += 1
|
||
|
pbar.finish()
|
||
|
print(f'\nTotal: {y.size}, Correct: {correct}, Incorrect: {incorrect}')
|
||
|
print(f'Rate: {correct / y.size * 100.0:.4f}%, Loss: {self.get_loss(x, y)}')
|
||
|
return self.get_loss(x, y)
|
||
|
|
||
|
def train(self, x: np.ndarray, y: np.ndarray, epoch_count: int, alpha: float):
|
||
|
"""
|
||
|
Train the model using given data set.
|
||
|
:param x: array of size (m, n)
|
||
|
:param y: array of size (n, )
|
||
|
:param epoch_count: epoch count
|
||
|
:param alpha: learning rate
|
||
|
:return: the model itself
|
||
|
"""
|
||
|
assert x.shape[1] == self.input_params
|
||
|
print('WARNING: Start training ...')
|
||
|
new_layers = deepcopy(self.layers)
|
||
|
# don't repeatedly calculate prediction, `z` and `a` in each epoch
|
||
|
for e in range(epoch_count):
|
||
|
print(f'Epoch {e + 1}/{epoch_count}, loss {self.get_loss(x, y)}')
|
||
|
pbar = progressbar.ProgressBar(
|
||
|
widgets=[progressbar.Percentage('%(percentage)3.4f%%'), ' ', progressbar.Bar('#'), ' ',
|
||
|
progressbar.Timer(), ' ', progressbar.ETA(), ' '], maxval=100)
|
||
|
c = 0
|
||
|
cc = self.total_params * len(x)
|
||
|
for i in range(len(x)):
|
||
|
feature, target = x[i], y[i]
|
||
|
f = len(self.layers) - 1
|
||
|
z, a = self.predict_verbose(feature)
|
||
|
prediction = float(self.predict(feature))
|
||
|
factor = alpha * self.loss_func.dl_dp(prediction, target)
|
||
|
cache = self.new_cache()
|
||
|
for l, layer in reversed_enumerate(self.layers):
|
||
|
for j in range(layer.units):
|
||
|
pbar.update(c / cc * 100.0)
|
||
|
for k in range(layer.params):
|
||
|
c += 1
|
||
|
new_layers[l].w[j, k] -= factor * self.pd(z, a, j, k, l, 0, f, feature, cache)
|
||
|
self.layers = new_layers
|
||
|
pbar.finish()
|
||
|
return self
|
||
|
|
||
|
def pd(self, z: List[np.ndarray], a: List[np.ndarray], t_unit: int, t_param: int, t_layer: int,
|
||
|
c_unit: int, c_layer: int, features: np.ndarray, cache: np.ndarray) -> int:
|
||
|
result = 0
|
||
|
for i in range(cache[t_layer + 1].shape[0]):
|
||
|
result += cache[t_layer + 1]
|
||
|
result = self.activation_func.df_dz(z[c_layer][c_unit])
|
||
|
if c_layer == t_layer and t_unit == c_unit:
|
||
|
if c_layer == 0:
|
||
|
result *= features[t_param]
|
||
|
else:
|
||
|
result *= a[c_layer - 1][t_param]
|
||
|
elif c_layer == t_layer and t_unit != c_unit:
|
||
|
result = 0
|
||
|
else:
|
||
|
total_params = self.layers[c_layer - 1].units
|
||
|
r = 0
|
||
|
for i in range(total_params):
|
||
|
r += self.layers[c_layer].w[c_unit, i] \
|
||
|
* self.pd(z, a, t_unit, t_param, t_layer, i, c_layer - 1, features)
|
||
|
result *= r
|
||
|
cache[t_layer][t_unit, t_param] = result
|
||
|
return result
|
||
|
|
||
|
def train_ng(self, x: np.ndarray, y: np.ndarray, epoch_count: int, alpha: float):
|
||
|
assert x.shape[1] == self.input_params
|
||
|
for e in range(epoch_count):
|
||
|
print(f'\nEpoch {e + 1}/{epoch_count}, loss {self.get_loss(x, y)}')
|
||
|
pbar = progressbar.ProgressBar(
|
||
|
widgets=[progressbar.Percentage('%(percentage)3.4f%%'), ' ', progressbar.Bar('#'), ' ',
|
||
|
progressbar.Timer(), ' ', progressbar.ETA(), ' '], maxval=100)
|
||
|
c = 0
|
||
|
cc = self.total_units * len(x)
|
||
|
c += 1
|
||
|
pbar.update(min(c / cc * 100.0, 100))
|
||
|
for i in range(len(x)):
|
||
|
dj_da = [[0] * layer.units for layer in self.layers]
|
||
|
dj_dw = self.new_cache()
|
||
|
dj_db = [[0] * layer.units for layer in self.layers]
|
||
|
layers = deepcopy(self.layers)
|
||
|
feature, target = x[i], y[i]
|
||
|
z, a = self.predict_verbose(feature)
|
||
|
prediction = float(self.predict(feature))
|
||
|
dj_da[-1][0] = self.loss_func.dl_dp(prediction, target)
|
||
|
tp = self.loss_func.dl_dp(prediction, target)
|
||
|
for m in range(layers[-1].params):
|
||
|
dj_dw[-1][0, m] = tp * self.activation_func.df_dz(z[-1][0]) * a[-2][m]
|
||
|
layers[-1].w[0, m] -= alpha * dj_dw[-1][0, m]
|
||
|
dj_db[-1][0] = tp * self.activation_func.df_dz(z[-1][0])
|
||
|
layers[-1].b[0] -= alpha * dj_db[-1][0]
|
||
|
j = len(layers) - 2
|
||
|
for _, layer in reversed_enumerate(layers[1:-1]):
|
||
|
for k in range(layers[j].units):
|
||
|
c += 1
|
||
|
pbar.update(min(c / cc * 100.0, 100))
|
||
|
for l in range(layers[j + 1].units):
|
||
|
tp = dj_da[j + 1][l] * self.activation_func.df_dz(z[j + 1][l]) * self.layers[j + 1].w[l, k]
|
||
|
dj_da[j][k] += tp
|
||
|
for m in range(layers[j].params):
|
||
|
dj_dw[j][k, m] += tp * self.activation_func.df_dz(z[j][k]) * a[j - 1][m]
|
||
|
dj_db[j][k] += tp * self.activation_func.df_dz(z[j][k])
|
||
|
for m in range(layers[j].params):
|
||
|
layers[j].w[k, m] -= alpha * dj_dw[j][k, m]
|
||
|
layers[j].b[k] -= alpha * dj_db[j][k]
|
||
|
j -= 1
|
||
|
for k in range(layers[0].units):
|
||
|
c += 1
|
||
|
# pbar.update(min(c / cc * 100.0, 100))
|
||
|
for l in range(layers[1].units):
|
||
|
tp = dj_da[1][l] * self.activation_func.df_dz(z[1][l]) * self.layers[1].w[l, k]
|
||
|
dj_da[0][k] += tp
|
||
|
for m in range(layers[0].params):
|
||
|
dj_dw[0][k, m] += tp * self.activation_func.df_dz(z[0][k]) * feature[m]
|
||
|
dj_db[0][k] = tp * self.activation_func.df_dz(z[0][k])
|
||
|
for m in range(layers[0].params):
|
||
|
dj_dw[0][k, m] -= alpha * dj_dw[0][k, m]
|
||
|
self.layers = deepcopy(layers)
|
||
|
pbar.finish()
|
||
|
return 0
|
||
|
|
||
|
def train_2(self, x: np.ndarray, y: np.ndarray, epoch_count: int, alpha: float, lambda_: float):
|
||
|
print('WARNING: Start training ...')
|
||
|
try:
|
||
|
for e in range(epoch_count):
|
||
|
print(f'\nEpoch {e + 1}/{epoch_count}, loss {self.get_loss(x, y)}')
|
||
|
pbar = progressbar.ProgressBar(
|
||
|
widgets=[progressbar.Percentage('%(percentage)3.4f%%'), ' ', progressbar.Bar('#'), ' ',
|
||
|
progressbar.Timer(), ' ', progressbar.ETA(), ' '], maxval=100)
|
||
|
c = 0
|
||
|
cc = x.shape[0] * len(self.layers)
|
||
|
for i in range(x.shape[0]):
|
||
|
layers = deepcopy(self.layers)
|
||
|
feature, target = x[i], y[i]
|
||
|
z, a = self.predict_verbose(feature)
|
||
|
prediction = float(self.predict(feature))
|
||
|
dj_dz = [np.zeros((layer.units,)) for layer in self.layers]
|
||
|
dj_db = [np.zeros((layer.units,)) for layer in self.layers]
|
||
|
dj_dw = [np.zeros((layer.units, layer.params)) for layer in self.layers]
|
||
|
for l in range(len(self.layers) - 1, -1, -1):
|
||
|
c += 1
|
||
|
pbar.update(c / cc * 100.0)
|
||
|
if l == len(self.layers) - 1:
|
||
|
dj_dz[l] = self.loss_func.dl_dp(prediction, target) \
|
||
|
* self.activation_func.df_dz(z[-1])
|
||
|
else:
|
||
|
dj_dz[l] = np.dot(dj_dz[l + 1], self.layers[l + 1].w) * self.activation_func.df_dz(z[l])
|
||
|
dj_db[l] = dj_dz[l]
|
||
|
layers[l].b -= alpha * dj_db[l]
|
||
|
if l == 0:
|
||
|
dj_dw[l] = np.matmul(dj_dz[l].reshape(dj_dz[l].shape[0], 1),
|
||
|
feature.reshape(1, feature.shape[0]))
|
||
|
else:
|
||
|
dj_dw[l] = np.matmul(dj_dz[l].reshape(dj_dz[l].shape[0], 1),
|
||
|
a[l - 1].reshape(1, a[l-1].shape[0]))
|
||
|
layers[l].w -= alpha * (dj_dw[l] + lambda_ * self.layers[l].w / x.shape[0]) # L2 regularization
|
||
|
self.layers = layers
|
||
|
pbar.finish()
|
||
|
except KeyboardInterrupt:
|
||
|
print('\nTraining process interrupted. Data since last the last epoch will be lost.\n')
|