最近打算学习 tornado 的源码,所以就建立一个系列主题 “深入理解 tornado”。 在此记录学习经历及个人见解与大家分享。文中一定会出现理解不到位或理解错误的地方,还请大家多多指教

进入正题:

tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构(其他 python web 框架的自带 server 基本是基于 wsgi 写的简单服务器,并没有自己实现底层结构。 关于 wsgi 详见之前的文章: 自己写一个 wsgi 服务器运行 Django 、Tornado 应用)。 那么 tornado.ioloop 就是 tornado web server 最底层的实现。

看 ioloop 之前,我们需要了解一些预备知识,有助于我们理解 ioloop。

epoll

ioloop 的实现基于 epoll ,那么什么是 epoll? epoll 是Linux内核为处理大批量文件描述符而作了改进的 poll 。
那么什么又是 poll ? 首先,我们回顾一下, socket 通信时的服务端,当它接受( accept )一个连接并建立通信后( connection )就进行通信,而此时我们并不知道连接的客户端有没有信息发完。 这时候我们有两种选择:

  1. 一直在这里等着直到收发数据结束;
  2. 每隔一定时间来看看这里有没有数据;

第二种办法要比第一种好一些,多个连接可以统一在一定时间内轮流看一遍里面有没有数据要读写,看上去我们可以处理多个连接了,这个方式就是 poll / select 的解决方案。 看起来似乎解决了问题,但实际上,随着连接越来越多,轮询所花费的时间将越来越长,而服务器连接的 socket 大多不是活跃的,所以轮询所花费的大部分时间将是无用的。为了解决这个问题, epoll 被创造出来,它的概念和 poll 类似,不过每次轮询时,他只会把有数据活跃的 socket 挑出来轮询,这样在有大量连接时轮询就节省了大量时间。

对于 epoll 的操作,其实也很简单,只要 4 个 API 就可以完全操作它。

epoll_create

用来创建一个 epoll 描述符( 就是创建了一个 epoll )

epoll_ctl

操作 epoll 中的 event;可用参数有:

参数 含义
EPOLL_CTL_ADD 添加一个新的epoll事件
EPOLL_CTL_DEL 删除一个epoll事件
EPOLL_CTL_MOD 改变一个事件的监听方式

而事件的监听方式有七种,而我们只需要关心其中的三种:

宏定义 含义
EPOLLIN 缓冲区满,有数据可读
EPOLLOUT 缓冲区空,可写数据
EPOLLERR 发生错误

epoll_wait

就是让 epoll 开始工作,里面有个参数 timeout,当设置为非 0 正整数时,会监听(阻塞) timeout 秒;设置为 0 时立即返回,设置为 -1 时一直监听。

在监听时有数据活跃的连接时其返回活跃的文件句柄列表(此处为 socket 文件句柄)。

close

关闭 epoll

现在了解了 epoll 后,我们就可以来看 ioloop 了 (如果对 epoll 还有疑问可以看这两篇资料: epoll 的原理是什么、百度百科:epoll)

tornado.ioloop

很多初学者一定好奇 tornado 运行服务器最后那一句 tornado.ioloop.IOLoop.current().start() 到底是干什么的。 我们先不解释作用,来看看这一句代码背后到底都在干什么。

先贴 ioloop 代码:

Python

from __future__ import absolute_import, division, print_function, with_statement import datetime import errno import functools import heapq # 最小堆 import itertools import logging import numbers import os import select import sys import threading import time import traceback import math from tornado.concurrent import TracebackFuture, is_future from tornado.log import app_log, gen_log from tornado.platform.auto import set_close_exec, Waker from tornado import stack_context from tornado.util import PY3, Configurable, errno_from_exception, timedelta_to_seconds try: import signal except ImportError: signal = None if PY3: import _thread as thread else: import thread _POLL_TIMEOUT = 3600.0 class TimeoutError(Exception): pass class IOLoop(Configurable): _EPOLLIN = 0x001 _EPOLLPRI = 0x002 _EPOLLOUT = 0x004 _EPOLLERR = 0x008 _EPOLLHUP = 0x010 _EPOLLRDHUP = 0x2000 _EPOLLONESHOT = (1 << 30) _EPOLLET = (1 << 31) # Our events map exactly to the epoll events NONE = 0 READ = _EPOLLIN WRITE = _EPOLLOUT ERROR = _EPOLLERR | _EPOLLHUP # Global lock for creating global IOLoop instance _instance_lock = threading.Lock() _current = threading.local() @staticmethod def instance(): if not hasattr(IOLoop, "_instance"): with IOLoop._instance_lock: if not hasattr(IOLoop, "_instance"): # New instance after double check IOLoop._instance = IOLoop() return IOLoop._instance @staticmethod def initialized(): """Returns true if the singleton instance has been created.""" return hasattr(IOLoop, "_instance") def install(self): assert not IOLoop.initialized() IOLoop._instance = self @staticmethod def clear_instance(): """Clear the global `IOLoop` instance. .. versionadded:: 4.0 """ if hasattr(IOLoop, "_instance"): del IOLoop._instance @staticmethod def current(instance=True): current = getattr(IOLoop._current, "instance", None) if current is None and instance: return IOLoop.instance() return current def make_current(self): IOLoop._current.instance = self @staticmethod def clear_current(): IOLoop._current.instance = None @classmethod def configurable_base(cls): return IOLoop @classmethod def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is not None: raise RuntimeError("current IOLoop already exists") self.make_current() def close(self, all_fds=False): raise NotImplementedError() def add_handler(self, fd, handler, events): raise NotImplementedError() def update_handler(self, fd, events): raise NotImplementedError() def remove_handler(self, fd): raise NotImplementedError() def set_blocking_signal_threshold(self, seconds, action): raise NotImplementedError() def set_blocking_log_threshold(self, seconds): self.set_blocking_signal_threshold(seconds, self.log_stack) def log_stack(self, signal, frame): gen_log.warning('IOLoop blocked for %f seconds in\n%s', self._blocking_signal_threshold, ''.join(traceback.format_stack(frame))) def start(self): raise NotImplementedError() def _setup_logging(self): if not any([logging.getLogger().handlers, logging.getLogger('tornado').handlers, logging.getLogger('tornado.application').handlers]): logging.basicConfig() def stop(self): raise NotImplementedError() def run_sync(self, func, timeout=None): future_cell = [None] def run(): try: result = func() if result is not None: from tornado.gen import convert_yielded result = convert_yielded(result) except Exception: future_cell[0] = TracebackFuture() future_cell[0].set_exc_info(sys.exc_info()) else: if is_future(result): future_cell[0] = result else: future_cell[0] = TracebackFuture() future_cell[0].set_result(result) self.add_future(future_cell[0], lambda future: self.stop()) self.add_callback(run) if timeout is not None: timeout_handle = self.add_timeout(self.time() + timeout, self.stop) self.start() if timeout is not None: self.remove_timeout(timeout_handle) if not future_cell[0].done(): raise TimeoutError('Operation timed out after %s seconds' % timeout) return future_cell[0].result() def time(self): return time.time() ...

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

