tntzbzc 发表于 2014-11-20 15:34:54

OpenStack Swift源码分析(3)----swift服务启动源码分析之三

本帖最后由 pig2 于 2014-11-21 15:08 编辑

问题导读

1、如何初始化类GreenPool?
2、方法的具体功能是如何实现的?
3、日志和负载请求处理器是如何设置的?

static/image/hrline/4.gif




本篇博文开始以/usr/bin/swift-proxy-server为例,详细分析源码,来进一步解析swift中的服务启动过程;
首先来看swift-proxy-server代码:
from swift.common.utils import parse_options
from swift.common.wsgi import run_wsgi
if __name__ == '__main__':
    conf_file, options = parse_options()
   
    # conf_file:/etc/swift/proxy-server.conf;
    # 'proxy-server';
    # default_port=8080;
    # options = {'verbose':False};
    run_wsgi(conf_file, 'proxy-server', default_port=8080, **options)
进一步看方法run_wsgi:
def run_wsgi(conf_file, app_section, *args, **kwargs):
    """
    Runs the server using the specified number of workers.
    用指定数目的worker运行指定服务;
   
    # 调用之一传进来的参数:
    # conf_file:/etc/swift/proxy-server.conf;
    # app_section = 'proxy-server';
    # default_port = 8080;
    # options = {'verbose':False};
    """
   
    # 负载配置,设置日志和负载请求处理器;
    # 加载通用设置从conf;
    # 设置日志记录器logger;
    # 加载请求处理器app;
    # conf_file:/etc/swift/proxy-server.conf;
    # app_section = 'proxy-server';
    try:
      (app, conf, logger, log_name) = init_request_processor(conf_file, app_section, *args, **kwargs)
    except ConfigFileError, e:
      print e
      return
    # 绑定socket到配置文件conf中规定的ip:port;
    # 获取sock;
    sock = get_socket(conf, default_port=kwargs.get('default_port', 8080))
   
    drop_privileges(conf.get('user', 'swift'))
   
    reserve = int(conf.get('fallocate_reserve', 0))
    if reserve > 0:
      utils.FALLOCATE_RESERVE = reserve
    # 日志记录未处理的异常,关闭标准输入输出,获取标准输出和异常信息;
    capture_stdio(logger)
    def run_server():
      # 获取默认的HTTP版本;
      wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
      # Turn off logging requests by the underlying WSGI software.
      # 关闭日志记录请求,通过相关的WSGI软件;
      wsgi.HttpProtocol.log_request = lambda *a: None
      # Redirect logging other messages by the underlying WSGI software.
      # 重定向log的其它信息,通过相关的WSGI软件;
      wsgi.HttpProtocol.log_message = lambda s, f, *a: logger.error('ERROR WSGI: ' + f % a)
      # 获取TIMEOUT值;
      wsgi.WRITE_TIMEOUT = int(conf.get('client_timeout') or 60)
      eventlet.hubs.use_hub(get_hub())
      eventlet.patcher.monkey_patch(all=False, socket=True)
      
      eventlet_debug = config_true_value(conf.get('eventlet_debug', 'no'))
      #### eventlet_debug
      eventlet.debug.hub_exceptions(eventlet_debug)
      
      app = loadapp('config:%s' % conf_file,global_conf={'log_name': log_name})
      
      # 初始化GreenPool类,实例化GreenPool类的对象;
      pool = GreenPool(size=1024)
      
      # server:从提供的服务器套接字启动一个的WSGI服务器处理请求;
      # 这个方法将会一直循环;
      # 当服务退出之后,sock对象将会关闭;
      # 但是底层描述符将会继续开放;
      # sock:获取的sock;
      try:
            wsgi.server(sock, app, NullLogger(), custom_pool=pool)
      except socket.error, err:
            if err != errno.EINVAL:
                raise
      # 等待池中的所有绿色线程都运行完成;
      pool.waitall()
    worker_count = int(conf.get('workers', '1'))
    # Useful for profiling .
    if worker_count == 0:
      run_server()
      return
    def kill_children(*args):
      """
      杀死整个进程组;
      """
      logger.error('SIGTERM received')
      signal.signal(signal.SIGTERM, signal.SIG_IGN)
      running = False
      os.killpg(0, signal.SIGTERM)
    def hup(*args):
      """
      关闭服务,但是允许请求运行完成;
      """
      logger.error('SIGHUP received')
      signal.signal(signal.SIGHUP, signal.SIG_IGN)
      running = False
    running =
   
    # kill_children:杀死整个进程组;
    signal.signal(signal.SIGTERM, kill_children)
   
    signal.signal(signal.SIGHUP, hup)
   
    children = []
    while running:
      while len(children)

