ss-manyuser 程序研究笔记[Last Update:2016-08-10]

shadowsocks
shadowsocks-manyuser(以下简称ss-manyuser)是ss的分支,同样用python写的。

本版本的ss-manyuser解读根据的是当时下载的版本,你接触的版本可能是不同的。

版本下载地址为:http://www.rffanlab.com/download/manyuser.zip

0x01

入口文件为servers.py

代码如下

 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2015 mengskysama
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

#这里一大堆是license声明,其实就是说这玩意用的是Apache2的协议,你们随便用。没关系。

import sys     #导入sys包
import os      #导入os包
import logging    #导入log包
import thread     #导入多线程包
import config    #导入配置文件
import signal    #导入信号包
import time      #导入时间包

if config.LOG_ENABLE:
    # Send log records to the configured file; every record carries its
    # timestamp, source file name, line number and level.
    logging.basicConfig(
        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
        datefmt='%Y, %b %d %a %H:%M:%S',
        filename=config.LOG_FILE,
        level=config.LOG_LEVEL
    )

#如果LOG_ENABLE这个配置设置为True 则logging.basicConfig设置格式


sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../'))
#sys.path 是一个列表,存放着搜索模块的路径。这一句把本脚本所在目录的上一级目录(os.path.dirname(__file__) 再拼上 '../')插入到 sys.path 的最前面,以便下面导入 shadowsocks 包。
from shadowsocks import shell, daemon, eventloop, tcprelay, udprelay, \
    asyncdns, manager

#这句从shadowsocks包中导入了shell等模块,这里不一一解读,在讲到其他文件时会解读。

import manager   #导入manager包
import config    #导入配置文件
from dbtransfer import DbTransfer  #从dbtransfer中导入DbTransfer类。

def handler_SIGQUIT(signum=None, frame=None):
    """Placeholder SIGQUIT handler that intentionally does nothing.

    ``signal.signal`` calls a handler with two positional arguments (the
    signal number and the current stack frame), so both are accepted and
    ignored. The defaults keep existing zero-argument calls working.

    Args:
        signum: Signal number supplied by the ``signal`` module (unused).
        frame: Interrupted stack frame supplied by the ``signal`` module
            (unused).

    Returns:
        None.
    """
    return
#定义一个空返回,留待程序猿写。

def main():
    """Start the manager and the two database worker threads, then idle.

    Builds the configuration dictionary from the ``config`` module, then
    starts ``manager.run``, ``DbTransfer.thread_db`` and
    ``DbTransfer.thread_push`` in separate threads, one second apart.
    Afterwards the calling thread sleeps in an endless loop, so this
    function never returns.
    """
    # Configuration handed to manager.run(); 'port_password' starts out
    # empty, so no per-port server is declared here.
    configer = {
        'server': '%s' % config.SS_BIND_IP,
        'local_port': 1081,
        'port_password': {
        },
        'method': '%s' % config.SS_METHOD,
        'manager_address': '%s:%s' % (config.MANAGE_BIND_IP, config.MANAGE_PORT),
        'timeout': 185, # some protocol keepalive packet 3 min Eg bt
        'fast_open': False,
        'verbose': 1
    }

    # The value returned by thread.start_new_thread() was never used, so it
    # is no longer bound to a local variable.
    thread.start_new_thread(manager.run, (configer,))
    # Pause for one second before starting the next thread.
    time.sleep(1)
    thread.start_new_thread(DbTransfer.thread_db, ())
    time.sleep(1)
    thread.start_new_thread(DbTransfer.thread_push, ())

    # Block the calling thread forever, waking up every 100 seconds.
    while True:
        time.sleep(100)


if __name__ == '__main__':
    main()

#如果命令行中直接执行的文件是此文件,则执行main()函数

main()函数分析可以得出,入口在manager.run,DbTransfer.thread_db,DbTransfer.thread_push中。当然其中还有一些配置,例如configer这个变量,是读取配置文件的。

所以现在还是先分析一下manager模块中的run

manager模块代码如下

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2015 clowwindy
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

#还是一堆协议定义

from __future__ import absolute_import, division, print_function, \
    with_statement
#关于这4个import其他地方有更深入的解释。简而言之,absolute_import让"import xxx"始终按sys.path做绝对导入,
#而不会优先导入当前包内的同名模块(包内模块要用"from . import xxx"这样的显式相对导入)。
#这也和主文件往sys.path里添加程序目录有关:shadowsocks包必须能在sys.path中被找到。
#division就是使用新的除法特性:本来如果分子分母都是整数,"/"会做整除(舍去小数部分);
#但是import这玩意之后,"/"就会直接算出小数,原来的整除特性用"//"来代替。
#print_function 就是加载新的print函数。with_statement 是使用with关键字
#URL为:http://www.cnblogs.com/ksedz/p/3190208.html

