How Shadowsocks Works (Part 2)

The previous post covered a very early version by clowwindy. That version was quite simple and differs substantially from the current one: the current version uses more efficient I/O multiplexing and defines a state machine, so the code is somewhat more complex.

TCPRelay

This is without question the core module of shadowsocks. So what does it do? As the name says, it relays TCP traffic.

The job of sslocal is to encrypt traffic coming from the browser and forward it to the ssserver proxy, and to decrypt traffic coming back from ssserver and pass it to the browser.

The job of ssserver is to decrypt traffic from sslocal and forward it to the remote, e.g. google.com, and to encrypt data coming back from the remote and send it to sslocal.

In other words, tcprelay shuttles data between two sockets, encrypting or decrypting it in transit.
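The idea can be sketched in a few lines. The XOR "cipher" below is purely illustrative (shadowsocks uses real stream ciphers, nothing like this); it just shows the shape of the transform-and-forward step:

```python
KEY = 0x5A

def transform(data: bytes) -> bytes:
    """Toy stand-in 'cipher': XOR every byte with a fixed key."""
    return bytes(b ^ KEY for b in data)

# sslocal side: encrypt browser traffic before sending it to ssserver
ciphertext = transform(b"GET / HTTP/1.1")

# ssserver side: decrypt before forwarding to the remote
# (XOR is its own inverse, so the same function works both ways)
plaintext = transform(ciphertext)
```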

TCPRelayHandler

Take sslocal as an example: after it starts listening on port 1080, it creates a TCPRelay instance. When the browser visits google.com, the tcprelay spawns a TCPRelayHandler instance. The comment in the source reads:

# for each opening port, we have a TCP Relay
# for each connection, we have a TCP Relay Handler to handle the connection

In the TCPRelay class, handle_event is invoked by the event loop. On an accept event it accepts the connection and creates a TCPRelayHandler; the handler then owns that accepted connection. Inside the handler, the accepted connection is always the local sock, so the handler bridges the local sock and the remote sock.

def handle_event(self, sock, fd, event):
    # ...
    # (bt) a new connection triggers an event on server_socket
    if sock == self._server_socket:
        if event & eventloop.POLL_ERR:
            raise Exception('server_socket error')
        try:
            conn = self._server_socket.accept()
            # (bt) the handler adds the new connection to the loop itself
            # (bt) create a TCPRelayHandler to own this new connection
            TCPRelayHandler(self, self._fd_to_handlers,
                            self._eventloop, conn[0], self._config,
                            self._dns_resolver, self._is_local)
        except (OSError, IOError) as e:
            error_no = eventloop.errno_from_exception(e)
            if error_no in (errno.EAGAIN, errno.EINPROGRESS,
                            errno.EWOULDBLOCK):
                return
            else:
                shell.print_exception(e)
                if self._config['verbose']:
                    traceback.print_exc()
    else:
        if sock:
            # (bt) look up the TCPRelayHandler instance for this fd
            handler = self._fd_to_handlers.get(fd, None)
            if handler:
                handler.handle_event(sock, event)
            else:
                logging.warn('poll removed fd')

The self._fd_to_handlers map is what lets the TCPRelay class keep track of multiple TCPRelayHandler instances: when sock is not _server_socket (i.e. the event is not an accept), the matching handler is fetched from _fd_to_handlers to process it. Why does the TCPRelay class handle socket events at all? Because it is the eventloop's handler. This layered chain of handlers is one of those network-programming techniques worth taking the time to master.
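The dispatch pattern described above can be boiled down to this sketch (ToyHandler and the bare integer fd are simplifications, not the actual shadowsocks code):

```python
class ToyHandler:
    """Stands in for TCPRelayHandler: registers itself for the fd it owns."""
    def __init__(self, fd_to_handlers, fd):
        fd_to_handlers[fd] = self   # the relay can now route events to us
        self.events = []

    def handle_event(self, sock, event):
        self.events.append(event)

fd_to_handlers = {}
handler = ToyHandler(fd_to_handlers, 7)

# Later, when the loop reports an event on fd 7, the relay looks us up:
h = fd_to_handlers.get(7, None)
if h:
    h.handle_event(None, 'POLL_IN')
```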

State machine

Inside a TCPRelayHandler, the states of the local socket and the remote sock change over time; what to do next at each state is maintained by a state machine.

# for each handler, we have 2 sockets:
#    local:   connected to the client
#    remote:  connected to remote server
# for each handler, it could be at one of several stages:
#
# as sslocal:
#    stage 0 SOCKS hello received from local, send hello to local
#    stage 1 addr received from local, query DNS for remote
#    stage 2 UDP assoc
#    stage 3 DNS resolved, connect to remote
#    stage 4 still connecting, more data from local received
#    stage 5 remote connected, piping local and remote
#
# as ssserver:
#    stage 0 just jump to stage 1
#    stage 1 addr received from local, query DNS for remote
#    stage 3 DNS resolved, connect to remote
#    stage 4 still connecting, more data from local received
#    stage 5 remote connected, piping local and remote

The state machine in ss simply advances self._stage to the next stage once a step completes. For example, right after the address is parsed, the stage becomes the DNS stage; once DNS resolution finishes, it becomes CONNECTING.
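For reference, the stage constants look roughly like this (values as I recall them from the upstream source; treat the transition methods as an illustrative sketch, not shadowsocks's actual API):

```python
STAGE_INIT = 0
STAGE_ADDR = 1
STAGE_UDP_ASSOC = 2
STAGE_DNS = 3
STAGE_CONNECTING = 4
STAGE_STREAM = 5
STAGE_DESTROYED = -1

class StageSketch:
    """Each completed step just assigns the next stage."""
    def __init__(self):
        self._stage = STAGE_INIT

    def on_addr_parsed(self):
        self._stage = STAGE_DNS         # addr parsed -> resolve DNS

    def on_dns_resolved(self):
        self._stage = STAGE_CONNECTING  # DNS done -> connect to remote

h = StageSketch()
h.on_addr_parsed()
assert h._stage == STAGE_DNS
h.on_dns_resolved()
```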

Working hand in hand with the state machine are the two data-flow directions, upstream and downstream.

# for each handler, we have 2 stream directions:
#    upstream:    from client to server direction
#                 read local and write to remote
#    downstream:  from server to client direction
#                 read remote and write to local

Streams come in two directions, STREAM_DOWN and STREAM_UP, and each stream can be in one of three states: WAIT_STATUS_READING, WAIT_STATUS_WRITING, or WAIT_STATUS_READWRITING.
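In the upstream source these are small integer flags, with READWRITING simply the OR of the other two (values shown as I recall them from the shadowsocks code; verify against the source):

```python
STREAM_UP = 0
STREAM_DOWN = 1

WAIT_STATUS_INIT = 0
WAIT_STATUS_READING = 1
WAIT_STATUS_WRITING = 2
WAIT_STATUS_READWRITING = WAIT_STATUS_READING | WAIT_STATUS_WRITING

# Being bit flags, a READWRITING stream satisfies both & tests:
assert WAIT_STATUS_READWRITING & WAIT_STATUS_READING
assert WAIT_STATUS_READWRITING & WAIT_STATUS_WRITING
```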

In the _update_stream function, after a stream's status is changed, the events epoll should watch are adjusted to match the new status. For example, when _downstream_status is WAIT_STATUS_WRITING (the data stream read from remote_sock is being written into local_sock), epoll watches _local_sock for writability.

if self._local_sock:
    event = eventloop.POLL_ERR
    if self._downstream_status & WAIT_STATUS_WRITING:
        # downstream is writing, i.e. writing into local_sock,
        # so watch local_sock for writability
        event |= eventloop.POLL_OUT
    if self._upstream_status & WAIT_STATUS_READING:
        # upstream is reading, i.e. reading from local_sock
        # and forwarding the data to the remote
        event |= eventloop.POLL_IN
    self._loop.modify(self._local_sock, event)

Connecting to the remote

The connection is made with a non-blocking socket: call connect, add the socket to epoll, and once a writable event fires, we know the connection has been established.

The connection is made here:

remote_sock = socket.socket(af, socktype, proto)
self._remote_sock = remote_sock
self._fd_to_handlers[remote_sock.fileno()] = self
remote_sock.setblocking(False)
remote_sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
try:
    remote_sock.connect((remote_addr, remote_port))
except (OSError, IOError) as e:
    if eventloop.errno_from_exception(e) == \
            errno.EINPROGRESS:
        pass
# (bt) watch the remote connection for writable events first,
# since we need to send the request once the connect completes
self._loop.add(remote_sock,
               eventloop.POLL_ERR | eventloop.POLL_OUT,
               self._server)
# (bt) maybe not connected yet
self._stage = STAGE_CONNECTING

And here is where we confirm the connection succeeded:

def _on_remote_write(self):
    self._stage = STAGE_STREAM
    if self._data_to_write_to_remote:
        data = b''.join(self._data_to_write_to_remote)
        self._data_to_write_to_remote = []
        self._write_to_sock(data, self._remote_sock)
    else:
        self._update_stream(STREAM_UP, WAIT_STATUS_READING)

EventLoop network programming

This is the core networking module of ss. It wraps epoll, kqueue, and select, and gives the latter two an epoll-like interface. Below, for instance, is the epoll-style interface built on select.select.

Wrapping an epoll-like interface

class SelectLoop(object):

    def __init__(self):
        self._r_list = set()
        self._w_list = set()
        self._x_list = set()

    def poll(self, timeout):
        r, w, x = select.select(self._r_list, self._w_list, self._x_list,
                                timeout)
        results = defaultdict(lambda: POLL_NULL)
        for p in [(r, POLL_IN), (w, POLL_OUT), (x, POLL_ERR)]:
            for fd in p[0]:
                results[fd] |= p[1]
        return results.items()

    def register(self, fd, mode):
        if mode & POLL_IN:
            self._r_list.add(fd)
        if mode & POLL_OUT:
            self._w_list.add(fd)
        if mode & POLL_ERR:
            self._x_list.add(fd)

    def unregister(self, fd):
        if fd in self._r_list:
            self._r_list.remove(fd)
        if fd in self._w_list:
            self._w_list.remove(fd)
        if fd in self._x_list:
            self._x_list.remove(fd)

    def modify(self, fd, mode):
        self.unregister(fd)
        self.register(fd, mode)

    def close(self):
        pass

The mode is a combination of bit flags such as POLL_IN and POLL_OUT:

POLL_NULL = 0x00
POLL_IN = 0x01
POLL_OUT = 0x04
POLL_ERR = 0x08
POLL_HUP = 0x10
POLL_NVAL = 0x20

Since each constant is a power of two with exactly one bit set, flags can be combined with | and tested with &: a bitwise AND is nonzero only when the specific flag being tested is actually present in the combination.
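A quick demonstration of how these single-bit constants combine and test:

```python
POLL_NULL = 0x00
POLL_IN = 0x01
POLL_OUT = 0x04
POLL_ERR = 0x08

event = POLL_IN | POLL_OUT      # combine: watch both readable and writable
assert event & POLL_IN          # nonzero -> POLL_IN is set
assert event & POLL_OUT         # nonzero -> POLL_OUT is set
assert not (event & POLL_ERR)   # zero -> POLL_ERR is not set
```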

The eventloop programming model

ss adopts the widely used event-loop programming model: within a single thread, epoll acts as the core for processing and dispatching events. In the server code below, the tcp servers are added to the loop, and loop.run() finally starts the service.

def run_server():
    def child_handler(signum, _):
        logging.warn('received SIGQUIT, doing graceful shutting down..')
        list(map(lambda s: s.close(next_tick=True),
                 tcp_servers + udp_servers))
    signal.signal(getattr(signal, 'SIGQUIT', signal.SIGTERM),
                  child_handler)

    def int_handler(signum, _):
        sys.exit(1)
    signal.signal(signal.SIGINT, int_handler)

    try:
        loop = eventloop.EventLoop()
        dns_resolver.add_to_loop(loop)
        list(map(lambda s: s.add_to_loop(loop), tcp_servers + udp_servers))
        daemon.set_user(config.get('user', None))
        loop.run()
    except Exception as e:
        shell.print_exception(e)
        sys.exit(1)

A class that actually does the work only needs to implement a handle_event method and add itself to the epoll loop. The asyncdns class, for example, implements handle_event to parse DNS results and invoke the callback registered by the tcprelayhandler.

In ss, both tcprelay and asyncdns implement an add_to_loop method for exactly this purpose.
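The pattern can be reduced to this sketch (ToyLoop and DnsSketch are invented names; the real EventLoop registers sockets along with their handlers and polls epoll/kqueue/select, which is omitted here):

```python
class ToyLoop:
    """Very small stand-in for eventloop.EventLoop."""
    def __init__(self):
        self._handlers = []

    def add_handler(self, handler):
        self._handlers.append(handler)

    def dispatch(self, fd, event):
        # the real loop polls the OS and routes each ready event
        for h in self._handlers:
            h.handle_event(None, fd, event)

class DnsSketch:
    """Mimics the asyncdns shape: add_to_loop + handle_event + callback."""
    def __init__(self, callback):
        self._callback = callback

    def add_to_loop(self, loop):
        loop.add_handler(self)

    def handle_event(self, sock, fd, event):
        # a real resolver would parse the DNS response read from its socket;
        # this sketch just fires the callback with a canned answer
        self._callback(('example.com', '93.184.216.34'))

results = []
loop = ToyLoop()
DnsSketch(results.append).add_to_loop(loop)
loop.dispatch(3, 'POLL_IN')
```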

muduo uses the loop model as well, but muduo runs its I/O across multiple threads while ss is single-threaded. The use cases differ, though: muduo aims to be a high-performance network library, whereas ss has no great need for massive concurrent connections.

If time and energy permit, I may try adding a loop-plus-multithreading model to ss to improve performance.

Summary

Finally, my apologies to readers: my writing still isn't very fluent, so I can only paste a pile of code. I'll keep practicing :(.