分享

openstack Neutron分析(2)—— neutron-l3-agent

pig2 发表于 2014-10-15 11:40:43 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 0 65970

问题导读:
1.neutron l3像租户提供了什么功能?
2.l3通过API来创建router或则floating ip包含几个步骤?











一.Layer-3 Networking Extension
neutron l3作为一种API扩展,向租户提供了路由和NAT功能。
l3扩展包含两种资源:
  • router:在不同内部子网中转发数据包;通过指定内部网关做NAT。每一个子网对应router上的一个端口,这个端口的ip就是子网的网关。
  • floating ip:代表一个外部网络的IP,映射到内部网络的端口上。当网络的router:external属性为True时,floating ip才能定义。
这两种资源都对应有不同的属性。支持CRUD操作。
二.代码分析
既然neutron中支持了l3扩展,那么怎样通过API来创建router或者floating ip,以提供路由以及NAT的功能的呢?
主要有以下几个步骤:
1.租户通过horizon,nova命令或者自定义的脚本,发送与router或floating ip相关的操作。
2.这些API请求发送到neutron server,通过neutron提供的API extension相对应。
3.实现这些API extension的操作,比如说create_router,则由具体的plugin和database来共同完成。
4.plugin会通过rpc机制与计算网络节点上运行的l3 agent来执行l3 转发和NAT的功能。
l3.py
源代码目录:neutron/extensions/l3.py
  1. class RouterPluginBase(object):
  2.     @abc.abstractmethod
  3.     def create_router(self, context, router):
  4.         pass
  5.     @abc.abstractmethod
  6.     def update_router(self, context, id, router):
  7.         pass
  8.     @abc.abstractmethod
  9.     def get_router(self, context, id, fields=None):
  10.         pass
  11.     @abc.abstractmethod
  12.     def delete_router(self, context, id):
  13.         pass
  14.     @abc.abstractmethod
  15.     def get_routers(self, context, filters=None, fields=None,
  16.                     sorts=None, limit=None, marker=None, page_reverse=False):
  17.         pass
  18.     @abc.abstractmethod
  19.     def add_router_interface(self, context, router_id, interface_info):
  20.         pass
  21.     @abc.abstractmethod
  22.     def remove_router_interface(self, context, router_id, interface_info):
  23.         pass
  24.     @abc.abstractmethod
  25.     def create_floatingip(self, context, floatingip):
  26.         pass
  27.     @abc.abstractmethod
  28.     def update_floatingip(self, context, id, floatingip):
  29.         pass
  30.     @abc.abstractmethod
  31.     def get_floatingip(self, context, id, fields=None):
  32.         pass
  33.     @abc.abstractmethod
  34.     def delete_floatingip(self, context, id):
  35.         pass
  36.     @abc.abstractmethod
  37.     def get_floatingips(self, context, filters=None, fields=None,
  38.                         sorts=None, limit=None, marker=None,
  39.                         page_reverse=False):
  40.         pass
  41.     def get_routers_count(self, context, filters=None):
  42.         raise NotImplementedError()
  43.     def get_floatingips_count(self, context, filters=None):
  44.         raise NotImplementedError()
复制代码

这个模块中,class RouterPluginBase定义了plugin中需要实现的方法。

l3_db.py
源码目录:/neutron/db/l3_db.py
这个模块中,class L3_NAT_db_mixin继承了上面l3模块的class RouterPluginBase,因此在RouterPluginBase中定义的抽象方法就要在这里实现了。
类注释中写道,Mixin class to add L3/NAT router methods to db_plugin_base_v2。
在类的开始,有这样一段代码:
  1. @property
  2.     def l3_rpc_notifier(self):
  3.         if not hasattr(self, '_l3_rpc_notifier'):
  4.             self._l3_rpc_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
  5.         return self._l3_rpc_notifier
复制代码

说明l3_rpc_notifier,是模块l3_rpc_agent_api中类L3AgentNotifyAPI的一个实例。

