DTeam Team Log

Doer, Delivery, Dream

Rolling Our Own: How Is a Convolutional Network Implemented?

Posted on Dec 12, 2019

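In the previous post, we saw how to implement a feedforward network from scratch. This post walks through building a convolutional network. To test our own implementation, we use the MNIST dataset for validation.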

As usual, let's first look at how Keras does it.

The Keras Way

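Nearly every deep learning book introduces convolutional networks with the MNIST dataset, so an article about how a convolutional network is implemented benefits from following the same convention: it gives our homemade network a reference result to compare against.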

Without further ado, here is the code (Keras version):

import tensorflow as tf

# Dataset
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.reshape((-1, 28, 28, 1))
x_test = x_test.reshape((-1, 28, 28, 1))
x_train, x_test = x_train / 255.0, x_test / 255.0

# Model definition
model = tf.keras.models.Sequential([
  tf.keras.layers.Conv2D(32, 7, activation='relu', padding="same", input_shape=[28, 28, 1]),
  tf.keras.layers.MaxPooling2D(2),
  tf.keras.layers.Conv2D(64, 3, activation='relu', padding="same"),
  tf.keras.layers.MaxPooling2D(2),
  tf.keras.layers.Conv2D(64, 3, activation='relu', padding="same"),
  tf.keras.layers.Flatten(),
  tf.keras.layers.Dense(64, activation='relu'),
  tf.keras.layers.Dropout(0.5),
  tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Training
model.fit(x_train, y_train, epochs=5, validation_split=0.1)

# Prediction (predict_classes was removed in TF 2.6, so take the argmax of predict instead)
print('The predicted value of the first picture in the test set is %d' % model.predict(x_test[:1]).argmax(axis=1)[0])
print('The real value of the first picture in the test set is %d' % y_test[0])

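The goal of this article is not to explain the basic concepts but to show the internals of a convolutional network directly. If you are not yet comfortable with convolution, pooling, Dropout, or convolutional networks in general, it is worth brushing up on the basics first. Dive into Deep Learning (动手学深度学习) is a good starting point and reference.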

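One thing to note about the code above: the loss is sparse_categorical_crossentropy because the dataset's labels are integers, and this function expects integer class indices as labels and an array of probabilities as predictions. If your labels are one-hot encoded, use CategoricalCrossentropy instead.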
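To make the difference concrete, here is a small check with made-up probabilities (not MNIST output). SparseCategoricalCrossentropy takes the integer labels as they are, while CategoricalCrossentropy needs them one-hot encoded with tf.one_hot first. For the same predictions both should give the same loss: the mean of -log(probability assigned to the true class).

```python
import numpy as np
import tensorflow as tf

# Integer labels, as in MNIST (each label is a class index).
labels = np.array([3, 0, 1])

# Made-up predicted probabilities: 3 samples, 4 classes, rows sum to 1.
probs = np.array([
    [0.1, 0.1, 0.1, 0.7],
    [0.8, 0.1, 0.05, 0.05],
    [0.2, 0.6, 0.1, 0.1],
], dtype=np.float32)

# Sparse version: consumes the integer labels directly.
sparse_loss = tf.keras.losses.SparseCategoricalCrossentropy()(labels, probs)

# Dense version: the same labels must be one-hot encoded first.
one_hot = tf.one_hot(labels, depth=4)
dense_loss = tf.keras.losses.CategoricalCrossentropy()(one_hot, probs)

# Both equal mean(-log 0.7, -log 0.8, -log 0.6).
print(float(sparse_loss), float(dense_loss))
```

The choice between the two is purely about label format; the model and its softmax output stay the same.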

Implementing the Custom Layers with TF

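Clearly, to build the convolutional network from the example above ourselves, we need to implement the following classes: MyConv2D, MyMaxPooling2D, MyFlatten, MyDense, MyDropout and MyModel.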

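Readers with a good memory will recall that the previous post already implemented Dense and Model. The code here differs little from that version, but since some parameters are new, it is listed again in full. This also lets each article be read on its own without flipping back and forth.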

Now let's go through the code for each part.

MyDense

The only change here is that call now dispatches to a different TF implementation depending on the activation function.

import tensorflow as tf
from tensorflow.keras.layers import Layer

class MyDense(Layer):

    def __init__(self, units=32, activation='relu'):
        super(MyDense, self).__init__()
        self.units = units
        self.activation = activation

    def build(self, input_shape):
        # Weight matrix of shape (input features, units) and one bias per unit.
        self.w = self.add_weight(shape=(input_shape[-1], self.units),
            initializer='random_normal', trainable=True)
        self.b = self.add_weight(shape=(self.units,),
            initializer='random_normal', trainable=True)

    def call(self, inputs):
        z = tf.matmul(inputs, self.w) + self.b
        # Dispatch to the TF implementation of the requested activation.
        if self.activation == 'relu':
            return tf.nn.relu(z)
        elif self.activation == 'softmax':
            return tf.nn.softmax(z)
        raise ValueError('Unsupported activation: %s' % self.activation)
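If more activations are needed later, the if/elif chain can be replaced by a lookup table. The sketch below is only an illustration of that design choice: ACTIVATIONS and dense_forward are hypothetical names, not part of the original code, and the weights are random rather than trained.

```python
import tensorflow as tf

# Hypothetical lookup table from activation name to the TF op to apply.
ACTIVATIONS = {
    'relu': tf.nn.relu,
    'softmax': tf.nn.softmax,
}

def dense_forward(inputs, w, b, activation):
    """Hypothetical helper: affine transform followed by the named activation."""
    if activation not in ACTIVATIONS:
        raise ValueError('Unsupported activation: %s' % activation)
    return ACTIVATIONS[activation](tf.matmul(inputs, w) + b)

x = tf.random.normal((4, 8))    # batch of 4 samples, 8 features each
w = tf.random.normal((8, 10))   # 8 inputs -> 10 units
b = tf.zeros((10,))

relu_out = dense_forward(x, w, b, 'relu')        # all entries >= 0
softmax_out = dense_forward(x, w, b, 'softmax')  # each row sums to 1
print(relu_out.shape, tf.reduce_sum(softmax_out, axis=1).numpy())
```

Adding a new activation then means adding one dictionary entry instead of another branch.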

MyConv2D

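The custom convolution layer. Note that the kernel weight has shape (kernel_size, kernel_size, input channels, filters), which is the filter layout tf.nn.conv2d expects, and that the ReLU activation is applied directly inside call.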

class MyConv2D(Layer):

    def __init__(self, filters, kernel_size):
        super(MyConv2D, self).__init__()
        self.filters = filters
        self.kernel_size = kernel_size

    def build(self, input_shape):
        # Kernel layout expected by tf.nn.conv2d: (height, width, in_channels, out_channels).
        self.w = self.add_weight(shape=(self.kernel_size, self.kernel_size, input_shape[-1], self.filters),
            initializer='random_normal', trainable=True)
        # One bias per filter, as in Keras Conv2D.
        self.b = self.add_weight(shape=(self.filters,),
            initializer='random_normal', trainable=True)

    def call(self, inputs):
        # Stride 1 with SAME padding keeps the spatial size; ReLU is built in.
        return tf.nn.relu(tf.nn.conv2d(inputs, self.w, strides=1, padding='SAME') + self.b)
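A quick shape check helps when reasoning about tf.nn.conv2d. This sketch uses random tensors as a stand-in for an MNIST batch and shows the kernel layout, how a per-filter bias broadcasts over the output, and how padding='SAME' preserves the 28x28 size while 'VALID' shrinks it by kernel_size - 1.

```python
import tensorflow as tf

# Two single-channel 28x28 images in NHWC layout (random stand-ins for MNIST).
images = tf.random.normal((2, 28, 28, 1))

# Kernel layout for tf.nn.conv2d: (kernel_h, kernel_w, in_channels, out_channels).
kernel = tf.random.normal((7, 7, 1, 32))

# One bias per filter; it broadcasts over batch, height and width.
bias = tf.zeros((32,))

same = tf.nn.conv2d(images, kernel, strides=1, padding='SAME') + bias
valid = tf.nn.conv2d(images, kernel, strides=1, padding='VALID') + bias

print(same.shape)   # SAME keeps the 28x28 spatial size
print(valid.shape)  # VALID shrinks it to 28 - 7 + 1 = 22
```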

MyMaxPooling2D

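The custom max-pooling layer. Note that the Keras MaxPooling2D(2) being reproduced uses non-overlapping 2x2 windows (its strides default to the pool size and its padding to 'valid'), which halves the height and width.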

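class MyMaxPooling2D(Layer):

    def __init__(self, pool_size):
        super(MyMaxPooling2D, self).__init__()
        self.pool_size = pool_size

    def call(self, inputs):
        # Non-overlapping windows (stride = pool_size, VALID padding), matching
        # the Keras MaxPooling2D defaults; a stride of 1 would not downsample.
        return tf.nn.max_pool2d(inputs, self.pool_size, strides=self.pool_size, padding='VALID')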
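The pooling stride matters as much as the window size. This sketch, on a random 28x28 input, compares tf.nn.max_pool2d with stride 1 and 'SAME' padding against stride 2 and 'VALID' padding, and checks the latter against tf.keras.layers.MaxPooling2D(2).

```python
import tensorflow as tf

x = tf.random.normal((1, 28, 28, 1))

# Stride 1 with SAME padding: the 2x2 window slides one pixel at a time,
# so height and width do not shrink.
stride_one = tf.nn.max_pool2d(x, ksize=2, strides=1, padding='SAME')

# Stride equal to the window size with VALID padding: non-overlapping
# 2x2 windows, which halve height and width.
stride_two = tf.nn.max_pool2d(x, ksize=2, strides=2, padding='VALID')

# Keras MaxPooling2D(2) uses strides=pool_size and padding='valid' by default.
keras_out = tf.keras.layers.MaxPooling2D(2)(x)

print(stride_one.shape, stride_two.shape, keras_out.shape)
```

With stride 1, the feature maps reaching Flatten stay 28x28 instead of 7x7, which makes the following Dense layer 16 times larger.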

MyDropout

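The custom Dropout layer. Note that dropout is applied only when training is True; otherwise the inputs pass through unchanged.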

class MyDropout(Layer):

    def __init__(self, rate):
        super(MyDropout, self).__init__()
        self.rate = rate

    def call(self, inputs, training=False):
        # Drop units only during training; at inference, pass inputs through unchanged.
        if training:
            return tf.nn.dropout(inputs, rate=self.rate)
        return inputs
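For reference, this is what tf.nn.dropout does in TF 2: a fraction rate of the entries is zeroed at random and the survivors are scaled by 1 / (1 - rate), so no rescaling is needed at inference time. The all-ones input is chosen only so that the scaling is easy to see.

```python
import tensorflow as tf

x = tf.ones((1000,))

# In TF 2 the second argument is the drop rate (TF 1 took keep_prob instead).
# Kept entries are scaled by 1 / (1 - 0.5) = 2.0 so the expected sum is unchanged.
dropped = tf.nn.dropout(x, rate=0.5, seed=42)

values = set(dropped.numpy().tolist())
kept = int(tf.math.count_nonzero(dropped))
print(values, kept)  # entries are either 0.0 (dropped) or 2.0 (kept)
```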

MyFlatten

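The custom Flatten layer. Note that it keeps the batch dimension and merges height, width and channels into a single axis.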

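class MyFlatten(Layer):

    def __init__(self):
        super(MyFlatten, self).__init__()

    def call(self, inputs):
        # Let the batch size be inferred (-1) and merge height, width and channels,
        # so the smaller last batch and unknown batch sizes are handled too.
        shape = inputs.shape
        return tf.reshape(inputs, [-1, shape[1] * shape[2] * shape[3]])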
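A hypothetical standalone flatten helper shows why letting the batch size be -1 is convenient: 60000 MNIST training images in batches of 64 leave a last batch of 32, and the same reshape handles both. The 7x7x64 input shape assumes two stride-2 poolings on 28x28 images; the result is compared against tf.keras.layers.Flatten.

```python
import tensorflow as tf

def flatten(inputs):
    """Hypothetical helper: collapse all axes except the batch axis."""
    shape = inputs.shape
    # -1 lets TF infer the batch size; the other dims are known statically.
    return tf.reshape(inputs, [-1, shape[1] * shape[2] * shape[3]])

# A full batch of 64 and the last, smaller batch (60000 % 64 == 32).
full = flatten(tf.zeros((64, 7, 7, 64)))
last = flatten(tf.zeros((32, 7, 7, 64)))

keras_flat = tf.keras.layers.Flatten()(tf.zeros((32, 7, 7, 64)))
print(full.shape, last.shape, keras_flat.shape)  # 7 * 7 * 64 = 3136 features
```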

MyModel

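The code differs little from the previous post. The main changes are the loss function and the optimizer, plus a conversion before updating the accuracy: the predicted probabilities are turned into class indices with argmax.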

class MyModel(Layer):

    def __init__(self, layers):
        super(MyModel, self).__init__()
        self.layers = layers

    def call(self, inputs, training=False):
        # Run the layers in order; `training` reaches layers (such as MyDropout) that accept it.
        x = inputs
        for layer in self.layers:
            x = layer(x, training=training)
        return x

    def train(self, x_train, y_train, epochs=5):
        loss = tf.keras.losses.SparseCategoricalCrossentropy()
        optimizer = tf.keras.optimizers.Adam()
        accuracy = tf.keras.metrics.Accuracy()

        dataset = tf.data.Dataset.from_tensor_slices((x_train.astype('float32'), y_train))
        dataset = dataset.shuffle(buffer_size=1024).batch(64)

        for epoch in range(epochs):
            for step, (x, y) in enumerate(dataset):
                with tf.GradientTape() as tape:
                    # Forward pass; training=True enables dropout.
                    y_pred = self(x, training=True)

                    # Loss value for this batch.
                    loss_value = loss(y, y_pred)

                # Gradients of the loss w.r.t. the weights, then one optimizer step.
                gradients = tape.gradient(loss_value, self.trainable_weights)
                optimizer.apply_gradients(zip(gradients, self.trainable_weights))

                # Convert probabilities to class indices, then update the running accuracy.
                accuracy.update_state(y, tf.argmax(y_pred, axis=1))

                print('Epoch:', epoch, ', Loss of the last batch: %.3f' % loss_value,
                      ', Total running accuracy so far: %.3f' % accuracy.result(), end='\r')
            print('\n')
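The conversion before updating the accuracy is needed because tf.keras.metrics.Accuracy compares y_true and y_pred element by element. The toy labels and probabilities below are made up; they show that applying argmax first gives the same result as tf.keras.metrics.SparseCategoricalAccuracy, which accepts probabilities directly and could be used instead.

```python
import tensorflow as tf

y_true = tf.constant([2, 0, 1, 1], dtype=tf.int64)  # integer labels
y_pred = tf.constant([                               # predicted probabilities
    [0.1, 0.2, 0.7],  # argmax 2 -> correct
    [0.6, 0.3, 0.1],  # argmax 0 -> correct
    [0.5, 0.4, 0.1],  # argmax 0 -> wrong
    [0.2, 0.7, 0.1],  # argmax 1 -> correct
])

# Accuracy compares values directly, so convert probabilities to class indices first.
accuracy = tf.keras.metrics.Accuracy()
accuracy.update_state(y_true, tf.argmax(y_pred, axis=1))

# SparseCategoricalAccuracy performs the argmax internally.
sparse_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
sparse_accuracy.update_state(y_true, y_pred)

print(float(accuracy.result()), float(sparse_accuracy.result()))  # 0.75 0.75
```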

All the required classes are now defined. Let's check how well the result works:

# Model definition
model = MyModel([
    MyConv2D(32, 7),
    MyMaxPooling2D(2),
    MyConv2D(64, 3),
    MyMaxPooling2D(2),
    MyConv2D(64, 3),
    MyFlatten(),
    MyDense(64, activation='relu'),
    MyDropout(0.5),
    MyDense(10, activation='softmax')
])

# Training
model.train(x_train, y_train, 5)

# Prediction (index 0 is the first picture in the test set)
print('The predicted value of the first picture in the test set is %d' % int(tf.argmax(model(x_test[:1].astype('float32'))[0])))
print('The real value of the first picture in the test set is %d' % y_test[0])

Training is somewhat slow, but the accuracy is good: over 98%.

Implementation Summary

Overall, a few points are worth noting: