Add files via upload

Ariel 2023-07-06 01:09:35 +08:00 committed by GitHub
parent ada39ccf6b
commit 9421b1a229
4 changed files with 399 additions and 0 deletions

MNIST.py (new file, +47 lines)

@@ -0,0 +1,47 @@
import numpy as np
import progressbar
def z_score_normalize(a: np.ndarray) -> np.ndarray:
    # Standardize to zero mean and unit variance (computed over the whole array).
    return (a - np.average(a)) / np.std(a)
def load_data(path: str, max_size: int = -1, rescale: bool = False) -> tuple[np.ndarray, np.ndarray]:
"""
    Load an MNIST data set from a .csv file, keeping only the samples labeled 0 or 1.
    :param path: the path to the .csv file
    :param max_size: the maximum number of samples to return (-1 for no limit)
    :param rescale: whether to z-score normalize the features
    :return: an array of features and an array of labels
"""
with open(path) as f:
ls = f.readlines()[1:]
if ls[-1].strip() == '':
ls.pop()
n = len(ls[0].split(',')) - 1
m = len(ls)
    count = 0
    # First pass: count the samples labeled 0 or 1 so the output arrays can be preallocated.
    for l in ls:
        b = l.split(',')[0].strip()
        if b in ('0', '1'):
            count += 1
t = min(count, max_size) if max_size >= 0 else count
x = np.zeros((t, n))
y = np.zeros((t,))
j = 0
print(f"Loading '{path}' ...")
pbar = progressbar.ProgressBar(
widgets=[progressbar.Percentage(), ' ', progressbar.Bar('#'), ' ', progressbar.Timer(), ' ',
progressbar.ETA(), ' '], maxval=100)
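    # Second pass: copy the 0/1 samples into the preallocated arrays, stopping at max_size.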
    for l in ls:
b, *a = [each.strip() for each in l.split(',')]
if b in ('0', '1'):
if j == t:
break
else:
x[j] = np.array(a)
y[j] = b
j += 1
pbar.update(j * 100.0 / t)
pbar.finish()
    print(f'Loaded {t} samples.\n')
return (z_score_normalize(x) if rescale else x), y
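# A minimal usage sketch (assumes a Kaggle-style MNIST csv with a header row and the
# label in the first column, as load_data expects):
#     x, y = load_data('./data/mnist_train.csv', max_size=1000, rescale=True)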

requirements.txt (new file, +3 lines)

@@ -0,0 +1,3 @@
progressbar2==4.2.0
numpy~=1.23.0
tensorflow==2.12.0

simplenn.py (new file, +310 lines)

@@ -0,0 +1,310 @@
from copy import deepcopy
from typing import Iterable, List
from abc import abstractmethod, ABC
import numpy as np
from numpy.random import rand
import progressbar
def round_u(x: float) -> int:
    # Round half up (sufficient for the non-negative sigmoid outputs used here).
    return int(x + 0.5)
def reversed_enumerate(x: Iterable):
x = list(x)
i = len(x)
while i > 0:
i -= 1
yield i, x[i]
class ActivationFuncGenerator(ABC):
@abstractmethod
def call(self, z: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray: pass
@abstractmethod
def df_dz(self, z: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray: pass
class LossFuncGenerator(ABC):
@abstractmethod
def call(self, p: float | np.float64 | np.ndarray, y: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
pass
@abstractmethod
def dl_dp(self, p: float | np.float64 | np.ndarray, y: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
"""
:param p: predicted value
:param y: target value
        :return: the derivative of the loss with respect to the prediction
"""
pass
class Sigmoid(ActivationFuncGenerator):
def call(self, z: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
return 1 / (1 + np.exp(-z))
def df_dz(self, z: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
        # Equivalent to sigmoid(z) * (1 - sigmoid(z)); the expression is element-wise,
        # so ndarray inputs need no per-element loop.
        return np.exp(-z) / np.power(1 + np.exp(-z), 2)
class CrossEntropy(LossFuncGenerator):
def call(self, p: float | np.float64, y: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
return - y * np.log(p) - (1 - y) * np.log(1 - p)
def dl_dp(self, p: float | np.float64, y: float | np.float64 | np.ndarray) -> np.float64 | np.ndarray:
        # d/dp [-y*log(p) - (1-y)*log(1-p)] = -y/p + (1-y)/(1-p)
        if isinstance(p, np.ndarray):
            # Element-wise over the whole array; no per-element loop needed.
            return - y / p + (1 - y) / (1 - p)
        return np.float64(- y / p + (1 - y) / (1 - p))
class Dense:
def __init__(self, units: int, activation_func: ActivationFuncGenerator, params: int):
self.units, self.activation_func = units, activation_func
self.params = params
self.w = rand(units, params) / 10.0
self.b = rand(units) / 10.0
def predict(self, x: np.ndarray, no_activation: bool = False) -> np.ndarray:
assert x.shape == (self.params,)
return (np.matmul(self.w, x) + self.b) if no_activation \
else self.activation_func.call(np.matmul(self.w, x) + self.b)
class Model:
def __init__(self, units: np.ndarray, input_params: int, activation_func: ActivationFuncGenerator,
loss_func: LossFuncGenerator):
self.layers: list[Dense] = []
self.input_params = input_params
self.activation_func = activation_func
self.loss_func = loss_func
f = input_params
for u in units:
self.layers.append(Dense(u, activation_func, f))
f = u
self.total_params = sum([layer.w.size for layer in self.layers])
self.total_units = sum([layer.w.shape[0] for layer in self.layers])
if units[-1] != 1:
            print('WARNING: final layer has more than one unit, so the model will not produce a scalar prediction')
def get_w_tensor(self):
return np.array([d.w for d in self.layers])
def get_w_sum(self):
return sum([np.sum(layer.w) for layer in self.layers])
def new_cache(self):
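        # One zero array per layer, shaped like that layer's weight matrix
        # (dtype=object because the shapes differ between layers).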
r = []
for layer in self.layers:
n = np.zeros(layer.w.shape)
r.append(n)
return np.array(r, dtype=object)
def predict(self, feature: np.ndarray) -> np.ndarray:
assert feature.shape == (self.input_params,)
o = feature
for layer in self.layers:
o = layer.predict(o)
return o
def predict_verbose(self, feature: np.ndarray) -> tuple[list[np.ndarray], list[np.ndarray]]:
"""
get every `z` and `a` in the predicting process.
:param feature: feature
:return: a tuple comprising two lists `z` and `a` of size [layers, units]
"""
assert feature.shape == (self.input_params,)
        z = []
        a = []
        o = feature
        for layer in self.layers:
            # Compute each layer's pre-activation once, then activate it.
            zi = layer.predict(o, no_activation=True)
            o = layer.activation_func.call(zi)
            z.append(zi)
            a.append(o)
        return z, a
def get_loss(self, x: np.ndarray, y: np.ndarray) -> np.float64:
l = np.float64(0)
for i in range(x.shape[0]):
l += self.loss_func.call(self.predict(x[i])[0], y[i])
l /= x.shape[0]
return l
def evaluate(self, x: np.ndarray, y: np.ndarray) -> np.float64:
pbar = progressbar.ProgressBar(
widgets=['Verifying ', progressbar.Percentage('%(percentage)3d%%'), ' ', progressbar.Bar('#'), ' ',
progressbar.Timer(), ' ', progressbar.ETA(), ' '], maxval=100)
correct, incorrect = 0, 0
for i in range(y.size):
pbar.update(min((i + 1) / y.size * 100.0, 100))
if round_u(float(self.predict(x[i]))) == y[i]:
correct += 1
else:
incorrect += 1
pbar.finish()
        loss = self.get_loss(x, y)
        print(f'\nTotal: {y.size}, Correct: {correct}, Incorrect: {incorrect}')
        print(f'Rate: {correct / y.size * 100.0:.4f}%, Loss: {loss}')
        return loss
def train(self, x: np.ndarray, y: np.ndarray, epoch_count: int, alpha: float):
"""
Train the model using given data set.
:param x: array of size (m, n)
        :param y: array of size (m, )
:param epoch_count: epoch count
:param alpha: learning rate
:return: the model itself
"""
assert x.shape[1] == self.input_params
print('WARNING: Start training ...')
new_layers = deepcopy(self.layers)
# don't repeatedly calculate prediction, `z` and `a` in each epoch
for e in range(epoch_count):
print(f'Epoch {e + 1}/{epoch_count}, loss {self.get_loss(x, y)}')
pbar = progressbar.ProgressBar(
widgets=[progressbar.Percentage('%(percentage)3.4f%%'), ' ', progressbar.Bar('#'), ' ',
progressbar.Timer(), ' ', progressbar.ETA(), ' '], maxval=100)
c = 0
cc = self.total_params * len(x)
for i in range(len(x)):
feature, target = x[i], y[i]
f = len(self.layers) - 1
z, a = self.predict_verbose(feature)
prediction = float(self.predict(feature))
factor = alpha * self.loss_func.dl_dp(prediction, target)
cache = self.new_cache()
for l, layer in reversed_enumerate(self.layers):
for j in range(layer.units):
pbar.update(c / cc * 100.0)
for k in range(layer.params):
c += 1
new_layers[l].w[j, k] -= factor * self.pd(z, a, j, k, l, 0, f, feature, cache)
self.layers = new_layers
pbar.finish()
return self
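    # `pd` evaluates d(prediction)/d(w[t_layer][t_unit, t_param]) by recursing with the
    # chain rule from layer `c_layer` back towards `t_layer`; `cache` records each
    # computed partial per layer.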
    def pd(self, z: List[np.ndarray], a: List[np.ndarray], t_unit: int, t_param: int, t_layer: int,
           c_unit: int, c_layer: int, features: np.ndarray, cache: np.ndarray) -> np.float64:
        result = self.activation_func.df_dz(z[c_layer][c_unit])
if c_layer == t_layer and t_unit == c_unit:
if c_layer == 0:
result *= features[t_param]
else:
result *= a[c_layer - 1][t_param]
elif c_layer == t_layer and t_unit != c_unit:
result = 0
else:
total_params = self.layers[c_layer - 1].units
r = 0
for i in range(total_params):
                r += self.layers[c_layer].w[c_unit, i] \
                    * self.pd(z, a, t_unit, t_param, t_layer, i, c_layer - 1, features, cache)
result *= r
cache[t_layer][t_unit, t_param] = result
return result
def train_ng(self, x: np.ndarray, y: np.ndarray, epoch_count: int, alpha: float):
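        # Unit-by-unit backpropagation: gradients flow from the output layer through the
        # hidden layers down to layer 0, updating a deep copy of the layers per sample.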
assert x.shape[1] == self.input_params
for e in range(epoch_count):
print(f'\nEpoch {e + 1}/{epoch_count}, loss {self.get_loss(x, y)}')
pbar = progressbar.ProgressBar(
widgets=[progressbar.Percentage('%(percentage)3.4f%%'), ' ', progressbar.Bar('#'), ' ',
progressbar.Timer(), ' ', progressbar.ETA(), ' '], maxval=100)
c = 0
cc = self.total_units * len(x)
c += 1
pbar.update(min(c / cc * 100.0, 100))
for i in range(len(x)):
dj_da = [[0] * layer.units for layer in self.layers]
dj_dw = self.new_cache()
dj_db = [[0] * layer.units for layer in self.layers]
layers = deepcopy(self.layers)
feature, target = x[i], y[i]
z, a = self.predict_verbose(feature)
prediction = float(self.predict(feature))
                tp = self.loss_func.dl_dp(prediction, target)
                dj_da[-1][0] = tp
for m in range(layers[-1].params):
dj_dw[-1][0, m] = tp * self.activation_func.df_dz(z[-1][0]) * a[-2][m]
layers[-1].w[0, m] -= alpha * dj_dw[-1][0, m]
dj_db[-1][0] = tp * self.activation_func.df_dz(z[-1][0])
layers[-1].b[0] -= alpha * dj_db[-1][0]
j = len(layers) - 2
for _, layer in reversed_enumerate(layers[1:-1]):
for k in range(layers[j].units):
c += 1
pbar.update(min(c / cc * 100.0, 100))
for l in range(layers[j + 1].units):
tp = dj_da[j + 1][l] * self.activation_func.df_dz(z[j + 1][l]) * self.layers[j + 1].w[l, k]
dj_da[j][k] += tp
for m in range(layers[j].params):
dj_dw[j][k, m] += tp * self.activation_func.df_dz(z[j][k]) * a[j - 1][m]
dj_db[j][k] += tp * self.activation_func.df_dz(z[j][k])
for m in range(layers[j].params):
layers[j].w[k, m] -= alpha * dj_dw[j][k, m]
layers[j].b[k] -= alpha * dj_db[j][k]
j -= 1
for k in range(layers[0].units):
c += 1
# pbar.update(min(c / cc * 100.0, 100))
for l in range(layers[1].units):
tp = dj_da[1][l] * self.activation_func.df_dz(z[1][l]) * self.layers[1].w[l, k]
dj_da[0][k] += tp
for m in range(layers[0].params):
dj_dw[0][k, m] += tp * self.activation_func.df_dz(z[0][k]) * feature[m]
                        dj_db[0][k] += tp * self.activation_func.df_dz(z[0][k])
                    for m in range(layers[0].params):
                        layers[0].w[k, m] -= alpha * dj_dw[0][k, m]
                    layers[0].b[k] -= alpha * dj_db[0][k]
self.layers = deepcopy(layers)
pbar.finish()
return 0
def train_2(self, x: np.ndarray, y: np.ndarray, epoch_count: int, alpha: float, lambda_: float):
print('WARNING: Start training ...')
try:
for e in range(epoch_count):
print(f'\nEpoch {e + 1}/{epoch_count}, loss {self.get_loss(x, y)}')
pbar = progressbar.ProgressBar(
widgets=[progressbar.Percentage('%(percentage)3.4f%%'), ' ', progressbar.Bar('#'), ' ',
progressbar.Timer(), ' ', progressbar.ETA(), ' '], maxval=100)
c = 0
cc = x.shape[0] * len(self.layers)
for i in range(x.shape[0]):
layers = deepcopy(self.layers)
feature, target = x[i], y[i]
z, a = self.predict_verbose(feature)
prediction = float(self.predict(feature))
dj_dz = [np.zeros((layer.units,)) for layer in self.layers]
dj_db = [np.zeros((layer.units,)) for layer in self.layers]
dj_dw = [np.zeros((layer.units, layer.params)) for layer in self.layers]
for l in range(len(self.layers) - 1, -1, -1):
c += 1
pbar.update(c / cc * 100.0)
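                        # Backprop recurrence: at the output layer dJ/dz = dl_dp(p, y) * g'(z[-1]);
                        # at hidden layer l, dJ/dz[l] = (dJ/dz[l+1] . W[l+1]) * g'(z[l]).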
if l == len(self.layers) - 1:
dj_dz[l] = self.loss_func.dl_dp(prediction, target) \
* self.activation_func.df_dz(z[-1])
else:
dj_dz[l] = np.dot(dj_dz[l + 1], self.layers[l + 1].w) * self.activation_func.df_dz(z[l])
dj_db[l] = dj_dz[l]
layers[l].b -= alpha * dj_db[l]
if l == 0:
dj_dw[l] = np.matmul(dj_dz[l].reshape(dj_dz[l].shape[0], 1),
feature.reshape(1, feature.shape[0]))
else:
dj_dw[l] = np.matmul(dj_dz[l].reshape(dj_dz[l].shape[0], 1),
a[l - 1].reshape(1, a[l-1].shape[0]))
layers[l].w -= alpha * (dj_dw[l] + lambda_ * self.layers[l].w / x.shape[0]) # L2 regularization
self.layers = layers
pbar.finish()
except KeyboardInterrupt:
            print('\nTraining process interrupted. Data since the last epoch will be lost.\n')

testnn.py (new file, +39 lines)

@@ -0,0 +1,39 @@
import simplenn
import MNIST
import numpy as np
from time import sleep
from copy import deepcopy
sigmoid = simplenn.Sigmoid()
cross_entropy = simplenn.CrossEntropy()
x, y = MNIST.load_data('./data/mnist_train.csv', rescale=True)
m, n = x.shape
units = np.array([25, 15, 1])
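# Architecture: input (784 features for MNIST) -> 25 -> 15 -> 1, sigmoid activations,
# binary cross-entropy loss; the data loader keeps only 0-vs-1 samples.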
model = simplenn.Model(units, n, sigmoid, cross_entropy)
# model.evaluate(x, y)
# sleep(5)
print('\n============Train===========')
model.train_2(x, y, 20, 0.001, 1)
print('\n=====Evaluate(training)=====')
model.evaluate(x, y)
print('\n=======Evaluate(test)=======')
x, y = MNIST.load_data('./data/mnist_test.csv', rescale=True)
model.evaluate(x, y)
# x, y = MNIST.load_data('./data/mnist_train.csv', max_size=100)
# m, n = x.shape
# units = np.array([25, 15, 1])
# model = simplenn.Model(units, n, sigmoid, cross_entropy)
# model1 = deepcopy(model)
# model1.train_ng(x, y, 20, 0.6)
# model1.evaluate(x, y)
# x = np.array([[0, 0],
# [1, 1]])
# y = np.array([0, 1])
#
# units = np.array([3, 1])
# model = simplenn.Model(units, 2, sigmoid, cross_entropy)
# model.train_ng(x, y, 500, 0.6)
# model.evaluate(x, y)