2022-03-06
在训练深度学习模型时,GPU 通常用于加速。当只有几百万个训练样本时,单卡GPU通常可以满足我们的需求。 ,但是当训练样本数达到几千万或几亿时,单卡训练需要很长时间。这时候通常需要多机多卡加速。深度学习Doka训练常用的方法有两种,一种是数据并行化(data),另一种是模型并行化(model)。
深度模型训练方法:
深度学习模型的训练是一个迭代过程。在一小部分训练数据上计算预测值,然后反向传播算法,然后根据损失函数计算参数的梯度并更新参数。
一、数据并行化
数据并行化:在每个GPU上运行一个模型,模型和模型结构参数相同,但训练数据不同。每个模型通过最终的loss计算得到梯度后,将梯度传递给一个(PS)进行参数平均,然后根据模型更新模型的参数。
由于数据量大、算法复杂度高,深度学习算法往往需要使用并行机制。
### 常用的并行化深度学习模型训练方法有两种:同步模式和异步模式。
同步模式
同步模式:在所有数据分片完成梯度计算并将梯度传递给PS后,统一更新各个模型的参数。优点是训练稳定,训练出来的模型准确率比较高;缺点是训练时间取决于切片中最慢的切片,所以同步模式适用于GPU之间性能差异不大的情况。
同步模式训练方法
同步模式下,所有设备同时读取参数值,反向传播算法完成后,同步更新参数值。设备不会单独更新参数,而是在所有设备完成反向传播后统一更新参数。
同步模式训练流程图如下:
在这里写图片说明
异步模式
异步模式训练方法并行训练深度学习模型时,不同的设备(GPU或CPU)可以在不同的训练数据上运行整个迭代过程,不同并行模式的区别在于参数更新方式不同。
异步模式训练流程图如下:
在这里写图片说明
同步/异步比较
* 同步模式解决了异步模式下的参数更新问题,但同步模式效率低于异步模式
* 同步模式下,每次迭代需要设备统一起止
* 如果设备的运行速度不一致,那么每一轮训练都需要等待最慢的设备完成后再开始更新参数,所以会花费大量时间等待
p>
* 虽然异步模式在理论上存在缺陷,但由于训练深度学习模型所使用的随机梯度下降本身就是梯度下降的近似解,即使梯度下降也不能保证达到全局最优
p>
* 所以在实际应用中,异步模式训练的模型不一定比同步模式同时差。
代码示例
#优化神经网络进程运行在不同的GPU上
for i in range(N_GPU): with tf.debice('/gpu:%d'%i) with tf.name_scope('GPU_%d'%i) as scope: cur_loss = get_loss(x,y_regularizer,scope) #tf.get_variable的命名空间 tf.get_variable_scope().reuse_variables() #使用当前gpu计算所有变量的梯度 grads= opt.compute_gradients(cur_loss) tower_grads.append(grads) #计算变量的平均梯度 grads = average_gradients(tower_grads) #使用平均梯度更新参数 apply_gradient_op = opt.apply_gradients(grads,global_step = global)
二、模型并行化
模型并行化:当模型非常复杂,非常大,单机内存根本无法容纳时,模型并行化是一个不错的选择。直观地说,有多个 GPU 用于训练,每个 GPU 都持有模型的一个切片。它的优势是显而易见的。大模型训练的缺点是模型分片之间的通信和数据传输比较耗时,所以不能简单的说模型并行一定比数据并行快。
还有数据并行和模型并行的混合模型:
数据并行适用于数据量相对较少的模型的快速训练。并行适用于大数据和大模型场景。这只是一个简单的介绍,如果你想了解更多的细节,你可以找到其他资料学习。下面主要以数据并行化为例。
1、单机多卡训练:举个例子,比如一台机器配备4张GPU卡,cpu作为PS(),主要是保存参数和变量,并执行梯度平均。其余 4 个 GPU 训练模型 (),并执行一些增加计算量的操作。
2、 分布式多机多卡:在多机集群上训练时使用这种方式,这里需要明确指定ps和ps的地址。该方法兼容单机多卡,只需将ps和local的地址设置为local即可。
下面简单介绍几个支持多卡训练和参数更新的API。具体可以参考这篇文章(w实现原理)。
有两种方法可以执行重复训练:In-graph 和 -。 In-graph 为数据并行化模式,- 为数据并行化模式。梯度更新有异步和同步两种模式。
官网也给出了ain.py的例子,运行在单机多卡上。这里我举一个我自己做的单机多卡训练的简单例子,供参考。我也在建造这个结构的过程中。种了很多坑,还在摸索中,只有训练部分。
程序主要分为五个部分:
以下为完整代码:
1 #critital class define 2 3 #getaverage gradient 4 5 defaverage_gradients(tower_grads): 6 7 average_grads = [] 8 9 for grad_and_vars in zip(*tower_grads): 10 11 grads = [] 12 13 for g, _ in grad_and_vars: 14 15 expanded_g = tf.expand_dims(g, 0) 16 17 grads.append(expanded_g) 18 19 grad = tf.concat(axis=0, values=grads) 20 21 grad = tf.reduce_mean(grad, 0) 22 23 v = grad_and_vars[0][1] 24 25 grad_and_var = (grad, v) 26 27 average_grads.append(grad_and_var) 28 29 return average_grads 30 31 32 33 #setupmultiple gpu tower 34 35 defmulti_gpu_model(num_gpus=4, word_embeddings = None): 36 37 grads = [] 38 39 global_step = tf.Variable(0,name="global_step", trainable=False) 40 41 optimizer = tf.train.AdamOptimizer(1e-3) 42 43 withtf.variable_scope(tf.get_variable_scope()) as initScope: 44 45 for i in range(num_gpus): 46 47 withtf.device("/gpu:%d"%i): 48 49 withtf.name_scope("tower_%d"%i): 50 51 siameseModel = SiameseLSTM( 52 53 sequence_length=FLAGS.max_document_length, 54 55 embedding_size=FLAGS.embedding_dim, 56 57 hidden_units=FLAGS.hidden_units, 58 59 l2_reg_lambda=FLAGS.l2_reg_lambda, 60 61 batch_size=FLAGS.batch_size, 62 63 word_embeddings=word_embeddings) 64 65 tf.get_variable_scope().reuse_variables() 66 67 tf.add_to_collection("train_model", siameseModel) 68 69 grad_and_var =optimizer.compute_gradients(siameseModel.loss) 70 71 grads.append(grad_and_var) 72 73 tf.add_to_collection("loss",siameseModel.loss) 74 75 tf.add_to_collection("accuracy",siameseModel.accuracy) 76 77 tf.add_to_collection("distance",siameseModel.distance) 78 79 with tf.device("cpu:0"): 80 81 averaged_gradients =average_gradients(grads) 82 83 train_op =optimizer.apply_gradients(averaged_gradients, global_step=global_step) 84 85 return train_op,global_step 86 87 #generating training data 88 89 defgenerate_feed_dic(sess, batch_generator,feed_dict,train_op): 90 91 92 93 SMS =tf.get_collection("train_model") 94 95 for siameseModel in SMS: 96 97 x1_batch, x2_batch, y_batch =batch_generator.next() 98 99 if random()>0.5: 100 101 feed_dict[siameseModel.input_x1] =x1_batch 102 103 feed_dict[siameseModel.input_x2] =x2_batch 104 105 feed_dict[siameseModel.input_y] =y_batch 106 107 feed_dict[siameseModel.dropout_keep_prob]= FLAGS.dropout_keep_prob 108 109 else: 110 111 feed_dict[siameseModel.input_x1] =x2_batch 112 113 feed_dict[siameseModel.input_x2] =x1_batch 114 115 feed_dict[siameseModel.input_y] =y_batch 116 117 feed_dict[siameseModel.dropout_keep_prob]= FLAGS.dropout_keep_prob 118 119 return feed_dict 120 121 #define main trainingprocess 122 123 def run_epoch(sess,train_x1_idsList,train_x2_idsList,train_y,scope,global_step,train_op=None,is_training=False): 124 125 if is_training: 126 127 epoches = len(train_x1_idsList) //FLAGS.batch_size 128 129 batch_generator =datatool.data_iterator(train_x1_idsList, train_x2_idsList,train_y,FLAGS.batch_size,FLAGS.max_document_length) 130 131 # siameseModels =tf.get_collection("train_model") 132 133 while epoches > 0: 134 135 feed_dict = {} 136 137 epoches -= 1 138 139 feed_dict=generate_feed_dic(sess,batch_generator,feed_dict,train_op) 140 141 i = FLAGS.num_iteration 142 143 while i > 0: 144 145 i = i - 1 146 147 losses =tf.get_collection("loss") 148 149 accuracy =tf.get_collection("accuracy") 150 151 distance =tf.get_collection("distance") 152 153 total_accuracy =tf.add_n(losses, name='total_accu') 154 155 total_distance = tf.add_n(losses,name='total_distance') 156 157 total_loss = tf.add_n(losses,name='total_loss') 158 159 160 avg_losses = total_loss / 4 161 162 avg_accu = total_accuracy / 4 163 164 avg_dist = total_distance / 4 165 166 time_str =datetime.datetime.now().isoformat() 167 168 _,step,avg_losses,avg_accu,avg_dist =sess.run([train_op,global_step,total_loss,avg_accu,avg_dist],feed_dict) 169 170 #输出训练精度 171 172 print("TRAIN {}: step {},avg_loss {:g}, avg_dist {:g}, avg_acc {:g}".format(time_str, step,avg_losses, avg_dist, avg_accu)) 173 174 #whole training process 175 176 defmain(argv=None): 177 178 print("nParameters:") 179 180 for attr, value insorted(FLAGS.__flags.items()): 181 182 print("{}={}".format(attr.upper(),value)) 183 184 print("") 185 186 #加载词向量 187 188 word2id, word_embeddings =datatool.load_word2vec("your dir for word2vec") 189 190 print("load train data") 191 192 (train_x1_idsList,train_x2_idsList,train_y),(valid_x1_idsList, valid_x2_lList,valid_y) =datatool.get_data_for_siamese(word2id, FLAGS.data_path) 193 194 195 196 print("starting graph def") 197 198 gpu_options =tf.GPUOptions(per_process_gpu_memory_fraction=0.8) 199 200 withtf.Graph().as_default():#,tf.device('/cpu:0') 201 202 session_conf = tf.ConfigProto( 203 204 allow_soft_placement=FLAGS.allow_soft_placement, 205 206 log_device_placement=FLAGS.log_device_placement, 207 208 gpu_options=gpu_options) 209 210 sess = tf.Session(config=session_conf) 211 212 213 214 print("started session") 215 216 print ("build multiplemodel") 217 218 with tf.name_scope("train")as train_scope: 219 220 print("define multiple gpumodel and init the training operation") 221 222 train_op,global_step =multi_gpu_model(FLAGS.num_gpus,word_embeddings) 223 224 print ("init allvariable") 225 226 sess.run(tf.global_variables_initializer()) 227 228 print ("run epochestage") 229 230 run_epoch(sess,train_x1_idsList,train_x2_idsList,train_y,train_scope,global_step,train_op,True) 231 232 233 234 # Checkpoint directory. Tensorflowassumes this directory already exists so we need to create it 235 236 timestamp = str(int(time.time())) 237 238 checkpoint_dir =os.path.abspath(os.path.join(out_dir, "checkpoints")) 239 240 checkpoint_prefix =os.path.join(checkpoint_dir, "model") 241 242 if not os.path.exists(checkpoint_dir): 243 244 os.makedirs(checkpoint_dir) 245 246 out_dir =os.path.abspath(os.path.join(os.path.curdir, "runs", timestamp)) 247 248 print("Writing to {}n".format(out_dir)) 249 250 saver =tf.train.Saver(tf.global_variables(), max_to_keep=100)
@ >
查看代码
暂无评论内容