import errno  #导入errno包这个包定义了各种错误有URL可以详细参详
#http://www.cnblogs.com/Security-Darren/p/4168392.html
import traceback #导入了traceback
import socket  #导入socket包,这个包可能是这个程序中最重要的包了。
import logging  #导入日志包
import json     #导入json包
import collections    #导入collections包,来提供额外的数据类型。

from shadowsocks import common, eventloop, tcprelay, udprelay, asyncdns, shell
#从自有的shadowsocks包里导入common等模块


BUF_SIZE = 1506  # maximum number of bytes read per recvfrom() on the control socket
STAT_SEND_LIMIT = 50  # maximum number of per-port entries packed into one statistics datagram


#下面就来到了真正的Manager类了。
class Manager(object):
    """Control endpoint that adds, removes and reports on per-port relays.

    A Manager owns one event loop, one DNS resolver, one datagram control
    socket and a mapping of server port -> (TCPRelay, UDPRelay). Commands
    arrive on the control socket as ``command: {json}`` datagrams and are
    dispatched by :meth:`handle_event`.
    """

    def __init__(self, config):
        """Bind the control socket and start a relay pair for every port.

        Args:
            config: Configuration dictionary. ``manager_address`` and
                ``port_password`` are read here. ``port_password`` is
                deleted from the dictionary, so the caller's object is
                modified.

        If the control socket cannot be created or bound, the error is
        logged and ``exit(1)`` is called.
        """
        # Kept as the template that later commands are merged into.
        self._config = config
        # server port -> (tcprelay, udprelay)
        self._relays = {}
        # Single event loop shared by the control socket, the DNS resolver
        # and every relay.
        self._loop = eventloop.EventLoop()
        self._dns_resolver = asyncdns.DNSResolver()
        self._dns_resolver.add_to_loop(self._loop)

        # server port -> number of bytes reported through stat_callback()
        self._statistics = collections.defaultdict(int)
        # Address of the peer that sent the most recent control datagram;
        # replies are sent there.
        self._control_client_addr = None
        try:
            manager_address = config['manager_address']
            if ':' in manager_address:
                # "host:port" form: resolve it to pick the address family.
                addr = manager_address.rsplit(':', 1)
                addr = addr[0], int(addr[1])
                addrs = socket.getaddrinfo(addr[0], addr[1])
                if addrs:
                    family = addrs[0][0]
                else:
                    logging.error('invalid address: %s', manager_address)
                    exit(1)
            else:
                # No colon: treat the value as a Unix domain socket path.
                addr = manager_address
                family = socket.AF_UNIX
            self._control_socket = socket.socket(family,
                                                 socket.SOCK_DGRAM)
            self._control_socket.bind(addr)
            self._control_socket.setblocking(False)
        except (OSError, IOError) as e:
            logging.error(e)
            logging.error('can not bind to manager address')
            exit(1)
        self._loop.add(self._control_socket,
                       eventloop.POLL_IN, self)
        # The periodic hook is disabled, so statistics are only sent when a
        # 'transfer' command arrives (see handle_event):
        # self._loop.add_periodic(self.handle_periodic)
        port_password = config['port_password']
        # Removed so the per-port copies made below (and in handle_event)
        # do not carry the whole port table.
        del config['port_password']
        for port, password in port_password.items():
            a_config = config.copy()
            a_config['server_port'] = int(port)
            a_config['password'] = password
            self.add_port(a_config)

    def add_port(self, config):
        """Create a TCP and a UDP relay for ``config['server_port']``.

        Logs an error and does nothing if the port is already registered.
        """
        port = int(config['server_port'])
        servers = self._relays.get(port, None)
        if servers:
            logging.error("server already exists at %s:%d",
                          config['server'], port)
            return
        logging.info("adding server at %s:%d", config['server'], port)
        t = tcprelay.TCPRelay(config, self._dns_resolver, False,
                              self.stat_callback)
        u = udprelay.UDPRelay(config, self._dns_resolver, False,
                              self.stat_callback)
        t.add_to_loop(self._loop)
        u.add_to_loop(self._loop)
        self._relays[port] = (t, u)

    def remove_port(self, config):
        """Close and forget both relays for ``config['server_port']``.

        Logs an error if no relay is registered for that port.
        """
        port = int(config['server_port'])
        servers = self._relays.get(port, None)
        if servers:
            logging.info("removing server at %s:%d", config['server'], port)
            t, u = servers
            t.close(next_tick=False)
            u.close(next_tick=False)
            del self._relays[port]
        else:
            logging.error("server not exist at %s:%d",
                          config['server'], port)

    def stat_port(self, config):
        """Reply on the control socket with the state of one port.

        Sends ``{"stat":"ok", "password":"..."}`` when a relay exists for
        ``config['server_port']`` and ``{"stat":"ko"}`` otherwise.
        """
        port = int(config['server_port'])
        servers = self._relays.get(port, None)
        if servers:
            # Build the reply by concatenation: bytes %-formatting is not
            # available on every Python 3 release and rejects str arguments
            # where it is available.
            password = common.to_bytes(servers[0]._config['password'])
            self._send_control_data(
                b'{"stat":"ok", "password":"' + password + b'"}')
        else:
            self._send_control_data(b'{"stat":"ko"}')

    def handle_event(self, sock, fd, event):
        """Read one control datagram and run the command it carries.

        Supported commands: ``transfer`` (send statistics), ``add``,
        ``remove``, ``stat`` and ``ping``. Events for other sockets or
        other event types are ignored.
        """
        if not (sock == self._control_socket and event == eventloop.POLL_IN):
            return
        # Remember the sender so replies go back to it.
        data, self._control_client_addr = sock.recvfrom(BUF_SIZE)
        parsed = self._parse_command(data)
        if not parsed:
            return
        command, config = parsed
        if command == 'transfer':
            self.handle_periodic()
            return

        a_config = self._config.copy()
        if config:
            # let the command override the configuration file
            a_config.update(config)
        # NOTE(review): this check also applies to 'ping', so a ping without
        # a server_port is rejected -- confirm that is intended.
        if 'server_port' not in a_config:
            logging.error('can not find server_port in config')
            return

        if command == 'add':
            self.add_port(a_config)
            self._send_control_data(b'ok')
        elif command == 'remove':
            self.remove_port(a_config)
            self._send_control_data(b'ok')
        elif command == 'stat':
            self.stat_port(a_config)
        elif command == 'ping':
            self._send_control_data(b'pong')
        else:
            logging.error('unknown command %s', command)

    def _parse_command(self, data):
        """Split a control datagram into ``(command, config)``.

        Returns:
            ``(data, None)`` when the datagram contains no ``:``,
            ``(command, parsed_json)`` when the part after the first ``:``
            parses, and ``None`` (after logging) when parsing fails.
        """
        # commands:
        # add: {"server_port": 8000, "password": "foobar"}
        # remove: {"server_port": 8000}
        data = common.to_str(data)
        parts = data.split(':', 1)
        if len(parts) < 2:
            return data, None
        command, config_json = parts
        try:
            config = shell.parse_json_in_str(config_json)
            return command, config
        except Exception as e:
            logging.error(e)
            return None

    def stat_callback(self, port, data_len):
        """Add ``data_len`` to the running byte counter of ``port``."""
        self._statistics[port] += data_len

    def handle_periodic(self):
        """Send the collected statistics to the control client and reset them.

        The counters are sent as compact JSON objects holding at most
        STAT_SEND_LIMIT ports each, followed by a single ``e`` datagram
        that marks the end of the report.
        """
        def send_data(data_dict):
            if data_dict:
                # use compact JSON format (without space)
                data = common.to_bytes(json.dumps(data_dict,
                                                  separators=(',', ':')))
                self._send_control_data(data)

        segment = {}
        for port, transferred in self._statistics.items():
            segment[port] = transferred
            # split the data into segments that fit in UDP packets
            if len(segment) >= STAT_SEND_LIMIT:
                send_data(segment)
                segment.clear()
        # Flush the last, partially filled segment (a no-op when empty).
        send_data(segment)
        # End-of-report marker; bytes so it can also be sent on Python 3.
        self._send_control_data(b'e')
        self._statistics.clear()

    def _send_control_data(self, data):
        """Send ``data`` to the last control client, if there is one.

        Would-block style errors are ignored. Any other send error is
        printed, with a traceback when ``verbose`` is set.
        """
        # Imported locally because the module's import list does not
        # include sys.
        import sys

        if not self._control_client_addr:
            return
        try:
            self._control_socket.sendto(data, self._control_client_addr)
        except (socket.error, OSError, IOError) as e:
            error_no = eventloop.errno_from_exception(e)
            ignorable = [errno.EAGAIN, errno.EINPROGRESS, errno.EWOULDBLOCK]
            if sys.platform == "win32":
                # Only looked up on Windows, as in the original code.
                ignorable.append(errno.WSAEWOULDBLOCK)
            if error_no in ignorable:
                return
            # Every other error is reported on all platforms; previously
            # the win32 branch dropped them silently.
            shell.print_exception(e)
            if self._config['verbose']:
                traceback.print_exc()

    def run(self):
        """Run the event loop."""
        self._loop.run()


