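Reading guide
1. What does the openvswitch agent do at startup?
2. What do you think neutron-openvswitch-agent is for?
neutron-openvswitch-agent is the per-host agent for Neutron's Open vSwitch plugin, so it helps to first know what Open vSwitch is:
Open vSwitch (OVS) is virtual switching software used mainly in virtual machine (VM) environments. As a virtual switch it supports several virtualization technologies, including Xen/XenServer, KVM, and VirtualBox.
In a virtualized environment on a single host, a virtual switch (vswitch) has two main jobs: forwarding traffic between VMs, and connecting VMs to the outside network.
With that background, let's continue.
The openvswitch agent is probably the most widely used L2 agent in OpenStack today. This article gives a rough analysis of what the openvswitch agent does.
Start with how the openvswitch agent boots: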
- neutron/plugins/openvswitch/agent/ovs_neutron_agent.py:main()
  - plugin = OVSNeutronAgent(**agent_config)
    - self.setup_rpc()
      - self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
      - self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)
      - self.connection = agent_rpc.create_consumers(...)
      - heartbeat = loopingcall.FixedIntervalLoopingCall(self._report_state)
    - self.setup_integration_br()
    - self.setup_physical_bridges(bridge_mappings)
    - self.sg_agent = OVSSecurityGroupAgent(...)
  - plugin.daemon_loop()
    - self.rpc_loop()
      - port_info = self.update_ports(ports)
      - sync = self.process_network_ports(port_info)
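Startup does the following:
1. Set up plugin_rpc, which is used to communicate with neutron-server.
2. Set up state_rpc, which is used to report agent state.
3. Set up connection, which is used to receive messages from neutron-server.
4. Start the periodic state report.
5. Set up br-int.
6. Set up the bridges named in bridge_mappings.
7. Initialize sg_agent, which handles security groups.
8. Periodically check br-int for port changes and call process_network_ports to handle added and removed ports.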
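Step 8 is a fixed-interval polling loop. The following is a minimal sketch of that timing pattern only, not the agent's code: `run_fixed_interval` and its `clock` and `sleep` parameters are hypothetical names introduced here so the loop can be exercised without real delays.

```python
import time


def run_fixed_interval(work, interval, iterations,
                       clock=time.time, sleep=time.sleep):
    """Call work() `iterations` times, one pass per `interval` seconds.

    Returns the sleep durations that were requested, for inspection.
    `clock` and `sleep` are injectable (an assumption of this sketch).
    """
    sleeps = []
    for _ in range(iterations):
        start = clock()
        work()
        elapsed = clock() - start
        if elapsed < interval:
            # Sleep only for what is left of the interval.
            remaining = interval - elapsed
            sleep(remaining)
            sleeps.append(remaining)
        else:
            # The pass overran the interval: start the next one at once.
            sleeps.append(0.0)
    return sleeps
```

If a pass takes longer than the interval, the loop does not try to catch up; it simply skips the sleep, which matches the behaviour visible at the end of rpc_loop later in this article.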
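The message queues between neutron-server and neutron-openvswitch-agent are as follows:
[Figure: message queues between neutron-server and neutron-openvswitch-agent]
neutron-server may broadcast any of the four message types in that figure to neutron-openvswitch-agent. The openvswitch agent first checks whether the port is on its own host, and acts only if it is.
Finally, look at how nova interacts with neutron-openvswitch-agent. The figure comes from GongYongSheng's slides at the Hong Kong summit:
[Figure: interaction between nova and neutron-openvswitch-agent]
When a VM boots, nova-compute first sends neutron-server a request to create a port. Once the driver has created the port on br-int, neutron-openvswitch-agent, which polls br-int in a loop, notices the new port, sets up the appropriate OpenFlow rules and local VLAN for it, and finally sets the port status to ACTIVE.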
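The local VLAN step can be pictured as simple bookkeeping: every Neutron network with at least one port on this host gets one host-local tag, and the tag is released when the last port leaves. The sketch below illustrates that idea under stated assumptions; `LocalVlanManager` and its methods are hypothetical names, and the tag range is assumed to be 1 to 4094.

```python
class LocalVlanManager:
    """Hypothetical helper: one host-local VLAN tag per Neutron network."""

    def __init__(self, min_tag=1, max_tag=4094):
        # Pool of free tags; the bounds are assumptions of this sketch.
        self.available = set(range(min_tag, max_tag + 1))
        self.vlan_by_network = {}   # network_id -> local VLAN tag
        self.ports_by_network = {}  # network_id -> set of port ids

    def bind_port(self, network_id, port_id):
        """Return the local VLAN tag for the port, allocating one if needed."""
        if network_id not in self.vlan_by_network:
            if not self.available:
                raise RuntimeError("no local VLAN tags left")
            tag = min(self.available)  # lowest free tag, for predictable output
            self.available.remove(tag)
            self.vlan_by_network[network_id] = tag
            self.ports_by_network[network_id] = set()
        self.ports_by_network[network_id].add(port_id)
        return self.vlan_by_network[network_id]

    def unbind_port(self, network_id, port_id):
        """Drop the port; free the tag once the network has no local ports."""
        ports = self.ports_by_network.get(network_id)
        if ports is None:
            return
        ports.discard(port_id)
        if not ports:
            self.available.add(self.vlan_by_network.pop(network_id))
            del self.ports_by_network[network_id]
```

Two ports on the same network share a tag, so traffic between them stays inside br-int, while ports on different networks are isolated by different tags.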
neutron-openvswitch-agent code analysis
neutron.plugins.openvswitch.agent.ovs_neutron_agent:main
# init ovs first by agent_config:
#   setup plugin_rpc, state_rpc, msgq consumer, periodic state report
#   setup br-int, br-tun, bridge_mapping
#   start sg_agent
agent = OVSNeutronAgent(**agent_config)
# start to process rpc events
#   process port up/down and related flow update/local vlan binding
#   process security group updates
agent.daemon_loop()
OVSNeutronAgent initialization
class OVSNeutronAgent(n_rpc.RpcCallback,
                      sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                      l2population_rpc.L2populationRpcCallBackMixin):
    '''Implements OVS-based tunneling, VLANs and flat networks.

    Two local bridges are created: an integration bridge (defaults to
    'br-int') and a tunneling bridge (defaults to 'br-tun'). An
    additional bridge is created for each physical network interface
    used for VLANs and/or flat networks.

    All VM VIFs are plugged into the integration bridge. VM VIFs on a
    given virtual network share a common "local" VLAN (i.e. not
    propagated externally). The VLAN id of this local VLAN is mapped
    to the physical networking details realizing that virtual network.

    For virtual networks realized as GRE tunnels, a Logical Switch
    (LS) identifier is used to differentiate tenant traffic on
    inter-HV tunnels. A mesh of tunnels is created to other
    Hypervisors in the cloud. These tunnels originate and terminate on
    the tunneling bridge of each hypervisor. Port patching is done to
    connect local VLANs on the integration bridge to inter-hypervisor
    tunnels on the tunnel bridge.

    For each virtual network realized as a VLAN or flat network, a
    veth or a pair of patch ports is used to connect the local VLAN on
    the integration bridge with the physical network bridge, with flow
    rules adding, modifying, or stripping VLAN tags as necessary.
    '''

    # history
    #   1.0 Initial version
    #   1.1 Support Security Group RPC
    RPC_API_VERSION = '1.1'

    def __init__(self, integ_br, tun_br, local_ip,
                 bridge_mappings, root_helper,
                 polling_interval, tunnel_types=None,
                 veth_mtu=None, l2_population=False,
                 minimize_polling=False,
                 ovsdb_monitor_respawn_interval=(
                     constants.DEFAULT_OVSDBMON_RESPAWN),
                 arp_responder=False,
                 use_veth_interconnection=False):
        '''Constructor.

        :param integ_br: name of the integration bridge.
        :param tun_br: name of the tunnel bridge.
        :param local_ip: local IP address of this hypervisor.
        :param bridge_mappings: mappings from physical network name to bridge.
        :param root_helper: utility to use when running shell cmds.
        :param polling_interval: interval (secs) to poll DB.
        :param tunnel_types: A list of tunnel types to enable support for in
               the agent. If set, will automatically set enable_tunneling to
               True.
        :param veth_mtu: MTU size for veth interfaces.
        :param l2_population: Optional, whether L2 population is turned on
        :param minimize_polling: Optional, whether to minimize polling by
               monitoring ovsdb for interface changes.
        :param ovsdb_monitor_respawn_interval: Optional, when using polling
               minimization, the number of seconds to wait before respawning
               the ovsdb monitor.
        :param arp_responder: Optional, enable local ARP responder if it is
               supported.
        :param use_veth_interconnection: use veths instead of patch ports to
               interconnect the integration bridge to physical bridges.
        '''
        super(OVSNeutronAgent, self).__init__()
        self.use_veth_interconnection = use_veth_interconnection
        self.veth_mtu = veth_mtu
        self.root_helper = root_helper
        # Pool of local VLAN tags, drawn from the range 1-4094
        self.available_local_vlans = set(moves.xrange(q_const.MIN_VLAN_TAG,
                                                      q_const.MAX_VLAN_TAG))
        self.tunnel_types = tunnel_types or []
        self.l2_pop = l2_population
        # TODO(ethuleau): Initially, local ARP responder is dependent on the
        #                 ML2 l2 population mechanism driver.
        self.arp_responder_enabled = (arp_responder and
                                      self._check_arp_responder_support() and
                                      self.l2_pop)
        self.agent_state = {
            'binary': 'neutron-openvswitch-agent',
            'host': cfg.CONF.host,
            'topic': q_const.L2_AGENT_TOPIC,
            'configurations': {'bridge_mappings': bridge_mappings,
                               'tunnel_types': self.tunnel_types,
                               'tunneling_ip': local_ip,
                               'l2_population': self.l2_pop,
                               'arp_responder_enabled':
                               self.arp_responder_enabled},
            'agent_type': q_const.AGENT_TYPE_OVS,
            'start_flag': True}

        # Keep track of int_br's device count for use by _report_state()
        self.int_br_device_count = 0

        self.int_br = ovs_lib.OVSBridge(integ_br, self.root_helper)
        # Create integration bridge and patch ports,
        # remove all existing flows, and then switch all traffic using L2 learning
        # add a canary flow to int_br to track OVS restarts
        self.setup_integration_br()

        # Stores port update notifications for processing in main rpc loop
        self.updated_ports = set()

        # state_rpc: periodically report state on topic q-plugin
        # plugin_rpc: call the plugin on topic q-plugin
        # consumers: receive notifications on topic q-agent-notifier;
        # consumer topic ids include:
        #   q-agent-notifier-port_update
        #   q-agent-notifier-network_delete
        #   q-agent-notifier-tunnel_update
        #   q-agent-notifier-security_group_update
        #   q-agent-notifier-l2population_update (requires l2population to be enabled)
        self.setup_rpc()

        # bridge_mappings = default:br-eth1
        self.bridge_mappings = bridge_mappings
        # Creates physical network bridges and links them to integration bridge using veths.
        self.setup_physical_bridges(self.bridge_mappings)

        self.local_vlan_map = {}
        self.tun_br_ofports = {p_const.TYPE_GRE: {},
                               p_const.TYPE_VXLAN: {}}

        self.polling_interval = polling_interval
        self.minimize_polling = minimize_polling
        self.ovsdb_monitor_respawn_interval = ovsdb_monitor_respawn_interval

        if tunnel_types:
            self.enable_tunneling = True
        else:
            self.enable_tunneling = False
        self.local_ip = local_ip
        self.tunnel_count = 0
        self.vxlan_udp_port = cfg.CONF.AGENT.vxlan_udp_port
        self.dont_fragment = cfg.CONF.AGENT.dont_fragment
        self.tun_br = None

        # Creates tunnel bridge, and links it to the integration bridge
        # using a patch port.
        # create default flow tables
        if self.enable_tunneling:
            self.setup_tunnel_br(tun_br)

        # Collect additional bridges to monitor
        # Setup ancillary bridges - for example br-ex
        self.ancillary_brs = self.setup_ancillary_bridges(integ_br, tun_br)

        # Security group agent support
        # firewall_driver = neutron.agent.linux.iptables_firewall.OVSHybridIptablesFirewallDriver
        self.sg_agent = OVSSecurityGroupAgent(self.context,
                                              self.plugin_rpc,
                                              root_helper)
        # Initialize iteration counter
        self.iter_num = 0
        self.run_daemon_loop = True
Once OVSNeutronAgent has been initialized, agent.daemon_loop() is started:
def daemon_loop(self):
    with polling.get_polling_manager(
        self.minimize_polling,
        self.root_helper,
        self.ovsdb_monitor_respawn_interval) as pm:

        self.rpc_loop(polling_manager=pm)
rpc_loop
def rpc_loop(self, polling_manager=None):
    # sync tunnel and check ovs restart
    if not polling_manager:
        polling_manager = polling.AlwaysPoll()

    sync = True
    ports = set()
    updated_ports_copy = set()
    ancillary_ports = set()
    tunnel_sync = True
    ovs_restarted = False
    while self.run_daemon_loop:
        start = time.time()
        port_stats = {'regular': {'added': 0,
                                  'updated': 0,
                                  'removed': 0},
                      'ancillary': {'added': 0,
                                    'removed': 0}}
        LOG.debug(_("Agent rpc_loop - iteration:%d started"),
                  self.iter_num)
        if sync:
            LOG.info(_("Agent out of sync with plugin!"))
            ports.clear()
            ancillary_ports.clear()
            sync = False
            polling_manager.force_polling()
        # detect an OVS restart by checking whether the canary flow still exists
        ovs_restarted = self.check_ovs_restart()
        if ovs_restarted:
            # reset br-int, br-tun, and other physical bridges
            self.setup_integration_br()
            self.setup_physical_bridges(self.bridge_mappings)
            if self.enable_tunneling:
                self.setup_tunnel_br()
                tunnel_sync = True

        # Notify the plugin of tunnel IP
        if self.enable_tunneling and tunnel_sync:
            LOG.info(_("Agent tunnel out of sync with plugin!"))
            try:
                tunnel_sync = self.tunnel_sync()
            except Exception:
                LOG.exception(_("Error while synchronizing tunnels"))
                tunnel_sync = True

        # check for port/security group updates and apply them
        if self._agent_has_updates(polling_manager) or ovs_restarted:
            try:
                LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
                            "starting polling. Elapsed:%(elapsed).3f"),
                          {'iter_num': self.iter_num,
                           'elapsed': time.time() - start})
                # Save updated ports dict to perform rollback in
                # case resync would be needed, and then clear
                # self.updated_ports. As the greenthread should not yield
                # between these two statements, this will be thread-safe
                updated_ports_copy = self.updated_ports
                self.updated_ports = set()
                reg_ports = (set() if ovs_restarted else ports)
                # port_info contains:
                #   'current': ports listed by ['ovs-vsctl', '--format=json', '--',
                #              '--columns=name,external_ids,ofport', 'list', 'Interface'],
                #              then filtered in get_vif_port_set()
                #   'updated': check_changed_vlans() returns the set of port ids
                #              affected by a vlan tag loss, and then
                #              updated_ports &= cur_ports
                #   'added':   cur_ports - registered_ports
                #   'removed': registered_ports - cur_ports
                port_info = self.scan_ports(reg_ports, updated_ports_copy)
                LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
                            "port information retrieved. "
                            "Elapsed:%(elapsed).3f"),
                          {'iter_num': self.iter_num,
                           'elapsed': time.time() - start})
                # Secure and wire/unwire VIFs and update their status
                # on Neutron server
                if (self._port_info_has_changes(port_info) or
                    self.sg_agent.firewall_refresh_needed() or
                    ovs_restarted):
                    LOG.debug(_("Starting to process devices in:%s"),
                              port_info)
                    # If treat devices fails - must resync with plugin
                    sync = self.process_network_ports(port_info,
                                                      ovs_restarted)
                    LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -"
                                "ports processed. Elapsed:%(elapsed).3f"),
                              {'iter_num': self.iter_num,
                               'elapsed': time.time() - start})
                    port_stats['regular']['added'] = (
                        len(port_info.get('added', [])))
                    port_stats['regular']['updated'] = (
                        len(port_info.get('updated', [])))
                    port_stats['regular']['removed'] = (
                        len(port_info.get('removed', [])))
                ports = port_info['current']
                # Treat ancillary devices if they exist
                if self.ancillary_brs:
                    port_info = self.update_ancillary_ports(
                        ancillary_ports)
                    LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -"
                                "ancillary port info retrieved. "
                                "Elapsed:%(elapsed).3f"),
                              {'iter_num': self.iter_num,
                               'elapsed': time.time() - start})

                    if port_info:
                        rc = self.process_ancillary_network_ports(
                            port_info)
                        LOG.debug(_("Agent rpc_loop - iteration:"
                                    "%(iter_num)d - ancillary ports "
                                    "processed. Elapsed:%(elapsed).3f"),
                                  {'iter_num': self.iter_num,
                                   'elapsed': time.time() - start})
                        ancillary_ports = port_info['current']
                        port_stats['ancillary']['added'] = (
                            len(port_info.get('added', [])))
                        port_stats['ancillary']['removed'] = (
                            len(port_info.get('removed', [])))
                        sync = sync | rc

                polling_manager.polling_completed()
            except Exception:
                LOG.exception(_("Error while processing VIF ports"))
                # Put the ports back in self.updated_port
                self.updated_ports |= updated_ports_copy
                sync = True

        # sleep till end of polling interval
        elapsed = (time.time() - start)
        LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d "
                    "completed. Processed ports statistics: "
                    "%(port_stats)s. Elapsed:%(elapsed).3f"),
                  {'iter_num': self.iter_num,
                   'port_stats': port_stats,
                   'elapsed': elapsed})
        if (elapsed < self.polling_interval):
            time.sleep(self.polling_interval - elapsed)
        else:
            LOG.debug(_("Loop iteration exceeded interval "
                        "(%(polling_interval)s vs. %(elapsed)s)!"),
                      {'polling_interval': self.polling_interval,
                       'elapsed': elapsed})
        self.iter_num = self.iter_num + 1
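The comments around scan_ports in the loop describe port changes as plain set arithmetic. A minimal sketch of that arithmetic follows. It is a simplification, not the agent's method: here the current port set is passed in as an argument (an assumption made for illustration), whereas the agent reads it from br-int.

```python
def scan_ports(current, registered, updated=None):
    """Classify ports by comparing this scan with the previous one.

    current    -- port ids found on the bridge in this iteration
    registered -- port ids known from the previous iteration
    updated    -- port ids the server reported as updated (may be None)
    """
    port_info = {'current': current}
    if updated:
        # Ignore update notifications for ports that are not on this host.
        local_updates = updated & current
        if local_updates:
            port_info['updated'] = local_updates
    if current == registered:
        # Nothing was plugged or unplugged since the last scan.
        return port_info
    port_info['added'] = current - registered
    port_info['removed'] = registered - current
    return port_info
```

For example, `scan_ports({'a', 'b', 'c'}, {'b', 'c', 'd'}, {'c', 'x'})` reports `a` as added, `d` as removed, and only `c` as updated, because `x` is not present on the local bridge.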
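The two most important functions in rpc_loop() are tunnel_sync (query and establish tunnels) and process_network_ports (handle port and security group changes).
tunnel_sync: query and establish tunnels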
def tunnel_sync(self):
    resync = False
    try:
        for tunnel_type in self.tunnel_types:  # tunnel_types = vxlan, gre
            # query tunnel details from plugin_rpc by local_ip and tunnel_type
            details = self.plugin_rpc.tunnel_sync(self.context,
                                                  self.local_ip,
                                                  tunnel_type)
            # establish tunnel with all other tunnel_ip if l2_pop disabled
            # if l2_pop enabled, tunnel sync is processed by agent in
            # q-agent-notifier-l2population_update consumer id
            if not self.l2_pop:
                tunnels = details['tunnels']
                for tunnel in tunnels:
                    if self.local_ip != tunnel['ip_address']:
                        tunnel_id = tunnel.get('id')
                        # Unlike the OVS plugin, ML2 doesn't return an id
                        # key. So use ip_address to form port name instead.
                        # Port name must be <=15 chars, so use shorter hex.
                        remote_ip = tunnel['ip_address']
                        remote_ip_hex = self.get_ip_in_hex(remote_ip)
                        if not tunnel_id and not remote_ip_hex:
                            continue
                        tun_name = '%s-%s' % (tunnel_type,
                                              tunnel_id or remote_ip_hex)
                        # setup tunnel_port and related flows
                        self.setup_tunnel_port(tun_name,
                                               tunnel['ip_address'],
                                               tunnel_type)
    except Exception as e:
        LOG.debug(_("Unable to sync tunnel IP %(local_ip)s: %(e)s"),
                  {'local_ip': self.local_ip, 'e': e})
        resync = True
    return resync
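The tunnel port name built in tunnel_sync has to fit in 15 characters, which is why the remote IP is encoded as hex. The sketch below shows one way to produce such a name. It uses the standard-library `ipaddress` module, which is an assumption of this example rather than what the agent's get_ip_in_hex does internally; `ip_in_hex` and `tunnel_port_name` are hypothetical helper names.

```python
import ipaddress


def ip_in_hex(ip_address):
    """Return an IPv4 address as 8 hex digits, or None if it is invalid."""
    try:
        return '%08x' % int(ipaddress.IPv4Address(ip_address))
    except ipaddress.AddressValueError:
        return None


def tunnel_port_name(tunnel_type, remote_ip, tunnel_id=None):
    """Build '<type>-<id or hex ip>', or None when neither is usable."""
    suffix = tunnel_id or ip_in_hex(remote_ip)
    if not suffix:
        return None
    return '%s-%s' % (tunnel_type, suffix)
```

With this encoding `tunnel_port_name('vxlan', '192.168.1.10')` yields `vxlan-c0a8010a`, which is 14 characters, whereas the dotted form of the same address would not fit.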
process_network_ports: handle port and security group changes
def process_network_ports(self, port_info, ovs_restarted):
    resync_a = False
    resync_b = False
    # TODO(salv-orlando): consider a solution for ensuring notifications
    # are processed exactly in the same order in which they were
    # received. This is tricky because there are two notification
    # sources: the neutron server, and the ovs db monitor process
    # If there is an exception while processing security groups ports
    # will not be wired anyway, and a resync will be triggered
    # TODO(salv-orlando): Optimize avoiding applying filters unnecessarily
    # (eg: when there are no IP address changes)
    # Query the security groups from the plugin via plugin_rpc, then
    # apply them through sg_agent
    self.sg_agent.setup_port_filters(port_info.get('added', set()),
                                     port_info.get('updated', set()))
    # VIF wiring needs to be performed always for 'new' devices.
    # For updated ports, re-wiring is not needed in most cases, but needs
    # to be performed anyway when the admin state of a device is changed.
    # A device might be both in the 'added' and 'updated'
    # list at the same time; avoid processing it twice.
    devices_added_updated = (port_info.get('added', set()) |
                             port_info.get('updated', set()))
    if devices_added_updated:
        start = time.time()
        try:
            # Added or updated ports: fetch the port details, notify the
            # plugin of port up/down, then call port_bound/port_dead
            skipped_devices = self.treat_devices_added_or_updated(
                devices_added_updated, ovs_restarted)
            LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
                        "treat_devices_added_or_updated completed. "
                        "Skipped %(num_skipped)d devices of "
                        "%(num_current)d devices currently available. "
                        "Time elapsed: %(elapsed).3f"),
                      {'iter_num': self.iter_num,
                       'num_skipped': len(skipped_devices),
                       'num_current': len(port_info['current']),
                       'elapsed': time.time() - start})
            # Update the list of current ports storing only those which
            # have been actually processed.
            port_info['current'] = (port_info['current'] -
                                    set(skipped_devices))
        except DeviceListRetrievalError:
            # Need to resync as there was an error with server
            # communication.
            LOG.exception(_("process_network_ports - iteration:%d - "
                            "failure while retrieving port details "
                            "from server"), self.iter_num)
            resync_a = True
    if 'removed' in port_info:
        start = time.time()
        # Removed ports: remove the security group filters, notify the
        # plugin of port down, and unbind the local VLAN
        resync_b = self.treat_devices_removed(port_info['removed'])
        LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
                    "treat_devices_removed completed in %(elapsed).3f"),
                  {'iter_num': self.iter_num,
                   'elapsed': time.time() - start})
    # If one of the above operations fails => resync with plugin
    return (resync_a | resync_b)
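Stripped of logging, the control flow of process_network_ports comes down to three rules: handle added and updated ports once as a union, drop skipped devices from the current set, and request a resync if either half fails. The sketch below models only that flow. The two handler callbacks and the locally defined `DeviceListRetrievalError` are stand-ins introduced for illustration, not the agent's real methods.

```python
class DeviceListRetrievalError(Exception):
    """Stand-in for the error raised when port details cannot be fetched."""


def process_ports(port_info, treat_added_or_updated, treat_removed):
    """Return True when the caller should resync with the server."""
    resync_added = False
    resync_removed = False
    # A port may be both added and updated; the union handles it once.
    devices = port_info.get('added', set()) | port_info.get('updated', set())
    if devices:
        try:
            skipped = treat_added_or_updated(devices)
            # Keep only the ports that were actually processed.
            port_info['current'] = port_info['current'] - set(skipped)
        except DeviceListRetrievalError:
            resync_added = True
    if 'removed' in port_info:
        resync_removed = treat_removed(port_info['removed'])
    return resync_added or resync_removed
```

Because skipped devices are removed from `current`, they show up as added again on the next scan, so they get another chance without forcing a full resync.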
neutron-ovs-cleanup
This service starts on boot and ensures that Networking has full control over the creation and management of tap devices.
def main():
    """Main method for cleaning up OVS bridges.

    The utility cleans up the integration bridges used by Neutron.
    """

    conf = setup_conf()
    conf()
    config.setup_logging(conf)

    configuration_bridges = set([conf.ovs_integration_bridge,
                                 conf.external_network_bridge])
    ovs_bridges = set(ovs_lib.get_bridges(conf.AGENT.root_helper))
    available_configuration_bridges = configuration_bridges & ovs_bridges

    if conf.ovs_all_ports:
        bridges = ovs_bridges
    else:
        bridges = available_configuration_bridges

    # Collect existing ports created by Neutron on configuration bridges.
    # After deleting ports from OVS bridges, we cannot determine which
    # ports were created by Neutron, so port information is collected now.
    ports = collect_neutron_ports(available_configuration_bridges,
                                  conf.AGENT.root_helper)

    for bridge in bridges:
        LOG.info(_("Cleaning %s"), bridge)
        ovs = ovs_lib.OVSBridge(bridge, conf.AGENT.root_helper)
        ovs.delete_ports(all_ports=conf.ovs_all_ports)

    # Remove remaining ports created by Neutron (usually veth pair)
    delete_neutron_ports(ports, conf.AGENT.root_helper)

    LOG.info(_("OVS cleanup completed successfully"))
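The bridge selection at the top of the cleanup utility is again set arithmetic: only bridges that are both configured and actually present are candidates, unless the all-ports option widens the scope to every existing bridge. A minimal sketch, with `select_bridges` as a hypothetical helper name and the bridge names in the usage note chosen only as examples:

```python
def select_bridges(configured, existing, all_ports=False):
    """Return (bridges to clean, configured bridges that actually exist)."""
    available = set(configured) & set(existing)
    bridges = set(existing) if all_ports else available
    return bridges, available
```

With `configured = ['br-int', 'br-ex']` and `existing = ['br-int', 'br-tun']`, the default call cleans only `br-int`, while `all_ports=True` cleans both existing bridges. In both cases Neutron-created ports are collected from `br-int` alone, since that is the only configured bridge present.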