人工智能实践——一个基础神经网络源码分析

  • Post author:
  • Post category:其他


x_data = datasets.load_iris().data
y_data = datasets.load_iris().target

datasets.load_iris()返回的是<class ‘sklearn.utils.Bunch’>

我们可以利用type(datasets.load_iris())得到属性。姑且这里可以理解为类吧。

np.random.seed(116)#随机数种子
np.random.shuffle(x_data)#打乱顺序,因为原数组y值按序排列,会影响模型拟合性
np.random.seed(116)
np.random.shuffle(y_data)



扩展:np.random随机模块

1:np.random.rand(3,2) 返回[3,2]的随机数


array([[ 0.14022471, 0.96360618], #random

[ 0.37601032, 0.25528411], #random

[ 0.49313049, 0.94909878]]) #random

2:a=np.random.randn(3,4) 随机正态分布的样本【normal distribution】


[[-0.33048377 0.23360005 -0.90960278 -1.71455775]

[ 0.15790672 -1.75625325 1.44082303 -0.27947681]

[ 1.96307815 -0.15549447 1.43778136 0.07263787]]

3:返回随机整数np.random.randint(low,high,size=(a,b))

一位参数:0-high)

二位参数:[low,high)

三位参数

a=np.random.randint(3,9, size=(2,4))


[[7 5 5 5]

[7 5 3 3]]


b=np.random.randint(1, size=(3,6))


[[0 0 0 0 0 0]

[0 0 0 0 0 0]

[0 0 0 0 0 0]]

4:返回0-1的随机浮点数 np.random.random([3,4])

a=2*np.random.random([3,4])+3


[[3.80825656 4.19216891 3.13883402 4.80575456]

[3.24359894 3.58765032 3.5430586 4.72078944]

[3.87530062 4.50962233 4.2954225 3.22771614]]

5:np.random.seed()

设置的seed()值仅一次有效

np.random.seed(100)

a=np.random.rand(3,4)

b=np.random.rand(3,4)

print(a)

print(b)


[[0.54340494 0.27836939 0.42451759 0.84477613]

[0.00471886 0.12156912 0.67074908 0.82585276]

[0.13670659 0.57509333 0.89132195 0.20920212]]

[[0.18532822 0.10837689 0.21969749 0.97862378]

[0.81168315 0.17194101 0.81622475 0.27407375]

[0.43170418 0.94002982 0.81764938 0.33611195]]


会发现两次随机数结果不一样

6:np.random.shuffle(y_data) 随机打乱顺序

x_train= x_data[:-30]
y_train= y_data[:-30]
x_test=x_data[-30:]
y_test=y_data[-30:]

-30代表倒数第30个 [:-30]代表从0到倒数第30个

[-30:]代表倒数第30个到最后一个

x[rows,columns] 逗号前是对行操作,后是对列操作 。

x_train = tf.cast(x_train, tf.float32)
x_test=tf.cast(x_test,tf.float32)

强制类型转换,转为tf.float32

#batch为分组,即每一组分为Batch个

train_db = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(32)
test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(10)



扩展: tf.data.Dataset.from_tensor_slices

将x_train和y_train,x_test和y_test相互关联【from_tensor_slices函数切分传入的 Tensor 的第一个维度,生成相应的 dataset】

关联x_train和y_train关联后为Dataset,无法正常输出【可以理解为很多块batch】打印可以发现

有两种输出方式:

1:迭代器


train_iter = iter(train_db)

sample = next(train_iter)【很类似于早期java的List】

此时sample是数组,sample[0]是x_train的第一个batch,sample[1]是y_train的第一个batch


sample =array([1, 1, 1, 0, 0, 2, 2, 2, 1, 0, 0, 2, 0, 1, 0, 1, 0, 1, 2, 2, 1, 0,

0, 0, 0, 1, 1, 1, 0, 2, 0, 2])>)

sample[0]batch: (32, 4) (32,)

sample[1]batch: (10, 4) (10,)

2:for step, (x_train, y_train) in enumerate(train_db):

此时x_train和y_train都为张量



扩展:batch(32)

将数组分为32个一batch,是一种优化算法

