Test code
```python
# -*- coding: UTF-8 -*-
import numpy as np
from datetime import datetime
from tensorflow.examples.tutorials.mnist import input_data


# Fully connected layer
class FullConnectedLayer(object):
    def __init__(self, input_size, output_size, activator):
        """
        Constructor
        input_size: dimension of this layer's input vector
        output_size: dimension of this layer's output vector
        activator: activation function
        """
        self.input_size = input_size
        self.output_size = output_size
        self.activator = activator
        # Weight matrix W
        self.W = np.random.uniform(-0.1, 0.1, (output_size, input_size))
        # Bias vector b
        self.b = np.zeros((output_size, 1))
        # Output vector
        self.output = np.zeros((output_size, 1))

    def forward(self, input_array):
        """
        Forward pass
        input_array: input vector whose length must equal input_size
        """
        # Equation 2: reshape the input into a column vector
        input_array.shape = (len(input_array), 1)
        self.input = input_array
        wx = np.dot(self.W, input_array)
        # With a column-vector input, W (m, n) dot x (n, 1) already yields
        # an (m, 1) column; with a 1-D input np.dot would return a 1-D
        # array, which is why the forced reshape below is left disabled:
        # wx.shape = (len(wx), 1)
        self.output = self.activator.forward(wx + self.b)

    def backward(self, delta_array):
        """
        Backward pass: compute the gradients of W and b
        delta_array: error term propagated back from the next layer
        """
        # Equation 8
        self.delta = self.activator.backward(self.input) * np.dot(
            self.W.T, delta_array)
        self.W_grad = np.dot(delta_array, self.input.T)
        self.b_grad = delta_array

    def update(self, learning_rate):
        """
        Update the weights with gradient descent
        """
        self.W += learning_rate * self.W_grad
        self.b += learning_rate * self.b_grad


# Sigmoid activation function
class SigmoidActivator(object):
    def forward(self, weighted_input):
        return 1.0 / (1.0 + np.exp(-weighted_input))

    def backward(self, output):
        # Sigmoid derivative expressed in terms of the sigmoid output
        return output * (1 - output)


# Neural network
class Network(object):
    def __init__(self, layers):
        """
        Constructor
        layers: list of layer sizes, e.g. [784, 300, 10]
        """
        self.layers = []
        for i in range(len(layers) - 1):
            self.layers.append(
                FullConnectedLayer(
                    layers[i], layers[i + 1],
                    SigmoidActivator()
                )
            )

    def predict(self, sample):
        """
        Predict with the network
        sample: input sample
        """
        output = sample
        for layer in self.layers:
            layer.forward(output)
            output = layer.output
        return output

    def train(self, labels, data_set, rate, epoch):
        """
        Training function
        labels: sample labels
        data_set: input samples
        rate: learning rate
        epoch: number of training epochs
        """
        for i in range(epoch):
            for d in range(len(data_set)):
                self.train_one_sample(labels[d], data_set[d], rate)

    def train_one_sample(self, label, sample, rate):
        self.predict(sample)
        self.calc_gradient(label)
        self.update_weight(rate)

    def calc_gradient(self, label):
        # Reshape the label into a column vector
        label.shape = (len(label), 1)
        delta = self.layers[-1].activator.backward(
            self.layers[-1].output
        ) * (label - self.layers[-1].output)
        for layer in self.layers[::-1]:
            layer.backward(delta)
            delta = layer.delta
        return delta

    def update_weight(self, rate):
        for layer in self.layers:
            layer.update(rate)


def get_result(vec):
    # Index of the largest component, i.e. the predicted class
    max_value_index = 0
    max_value = 0
    for i in range(len(vec)):
        if vec[i] > max_value:
            max_value = vec[i]
            max_value_index = i
    return max_value_index


def evaluate(network, test_data_set, test_labels):
    error = 0
    total = len(test_data_set)
    for i in range(total):
        label = get_result(test_labels[i])
        predict = get_result(network.predict(test_data_set[i]))
        if label != predict:
            error += 1
    return float(error) / float(total)


def train_and_evaluate():
    last_error_ratio = 1.0
    epoch = 0
    mnist = input_data.read_data_sets('MNIST_data/', one_hot=True)
    train = mnist.train.next_batch(6000)
    test = mnist.test.next_batch(1000)
    train_data_set = train[0]
    train_labels = train[1]
    test_data_set = test[0]
    test_labels = test[1]
    network = Network([784, 300, 10])
    print("network build finished.")
    while True:
        epoch += 1
        network.train(train_labels, train_data_set, 0.3, 1)
        print('%s epoch %d finished' % (datetime.now(), epoch))
        if epoch % 10 == 0:
            error_ratio = evaluate(network, test_data_set, test_labels)
            print('%s after epoch %d, error ratio is %f'
                  % (datetime.now(), epoch, error_ratio))
            # Stop once the test error stops improving
            if error_ratio > last_error_ratio:
                break
            else:
                last_error_ratio = error_ratio


if __name__ == '__main__':
    train_and_evaluate()
```
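Before running the full MNIST loop, the backward pass can be sanity-checked against numerical differentiation. The sketch below is not part of the original listing: `gradient_check` and its parameters are illustrative names, and it assumes `FullConnectedLayer` and `SigmoidActivator` from the code above are already defined. Because `calc_gradient` uses the `(label - output)` error convention and `update` applies `W += rate * W_grad`, the analytical gradient should equal the negative of the numerical derivative of the squared error.

```python
import numpy as np

def gradient_check(input_size=4, output_size=3, epsilon=1e-4):
    # One layer with random weights, a random input, and a random target
    layer = FullConnectedLayer(input_size, output_size, SigmoidActivator())
    x = np.random.uniform(-1, 1, input_size)
    target = np.random.uniform(0, 1, (output_size, 1))

    # Analytical gradient: forward, then backward with the output-layer
    # delta used by Network.calc_gradient, delta = f'(out) * (target - out)
    layer.forward(x)
    delta = layer.activator.backward(layer.output) * (target - layer.output)
    layer.backward(delta)

    # Central-difference derivative of E = 0.5 * sum((target - output)^2)
    # with respect to each W[i, j]; W_grad should equal -dE/dW
    for i in range(output_size):
        for j in range(input_size):
            layer.W[i, j] += epsilon
            layer.forward(x)
            err_plus = 0.5 * np.sum((target - layer.output) ** 2)
            layer.W[i, j] -= 2 * epsilon
            layer.forward(x)
            err_minus = 0.5 * np.sum((target - layer.output) ** 2)
            layer.W[i, j] += epsilon  # restore the original weight
            numeric = (err_plus - err_minus) / (2 * epsilon)
            print('expected: %.6f  actual: %.6f'
                  % (-numeric, layer.W_grad[i, j]))

gradient_check()
```

If the two printed columns agree to several decimal places, the Equation 8 backward pass is implemented consistently with the forward pass.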
The test ran successfully.