l3_rpc_agent_api模块源码在/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py。
  1. class L3AgentNotifyAPI(n_rpc.RpcProxy):
  2.     """API for plugin to notify L3 agent."""
  3.     BASE_RPC_API_VERSION = '1.0'
  4.     def __init__(self, topic=topics.L3_AGENT):
  5.         super(L3AgentNotifyAPI, self).__init__(
  6.             topic=topic, default_version=self.BASE_RPC_API_VERSION)
  7.     def _notification_host(self, context, method, payload, host):
  8.         """Notify the agent that is hosting the router."""        ...
  9.     def _agent_notification(self, context, method, router_ids,
  10.                             operation, data):
  11.         """Notify changed routers to hosting l3 agents."""
  12.         ...
  13. def _notification(self, context, method, router_ids, operation, data):
  14.         """Notify all the agents that are hosting the routers."""
  15.         ...def _notification_fanout(self, context, method, router_id):
  16.         """Fanout the deleted router to all L3 agents."""
  17.         ...def agent_updated(self, context, admin_state_up, host):
  18.         self._notification_host(context, 'agent_updated',
  19.                                 {'admin_state_up': admin_state_up},
  20.                                 host)
  21.     def router_deleted(self, context, router_id):
  22.         self._notification_fanout(context, 'router_deleted', router_id)
  23.     def routers_updated(self, context, router_ids, operation=None, data=None):
  24.         if router_ids:
  25.             self._notification(context, 'routers_updated', router_ids,
  26.                                operation, data)
  27.     def router_removed_from_agent(self, context, router_id, host):
  28.         self._notification_host(context, 'router_removed_from_agent',
  29.                                 {'router_id': router_id}, host)
  30.     def router_added_to_agent(self, context, router_ids, host):
  31.         self._notification_host(context, 'router_added_to_agent',
  32.                                 router_ids, host)
复制代码

这个类主要用于plugin发送rpc通知给l3 agent。
rpc处理
在上面的l3_db.py中,会将涉及router和floating ip的处理读取或者写入到数据中。但是还有一些操作不仅如此,还需要通过rpc(通过调用l3_rpc_agent_api中的函数,这些操作大部分会去 调用routers_updated),通知l3 agent进行处理。
这些需要处理的地方包括:update_router,delete_router,add_router_interface,remove_router_interface,create_floatingip,update_floatingip,delete_floatingip,disassociate_floatingips 等操作。
l3_agent.py
源码目录:neutron/agent/l3_agent.py
l3 agent使用Linux ip协议栈和iptables来实现router和NAT的功能。
这时候,如果在horizon的界面创建一个路由,不进行任何操作的话,plugin只会操作数据库,l3 agent不会作处理。而当update router,如设置外部网关时,l3才会去处理请求。
l3 agent使用service框架启动服务,其manager类为neutron.agent.l3_agent.L3NATAgentWithStateReport,该类继承自L3NATAgent,主要实现了基于rpc的_report_state向PluginReportStateAPI(topic为q-plugin)汇报状态信息,这些信息由各个plugin来处理(比如ml2中通过start_rpc_listeners来注册该topic的消费者)。
L3NATAgent类是最主要的L3 Manager类,该类继承关系为 class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager)FWaaSL3AgentRpcCallback主要是加载防火墙驱动,并创建RPC与Plugin通信。
再来看L3NATAgent的创建过程:
  1. def __init__(self, host, conf=None):
  2.         if conf:
  3.             self.conf = conf
  4.         else:
  5.             self.conf = cfg.CONF
  6.         self.root_helper = config.get_root_helper(self.conf)
  7.         self.router_info = {}
  8.         self._check_config_params()
  9.         try:
  10.         # import driver from l3_agent.init
  11.             # Example: interface_driver = neutron.agent.linux.interface.OVSInterfaceDriver
  12.             self.driver = importutils.import_object(
  13.                 self.conf.interface_driver,
  14.                 self.conf
  15.             )
  16.         except Exception:
  17.             msg = _("Error importing interface driver "
  18.                     "'%s'") % self.conf.interface_driver
  19.             LOG.error(msg)
  20.             raise SystemExit(1)
  21.         self.context = context.get_admin_context_without_session()
  22.        # Agent side of the l3 agent RPC API, topic is 'q-l3-plugin'
  23.         self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
  24.         self.fullsync = True
  25.         self.updated_routers = set()
  26.         self.removed_routers = set()
  27.         self.sync_progress = False
  28.         self._clean_stale_namespaces = self.conf.use_namespaces
  29.         # Start RPC Loop
  30.         self.rpc_loop = loopingcall.FixedIntervalLoopingCall(
  31.             self._rpc_loop)
  32.         self.rpc_loop.start(interval=RPC_LOOP_INTERVAL)
  33.         super(L3NATAgent, self).__init__(conf=self.conf)
  34.         self.target_ex_net_id = None