def run(config):
    """Create a Manager from ``config`` and run its event loop."""
    manager = Manager(config)
    manager.run()


def test():
    """Integration self-test for Manager.

    Starts a Manager on 127.0.0.1 (control port 6001, relay ports 8381 and
    8382) in a background thread and drives it through the control socket:
    add/remove a port, then check that TCP and UDP traffic shows up in the
    statistics. The TCP part sends a request for google.com:80 through the
    relay on port 7001.

    Statistics are requested with a ``transfer`` command, because this
    Manager only reports them on request: as raw JSON segments followed by
    an ``e`` datagram.
    """
    import time
    import threading
    import struct
    from shadowsocks import encrypt

    logging.basicConfig(level=5,
                        format='%(asctime)s %(levelname)-8s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    # Filled by the server thread so this thread can reach the Manager.
    enc = []
    eventloop.TIMEOUT_PRECISION = 1

    def run_server():
        config = {
            'server': '127.0.0.1',
            'local_port': 1081,
            'port_password': {
                '8381': 'foobar1',
                '8382': 'foobar2'
            },
            'method': 'aes-256-cfb',
            'manager_address': '127.0.0.1:6001',
            'timeout': 60,
            'fast_open': False,
            'verbose': 2
        }
        manager = Manager(config)
        enc.append(manager)
        manager.run()

    def fetch_stats(sock):
        # Ask for the statistics and merge every segment until the 'e'
        # end marker arrives.
        sock.send(b'transfer: {}')
        stats = {}
        while True:
            data, addr = sock.recvfrom(1506)
            if data == b'e':
                return stats
            stats.update(shell.parse_json_in_str(common.to_str(data)))

    t = threading.Thread(target=run_server)
    t.start()
    time.sleep(1)
    manager = enc[0]
    cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    tcp_cli = None
    udp_cli = None
    try:
        cli.connect(('127.0.0.1', 6001))

        # test add and remove
        time.sleep(1)
        cli.send(b'add: {"server_port":7001, "password":"asdfadsfasdf"}')
        time.sleep(1)
        assert 7001 in manager._relays
        data, addr = cli.recvfrom(1506)
        assert b'ok' in data

        cli.send(b'remove: {"server_port":8381}')
        time.sleep(1)
        assert 8381 not in manager._relays
        data, addr = cli.recvfrom(1506)
        assert b'ok' in data
        logging.info('add and remove test passed')

        # test statistics for TCP
        header = common.pack_addr(b'google.com') + struct.pack('>H', 80)
        data = encrypt.encrypt_all(b'asdfadsfasdf', 'aes-256-cfb', 1,
                                   header + b'GET /\r\n\r\n')
        tcp_cli = socket.socket()
        tcp_cli.connect(('127.0.0.1', 7001))
        tcp_cli.send(data)
        tcp_cli.recv(4096)
        tcp_cli.close()

        stats = fetch_stats(cli)
        assert '7001' in stats
        logging.info('TCP statistics test passed')

        # test statistics for UDP
        header = common.pack_addr(b'127.0.0.1') + struct.pack('>H', 80)
        data = encrypt.encrypt_all(b'foobar2', 'aes-256-cfb', 1,
                                   header + b'test')
        udp_cli = socket.socket(type=socket.SOCK_DGRAM)
        udp_cli.sendto(data, ('127.0.0.1', 8382))
        # Give the event loop time to process the datagram before asking
        # for the counters.
        time.sleep(1)

        stats = fetch_stats(cli)
        assert '8382' in stats
        logging.info('UDP statistics test passed')
    finally:
        # Always release the sockets and stop the server thread, so a
        # failed assertion does not leave the process hanging.
        for sock in (tcp_cli, udp_cli, cli):
            if sock is not None:
                sock.close()
        manager._loop.stop()
        t.join()

if __name__ == '__main__':
    test()

由于代码太长,下个文件的分析将会在下一篇文章中呈现。

未经允许不得转载:RffanLAB|Rffan实验室 » ss-manyuser 程序研究笔记[Last Update:2016-08-10]

赞 (1)

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址