shadowsocks-manyuser(以下简称ss-manyuser)是ss的分支,同样是用Python写的。
本版本的ss-manyuser解读根据的是当时下载的版本,你接触的版本可能是不同的。
版本下载地址为:http://www.rffanlab.com/download/manyuser.zip
0x01
入口文件为servers.py
代码如下
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2015 mengskysama
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# Annotation: everything above is the license notice (Apache License 2.0).

"""Entry script: starts the manager and the two DbTransfer worker threads."""

import sys      # interpreter state; used below to extend sys.path
import os       # path handling for the sys.path entry
import logging  # log configuration
import signal   # imported by the original script; not used in the code shown
import time     # sleeps between thread start-ups and in the idle loop

try:
    import thread  # Python 2 name of the low-level threading module
except ImportError:
    import _thread as thread  # Python 3 renamed the module to _thread

import config   # project configuration module

# Configure file logging only when the LOG_ENABLE setting is truthy.
if config.LOG_ENABLE:
    logging.basicConfig(
        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
        datefmt='%Y, %b %d %a %H:%M:%S',
        filename=config.LOG_FILE,
        level=config.LOG_LEVEL)

# sys.path is the list of directories searched for modules. Put the parent
# directory of this script at the front so that `shadowsocks` can be imported
# as a package.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../'))
from shadowsocks import shell, daemon, eventloop, tcprelay, udprelay, \
    asyncdns, manager
# NOTE(review): this rebinds the name `manager` imported on the previous
# line; presumably both resolve to the same source file - confirm.
import manager
from dbtransfer import DbTransfer


def handler_SIGQUIT(signum=None, frame=None):
    """Placeholder SIGQUIT handler that intentionally does nothing.

    It is not registered with ``signal.signal`` anywhere in this script.
    The optional ``signum`` and ``frame`` parameters match the two arguments
    Python passes to signal handlers, so the function can be registered
    later without raising ``TypeError``.
    """
    return


def main():
    """Start the manager and database threads, then idle forever.

    Builds the manager configuration from the ``config`` module, starts
    ``manager.run``, ``DbTransfer.thread_db`` and ``DbTransfer.thread_push``
    in separate threads (one second apart) and keeps the main thread alive.
    """
    configer = {
        'server': '%s' % config.SS_BIND_IP,
        'local_port': 1081,
        'port_password': {
        },
        'method': '%s' % config.SS_METHOD,
        'manager_address': '%s:%s' % (config.MANAGE_BIND_IP,
                                      config.MANAGE_PORT),
        'timeout': 185,  # some protocol keepalive packet 3 min Eg bt
        'fast_open': False,
        'verbose': 1
    }
    # Thread 1: the manager event loop.
    thread.start_new_thread(manager.run, (configer,))
    # Presumably gives the manager time to bind its control socket before
    # the database threads start talking to it - TODO confirm.
    time.sleep(1)
    # Thread 2: DbTransfer.thread_db.
    thread.start_new_thread(DbTransfer.thread_db, ())
    time.sleep(1)
    # Thread 3: DbTransfer.thread_push.
    thread.start_new_thread(DbTransfer.thread_push, ())

    # Keep the main thread alive so the process keeps running.
    while True:
        time.sleep(100)


# Run main() only when this file is executed directly from the command line.
if __name__ == '__main__':
    main()
从main()函数的分析可以得出,程序的入口在manager.run、DbTransfer.thread_db和DbTransfer.thread_push这三个函数中。此外main()里还有一些配置,例如configer这个字典变量,它的部分取值读取自config配置文件。
所以现在还是先分析一下manager模块中的run
manager模块代码如下
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2015 clowwindy
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

# Annotation: again, the header above is just the license notice.

"""Manager: per-port TCP/UDP relays plus a datagram control socket."""

# absolute_import: `import x` always resolves x as a top-level module instead
#   of a sibling module inside the current package.
# division: `/` performs true division even for two integers; the old
#   floor-division behaviour is spelled `//`.
# print_function: `print` is a function rather than a statement.
# with_statement: enables the `with` keyword on old Python 2 releases.
# Further reading: http://www.cnblogs.com/ksedz/p/3190208.html
from __future__ import absolute_import, division, print_function, \
    with_statement

