This post shows how to compute gradients with a computational graph (forward and backward propagation) and the chain rule, use those gradients for minibatch stochastic gradient descent on the parameters, wrap everything in a simple linear regression model for debugging, and finally plot the loss curve.
Compared with the earlier approach of estimating gradients by numerical differentiation in Python for a single-layer linear regression, the difficulty here lies in drawing the computational graph and writing the forward and backward passes of the linear layer and the loss function.
The model is also wrapped more sensibly, closer to real practice, which helps in understanding the corresponding PyTorch functions.
In addition, this method is faster than numerical differentiation.
Model
y = XW + b
For a single sample, y is a scalar and X has 2 feature columns; the loss function is mean squared error (MSE).
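For reference, with a batch of N samples the loss implemented below is L = (1/N) Σᵢ (ŷᵢ − yᵢ)². Applying the chain rule gives the gradients that the backward passes compute:

∂L/∂ŷ = (2/N)(ŷ − y)
∂L/∂W = Xᵀ (∂L/∂ŷ)
∂L/∂b = Σᵢ (∂L/∂ŷ)ᵢ
∂L/∂X = (∂L/∂ŷ) Wᵀ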
Building the data
import time
from collections import OrderedDict

import numpy as np
import matplotlib.pyplot as plt


def build_data(weights, bias, num_examples):
    x = np.random.randn(num_examples, len(weights))
    y = x.dot(weights) + bias
    # add some noise to y; per-sample Gaussian noise, rather than the single
    # shared offset np.random.rand(1) of the original, which only shifted the bias
    y += np.random.normal(0.0, 0.01, size=y.shape)
    return x, y
def data_iter(features, labels, batch_size):
    num_examples = len(features)
    # build one index per sample
    indices = list(range(num_examples))
    # shuffle the index array
    np.random.shuffle(indices)
    for i in range(0, num_examples, batch_size):
        batch_indices = np.array(indices[i:min(i + batch_size, num_examples)])
        yield features[batch_indices], labels[batch_indices]
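A minimal smoke test of the two helpers above (hypothetical, not in the original post; the shapes assume the 2-feature setup used later):

w = np.array([[3.0], [4.0]])       # true weights, shape (2, 1)
b = np.array([5.0])                # true bias
x, y = build_data(w, b, num_examples=10)
print(x.shape, y.shape)            # (10, 2) (10, 1)

for xb, yb in data_iter(x, y, batch_size=4):
    print(xb.shape, yb.shape)      # (4, 2) (4, 1); the last batch may be smaller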
Forward and backward of the linear layer
class Affine:
    def __init__(self, W, b):
        self.W = W
        self.b = b
        self.x = None
        self.original_x_shape = None
        # gradients of the weights and bias
        self.dW = None
        self.db = None

    def forward(self, x):
        # flatten higher-dimensional input to 2-D (supports tensors)
        self.original_x_shape = x.shape
        x = x.reshape(x.shape[0], -1)
        self.x = x
        out = np.dot(self.x, self.W) + self.b
        return out

    def backward(self, dout):
        dx = np.dot(dout, self.W.T)
        self.dW = np.dot(self.x.T, dout)
        self.db = np.sum(dout, axis=0)
        dx = dx.reshape(*self.original_x_shape)  # restore the input's original shape (for tensors)
        return dx
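Since the post contrasts this with numerical differentiation, here is a small hypothetical sanity check of Affine.backward against a central-difference estimate (assumes the class above; not part of the original code):

np.random.seed(0)
layer = Affine(np.random.randn(2, 1), np.zeros(1))
x = np.random.randn(4, 2)

out = layer.forward(x)
layer.backward(np.ones_like(out))  # pretend dL/dout is all ones

# central difference of sum(out) with respect to W[0, 0]
eps = 1e-6
layer.W[0, 0] += eps
plus = layer.forward(x).sum()
layer.W[0, 0] -= 2 * eps
minus = layer.forward(x).sum()
layer.W[0, 0] += eps               # restore the weight

print(layer.dW[0, 0], (plus - minus) / (2 * eps))  # the two values should agree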
Forward and backward of mean squared error
class MeanSquaredError:
    def __init__(self):
        self.loss = None
        self.diff = None
        self.batch_size = None

    def forward(self, y_pred, y_true):
        self.batch_size = y_true.shape[0]    # number of samples
        self.diff = y_pred - y_true          # difference between prediction and ground truth
        self.loss = np.mean(self.diff ** 2)  # loss value
        return self.loss

    def backward(self, dout=None):
        if dout is None:
            dout = np.ones_like(self.diff)
        grad = 2 * ((self.diff * dout) / self.batch_size)
        return grad
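The same kind of hypothetical check works for MeanSquaredError.backward; with N = 3 samples the analytic gradient 2·diff/N should match a central-difference estimate:

mse = MeanSquaredError()
y_pred = np.array([[1.0], [2.0], [3.0]])
y_true = np.array([[1.5], [1.5], [1.5]])

mse.forward(y_pred, y_true)
grad = mse.backward(dout=1)            # 2 * (y_pred - y_true) / 3

# perturb only the first prediction
eps = 1e-6
bump = np.array([[eps], [0.0], [0.0]])
num = (MeanSquaredError().forward(y_pred + bump, y_true)
       - MeanSquaredError().forward(y_pred - bump, y_true)) / (2 * eps)
print(grad[0, 0], num)                 # both are about 2 * (1.0 - 1.5) / 3 ≈ -0.333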
Wrapping the model
class Network:
    def __init__(self, input_size, output_size, weight_init_std=0.01):
        # scale the random initial weights by weight_init_std
        # (the original accepted this parameter but ignored it)
        self.params = {'w1': weight_init_std * np.random.rand(input_size, output_size),
                       'b1': np.array([0.0])}
        self.layers = OrderedDict()
        # the layer shares the parameter arrays with self.params,
        # so in-place updates to self.params are seen by the layer
        self.layers['A1'] = Affine(self.params['w1'], self.params['b1'])
        self.lastLayer = MeanSquaredError()

    def predict(self, x):
        for layer in self.layers.values():
            x = layer.forward(x)
        return x

    def loss(self, x, y):
        pred_y = self.predict(x)
        return self.lastLayer.forward(pred_y, y)

    def gradient(self, x, y):
        # forward:
        # trigger the computation, i.e. loss() runs predict(x) once plus the MSE
        self.loss(x, y)
        # backward:
        # dout must match the shape of the model's final output,
        # so differentiate the loss function first
        dout = 1
        dout = self.lastLayer.backward(dout)
        # reverse the forward order; OrderedDict preserves insertion order,
        # so the reversed list walks the computational graph backwards
        layers = list(self.layers.values())
        layers.reverse()
        for layer in layers:
            dout = layer.backward(dout)
        # after backward finishes, each layer holds its own gradients
        grads = dict()
        grads['w1'] = self.layers['A1'].dW
        grads['b1'] = self.layers['A1'].db
        return grads
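As a hypothetical end-to-end check (num_grad is a helper introduced here, not in the original), the analytic gradients from Network.gradient can be compared against numerical ones; this works because the Affine layer shares the parameter arrays with net.params:

def num_grad(net, x, y, key, eps=1e-6):
    p = net.params[key]
    g = np.zeros_like(p)
    it = np.nditer(p, flags=['multi_index'])
    while not it.finished:
        idx = it.multi_index
        old = p[idx]
        p[idx] = old + eps
        plus = net.loss(x, y)
        p[idx] = old - eps
        minus = net.loss(x, y)
        p[idx] = old                   # restore the parameter
        g[idx] = (plus - minus) / (2 * eps)
        it.iternext()
    return g

net = Network(2, 1)
x = np.random.randn(5, 2)
y = np.random.randn(5, 1)
grads = net.gradient(x, y)
print(np.max(np.abs(grads['w1'] - num_grad(net, x, y, 'w1'))))  # should be tiny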
Running the test
if __name__ == '__main__':
    start = time.perf_counter()
    # np.random.seed(1)
    true_w1 = np.random.rand(2, 1)
    true_b1 = np.random.rand(1)
    # true_w1 = np.array([[3.0], [4.0]])
    # true_b1 = np.array([5.0])
    x_train, y_train = build_data(true_w1, true_b1, 5000)
    net = Network(2, 1, 0.01)
    init_loss = net.loss(x_train, y_train)
    print(net.params)

    loss_history = list()
    loss_history.append(init_loss)

    num_epochs = 1
    batch_size = 50
    learning_rate = 0.01
    for i in range(num_epochs):
        for x_batch, y_batch in data_iter(x_train, y_train, batch_size):
            grads = net.gradient(x_batch, y_batch)
            for key in grads:
                # update in place: the Affine layer shares these arrays
                net.params[key] -= learning_rate * grads[key]
            running_loss = net.loss(x_batch, y_batch)
            loss_history.append(running_loss)
        # current_loss = net.loss(x_train, y_train)
        # loss_history.append(current_loss)
        # print(f'epoch {i}: {net.params}')

    plt.title("Single-layer linear model, gradients via computational graph")
    plt.xlabel("batch")                # one loss value is recorded per minibatch
    plt.ylabel("loss")
    plt.plot(loss_history, linestyle='dotted')
    plt.show()

    # print(loss_history)
    print(f'initial loss: {init_loss}')
    print(f'final loss: {loss_history[-1]}')
    print(f'true parameters: true_w1={true_w1}, true_b1={true_b1}')
    print(f'learned parameters: w1={net.params["w1"]}, b1={net.params["b1"]}')
    print()
    end = time.perf_counter()
    print(f'elapsed time: {(end - start) * 1000} ms')