从头来看这个方法的代码:
    # 负载配置,设置日志和负载请求处理器;
    # 加载通用设置从conf;
    # 设置日志记录器logger;
    # 加载请求处理器app;
    # conf_file:/etc/swift/proxy-server.conf;
    # app_section = 'proxy-server';
    try:
      (app, conf, logger, log_name) = init_request_processor(conf_file, app_section, *args, **kwargs)
    except ConfigFileError, e:
      print e
      return这部分代码实现的是根据参数和配置文件设置日志记录器和加载通用设置等功能;
    # 绑定socket到配置文件conf中规定的ip:port;
    # 获取sock;
    sock = get_socket(conf, default_port=kwargs.get('default_port', 8080))这是比较重要的语句部分,实现了获取socket,从conf中获取规定的ip:port值,并与socket进行绑定;
这部分代码的具体实现现在先不进行分析,有时间可以好好深入看一下具体是怎么实现功能的;
继续来看代码:
<blockquote>drop_privileges(conf.get('user', 'swift'))

reserve = int(conf.get('fallocate_reserve', 0))
if reserve > 0:
utils.FALLOCATE_RESERVE = reserve
# 日志记录未处理的异常,关闭标准输入输出,获取标准输出和异常信息;

      pool.waitall()这个方法实现了启动服务,首先是一些通信参数的配置,这里应用了两个python网络编程框架----wsgi和eventlet,这里不进行深入的分析解读,我将会在其他博文中解析学习这两个框架;