# errno defines the symbolic error codes, see
# http://www.cnblogs.com/Security-Darren/p/4168392.html
import errno
import sys
import traceback
import socket       # the control channel and all relays are built on sockets
import logging
import json
import collections  # provides defaultdict for the traffic counters

# Modules from the project's own shadowsocks package.
from shadowsocks import common, eventloop, tcprelay, udprelay, asyncdns, shell


# Maximum number of bytes read from the control socket per datagram.
BUF_SIZE = 1506
# Maximum number of port entries put into one statistics datagram.
STAT_SEND_LIMIT = 50


class Manager(object):
    """Run one TCP and one UDP relay per port and serve control commands.

    Control commands arrive as datagrams on the socket bound to
    ``config['manager_address']``. Replies go to the address of the most
    recent sender.
    """

    def __init__(self, config):
        """Bind the control socket and start a relay pair for every port.

        Args:
            config: dict with at least ``'manager_address'`` (``host:port``,
                or a path for a UNIX socket when it contains no ``:``) and
                ``'port_password'`` (mapping of port to password). Note that
                ``'port_password'`` is deleted from the dict that is passed
                in, and the same dict is kept as ``self._config``.

        Exits the process with status 1 when the manager address is invalid
        or the control socket cannot be bound.
        """
        self._config = config
        # port -> (tcprelay, udprelay); filled in by add_port().
        self._relays = {}
        # Event loop shared by the control socket, DNS resolver and relays.
        self._loop = eventloop.EventLoop()
        self._dns_resolver = asyncdns.DNSResolver()
        self._dns_resolver.add_to_loop(self._loop)

        # port -> byte count accumulated by stat_callback().
        self._statistics = collections.defaultdict(int)
        # Address of the last control client; replies are sent there.
        self._control_client_addr = None
        try:
            manager_address = config['manager_address']
            if ':' in manager_address:
                addr = manager_address.rsplit(':', 1)
                addr = addr[0], int(addr[1])
                addrs = socket.getaddrinfo(addr[0], addr[1])
                if addrs:
                    family = addrs[0][0]
                else:
                    logging.error('invalid address: %s', manager_address)
                    sys.exit(1)
            else:
                # No port part: treat the value as a UNIX socket path.
                addr = manager_address
                family = socket.AF_UNIX
            self._control_socket = socket.socket(family,
                                                 socket.SOCK_DGRAM)
            self._control_socket.bind(addr)
            self._control_socket.setblocking(False)
        except (OSError, IOError) as e:
            logging.error(e)
            logging.error('can not bind to manager address')
            sys.exit(1)
        self._loop.add(self._control_socket,
                       eventloop.POLL_IN, self)
        # Periodic reporting is disabled here; statistics are sent only in
        # response to the 'transfer' command (see handle_event).
        # self._loop.add_periodic(self.handle_periodic)

        port_password = config['port_password']
        del config['port_password']
        for port, password in port_password.items():
            a_config = config.copy()
            a_config['server_port'] = int(port)
            a_config['password'] = password
            self.add_port(a_config)

    def add_port(self, config):
        """Create and register a TCP/UDP relay pair for ``server_port``.

        Logs an error and does nothing when the port already has relays.
        """
        port = int(config['server_port'])
        servers = self._relays.get(port, None)
        if servers:
            logging.error("server already exists at %s:%d" % (config['server'],
                                                              port))
            return
        logging.info("adding server at %s:%d" % (config['server'], port))
        t = tcprelay.TCPRelay(config, self._dns_resolver, False,
                              self.stat_callback)
        u = udprelay.UDPRelay(config, self._dns_resolver, False,
                              self.stat_callback)
        t.add_to_loop(self._loop)
        u.add_to_loop(self._loop)
        self._relays[port] = (t, u)

    def remove_port(self, config):
        """Close and forget the relay pair for ``server_port``.

        Logs an error when no relays exist for that port.
        """
        port = int(config['server_port'])
        servers = self._relays.get(port, None)
        if servers:
            logging.info("removing server at %s:%d" % (config['server'], port))
            t, u = servers
            t.close(next_tick=False)
            u.close(next_tick=False)
            del self._relays[port]
        else:
            logging.error("server not exist at %s:%d" % (config['server'],
                                                         port))

    def stat_port(self, config):
        """Reply whether ``server_port`` has relays, including its password.

        Sends ``{"stat":"ok", "password":"..."}`` when the port exists and
        ``{"stat":"ko"}`` otherwise.
        """
        port = int(config['server_port'])
        servers = self._relays.get(port, None)
        if servers:
            # Format as text and encode afterwards: bytes %-formatting is
            # unavailable on Python 3.0-3.4 and rejects str arguments on
            # Python 3.5+. Presumably common.to_str / common.to_bytes are
            # no-ops on Python 2 - TODO confirm against shadowsocks.common.
            password = common.to_str(servers[0]._config['password'])
            reply = '{"stat":"ok", "password":"%s"}' % password
            self._send_control_data(common.to_bytes(reply))
        else:
            self._send_control_data(b'{"stat":"ko"}')

    def handle_event(self, sock, fd, event):
        """Read one control datagram and execute the command it carries.

        Supported commands: ``transfer`` (send statistics), ``add``,
        ``remove``, ``stat`` and ``ping``. Every command except ``transfer``
        requires ``server_port`` to be present after merging the command's
        JSON payload over the manager configuration.
        """
        if sock == self._control_socket and event == eventloop.POLL_IN:
            # Remember the sender so replies go back to it.
            data, self._control_client_addr = sock.recvfrom(BUF_SIZE)
            parsed = self._parse_command(data)
            if parsed:
                command, config = parsed
                a_config = self._config.copy()
                if command == 'transfer':
                    self.handle_periodic()
                else:
                    if config:
                        # let the command override the configuration file
                        a_config.update(config)
                    if 'server_port' not in a_config:
                        logging.error('can not find server_port in config')
                    else:
                        if command == 'add':
                            self.add_port(a_config)
                            self._send_control_data(b'ok')
                        elif command == 'remove':
                            self.remove_port(a_config)
                            self._send_control_data(b'ok')
                        elif command == 'stat':
                            self.stat_port(a_config)
                        elif command == 'ping':
                            self._send_control_data(b'pong')
                        else:
                            logging.error('unknown command %s', command)

    def _parse_command(self, data):
        """Split a control datagram into ``(command, config)``.

        Returns ``(data, None)`` when the datagram contains no ``:``, and
        ``None`` when the JSON payload cannot be parsed (the error is
        logged).
        """
        # commands:
        # add: {"server_port": 8000, "password": "foobar"}
        # remove: {"server_port": 8000}
        data = common.to_str(data)
        parts = data.split(':', 1)
        if len(parts) < 2:
            return data, None
        command, config_json = parts
        try:
            config = shell.parse_json_in_str(config_json)
            return command, config
        except Exception as e:
            logging.error(e)
            return None

    def stat_callback(self, port, data_len):
        """Add ``data_len`` to the running traffic counter of ``port``."""
        self._statistics[port] += data_len

    def handle_periodic(self):
        """Send the collected statistics to the control client and reset them.

        The counters are sent as compact JSON objects holding at most
        STAT_SEND_LIMIT entries each, followed by a single ``e`` datagram
        (presumably an end-of-transfer marker for the receiver - confirm
        against the client side).
        """
        r = {}
        i = 0

        def send_data(data_dict):
            if data_dict:
                # use compact JSON format (without space)
                data = common.to_bytes(json.dumps(data_dict,
                                                  separators=(',', ':')))
                self._send_control_data(data)

        for k, v in self._statistics.items():
            r[k] = v
            i += 1
            # split the data into segments that fit in UDP packets
            if i >= STAT_SEND_LIMIT:
                send_data(r)
                r.clear()
                i = 0
        if len(r) > 0:
            send_data(r)
        # Bytes literal: socket.sendto() rejects str on Python 3.
        self._send_control_data(b'e')
        self._statistics.clear()

    def _send_control_data(self, data):
        """Send ``data`` to the last control client, if there is one.

        Would-block style errors are ignored; any other socket error is
        printed (with a traceback when ``verbose`` is set).
        """
        if self._control_client_addr:
            try:
                self._control_socket.sendto(data, self._control_client_addr)
            except (socket.error, OSError, IOError) as e:
                error_no = eventloop.errno_from_exception(e)
                ignorable = (errno.EAGAIN, errno.EINPROGRESS,
                             errno.EWOULDBLOCK)
                if sys.platform == "win32":
                    # The WSA* constants only exist on Windows.
                    ignorable += (errno.WSAEWOULDBLOCK,)
                if error_no in ignorable:
                    return
                # Report every other error on all platforms instead of
                # silently dropping it on win32.
                shell.print_exception(e)
                if self._config['verbose']:
                    traceback.print_exc()

    def run(self):
        """Run the event loop; blocks until the loop stops."""
        self._loop.run()


