tensorflow 13:多gpu 并行训练

2022-03-06

在训练深度学习模型时,GPU 通常用于加速。当只有几百万个训练样本时,单卡GPU通常可以满足我们的需求。 ,但是当训练样本数达到几千万或几亿时,单卡训练需要很长时间。这时候通常需要多机多卡加速。深度学习Doka训练常用的方法有两种,一种是数据并行化(data),另一种是模型并行化(model)。

深度模型训练方法:

深度学习模型的训练是一个迭代过程。在一小部分训练数据上计算预测值,然后反向传播算法,然后根据损失函数计算参数的梯度并更新参数。

一、数据并行化

数据并行化:在每个GPU上运行一个模型,模型和模型结构参数相同,但训练数据不同。每个模型通过最终的loss计算得到梯度后,将梯度传递给一个(PS)进行参数平均,然后根据模型更新模型的参数。

由于数据量大、算法复杂度高,深度学习算法往往需要使用并行机制。

### 常用的并行化深度学习模型训练方法有两种:同步模式和异步模式。

同步模式

同步模式:在所有数据分片完成梯度计算并将梯度传递给PS后,统一更新各个模型的参数。优点是训练稳定,训练出来的模型准确率比较高;缺点是训练时间取决于切片中最慢的切片,所以同步模式适用于GPU之间性能差异不大的情况。

同步模式训练方法

同步模式下,所有设备同时读取参数值,反向传播算法完成后,同步更新参数值。设备不会单独更新参数,而是在所有设备完成反向传播后统一更新参数。

同步模式训练流程图如下:

在这里写图片说明

异步模式

异步模式训练方法并行训练深度学习模型时,不同的设备(GPU或CPU)可以在不同的训练数据上运行整个迭代过程,不同并行模式的区别在于参数更新方式不同。

异步模式训练流程图如下:

在这里写图片说明

同步/异步比较

* 同步模式解决了异步模式下的参数更新问题,但同步模式效率低于异步模式

* 同步模式下,每次迭代需要设备统一起止

* 如果设备的运行速度不一致,那么每一轮训练都需要等待最慢的设备完成后再开始更新参数,所以会花费大量时间等待

p>

* 虽然异步模式在理论上存在缺陷,但由于训练深度学习模型所使用的随机梯度下降本身就是梯度下降的近似解,即使梯度下降也不能保证达到全局最优

p>

* 所以在实际应用中,异步模式训练的模型不一定比同步模式同时差。

代码示例

#优化神经网络进程运行在不同的GPU上

for i in range(N_GPU):
    with tf.debice('/gpu:%d'%i)
        with tf.name_scope('GPU_%d'%i) as scope:
            cur_loss = get_loss(x,y_regularizer,scope)
            #tf.get_variable的命名空间
            tf.get_variable_scope().reuse_variables()
            #使用当前gpu计算所有变量的梯度
            grads= opt.compute_gradients(cur_loss)
            tower_grads.append(grads)
#计算变量的平均梯度
grads = average_gradients(tower_grads)
#使用平均梯度更新参数
apply_gradient_op = opt.apply_gradients(grads,global_step = global)

二、模型并行化

模型并行化:当模型非常复杂,非常大,单机内存根本无法容纳时,模型并行化是一个不错的选择。直观地说,有多个 GPU 用于训练,每个 GPU 都持有模型的一个切片。它的优势是显而易见的。大模型训练的缺点是模型分片之间的通信和数据传输比较耗时,所以不能简单的说模型并行一定比数据并行快。

还有数据并行和模型并行的混合模型:

数据并行适用于数据量相对较少的模型的快速训练。并行适用于大数据和大模型场景。这只是一个简单的介绍,如果你想了解更多的细节,你可以找到其他资料学习。下面主要以数据并行化为例。

1、单机多卡训练:举个例子,比如一台机器配备4张GPU卡,cpu作为PS(),主要是保存参数和变量,并执行梯度平均。其余 4 个 GPU 训练模型 (),并执行一些增加计算量的操作。

2、 分布式多机多卡:在多机集群上训练时使用这种方式,这里需要明确指定ps和ps的地址。该方法兼容单机多卡,只需将ps和local的地址设置为local即可。

下面简单介绍几个支持多卡训练和参数更新的API。具体可以参考这篇文章(w实现原理)。