from __future__ import absolute_import, division, print_function, with_statement

import datetime

import errno

import functools

import heapq       # 最小堆

import itertools

import logging

import numbers

import os

import select

import sys

import threading

import time

import traceback

import math

from tornado.concurrent import TracebackFuture, is_future

from tornado.log import app_log, gen_log

from tornado.platform.auto import set_close_exec, Waker

from tornado import stack_context

from tornado.util import PY3, Configurable, errno_from_exception, timedelta_to_seconds

try:

import signal

except ImportError:

signal = None

if PY3:

import _thread as thread

else:

import thread

_POLL_TIMEOUT = 3600.0

class TimeoutError(Exception):

pass

class IOLoop(Configurable):

_EPOLLIN = 0x001

_EPOLLPRI = 0x002

_EPOLLOUT = 0x004

_EPOLLERR = 0x008

_EPOLLHUP = 0x010

_EPOLLRDHUP = 0x2000

_EPOLLONESHOT = (1 << 30)

_EPOLLET = (1 << 31)

# Our events map exactly to the epoll events

NONE = 0

READ = _EPOLLIN

WRITE = _EPOLLOUT

ERROR = _EPOLLERR | _EPOLLHUP

# Global lock for creating global IOLoop instance

_instance_lock = threading.Lock()

_current = threading.local()

@staticmethod

def instance():

if not hasattr(IOLoop, "_instance"):

with IOLoop._instance_lock:

if not hasattr(IOLoop, "_instance"):

# New instance after double check

IOLoop._instance = IOLoop()

return IOLoop._instance

@staticmethod

def initialized():

"""Returns true if the singleton instance has been created."""

return hasattr(IOLoop, "_instance")

def install(self):

assert not IOLoop.initialized()

IOLoop._instance = self

@staticmethod

def clear_instance():

"""Clear the global `IOLoop` instance.

.. versionadded:: 4.0

"""

if hasattr(IOLoop, "_instance"):

del IOLoop._instance

@staticmethod

def current(instance=True):

current = getattr(IOLoop._current, "instance", None)

if current is None and instance:

return IOLoop.instance()

return current

def make_current(self):

IOLoop._current.instance = self

@staticmethod

def clear_current():

IOLoop._current.instance = None

@classmethod

def configurable_base(cls):

return IOLoop

@classmethod

def configurable_default(cls):

if hasattr(select, "epoll"):

from tornado.platform.epoll import EPollIOLoop

return EPollIOLoop

if hasattr(select, "kqueue"):

# Python 2.6+ on BSD or Mac

from tornado.platform.kqueue import KQueueIOLoop

return KQueueIOLoop

from tornado.platform.select import SelectIOLoop

return SelectIOLoop

def initialize(self, make_current=None):

if make_current is None:

if IOLoop.current(instance=False) is None:

self.make_current()

elif make_current:

if IOLoop.current(instance=False) is not None:

raise RuntimeError("current IOLoop already exists")

self.make_current()

def close(self, all_fds=False):

raise NotImplementedError()

def add_handler(self, fd, handler, events):

raise NotImplementedError()

def update_handler(self, fd, events):

raise NotImplementedError()

def remove_handler(self, fd):

raise NotImplementedError()

def set_blocking_signal_threshold(self, seconds, action):

raise NotImplementedError()

def set_blocking_log_threshold(self, seconds):

self.set_blocking_signal_threshold(seconds, self.log_stack)

def log_stack(self, signal, frame):

gen_log.warning('IOLoop blocked for %f seconds in\n%s',

self._blocking_signal_threshold,

''.join(traceback.format_stack(frame)))

def start(self):

raise NotImplementedError()

def _setup_logging(self):

if not any([logging.getLogger().handlers,

logging.getLogger('tornado').handlers,

logging.getLogger('tornado.application').handlers]):

logging.basicConfig()

def stop(self):

raise NotImplementedError()

def run_sync(self, func, timeout=None):

future_cell = [None]

def run():

try:

result = func()

if result is not None:

from tornado.gen import convert_yielded

result = convert_yielded(result)

except Exception:

future_cell[0] = TracebackFuture()

future_cell[0].set_exc_info(sys.exc_info())

else:

if is_future(result):

future_cell[0] = result

else:

future_cell[0] = TracebackFuture()

future_cell[0].set_result(result)

self.add_future(future_cell[0], lambda future: self.stop())

self.add_callback(run)

if timeout is not None:

timeout_handle = self.add_timeout(self.time() + timeout, self.stop)

self.start()

if timeout is not None:

self.remove_timeout(timeout_handle)

if not future_cell[0].done():

raise TimeoutError('Operation timed out after %s seconds' % timeout)

return future_cell[0].result()

def time(self):

return time.time()

...

IOLoop 类首先声明了 epoll 监听事件的宏定义,当然,如前文所说,我们只要关心其中的 EPOLLIN 、 EPOLLOUT 、 EPOLLERR 就行。

类中的方法有很多,看起来有点晕,但其实我们只要关心 IOLoop 核心功能的方法即可,其他的方法在明白核心功能后也就不难理解了。所以接下来我们着重分析核心代码。

instanceinitializedinstallclear_instancecurrentmake_currentclear_current 这些方法不用在意细节,总之现在记住它们都是为了让 IOLoop 类变成一个单例,保证从全局上调用的都是同一个 IOLoop 就好。

你一定疑惑 IOLoop 为何没有 __init__, 其实是因为要初始化成为单例,IOLoop 的 new 函数已经被改写了,同时指定了 initialize 做为它的初始化方法,所以此处没有 __init__ 。 说到这,ioloop 的代码里好像没有看到 new 方法,这又是什么情况? 我们先暂时记住这里。

接着我们来看这个初始化方法:

def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is None: raise RuntimeError("current IOLoop already exists") self.make_current() def make_current(self): IOLoop._current.instance = self

1

2

3

4

5

6

7

8

9

10

11

def initialize(self, make_current=None):

if make_current is None:

if IOLoop.current(instance=False) is None:

self.make_current()

elif make_current:

if IOLoop.current(instance=False) is None:

raise RuntimeError("current IOLoop already exists")

self.make_current()

def make_current(self):

IOLoop._current.instance = self

what? 里面只是判断了是否第一次初始化或者调用 self.make_current() 初始化,而 make_current() 里也仅仅是把实例指定为自己,那么初始化到底去哪了?