w1 = tf.Variable(tf.random.truncated_normal([4, 32], stddev=0.1,seed=1))
b1=tf.Variable(tf.random.truncated_normal([32], stddev=0.1,seed=1))
w2 = tf.Variable(tf.random.truncated_normal([32, 32], stddev=0.1,seed=2))
b2=tf.Variable(tf.random.truncated_normal([32], stddev=0.1,seed=2))
w3 = tf.Variable(tf.random.truncated_normal([32, 3], stddev=0.1,seed=3))
b3=tf.Variable(tf.random.truncated_normal([3], stddev=0.1,seed=3))



扩展:动态分布和变量与常量

tf.Variable:生成变量

tf.constant:生成常量

正态分布生成:

tf.random.truncated_normal

从截断的正态分布中输出随机值。

生成的值服从具有指定平均值和标准偏差的正态分布,如果生成的值大于平均值2个标准偏差的值则丢弃重新选择。

shape: 一维的张量,也是输出的张量。

mean: 正态分布的均值。

stddev: 正态分布的标准差。

dtype: 输出的类型。

seed: 一个整数,当设置之后,每次生成的随机数都一样。

name: 操作的名字。

tf.random_normal(shape, mean=0.0, stddev=1.0, dtype=tf.float32, seed=None, name=None)

从正态分布中输出随机值。

lr = 0.1#学习率
train_loss_results = []#损失总值
epoch = 500#迭代次数
loss_all=0
for epoch in range(epoch):
   for step, (x_train, y_train) in enumerate(train_db):
       with tf.GradientTape() as tape:
           #可以理解成有两个隐藏层和一个输出层
           h1 = tf.matmul(x_train, w1) + b1
           h2 = tf.matmul(h1,w2) + b2
           y = tf.matmul(h2, w3) + b3
           y_onehot = tf.one_hot(y_train, depth=3)
           loss = tf.reduce_mean(tf.square(y_onehot - y))
           loss_all+=loss.numpy()
           
       # compute gradients
       #此处grads=[dw1,db1,dw2,db2,dw3,db3]
       grads = tape.gradient(loss, [w1, b1, w2, b2, w3, b3])
       # w1 = w1 - lr * w1_grad
       # 反向传播进行更新
       w1.assign_sub(lr * grads[0])
       b1.assign_sub(lr * grads[1])
       w2.assign_sub(lr * grads[2])
       b2.assign_sub(lr * grads[3])
       w3.assign_sub(lr * grads[4])
       b3.assign_sub(lr * grads[5])

   if epoch %100==0:
   print(epoch, step, 'loss:', float(loss))



扩展: with tf.GradientTape() as tape梯度下降


print(“一元梯度”)

x=tf.constant(value=3.0)

with tf.GradientTape(persistent=True,watch_accessed_variables=True) as tape:

tape.watch(x)

y1=2

x

y2=x

x+2

y3=x

x+2

x

dy1_dx=tape.gradient(target=y1,sources=x)

dy2_dx = tape.gradient(target=y2, sources=x)

dy3_dx = tape.gradient(target=y3, sources=x)

print(“dy1_dx:”,dy1_dx)

print(“dy2_dx:”, dy2_dx)

print(“dy3_dx:”, dy3_dx)

print(“二元梯度”)

x = tf.constant(value=3.0)

y = tf.constant(value=2.0)

with tf.GradientTape(persistent=True,watch_accessed_variables=True) as tape:

tape.watch([x,y])

z1=x

x

y+x*y

#一阶导数

dz1_dx=tape.gradient(target=z1,sources=x)

dz1_dy = tape.gradient(target=z1, sources=y)

dz1_d=tape.gradient(target=z1,sources=[x,y])

print(“dz1_dx:”, dz1_dx)

print(“dz1_dy:”, dz1_dy)

print(“dz1_d:”,dz1_d)

print(“type of dz1_d:”,type(dz1_d))

输出:


dy1_dx: tf.Tensor(2.0, shape=(), dtype=float32)

dy2_dx: tf.Tensor(6.0, shape=(), dtype=float32)

dy3_dx: tf.Tensor(8.0, shape=(), dtype=float32)

二元梯度

dz1_dx: tf.Tensor(14.0, shape=(), dtype=float32)

dz1_dy: tf.Tensor(12.0, shape=(), dtype=float32)

dz1_d: [<tf.Tensor: id=52, shape=(), dtype=float32, numpy=14.0>, <tf.Tensor: id=53, shape=(), dtype=float32, numpy=12.0>]

type of dz1_d: <class ‘list’>

