Maple Leaf Lodge (枫叶居)

Peach and plum in the spring breeze, a cup of wine; rivers and lakes in the night rain, a lamp through ten years


Tornado IOLoop Illustrated

A Brief Overview of Tornado IOLoop

Preface

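I believe in the philosophy of "reading a book from thin to thick, then from thick to thin". Tornado's source code reads like a work of literature: it distills the craftsmanship of many excellent Python engineers, and its clever ideas repeatedly make you want to applaud. A good book feels different every time you reread it, and so does good code. So that I can revisit and savor it from time to time in future work and study, I have decided to record my understanding of Tornado in diagrams.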

I recommend a powerful online diagramming tool: https://www.draw.io/. If you want Visio's professional features but not its heft, it is so good it may make you cry.

IOLoop Diagram

Note: if you are not yet familiar with IO multiplexing, first read up on blocking vs. non-blocking IO, synchronous vs. asynchronous IO, select, and epoll.
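As a quick refresher on the two ideas this note points to, the following standard-library sketch shows a non-blocking socket returning immediately instead of waiting, and `select.select()` reporting which fd is ready so we only read when data is actually there. `socket.socketpair()` is just a stand-in for a real client connection; epoll follows the same readiness model but is Linux-only, so the portable `select` is used here.

```python
import select
import socket

# A connected socket pair stands in for a client connection.
reader, writer = socket.socketpair()
reader.setblocking(False)  # non-blocking: recv() raises instead of waiting

# Non-blocking IO: with no data available, recv() fails immediately.
try:
    reader.recv(1024)
    would_block = False
except BlockingIOError:
    would_block = True

# IO multiplexing: select() waits (here at most 0.1 s) until a watched
# fd is ready. Nothing has been written yet, so the readable list is empty.
before, _, _ = select.select([reader], [], [], 0.1)

# Once the peer writes, select() reports the socket as readable and the
# following recv() returns data without blocking.
writer.sendall(b"ping")
after, _, _ = select.select([reader], [], [], 1.0)
data = reader.recv(1024)

print(would_block, before, after == [reader], data)  # True [] True b'ping'

reader.close()
writer.close()
```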

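I have noticed that event loops across languages and libraries (for example libev, nodejs, …) share the same core features; what differs is only the implementation and the level of abstraction. I summarize these core features as follows:

  • File IO events (for example READ, WRITE, and HUP events on a socket or pipe …)
  • System signals (for example SIGINT, SIGHUP …)
  • Timers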
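One common way to serve all three features from a single poll call is sketched below, assuming a POSIX system, code running in the main thread, and SIGUSR1 as the example signal. File IO events come straight from the selector, signals are turned into fd readiness with `signal.set_wakeup_fd` (the same mechanism the start() source relies on), and the nearest timer deadline bounds how long the poll may block. Variable names such as `timers` and `seen` are illustrative only.

```python
import heapq
import os
import selectors
import signal
import socket
import time

sel = selectors.DefaultSelector()

# 1) File IO events: watch one end of a socket pair for READ readiness.
io_r, io_w = socket.socketpair()
sel.register(io_r, selectors.EVENT_READ, "io")

# 2) Signals: with set_wakeup_fd, the C-level signal handler writes the
#    signal number as a byte to wake_w, so a signal becomes a READ event on
#    wake_r. The fd must be non-blocking; this only works in the main thread.
wake_r, wake_w = socket.socketpair()
wake_r.setblocking(False)
wake_w.setblocking(False)
old_handler = signal.signal(signal.SIGUSR1, lambda signum, frame: None)
old_wakeup_fd = signal.set_wakeup_fd(wake_w.fileno())
sel.register(wake_r, selectors.EVENT_READ, "signal")

# 3) Timers: a heap of (deadline, name); the nearest deadline bounds the wait.
timers = [(time.monotonic() + 0.05, "timer")]

io_w.sendall(b"x")                    # make the IO source ready
os.kill(os.getpid(), signal.SIGUSR1)  # deliver a signal to ourselves

seen = []
while "timer" not in seen:
    # The poll timeout is the time left until the earliest timer.
    timeout = max(0.0, timers[0][0] - time.monotonic()) if timers else None
    for key, _ in sel.select(timeout):
        key.fileobj.recv(64)  # drain the fd so it is not reported again
        seen.append(key.data)
    now = time.monotonic()
    while timers and timers[0][0] <= now:
        seen.append(heapq.heappop(timers)[1])

# Restore the previous signal state and release resources.
signal.set_wakeup_fd(old_wakeup_fd)
signal.signal(signal.SIGUSR1, old_handler)
sel.close()
for s in (io_r, io_w, wake_r, wake_w):
    s.close()

print(sorted(seen))  # ['io', 'signal', 'timer']
```

The signal handler itself does nothing; what matters is the byte it causes to appear on `wake_r`, so a signal is observed as ordinary fd readiness, just like socket data.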

Tornado's IOLoop is no exception, as the figure below shows:

Figure: IOLoop schematic
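To make that structure concrete before reading the real start() method, here is a minimal sketch of a loop with the same ingredients, built only on Python's standard `selectors`, `heapq` and `collections` modules. `MiniLoop` and its methods are hypothetical: `add_handler` and `add_callback` borrow Tornado's method names but not their signatures, `call_later` is my own name, and IO handling is reduced to READ readiness. The order of steps follows the annotated start() source: snapshot the callback queue, collect due timers, run both, pick a poll timeout, then dispatch IO events.

```python
import collections
import heapq
import itertools
import selectors
import socket
import time

DEFAULT_POLL_TIMEOUT = 3600.0  # assumption: mirrors _POLL_TIMEOUT (3600 s) in start()


class MiniLoop:
    """Hypothetical toy event loop; not Tornado's API."""

    def __init__(self):
        self._selector = selectors.DefaultSelector()
        self._callbacks = collections.deque()
        self._timeouts = []            # heap of (deadline, seq, callback)
        self._seq = itertools.count()  # tie-breaker so callbacks are never compared
        self._running = False

    def add_handler(self, sock, handler):
        # Call handler(sock) whenever sock becomes readable.
        self._selector.register(sock, selectors.EVENT_READ, handler)

    def add_callback(self, callback):
        self._callbacks.append(callback)

    def call_later(self, delay, callback):
        deadline = time.monotonic() + delay
        heapq.heappush(self._timeouts, (deadline, next(self._seq), callback))

    def stop(self):
        self._running = False

    def start(self):
        self._running = True
        while True:
            # 1. Snapshot the callback count: callbacks added while these run
            #    wait for the next iteration, so IO is never starved.
            ncallbacks = len(self._callbacks)
            # 2. Collect the timers that are due before running anything.
            now = time.monotonic()
            due = []
            while self._timeouts and self._timeouts[0][0] <= now:
                due.append(heapq.heappop(self._timeouts)[2])
            for _ in range(ncallbacks):
                self._callbacks.popleft()()
            for callback in due:
                callback()
            # 3. Choose how long the poll may block.
            if self._callbacks:
                poll_timeout = 0.0
            elif self._timeouts:
                poll_timeout = max(0.0, self._timeouts[0][0] - time.monotonic())
            else:
                poll_timeout = DEFAULT_POLL_TIMEOUT
            if not self._running:
                break
            # 4. Wait for IO readiness and dispatch to the registered handlers.
            for key, _ in self._selector.select(poll_timeout):
                key.data(key.fileobj)


# Usage: a callback, a timer that writes to a socket, and an IO handler that stops the loop.
loop = MiniLoop()
log = []
r, w = socket.socketpair()


def on_readable(sock):
    log.append(("io", sock.recv(16)))
    loop.stop()


loop.add_handler(r, on_readable)
loop.add_callback(lambda: log.append(("callback",)))
loop.call_later(0.01, lambda: w.sendall(b"hi"))
loop.start()
print(log)  # [('callback',), ('io', b'hi')]
r.close()
w.close()
```

The snapshot in step 1 is the same starvation guard the Tornado comments describe: even a callback that keeps re-adding itself still lets the poll run between iterations, because the poll timeout simply drops to 0.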

IOLoop Code Notes

Below is my annotated version of the source of IOLoop's core method, start. Read it alongside the diagram for a deeper understanding.

```python
# Method of IOLoop (excerpt). It relies on the module's imports and globals
# (heapq, os, signal, errno, thread, errno_from_exception, _POLL_TIMEOUT).
def start(self):
    if self._running:
        # The IOLoop is already running.
        raise RuntimeError("IOLoop is already running")
    self._setup_logging()  # Set up logging.
    if self._stopped:
        # stop() was called before start(): reset the flag and return immediately.
        self._stopped = False
        return
    # Save the IOLoop instance currently registered for this thread.
    old_current = getattr(IOLoop._current, "instance", None)
    IOLoop._current.instance = self  # Make self the current IOLoop.
    self._thread_ident = thread.get_ident()  # ID of the thread running the loop.
    self._running = True

    # signal.set_wakeup_fd closes a race condition in event loops:
    # a signal may arrive at the beginning of select/poll/etc
    # before it goes into its interruptible sleep, so the signal
    # will be consumed without waking the select. The solution is
    # for the (C, synchronous) signal handler to write to a pipe,
    # which will then be seen by select.
    #
    # In python's signal handling semantics, this only matters on the
    # main thread (fortunately, set_wakeup_fd only works on the main
    # thread and will raise a ValueError otherwise).
    #
    # If someone has already set a wakeup fd, we don't want to
    # disturb it. This is an issue for twisted, which does its
    # SIGCHLD processing in response to its own wakeup fd being
    # written to. As long as the wakeup fd is registered on the IOLoop,
    # the loop will still wake up and everything should work.

    # The waker's fd is used to wake the event loop, either when a signal
    # arrives or when another thread needs to wake it.
    old_wakeup_fd = None
    if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
        # requires python 2.6+, unix. set_wakeup_fd exists but crashes
        # the python process on windows.
        try:
            old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
            if old_wakeup_fd != -1:
                # Already set, restore previous value. This is a little racy,
                # but there's no clean get_wakeup_fd and in real use the
                # IOLoop is just started once at the beginning.
                signal.set_wakeup_fd(old_wakeup_fd)
                old_wakeup_fd = None
        except ValueError:
            # Non-main thread, or the previous value of wakeup_fd
            # is no longer valid.
            # Per the signal module docs, set_wakeup_fd may only be
            # called from the main thread.
            old_wakeup_fd = None

    try:
        while True:
            # Prevent IO event starvation by delaying new callbacks
            # to the next iteration of the event loop.
            # ncallbacks counts the callbacks queued at the start of this
            # iteration; any added while they run wait for the next iteration.
            ncallbacks = len(self._callbacks)

            # Add any timeouts that have come due to the callback list.
            # Do not run anything until we have determined which ones
            # are ready, so timeouts that call add_timeout cannot
            # schedule anything in this iteration.
            due_timeouts = []  # Timeouts that are due.
            if self._timeouts:
                now = self.time()
                while self._timeouts:
                    if self._timeouts[0].callback is None:
                        # The timeout was cancelled. Note that the
                        # cancellation check is repeated below for timeouts
                        # that are cancelled by another timeout or callback.
                        heapq.heappop(self._timeouts)
                        self._cancellations -= 1
                    elif self._timeouts[0].deadline <= now:
                        due_timeouts.append(heapq.heappop(self._timeouts))
                    else:
                        break
                if (self._cancellations > 512 and
                        self._cancellations > (len(self._timeouts) >> 1)):
                    # Clean up the timeout queue when it gets large and it's
                    # more than half cancellations.
                    # (Filter out the cancelled entries, then re-heapify.)
                    self._cancellations = 0
                    self._timeouts = [x for x in self._timeouts
                                      if x.callback is not None]
                    heapq.heapify(self._timeouts)

            for i in range(ncallbacks):
                # Run the callbacks queued before this iteration.
                self._run_callback(self._callbacks.popleft())
            for timeout in due_timeouts:
                # Run due timeouts, skipping any cancelled in the meantime.
                if timeout.callback is not None:
                    self._run_callback(timeout.callback)
            # Closures may be holding on to a lot of memory, so allow
            # them to be freed before we go into our poll wait.
            due_timeouts = timeout = None

            if self._callbacks:
                # If any callbacks or timeouts called add_callback,
                # we don't want to wait in poll() before we run them.
                poll_timeout = 0.0
            elif self._timeouts:
                # If there are any timeouts, schedule the first one.
                # Use self.time() instead of 'now' to account for time
                # spent running callbacks.
                # Wait until the nearest deadline, capped at _POLL_TIMEOUT.
                poll_timeout = self._timeouts[0].deadline - self.time()
                poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
            else:
                # No timeouts and no callbacks, so use the default.
                # Block in poll() for IO events for up to _POLL_TIMEOUT
                # (3600 seconds).
                poll_timeout = _POLL_TIMEOUT

            if not self._running:
                # A callback called stop(): leave the event loop.
                break

            if self._blocking_signal_threshold is not None:
                # clear alarm so it doesn't fire while poll is waiting for
                # events.
                signal.setitimer(signal.ITIMER_REAL, 0, 0)

            try:
                # Wait for IO events; event_pairs looks like [(fd, events), ...].
                event_pairs = self._impl.poll(poll_timeout)
            except Exception as e:
                # Depending on python version and IOLoop implementation,
                # different exception types may be thrown and there are
                # two ways EINTR might be signaled:
                # * e.errno == errno.EINTR
                # * e.args is like (errno.EINTR, 'Interrupted system call')

                # A signal caught while poll() is blocked in the kernel ends
                # the wait with EINTR; simply start the next iteration.
                if errno_from_exception(e) == errno.EINTR:
                    continue
                else:
                    raise

            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL,
                                 self._blocking_signal_threshold, 0)

            # Pop one fd at a time from the set of pending fds and run
            # its handler. Since that handler may perform actions on
            # other file descriptors, there may be reentrant calls to
            # this IOLoop that modify self._events
            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    # Look up the file-like object and the IO handler registered for fd.
                    fd_obj, handler_func = self._handlers[fd]
                    # Let the handler process the events on fd_obj; add_handler
                    # wrapped handler_func when it was registered.
                    handler_func(fd_obj, events)
                except (OSError, IOError) as e:
                    if errno_from_exception(e) == errno.EPIPE:
                        # Happens when the client closes the connection
                        pass
                    else:
                        # Hand the error to handle_callback_exception.
                        self.handle_callback_exception(self._handlers.get(fd))
                except Exception:
                    self.handle_callback_exception(self._handlers.get(fd))
            fd_obj = handler_func = None

    finally:
        # reset the stopped flag so another start/stop pair can be issued
        self._stopped = False
        if self._blocking_signal_threshold is not None:
            signal.setitimer(signal.ITIMER_REAL, 0, 0)
        # Restore the previously current IOLoop (in practice an IOLoop is
        # usually started only once, so this rarely matters).
        IOLoop._current.instance = old_current
        if old_wakeup_fd is not None:
            signal.set_wakeup_fd(old_wakeup_fd)
```
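The timer bookkeeping in this listing (skipping entries whose `callback` is None, the `_cancellations` counter, and the "more than 512 and more than half of the heap" compaction rule) is easy to try out in isolation. The sketch below reproduces that logic on its own; `Timeout`, `TimerQueue`, `cancel` and `pop_due` are hypothetical names, and cancelling by setting `callback` to None is inferred from what the listing checks rather than copied from Tornado.

```python
import heapq
import itertools


class Timeout:
    """Hypothetical timer handle; cancelling clears .callback."""

    _counter = itertools.count()

    def __init__(self, deadline, callback):
        self.deadline = deadline
        self.callback = callback
        # Tie-breaker keeps heap order well defined for equal deadlines.
        self.tiebreaker = next(Timeout._counter)

    def __lt__(self, other):
        return (self.deadline, self.tiebreaker) < (other.deadline, other.tiebreaker)


class TimerQueue:
    """Hypothetical min-heap of timers with lazy cancellation."""

    def __init__(self):
        self.timeouts = []
        self.cancellations = 0

    def add(self, deadline, callback):
        timeout = Timeout(deadline, callback)
        heapq.heappush(self.timeouts, timeout)
        return timeout

    def cancel(self, timeout):
        # O(1): mark the entry dead instead of searching the heap for it.
        if timeout.callback is not None:
            timeout.callback = None
            self.cancellations += 1

    def pop_due(self, now):
        due = []
        while self.timeouts:
            head = self.timeouts[0]
            if head.callback is None:      # cancelled: discard lazily
                heapq.heappop(self.timeouts)
                self.cancellations -= 1
            elif head.deadline <= now:     # due: hand back to the caller
                due.append(heapq.heappop(self.timeouts))
            else:
                break
        # Same threshold as the listing: compact once more than 512 entries
        # are cancelled and they make up more than half of the heap.
        if self.cancellations > 512 and self.cancellations > (len(self.timeouts) >> 1):
            self.cancellations = 0
            self.timeouts = [t for t in self.timeouts if t.callback is not None]
            heapq.heapify(self.timeouts)
        return due


queue = TimerQueue()
handles = [queue.add(float(d), lambda d=d: d) for d in range(1, 1001)]
for handle in handles[400:]:        # cancel the 600 timers with deadlines 401..1000
    queue.cancel(handle)
size_before = len(queue.timeouts)   # 1000: cancelled entries are still in the heap
queue.pop_due(now=0.0)              # nothing is due, but compaction triggers
size_after = len(queue.timeouts)    # 400
results = [t.callback() for t in queue.pop_due(now=10.0)]
print(size_before, size_after, results)  # 1000 400 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

Cancellation is O(1) because nothing is searched or removed; the cost is paid later, either when a dead entry reaches the top of the heap or during an occasional O(n) compaction, and the half-of-the-heap condition keeps that compaction from running too often.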