问题导读
1.class CellsManager(manager.Manager)定义和实现了什么功能?
2.class _BaseMessageMethods(base.Base)的作用是什么?
这篇我将对nova-cell服务的源码进行解析。需要说明的是,这里我都是以OpenStack的Grizzly版本为例进行解析,在这个版本中,默认是不启动这个服务的,而且在具体的cell虚拟机建立应用中,调度器还只是以随机的方式来选择建立虚拟机实例的cell,在后续的版本中,具体的调度器算法将会改进。
1.nova-cell服务的源码结构图
2.nova-cell服务的源码结构解析
/nova/cells/driver.py
class BaseCellsDriver(object):
cells通讯驱动基类,这个类主要实现了消息消费者处理的相关方法;
def start_consumers(self, msg_runner):
启动处理消息的消费者服务;
def stop_consumers(self):
关闭处理消息的消费者服务;
def send_message_to_cell(self, cell_state, message):
发送消息到一个cell;
/nova/cells/manager.py
注:在这个文件中,只有一个类CellsManager,定义和实现了cell管理的API方法;在类CellsManager所定义的方法中主要分为两种类型,即针对特定cell的处理方法和针对所有cell的处理方法;
class CellsManager(manager.Manager):
这个类主要定义和实现了用于管理cell的API;
大多数的方法属于“有针对性”或者“广播”的消息处理方式,分别对应于路由信息到特定的cell和路由信息到多个cell之上;
def post_start_hook(self):
为通过RPC来处理cell间的通讯,启动了两个独立的消费者;
如果本cell有子cell;
通知子cell发送它们的capabilities值和capacities值;
并对子cell的所有父cell的capabilities值和capacities值相关信息进行更新操作;
如果本cell在cell树的底层(即没有子cell);
发送本cell的capabilities值和capacities值到所有父cell,并对所有父cell的相关信息进行更新操作;
def _update_our_parents(self, ctxt):
如果本cell在cell树的底层(即没有子cell),则更新本cell的所有父cell,应用本cell的capabilities值和capacity值;
发送本cell的capabilities值到父cell,并对父cell的相关信息进行更新操作;
发送本cell的capacities值到父cell,并对父cell的相关信息进行更新操作;
def _heal_instances(self, ctxt):
为一些实例周期性的发送更新任务到父cell;
def _sync_instance(self, ctxt, instance):
广播instance_update或instance_destroy操作信息给父cell来执行;
def schedule_run_instance(self, ctxt, host_sched_kwargs):
选择一个合适的cell来建立新的实例,并转发相应的请求;
def get_cell_info_for_neighbors(self, _ctxt):
返回所有相邻cell的信息(本cell的所有父cell和所有子cell);
def run_compute_api_method(self, ctxt, cell_name, method_info, call):
在特定的cell中调用一个compute API方法;
运行变量compute_api指定的类中一个指定的方法;
def instance_update_at_top(self, ctxt, instance):
在top等级的cell上,更新实例;
def instance_destroy_at_top(self, ctxt, instance):
在cell树的顶层删除一个实例;
这里实际上执行的是/nova/cell/messaging.py中的类_BroadcastMessageMethods下的方法instance_destroy_at_top;
def instance_delete_everywhere(self, ctxt, instance, delete_type):
在每一个cell中调用compute API的delete()或者soft_delete()方法;
这个方法应用于当我们不知道一个实例属于哪个cell,但是还是需要删除或软删除这个实例的情况;
所以,我们需要在所有的cell上运行这个方法;
def instance_fault_create_at_top(self, ctxt, instance_fault):
在顶层cell上建立一个实例的断点(???);
def bw_usage_update_at_top(self, ctxt, bw_update_info):
更新DB中的带宽使用率信息,如果本cell是一个顶层的cell;
def sync_instances(self, ctxt, project_id, updated_since, deleted):
强制对所有实例实行同步操作;
def service_get_all(self, ctxt, filters):
返回在本cell中和所有子cell中的服务;
并把每个服务以及相关的计算节点都和cell相关联起来;
def service_get_by_compute_host(self, ctxt, host_name):
在当前的cell中为计算主机返回获取的服务入口,主要执行了以下的步骤:
通过host_name获取cell_name的信息;
根据compute host信息获取相关服务;
把获取的服务和计算节点和cell关联起来;
def proxy_rpc_to_manager(self, ctxt, topic, rpc_message, call, timeout):
为给定的compute topic代理PRC;
def task_log_get_all(self, ctxt, task_name, period_beginning, period_ending, host=None, state=None):
从所有的cell或者特定的一个cell的数据库中获取任务日志;
def compute_node_get(self, ctxt, compute_id):
在一个特定的cell上,通过ID值获取计算节点;
并把计算节点和这个cell相关联起来;
def compute_node_get_all(self, ctxt, hypervisor_match=None):
返回所有cell中的计算节点列表;
def compute_node_stats(self, ctxt):
通过各个cell的各自计算操作,实现获取所有cell上的各个资源参数的总和;
def actions_get(self, ctxt, cell_name, instance_uuid):
为给定的实例获取所有的实例操作;
def action_get_by_request_id(self, ctxt, cell_name, instance_uuid, request_id):
通过request_id和instance_uuid为给定的实例获取操作信息;
def action_events_get(self, ctxt, cell_name, action_id):
在cell_name指定的cell上,通过action id获取相关事件信息;
def consoleauth_delete_tokens(self, ctxt, instance_uuid):
在API cell中为指定的实例删除consoleauth令牌;
def validate_console_port(self, ctxt, instance_uuid, console_port, console_type):
验证子cell中计算节点的控制台端口;
/nova/cells/messaging.py
在这个文件中,有如下些类:
class _BaseMessage(object):
cell通信模块的基类,主要定义和实现了消息队列和消息响应相关的处理方法;
下面的三个实现通信模块的类,分别对应三种cell处理的消息类型:
class _TargetedMessage(_BaseMessage):
这个类继承自类_BaseMessage,实现通信模块的若干方法,针对于消息处理目标是特定的cell的情况;
class _BroadcastMessage(_BaseMessage):
这个类继承自类_BaseMessage,实现通信模块的若干方法,针对于消息处理目标是所有的cell的情况;
class _ResponseMessage(_TargetedMessage):
这个类继承自类_TargetedMessage,实现通信模块的若干方法,针对于执行操作后返回的响应信息的处理;
class _BaseMessageMethods(base.Base):
实现所有处理cell方法的基类;
下面三个实现处理消息方法的类,分别对应三种cell处理的消息类型:
class _ResponseMessageMethods(_BaseMessageMethods):
这个类继承自类_BaseMessageMethods,实现了针对处理响应信息方法;
class _TargetedMessageMethods(_BaseMessageMethods):
这个类继承自类_BaseMessageMethods,实现若干处理cell消息的方法,针对于消息处理目标是特定的cell情况;
class _BroadcastMessageMethods(_BaseMessageMethods):
这个类继承自类_BaseMessageMethods,实现若干处理cell消息的方法,针对于消息处理目标是所有的cell情况;
class MessageRunner(object):
这个类实现了若干方法,分别对应CellsManager类(管理cell方法API)中的方法,根据不同的消息类型(特定cell、所有cell和响应消息)进行封装,用于从上述的类_ResponseMessageMethods、_TargetedMessageMethods和_BroadcastMessageMethods中调用对应的方法,实现对不同的消息类型的处理。
来看类中具体的方法:
class _BaseMessage(object):
def _append_hop(self):
加入跳信息到routing_path;
def _at_max_hop_count(self,do_raise=True):
检测本cell是否处于最大的跳数;
def _process_locally(self):
方法确定了我们应该在本cell中执行处理这个消息的方法;
通过类MessageRunner调用适当的方法用来处理消息;
捕获响应或者异常,并把捕获的响应或者异常编码成类Response的实例对象,并返回;
def _setup_response_queue(self):
通过类MessageRunner中的方法_setup_response_queue,建立一个响应队列;
def _cleanup_response_queue(self):
调用类MessageRunner中的方法_cleanup_response_queue来删除一个响应队列;
def _wait_for_json_responses(self,num_responses=1):
在允许等待的时间内,获取响应列表responses,并返回;
def _send_json_responses(self,json_responses, neighbor_only=False, fanout=False):
执行发送响应列表到目标cell;
Targeted型的消息只有一个响应,而Broadcast型的消息可能有多个响应;
如果本cell是消息的源,则响应将会从self.process()被返回;
def _send_response(self,response, neighbor_only=False):
发送执行消息处理方法的结果的响应对象到源cell;
如果本cell就是源cell,则直接获取这个响应对象;
def _send_response_from_exception(self,exc_info):
从sys.exc_info()返回一个异常,编码成Response类型响应,并发送它;
def _to_dict(self):
转换消息到字典格式;
def to_json(self):
转换消息到JSON格式,用于发送到相邻的cell;
def source_is_us(self):
判断本cell是否建立了这个消息,即是否是这个消息的源;
def process(self):
执行处理消息的方法;
class _TargetedMessage(_BaseMessage):
def _get_next_hop(self):
返回本cell的下一跳(hop),如果下一跳(hop)就是当前的cell,则返回none;
def process(self):
执行处理针对性的消息的方法,并返回执行方法所获取的结果响应到源cell;
如果本cell就是源cell,则还要实现在允许等待的时间内,获取远程执行消息处理方法返回的响应列表;
根据所处理消息的类型不同,这个响应列表中的元素可以是一个也可以是多个;
class _BroadcastMessage(_BaseMessage):
def _get_next_hops(self):
设置下一层次的跳(hops),并返回跳(hops)的数目;
def _send_to_cells(self,target_cells):
发送信息到多个cell;
def _send_json_responses(self,json_responses):
发送信息的响应列表;
def process(self):
运行广播消息程序;
class _ResponseMessage(_TargetedMessage):
def process(self):
执行一个响应消息;
class _BaseMessageMethods(base.Base):
def task_log_get_all(self,
message, task_name, period_beginning, period_ending, host, state):
从数据库获取任务日志;
class _ResponseMessageMethods(_BaseMessageMethods):
def parse_responses(self,message, orig_message, responses):
添加响应到响应队列;
class _TargetedMessageMethods(_BaseMessageMethods):
def schedule_run_instance(self,
message, host_sched_kwargs):
父cell通知本cell来调度新的实例用于建立;
def run_compute_api_method(self,message, method_info):
运行变量compute_api指定的类中一个指定的方法;
def update_capabilities(self,
message, cell_name, capabilities):
一个子cell通知我们关于它的capabilities值;
def update_capacities(self,message, cell_name, capacities):
一个子cell通知我们关于它的capacity值;
def announce_capabilities(self,message):
一个父cell通知本cell发送我们的capabilities值;
所以执行发送capabilities值到父cell的操作,并更新父cell的capabilities值;
def announce_capacities(self,message):
一个父cell通知本cell发送我们的capacity值;
所以执行发送capabilities值到父cell的操作,并更新父cell的capacity值;
def service_get_by_compute_host(self,message, host_name):
为计算主机返回服务入口;
根据compute host信息获取相关服务;
def proxy_rpc_to_manager(self,message, host_name, rpc_message, topic, timeout):
为给定的compute topic代理PRC;
def compute_node_get(self,message, compute_id):
通过ID值获取相应的计算节点;
def actions_get(self, message,instance_uuid):
为给定的实例获取所有的实例操作;
def action_get_by_request_id(self,message, instance_uuid, request_id):
通过request_id和instance_uuid为给定的实例获取操作信息;
def action_events_get(self,message, action_id):
通过action_id获取事件信息;
def validate_console_port(self,message, instance_uuid, console_port, console_type):
验证子cell中计算节点的控制台端口;
class _BroadcastMessageMethods(_BaseMessageMethods):
def _at_the_top(self):
确定是否是API级别的cell;
def instance_update_at_top(self,
message, instance, **kwargs):
如果是top级别的cell,则更新数据库中的实例;
def instance_destroy_at_top(self,message, instance, **kwargs):
如果本cell是一个顶层的cell,则从DB中删除指定的实例;
def instance_delete_everywhere(self,message, instance, delete_type, **kwargs):
在每一个cell中调用compute API的delete()或者soft_delete()方法;
这个方法应用于当我们不知道一个实例属于哪个cell,但是还是需要删除或软删除这个实例的情况;
所以,我们需要在所有的cell上运行这个方法;
def instance_fault_create_at_top(self,message, instance_fault, **kwargs):
如果我们是顶层cell,则执行从DB删除一个实例的操作;
def bw_usage_update_at_top(self,message, bw_update_info, **kwargs):
更新DB中的带宽使用率信息,如果本cell是一个顶层的cell;
def _sync_instance(self,ctxt, instance):
实例数据的同步;
def sync_instances(self,message, project_id, updated_since, deleted, **kwargs):
实例数据的同步实现;
def service_get_all(self,message, filters):
获取message和filters所限制的所有的服务;
def compute_node_get_all(self,message, hypervisor_match):
返回本cell中的所有计算节点;
def compute_node_stats(self,message):
从本cell上所有的计算节点获取各个资源参数信息,并求和,即获取本cell上所有计算节点的资源参数分别的总和;
def consoleauth_delete_tokens(self,message, instance_uuid):
在API cell中为指定的实例删除consoleauth令牌;
class MessageRunner(object):
def _process_message_locally(self,message):
消息处理进程会调用这个方法,当确定了应该在本cell上处理的时候;(也就是确定了本cell是目标cell)
寻找到基于消息类型的合适方法,并调用它;
如果需要的话,调用者捕获异常,并返回结果到cell;
def _put_response(self,response_uuid, response):
添加响应到响应队列;
def _setup_response_queue(self,message):
设置一个eventlet队列,用于存储获取的响应;
响应是被目标cell以_ResponseMessage的形式发送回源cell的;
def _cleanup_response_queue(self,message):
当正在接受响应或者已经时间超时的时候,停止跟踪响应队列;
def _create_response_message(self,ctxt, direction, target_cell, response_uuid, response_kwargs, **kwargs):
建立一个ResponseMessage类的对象;
def message_from_json(self,json_message):
转换一个JSON格式的消息到一个适当的消息实例;
def ask_children_for_capabilities(self,ctxt):
通知子cell发送它们的capabilities值;
并对父cell的capabilities值相关信息进行更新操作;
这个方法将会在nova-cell服务启动时调用;
def ask_children_for_capacities(self,ctxt):
通知子cell发送它们的capacities值;
并对父cell的capacities值相关信息进行更新操作;
这个方法将会在nova-cell服务启动时调用;
def tell_parents_our_capabilities(self,ctxt):
发送本cell的capabilities值到父cell,并对父cell的相关信息进行更新操作;
def tell_parents_our_capacities(self,ctxt):
发送本cell的capacities值到父cell,并对父cell的相关信息进行更新操作;
def schedule_run_instance(self,ctxt, target_cell, host_sched_kwargs):
这个方法被调度器所调用,通知子cell来调度一个新的实例用于建立;
def run_compute_api_method(self,ctxt, cell_name, method_info, call):
在特定的cell中调用一个compute API方法;
运行变量compute_api指定的类中一个指定的方法;
def instance_update_at_top(self,ctxt, instance):
在top等级的cell上,更新实例;
这里实际上执行的是/nova/cell/messaging.py中的类_BroadcastMessageMethods下的方法instance_update_at_top;
def instance_destroy_at_top(self,ctxt, instance):
在cell树的顶层删除一个实例,这里实际上执行的是/nova/cell/messaging.py中的类_BroadcastMessageMethods下的方法instance_destroy_at_top;
def instance_delete_everywhere(self,ctxt, instance, delete_type):
在每一个cell中调用compute API的delete()或者soft_delete()方法;
这个方法应用于当我们不知道一个实例属于哪个cell,但是还是需要删除或软删除这个实例的情况;
所以,我们需要在所有的cell上运行这个方法;
def instance_fault_create_at_top(self,ctxt, instance_fault):
在顶层cell上建立一个实例的断点(???);
def bw_usage_update_at_top(self,ctxt, bw_update_info):
更新DB中的带宽使用率信息,如果本cell是一个顶层的cell;
def sync_instances(self,ctxt, project_id, updated_since, deleted):
强制对所有实例实行同步操作;
def service_get_all(self,ctxt, filters=None):
获取所有的服务;
def service_get_by_compute_host(self,ctxt, cell_name, host_name):
在当前的cell中为计算主机返回获取的服务入口,主要执行了以下的步骤:
通过host_name获取cell_name的信息;
根据compute host信息获取相关服务;
把获取的服务和计算节点和cell关联起来;
def proxy_rpc_to_manager(self,ctxt, cell_name, host_name, topic, rpc_message, call, timeout):
为给定的compute topic代理PRC,并获取方法的返回响应;
def task_log_get_all(self,ctxt, cell_name, task_name, period_beginning, period_ending, host=None, state=None):
从所有的cell或者特定的一个cell的数据库中获取任务日志;
返回响应对象的列表;
def compute_node_get_all(self,ctxt, hypervisor_match=None):
返回本cell的所有子cell的计算节点列表;
这里是采用广播的方式,获取每一个cell下的计算节点;
而且这里获取的方向是'down',所以实现的就是获取所有子cell的相关计算节点;
def compute_node_stats(self,ctxt):
采用广播的方式实现每一个cell计算自己的各个资源参数的总和;
而且这里获取的方向是'down',所以实现的就是获取所有子cell的各个资源参数的总和;
def compute_node_get(self,ctxt, cell_name, compute_id):
在一个特定的cell上,通过ID值获取一个计算节点;
def actions_get(self, ctxt, cell_name,
instance_uuid):
为给定的实例获取所有的实例操作;
def action_get_by_request_id(self,ctxt, cell_name, instance_uuid, request_id):
通过request_id和instance_uuid为给定的实例获取操作信息;
def action_events_get(self,ctxt, cell_name, action_id):
在cell_name指定的cell上,通过action id获取相关事件信息;
def consoleauth_delete_tokens(self,ctxt, instance_uuid):
在API cell中为指定的实例删除consoleauth令牌;
方法在cell树中的执行方向是'up';
def validate_console_port(self,ctxt, cell_name, instance_uuid, console_port, console_type):
验证子cell中计算节点的控制台端口;
def get_message_types():
获取处理消息的类型,为指定类型消息、广播类型消息或者响应类型消息;
class Response(object):
处理cell返回的响应方法类;
def to_json(self):
def from_json(cls, json_message):
def value_or_raise(self):
/nova/cells/rpc_driver.py
cell RPC通信驱动,通过RPC实现cell的通信;
class CellsRPCDriver(driver.BaseCellsDriver):
这个类继承自类BaseCellsDriver,是通过RPC实现cell间通信的驱动类;
主要用于处理通信中的消费者相关;
def _start_consumer(self,dispatcher, topic):
启动两个RPC消费者,topic类型和fanout类型;
建立绿色线程,实现处理所有的队列/消费者信息;
def start_consumers(self,msg_runner):
为通过RPC来处理cell间的通讯,启动两个RPC消费者,topic类型消费者和fanout类型消费者;
def stop_consumers(self):
关闭RPC消费者;
def send_message_to_cell(self,cell_state, message):
调用类IntercellRPCAPI下的方法send_message_to_cell,来实现发送消息到cell;
class InterCellRPCAPI(rpc_proxy.RpcProxy):
这个类继承自类RpcProxy,主要实现cell间通过RPC实现通信的客户端方法;
def _get_server_params_for_cell(next_hop):
为rpc的调用获取服务的相关参数;
def send_message_to_cell(self,cell_state, message):
实现发送消息到cell;
class InterCellRPCDispatcher(object):
这个类主要实现了RPC的分发程序类,用来处理从其它cell接收的信息;
def process_message(self,_ctxt, message):
实现从其它的cell接收消息;
/nova/cells/rpcapi.py
文件中定义了一个类CellsAPI,即Cell RPC API客户端类,用来通过RPC实现对远程cell处理方法的调用;
class CellsAPI(rpc_proxy.RpcProxy):
即Cell RPC API客户端类;
def cast_compute_api_method(self,ctxt, cell_name, method, *args, **kwargs):
在一个特定的cell中,以cast的方式发送消息,实现远程节点执行指定的compute
API方法;
def call_compute_api_method(self,ctxt, cell_name, method, *args, **kwargs):
在一个特定的cell中,以call的方式发送消息,实现远程节点执行指定的compute
API方法;
def schedule_run_instance(self,ctxt, **kwargs):
调度一个新的实例用于建立;
def nstance_update_at_top(self,ctxt, instance):
在API的级别更新实例信息;
def instance_destroy_at_top(self,ctxt, instance):
在API这个级别销毁要建立的实例;
def instance_delete_everywhere(self,ctxt, instance, delete_type):
在每个cell上都运行删除一个指定实例的操作(因为不知道实例位于哪个cell上);
def instance_fault_create_at_top(self,ctxt, instance_fault):
在cell树的顶层建立一个实例故障;
def bw_usage_update_at_top(self,ctxt, uuid, mac, start_period, bw_in, bw_out, last_ctr_in, last_ctr_out, last_refreshed=None):
广播信息实现节点带宽使用率的更新;
def instance_info_cache_update_at_top(self,ctxt, instance_info_cache):
广播通知实例缓存信息的改变;
def get_cell_info_for_neighbors(self,ctxt):
获取本cell相邻的所有cell的信息;
def sync_instances(self,ctxt, project_id=None, updated_since=None, deleted=False):
要求所有cell实现实例数据的同步;
def service_get_all(self,ctxt, filters=None):
def service_get_by_compute_host(self,ctxt, host_name):
def proxy_rpc_to_manager(self,ctxt, rpc_message, topic, call=False, timeout=None):
def task_log_get_all(self,ctxt, task_name, period_beginning, period_ending, host=None, state=None):
def compute_node_get(self,ctxt, compute_id):
在指定的cell中获取计算节点;
def compute_node_get_all(self,ctxt, hypervisor_match=None):
获取所有cell中的由hypervisor过滤的计算节点列表;
def compute_node_stats(self,ctxt):
返回所有cell中计算节点的统计信息;
def actions_get(self, ctxt,instance):
def action_get_by_request_id(self,ctxt, instance, request_id):
def action_events_get(self,ctxt, instance, action_id):
def consoleauth_delete_tokens(self,ctxt, instance_uuid):
def validate_console_port(self,ctxt, instance_uuid, console_port, console_type):
/nova/cells/scheduler.py
这个文件中主要实现了类CellsScheduler,即cell调度器实现类;
主要针对的就是在cell中建立虚拟机实例方面的应用;
class CellsScheduler(base.Base):
这个类继承自Base类,即cell调度器实现类;
def _create_instances_here(self,ctxt, request_spec):
在本cell上建立虚拟机实例;
def _create_action_here(self,ctxt, instance_uuids):
获取instance_uuids中各个实例的启动信息;
并把这些信息加入到要启动的实例属性中;
def _get_possible_cells(self):
获取所有合适的cell;
def _run_instance(self,message, host_sched_kwargs):
尝试调度实例;
如果没有合适的cell使用,则引发异常;
def run_instance(self,message, host_sched_kwargs):
选择一个cell,在它上面我们要建立一个新的实例;
/nova/cells/state.py
这个文件中主要定义了两个类CellState和CellStateManager,主要实现的时对cell一些状态信息进行处理的操作,方法都比较简单,这里就不进行一一解析了;
/nova/cells/utils.py
这个文件中主要定义了一些cell实用的操作方法,方法都比较简单,这里就不进行一一解析了;
到目前为止,nova-cell服务的源码架构进行了简单的分析,下一篇博客中我将以几个例子对nova-cell服务的源码架构和nova-cell服务的具体实现流程进行解析。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
|
|