分享

cinder服务启动源码分析

shihailong123 发表于 2014-11-22 19:22:04 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 19086
本帖最后由 xioaxu790 于 2014-11-22 20:20 编辑
问题导读

1、cinder服务启动流程是怎样的呢?
2、如何对wsgi.Server类的对象进行初始化操作?
3、服务的启动,是通过什么方法实现的?





我们知道在/cinder/bin目录下的文件是各个服务的启动脚本,这里我们以cinder-all为例,来解析cinder服务启动流程。
首先来看代码/cinder/bin/cinder-all:
  1. import eventlet
  2. eventlet.monkey_patch()
  3. import os
  4. import sys
  5. from oslo.config import cfg
  6. # 路径的标准化,消除双斜线等等;
  7. possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
  8.                                    os.pardir,
  9.                                    os.pardir))
  10. if os.path.exists(os.path.join(possible_topdir, "cinder", "__init__.py")):
  11.     sys.path.insert(0, possible_topdir)
  12. from cinder.openstack.common import gettextutils
  13. gettextutils.install('cinder', lazy=False)
  14. from cinder.common import config  # Need to register global_opts
  15. from cinder.openstack.common import log as logging
  16. from cinder import service
  17. from cinder import utils
  18. from cinder import version
  19. CONF = cfg.CONF
  20. if __name__ == '__main__':
  21.     CONF(sys.argv[1:], project='cinder',
  22.          version=version.version_string())
  23.     # 日志相关;
  24.     logging.setup("cinder")
  25.     LOG = logging.getLogger('cinder.all')
  26.     utils.monkey_patch()
  27.     servers = []
  28.     # cinder-api
  29.     try:
  30.         servers.append(service.WSGIService('osapi_volume'))
  31.     except (Exception, SystemExit):
  32.         LOG.exception(_('Failed to load osapi_volume'))
  33.     for binary in ['cinder-volume', 'cinder-scheduler']:
  34.         try:
  35.             servers.append(service.Service.create(binary=binary))
  36.         except (Exception, SystemExit):
  37.             LOG.exception(_('Failed to load %s'), binary)
  38.     service.serve(*servers)
  39.     service.wait()
复制代码

在代码中我们看到 分别为'osapi_volume','cinder-volume'和'cinder-scheduler'初始化了三个服务类的对象,分别是cinder.service.WSGIService object,cinder.service.Service object和cinder.service.Service object,我们先来看看类cinder.service.WSGIService的对象初始化方法:
  1. class WSGIService(object):
  2.     """
  3.     Provides ability to launch API from a 'paste' configuration.
  4.     实现启动API的类;
  5.     """
  6.     def __init__(self, name, loader=None):
  7.         """
  8.         Initialize, but do not start the WSGI server.
  9.         类的初始化方法,但是不启动服务;
  10.         :param name: The name of the WSGI server given to the loader.
  11.         给定的要加载的服务的名称;
  12.         :param loader: Loads the WSGI application using the given name.
  13.         :returns: None
  14.         """
  15.         # 给定的要加载的服务的名称;
  16.         # osapi_volume
  17.         self.name = name
  18.         # 根据给定的服务名称导入对应的管理类;
  19.         self.manager = self._get_manager()
  20.         self.loader = loader or wsgi.Loader()
  21.         self.app = self.loader.load_app(name)
  22.         # 获取主机host;
  23.         self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")
  24.         # 获取主机端口;
  25.         self.port = getattr(CONF, '%s_listen_port' % name, 0)
  26.         # Server:管理WSGI服务的类;
  27.         self.server = wsgi.Server(name,
  28.                                   self.app,
  29.                                   host=self.host,
  30.                                   port=self.port)
复制代码

可见这里进一步进行了wsgi.Server类的对象初始化操作,来看代码:
  1. class Server(object):
  2.     """
  3.     Server class to manage a WSGI server, serving a WSGI application.
  4.     管理WSGI服务的类;
  5.     """
  6.     default_pool_size = 1000
  7.     def __init__(self, name, app, host=None, port=None, pool_size=None,
  8.                  protocol=eventlet.wsgi.HttpProtocol):
  9.         """
  10.         Initialize, but do not start, a WSGI server.
  11.         类的初始化方法,但并不启动WSGI服务;
  12.         :param name: Pretty name for logging.
  13.         :param app: The WSGI application to serve.
  14.         :param host: IP address to serve the application.
  15.         :param port: Port number to server the application.
  16.         :param pool_size: Maximum number of eventlets to spawn concurrently.
复制代码


        启动最大的线程的数目;
  1.         :returns: None
  2.         """
  3.         self.name = name
  4.         self.app = app
  5.         self._host = host or "0.0.0.0"
  6.         self._port = port or 0
  7.         self._server = None
  8.         self._socket = None
  9.         self._protocol = protocol
  10.         # GreenPool:绿色线程池;
  11.         self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
  12.         self._logger = logging.getLogger("eventlet.wsgi.server")
  13.         self._wsgi_logger = logging.WritableLogger(self._logger)
复制代码