1:tf.GradientTape(persistent=False,watch_accessed_variables=True)

作用:创建一个新的GradientTape

参数:

persistent: 布尔值,用来指定新创建的gradient tape是否是可持续性的。默认是False,意味着只能够调用一次gradient()函数。

watch_accessed_variables: 布尔值,表明这个gradien tap是不是会自动追踪任何能被训练(trainable)的变量。默认是True。要是为False的话,意味着你需要手动去指定你想追踪的那些变量。

2:watch(tensor)

作用:确保某个tensor被tape追踪

参数:tensor: 一个Tensor或者一个Tensor列表【通常是可以自动追踪的,只要是变量】

3:gradient(target,sources,dtype)

作用:根据tape上面的上下文来计算某个或者某些tensor的梯度【表示为[x1,x2,x3]】

参数:

target: 被微分的Tensor或者Tensor列表,你可以理解为经过某个函数之后的值

sources: Tensors 或者Variables列表(当然可以只有一个值). 你可以理解为函数的某个变量



扩展:张量的处理

注意要用tf的方法来处理张量,且符合广播原则

h1 = tf.matmul(x_train, w1) + b1# x_train(32,3) w1(3,32) b1(32) 结果(32,32)

h2 = tf.matmul(h1,w2) + b2#(32,3) #h1(32,32) w2(32,32) b2(32) 结果(32,32)

y = tf.matmul(h2, w3) + b3#(32,3)#h2(32,32) w3(32,3) b2(32) 结果(32,3)

看一下值

print(pd.DataFrame(y.numpy().round(3)).head())

y.numpy将张量转化为numpy处理,再转化为DataFrame,取前五个值

0 1 2

0 -0.137 -0.005 0.099

1 -0.157 -0.003 0.110

2 -0.158 -0.003 0.113

3 -0.038 -0.046 0.030

4 -0.047 -0.041 0.035

可以看到在0左右波动,因为是对y_train的预测,y_train的值0,1,2,所以得到的y值也应该在这些值范围内波动



扩展:独热编码

y_onehot = tf.one_hot(y_train, depth=3)

loss = tf.reduce_mean(tf.square(y_onehot – y))

loss_all+=loss.numpy()

这里编码为独热编码,我理解为因为y【150,3】的矩阵 ,而y_train是一个[150,1]的矩阵,是无法运算的,所以采用独热编码depth=3即转化为[150,3]的矩阵,这样就可以相互运算。


可能会有疑问,转化为独热后,y_train的意义不就失效了吗?


注意神经网络是利用反向传播让loss最小,而loss是one_hot(y_train)和y的差值经过运算得来的【理解为相异性】,所以loss越小,意味着两者相异性越低,即正确率越高


这里补充一点。上面y【150,3】错误的!因为已经分batch了,所以应该是[32,3]且最后一个是[24,3 y_train同理

]

同样loss是张量,无法和loss_all加减,需要转化为numpy



扩展:张量的修改

张量是无法通过加减直接修改的,需要调用下列函数

tf.assign(ref, value, validate_shape = None, use_locking = None, name=None)

tf.assign_add(ref, value, use_locking = None, name=None)

tf.assign_sub(ref, value, use_locking = None, name=None)

tf.variable.assign(value, use_locking=False)

tf.variable.assign_add(delta, use_locking=False)

tf.variable.assign_sub(delta, use_locking=False)

这6个函数本质上是一样的,都是用来对变量值进行更新,其中tf.assign还可以更新变量的shape。

解释一下它们的意思:tf.assign是用value的值赋给ref,这种赋值会覆盖掉原来的值,即更新而不会创建一个新的tensor。tf.assign_add相当于ref=ref+value来更新ref。tf.assign_sub相当于ref=ref-value来更新ref。tf.variable.assign相当于tf.assign(ref, value)。同理tf.variable.assign_add和tf.variable.assign_sub。

#test(做测试)
total_correct, total_number = 0, 0
for step,(x_train, y_train) in enumerate(test_db):
    h1 = tf.matmul(x_train, w1) + b1
    h2 = tf.matmul(h1,w2) + b2
    y = tf.matmul(h2, w3) + b3
    pred=tf.argmax(y, axis=1)#按行获得每一行的最大值
    # 因为pred的dtype为int64,在计算correct时会出错,所以需要将它转化为int32
    pred = tf.cast(pred, dtype=tf.int32)
    correct=tf.cast(tf.equal(pred, y_train), dtype=tf.int32)#获得一个(10,1)数组
    correct=tf.reduce_sum(correct)#从correct数组中求,按列相加。得到一个0维张量
    total_correct += int(correct)#记录正确的记录,张量需要强转
    total_number += x_train.shape[0]#记录的是所有记录