然后再看看 start()run()close() 这些关键的方法都成了返回 NotImplementedError 错误,全部未定义?!跟网上搜到的源码分析完全不一样啊。 这时候看下 IOLoop 的继承关系,原来问题出在这里,之前的 tornado.ioloop 继承自 object 所以所有的一切都自己实现,而现在版本的 tornado.ioloop 则继承自 Configurable 看起来现在的 IOLoop 已经成为了一个基类,只定义了接口。 所以接着看 Configurable 代码:

tornado.util.Configurable

class Configurable(object): __impl_class = None __impl_kwargs = None def __new__(cls, *args, **kwargs): base = cls.configurable_base() init_kwargs = {} if cls is base: impl = cls.configured_class() if base.__impl_kwargs: init_kwargs.update(base.__impl_kwargs) else: impl = cls init_kwargs.update(kwargs) instance = super(Configurable, cls).__new__(impl) # initialize vs __init__ chosen for compatibility with AsyncHTTPClient # singleton magic. If we get rid of that we can switch to __init__ # here too. instance.initialize(*args, **init_kwargs) return instance @classmethod def configurable_base(cls): """Returns the base class of a configurable hierarchy. This will normally return the class in which it is defined. (which is *not* necessarily the same as the cls classmethod parameter). """ raise NotImplementedError() @classmethod def configurable_default(cls): """Returns the implementation class to be used if none is configured.""" raise NotImplementedError() def initialize(self): """Initialize a `Configurable` subclass instance. Configurable classes should use `initialize` instead of ``__init__``. .. versionchanged:: 4.2 Now accepts positional arguments in addition to keyword arguments. """ @classmethod def configure(cls, impl, **kwargs): """Sets the class to use when the base class is instantiated. Keyword arguments will be saved and added to the arguments passed to the constructor. This can be used to set global defaults for some parameters. """ base = cls.configurable_base() if isinstance(impl, (unicode_type, bytes)): impl = import_object(impl) if impl is not None and not issubclass(impl, cls): raise ValueError("Invalid subclass of %s" % cls) base.__impl_class = impl base.__impl_kwargs = kwargs @classmethod def configured_class(cls): """Returns the currently configured class.""" base = cls.configurable_base() if cls.__impl_class is None: base.__impl_class = cls.configurable_default() return base.__impl_class @classmethod def _save_configuration(cls): base = cls.configurable_base() return (base.__impl_class, base.__impl_kwargs) @classmethod def _restore_configuration(cls, saved): base = cls.configurable_base() base.__impl_class = saved[0] base.__impl_kwargs = saved[1]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

class Configurable(object):

__impl_class = None

__impl_kwargs = None

def __new__(cls, *args, **kwargs):

base = cls.configurable_base()

init_kwargs = {}

if cls is base:

impl = cls.configured_class()

if base.__impl_kwargs:

init_kwargs.update(base.__impl_kwargs)

else:

impl = cls

init_kwargs.update(kwargs)

instance = super(Configurable, cls).__new__(impl)

# initialize vs __init__ chosen for compatibility with AsyncHTTPClient

# singleton magic.  If we get rid of that we can switch to __init__

# here too.

instance.initialize(*args, **init_kwargs)

return instance

@classmethod

def configurable_base(cls):

"""Returns the base class of a configurable hierarchy.

This will normally return the class in which it is defined.

(which is *not* necessarily the same as the cls classmethod parameter).

"""

raise NotImplementedError()

@classmethod

def configurable_default(cls):

"""Returns the implementation class to be used if none is configured."""

raise NotImplementedError()

def initialize(self):

"""Initialize a `Configurable` subclass instance.

Configurable classes should use `initialize` instead of ``__init__``.

.. versionchanged:: 4.2

Now accepts positional arguments in addition to keyword arguments.

"""

@classmethod

def configure(cls, impl, **kwargs):

"""Sets the class to use when the base class is instantiated.

Keyword arguments will be saved and added to the arguments passed

to the constructor.  This can be used to set global defaults for

some parameters.

"""

base = cls.configurable_base()

if isinstance(impl, (unicode_type, bytes)):

impl = import_object(impl)

if impl is not None and not issubclass(impl, cls):

raise ValueError("Invalid subclass of %s" % cls)

base.__impl_class = impl

base.__impl_kwargs = kwargs

@classmethod

def configured_class(cls):

"""Returns the currently configured class."""

base = cls.configurable_base()

if cls.__impl_class is None:

base.__impl_class = cls.configurable_default()

return base.__impl_class

@classmethod

def _save_configuration(cls):

base = cls.configurable_base()

return (base.__impl_class, base.__impl_kwargs)

@classmethod

def _restore_configuration(cls, saved):

base = cls.configurable_base()

base.__impl_class = saved[0]

base.__impl_kwargs = saved[1]

之前我们寻找的 __new__ 出现了! 注意其中这句: impl = cls.configured_class() impl 在这里就是 epoll ,它的生成函数是 configured_class(), 而其方法里又有 base.__impl_class = cls.configurable_default() ,调用了 configurable_default() 。而 Configurableconfigurable_default():

def configurable_default(cls): """Returns the implementation class to be used if none is configured.""" raise NotImplementedError()

1

2

3

def configurable_default(cls):

"""Returns the implementation class to be used if none is configured."""

raise NotImplementedError()

显然也是个接口,那么我们再回头看 ioloop 的 configurable_default():

def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop

1

2

3

4

5

6

7

8

9

10

def configurable_default(cls):

if hasattr(select, "epoll"):

from tornado.platform.epoll import EPollIOLoop

return EPollIOLoop

if hasattr(select, "kqueue"):

# Python 2.6+ on BSD or Mac

from tornado.platform.kqueue import KQueueIOLoop

return KQueueIOLoop

from tornado.platform.select import SelectIOLoop

return SelectIOLoop

原来这是个工厂函数,根据不同的操作系统返回不同的事件池(linux 就是 epoll, mac 返回 kqueue,其他就返回普通的 select。 kqueue 基本等同于 epoll, 只是不同系统对其的不同实现)

现在线索转移到了 tornado.platform.epoll.EPollIOLoop 上,我们再来看看 EPollIOLoop:

tornado.platform.epoll.EPollIOLoop

import select from tornado.ioloop import PollIOLoop class EPollIOLoop(PollIOLoop): def initialize(self, **kwargs): super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)

1

2

3

4

5

6

7

8

import select

from tornado.ioloop import PollIOLoop

class EPollIOLoop(PollIOLoop):

def initialize(self, **kwargs):

super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)

EPollIOLoop 完全继承自 PollIOLoop注意这里是 PollIOLoop 不是 IOLoop)并只是在初始化时指定了 impl 是 epoll,所以看起来我们用 IOLoop 初始化最后初始化的其实就是这个 PollIOLoop,所以接下来,我们真正需要理解和阅读的内容应该都在这里:

