The code uses a queue to read batches of sample data from files at random and trains an autoencoder (plus a softmax classifier) on them. I keep feeling that something in the code is off, but I can't pin it down; could someone more experienced take a look? The reading pattern I'm following is the usual queue-based CSV pipeline, sketched just below for reference, and my full script comes after that.
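For reference, a stripped-down version of the queue-based CSV pipeline (TF 1.x assumed; the file name "data.csv" is a placeholder, the 1-label-column + 15-feature-column layout matches my data):

```python
# Minimal sketch of the queue-based CSV reading pattern (TF 1.x assumed).
# "data.csv" is an illustrative file name; the layout is 1 label column + 15 feature columns.
import tensorflow as tf

file_queue = tf.train.string_input_producer(["data.csv"])   # illustrative file name
reader = tf.TextLineReader()
_, value = reader.read(file_queue)                           # one CSV line per read

record_defaults = [[1.0]] * 16
columns = tf.decode_csv(value, record_defaults=record_defaults)
label = columns[0]
features = tf.stack(columns[1:])                             # shape [15]

feature_batch, label_batch = tf.train.shuffle_batch(
    [features, label], batch_size=32, capacity=200,
    min_after_dequeue=100, num_threads=2)                    # shapes [32, 15] and [32]
label_batch = tf.one_hot(tf.cast(label_batch, tf.int32), 3)  # shape [32, 3]

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    print(sess.run([feature_batch, label_batch]))
    coord.request_stop()
    coord.join(threads)
```

My full script: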
```python
# coding=utf-8
'''
Created on 2016-12-03
@author: chunsoft
'''
import tensorflow as tf
import numpy as np
import os
import pandas as pd

# Hyperparameters
learning_rate = 0.01      # learning rate
training_epochs = 50000   # number of training iterations
batch_size = 10000        # size of each randomly drawn training batch
display_step = 200        # how often to print progress

# Network parameters
# Three encoding layers here; for MNIST, two hidden layers of 256 and 128 worked best
n_hidden_1 = 30   # neurons in the first hidden layer
n_hidden_2 = 15   # second hidden layer
n_hidden_3 = 10   # third hidden layer
n_input = 15      # input dimension
classes = 3

# tf Graph input
# Weight initialization
weights = {
    'encoder_h1': tf.Variable(tf.random_normal([n_input, n_hidden_1])),
    'encoder_h2': tf.Variable(tf.random_normal([n_hidden_1, n_hidden_2])),
    'encoder_h3': tf.Variable(tf.random_normal([n_hidden_2, n_hidden_3])),
    'decoder_h1': tf.Variable(tf.random_normal([n_hidden_3, n_hidden_2])),
    'decoder_h2': tf.Variable(tf.random_normal([n_hidden_2, n_hidden_1])),
    'decoder_h3': tf.Variable(tf.random_normal([n_hidden_1, n_input])),
    'soft_w': tf.Variable(tf.truncated_normal([n_hidden_3, classes]))
}

# Bias initialization
biases = {
    'encoder_b1': tf.Variable(tf.random_normal([n_hidden_1])),
    'encoder_b2': tf.Variable(tf.random_normal([n_hidden_2])),
    'encoder_b3': tf.Variable(tf.random_normal([n_hidden_3])),
    'decoder_b1': tf.Variable(tf.random_normal([n_hidden_2])),
    'decoder_b2': tf.Variable(tf.random_normal([n_hidden_1])),
    'decoder_b3': tf.Variable(tf.random_normal([n_input])),
    'soft_b': tf.Variable(tf.constant(0.0, shape=[classes])),
}

# Encoder
def encoder(x):
    # sigmoid activation: layer = x * weights['encoder_h1'] + biases['encoder_b1']
    layer_1 = tf.nn.sigmoid(tf.add(tf.matmul(x, weights['encoder_h1']), biases['encoder_b1']))
    layer_2 = tf.nn.sigmoid(tf.add(tf.matmul(layer_1, weights['encoder_h2']), biases['encoder_b2']))
    layer_3 = tf.nn.sigmoid(tf.add(tf.matmul(layer_2, weights['encoder_h3']), biases['encoder_b3']))
    return layer_3

# Decoder
def decoder(x):
    # sigmoid activation: layer = x * weights['decoder_h1'] + biases['decoder_b1']
    layer_1 = tf.nn.sigmoid(tf.add(tf.matmul(x, weights['decoder_h1']), biases['decoder_b1']))
    layer_2 = tf.nn.sigmoid(tf.add(tf.matmul(layer_1, weights['decoder_h2']), biases['decoder_b2']))
    layer_3 = tf.nn.sigmoid(tf.add(tf.matmul(layer_2, weights['decoder_h3']), biases['decoder_b3']))
    return layer_3

def sample_split(X, Y):
    size_sample = Y.shape[0]
    # np.random.shuffle(int(size_sample))  # shuffle has no return value
    index = np.random.permutation(size_sample)
    return X[index[0:int(size_sample/2)], :], X[index[int(size_sample/2)+1:-1], :], \
           Y[index[0:int(size_sample/2)], :], Y[index[int(size_sample/2)+1:-1], :]

def csvfile(filelist, batch_size):
    file_queue = tf.train.string_input_producer(filelist)
    reader = tf.TextLineReader()
    # Each read() pulls one line from the file; decode_csv parses that line into a list of tensors
    key, value = reader.read(file_queue)
    records = list([1.0] for i in range(16))
    out = tf.decode_csv(value, record_defaults=records)
    sample = tf.train.shuffle_batch([out], batch_size=batch_size, capacity=200,
                                    min_after_dequeue=100, num_threads=2)
    sample_batch = sample[:, 1:]
    label_batch = sample[:, 0]
    label_batch = tf.one_hot(tf.cast(label_batch, tf.int32), 3, 1, 0)
    return sample_batch, label_batch

# Load the training data
# Create a FIFO queue and a QueueRunner to build the filename queue
filenames = os.listdir("/home/ubuntian18/Desktop/Data/all_r")
file_name = "/home/ubuntian18/Desktop/Data/all_r"
filelist = [os.path.join(file_name, line) for line in filenames]
print(filelist)
sample_batch, lable_batch = csvfile(filelist, batch_size)

# Load the test data
with open(r'/home/ubuntian18/Desktop/Data/test_data/test_data.csv') as f:
    test_data = pd.read_csv(f)
test_data = np.array(test_data)
test_x = test_data[:, 1:]
test_y = test_data[:, 0].astype(np.int16)
test_y = np.eye(np.max(test_y))[test_y-1]  # convert labels (starting at 1) to one-hot

# Build the model
print(sample_batch.shape)
encoder_op = encoder(sample_batch)
encoder_result = encoder_op
decoder_op = decoder(encoder_op)

# Reconstruction
x_pred = decoder_op
# The input data itself serves as the target
x_true = sample_batch

# Cost function and optimizer: minimize the mean squared error
# (the error model can be changed as needed)
cost = tf.reduce_mean(tf.pow(x_true - x_pred, 2))
optimizer = tf.train.RMSPropOptimizer(learning_rate).minimize(cost)
print(weights['soft_w'].get_shape().as_list())
print(encoder_op.shape, '&'*10)

# Softmax classification on top of the encoder output
soft_out = tf.nn.softmax(tf.matmul(encoder_op, weights['soft_w']) + biases['soft_b'])
lable_batch = tf.cast(lable_batch, tf.float32)
print(lable_batch)
print(soft_out)
loss = tf.reduce_mean(-tf.reduce_sum(lable_batch * tf.log(soft_out), reduction_indices=[1]))
opt_loss = tf.train.AdamOptimizer(1e-4).minimize(loss)

# Accuracy
correct_prediction = tf.equal(tf.argmax(soft_out, 1), tf.argmax(lable_batch, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

# Initialize variables
init = tf.global_variables_initializer()

# Run the graph
with tf.Session() as sess:
    sess.run(init)
    coord = tf.train.Coordinator()                    # create a coordinator to manage the threads
    threads = tf.train.start_queue_runners(coord=coord)  # start the QueueRunners; the filename queue is now populated

    # Pre-training (autoencoder reconstruction)
    print("*"*10)
    for epoch in range(training_epochs//1000):
        _, c = sess.run([optimizer, cost])
        # Print progress
        if epoch % display_step == 0:
            print("Epoch:", '%04d' % (epoch + 1), "cost=", "{:.9f}".format(c))
    print("Optimization Finished!")

    # Fine-tuning (softmax classification)
    for epoch in range(training_epochs):
        _, c = sess.run([opt_loss, loss])
        # Print progress
        if epoch % display_step == 0:
            print("Epoch:", '%04d' % (epoch + 1), "cost=", "{:.9f}".format(c))
            print("Training accuracy:", sess.run(accuracy))
    sample_batch, lable_batch = test_x, test_y
    print("Test accuracy:", sess.run(accuracy))
    print("Optimization Finished!")
    # Applying encode and decode over test set

    coord.request_stop()
    coord.join(threads)
```
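One thing I'm unsure about is the test step at the end: once the graph has been wired to the queue tensors, rebinding the Python names sample_batch / lable_batch to NumPy arrays does not change what accuracy reads. The usual TF 1.x way to score a held-out set is to build the model on tf.placeholder inputs and feed the arrays at run time. A minimal sketch of that feed_dict pattern (all names below are illustrative, not taken from my script):

```python
# Sketch only (TF 1.x assumed): evaluating on held-out data by feeding placeholders.
# All names and the tiny linear model below are illustrative.
import tensorflow as tf
import numpy as np

n_input, classes = 15, 3
x = tf.placeholder(tf.float32, [None, n_input])
y = tf.placeholder(tf.float32, [None, classes])

w = tf.Variable(tf.truncated_normal([n_input, classes]))
b = tf.Variable(tf.zeros([classes]))
logits = tf.matmul(x, w) + b

correct = tf.equal(tf.argmax(logits, 1), tf.argmax(y, 1))
accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    test_x = np.random.rand(8, n_input).astype(np.float32)
    test_y = np.eye(classes)[np.random.randint(0, classes, 8)].astype(np.float32)
    # Feeding the arrays routes them through the same graph that training used.
    print(sess.run(accuracy, feed_dict={x: test_x, y: test_y}))
```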
Copyright notice: this is an original post by Op_chaos, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.