有两种方法可以执行重复训练:In-graph 和 -。 In-graph 为数据并行化模式,- 为数据并行化模式。梯度更新有异步和同步两种模式。

官网也给出了ain.py的例子,运行在单机多卡上。这里我举一个我自己做的单机多卡训练的简单例子,供参考。我也在建造这个结构的过程中。种了很多坑,还在摸索中,只有训练部分。

程序主要分为五个部分:

以下为完整代码:

  1 #critital class define
  2 
  3 #getaverage gradient
  4 
  5 defaverage_gradients(tower_grads):
  6 
  7 average_grads = []
  8 
  9 for grad_and_vars in zip(*tower_grads):
 10 
 11 grads = []
 12 
 13 for g, _ in grad_and_vars:
 14 
 15 expanded_g = tf.expand_dims(g, 0)
 16 
 17 grads.append(expanded_g)
 18 
 19 grad = tf.concat(axis=0, values=grads)
 20 
 21 grad = tf.reduce_mean(grad, 0)
 22 
 23 v = grad_and_vars[0][1]
 24 
 25 grad_and_var = (grad, v)
 26 
 27 average_grads.append(grad_and_var)
 28 
 29 return average_grads
 30 
 31  
 32 
 33 #setupmultiple gpu tower
 34 
 35 defmulti_gpu_model(num_gpus=4, word_embeddings = None):
 36 
 37 grads = []
 38 
 39 global_step = tf.Variable(0,name="global_step", trainable=False)
 40 
 41 optimizer = tf.train.AdamOptimizer(1e-3)
 42 
 43 withtf.variable_scope(tf.get_variable_scope()) as initScope:
 44 
 45 for i in range(num_gpus):
 46 
 47 withtf.device("/gpu:%d"%i):
 48 
 49 withtf.name_scope("tower_%d"%i):
 50 
 51 siameseModel = SiameseLSTM(
 52 
 53 sequence_length=FLAGS.max_document_length,
 54 
 55 embedding_size=FLAGS.embedding_dim,
 56 
 57 hidden_units=FLAGS.hidden_units,
 58 
 59 l2_reg_lambda=FLAGS.l2_reg_lambda,
 60 
 61 batch_size=FLAGS.batch_size,
 62 
 63 word_embeddings=word_embeddings)
 64 
 65 tf.get_variable_scope().reuse_variables()
 66 
 67 tf.add_to_collection("train_model", siameseModel)
 68 
 69 grad_and_var =optimizer.compute_gradients(siameseModel.loss)
 70 
 71 grads.append(grad_and_var)
 72 
 73 tf.add_to_collection("loss",siameseModel.loss)
 74 
 75 tf.add_to_collection("accuracy",siameseModel.accuracy)
 76 
 77 tf.add_to_collection("distance",siameseModel.distance)
 78 
 79 with tf.device("cpu:0"):
 80 
 81 averaged_gradients =average_gradients(grads)
 82 
 83 train_op =optimizer.apply_gradients(averaged_gradients, global_step=global_step)
 84 
 85 return train_op,global_step
 86 
 87 #generating training data
 88 
 89 defgenerate_feed_dic(sess, batch_generator,feed_dict,train_op):
 90 
 91  
 92 
 93 SMS =tf.get_collection("train_model")
 94 
 95 for siameseModel in SMS:
 96 
 97 x1_batch, x2_batch, y_batch =batch_generator.next()
 98 
 99 if random()>0.5:
100 
101 feed_dict[siameseModel.input_x1] =x1_batch
102 
103 feed_dict[siameseModel.input_x2] =x2_batch
104 
105 feed_dict[siameseModel.input_y] =y_batch
106 
107 feed_dict[siameseModel.dropout_keep_prob]= FLAGS.dropout_keep_prob
108 
109 else:
110 
111 feed_dict[siameseModel.input_x1] =x2_batch
112 
113 feed_dict[siameseModel.input_x2] =x1_batch
114 
115 feed_dict[siameseModel.input_y] =y_batch
116 
117 feed_dict[siameseModel.dropout_keep_prob]= FLAGS.dropout_keep_prob
118 
119 return feed_dict
120 
121 #define main trainingprocess
122 
123 def run_epoch(sess,train_x1_idsList,train_x2_idsList,train_y,scope,global_step,train_op=None,is_training=False):
124 
125 if is_training:
126 
127 epoches = len(train_x1_idsList) //FLAGS.batch_size
128 
129 batch_generator =datatool.data_iterator(train_x1_idsList, train_x2_idsList,train_y,FLAGS.batch_size,FLAGS.max_document_length)
130 
131 # siameseModels =tf.get_collection("train_model")
132 
133 while epoches > 0:
134 
135 feed_dict = {}
136 
137 epoches -= 1
138 
139 feed_dict=generate_feed_dic(sess,batch_generator,feed_dict,train_op)
140 
141 i = FLAGS.num_iteration
142 
143 while i > 0:
144 
145 i = i - 1
146 
147 losses =tf.get_collection("loss")
148 
149 accuracy =tf.get_collection("accuracy")
150 
151 distance =tf.get_collection("distance")
152 
153 total_accuracy =tf.add_n(losses, name='total_accu')
154 
155 total_distance = tf.add_n(losses,name='total_distance')
156 
157 total_loss = tf.add_n(losses,name='total_loss')
158 
159 
160 avg_losses = total_loss / 4
161 
162 avg_accu = total_accuracy / 4
163 
164 avg_dist = total_distance / 4
165 
166 time_str =datetime.datetime.now().isoformat()
167 
168 _,step,avg_losses,avg_accu,avg_dist =sess.run([train_op,global_step,total_loss,avg_accu,avg_dist],feed_dict)
169 
170 #输出训练精度
171 
172 print("TRAIN {}: step {},avg_loss {:g}, avg_dist {:g}, avg_acc {:g}".format(time_str, step,avg_losses, avg_dist, avg_accu))
173 
174 #whole training process
175 
176 defmain(argv=None):
177 
178 print("nParameters:")
179 
180 for attr, value insorted(FLAGS.__flags.items()):
181 
182 print("{}={}".format(attr.upper(),value))
183 
184 print("")
185 
186 #加载词向量
187 
188 word2id, word_embeddings =datatool.load_word2vec("your dir for word2vec")
189 
190 print("load train data")
191 
192 (train_x1_idsList,train_x2_idsList,train_y),(valid_x1_idsList, valid_x2_lList,valid_y) =datatool.get_data_for_siamese(word2id, FLAGS.data_path)
193 
194  
195 
196 print("starting graph def")
197 
198 gpu_options =tf.GPUOptions(per_process_gpu_memory_fraction=0.8)
199 
200 withtf.Graph().as_default():#,tf.device('/cpu:0')
201 
202 session_conf = tf.ConfigProto(
203 
204 allow_soft_placement=FLAGS.allow_soft_placement,
205 
206 log_device_placement=FLAGS.log_device_placement,
207 
208 gpu_options=gpu_options)
209 
210 sess = tf.Session(config=session_conf)
211 
212  
213 
214 print("started session")
215 
216 print ("build multiplemodel")
217 
218 with tf.name_scope("train")as train_scope:
219 
220 print("define multiple gpumodel and init the training operation")
221 
222 train_op,global_step =multi_gpu_model(FLAGS.num_gpus,word_embeddings)
223 
224 print ("init allvariable")
225 
226 sess.run(tf.global_variables_initializer())
227 
228 print ("run epochestage")
229 
230 run_epoch(sess,train_x1_idsList,train_x2_idsList,train_y,train_scope,global_step,train_op,True)
231 
232  
233 
234 # Checkpoint directory. Tensorflowassumes this directory already exists so we need to create it
235 
236 timestamp = str(int(time.time()))
237 
238 checkpoint_dir =os.path.abspath(os.path.join(out_dir, "checkpoints"))
239 
240 checkpoint_prefix =os.path.join(checkpoint_dir, "model")
241 
242 if not os.path.exists(checkpoint_dir):
243 
244 os.makedirs(checkpoint_dir)
245 
246 out_dir =os.path.abspath(os.path.join(os.path.curdir, "runs", timestamp))
247 
248 print("Writing to {}n".format(out_dir))
249 
250 saver =tf.train.Saver(tf.global_variables(), max_to_keep=100)

@ >

查看代码

© 版权声明
THE END
喜欢就支持一下吧
点赞11 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片