tornado.ioloop.PollIOLoop

class PollIOLoop(IOLoop): """Base class for IOLoops built around a select-like function. For concrete implementations, see `tornado.platform.epoll.EPollIOLoop` (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or `tornado.platform.select.SelectIOLoop` (all platforms). """ def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) self._impl = impl if hasattr(self._impl, 'fileno'): set_close_exec(self._impl.fileno()) self.time_func = time_func or time.time self._handlers = {} self._events = {} self._callbacks = [] self._callback_lock = threading.Lock() self._timeouts = [] self._cancellations = 0 self._running = False self._stopped = False self._closing = False self._thread_ident = None self._blocking_signal_threshold = None self._timeout_counter = itertools.count() # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle self._waker = Waker() self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) def close(self, all_fds=False): with self._callback_lock: self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: for fd, handler in self._handlers.values(): self.close_fd(fd) self._waker.close() self._impl.close() self._callbacks = None self._timeouts = None def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True) def set_blocking_signal_threshold(self, seconds, action): if not hasattr(signal, "setitimer"): gen_log.error("set_blocking_signal_threshold requires a signal module " "with the setitimer method") return self._blocking_signal_threshold = seconds if seconds is not None: signal.signal(signal.SIGALRM, action if action is not None else signal.SIG_DFL) def start(self): ... try: while True: # Prevent IO event starvation by delaying new callbacks # to the next iteration of the event loop. with self._callback_lock: callbacks = self._callbacks self._callbacks = [] # Add any timeouts that have come due to the callback list. # Do not run anything until we have determined which ones # are ready, so timeouts that call add_timeout cannot # schedule anything in this iteration. due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: # The timeout was cancelled. Note that the # cancellation check is repeated below for timeouts # that are cancelled by another timeout or callback. heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline 512 and self._cancellations > (len(self._timeouts) >> 1)): # Clean up the timeout queue when it gets large and it's # more than half cancellations. self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) for callback in callbacks: self._run_callback(callback) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # Closures may be holding on to a lot of memory, so allow # them to be freed before we go into our poll wait. callbacks = callback = due_timeouts = timeout = None if self._callbacks: # If any callbacks or timeouts called add_callback, # we don't want to wait in poll() before we run them. poll_timeout = 0.0 elif self._timeouts: # If there are any timeouts, schedule the first one. # Use self.time() instead of 'now' to account for time # spent running callbacks. poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: # No timeouts and no callbacks, so use the default. poll_timeout = _POLL_TIMEOUT if not self._running: break if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: # Depending on python version and IOLoop implementation, # different exception types may be thrown and there are # two ways EINTR might be signaled: # * e.errno == errno.EINTR # * e.args is like (errno.EINTR, 'Interrupted system call') if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # Pop one fd at a time from the set of pending fds and run # its handler. Since that handler may perform actions on # other file descriptors, there may be reentrant calls to # this IOLoop that update self._events self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # Happens when the client closes the connection pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: # reset the stopped flag so another start/stop pair can be issued self._stopped = False if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) def stop(self): self._running = False self._stopped = True self._waker.wake() def time(self): return self.time_func() def call_at(self, deadline, callback, *args, **kwargs): timeout = _Timeout( deadline, functools.partial(stack_context.wrap(callback), *args, **kwargs), self) heapq.heappush(self._timeouts, timeout) return timeout def remove_timeout(self, timeout): # Removing from a heap is complicated, so just leave the defunct # timeout object in the queue (see discussion in # http://docs.python.org/library/heapq.html). # If this turns out to be a problem, we could add a garbage # collection pass whenever there are too many dead timeouts. timeout.callback = None self._cancellations += 1 def add_callback(self, callback, *args, **kwargs): with self._callback_lock: if self._closing: raise RuntimeError("IOLoop is closing") list_empty = not self._callbacks self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) if list_empty and thread.get_ident() != self._thread_ident: # If we're in the IOLoop's thread, we know it's not currently # polling. If we're not, and we added the first callback to an # empty list, we may need to wake it up (it may wake up on its # own, but an occasional extra wake is harmless). Waking # up a polling IOLoop is relatively expensive, so we try to # avoid it when we can. self._waker.wake() def add_callback_from_signal(self, callback, *args, **kwargs): with stack_context.NullContext(): if thread.get_ident() != self._thread_ident: # if the signal is handled on another thread, we can add # it normally (modulo the NullContext) self.add_callback(callback, *args, **kwargs) else: # If we're on the IOLoop's thread, we cannot use # the regular add_callback because it may deadlock on # _callback_lock. Blindly insert into self._callbacks. # This is safe because the GIL makes list.append atomic. # One subtlety is that if the signal interrupted the # _callback_lock block in IOLoop.start, we may modify # either the old or new version of self._callbacks, # but either way will work. self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs))

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

class PollIOLoop(IOLoop):

"""Base class for IOLoops built around a select-like function.

For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`

(Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or

`tornado.platform.select.SelectIOLoop` (all platforms).

"""

def initialize(self, impl, time_func=None, **kwargs):

super(PollIOLoop, self).initialize(**kwargs)

self._impl = impl

if hasattr(self._impl, 'fileno'):

set_close_exec(self._impl.fileno())

self.time_func = time_func or time.time

self._handlers = {}

self._events = {}

self._callbacks = []

self._callback_lock = threading.Lock()

self._timeouts = []

self._cancellations = 0

self._running = False

self._stopped = False

self._closing = False

self._thread_ident = None

self._blocking_signal_threshold = None

self._timeout_counter = itertools.count()

# Create a pipe that we send bogus data to when we want to wake

# the I/O loop when it is idle

self._waker = Waker()

self.add_handler(self._waker.fileno(),

lambda fd, events: self._waker.consume(),

self.READ)

def close(self, all_fds=False):

with self._callback_lock:

self._closing = True

self.remove_handler(self._waker.fileno())

if all_fds:

for fd, handler in self._handlers.values():

self.close_fd(fd)

self._waker.close()

self._impl.close()

self._callbacks = None

self._timeouts = None

def add_handler(self, fd, handler, events):

fd, obj = self.split_fd(fd)

self._handlers[fd] = (obj, stack_context.wrap(handler))

self._impl.register(fd, events | self.ERROR)

def update_handler(self, fd, events):

fd, obj = self.split_fd(fd)

self._impl.modify(fd, events | self.ERROR)

def remove_handler(self, fd):

fd, obj = self.split_fd(fd)

self._handlers.pop(fd, None)

self._events.pop(fd, None)

try:

self._impl.unregister(fd)

except Exception:

gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

