
TensorFlow Convolutional Autoencoder and Denoising Autoencoder

Convolutional Autoencoder

The encoder uses convolution and max-pooling operations.

The decoder uses transposed convolution (deconvolution) and unpooling operations.
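
For orientation, the tensor shapes in the pipeline below flow roughly as follows (assuming the 5x5 and 3x3 kernels, 'SAME' padding, stride-1 convolutions, and 2x2 pooling used in the code):

# Encoder: x_image (50, 28, 28, 1)
#   conv 5x5, 16 channels    -> (50, 28, 28, 16)
#   conv 3x3, 32 channels    -> (50, 28, 28, 32)
#   max_pool_with_argmax (2) -> (50, 14, 14, 32), plus an argmax mask
# Decoder:
#   unpool with the mask     -> (50, 28, 28, 32)
#   conv2d_transpose 3x3     -> (50, 28, 28, 16)
#   conv2d_transpose 5x5     -> (50, 28, 28, 1)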

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

# Load the MNIST dataset
from tensorflow.examples.tutorials.mnist import input_data

mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)


# Max pooling that also returns the argmax mask (locations of the maxima)
def max_pool_with_argmax(net, stride):
    _, mask = tf.nn.max_pool_with_argmax(net, ksize=[1, stride, stride, 1], strides=[1, stride, stride, 1],
                                         padding='SAME')
    mask = tf.stop_gradient(mask)
    net = tf.nn.max_pool(net, ksize=[1, stride, stride, 1], strides=[1, stride, stride, 1], padding='SAME')
    return net, mask


# Example: a 4*4 feature map pools down to 2*2; the mask records the flattened
# position of each maximum within the image (roughly (y * width + x) * channels + c),
# so that unpooling can scatter the pooled values back to their original locations.
# Unpooling
def unpool(net, mask, stride):
    ksize = [1, stride, stride, 1]
    input_shape = net.get_shape().as_list()

    output_shape = (input_shape[0], input_shape[1] * ksize[1], input_shape[2] * ksize[2], input_shape[3])

    one_like_mask = tf.ones_like(mask)
    batch_range = tf.reshape(tf.range(output_shape[0], dtype=tf.int64), shape=[input_shape[0], 1, 1, 1])
    b = one_like_mask * batch_range
    y = mask // (output_shape[2] * output_shape[3])
    x = mask % (output_shape[2] * output_shape[3]) // output_shape[3]
    feature_range = tf.range(output_shape[3], dtype=tf.int64)
    f = one_like_mask * feature_range

    updates_size = tf.size(net)
    indices = tf.transpose(tf.reshape(tf.stack([b, y, x, f]), [4, updates_size]))
    values = tf.reshape(net, [updates_size])
    ret = tf.scatter_nd(indices, values, output_shape)
    return ret


def conv2d(x, W):
    return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')


def max_pool_2x2(x):
    return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                          strides=[1, 2, 2, 1], padding='SAME')


# Network model parameters
learning_rate = 0.01
n_conv_1 = 16  # 16 channels in the first conv layer
n_conv_2 = 32  # 32 channels in the second conv layer
n_input = 784  # MNIST data input (img shape: 28*28)
batchsize = 50

# Placeholders
x = tf.placeholder("float", [batchsize, n_input])  # input (fixed batch size so that unpool can use static shapes)

x_image = tf.reshape(x, [-1, 28, 28, 1])


# Encoder
def encoder(x):
    h_conv1 = tf.nn.relu(conv2d(x, weights['encoder_conv1']) + biases['encoder_conv1'])
    h_conv2 = tf.nn.relu(conv2d(h_conv1, weights['encoder_conv2']) + biases['encoder_conv2'])
    return h_conv2, h_conv1


# Decoder
def decoder(x, conv1):
    t_conv1 = tf.nn.conv2d_transpose(x - biases['decoder_conv2'], weights['decoder_conv2'], conv1.shape, [1, 1, 1, 1])
    t_x_image = tf.nn.conv2d_transpose(t_conv1 - biases['decoder_conv1'], weights['decoder_conv1'], x_image.shape,
                                       [1, 1, 1, 1])
    return t_x_image
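
# Note: tf.nn.conv2d_transpose expects filters of shape
# [height, width, output_channels, input_channels], which is why
# 'decoder_conv1' below is [5, 5, 1, n_conv_1] (a single output channel)
# and 'decoder_conv2' is [3, 3, n_conv_1, n_conv_2].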


# Learnable parameters
weights = {
    'encoder_conv1': tf.Variable(tf.truncated_normal([5, 5, 1, n_conv_1], stddev=0.1)),
    'encoder_conv2': tf.Variable(tf.random_normal([3, 3, n_conv_1, n_conv_2], stddev=0.1)),
    'decoder_conv1': tf.Variable(tf.random_normal([5, 5, 1, n_conv_1], stddev=0.1)),
    'decoder_conv2': tf.Variable(tf.random_normal([3, 3, n_conv_1, n_conv_2], stddev=0.1))
}
biases = {
    'encoder_conv1': tf.Variable(tf.zeros([n_conv_1])),
    'encoder_conv2': tf.Variable(tf.zeros([n_conv_2])),
    'decoder_conv1': tf.Variable(tf.zeros([n_conv_1])),
    'decoder_conv2': tf.Variable(tf.zeros([n_conv_2])),
}

# Output nodes
encoder_out, conv1 = encoder(x_image)
h_pool2, mask = max_pool_with_argmax(encoder_out, 2)

h_upool = unpool(h_pool2, mask, 2)
pred = decoder(h_upool, conv1)

# Use mean squared error as the cost
cost = tf.reduce_mean(tf.pow(x_image - pred, 2))
optimizer = tf.train.RMSPropOptimizer(learning_rate).minimize(cost)

# Training parameters
training_epochs = 20  # 20 epochs in total

display_step = 5  # print a log every 5 epochs

# Launch the session
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    total_batch = int(mnist.train.num_examples / batchsize)
    # Start training
    for epoch in range(training_epochs):  # loop over epochs

        for i in range(total_batch):
            batch_xs, batch_ys = mnist.train.next_batch(batchsize)  # fetch a batch
            _, c = sess.run([optimizer, cost], feed_dict={x: batch_xs})  # train the model
        if (epoch + 1) % display_step == 0:  # display training log
            print("Epoch:", '%04d' % (epoch + 1), "cost=", "{:.9f}".format(c))

    print("完成!")

    # Test
    batch_xs, batch_ys = mnist.train.next_batch(batchsize)
    print("Error:", cost.eval({x: batch_xs}))

    # Visualize the results
    show_num = 10
    reconstruction = sess.run(
        # pred, feed_dict={x: mnist.test.images[:show_num]})
        pred, feed_dict={x: batch_xs})

    f, a = plt.subplots(2, 10, figsize=(10, 2))
    for i in range(show_num):
        # a[0][i].imshow(np.reshape(mnist.test.images[i], (28, 28)))
        a[0][i].imshow(np.reshape(batch_xs[i], (28, 28)))
        a[1][i].imshow(np.reshape(reconstruction[i], (28, 28)))
    plt.show()


Denoising Autoencoder

Reconstructing the input is not, by itself, enough to learn good features. In practical applications the features also need to be robust to interference: when the input is perturbed to a certain degree, the extracted features should stay essentially unchanged. Adding noise to the input makes the task harder, and a model trained under these conditions ends up more robust.

Here Gaussian noise is added to every pixel of the MNIST images, while the reconstruction target is still the clean original input. This forces the network to remove the noise while extracting features. To prevent overfitting, dropout layers are added to the network.
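
Concretely, the corruption and the training target are set up as in the following lines from the training loop below: the noisy images are fed to the network while the clean images are used as the reconstruction target.

batch_xs_noisy = batch_xs + 0.3 * np.random.randn(batch_size, 784)  # add Gaussian noise to the input
feeds = {x: batch_xs_noisy, y: batch_xs, dropout_keep_prob: 0.6}    # clean images are the target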

With dropout_keep_prob set to 1 (figure omitted; three rows: noisy input, original image, reconstruction):

With dropout_keep_prob set to 0.6 (figure omitted):

Comparing the two, dropout does a good job of filtering out the noise.

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.examples.tutorials.mnist import input_data

mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)

train_X = mnist.train.images
train_Y = mnist.train.labels
test_X = mnist.test.images
test_Y = mnist.test.labels

tf.reset_default_graph()

n_input = 784
n_hidden_1 = 256

# Placeholders
x = tf.placeholder("float", [None, n_input])
y = tf.placeholder("float", [None, n_input])
dropout_keep_prob = tf.placeholder("float")

# Learnable parameters
weights = {
    'h1': tf.Variable(tf.random_normal([n_input, n_hidden_1])),
    'h2': tf.Variable(tf.random_normal([n_hidden_1, n_hidden_1])),
    'out': tf.Variable(tf.random_normal([n_hidden_1, n_input]))
}
biases = {
    'b1': tf.Variable(tf.zeros([n_hidden_1])),
    'b2': tf.Variable(tf.zeros([n_hidden_1])),
    'out': tf.Variable(tf.zeros([n_input]))
}


# Network model
def denoise_auto_encoder(_X, _weights, _biases, _keep_prob):
    layer_1 = tf.nn.sigmoid(tf.add(tf.matmul(_X, _weights['h1']), _biases['b1']))
    layer_1out = tf.nn.dropout(layer_1, _keep_prob)
    layer_2 = tf.nn.sigmoid(tf.add(tf.matmul(layer_1out, _weights['h2']), _biases['b2']))
    layer_2out = tf.nn.dropout(layer_2, _keep_prob)
    return tf.nn.sigmoid(tf.matmul(layer_2out, _weights['out']) + _biases['out'])


reconstruction = denoise_auto_encoder(x, weights, biases, dropout_keep_prob)

# COST
cost = tf.reduce_mean(tf.pow(reconstruction - y, 2))
# OPTIMIZER
optm = tf.train.AdamOptimizer(0.01).minimize(cost)

# Training parameters
epochs = 20
batch_size = 256
disp_step = 2

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    print("开始训练")
    for epoch in range(epochs):
        num_batch = int(mnist.train.num_examples / batch_size)
        total_cost = 0.
        for i in range(num_batch):
            batch_xs, batch_ys = mnist.train.next_batch(batch_size)
            batch_xs_noisy = batch_xs + 0.3 * np.random.randn(batch_size, 784)
            # feeds = {x: batch_xs_noisy, y: batch_xs, dropout_keep_prob: 1.}
            feeds = {x: batch_xs_noisy, y: batch_xs, dropout_keep_prob: .6}
            sess.run(optm, feed_dict=feeds)
            total_cost += sess.run(cost, feed_dict=feeds)

        # Display training log
        if epoch % disp_step == 0:
            print("Epoch %02d/%02d average cost: %.6f"
                  % (epoch, epochs, total_cost / num_batch))

    print("完成")

    show_num = 10
    test_noisy = mnist.test.images[:show_num] + 0.3 * np.random.randn(show_num, 784)
    encode_decode = sess.run(
        reconstruction, feed_dict={x: test_noisy, dropout_keep_prob: 1.})
    f, a = plt.subplots(3, 10, figsize=(10, 3))
    for i in range(show_num):
        a[0][i].imshow(np.reshape(test_noisy[i], (28, 28)))
        a[1][i].imshow(np.reshape(mnist.test.images[i], (28, 28)))
        a[2][i].matshow(np.reshape(encode_decode[i], (28, 28)), cmap=plt.get_cmap('gray'))
    plt.show()

    # Test with a different kind of noise
    randidx = np.random.randint(test_X.shape[0], size=1)
    orgvec = test_X[randidx, :]
    testvec = test_X[randidx, :]
    label = np.argmax(test_Y[randidx, :], 1)

    print("label is %d" % (label))
    # Noise type

    print("Salt and Pepper Noise")
    noisyvec = testvec.copy()  # copy so the clean test vector is not modified in place
    rate = 0.15  # flip 15% of the pixels
    noiseidx = np.random.randint(test_X.shape[1], size=int(test_X.shape[1] * rate))
    noisyvec[0, noiseidx] = 1 - noisyvec[0, noiseidx]  # invert selected pixels (salt-and-pepper noise)

    outvec = sess.run(reconstruction, feed_dict={x: noisyvec, dropout_keep_prob: 1})
    outimg = np.reshape(outvec, (28, 28))

    # Plot 
    plt.matshow(np.reshape(orgvec, (28, 28)), cmap=plt.get_cmap('gray'))
    plt.title("Original Image")
    plt.colorbar()

    plt.matshow(np.reshape(noisyvec, (28, 28)), cmap=plt.get_cmap('gray'))
    plt.title("Input Image")
    plt.colorbar()

    plt.matshow(outimg, cmap=plt.get_cmap('gray'))
    plt.title("Reconstructed Image")
    plt.colorbar()
    plt.show()
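
Note that dropout_keep_prob is fed as 1.0 whenever the trained network is evaluated (for both the Gaussian-noise test and the salt-and-pepper test above), so dropout is only active during training, where the keep probability is 0.6.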

 

 

This article was published by 【阿】 on OSChina. Original link: https://my.oschina.net/ahaoboy/blog/1924391