复制代码

上面的self.plugin_rpc会处理neutron-server转发过来的请求,这个请求是通过service_plugins的方式处理的:

  1. neutron.service_plugins =
  2.     dummy = neutron.tests.unit.dummy_plugin:DummyServicePlugin
  3.     router = neutron.services.l3_router.l3_router_plugin:L3RouterPlugin
  4.     firewall = neutron.services.firewall.fwaas_plugin:FirewallPlugin
  5.     lbaas = neutron.services.loadbalancer.plugin:LoadBalancerPlugin
  6.     vpnaas = neutron.services.vpn.plugin:VPNDriverPlugin
  7.     metering = neutron.services.metering.metering_plugin:MeteringPlugin
复制代码

self.rpc_loop会循环检测从plugin发送过来的rpc请求:

  1. @lockutils.synchronized('l3-agent', 'neutron-')
  2.     def _rpc_loop(self):
  3.         # _rpc_loop and _sync_routers_task will not be
  4.         # executed in the same time because of lock.
  5.         # so we can clear the value of updated_routers
  6.         # and removed_routers, but they can be updated by
  7.         # updated_routers and removed_routers rpc call
  8.         try:
  9.             LOG.debug(_("Starting RPC loop for %d updated routers"),
  10.                       len(self.updated_routers))
  11.             if self.updated_routers:  # 保存了需要本次处理的router信息
  12.                 # We're capturing and clearing the list, and will
  13.                 # process the "captured" updates in this loop,
  14.                 # and any updates that happen due to a context switch
  15.                 # will be picked up on the next pass.
  16.                 updated_routers = set(self.updated_routers)
  17.                 self.updated_routers.clear()
  18.                 router_ids = list(updated_routers)
  19.                 routers = self.plugin_rpc.get_routers(
  20.                     self.context, router_ids)
  21.                 # routers with admin_state_up=false will not be in the fetched
  22.                 fetched = set([r['id'] for r in routers])
  23.                 #不在fetched中而在updated_routers中,说明需删除
  24.                 self.removed_routers.update(updated_routers - fetched)   
  25.                 self._process_routers(routers)
  26.             self._process_router_delete()
  27.             LOG.debug(_("RPC loop successfully completed"))
  28.         except Exception:
  29.             LOG.exception(_("Failed synchronizing routers"))
  30.             self.fullsync = True
复制代码