可见在这里我们可以指定或者默认的建立具有若干绿色线程的绿色线程池。
我们再来看语句servers.append(service.Service.create(binary=binary))中的create方法,具体来看代码:
  1.     def create(cls, host=None, binary=None, topic=None, manager=None,
  2.                report_interval=None, periodic_interval=None,
  3.                periodic_fuzzy_delay=None, service_name=None):
  4.         """
  5.         Instantiates class and passes back application object.
  6.         :param host: defaults to CONF.host
  7.         :param binary: defaults to basename of executable
  8.         :param topic: defaults to bin_name - 'cinder-' part
  9.         :param manager: defaults to CONF._manager
  10.         :param report_interval: defaults to CONF.report_interval
  11.         :param periodic_interval: defaults to CONF.periodic_interval
  12.         :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
  13.         """
  14.         if not host:
  15.             host = CONF.host
  16.         if not binary:
  17.             binary = os.path.basename(inspect.stack()[-1][1])
  18.         if not topic:
  19.             topic = binary
  20.         if not manager:
  21.             subtopic = topic.rpartition('cinder-')[2]
  22.             manager = CONF.get('%s_manager' % subtopic, None)
  23.         if report_interval is None:
  24.             report_interval = CONF.report_interval
  25.         if periodic_interval is None:
  26.             periodic_interval = CONF.periodic_interval
  27.         if periodic_fuzzy_delay is None:
  28.             periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
  29.             
  30.         service_obj = cls(host, binary, topic, manager,
  31.                           report_interval=report_interval,
  32.                           periodic_interval=periodic_interval,
  33.                           periodic_fuzzy_delay=periodic_fuzzy_delay,
  34.                           service_name=service_name)
复制代码



可见方法create在进行了一系列变量的初始化操作后,应用这些变量作为参数进行类service.Service的初始化对象获取操作。
输出示例:
  1. host = node01.shinian.com
  2. binary = cinder-volume
  3. topic = cinder-volume
  4. manager = cinder.volume.manager.VolumeManager
  5. report_interval = 10
  6. periodic_interval = 60
  7. periodic_fuzzy_delay = 60
  8. cls =
  9. service_obj =
复制代码


我们再回到脚本文件cinder-all中,来看语句service.serve(*servers)中的方法service,在这里把之前获取的三个服务类的初始化对象作为参数传入进来;
  1. def serve(*servers):
  2.     global _launcher
  3.     if not _launcher:
  4.         _launcher = Launcher()
  5.     for server in servers:
  6.         _launcher.launch_server(server)来看语句_launcher.launch_server(server)中的方法launch_server:
  7.     def launch_server(self, server):
  8.         """
  9.         Load and start the given server.
  10.         加载并启动给定的服务;
  11.         :param server: The server you would like to start.
  12.         :returns: None
  13.         """
  14.         gt = eventlet.spawn(self.run_server, server)
  15.         self._services.append(gt)
复制代码

在这个方法中,有两个点需要关注,首先是方法spawn,以及方法self.run_server,首先来看方法spawn的实现:
  1. spawn = greenthread.spawndef spawn(func, *args, **kwargs):
  2.     """
  3.     Create a greenthread to run ``func(*args, **kwargs)``.  Returns a
  4.     :class:`GreenThread` object which you can use to get the results of the
  5.     call.
  6.    
  7.     Execution control returns immediately to the caller; the created greenthread
  8.     is merely scheduled to be run at the next available opportunity.  
  9.     Use :func:`spawn_after` to  arrange for greenthreads to be spawned
  10.     after a finite delay.
  11.     """
  12.     hub = hubs.get_hub()
  13.     g = GreenThread(hub.greenlet)
  14.     hub.schedule_call_global(0, g.switch, func, args, kwargs)
  15.     return g
复制代码

可见方法spawn实现的功能就是建立一个新的绿色线程用来运行变量func所指定的方法。
再来看方法self.run_server的实现:
  1. def run_server(server):
  2.         """
  3.         Start and wait for a server to finish.
  4.         启动并等待服务的结束;
  5.         :param service: Server to run and wait for.
  6.         :returns: None
  7.         """
  8.         server.start()
  9.         server.wait()
复制代码

在这里调用了server对应的start方法,来实现服务的启动,因为我们传进来的是分属两种不同类的实例化对象的三个服务,所以这里也对应的调用了不同类下的start方法;
首先来看cinder.service.WSGIService所对应的start方法的实现:
  1.     def start(self):
  2.         """
  3.         Start serving this service using loaded configuration.
  4.         应用加载的配置来启动指定的服务;
  5.         Also, retrieve updated port number in case '0' was passed in, which
  6.         indicates a random port should be used.
  7.         :returns: None
  8.         """
  9.         if self.manager:
  10.             self.manager.init_host()
  11.         self.server.start()
  12.         self.port = self.server.port    def start(self, backlog=128):
  13.         """
  14.         Start serving a WSGI application.
  15.         启动一个WSGI应用;
  16.         :param backlog: Maximum number of queued connections.
  17.         队列连接的最大数目;
  18.         :returns: None
  19.         :raises: cinder.exception.InvalidInput
  20.         """
  21.         if backlog     def _start(self):
  22.         """
  23.         Run the blocking eventlet WSGI server.
  24.         :returns: None
  25.         """
  26.         eventlet.wsgi.server(self._socket,
  27.                              self.app,
  28.                              protocol=self._protocol,
  29.                              custom_pool=self._pool,
  30.                              log=self._wsgi_logger)
