OpenStack: Neutron Service Code Deep Dive

This post is a dummy walkthrough of neutron services code. I recommend being familiar with the following modules (not a hard requirement, but will help you avoid jumping between this post and other docs):

I created the following drawing to make it easier to track several of the files and classes mentioned in this post.

L3 Agent

My goal here was to learn more about how neutron handles services. Since I believe in keeping things down to earth, I decided to follow a real example of a service in neutron: the L3 agent.

So let’s start by looking at the L3 agent’s main function (there is actually another one before it, for the cmd, but it contains only a call to this function).

def main(manager='neutron.agent.l3.agent.L3NATAgentWithStateReport'):
    register_opts(cfg.CONF)
    common_config.init(sys.argv[1:])
    config.setup_logging()
    config.setup_privsep()
    server = neutron_service.Service.create(
        binary='neutron-l3-agent',
        topic=topics.L3_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager=manager)
    service.launch(cfg.CONF, server).wait()

The location of this code is here. The first relevant block in this function, the one that creates the actual service, is the following:

server = neutron_service.Service.create(
        binary='neutron-l3-agent',
        topic=topics.L3_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager=manager)

It uses the create function from neutron.service. You’ll notice that ‘create’ is a classmethod: an instance of the Service class is created by calling ‘create’ rather than by instantiating the Service class directly.

    @classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_interval=None,
               periodic_fuzzy_delay=None):
        """Instantiates class and passes back application object.
        :param host: defaults to cfg.CONF.host
        :param binary: defaults to basename of executable
        :param topic: defaults to bin_name - 'neutron-' part
        :param manager: defaults to cfg.CONF.<topic>_manager
        :param report_interval: defaults to cfg.CONF.report_interval
        :param periodic_interval: defaults to cfg.CONF.periodic_interval
        :param periodic_fuzzy_delay: defaults to cfg.CONF.periodic_fuzzy_delay
        """
        if not host:
            host = cfg.CONF.host
        if not binary:
            binary = os.path.basename(inspect.stack()[-1][1])
        if not topic:
            topic = binary.rpartition('neutron-')[2]
            topic = topic.replace("-", "_")
        if not manager:
            manager = cfg.CONF.get('%s_manager' % topic, None)
        if report_interval is None:
            report_interval = cfg.CONF.report_interval
        if periodic_interval is None:
            periodic_interval = cfg.CONF.periodic_interval
        if periodic_fuzzy_delay is None:
            periodic_fuzzy_delay = cfg.CONF.periodic_fuzzy_delay
        service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_interval=periodic_interval,
                          periodic_fuzzy_delay=periodic_fuzzy_delay)

        return service_obj
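To see the pattern in isolation, here is a minimal sketch of a classmethod factory. ToyService, ToySubService and DEFAULT_HOST are made-up names for illustration (DEFAULT_HOST stands in for cfg.CONF.host); this is not the neutron code.

```python
class ToyService:
    """Hypothetical stand-in for neutron.service.Service."""

    DEFAULT_HOST = "localhost"  # assumption: plays the role of cfg.CONF.host

    def __init__(self, host, topic):
        self.host = host
        self.topic = topic

    @classmethod
    def create(cls, host=None, topic=None):
        # Resolve the defaults first, then build the instance through cls,
        # so subclasses calling create() get an instance of themselves.
        if not host:
            host = cls.DEFAULT_HOST
        if not topic:
            topic = "toy"
        return cls(host, topic)


class ToySubService(ToyService):
    pass


svc = ToySubService.create(topic="l3_agent")
print(type(svc).__name__, svc.host, svc.topic)
```

The point of going through cls instead of naming the class explicitly is that the factory keeps working for subclasses.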

Let’s go over the parameters:

  • host – the server on which the agent runs. Defaults to cfg.CONF.host.
  • binary – the name of the binary used for running the service. When it isn’t passed, it is worked out with the inspect module from Python’s standard library: inspect.stack() returns a list of frame records for the current stack, and the file name of the last record (the outermost call) is used.
  • topic – the service listens on (consumes from) queues based on the topic. When it isn’t passed, it is derived from the binary name by dropping the ‘neutron-’ part and replacing dashes with underscores.
  • manager – usually a class that defines how the service behaves at certain points (e.g. what to execute when initialized, what to execute after starting).
  • report_interval – the seconds between state reports to the server. We set this argument by passing ‘cfg.CONF.AGENT.report_interval’. It is set here and the default is 30.
  • periodic_interval – the seconds between running periodic tasks (this description is taken directly from the code). The default is 40, as can be seen here.
  • periodic_fuzzy_delay – used as a range of seconds to randomly delay the start of a certain loop.
  • service_obj – not a parameter, but the last and most important piece. This is an object of the class, created by calling the class with all the above parameters.
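The two derived defaults (binary and topic) are easy to try on their own. Below is a small sketch that reuses the same expressions as ‘create’; the helper names default_binary and default_topic are mine, not neutron’s.

```python
import inspect
import os


def default_binary():
    # The last frame record is the outermost call on the stack; element [1]
    # of a frame record is the file name of the code running in that frame.
    return os.path.basename(inspect.stack()[-1][1])


def default_topic(binary):
    # Keep whatever follows 'neutron-' and turn dashes into underscores.
    topic = binary.rpartition("neutron-")[2]
    return topic.replace("-", "_")


print(default_topic("neutron-l3-agent"))
print(default_binary())
```

Note that rpartition returns the whole string as its last element when the separator is missing, so a binary name without the ‘neutron-’ prefix still yields a usable topic.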

Let’s see how service_obj is created (the code is located in the same file):

class Service(n_rpc.Service):
    """Service object for binaries running on hosts.
    A service takes a manager and enables rpc by listening to queues based
    on topic. It also periodically runs tasks on the manager.
    """

    def __init__(self, host, binary, topic, manager, report_interval=None,
                 periodic_interval=None, periodic_fuzzy_delay=None,
                 *args, **kwargs):

        self.binary = binary
        self.manager_class_name = manager
        manager_class = importutils.import_class(self.manager_class_name)
        self.manager = manager_class(host=host, *args, **kwargs)
        self.report_interval = report_interval
        self.periodic_interval = periodic_interval
        self.periodic_fuzzy_delay = periodic_fuzzy_delay
        self.saved_args, self.saved_kwargs = args, kwargs
        self.timers = []
        profiler.setup(binary, host)
        super(Service, self).__init__(host, topic, manager=self.manager)

We can see most of the attributes are the same. Note the profiler line:

profiler.setup(binary, host)

This will set up the profiler for this specific service.
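One more detail in __init__ worth a look: the manager arrives as a dotted string and is turned into a class by importutils.import_class before being instantiated with host=host. Here is a rough standard-library sketch of that idea. The import_class below is my own simplified helper, not the oslo.utils implementation, and collections.OrderedDict is used only so the example runs without neutron installed.

```python
import importlib


def import_class(dotted_path):
    """Resolve 'package.module.ClassName' to the class object."""
    module_name, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# Stand-in for 'neutron.agent.l3.agent.L3NATAgentWithStateReport'.
manager_class = import_class("collections.OrderedDict")
manager = manager_class(host="net-node-1")
print(manager_class.__name__, dict(manager))
```

Passing the manager as a string keeps the service module free of imports for every possible agent; the class is loaded only when the service is built.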

You may also notice that this class inherits from n_rpc.Service, which is in neutron.common.rpc. The last line calls the initializer of that parent class:

super(Service, self).__init__(host, topic, manager=self.manager)

Let’s see how it looks:

@profiler.trace_cls("rpc")
class Service(service.Service):
    """Service object for binaries running on hosts.
    A service enables rpc by listening to queues based on topic and host.
    """
    def __init__(self, host, topic, manager=None, serializer=None):
        super(Service, self).__init__()
        self.host = host
        self.topic = topic
        self.serializer = serializer
        if manager is None:
            self.manager = self
        else:
            self.manager = manager

This Service class inherits from the oslo service Service class, and it is the last Service class in the inheritance chain that lives in neutron. The oslo class in turn subclasses oslo_service.service.ServiceBase, which provides the interface for a service.
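The whole chain can be mimicked in a few lines. This is a toy model under my own assumptions: the four abstract methods are what I understand the ServiceBase interface to require, and the names BaseService, RpcService and AgentService are invented to play the roles of the oslo, neutron.common.rpc and neutron.service classes.

```python
import abc


class ServiceBase(abc.ABC):
    """Toy interface: the lifecycle methods a service must offer."""

    @abc.abstractmethod
    def start(self):
        """Start the service."""

    @abc.abstractmethod
    def stop(self):
        """Stop the service."""

    @abc.abstractmethod
    def wait(self):
        """Block until the service has stopped."""

    @abc.abstractmethod
    def reset(self):
        """Reset the service."""


class BaseService(ServiceBase):
    """Role of the oslo Service: empty default implementations."""

    def start(self):
        pass

    def stop(self):
        pass

    def wait(self):
        pass

    def reset(self):
        pass


class RpcService(BaseService):
    """Role of neutron.common.rpc.Service."""

    def __init__(self, host, topic, manager=None):
        super().__init__()
        self.host = host
        self.topic = topic
        self.manager = self if manager is None else manager


class AgentService(RpcService):
    """Role of neutron.service.Service."""

    def __init__(self, host, binary, topic, manager):
        self.binary = binary
        super().__init__(host, topic, manager=manager)


chain = [cls.__name__ for cls in AgentService.__mro__]
print(" -> ".join(chain))
```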

Back to L3 agent main function.

def main(manager='neutron.agent.l3.agent.L3NATAgentWithStateReport'):
    register_opts(cfg.CONF)
    common_config.init(sys.argv[1:])
    config.setup_logging()
    config.setup_privsep()
    server = neutron_service.Service.create(
        binary='neutron-l3-agent',
        topic=topics.L3_AGENT,
        report_interval=cfg.CONF.AGENT.report_interval,
        manager=manager)
    service.launch(cfg.CONF, server).wait()

We have now reached the last line, which runs the actual service. We pass the launch method two arguments:

1. the configuration (this is mostly composed by register_opts in the same file).

2. ‘server’, which is the service we just created in the previous line. We also call “wait” so that we don’t return until the service is stopped.

Without deep diving too much into oslo service, ‘service.launch’ will eventually run the ‘start’ method of the service. You can take a look at it here.
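To make the launch-then-wait flow concrete, here is a deliberately tiny model of it. ToyService, ToyLauncher and this launch function are invented for the example and are far simpler than what oslo service does; the only things they share with the real flow are that launching ends up calling ‘start’ and that ‘wait’ blocks until the service is stopped.

```python
import threading


class ToyService:
    def __init__(self):
        self.started = False
        self._stopped = threading.Event()

    def start(self):
        self.started = True

    def stop(self):
        self._stopped.set()

    def wait(self):
        # Blocks the caller until stop() has been called.
        self._stopped.wait()


class ToyLauncher:
    def __init__(self, service):
        self.service = service

    def launch(self):
        self.service.start()
        return self

    def wait(self):
        self.service.wait()


def launch(service):
    """Hypothetical helper mirroring the shape of 'launch(...).wait()'."""
    return ToyLauncher(service).launch()


svc = ToyService()
launcher = launch(svc)
# Stop the service from another thread shortly after, so wait() returns.
threading.Timer(0.05, svc.stop).start()
launcher.wait()
print("started:", svc.started)
```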

In our case, it will run the start method of the Service class in neutron.service, the class we have already seen:

    def start(self):
        self.manager.init_host()
        super(Service, self).start()
        ...

First, it runs the ‘init_host’ method of our manager. Reminder: our manager is “neutron.agent.l3.agent.L3NATAgentWithStateReport”  as passed to main l3 agent function.

L3NATAgentWithStateReport inherits from L3NATAgent, which inherits from multiple classes, one of which is neutron.manager.Manager, which includes the method ‘init_host’.

‘init_host’ is an empty method in neutron.manager.Manager that is expected to be overridden by subclasses. In practice, at the moment it is implemented only by the DHCP agent.
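The “empty method that subclasses may override” idea looks like this in miniature. The classes below are invented for the illustration and only borrow the method name init_host.

```python
class Manager:
    def __init__(self, host=None):
        self.host = host

    def init_host(self):
        """Does nothing by default; subclasses override it when needed."""


class QuietAgent(Manager):
    # Inherits the no-op init_host, like most agents.
    pass


class BusyAgent(Manager):
    def init_host(self):
        # Some one-off preparation before the service starts.
        self.synced = True


quiet = QuietAgent("net-node-1")
busy = BusyAgent("net-node-2")
for agent in (quiet, busy):
    agent.init_host()  # the caller never needs to know who overrides it
    print(type(agent).__name__, getattr(agent, "synced", False))
```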

Next, it calls the start method of the parent class, which is in neutron.common.rpc

    def start(self):
        super(Service, self).start()
        self.conn = create_connection()
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)
        endpoints = [self.manager]
        self.conn.create_consumer(self.topic, endpoints)

        # Hook to allow the manager to do other initializations after
        # the rpc connection is created.
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

        # Consume from all consumers in threads
        self.conn.consume_in_threads()

The first line calls the start method of the oslo service class, which is an empty method; the implementation is left to subclasses, here the Service class in neutron.common.rpc.

Next, it creates self.conn by calling ‘create_connection’ which basically creates and returns a Connection object from the same file.

Then, after assigning the manager to endpoints variable, it will call ‘create_consumer’ of our Connection object (self.conn).

‘create_consumer’ will create an RPC server that will consume every message sent to the topic of the L3 agent (which is ‘l3_agent’, surprisingly). Let’s see how it works.

    def create_consumer(self, topic, endpoints, fanout=False):
        target = oslo_messaging.Target(
            topic=topic, server=cfg.CONF.host, fanout=fanout)
        server = get_server(target, endpoints)
        self.servers.append(server)

The first line is creating a target. A target in oslo messaging encapsulates all the information to identify what messages a service is listening for or where the messages should be sent. In our case, the topic is ‘l3_agent‘, the host is where we are running the agent and fanout is False.

Next it calls ‘get_server’ in the same file (neutron.common.rpc) to construct an RPCServer from oslo.messaging.

The last line is adding the server from the previous line to the servers list of our connection object (self.conn).
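If you want to play with this flow without a message bus, the following sketch mimics its shape. ToyTarget and ToyConnection are made-up stand-ins (a dict takes the place of the RPC server), so treat it as a picture of the bookkeeping, not of oslo.messaging itself.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ToyTarget:
    """What to listen for: a topic, optionally narrowed to one server."""
    topic: str
    server: Optional[str] = None
    fanout: bool = False


class ToyConnection:
    def __init__(self, host):
        self.host = host
        self.servers = []

    def create_consumer(self, topic, endpoints, fanout=False):
        target = ToyTarget(topic=topic, server=self.host, fanout=fanout)
        # A real implementation would build an RPC server here.
        server = {"target": target, "endpoints": list(endpoints)}
        self.servers.append(server)
        return server


class ToyManager:
    def router_updated(self, router_id):
        return "updated %s" % router_id


conn = ToyConnection(host="net-node-1")
conn.create_consumer("l3_agent", [ToyManager()])
print(conn.servers[0]["target"])
```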

Back to the start method in neutron.common.rpc where we left it.

    def start(self):
        super(Service, self).start()
        self.conn = create_connection()
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)
        endpoints = [self.manager]
        self.conn.create_consumer(self.topic, endpoints)

        # Hook to allow the manager to do other initializations after
        # the rpc connection is created.
                                                                             ------------> WE ARE HERE
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

        # Consume from all consumers in threads
        self.conn.consume_in_threads()

So now that we have a consumer, we can move on to the next step, which calls the ‘initialize_service_hook’ method of our manager, if it exists. In our case, our manager (neutron.agent.l3.agent.L3NATAgentWithStateReport) doesn’t have such an attribute (to be honest, it doesn’t exist anywhere in the neutron tree), so this part is skipped.

The last line calls ‘consume_in_threads’ of our connection object (self.conn):
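The callable(getattr(...)) idiom used for the hook is worth seeing on its own. In this sketch, call_optional_hook and the two manager classes are hypothetical; only the attribute name comes from the code above.

```python
def call_optional_hook(manager, service):
    """Call manager.initialize_service_hook(service) only if it exists."""
    hook = getattr(manager, "initialize_service_hook", None)
    if callable(hook):
        hook(service)
        return True
    return False


class ManagerWithHook:
    def __init__(self):
        self.seen = None

    def initialize_service_hook(self, service):
        self.seen = service


class ManagerWithoutHook:
    pass


with_hook = ManagerWithHook()
print(call_optional_hook(with_hook, "svc"))
print(call_optional_hook(ManagerWithoutHook(), "svc"))
```

Passing None as the getattr default means a missing attribute and a non-callable one are both skipped quietly instead of raising AttributeError.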

    def consume_in_threads(self):
        for server in self.servers:
            server.start()
        return self.servers

It’s pretty basic. All it does is go over the list of RPC servers (in our case, the one server we created with ‘create_consumer’) and start each of them.

At this point, we are done with the start method in neutron.common.rpc and we are back to neutron.service start method

    def start(self):
        self.manager.init_host()
        super(Service, self).start()
                                    ----------------------------------------------> WE ARE HERE
        if self.report_interval:
            pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)

        if self.periodic_interval:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)
        self.manager.after_start()

It now checks whether report_interval is defined (again, the seconds between state reports to the server, which is set here). Since it is defined, it creates a looping call using loopingcall.FixedIntervalLoopingCall from oslo.service.

It passes it ‘self.report_state’, a method of the neutron.service Service class that is supposed to report the state but is not implemented at the moment. Once created, the loop is started with the interval set by ‘self.report_interval’ and added to the timers list, which is empty at this point.
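If the looping call feels abstract, here is a rough thread-based imitation of the idea: call a function, sleep for the interval, repeat until stopped. ToyLoopingCall is my own simplification and only copies the start(interval, initial_delay) shape seen above; it says nothing about how oslo.service implements it internally.

```python
import threading


class ToyLoopingCall:
    def __init__(self, func):
        self.func = func
        self._stop = threading.Event()
        self._thread = None

    def start(self, interval, initial_delay=None):
        def _run():
            # Event.wait() doubles as a sleep that stop() can interrupt.
            if initial_delay and self._stop.wait(initial_delay):
                return
            while not self._stop.is_set():
                self.func()
                self._stop.wait(interval)

        self._thread = threading.Thread(target=_run, daemon=True)
        self._thread.start()
        return self

    def stop(self):
        self._stop.set()
        self._thread.join()


reports = []
enough = threading.Event()


def report_state():
    reports.append("alive")
    if len(reports) >= 3:
        enough.set()


timers = []
pulse = ToyLoopingCall(report_state)
pulse.start(interval=0.01, initial_delay=0.01)
timers.append(pulse)  # kept so the loop can be stopped later

enough.wait(timeout=5)
pulse.stop()
print(len(reports) >= 3)
```

Keeping every loop in a timers list, as the service does, gives one place to stop them all when the service shuts down.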

Next, it checks whether ‘periodic_interval’ (seconds between running periodic tasks) is set. Since it’s set here, it proceeds to the next check: whether ‘periodic_fuzzy_delay’ (reminder: used as a range of seconds to randomly delay the start of a certain loop) is set. In our case it defaults to 5, so ‘initial_delay’ will be randomly set to a number between 0 and 5.

Next we’ll create a loopingcall, which is a class in oslo service that allows us to run a specific method in a loop. In this case we’ll run the periodic_tasks method, which is defined in the same file:
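A quick way to convince yourself of the range: random.randint includes both ends, so with a fuzzy delay of 5 the result can be any of 0, 1, 2, 3, 4 or 5. The helper name pick_initial_delay is mine; the expression inside is the one from the start method.

```python
import random


def pick_initial_delay(periodic_fuzzy_delay, rng=random):
    # Same decision as in start(): a random delay when fuzzing is on,
    # otherwise no initial delay at all.
    if periodic_fuzzy_delay:
        return rng.randint(0, periodic_fuzzy_delay)
    return None


rng = random.Random(42)  # seeded only to make the demo repeatable
delays = [pick_initial_delay(5, rng) for _ in range(1000)]
print(sorted(set(delays)))
print(pick_initial_delay(0))
```

My reading of the purpose is that the random offset keeps many agents that were started together from running their periodic tasks at the very same moment.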

def periodic_tasks(self, raise_on_error=False):
    """Tasks to be run at a periodic interval."""
    ctxt = context.get_admin_context()
    self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)

First, ‘periodic_tasks’ creates a context object by calling ‘get_admin_context‘ from the neutron-lib project. ‘get_admin_context’ returns an oslo context object which we’ll use for running the periodic tasks.

In the next line, we call the periodic_tasks function of neutron’s manager, which is just another call:

def periodic_tasks(self, context, raise_on_error=False):
    self.run_periodic_tasks(context, raise_on_error=raise_on_error)

It calls ‘run_periodic_tasks’ from the oslo service module. Back to the start method of neutron.service:
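The delegation chain (service, then manager, then the task runner) can be pictured with a toy task registry. Everything here is invented for the illustration: the periodic_task decorator, the _is_periodic marker and both classes are my own, and the real oslo service code is more elaborate.

```python
def periodic_task(func):
    """Mark a method as a periodic task (toy decorator)."""
    func._is_periodic = True
    return func


class ToyPeriodicTasks:
    def run_periodic_tasks(self, context, raise_on_error=False):
        # Run every marked method once; errors are swallowed unless asked.
        for name in dir(self):
            attr = getattr(self, name)
            if callable(attr) and getattr(attr, "_is_periodic", False):
                try:
                    attr(context)
                except Exception:
                    if raise_on_error:
                        raise


class ToyManager(ToyPeriodicTasks):
    def __init__(self):
        self.calls = []

    def periodic_tasks(self, context, raise_on_error=False):
        # Just another call, as in neutron's manager.
        self.run_periodic_tasks(context, raise_on_error=raise_on_error)

    @periodic_task
    def sync_routers(self, context):
        self.calls.append(("sync_routers", context))

    @periodic_task
    def broken_task(self, context):
        raise RuntimeError("boom")


toy_manager = ToyManager()
toy_manager.periodic_tasks("admin-context")
print(toy_manager.calls)
```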

    def start(self):
        self.manager.init_host()
        super(Service, self).start()
        if self.report_interval:
            pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)

        if self.periodic_interval:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
                                    ----------------------------------------------> WE ARE HERE
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)
        self.manager.after_start()

We reached the point where neutron starts to execute the periodic loop from the previous line. It does so by running the start method of the ‘FixedIntervalLoopingCall’ class.

Next, we are adding ‘periodic’ from the previous line to the list of timers.

Finally, it calls the ‘after_start‘ method of our manager (reminder: our manager is L3NATAgentWithStateReport)

    def after_start(self):
        eventlet.spawn_n(self._process_routers_loop)
        LOG.info(_LI("L3 agent started"))
        # Do the report state before we do the first full sync.
        self._report_state()
        self.pd.after_start()

It starts by spawning a green thread that runs _process_routers_loop. This loop uses a green thread pool of size 8, so that up to 8 workers are at any time either processing a router or waiting on the queue for the next update to come in.

def _process_routers_loop(self):
    LOG.debug("Starting _process_routers_loop")
    pool = eventlet.GreenPool(size=8)
    while True:
        pool.spawn_n(self._process_router_update)
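The ‘while True’ may look like a busy loop, but a bounded pool makes spawn_n block once every slot is taken, so the loop only advances when a worker finishes. Here is a thread-based sketch of that behavior. ToyPool is my stand-in for the green pool (real threads and a semaphore instead of green threads), with a pool size of 4 and 20 rounds chosen just to keep the demo short.

```python
import threading
import time


class ToyPool:
    """Thread-based stand-in for a fixed-size green thread pool."""

    def __init__(self, size):
        self._slots = threading.BoundedSemaphore(size)
        self._threads = []

    def spawn_n(self, func, *args):
        # Blocks while every slot is taken, which throttles the caller.
        self._slots.acquire()

        def _runner():
            try:
                func(*args)
            finally:
                self._slots.release()

        thread = threading.Thread(target=_runner)
        self._threads.append(thread)
        thread.start()

    def waitall(self):
        for thread in self._threads:
            thread.join()


stats = {"active": 0, "peak": 0, "done": 0}
lock = threading.Lock()


def process_router_update():
    with lock:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
    time.sleep(0.01)  # pretend to configure a router
    with lock:
        stats["active"] -= 1
        stats["done"] += 1


pool = ToyPool(size=4)
for _ in range(20):  # the agent loops forever; 20 rounds are enough here
    pool.spawn_n(process_router_update)
pool.waitall()
print(stats["peak"] <= 4, stats["done"])
```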

Back to after_start: next it calls _report_state, which reports on the overall status of all the existing routers. If the agent was just revived from a crash, it will perform a full sync.

    def after_start(self):
        eventlet.spawn_n(self._process_routers_loop)
        LOG.info(_LI("L3 agent started"))
        # Do the report state before we do the first full sync.
        self._report_state()  --------------------------------------> WE ARE HERE
        self.pd.after_start()

Next, it calls the ‘after_start’ method of our prefix delegation object, which sets up a signal handler:

def after_start(self):
    LOG.debug('SIGUSR1 signal handler set')
    signal.signal(signal.SIGUSR1, self._handle_sigusr1)

At this point, our agent is up and running, handling L3 and consuming and publishing messages on the L3 topics.