_process_routers
如果有rpc请求过来,即需要更新路由信息,或者添加路由子接口,创建floating ip等操作,都会在这里执行。这个函数里会去调用_process_routers函数,在_process_routers函数中会去创建绿色线程,执行process_router函数。可以说,l3 agent调用网络设备的工作都会在process_router中进行。

  1. def process_router(self, ri):
  2.         ri.iptables_manager.defer_apply_on()
  3.         ex_gw_port = self._get_ex_gw_port(ri)
  4.         internal_ports = ri.router.get(l3_constants.INTERFACE_KEY, [])
  5.         existing_port_ids = set([p['id'] for p in ri.internal_ports])
  6.         current_port_ids = set([p['id'] for p in internal_ports
  7.                                 if p['admin_state_up']])
  8.         new_ports = [p for p in internal_ports if
  9.                      p['id'] in current_port_ids and
  10.                      p['id'] not in existing_port_ids]
  11.         old_ports = [p for p in ri.internal_ports if
  12.                      p['id'] not in current_port_ids]
  13.         for p in new_ports:
  14.             self._set_subnet_info(p)
  15.             self.internal_network_added(ri, p['network_id'], p['id'],
  16.                                         p['ip_cidr'], p['mac_address'])
  17.             ri.internal_ports.append(p)
  18.         for p in old_ports:
  19.             self.internal_network_removed(ri, p['id'], p['ip_cidr'])
  20.             ri.internal_ports.remove(p)
  21.         existing_devices = self._get_existing_devices(ri)
  22.         current_internal_devs = set([n for n in existing_devices
  23.                                      if n.startswith(INTERNAL_DEV_PREFIX)])
  24.         current_port_devs = set([self.get_internal_device_name(id) for
  25.                                  id in current_port_ids])
  26.         stale_devs = current_internal_devs - current_port_devs
  27.         for stale_dev in stale_devs:
  28.             LOG.debug(_('Deleting stale internal router device: %s'),
  29.                       stale_dev)
  30.             self.driver.unplug(stale_dev,
  31.                                namespace=ri.ns_name,
  32.                                prefix=INTERNAL_DEV_PREFIX)
  33.         # Get IPv4 only internal CIDRs
  34.         internal_cidrs = [p['ip_cidr'] for p in ri.internal_ports
  35.                           if netaddr.IPNetwork(p['ip_cidr']).version == 4]
  36.         # TODO(salv-orlando): RouterInfo would be a better place for
  37.         # this logic too
  38.         ex_gw_port_id = (ex_gw_port and ex_gw_port['id'] or
  39.                          ri.ex_gw_port and ri.ex_gw_port['id'])
  40.         interface_name = None
  41.         if ex_gw_port_id:
  42.             interface_name = self.get_external_device_name(ex_gw_port_id)
  43.         if ex_gw_port and ex_gw_port != ri.ex_gw_port:
  44.             self._set_subnet_info(ex_gw_port)
  45.             self.external_gateway_added(ri, ex_gw_port,
  46.                                         interface_name, internal_cidrs)
  47.         elif not ex_gw_port and ri.ex_gw_port:
  48.             self.external_gateway_removed(ri, ri.ex_gw_port,
  49.                                           interface_name, internal_cidrs)
  50.         stale_devs = [dev for dev in existing_devices
  51.                       if dev.startswith(EXTERNAL_DEV_PREFIX)
  52.                       and dev != interface_name]
  53.         for stale_dev in stale_devs:
  54.             LOG.debug(_('Deleting stale external router device: %s'),
  55.                       stale_dev)
  56.             self.driver.unplug(stale_dev,
  57.                                bridge=self.conf.external_network_bridge,
  58.                                namespace=ri.ns_name,
  59.                                prefix=EXTERNAL_DEV_PREFIX)
  60.         # Process static routes for router
  61.         self.routes_updated(ri)
  62.         # Process SNAT rules for external gateway
  63.         ri.perform_snat_action(self._handle_router_snat_rules,
  64.                                internal_cidrs, interface_name)
  65.         # Process SNAT/DNAT rules for floating IPs
  66.         fip_statuses = {}
  67.         try:
  68.             if ex_gw_port:
  69.                 existing_floating_ips = ri.floating_ips
  70.                 self.process_router_floating_ip_nat_rules(ri)
  71.                 ri.iptables_manager.defer_apply_off()
  72.                 # Once NAT rules for floating IPs are safely in place
  73.                 # configure their addresses on the external gateway port
  74.                 fip_statuses = self.process_router_floating_ip_addresses(
  75.                     ri, ex_gw_port)
  76.         except Exception:
  77.             # TODO(salv-orlando): Less broad catching
  78.             # All floating IPs must be put in error state
  79.             for fip in ri.router.get(l3_constants.FLOATINGIP_KEY, []):
  80.                 fip_statuses[fip['id']] = l3_constants.FLOATINGIP_STATUS_ERROR
  81.         if ex_gw_port:
  82.             # Identify floating IPs which were disabled
  83.             ri.floating_ips = set(fip_statuses.keys())
  84.             for fip_id in existing_floating_ips - ri.floating_ips:
  85.                 fip_statuses[fip_id] = l3_constants.FLOATINGIP_STATUS_DOWN
  86.             # Update floating IP status on the neutron server
  87.             self.plugin_rpc.update_floatingip_statuses(
  88.                 self.context, ri.router_id, fip_statuses)
  89.         # Update ex_gw_port and enable_snat on the router info cache
  90.         ri.ex_gw_port = ex_gw_port
  91.         ri.enable_snat = ri.router.get('enable_snat')