这个方法的核心部分是语句:wsgi.server(sock, app, NullLogger(), custom_pool=pool),实现了方法的具体功能,即启动服务,我们来进一步看方法
server:
def server(sock, site,
         log=None,
         environ=None,
         max_size=None,
         max_http_version=DEFAULT_MAX_HTTP_VERSION,
         protocol=HttpProtocol,
         server_event=None,
         minimum_chunk_size=None,
         log_x_forwarded_for=True,
         custom_pool=None,
         keepalive=True,
         log_output=True,
         log_format=DEFAULT_LOG_FORMAT,
         url_length_limit=MAX_REQUEST_LINE,
         debug=True):
    """
    从给定的server socket,启动一个wsgi服务;
    这个方法将会一直循环执行;
    sock对象将会在服务退出后关闭;
   
    param sock:Server socket,必须已经绑定到端口,并已经监听;
    param site: WSGI的应用方法;
    param log: 类文件的对象,定义了信息要写入的日志;
    param environ:运行环境的参数信息;
    param max_size:针对一个服务,在任意时刻开启客户端最大的连接数目;
    param max_http_version:HTTP版本;
    param protocol:HTTP协议类的实例化对象;(不建议使用???)
    param server_event:用来收集Server对象;(不建议使用???)
    param minimum_chunk_size:对于http块的字节的最小尺寸;
    param log_x_forwarded_for:如果为True,记录x-forwarded-for的头文件内容和实际的客户端ip地址;
    param custom_pool:一个自定义的GreenPool类的实例对象,用来spawn客户端绿色线程;如果已经存在,max_size将会被忽略;
    param keepalive:这个参数如果设置为Flase,在服务中禁用keepalives;在完成服务请求之后,所有的连接都会关闭;
    param log_output:这是一个布尔类型值,表明服务是否记录数据;
    param log_format:一个python格式字符串,被用作模板来生成日志行;可以格式化成以下值:client_ip, date_time, request_line, status_code, body_length, wall_seconds;
    param url_length_limit:这个参数定义了请求url的最大长度值;
    param debug:当定义为真,如果服务器发送异常回溯给客户端,标志为 500 error;当定义为假,服务器将回应空的响应;
    """
   
    # 进行类Server的初始化,获取类Server的实例化对象;
    # 各个参数的值示例:
    # sock =
    # sock.getsockname() = ('0.0.0.0', 8080)
    # site =
    # log =
    # environ = None
    # max_http_version = HTTP/1.1
    # protocol = eventlet.wsgi.HttpProtocol
    # minimum_chunk_size = None
    # log_x_forwarded_for = True
    # keepalive = True
    # log_output = True
    # log_format = %(client_ip)s - - [%(date_time)s] "%(request_line)s" %(status_code)s %(body_length)s %(wall_seconds).6f
    # url_length_limit = 8192
    # debug = True
    serv = Server(sock, sock.getsockname(),
                  site, log,
                  environ=environ,
                  max_http_version=max_http_version,
                  protocol=protocol,
                  minimum_chunk_size=minimum_chunk_size,
                  log_x_forwarded_for=log_x_forwarded_for,
                  keepalive=keepalive,
                  log_output=log_output,
                  log_format=log_format,
                  url_length_limit=url_length_limit,
                  debug=debug)
   
    # server_event:用来收集server对象,不建议使用;
    if server_event is not None:
      server_event.send(serv)
   
    # max_size:针对一个服务,在任意时刻开启客户端最大的连接数目;
    # 如果这个值还没有设置,则设置为默认值1024;
    if max_size is None:
      max_size = DEFAULT_MAX_SIMULTANEOUS_REQUESTS
   
    # custom_pool:一个自定义的GreenPool类的实例对象,用来spawn客户端绿色线程;
    # 如果还没有对类GreenPool进行实例化,获取GreenPool类的实例化对象,初始化连接数为max_size;
    if custom_pool is not None:
      pool = custom_pool
    else:
      pool = greenpool.GreenPool(max_size)
      
    # sock.getsockname() = ('0.0.0.0', 8080)
    # 分别获取host和port;
    try:
      # host = '0.0.0.0';
      # port = 8080;
      host, port = sock.getsockname()[:2]
      
      # port = ':8080';
      port = ':%s' % (port, )
      # hasattr:判断一个对象是否含有某种属性;
      # 判断sock是否具有'do_handshake'属性;
      # HTTPS是HTTP的安全模式,在HTTP下加入SSL层,HTTPS的安全基础就是SSL,有'do_handshake'属性说明符合SSL协议,即应用HTTPS协议;
      # 总体来说,即根据sock是否有'do_handshake'属性,判断所采用的通信协议;
      if hasattr(sock, 'do_handshake'):
            scheme = 'https'
            if port == ':443':
                port = ''
      else:
            scheme = 'http'
            if port == ':80':
                port = ''
      serv.log.write("(%s) wsgi starting up on %s://%s%s/\n" % (serv.pid, scheme, host, port))
      while True:
            try:
                # 获取client_socket;
                client_socket = sock.accept()
                if debug:
                  serv.log.write("(%s) accepted %r\n" % (serv.pid, client_socket))
               
                try:
                  pool.spawn_n(serv.process_request, client_socket)
                except AttributeError:
                  warnings.warn("wsgi's pool should be an instance of " \
                        "eventlet.greenpool.GreenPool, is %s. Please convert your"\
                        " call site to use GreenPool instead" % type(pool),
                        DeprecationWarning, stacklevel=2)
                  pool.execute_async(serv.process_request, client_socket)
            except ACCEPT_EXCEPTIONS, e:
                if get_errno(e) not in ACCEPT_ERRNO:
                  raise
            except (KeyboardInterrupt, SystemExit):
                serv.log.write("wsgi exiting\n")
                break
    finally:
      try:
            # 关闭sock;
            sock.close()
      except socket.error, e:
            if get_errno(e) not in BROKEN_SOCK:
                traceback.print_exc()