def run(config):
    """Create a Manager from ``config`` and run it (blocking)."""
    Manager(config).run()


def test():
    """Self-test: start a manager on 127.0.0.1:6001 and drive it over UDP.

    NOTE(review): the statistics assertions below wait for datagrams that
    start with ``stat: ``, but handle_periodic() in this file sends bare JSON
    and only in response to the ``transfer`` command, so those steps look
    stale for this version - confirm before relying on this test.
    """
    import time
    import threading
    import struct
    from shadowsocks import encrypt

    logging.basicConfig(level=5,
                        format='%(asctime)s %(levelname)-8s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    enc = []
    eventloop.TIMEOUT_PRECISION = 1

    def run_server():
        config = {
            'server': '127.0.0.1',
            'local_port': 1081,
            'port_password': {
                '8381': 'foobar1',
                '8382': 'foobar2'
            },
            'method': 'aes-256-cfb',
            'manager_address': '127.0.0.1:6001',
            'timeout': 60,
            'fast_open': False,
            'verbose': 2
        }
        manager = Manager(config)
        enc.append(manager)
        manager.run()

    t = threading.Thread(target=run_server)
    t.start()
    time.sleep(1)
    manager = enc[0]
    cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    cli.connect(('127.0.0.1', 6001))

    # test add and remove
    time.sleep(1)
    cli.send(b'add: {"server_port":7001, "password":"asdfadsfasdf"}')
    time.sleep(1)
    assert 7001 in manager._relays
    data, addr = cli.recvfrom(1506)
    assert b'ok' in data

    cli.send(b'remove: {"server_port":8381}')
    time.sleep(1)
    assert 8381 not in manager._relays
    data, addr = cli.recvfrom(1506)
    assert b'ok' in data
    logging.info('add and remove test passed')

    # test statistics for TCP
    header = common.pack_addr(b'google.com') + struct.pack('>H', 80)
    data = encrypt.encrypt_all(b'asdfadsfasdf', 'aes-256-cfb', 1,
                               header + b'GET /\r\n\r\n')
    tcp_cli = socket.socket()
    tcp_cli.connect(('127.0.0.1', 7001))
    tcp_cli.send(data)
    tcp_cli.recv(4096)
    tcp_cli.close()

    data, addr = cli.recvfrom(1506)
    data = common.to_str(data)
    assert data.startswith('stat: ')
    data = data.split('stat:')[1]
    stats = shell.parse_json_in_str(data)
    assert '7001' in stats
    logging.info('TCP statistics test passed')

    # test statistics for UDP
    header = common.pack_addr(b'127.0.0.1') + struct.pack('>H', 80)
    data = encrypt.encrypt_all(b'foobar2', 'aes-256-cfb', 1,
                               header + b'test')
    udp_cli = socket.socket(type=socket.SOCK_DGRAM)
    udp_cli.sendto(data, ('127.0.0.1', 8382))
    # Close the UDP socket opened above (tcp_cli is already closed).
    udp_cli.close()

    data, addr = cli.recvfrom(1506)
    data = common.to_str(data)
    assert data.startswith('stat: ')
    data = data.split('stat:')[1]
    stats = json.loads(data)
    assert '8382' in stats
    logging.info('UDP statistics test passed')

    manager._loop.stop()
    t.join()


if __name__ == '__main__':
    test()
由于代码太长,下一个文件的分析将会在下一篇文章中呈现。
未经允许不得转载:RffanLAB|Rffan实验室 » ss-manyuser 程序研究笔记[Last Update:2016-08-10]