复制代码

我们可以看到,这里实现的就是启动了'osapi_volume'所指定的阻塞式eventlet WSGI服务。
我们再来看cinder.service.Service所对应的start方法的实现:
   
  1. def start(self):
  2.         version_string = version.version_string()
  3.         LOG.audit(_('Starting %(topic)s node (version %(version_string)s)'),
  4.                   {'topic': self.topic, 'version_string': version_string})
  5.         self.model_disconnected = False
  6.         # get_admin_context:获取admin用户的上下文信息;
  7.         ctxt = context.get_admin_context()
  8.         try:
  9.             # 通过主机节点名称和binary获取服务的状态信息;
  10.             # 注:binary为指定的服务名称;
  11.             service_ref = db.service_get_by_args(ctxt,
  12.                                                  self.host,
  13.                                                  self.binary)
  14.             self.service_id = service_ref['id']
  15.         except exception.NotFound:
  16.             self._create_service_ref(ctxt)
  17.         # create_connection:建立到用于rpc的信息总线的连接;
  18.         # 建立一个新的连接,或者从连接池中获取一个连接;
  19.         self.conn = rpc.create_connection(new=True)
  20.         LOG.debug(_("Creating Consumer connection for Service %s") %
  21.                   self.topic)
  22.         # 为管理器获取RPC调度器;
  23.         rpc_dispatcher = self.manager.create_rpc_dispatcher()
  24.         # Share this same connection for these Consumers
  25.         # 为消费者共享同样的连接;
  26.         # 建立不同的消息消费者;
  27.         # 创建以服务的topic为路由键的消费者;
  28.         self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
  29.         # 创建以服务的topic和本机名为路由键的消费者(基于topic&host,可用来接收定向消息);
  30.         node_topic = '%s:%s' % (self.topic, self.host)
  31.         self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
  32.         # fanout直接投递消息,不进行匹配,速度最快(fanout类型,可用于接收广播消息);
  33.         self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
  34.         # Consume from all consumers in a thread
  35.         # 启动消费者线程;
  36.         # consume_in_thread用evelent.spawn创建一个协程一直运行;
  37.         # 等待消息,在有消费到来时会创建新的协程运行远程调用的函数;
  38.         self.conn.consume_in_thread()
  39.         self.manager.init_host()
  40.         if self.report_interval:
  41.             pulse = utils.LoopingCall(self.report_state)
  42.             pulse.start(interval=self.report_interval,
  43.                         initial_delay=self.report_interval)
  44.             self.timers.append(pulse)
  45.         if self.periodic_interval:
  46.             if self.periodic_fuzzy_delay:
  47.                 initial_delay = random.randint(0, self.periodic_fuzzy_delay)
  48.             else:
  49.                 initial_delay = None
  50.             periodic = utils.LoopingCall(self.periodic_tasks)
  51.             periodic.start(interval=self.periodic_interval,
  52.                            initial_delay=initial_delay)
  53.             self.timers.append(periodic)
复制代码
这个方法主要实现了获取所有服务、创建到RPC的连接,创建不同类型的消息消费者,启动消费者线程用来执行未来服务运行过程中所要获取到的消息。这个方法在我们之前的博客中已经进行过详细的解析,具体请看OpenStack建立实例完整过程源码详细分析(14)----依据AMQP通信架构实现消息接收机制解析之一
在这里我们就可以看到,因为我们这里启动了两个cinder.service.Service object类型的服务,即'cinder-volume'和'cinder-scheduler',由上面的代码可以知道,每个服务都会启动和拥有属于自己的消息队列,用于处理和执行将要获取到的消息 。

最后我们再回到服务启动脚本文件cinder-all中,在程序的最后service.wait()语句中调用了方法wait,实现了等待所有服务运行结束或停止,然后关闭RPC协议框架,我们来看代码实现:
  1. def wait():
  2.     LOG.debug(_('Full set of CONF:'))
  3.     for flag in CONF:
  4.         flag_get = CONF.get(flag, None)
  5.         # hide flag contents from log if contains a password
  6.         # should use secret flag when switch over to openstack-common
  7.         if ("_password" in flag or "_key" in flag or
  8.                 (flag == "sql_connection" and "mysql:" in flag_get)):
  9.             LOG.debug(_('%s : FLAG SET ') % flag)
  10.         else:
  11.             LOG.debug('%(flag)s : %(flag_get)s' %
  12.                       {'flag': flag, 'flag_get': flag_get})
  13.     try:
  14.         _launcher.wait()
  15.     except KeyboardInterrupt:
  16.         _launcher.stop()
  17.     rpc.cleanup()
复制代码

好啦,现在我们以cinder-all为例,简要的进行了cinder服务启动流程的解析,谢谢大家!!!!



作者:溜溜小哥

本文转载自:http://blog.csdn.net/gaoxingnengjisuan














没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条