def set_blocking_signal_threshold(self, seconds, action):

if not hasattr(signal, "setitimer"):

gen_log.error("set_blocking_signal_threshold requires a signal module "

"with the setitimer method")

return

self._blocking_signal_threshold = seconds

if seconds is not None:

signal.signal(signal.SIGALRM,

action if action is not None else signal.SIG_DFL)

def start(self):

...

try:

while True:

# Prevent IO event starvation by delaying new callbacks

# to the next iteration of the event loop.

with self._callback_lock:

callbacks = self._callbacks

self._callbacks = []

# Add any timeouts that have come due to the callback list.

# Do not run anything until we have determined which ones

# are ready, so timeouts that call add_timeout cannot

# schedule anything in this iteration.

due_timeouts = []

if self._timeouts:

now = self.time()

while self._timeouts:

if self._timeouts[0].callback is None:

# The timeout was cancelled.  Note that the

# cancellation check is repeated below for timeouts

# that are cancelled by another timeout or callback.

heapq.heappop(self._timeouts)

self._cancellations -= 1

elif self._timeouts[0].deadline  512

and self._cancellations > (len(self._timeouts) >> 1)):

# Clean up the timeout queue when it gets large and it's

# more than half cancellations.

self._cancellations = 0

self._timeouts = [x for x in self._timeouts

if x.callback is not None]

heapq.heapify(self._timeouts)

for callback in callbacks:

self._run_callback(callback)

for timeout in due_timeouts:

if timeout.callback is not None:

self._run_callback(timeout.callback)

# Closures may be holding on to a lot of memory, so allow

# them to be freed before we go into our poll wait.

callbacks = callback = due_timeouts = timeout = None

if self._callbacks:

# If any callbacks or timeouts called add_callback,

# we don't want to wait in poll() before we run them.

poll_timeout = 0.0

elif self._timeouts:

# If there are any timeouts, schedule the first one.

# Use self.time() instead of 'now' to account for time

# spent running callbacks.

poll_timeout = self._timeouts[0].deadline - self.time()

poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))

else:

# No timeouts and no callbacks, so use the default.

poll_timeout = _POLL_TIMEOUT

if not self._running:

break

if self._blocking_signal_threshold is not None:

# clear alarm so it doesn't fire while poll is waiting for

# events.

signal.setitimer(signal.ITIMER_REAL, 0, 0)

try:

event_pairs = self._impl.poll(poll_timeout)

except Exception as e:

# Depending on python version and IOLoop implementation,

# different exception types may be thrown and there are

# two ways EINTR might be signaled:

# * e.errno == errno.EINTR

# * e.args is like (errno.EINTR, 'Interrupted system call')

if errno_from_exception(e) == errno.EINTR:

continue

else:

raise

if self._blocking_signal_threshold is not None:

signal.setitimer(signal.ITIMER_REAL,

self._blocking_signal_threshold, 0)

# Pop one fd at a time from the set of pending fds and run

# its handler. Since that handler may perform actions on

# other file descriptors, there may be reentrant calls to

# this IOLoop that update self._events

self._events.update(event_pairs)

while self._events:

fd, events = self._events.popitem()

try:

fd_obj, handler_func = self._handlers[fd]

handler_func(fd_obj, events)

except (OSError, IOError) as e:

if errno_from_exception(e) == errno.EPIPE:

# Happens when the client closes the connection

pass

else:

self.handle_callback_exception(self._handlers.get(fd))

except Exception:

self.handle_callback_exception(self._handlers.get(fd))

fd_obj = handler_func = None

finally:

# reset the stopped flag so another start/stop pair can be issued

self._stopped = False

if self._blocking_signal_threshold is not None:

signal.setitimer(signal.ITIMER_REAL, 0, 0)

IOLoop._current.instance = old_current

if old_wakeup_fd is not None:

signal.set_wakeup_fd(old_wakeup_fd)

def stop(self):

self._running = False

self._stopped = True

self._waker.wake()

def time(self):

return self.time_func()

def call_at(self, deadline, callback, *args, **kwargs):

timeout = _Timeout(

deadline,

functools.partial(stack_context.wrap(callback), *args, **kwargs),

self)

heapq.heappush(self._timeouts, timeout)

return timeout

def remove_timeout(self, timeout):

# Removing from a heap is complicated, so just leave the defunct

# timeout object in the queue (see discussion in

# http://docs.python.org/library/heapq.html).

# If this turns out to be a problem, we could add a garbage

# collection pass whenever there are too many dead timeouts.

timeout.callback = None

self._cancellations += 1

def add_callback(self, callback, *args, **kwargs):

with self._callback_lock:

if self._closing:

raise RuntimeError("IOLoop is closing")

list_empty = not self._callbacks

self._callbacks.append(functools.partial(

stack_context.wrap(callback), *args, **kwargs))

if list_empty and thread.get_ident() != self._thread_ident:

# If we're in the IOLoop's thread, we know it's not currently

# polling.  If we're not, and we added the first callback to an

# empty list, we may need to wake it up (it may wake up on its

# own, but an occasional extra wake is harmless).  Waking

# up a polling IOLoop is relatively expensive, so we try to

# avoid it when we can.

self._waker.wake()

def add_callback_from_signal(self, callback, *args, **kwargs):

with stack_context.NullContext():

if thread.get_ident() != self._thread_ident:

# if the signal is handled on another thread, we can add

# it normally (modulo the NullContext)

self.add_callback(callback, *args, **kwargs)

else:

# If we're on the IOLoop's thread, we cannot use

# the regular add_callback because it may deadlock on

# _callback_lock.  Blindly insert into self._callbacks.

# This is safe because the GIL makes list.append atomic.

# One subtlety is that if the signal interrupted the

# _callback_lock block in IOLoop.start, we may modify

# either the old or new version of self._callbacks,

# but either way will work.

self._callbacks.append(functools.partial(

stack_context.wrap(callback), *args, **kwargs))

果然, PollIOLoop 继承自 IOLoop 并实现了它的所有接口,现在我们终于可以进入真正的正题了

ioloop 分析

首先要看的是关于 epoll 操作的方法,还记得前文说过的 epoll 只需要四个 api 就能完全操作嘛? 我们来看 PollIOLoop 的实现:

epoll 操作

def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

def add_handler(self, fd, handler, events):

fd, obj = self.split_fd(fd)

self._handlers[fd] = (obj, stack_context.wrap(handler))

self._impl.register(fd, events | self.ERROR)

def update_handler(self, fd, events):

fd, obj = self.split_fd(fd)

self._impl.modify(fd, events | self.ERROR)

def remove_handler(self, fd):

fd, obj = self.split_fd(fd)

self._handlers.pop(fd, None)

self._events.pop(fd, None)