复制代码

process_router函数所做的工作有:
1.处理内部接口
这个是在router添加和删除子接口时工作。它会调用internal_network_added和internal_network_removed这个两个函数。
在internal_network_added和internal_network_removed这个两个函数会去调用OVSInterfaceDriver的plug和unplug 函数,这两个函数最终会用ip link 和ip addr的命令去处理接口和ip地址。

2.处理外部网关
router添加和删除外部网关。调用external_gateway_added和external_gateway_removed函数,同样也会调用plug和unplug函数,用ip link 和ip addr的命令进行最终处理

3.为外部网关做SNAT

调用_handle_router_snat_rules函数,使用iptables来加链和删除链。
在我的测试网络中,router上有3个接口,外部网关地址为192.168.39.2,内部两个子网的网关为10.1.0.1,10.2.0.1。iptables规则如下:


  1. iptables -t nat -A POSTROUTING ! -i qg-fcb1a762-1f ! -o qg-fcb1a762-1f -m conntrack ! --ctstate DNAT -j ACCEPT
  2. iptables -t nat -A snat -s 10.2.0.1/24 -j SNAT --to-source 192.168.39.2
  3. iptables -t nat -A snat -s 10.1.0.1/24 -j SNAT --to-source 192.168.39.2
复制代码

qg-fcb1a762-1f为外部网关接口的索引,使用ip netns exec $namespace ip link list可查看。

4.为floating ip做SNAT/DNAT

和浮动IP相关,如创建,更新,删除,绑定到一个云主机的接口,解绑定等。

不同neutron版本这部分的处理不同,这里是基于Icehouse rc1版本的,在havava stable版本,只有一个函数来处理iptables规则和floating ip。

process_router_floating_ip_nat_rules :当floating ip与云主机绑定时,会先清除已有的floating_ip规则,再加上要添加的iptables规则,同时重新加载清除的iptables规则。

比如,一个云主机10.1.0.2上绑定了一个floating ip(192.168.39.5)。那么最终会在iptable不同的链中添加iptables规则,float-snat为neutron自定义链。


  1. iptables -t nat -A PREROUTING -d 192.168.39.5 -j DNAT --to 10.1.0.2
  2. iptables -t nat -A OUTPUT -d 192.168.39.5 -j DNAT --to 10.1.0.2
  3. iptables -t nat -A float-snat -s 10.1.0.2 -j SNAT --to 192.168.39.5
复制代码

process_router_floating_ip_addresses:
将floating ip和云主机绑定时,使用ip addr add命令添加ip地址。
解除floating ip和云主机绑定时,使用ip addr del命令将floating ip删除。

类图

1.jpg


相关文章

openstack Neutron分析(3)—— neutron-dhcp-agent源码分析

openstack Neutron分析(4)—— neutron-l3-agent中的iptables

openstack Neutron分析(5)-- neutron openvswitch agent


Openstack之neutron入门


Openstack之neutron入门二


Openstack之neutron入门三

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条