acc=total_correct/total_number
print("test_acc:",acc)

代码如下:

#-*- coding: UTF-8 -*-
import tensorflow as tf
import os
from sklearn import datasets
from matplotlib import pyplot as plt
import numpy as np
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # 只显示error,不显示其他信息
x_data = datasets.load_iris().data
y_data = datasets.load_iris().target
#随机打乱数据
np.random.seed(116)
np.random.shuffle(x_data)
np.random.seed(116)
np.random.shuffle(y_data)

x_train= x_data[:-30]
y_train= y_data[:-30]
x_test=x_data[-30:]
y_test=y_data[-30:]

x_train = tf.cast(x_train, tf.float32)
x_test=tf.cast(x_test,tf.float32)
#from_tensor_slices函数切分传入的 Tensor 的第一个维度,生成相应的 dataset
#batch为分组,即每一组分为Batch个
train_db = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(32)
test_db = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(10)
#iter用来生成迭代器
train_iter = iter(train_db)
#next() 返回迭代器的下一个项目
sample = next(train_iter)
print('batch:', sample[0].shape, sample[1].shape)



#seed: 随机数种子,是一个整数,当设置之后,每次生成的随机数都一样
w1 = tf.Variable(tf.random.truncated_normal([4, 32], stddev=0.1,seed=1))
b1=tf.Variable(tf.random.truncated_normal([32], stddev=0.1,seed=1))
w2 = tf.Variable(tf.random.truncated_normal([32, 32], stddev=0.1,seed=2))
b2=tf.Variable(tf.random.truncated_normal([32], stddev=0.1,seed=2))
w3 = tf.Variable(tf.random.truncated_normal([32, 3], stddev=0.1,seed=3))
b3=tf.Variable(tf.random.truncated_normal([3], stddev=0.1,seed=3))

lr = 0.1
train_loss_results = []
epoch = 500
loss_all=0
for epoch in range(epoch):
    for step, (x_train, y_train) in enumerate(train_db):

        with tf.GradientTape() as tape:
            #可以理解成有两个隐藏层和一个输出层
            h1 = tf.matmul(x_train, w1) + b1
            h2 = tf.matmul(h1,w2) + b2
            y = tf.matmul(h2, w3) + b3

            y_onehot = tf.one_hot(y_train, depth=3)

            # mse = mean(sum(y-out)^2)
            loss = tf.reduce_mean(tf.square(y_onehot - y))
            loss_all+=loss.numpy()

        # compute gradients
        #此处grads=[dw1,db1,dw2,db2,dw3,db3]
        grads = tape.gradient(loss, [w1, b1, w2, b2, w3, b3])
        # w1 = w1 - lr * w1_grad
        # 反向传播进行更新
        w1.assign_sub(lr * grads[0])
        b1.assign_sub(lr * grads[1])
        w2.assign_sub(lr * grads[2])
        b2.assign_sub(lr * grads[3])
        w3.assign_sub(lr * grads[4])
        b3.assign_sub(lr * grads[5])

    if epoch%100==0:
        print(epoch, step, 'loss:', float(loss))


    # test(做测试)
total_correct, total_number = 0, 0
for step,(x_train, y_train) in enumerate(test_db):

    h1 = tf.matmul(x_train, w1) + b1
    h2 = tf.matmul(h1,w2) + b2
    y = tf.matmul(h2, w3) + b3

    pred=tf.argmax(y, axis=1)


     # 因为pred的dtype为int64,在计算correct时会出错,所以需要将它转化为int32
    pred = tf.cast(pred, dtype=tf.int32)
    correct=tf.cast(tf.equal(pred, y_train), dtype=tf.int32)
    correct=tf.reduce_sum(correct)
    total_correct += int(correct)
    total_number += x_train.shape[0]
acc=total_correct/total_number
print("test_acc:",acc)

#绘制loss曲线
plt.title('Loss Function Curve')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.plot(train_loss_results)
plt.show()



版权声明:本文为weixin_45680007原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。