try:

self._impl.unregister(fd)

except Exception:

gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

epoll_ctl:这个三个方法分别对应 epoll_ctl 中的 add 、 modify 、 del 参数。 所以这三个方法实现了 epoll 的 epoll_ctl 。

epoll_create:然后 epoll 的生成在前文 EPollIOLoop 的初始化中就已经完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)。 这个相当于 epoll_create 。

epoll_wait:epoll_wait 操作则在 start() 中:event_pairs = self._impl.poll(poll_timeout)

epoll_close:而 epoll 的 close 则在 PollIOLoop 中的 close 方法内调用: self._impl.close() 完成。

initialize

接下来看 PollIOLoop 的初始化方法中作了什么:

def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) self._impl = impl # 指定 epoll if hasattr(self._impl, 'fileno'): set_close_exec(self._impl.fileno()) # fork 后关闭无用文件描述符 self.time_func = time_func or time.time # 指定获取当前时间的函数 self._handlers = {} # handler 的字典,储存被 epoll 监听的 handler,与打开它的文件描述符 ( file descriptor 简称 fd ) 一一对应 self._events = {} # event 的字典,储存 epoll 返回的活跃的 fd event pairs self._callbacks = [] # 储存各个 fd 回调函数的列表 self._callback_lock = threading.Lock() # 指定进程锁 self._timeouts = [] # 将是一个最小堆结构,按照超时时间从小到大排列的 fd 的任务堆( 通常这个任务都会包含一个 callback ) self._cancellations = 0 # 关于 timeout 的计数器 self._running = False # ioloop 是否在运行 self._stopped = False # ioloop 是否停止 self._closing = False # ioloop 是否关闭 self._thread_ident = None # 当前线程堆标识符 ( thread identify ) self._blocking_signal_threshold = None # 系统信号, 主要用来在 epoll_wait 时判断是否会有 signal alarm 打断 epoll self._timeout_counter = itertools.count() # 超时计数器 ( 暂时不是很明白具体作用,好像和前面的 _cancellations 有关系? 请大神讲讲) self._waker = Waker() # 一个 waker 类,主要是对于管道 pipe 的操作,因为 ioloop 属于底层的数据操作,这里 epoll 监听的是 pipe self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) # 将管道加入 epoll 监听,对于 web server 初始化时只需要关心 READ 事件

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

def initialize(self, impl, time_func=None, **kwargs):

super(PollIOLoop, self).initialize(**kwargs)

self._impl = impl                         # 指定 epoll

if hasattr(self._impl, 'fileno'):

set_close_exec(self._impl.fileno())   # fork 后关闭无用文件描述符

self.time_func = time_func or time.time   # 指定获取当前时间的函数

self._handlers = {}                       # handler 的字典,储存被 epoll 监听的 handler,与打开它的文件描述符 ( file descriptor 简称 fd ) 一一对应

self._events = {}                         # event 的字典,储存 epoll 返回的活跃的 fd event pairs

self._callbacks = []                      # 储存各个 fd 回调函数的列表

self._callback_lock = threading.Lock()    # 指定进程锁

self._timeouts = []                       # 将是一个最小堆结构,按照超时时间从小到大排列的 fd 的任务堆( 通常这个任务都会包含一个 callback )

self._cancellations = 0                   # 关于 timeout 的计数器

self._running = False                     # ioloop 是否在运行

self._stopped = False                     # ioloop 是否停止

self._closing = False                     # ioloop 是否关闭

self._thread_ident = None                 #  当前线程堆标识符 ( thread identify )

self._blocking_signal_threshold = None    # 系统信号, 主要用来在 epoll_wait 时判断是否会有 signal alarm 打断 epoll

self._timeout_counter = itertools.count() # 超时计数器 ( 暂时不是很明白具体作用,好像和前面的 _cancellations 有关系? 请大神讲讲)

self._waker = Waker()                     # 一个 waker 类,主要是对于管道 pipe 的操作,因为 ioloop 属于底层的数据操作,这里 epoll 监听的是 pipe

self.add_handler(self._waker.fileno(),

lambda fd, events: self._waker.consume(),

self.READ)               # 将管道加入 epoll 监听,对于 web server 初始化时只需要关心 READ 事件

除了注释中的解释,还有几点补充:

  1. close_exec 的作用: 子进程在fork出来的时候,使用了写时复制(COW,Copy-On-Write)方式获得父进程的数据空间、 堆和栈副本,这其中也包括文件描述符。刚刚fork成功时,父子进程中相同的文件描述符指向系统文件表中的同一项,接着,一般我们会调用exec执行另一个程序,此时会用全新的程序替换子进程的正文,数据,堆和栈等。此时保存文件描述符的变量当然也不存在了,我们就无法关闭无用的文件描述符了。所以通常我们会fork子进程后在子进程中直接执行close关掉无用的文件描述符,然后再执行exec。 所以 close_exec 执行的其实就是 关闭 + 执行的作用。 详情可以查看: 关于linux进程间的close-on-exec机制
  2. Waker(): Waker 封装了对于管道 pipe 的操作:

    def set_close_exec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) def _set_nonblocking(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) class Waker(interface.Waker): def __init__(self): r, w = os.pipe() _set_nonblocking(r) _set_nonblocking(w) set_close_exec(r) set_close_exec(w) self.reader = os.fdopen(r, "rb", 0) self.writer = os.fdopen(w, "wb", 0) def fileno(self): return self.reader.fileno() def write_fileno(self): return self.writer.fileno() def wake(self): try: self.writer.write(b"x") except IOError: pass def consume(self): try: while True: result = self.reader.read() if not result: break except IOError: pass def close(self): self.reader.close() self.writer.close()

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    def set_close_exec(fd):

    flags = fcntl.fcntl(fd, fcntl.F_GETFD)

    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

    def _set_nonblocking(fd):

    flags = fcntl.fcntl(fd, fcntl.F_GETFL)

    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    class Waker(interface.Waker):

    def __init__(self):

    r, w = os.pipe()

    _set_nonblocking(r)

    _set_nonblocking(w)

    set_close_exec(r)

    set_close_exec(w)

    self.reader = os.fdopen(r, "rb", 0)

    self.writer = os.fdopen(w, "wb", 0)

    def fileno(self):

    return self.reader.fileno()

    def write_fileno(self):

    return self.writer.fileno()

    def wake(self):

    try:

    self.writer.write(b"x")

    except IOError:

    pass

    def consume(self):

    try:

    while True:

    result = self.reader.read()

    if not result:

    break

    except IOError:

    pass

    def close(self):

    self.reader.close()

    self.writer.close()

    可以看到 waker 把 pipe 分为读、 写两个管道并都设置了非阻塞和 close_exec。 注意wake(self)方法中:self.writer.write(b"x") 直接向管道中写入随意字符从而释放管道。