这个方法首先做的是对Server这个类进行初始化,获取其实例化对象;
serv = Server(sock, sock.getsockname(),
                  site, log,
                  environ=environ,
                  max_http_version=max_http_version,
                  protocol=protocol,
                  minimum_chunk_size=minimum_chunk_size,
                  log_x_forwarded_for=log_x_forwarded_for,
                  keepalive=keepalive,
                  log_output=log_output,
                  log_format=log_format,
                  url_length_limit=url_length_limit,
                  debug=debug)class Server(BaseHTTPServer.HTTPServer):
    def __init__(self,
               socket,
               address,
               app,
               log=None,
               environ=None,
               max_http_version=None,
               protocol=HttpProtocol,
               minimum_chunk_size=None,
               log_x_forwarded_for=True,
               keepalive=True,
               log_output=True,
               log_format=DEFAULT_LOG_FORMAT,
               url_length_limit=MAX_REQUEST_LINE,
               debug=True):
      self.outstanding_requests = 0
      self.socket = socket
      self.address = address
      if log:
            self.log = log
      else:
            self.log = sys.stderr
      self.app = app
      self.keepalive = keepalive
      self.environ = environ
      self.max_http_version = max_http_version
      self.protocol = protocol
      self.pid = os.getpid()
      if minimum_chunk_size is not None:
            protocol.minimum_chunk_size = minimum_chunk_size
      self.log_x_forwarded_for = log_x_forwarded_for
      self.log_output = log_output
      self.log_format = log_format
      self.url_length_limit = url_length_limit
      self.debug = debug继续看代码:
    # custom_pool:一个自定义的GreenPool类的实例对象,用来spawn客户端绿色线程;
    # 如果还没有对类GreenPool进行实例化,获取GreenPool类的实例化对象,初始化连接数为max_size;
    if custom_pool is not None:
      pool = custom_pool
    else:
      pool = greenpool.GreenPool(max_size)

这段代码的作用是初始化类GreenPool,获取它的实例化对象;
首先判断是否已经获取其实例化对象,如果没有则获取GreenPool类的实例化对象,初始化连接数为max_size;
# host = '0.0.0.0';
# port = 8080;
host, port = sock.getsockname()[:2]通过sock获取host和port的值;
      # hasattr:判断一个对象是否含有某种属性;
      # 判断sock是否具有'do_handshake'属性;
      # HTTPS是HTTP的安全模式,在HTTP下加入SSL层,HTTPS的安全基础就是SSL,有'do_handshake'属性说明符合SSL协议,即应用HTTPS协议;
      # 总体来说,即根据sock是否有'do_handshake'属性,判断所采用的通信协议;
      if hasattr(sock, 'do_handshake'):
            scheme = 'https'
            if port == ':443':
                port = ''
      else:
            scheme = 'http'
            if port == ':80':
                port = ''

这段代码通过判断sock是否具有'do_handshake'属性,以此来选择所要采用的通信协议;
      while True:
            try:
                # 获取client_socket;
                client_socket = sock.accept()
                if debug:
                  serv.log.write("(%s) accepted %r\n" % (serv.pid, client_socket))
               
                try:
                  pool.spawn_n(serv.process_request, client_socket)
                except AttributeError:
                  warnings.warn("wsgi's pool should be an instance of " \
                        "eventlet.greenpool.GreenPool, is %s. Please convert your"\
                        " call site to use GreenPool instead" % type(pool),
                        DeprecationWarning, stacklevel=2)
                  pool.execute_async(serv.process_request, client_socket)
            except ACCEPT_EXCEPTIONS, e:
                if get_errno(e) not in ACCEPT_ERRNO:
                  raise
            except (KeyboardInterrupt, SystemExit):
                serv.log.write("wsgi exiting\n")

                break这段代码主要做了以下几件事:
(1)通过sock获取client_socket;
(2)通过调用方法spawn_n建立一个绿色线程来运行指定的方法;
(3)处理异常情况;
我们来看一下方法spawn_n:
def spawn_n(self, function, *args, **kwargs):
      """
      建立一个绿色线程来运行function;
      方法spawn_n和方法spawn是相似的,不同在于方法spawn_n没有返回值;
      方法function的运行结果也不返回;
      """
      # 获取当前的线程;
      current = greenthread.getcurrent()
      
      # 在协程中运行方法function;
      if self.sem.locked() and current in self.coroutines_running:
            # _spawn_n_impl:依据所带的参数在协程中运行function方法;
            self._spawn_n_impl(function, args, kwargs, None)
      else:
            self.sem.acquire()
            # spawn_n:在绿色进程中应用自己的具体参数值运行方法function;
            # 这个方法和方法spawn是类似的,但是不同的地方就是返回一个greenlet对象;
            # 这个方法要比方法spawn高效,当没有kwargs参数的时候运行速度是最快的;
            g = greenthread.spawn_n(self._spawn_n_impl, function, args, kwargs, True)
            if not self.coroutines_running:
                self.no_coros_running = event.Event()
            # 把g加入到当前运行的协程集合coroutines_running中;
            self.coroutines_running.add(g)这个方法中核心的部分就是语句:self._spawn_n_impl(function, args, kwargs, None),它实现了在一个绿色线程中运行方法function的功能;

我们再来看方法_spawn_n_impl:
    def _spawn_n_impl(self, func, args, kwargs, coro):
      try:
            # 依据所带的参数在协程中运行func方法;
            try:
                func(*args, **kwargs)
            except (KeyboardInterrupt, SystemExit, greenlet.GreenletExit):
                raise
            except:
                if DEBUG:
                  traceback.print_exc()
      finally:
            # 释放协程,保证core为None;
            if coro is None:
                return
            else:
                coro = greenthread.getcurrent()
                self._spawn_done(coro)

可见这个方法实现了在协程(绿色线程)中运行func方法的功能;
其中的func方法具体是什么呢?
我们来调试运行一下,得到调试示例:
func.__name__ = _replicate_object
func = >

方法_replicate_object实现的是复制数据库信息,具体方法的实现这里先不进行解析;
具体为什么要调用这个方法,现在还没有弄明白;
回到方法server,看最后一部分代码:
    finally:
      try:
            # 关闭sock;
            sock.close()
      except socket.error, e:
            if get_errno(e) not in BROKEN_SOCK:
                traceback.print_exc()无论前面的程序运行是否正常,在方法的最后,都要关闭sock;

至此方法server解析完成;
我们回到方法run_wsgi,继续看代码。在方法run_server的最后有一条语句:
pool.waitall()实现的功能是等待池中的所有绿色线程都运行完成;
在方法run_wsgi的末尾一部分代码:
    # 关闭sock;
    greenio.shutdown_safe(sock)
    sock.close()
    logger.notice('Exited')

这部分代码实现的功能是确保正常关闭sock;
至此方法run_wsgi解析完成,也就是服务swift-proxy-server的的启动过程解析完成。但是其中比较重要的一个方法_replicate_object理解的还不是很清楚,我也会在后续的学习当中继续关注这个问题,并完善此篇博文。
博文中不免有不正确的地方,欢迎朋友们不吝批评指正,谢谢大家了!


相关内容:
OpenStack Swift源码分析(1)----swift服务启动源码分析之一

OpenStack Swift源码分析(2)----swift服务启动源码分析之二

OpenStack Swift源码分析(3)----swift服务启动源码分析之三


页: [1]
查看完整版本: OpenStack Swift源码分析(3)----swift服务启动源码分析之三