asyncore为select.select、poll的封装(实际上现在大家都使用更为高效的epoll),变成了框架的使用模式,该库已经作为兼容模式存在,新的库为asyncio。且在2和3中asyncore代码有一点点差异。(源码版本2.7.11)

dispatcher类(调度)

文件asyncore.py中最重要的类为dispatcher,使用的时候只需要继承asyncore.dispatcher就好了

可以这样理解每一个继承了asyncore.dispatcher的类都代表了一个socket(监听socket或者连接socket)

在asyncore.py的最开始有一个全局字典socket_map对应fd和一个asyncore.dispatcher类,当你继承一个dispatcher类的时候总会调用函数在全局字典socket_map中创建一个映射,当然在关闭的时候也会从全局字典中删除

最后是一个loop死循环,可以很容易想到是使用了系统IO多路复用接口select.select对全局socket_map进行了操作。根据返回的事件进行操作(3个事件读、写、错误)

源码中我们着重看dispatcher类,它可以划分为以下几个要点

  1. 添加了4个状态,connected/accepting/connecting/closing,想想为什么要设置这几个状态呢???
    connected:已经连接上对方,比如监听套接字sock,addr = socket.accept()返回的sock
  2. add_channel/del_channel/create_socket/set_socket这几个都是针对全局字典socket_map的操作。添加进去或者删除
  3. readable/writable 用的地方不是太多,预先确定该socket该不该添加到socket的可读可写里面
  4. listen/bind/connect/accept/send/recv/close 这几个函数嘛,对原有的行为稍微改动了下,比如listen设置为accepting状态,connect设置为connecting状态,send,recv一定条件出发close,close即为从全局字典中删除并关闭socket
  5. handle_read_event/handle_connect_event/handle_write_event.当select有事件返回的时候就是调用的这3个方法。只是需要注意。这里并不是最终执行的操作(send、recv等)!!!这里也体现了标注socket状态的作用,read可以分为监听(accepting)、连接完成后(connecting)→→→这里就是一个hook,如果自己调用connect连接那么完成后是此状态,可以自己重写handle_connect处理该事件,如果是直接初始化的socket则直接为connected状态。handle_write_event也很明显,当为connecting状态的时候处理hook,否则处理写入事件
  6. 最后这一部分基本就是使用者需要重写的部分了。handle_expt/handle_read/handle_write/handle_connect/handle_accept/handle_close含义很明白,就不表述了

在后面就是dispatcher的继承类dispatcher_with_send和file_dispatcher,前者嘛,试想一下你要发送非常多内容,肯定调用一次send发不完。它就给你解决了这个问题,当然方法也很简单。后面就是为了高效发送文件而产生的了

流程图示(以一个客户端为例)

sockets5 DEMO

对于使用asyncore,官方的例子我觉得已经表现的非常到位了。下面我放出一个使用它写的简单socks5server(作为一个demo,只对tcp进行处理,没有写远程dns解析)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import asyncore
import socket
import struct

sockets_map = {}

class Socks5Listen(asyncore.dispatcher):
def __init__(self, address=('localhost', 8081)):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind(address)
self.listen(2048)

def handle_accept(self):
pair = self.accept()
if pair is not None:
sock, addr = pair
print('socks connection from ', addr)
Local(sock)


class Local(asyncore.dispatcher_with_send):
def __init__(self, sock):
asyncore.dispatcher_with_send.__init__(self, sock=sock)
self.read_buffer = ''
self.status = 'init'

def read(self, num):
data = self.recv(num)
self.read_buffer += data
if len(self.read_buffer) != num:
return None
else:
_ = self.read_buffer
self.read_buffer = ''
return _

def handle_read(self):
if self.status == 'init':
self.recv(262)
self.send(b"\x05\x00")
self.status = 'request'
return
if self.status == 'request':
data = self.read(4)
if data is None: return
mode = ord(data[1])
addrtype = ord(data[3])
if addrtype == 1: # IPv4
addr = self.read(4)
if addr is None: return
addr = socket.inet_ntoa(addr)
elif addrtype == 3: # Domain name
addr = self.read(ord(self.recv(1)[0]))
if addr is None: return
port = self.read(2)
if port is None: return
port = struct.unpack('>H', port)[0]
if mode == 1: # 1. Tcp connect
remote = Remote(addr, port)
sockets_map[self] = remote
sockets_map[remote] = self
self.status = 'transfer'
return
else:
reply = b"\x05\x07\x00\x01" # Command not supported
self.send(reply)
self.close()
return
if self.status == 'transfer':
data = self.recv(2048)
if data:
sockets_map[self].send(data)

def handle_expt(self):
self.handle_close()

def handle_close(self):
asyncore.dispatcher_with_send.handle_close(self)
remote = sockets_map.get(self)
if remote:
del sockets_map[remote]
if self in sockets_map:
del sockets_map[self]


class Remote(asyncore.dispatcher_with_send):
def __init__(self, addr, port):
asyncore.dispatcher_with_send.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect((addr, port))

def handle_connect(self):
print('Tcp connect to', self.addr)
local = self.socket.getsockname()
reply = b"\x05\x00\x00\x01"
reply += socket.inet_aton(local[0]) + struct.pack(">H", local[1])
sockets_map[self].socket.send(reply)

def handle_read(self):
data = self.recv(2048)
if data:
sockets_map[self].send(data)

def handle_expt(self):
reply = '\x05\x05\x00\x01\x00\x00\x00\x00\x00\x00'
self.send(reply)
self.handle_close()

def handle_close(self):
asyncore.dispatcher_with_send.handle_close(self)
local = sockets_map.get(self)
if local:
del sockets_map[local]
if self in sockets_map:
del sockets_map[self]


server = Socks5Listen()
asyncore.loop()

一个socks5服务端会有3类socket。所以会有三个类。

小知识

LEGB作用域

asyncore中使用全局字典socket_map记录映射。想到全局就会想到global

1
2
3
4
5
6
a = 1
def main():
global a
a += 1
main()
print a

可以注意到这里必须要有global关键字,然而换成字典。为何就不需要global了。实际上这里要关注的是对象的id,对于数字、字符串这些对象是不可变的,然而字典对象是可变的。要改变不可变变量实际是重新赋值,所以对于改变外部不可变变量要用global,可变对象不需要

框架和库的区别

俗称好莱坞模式(Don’t call us, we will call you)。比如库函数sum你很容易想到给几个数字它返回给你和。框架嘛。就像上面的asyncore你需要的是继承dispatcher类然后重新一些方法。虽然它能达到目的,可是如果你不看源码或许你永远也无法明白框架在后面做了什么。从我个人的理解来看我是比较喜欢成熟的框架的。毕竟如果不用框架自己用库函数写最后也可能是一个框架,当然很可能是一个垃圾的框架:D

为什么多路IO复用一定是配合的非阻塞socket

妈蛋,这个问题各种说法真是太多了。想一想如果select返回给你一个fd可读,那么他一定是可读的。如果这样那么一般情况下我们用select配合阻塞socket也是可以的。可是现实不这样纸啊。unix手册中明确说明了不是这样的。所以工程上几乎没有见过配合阻塞socket使用的例子。另外asyncore种connect方法用的是connect_ex

相关资料

官方文档

知乎:为什么IO多路复用要搭配非阻塞socket