start

ioloop 最核心的部分:

def start(self): if self._running: # 判断是否已经运行 raise RuntimeError("IOLoop is already running") self._setup_logging() if self._stopped: self._stopped = False # 设置停止为假 return old_current = getattr(IOLoop._current, "instance", None) IOLoop._current.instance = self self._thread_ident = thread.get_ident() # 获得当前线程标识符 self._running = True # 设置运行 old_wakeup_fd = None if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': try: old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) if old_wakeup_fd != -1: signal.set_wakeup_fd(old_wakeup_fd) old_wakeup_fd = None except ValueError: old_wakeup_fd = None try: while True: # 服务器进程正式开始,类似于其他服务器的 serve_forever with self._callback_lock: # 加锁,_callbacks 做为临界区不加锁进行读写会产生脏数据 callbacks = self._callbacks # 读取 _callbacks self._callbacks = []. # 清空 _callbacks due_timeouts = [] # 用于存放这个周期内已过期( 已超时 )的任务 if self._timeouts: # 判断 _timeouts 里是否有数据 now = self.time() # 获取当前时间,用来判断 _timeouts 里的任务有没有超时 while self._timeouts: # _timeouts 有数据时一直循环, _timeouts 是个最小堆,第一个数据永远是最小的, 这里第一个数据永远是最接近超时或已超时的 if self._timeouts[0].callback is None: # 超时任务无回调 heapq.heappop(self._timeouts) # 直接弹出 self._cancellations -= 1 # 超时计数器 -1 elif self._timeouts[0].deadline 512 and self._cancellations > (len(self._timeouts) >> 1)): # 当超时计数器大于 512 并且 大于 _timeouts 长度一半( >> 为右移运算, 相当于十进制数据被除 2 )时,清零计数器,并剔除 _timeouts 中无 callbacks 的任务 self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) # 进行 _timeouts 最小堆化 for callback in callbacks: self._run_callback(callback) # 运行 callbacks 里所有的 calllback for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # 运行所有已过期任务的 callback callbacks = callback = due_timeouts = timeout = None # 释放内存 if self._callbacks: # _callbacks 里有数据时 poll_timeout = 0.0 # 设置 epoll_wait 时间为0( 立即返回 ) elif self._timeouts: # _timeouts 里有数据时 poll_timeout = self._timeouts[0].deadline - self.time() # 取最小过期时间当 epoll_wait 等待时间,这样当第一个任务过期时立即返回 poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) # 如果最小过期时间大于默认等待时间 _POLL_TIMEOUT = 3600,则用 3600,如果最小过期时间小于0 就设置为0 立即返回。 else: poll_timeout = _POLL_TIMEOUT # 默认 3600 s 等待时间 if not self._running: # 检查是否有系统信号中断运行,有则中断,无则继续 break if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) # 开始 epoll_wait 之前确保 signal alarm 都被清空( 这样在 epoll_wait 过程中不会被 signal alarm 打断 ) try: event_pairs = self._impl.poll(poll_timeout) # 获取返回的活跃事件队 except Exception as e: if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # epoll_wait 结束, 再设置 signal alarm self._events.update(event_pairs) # 将活跃事件加入 _events while self._events: fd, events = self._events.popitem() # 循环弹出事件 try: fd_obj, handler_func = self._handlers[fd] # 处理事件 handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: self._stopped = False # 确保发生异常也继续运行 if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) # 清空 signal alarm IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) # 和 start 开头部分对应,但是不是很清楚作用,求老司机带带路

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

def start(self):

if self._running:       # 判断是否已经运行

raise RuntimeError("IOLoop is already running")

self._setup_logging()

if self._stopped:

self._stopped = False  # 设置停止为假

return

old_current = getattr(IOLoop._current, "instance", None)

IOLoop._current.instance = self

self._thread_ident = thread.get_ident()  # 获得当前线程标识符

self._running = True # 设置运行

old_wakeup_fd = None

if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':

try:

old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())

if old_wakeup_fd != -1:

signal.set_wakeup_fd(old_wakeup_fd)

old_wakeup_fd = None

except ValueError:

old_wakeup_fd = None

try:

while True:  # 服务器进程正式开始,类似于其他服务器的 serve_forever

with self._callback_lock: # 加锁,_callbacks 做为临界区不加锁进行读写会产生脏数据

callbacks = self._callbacks # 读取 _callbacks

self._callbacks = []. # 清空 _callbacks

due_timeouts = [] # 用于存放这个周期内已过期( 已超时 )的任务

if self._timeouts: # 判断 _timeouts 里是否有数据

now = self.time() # 获取当前时间,用来判断 _timeouts 里的任务有没有超时

while self._timeouts: # _timeouts 有数据时一直循环, _timeouts 是个最小堆,第一个数据永远是最小的, 这里第一个数据永远是最接近超时或已超时的

if self._timeouts[0].callback is None: # 超时任务无回调

heapq.heappop(self._timeouts) # 直接弹出

self._cancellations -= 1 # 超时计数器 -1

elif self._timeouts[0].deadline  512

and self._cancellations > (len(self._timeouts) >> 1)):  # 当超时计数器大于 512 并且 大于 _timeouts 长度一半( >> 为右移运算, 相当于十进制数据被除 2 )时,清零计数器,并剔除 _timeouts 中无 callbacks 的任务

self._cancellations = 0

self._timeouts = [x for x in self._timeouts

if x.callback is not None]

heapq.heapify(self._timeouts) # 进行 _timeouts 最小堆化

for callback in callbacks:

self._run_callback(callback) # 运行 callbacks 里所有的 calllback

for timeout in due_timeouts:

if timeout.callback is not None:

self._run_callback(timeout.callback) # 运行所有已过期任务的 callback

callbacks = callback = due_timeouts = timeout = None # 释放内存

if self._callbacks: # _callbacks 里有数据时

poll_timeout = 0.0 # 设置 epoll_wait 时间为0( 立即返回 )

elif self._timeouts: # _timeouts 里有数据时

poll_timeout = self._timeouts[0].deadline - self.time()

# 取最小过期时间当 epoll_wait 等待时间,这样当第一个任务过期时立即返回

poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))

# 如果最小过期时间大于默认等待时间 _POLL_TIMEOUT = 3600,则用 3600,如果最小过期时间小于0 就设置为0 立即返回。

else:

poll_timeout = _POLL_TIMEOUT # 默认 3600 s 等待时间

if not self._running: # 检查是否有系统信号中断运行,有则中断,无则继续

break

if self._blocking_signal_threshold is not None:

signal.setitimer(signal.ITIMER_REAL, 0, 0) # 开始 epoll_wait 之前确保 signal alarm 都被清空( 这样在 epoll_wait 过程中不会被 signal alarm 打断 )

try:

event_pairs = self._impl.poll(poll_timeout) # 获取返回的活跃事件队

except Exception as e:

if errno_from_exception(e) == errno.EINTR:

continue

else:

raise

if self._blocking_signal_threshold is not None:

signal.setitimer(signal.ITIMER_REAL,

self._blocking_signal_threshold, 0) #  epoll_wait 结束, 再设置 signal alarm

self._events.update(event_pairs) # 将活跃事件加入 _events

while self._events:

fd, events = self._events.popitem() # 循环弹出事件

try:

fd_obj, handler_func = self._handlers[fd] # 处理事件

handler_func(fd_obj, events)

except (OSError, IOError) as e:

if errno_from_exception(e) == errno.EPIPE:

pass

else:

self.handle_callback_exception(self._handlers.get(fd))

except Exception:

self.handle_callback_exception(self._handlers.get(fd))

fd_obj = handler_func = None

finally:

self._stopped = False # 确保发生异常也继续运行

if self._blocking_signal_threshold is not None:

signal.setitimer(signal.ITIMER_REAL, 0, 0) # 清空 signal alarm

IOLoop._current.instance = old_current

if old_wakeup_fd is not None:

signal.set_wakeup_fd(old_wakeup_fd)   # 和 start 开头部分对应,但是不是很清楚作用,求老司机带带路

stop

def stop(self): self._running = False self._stopped = True self._waker.wake()

1

2

3

4

def stop(self):

self._running = False

self._stopped = True

self._waker.wake()

这个很简单,设置判断条件,然后调用 self._waker.wake() 向 pipe 写入随意字符唤醒 ioloop 事件循环。 over!

总结

噗,写了这么长,终于写完了。 经过分析,我们可以看到, ioloop 实际上是对 epoll 的封装,并加入了一些对上层事件的处理和 server 相关的底层处理。

最后,感谢大家不辞辛苦看到这,文中理解有误的地方还请多多指教!

转载于:https://my.oschina.net/u/3270404/blog/868950

深入理解 tornado 之底层 ioloop 实现相关推荐

  1. 深入理解 tornado 之 底层 ioloop 实现(一)

    tornado 优秀的大并发处理能力得益于它的 web server 从底层开始就自己实现了一整套基于 epoll 的单线程异步架构(其他 python web 框架的自带 server 基本是基于 ...

  2. 深入理解 tornado 之 底层 ioloop 实现(二)

    ioloop 分析 首先要看的是关于 epoll 操作的方法,还记得前文说过的 epoll 只需要四个 api 就能完全操作嘛? 我们来看 PollIOLoop 的实现: epoll 操作 1 def ...

  3. [翻译]深入理解Tornado——一个异步web服务器

    本人的第一次翻译,转载请注明出处:http://www.cnblogs.com/yiwenshengmei/archive/2011/06/08/understanding_tornado.html ...

  4. 【手写系列】理解数据库连接池底层原理之手写实现

    前言 数据库连接池的基本思想是:为数据库连接建立一个"缓冲池",预先在池中放入一定数量的数据库连接管道,需要时,从池子中取出管道进行使用,操作完毕后,再将管道放入池子中,从而避免了 ...

  5. Python Tornado框架(ioloop对象分析)

    转自:https://www.cnblogs.com/jasonwang-2016/p/5950548.html 网上都说nginx和lighthttpd是高性能web服务器,而tornado也是著名 ...

  6. mysql索引数据结构图解_深入理解Mysql索引底层数据结构与算法

    索引的定义:索引(Index)是帮助MySQL高效获取数据的数据结构. Q1:大家使用索引有没有想过这个问题?为什么索引能够帮助mysql高效获取数据?我一一给大家道来!在给大家讲之前,先更大家分享一 ...

  7. 深入理解 MySQL 索引底层原理

    hi ,大家好,今天分享MySQL硬核知识,希望大家可以学习到真正的知识,慢慢积累,厚积薄发: 看完本文可以学到什么  一步一步推导出 Mysql 索引的底层数据结构  怎么分析回答技术选型问题 一步 ...

  8. 通过一条语句的执行,深入理解innoDB的底层架构

    MySQL最常用的存储引擎是innodb,我们今天就借助一条更新语句的执行,了解下innodb具体是如何处理的,深入理解下它的架构. 假设更新语句是这样的: update user set name ...

  9. 深入tornado中的ioLoop

    本文所剖析的tornado源码版本为4.4.2 ioloop是tornado的关键,是他的最底层. ioloop就是对I/O多路复用的封装,它实现了一个单例,将这个单例保存在IOLoop._insta ...

最新文章

  1. UVa12096.The SetStack Computer
  2. Invoice校验差异处理
  3. ccna____总结
  4. 两种解法-树形dp+二分+单调队列(或RMQ)-hdu-4123-Bob’s Race
  5. 实录分享 | 计算未来轻沙龙:揭秘AutoML技术(视频 + PPT)
  6. jieba分词 ‘float‘ object has no attribute ‘decode‘ 解决方法
  7. no cortex-m sw device found_SW大模块水箱桂林厂家图纸
  8. pyqt 子窗口控制主窗口绘图_PyQtGraph如何关闭绘图窗口/关闭所有绘图窗口?
  9. react项目在配置webpack的时候问题
  10. BAT54C那些事儿
  11. 高德地图根据经纬度获取具体城市信息
  12. smartadmin_smartadmin 下载_smartadmin 官网
  13. 基于Php+MySql数据库架构的网络验证系统
  14. jQuery EasyUI教程
  15. 面试被虐题— 谨以此致,mark一个悲惨的下午
  16. 心流:最优体验心理学 1
  17. grbl源码解析——速度前瞻(1)
  18. Java时间格式化工具类大全_二
  19. TIFS_2013_Empirical Evaluation and New Design for Fighting Evolving Twitter Spammers
  20. 九款能将PowerPoint转换成PDF的免费软件

热门文章

  1. obd android,OBD2代码免费修复
  2. Android进阶之路 - 跳转应用商店下载、更新app
  3. 我的世界服务器怎么添加信息框,我的世界服务器怎么添加指定建筑
  4. python 与设计模式 ——工厂与单例
  5. MySQL触发器的题_mysql触发器问题
  6. 飞思卡尔MC9S12X PIT模块
  7. freescale MC9S12G128单片机概述
  8. 2023基于微信小程序的社区小区车位租赁系统平台(springboot+mysql)-JAVA.VUE(论文+开题报告+运行)
  9. 数据结构(八):并查集详解 (多图+动图)
  10. hexo高阶教程:想让你的博客被更多的人在搜索引擎中搜到吗?