『TensorFlow』Distributed Training, Part 2: Single-Machine Multi-GPU Parallelism & GPU Mode Settings
Published: 2019-06-13


Recommended for comparison.

1. TensorFlow GPU Settings

Specifying GPU memory usage

gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.7)
sess = tf.Session(config=tf.ConfigProto(gpu_options=gpu_options))

With the setting above, the GPU memory allocated to TensorFlow is: actual GPU memory × 0.7.

Disabling GPU mode

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
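The same environment variable can also restrict TensorFlow to specific cards rather than hiding all of them. A minimal sketch (assuming the machine has several GPUs and that the variable is set before the first tf.Session is created):

import os

# Expose only physical GPU 0 to this process; all other cards stay invisible.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# Several cards can be listed. They are renumbered from 0 inside the process,
# so physical GPUs 1 and 3 would become '/gpu:0' and '/gpu:1' here.
# os.environ["CUDA_VISIBLE_DEVICES"] = "1,3"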

GPU resource allocation policy

# Let GPU memory grow on demand
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
sess = tf.Session(config=config)
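The two memory options can also be combined in a single ConfigProto; a minimal sketch (the fraction then acts as an upper bound within which memory grows incrementally):

import tensorflow as tf

config = tf.ConfigProto()
# Never claim more than 70% of each visible GPU's memory...
config.gpu_options.per_process_gpu_memory_fraction = 0.7
# ...and within that cap, allocate incrementally instead of all at once.
config.gpu_options.allow_growth = True
sess = tf.Session(config=config)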

2. How Single-Machine Multi-GPU Parallelism Works

A figure from a CSDN blog post (source visible in its watermark) illustrates how multi-GPU training works:

[Figure: multi-GPU training workflow diagram]

To run TensorFlow on multiple GPUs, you build a multi-tower structure in which each tower is assigned to a different GPU, while the aggregation work is usually handled by the CPU, as sketched below:

# Build a new graph.
c = []
for d in ['/gpu:2', '/gpu:3']:
  with tf.device(d):
    a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3])
    b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2])
    c.append(tf.matmul(a, b))
with tf.device('/cpu:0'):
  sum = tf.add_n(c)
# Create a session with log_device_placement set to True.
sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))
# Run the op.
print(sess.run(sum))

 

3. The Official Demo

Multi-GPU distribution + learning-rate decay + moving averages

Unlike MXNet, TensorFlow assigns devices with context managers, so data does not need to be explicitly copied to the target device; it only has to be fetched inside the target device's context (the corresponding op must be created under that device, such as the dequeue op in the code below).
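A minimal sketch of that idea, extracted from the structure of the official demo below (cifar10, FLAGS and the prefetch queue all come from that demo): the dequeue op is simply created inside each GPU's device context, and TensorFlow inserts the host-to-device copies itself.

import tensorflow as tf

# images/labels and the prefetch queue are built on the CPU.
images, labels = cifar10.distorted_inputs()
batch_queue = tf.contrib.slim.prefetch_queue.prefetch_queue(
    [images, labels], capacity=2 * FLAGS.num_gpus)

for i in range(FLAGS.num_gpus):
  with tf.device('/gpu:%d' % i):
    # No explicit copy: the dequeue op is created under this GPU's context,
    # and TensorFlow adds the CPU-to-GPU transfer edges automatically.
    image_batch, label_batch = batch_queue.dequeue()
    # ... build this tower's forward and backward pass on image_batch/label_batch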

Another point worth noting: the gradients gathered from the towers come back as a list of lists of (gradient, variable) tuples, and what we return after averaging is a list of (gradient, variable) tuples; the variable can simply be taken from any one GPU. The reason is that, unlike MXNet, where the averaged gradient is sent back to each GPU and every GPU updates its own copy, TensorFlow has all GPUs use the same set of parameters (via tf.get_variable_scope().reuse_variables()). Although the values are copied to each device, the copies remain logically linked as shared parameters. A simplified sketch:

# Run the optimization of the network on different GPUs
tower_grads = []
for i in range(N_GPU):
    with tf.device('/gpu:%d' % i):
        with tf.name_scope('GPU_%d' % i) as scope:
            cur_loss = get_loss(x, y_, regularizer, scope)
            # Reuse variables in the tf.get_variable namespace
            tf.get_variable_scope().reuse_variables()
            # Compute the gradients of all variables on the current GPU
            grads = opt.compute_gradients(cur_loss)
            tower_grads.append(grads)
# Compute the average gradient across all towers
grads = average_gradients(tower_grads)
# Update the parameters with the averaged gradients
apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)
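For reference, a minimal sketch of the average_gradients helper assumed by the loop above (the full official version appears in the demo below): it averages each gradient across towers and, because the variables are shared, returns the first tower's variable handle.

import tensorflow as tf

def average_gradients(tower_grads):
  # tower_grads: one list of (gradient, variable) tuples per GPU.
  average_grads = []
  for grad_and_vars in zip(*tower_grads):
    # grad_and_vars: ((grad_gpu0, var_gpu0), ..., (grad_gpuN, var_gpuN))
    grads = [tf.expand_dims(g, 0) for g, _ in grad_and_vars]
    # Stack along a new 'tower' axis and average over it.
    grad = tf.reduce_mean(tf.concat(grads, axis=0), 0)
    # The variable is shared across towers, so any tower's handle works;
    # take the first one.
    average_grads.append((grad, grad_and_vars[0][1]))
  return average_grads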

# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""A binary to train CIFAR-10 using multiple GPUs with synchronous updates.

Accuracy:
cifar10_multi_gpu_train.py achieves ~86% accuracy after 100K steps (256
epochs of data) as judged by cifar10_eval.py.

Speed: With batch_size 128.

System        | Step Time (sec/batch)  |     Accuracy
--------------------------------------------------------------------
1 Tesla K20m  | 0.35-0.60              | ~86% at 60K steps  (5 hours)
1 Tesla K40m  | 0.25-0.35              | ~86% at 100K steps (4 hours)
2 Tesla K20m  | 0.13-0.20              | ~84% at 30K steps  (2.5 hours)
3 Tesla K20m  | 0.13-0.18              | ~84% at 30K steps
4 Tesla K20m  | ~0.10                  | ~84% at 30K steps

Usage:
Please see the tutorial and website for how to download the CIFAR-10
data set, compile the program and train the model.

http://tensorflow.org/tutorials/deep_cnn/
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from datetime import datetime
import os.path
import re
import time

import numpy as np
from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf
import cifar10

FLAGS = tf.app.flags.FLAGS

tf.app.flags.DEFINE_string('train_dir', '/tmp/cifar10_train',
                           """Directory where to write event logs """
                           """and checkpoint.""")
tf.app.flags.DEFINE_integer('max_steps', 1000000,
                            """Number of batches to run.""")
tf.app.flags.DEFINE_integer('num_gpus', 1,
                            """How many GPUs to use.""")
tf.app.flags.DEFINE_boolean('log_device_placement', False,
                            """Whether to log device placement.""")


def tower_loss(scope, images, labels):
  """Calculate the total loss on a single tower running the CIFAR model.

  Args:
    scope: unique prefix string identifying the CIFAR tower, e.g. 'tower_0'
    images: Images. 4D tensor of shape [batch_size, height, width, 3].
    labels: Labels. 1D tensor of shape [batch_size].

  Returns:
     Tensor of shape [] containing the total loss for a batch of data
  """

  # Build inference Graph.
  logits = cifar10.inference(images)

  # Build the portion of the Graph calculating the losses. Note that we will
  # assemble the total_loss using a custom function below.
  _ = cifar10.loss(logits, labels)

  # Assemble all of the losses for the current tower only.
  losses = tf.get_collection('losses', scope)

  # Calculate the total loss for the current tower.
  total_loss = tf.add_n(losses, name='total_loss')

  # Attach a scalar summary to all individual losses and the total loss; do the
  # same for the averaged version of the losses.
  for l in losses + [total_loss]:
    # Remove 'tower_[0-9]/' from the name in case this is a multi-GPU training
    # session. This helps the clarity of presentation on tensorboard.
    loss_name = re.sub('%s_[0-9]*/' % cifar10.TOWER_NAME, '', l.op.name)
    tf.summary.scalar(loss_name, l)

  return total_loss


def average_gradients(tower_grads):
  """Calculate the average gradient for each shared variable across all towers.

  Note that this function provides a synchronization point across all towers.

  Args:
    tower_grads: List of lists of (gradient, variable) tuples. The outer list
      is over individual gradients. The inner list is over the gradient
      calculation for each tower.

  Returns:
     List of pairs of (gradient, variable) where the gradient has been averaged
     across all towers.
  """
  average_grads = []
  for grad_and_vars in zip(*tower_grads):
    # Note that each grad_and_vars looks like the following:
    #   ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))
    grads = []
    for g, _ in grad_and_vars:
      # Add 0 dimension to the gradients to represent the tower.
      expanded_g = tf.expand_dims(g, 0)

      # Append on a 'tower' dimension which we will average over below.
      grads.append(expanded_g)

    # Average over the 'tower' dimension.
    grad = tf.concat(axis=0, values=grads)
    grad = tf.reduce_mean(grad, 0)

    # Keep in mind that the Variables are redundant because they are shared
    # across towers. So .. we will just return the first tower's pointer to
    # the Variable.
    v = grad_and_vars[0][1]
    grad_and_var = (grad, v)
    average_grads.append(grad_and_var)
  return average_grads


def train():
  """Train CIFAR-10 for a number of steps."""
  with tf.Graph().as_default(), tf.device('/cpu:0'):
    # Create a variable to count the number of train() calls. This equals the
    # number of batches processed * FLAGS.num_gpus.
    global_step = tf.get_variable(
        'global_step', [],
        initializer=tf.constant_initializer(0), trainable=False)

    # Calculate the learning rate schedule.
    num_batches_per_epoch = (cifar10.NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN /
                             FLAGS.batch_size)
    decay_steps = int(num_batches_per_epoch * cifar10.NUM_EPOCHS_PER_DECAY)

    # Decay the learning rate exponentially based on the number of steps.
    lr = tf.train.exponential_decay(cifar10.INITIAL_LEARNING_RATE,
                                    global_step,
                                    decay_steps,
                                    cifar10.LEARNING_RATE_DECAY_FACTOR,
                                    staircase=True)

    # Create an optimizer that performs gradient descent.
    opt = tf.train.GradientDescentOptimizer(lr)

    # Get images and labels for CIFAR-10.
    images, labels = cifar10.distorted_inputs()
    batch_queue = tf.contrib.slim.prefetch_queue.prefetch_queue(
          [images, labels], capacity=2 * FLAGS.num_gpus)
    # Calculate the gradients for each model tower.
    tower_grads = []
    with tf.variable_scope(tf.get_variable_scope()):
      for i in xrange(FLAGS.num_gpus):
        with tf.device('/gpu:%d' % i):
          with tf.name_scope('%s_%d' % (cifar10.TOWER_NAME, i)) as scope:
            # Dequeues one batch for the GPU
            image_batch, label_batch = batch_queue.dequeue()
            # Calculate the loss for one tower of the CIFAR model. This function
            # constructs the entire CIFAR model but shares the variables across
            # all towers.
            loss = tower_loss(scope, image_batch, label_batch)

            # Reuse variables for the next tower.
            tf.get_variable_scope().reuse_variables()

            # Retain the summaries from the final tower.
            summaries = tf.get_collection(tf.GraphKeys.SUMMARIES, scope)

            # Calculate the gradients for the batch of data on this CIFAR tower.
            grads = opt.compute_gradients(loss)

            # Keep track of the gradients across all towers.
            tower_grads.append(grads)

    # We must calculate the mean of each gradient. Note that this is the
    # synchronization point across all towers.
    grads = average_gradients(tower_grads)

    # Add a summary to track the learning rate.
    summaries.append(tf.summary.scalar('learning_rate', lr))

    # Add histograms for gradients.
    for grad, var in grads:
      if grad is not None:
        summaries.append(tf.summary.histogram(var.op.name + '/gradients', grad))

    # Apply the gradients to adjust the shared variables.
    apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)

    # Add histograms for trainable variables.
    for var in tf.trainable_variables():
      summaries.append(tf.summary.histogram(var.op.name, var))

    # Track the moving averages of all trainable variables.
    variable_averages = tf.train.ExponentialMovingAverage(
        cifar10.MOVING_AVERAGE_DECAY, global_step)
    variables_averages_op = variable_averages.apply(tf.trainable_variables())

    # Group all updates to into a single train op.
    train_op = tf.group(apply_gradient_op, variables_averages_op)

    # Create a saver.
    saver = tf.train.Saver(tf.global_variables())

    # Build the summary operation from the last tower summaries.
    summary_op = tf.summary.merge(summaries)
################################################################################
    # Build an initialization operation to run below.
    init = tf.global_variables_initializer()

    # Start running operations on the Graph. allow_soft_placement must be set to
    # True to build towers on GPU, as some of the ops do not have GPU
    # implementations.
    sess = tf.Session(config=tf.ConfigProto(
        allow_soft_placement=True,
        log_device_placement=FLAGS.log_device_placement))
    sess.run(init)

    # Start the queue runners.
    tf.train.start_queue_runners(sess=sess)

    summary_writer = tf.summary.FileWriter(FLAGS.train_dir, sess.graph)

    for step in xrange(FLAGS.max_steps):
      start_time = time.time()
      _, loss_value = sess.run([train_op, loss])
      duration = time.time() - start_time

      assert not np.isnan(loss_value), 'Model diverged with loss = NaN'

      if step % 10 == 0:
        num_examples_per_step = FLAGS.batch_size * FLAGS.num_gpus
        examples_per_sec = num_examples_per_step / duration
        sec_per_batch = duration / FLAGS.num_gpus

        format_str = ('%s: step %d, loss = %.2f (%.1f examples/sec; %.3f '
                      'sec/batch)')
        print (format_str % (datetime.now(), step, loss_value,
                             examples_per_sec, sec_per_batch))

      if step % 100 == 0:
        summary_str = sess.run(summary_op)
        summary_writer.add_summary(summary_str, step)

      # Save the model checkpoint periodically.
      if step % 1000 == 0 or (step + 1) == FLAGS.max_steps:
        checkpoint_path = os.path.join(FLAGS.train_dir, 'model.ckpt')
        saver.save(sess, checkpoint_path, global_step=step)


def main(argv=None):  # pylint: disable=unused-argument
  cifar10.maybe_download_and_extract()
  if tf.gfile.Exists(FLAGS.train_dir):
    tf.gfile.DeleteRecursively(FLAGS.train_dir)
  tf.gfile.MakeDirs(FLAGS.train_dir)
  train()


if __name__ == '__main__':
  tf.app.run()

The data input function is as follows:

def distorted_inputs():
  """Construct distorted input for CIFAR training using the Reader ops.

  Returns:
    images: Images. 4D tensor of [batch_size, IMAGE_SIZE, IMAGE_SIZE, 3] size.
    labels: Labels. 1D tensor of [batch_size] size.

  Raises:
    ValueError: If no data_dir
  """
  if not FLAGS.data_dir:
    raise ValueError('Please supply a data_dir')
  data_dir = os.path.join(FLAGS.data_dir, 'cifar-10-batches-bin')
  images, labels = cifar10_input.distorted_inputs(data_dir=data_dir,
                                                  batch_size=FLAGS.batch_size)
  if FLAGS.use_fp16:
    images = tf.cast(images, tf.float16)
    labels = tf.cast(labels, tf.float16)
  return images, labels

Judging from its docstring, tf.contrib.slim.prefetch_queue.prefetch_queue is simply an input data queue:

Signature: tf.contrib.slim.prefetch_queue.prefetch_queue(tensors, capacity=8, num_threads=1, dynamic_pad=False, shared_name=None, name=None)
Docstring:
Creates a queue to prefetech tensors from `tensors`.

A queue runner for enqueing tensors into the prefetch_queue is automatically
added to the TF QueueRunners collection.

Example:
This is for example useful to pre-assemble input batches read with
`tf.train.batch()` and enqueue the pre-assembled batches. Ops that dequeue
from the pre-assembled queue will not pay the cost of assembling the batch.

images, labels = tf.train.batch([image, label], batch_size=32, num_threads=4)
batch_queue = prefetch_queue([images, labels])
images, labels = batch_queue.dequeue()
logits = Net(images)
loss = Loss(logits, labels)

Args:
  tensors: A list or dictionary of `Tensors` to enqueue in the buffer.
  capacity: An integer. The maximum number of elements in the queue.
  num_threads: An integer. Number of threads running the enqueue op.
  dynamic_pad: Boolean. Whether to allow variable dimensions in input shapes.
  shared_name: (optional). If set, this queue will be shared under the given
    name across multiple sessions.
  name: (Optional) A name for the operations.

Returns:
  A queue from which you can dequeue tensors with the same type and shape
  as `tensors`.

 

Reposted from: https://www.cnblogs.com/hellcat/p/9194110.html
