线程和队列

在使用TensorFlow进行异步计算时,队列是一种强大的机制。

艾伯特(http://www.aibbt.com/)国内第一家人工智能门户

正如TensorFlow中的其他组件一样,队列就是TensorFlow图中的节点。这是一种有状态的节点,就像变量一样:其他节点可以修改它的内容。具体来说,其他节点可以把新元素插入到队列后端(rear),也可以把队列前端(front)的元素删除。

为了感受一下队列,让我们来看一个简单的例子。我们先创建一个“先入先出”的队列(FIFOQueue),并将其内部所有元素初始化为零。然后,我们构建一个TensorFlow图,它从队列前端取走一个元素,加上1之后,放回队列的后端。慢慢地,队列的元素的值就会增加。

Enqueue、 EnqueueManyDequeue都是特殊的节点。他们需要获取队列指针,而非普通的值,如此才能修改队列内容。我们建议您将它们看作队列的方法。事实上,在Python API中,它们就是队列对象的方法(例如q.enqueue(...))。

现在你已经对队列有了一定的了解,让我们深入到细节...

队列使用概述

队列,如FIFOQueueRandomShuffleQueue,在TensorFlow的张量异步计算时都非常重要。

例如,一个典型的输入结构:是使用一个RandomShuffleQueue来作为模型训练的输入:

  • 多个线程准备训练样本,并且把这些样本推入队列。
  • 一个训练线程执行一个训练操作,此操作会从队列中移除最小批次的样本(mini-batches)。

这种结构具有许多优点,正如在Reading data how to中强调的,同时,Reading data how to也概括地描述了如何简化输入管道的构造过程。

TensorFlow的Session对象是可以支持多线程的,因此多个线程可以很方便地使用同一个会话(Session)并且并行地执行操作。然而,在Python程序实现这样的并行运算却并不容易。所有线程都必须能被同步终止,异常必须能被正确捕获并报告,回话终止的时候, 队列必须能被正确地关闭。

所幸TensorFlow提供了两个类来帮助多线程的实现:tf.Coordinator和 tf.QueueRunner。从设计上这两个类必须被一起使用。Coordinator类可以用来同时停止多个工作线程并且向那个在等待所有工作线程终止的程序报告异常。QueueRunner类用来协调多个工作线程同时将多个张量推入同一个队列中。

Coordinator

Coordinator类用来帮助多个线程协同工作,多个线程同步终止。 其主要方法有:

  • should_stop():如果线程应该停止则返回True。
  • request_stop(<exception>): 请求该线程停止。
  • join(<list of threads>):等待被指定的线程终止。

首先创建一个Coordinator对象,然后建立一些使用Coordinator对象的线程。这些线程通常一直循环运行,一直到should_stop()返回True时停止。 任何线程都可以决定计算什么时候应该停止。它只需要调用request_stop(),同时其他线程的should_stop()将会返回True,然后都停下来。

# 线程体:循环执行,直到`Coordinator`收到了停止请求。
# 如果某些条件为真,请求`Coordinator`去停止其他线程。
def MyLoop(coord):while not coord.should_stop():...do something...if ...some condition...:coord.request_stop()# Main code: create a coordinator.
coord = Coordinator()# Create 10 threads that run 'MyLoop()'
threads = [threading.Thread(target=MyLoop, args=(coord)) for i in xrange(10)]# Start the threads and wait for all of them to stop.
for t in threads: t.start()
coord.join(threads)

显然,Coordinator可以管理线程去做不同的事情。上面的代码只是一个简单的例子,在设计实现的时候不必完全照搬。Coordinator还支持捕捉和报告异常, 具体可以参考Coordinator class的文档。

QueueRunner

QueueRunner类会创建一组线程, 这些线程可以重复的执行Enquene操作, 他们使用同一个Coordinator来处理线程同步终止。此外,一个QueueRunner会运行一个closer thread,当Coordinator收到异常报告时,这个closer thread会自动关闭队列。

您可以使用一个queue runner,来实现上述结构。 首先建立一个TensorFlow图表,这个图表使用队列来输入样本。增加处理样本并将样本推入队列中的操作。增加training操作来移除队列中的样本。

example = ...ops to create one example...
# Create a queue, and an op that enqueues examples one at a time in the queue.
queue = tf.RandomShuffleQueue(...)
enqueue_op = queue.enqueue(example)
# Create a training graph that starts by dequeuing a batch of examples.
inputs = queue.dequeue_many(batch_size)
train_op = ...use 'inputs' to build the training part of the graph...

在Python的训练程序中,创建一个QueueRunner来运行几个线程, 这几个线程处理样本,并且将样本推入队列。创建一个Coordinator,让queue runner使用Coordinator来启动这些线程,创建一个训练的循环, 并且使用Coordinator来控制QueueRunner的线程们的终止。http://www.aibbt.com/a/16370.html

# Create a queue runner that will run 4 threads in parallel to enqueue
# examples.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)# Launch the graph.
sess = tf.Session()
# Create a coordinator, launch the queue runner threads.
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
# Run the training loop, controlling termination with the coordinator.
for step in xrange(1000000):if coord.should_stop():breaksess.run(train_op)
# When done, ask the threads to stop.
coord.request_stop()
# And wait for them to actually do it.
coord.join(threads)

异常处理

通过queue runners启动的线程不仅仅只处理推送样本到队列。他们还捕捉和处理由队列产生的异常,包括OutOfRangeError异常,这个异常是用于报告队列被关闭。 使用Coordinator的训练程序在主循环中必须同时捕捉和报告异常。 下面是对上面训练循环的改进版本。

try:for step in xrange(1000000):if coord.should_stop():breaksess.run(train_op)
except Exception, e:# Report exceptions to the coordinator.coord.request_stop(e)# Terminate as usual.  It is innocuous to request stop twice.
coord.request_stop()
coord.join(threads)

艾伯特( http://www.aibbt.com/ )国内第一家人工智能门户

正如TensorFlow中的其他组件一样,队列就是TensorFlow图中的节点。这是一种有状态的节点,就像变量一样:其他节点可以修改它的内容。具体来说,其他节点可以把新元素插入到队列后端(rear),也可以把队列前端(front)的元素删除。

为了感受一下队列,让我们来看一个简单的例子。我们先创建一个“先入先出”的队列(FIFOQueue),并将其内部所有元素初始化为零。然后,我们构建一个TensorFlow图,它从队列前端取走一个元素,加上1之后,放回队列的后端。慢慢地,队列的元素的值就会增加。

Enqueue、 EnqueueManyDequeue都是特殊的节点。他们需要获取队列指针,而非普通的值,如此才能修改队列内容。我们建议您将它们看作队列的方法。事实上,在Python API中,它们就是队列对象的方法(例如q.enqueue(...))。

现在你已经对队列有了一定的了解,让我们深入到细节...

队列使用概述

队列,如FIFOQueueRandomShuffleQueue,在TensorFlow的张量异步计算时都非常重要。

例如,一个典型的输入结构:是使用一个RandomShuffleQueue来作为模型训练的输入:

  • 多个线程准备训练样本,并且把这些样本推入队列。
  • 一个训练线程执行一个训练操作,此操作会从队列中移除最小批次的样本(mini-batches)。

这种结构具有许多优点,正如在Reading data how to中强调的,同时,Reading data how to也概括地描述了如何简化输入管道的构造过程。

TensorFlow的Session对象是可以支持多线程的,因此多个线程可以很方便地使用同一个会话(Session)并且并行地执行操作。然而,在Python程序实现这样的并行运算却并不容易。所有线程都必须能被同步终止,异常必须能被正确捕获并报告,回话终止的时候, 队列必须能被正确地关闭。

所幸TensorFlow提供了两个类来帮助多线程的实现:tf.Coordinator和 tf.QueueRunner。从设计上这两个类必须被一起使用。Coordinator类可以用来同时停止多个工作线程并且向那个在等待所有工作线程终止的程序报告异常。QueueRunner类用来协调多个工作线程同时将多个张量推入同一个队列中。

Coordinator

Coordinator类用来帮助多个线程协同工作,多个线程同步终止。 其主要方法有:

  • should_stop():如果线程应该停止则返回True。
  • request_stop(<exception>): 请求该线程停止。
  • join(<list of threads>):等待被指定的线程终止。

首先创建一个Coordinator对象,然后建立一些使用Coordinator对象的线程。这些线程通常一直循环运行,一直到should_stop()返回True时停止。 任何线程都可以决定计算什么时候应该停止。它只需要调用request_stop(),同时其他线程的should_stop()将会返回True,然后都停下来。

# 线程体:循环执行,直到`Coordinator`收到了停止请求。
# 如果某些条件为真,请求`Coordinator`去停止其他线程。
def MyLoop(coord):while not coord.should_stop():...do something...if ...some condition...:coord.request_stop()# Main code: create a coordinator.
coord = Coordinator()# Create 10 threads that run 'MyLoop()'
threads = [threading.Thread(target=MyLoop, args=(coord)) for i in xrange(10)]# Start the threads and wait for all of them to stop.
for t in threads: t.start()
coord.join(threads)

显然,Coordinator可以管理线程去做不同的事情。上面的代码只是一个简单的例子,在设计实现的时候不必完全照搬。Coordinator还支持捕捉和报告异常, 具体可以参考Coordinator class的文档。

QueueRunner

QueueRunner类会创建一组线程, 这些线程可以重复的执行Enquene操作, 他们使用同一个Coordinator来处理线程同步终止。此外,一个QueueRunner会运行一个closer thread,当Coordinator收到异常报告时,这个closer thread会自动关闭队列。

您可以使用一个queue runner,来实现上述结构。 首先建立一个TensorFlow图表,这个图表使用队列来输入样本。增加处理样本并将样本推入队列中的操作。增加training操作来移除队列中的样本。

example = ...ops to create one example...
# Create a queue, and an op that enqueues examples one at a time in the queue.
queue = tf.RandomShuffleQueue(...)
enqueue_op = queue.enqueue(example)
# Create a training graph that starts by dequeuing a batch of examples.
inputs = queue.dequeue_many(batch_size)
train_op = ...use 'inputs' to build the training part of the graph...

在Python的训练程序中,创建一个QueueRunner来运行几个线程, 这几个线程处理样本,并且将样本推入队列。创建一个Coordinator,让queue runner使用Coordinator来启动这些线程,创建一个训练的循环, 并且使用Coordinator来控制QueueRunner的线程们的终止。

# Create a queue runner that will run 4 threads in parallel to enqueue
# examples.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)# Launch the graph.
sess = tf.Session()
# Create a coordinator, launch the queue runner threads.
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
# Run the training loop, controlling termination with the coordinator.
for step in xrange(1000000):if coord.should_stop():breaksess.run(train_op)
# When done, ask the threads to stop.
coord.request_stop()
# And wait for them to actually do it.
coord.join(threads)

异常处理

通过queue runners启动的线程不仅仅只处理推送样本到队列。他们还捕捉和处理由队列产生的异常,包括OutOfRangeError异常,这个异常是用于报告队列被关闭。 使用Coordinator的训练程序在主循环中必须同时捕捉和报告异常。 下面是对上面训练循环的改进版本。

try:for step in xrange(1000000):if coord.should_stop():breaksess.run(train_op)
except Exception, e:# Report exceptions to the coordinator.coord.request_stop(e)# Terminate as usual.  It is innocuous to request stop twice.
coord.request_stop()
coord.join(threads)

TensorFlow官方文档线程和队列相关推荐

  1. TensorFlow 官方文档中文版发布啦(持续维护)

    TensorFlow 是 Google 研发的第二代人工智能学习系统,是 Google 为了帮助全球开发者们更加方便和高效地开发机器学习 (Machine Learning)和人工智能 (AI) 应用 ...

  2. tensorflow官方文档_开源分享:最好的TensorFlow入门教程

    如果一门技术的学习曲线过于陡峭,那么我们在入门时的场景往往是,一鼓作气,没入门,再而衰,三而竭.演绎一出从入门到放弃的败走麦城. 今天发现一个入门TensorFlow的宝藏,迫不及待的分享给大家.这个 ...

  3. TensorFlow官方文档中的sub 和mul中的函数已经在API中改名了

    照着tensorflow 官方文档学习tensorflow时,出现问题: 第一,执行程序 #进入一个交互式Tensorflow会话 import tensorflow as tf sess = tf. ...

  4. TensorFlow 官方文档中文版

    TensorFlow 官方文档中文版: http://wiki.jikexueyuan.com/project/tensorflow-zh/ w3cschool https://www.w3cscho ...

  5. TensorFlow 官方文档中文版发布啦(持续维护) 1

    TensorFlow 是 Google 研发的第二代人工智能学习系统,是 Google 为了帮助全球开发者们更加方便和高效地开发机器学习 (Machine Learning)和人工智能 (AI) 应用 ...

  6. tensorflow 官方文档中文版 tensorflow教程 tensorflow教学

    github链接:TensorFlow 最新官方文档中文版 文档链接:https://tensorflow.juejin.im/get_started/ 理论上来说,只要仔细阅读官方文档,便能对ten ...

  7. 【TensorFlow官方文档】MNIST机器学习入门

    MNIST是一个入门级的计算机视觉数据集,它包含各种手写数字图片:它也包含每一张图片对应的标签,告诉我们这个是数字几.比如,下面这四张图片的标签分别是5,0,4,1. 从一个很简单的数学模型开始:训练 ...

  8. Tensorflow官方文档---起步 MNIST示例

    Tensorflow •使用图 (graph) 来表示计算任务. • 在被称之为 会话 (Session) 的上下文 (context) 中执行图. • 使用 tensor 表示数据. • 通过 变量 ...

  9. Tensorflow官方文档中文版——第一章

    第一示例: import tensorflow as tf import numpy as npx_data=np.float32(np.random.rand(2,100))#随机输入 y_data ...

最新文章

  1. 自定义WPF窗体形状
  2. Dubbo基础专题——第二章(Dubbo工程简单实践)
  3. Tomcat启动特慢之SecureRandom问题解决
  4. 各个系统下以及VS2017、Qt十分常用的快捷键
  5. oracle性能问题排查,性能测试Oracle消耗排查记录
  6. php上传多张图片为什么只显示一张,javascript,_js多张图片上传 也拿到多张图片的路径 在页面上展示只显示一张?只执行了一次???,javascript - phpStudy...
  7. Ajax中最有名axios插件(只应用于Ajax)(post方法,官网写错了,应是字符串格式)...
  8. Bootstrap组件_按钮组
  9. 【链接】Solr的Filed中indexed与stored属性
  10. 【开源】iTest教学辅助系统源代码
  11. mysql 表 组织 管理_MySQL 基础知识梳理学习(二)----记录在页面层级的组织管理...
  12. Redis下载安装(Windows,Lunix)
  13. 高级程序员的自我修养:如何才能成长为牛逼的高级程序员?
  14. 名帖232 张雨 行书《行书帖选》
  15. tmooccn达内登录_达内上线技术学习平台TMOOC.CN,由线下反攻线上,O2O是在线教育的出路?...
  16. oracle进行列合并,oracle列合并的实现方法
  17. Python开发常见bug
  18. 社区发现不得不了解的库,包含Louvain 算法、Girvan-Newman 算法等多种社区发现算法,还具有可视化功能
  19. kingcms php 漏洞,kingcms5.0/5.1漏洞
  20. 笔记本电脑dns服务器没有响应怎么办,华硕笔记本重装系统后dns服务器未响应怎么办?...

热门文章

  1. 基于centos7 搭建storm1.2.3集群过程
  2. CSS的解决IE5/IE5.5/IE6/FF/IE7的兼容性问题(css hack)
  3. 陈天奇:首个机器学习编译课程!
  4. 前身北京计算机学院,这5所“低调到隐形”的211大学,不仅分数低,还好考
  5. 在kernel中如何sleep
  6. 下载wallheaven壁纸(加入搜索功能)
  7. HEVC中skip模式和merge模式的区别
  8. scala aggregate
  9. NMS (non-maximum suppression)非极大值抑制
  10. [书籍推荐] 编程入门学习必备