Skip to content
Snippets Groups Projects
DeviceServiceServicerImpl.py 13.2 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from typing import Dict
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
import grpc, logging
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.method_wrappers.ServiceExceptions import NotFoundException, OperationFailedException
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.proto.context_pb2 import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    Device, DeviceConfig, DeviceDriverEnum, DeviceId, DeviceOperationalStatusEnum, Empty, Link)
from common.proto.device_pb2 import MonitoringSettings
from common.proto.device_pb2_grpc import DeviceServiceServicer
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.context_queries.Device import get_device
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from common.tools.mutex_queues.MutexQueues import MutexQueues
from context.client.ContextClient import ContextClient
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .driver_api._Driver import _Driver
from .driver_api.DriverInstanceCache import DriverInstanceCache, get_driver
from .monitoring.MonitoringLoops import MonitoringLoops
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .ErrorMessages import ERROR_MISSING_DRIVER, ERROR_MISSING_KPI
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
from .Tools import (
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    check_connect_rules, check_no_endpoints, compute_rules_to_add_delete, configure_rules, deconfigure_rules,
    get_device_controller_uuid, populate_config_rules, populate_endpoint_monitoring_resources, populate_endpoints,
    populate_initial_config_rules, subscribe_kpi, unsubscribe_kpi, update_endpoints)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Module logger; it is passed to safe_and_metered_rpc_method and used by the RPC
# methods below to report driver errors.
LOGGER = logging.getLogger(__name__)

# Metrics pool passed to safe_and_metered_rpc_method for every RPC of this servicer.
METRICS_POOL = MetricsPool('Device', 'RPC')

class DeviceServiceServicerImpl(DeviceServiceServicer):
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    def __init__(
        self, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops
    ) -> None:
        """Store the collaborators used by the RPC methods.

        Args:
            driver_instance_cache: cache the RPC methods pass to get_driver() and delete from.
            monitoring_loops: monitoring loops object handed to the endpoint/KPI helpers.
        """
        # Keep the injected collaborators.
        self.driver_instance_cache, self.monitoring_loops = driver_instance_cache, monitoring_loops

        # Queues used by the RPC methods (wait_my_turn / signal_done) keyed by device UUID.
        self.mutex_queues = MutexQueues()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        """Create the device in Context (if missing) and populate it through its driver.

        The request is first passed through check_connect_rules() and check_no_endpoints().
        If Context does not know the device yet, a device built from the request (with
        operational status UNDEFINED) is stored so that Context provides the UUID. Then, while
        holding the per-device mutex, endpoints and config rules are populated using the
        driver, the device is stored with status DISABLED together with any sub-devices and
        sub-links collected, and the stored device is handed to
        populate_endpoint_monitoring_resources().

        Args:
            request: Device to add.
            context: gRPC servicer context (not used by this method).

        Returns:
            The DeviceId returned by Context when storing the populated device.

        Raises:
            OperationFailedException: if populating endpoints or config rules reports errors.
        """
        device_uuid = request.device_id.device_uuid.uuid

        connection_config_rules = check_connect_rules(request.device_config)
        check_no_endpoints(request.device_endpoints)

        context_client = ContextClient()
        try:
            device = get_device(context_client, device_uuid, rw_copy=True)
            if device is None:
                # not in context, create blank one to get UUID, and populate it below
                device = Device()
                device.device_id.CopyFrom(request.device_id)            # pylint: disable=no-member
                device.name = request.name
                device.device_type = request.device_type
                device.device_operational_status = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
                device.device_drivers.extend(request.device_drivers)    # pylint: disable=no-member
                device.device_config.CopyFrom(request.device_config)    # pylint: disable=no-member
                device_id = context_client.SetDevice(device)
                device = get_device(context_client, device_id.device_uuid.uuid, rw_copy=True)

            # update device_uuid to honor UUID provided by Context
            device_uuid = device.device_id.device_uuid.uuid

            self.mutex_queues.wait_my_turn(device_uuid)
            try:
                driver : _Driver = get_driver(self.driver_instance_cache, device)

                errors = []

                # Sub-devices and sub-links are exposed by intermediate controllers or represent mgmt links.
                # They are used to assist in path computation algorithms, and/or to identify dependencies
                # (which controller is in charge of which sub-device).
                new_sub_devices : Dict[str, Device] = dict()
                new_sub_links : Dict[str, Link] = dict()

                if len(device.device_endpoints) == 0:
                    # created from request, populate endpoints using driver
                    errors.extend(populate_endpoints(
                        device, driver, self.monitoring_loops, new_sub_devices, new_sub_links))

                if len(device.device_config.config_rules) == len(connection_config_rules):
                    # only the connection rules are present: populate config rules using driver
                    errors.extend(populate_config_rules(device, driver))

                # TODO: populate components

                if len(errors) > 0:
                    for error in errors:
                        LOGGER.error(error)
                    raise OperationFailedException('AddDevice', extra_details=errors)

                device.device_operational_status = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
                device_id = context_client.SetDevice(device)

                for sub_device in new_sub_devices.values():
                    context_client.SetDevice(sub_device)

                for sub_link in new_sub_links.values():
                    context_client.SetLink(sub_link)

                # Update endpoint monitoring resources with UUIDs
                device_with_uuids = get_device(
                    context_client, device_id.device_uuid.uuid, rw_copy=False, include_endpoints=True,
                    include_components=False, include_config_rules=False)
                populate_endpoint_monitoring_resources(device_with_uuids, self.monitoring_loops)

                return device_id
            finally:
                # Always release the per-device queue, even when the driver reported errors.
                self.mutex_queues.signal_done(device_uuid)
        finally:
            # Close the Context client on every path, not only on success.
            context_client.close()
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
        """Apply the configuration changes carried by the request to a device.

        The device is loaded from Context; when get_device_controller_uuid() reports a
        controller, the controller device is loaded and used instead. The rules to set and
        delete are computed with compute_rules_to_add_delete() and applied through the driver,
        and the resulting device (without endpoints) is stored back in Context.

        Args:
            request: Device carrying the desired config rules and, optionally, a new
                operational status (ignored when UNDEFINED).
            context: gRPC servicer context (not used by this method).

        Returns:
            The DeviceId returned by Context when storing the updated device.

        Raises:
            NotFoundException: if the device (or its controller) is not found in Context.
            OperationFailedException: if no driver is available or (de)configuring rules
                reports errors.
        """
        device_id = request.device_id
        device_uuid = device_id.device_uuid.uuid

        # device_uuid is rebound below (Context-provided UUID, possibly the controller's one).
        # Keep the key used to enter the queue so that the very same key is released.
        mutex_key = device_uuid

        self.mutex_queues.wait_my_turn(mutex_key)
        try:
            context_client = ContextClient()
            try:
                device = get_device(
                    context_client, device_uuid, rw_copy=True, include_endpoints=False, include_components=False,
                    include_config_rules=True)
                if device is None:
                    raise NotFoundException('Device', device_uuid, extra_details='loading in ConfigureDevice')

                device_controller_uuid = get_device_controller_uuid(device)
                if device_controller_uuid is not None:
                    # The device is managed by a controller: operate on the controller device.
                    device = get_device(
                        context_client, device_controller_uuid, rw_copy=True, include_endpoints=False,
                        include_components=False, include_config_rules=True)
                    if device is None:
                        raise NotFoundException(
                            'Device', device_controller_uuid, extra_details='loading in ConfigureDevice')

                device_uuid = device.device_id.device_uuid.uuid
                driver : _Driver = get_driver(self.driver_instance_cache, device)
                if driver is None:
                    msg = ERROR_MISSING_DRIVER.format(device_uuid=str(device_uuid))
                    raise OperationFailedException('ConfigureDevice', extra_details=msg)

                if DeviceDriverEnum.DEVICEDRIVER_P4 in device.device_drivers:
                    device = get_device(
                        context_client, device_uuid, rw_copy=False, include_endpoints=True,
                        include_components=False, include_config_rules=True)
                    # P4 Driver, by now, has no means to retrieve endpoints
                    # We allow defining the endpoints manually
                    update_endpoints(request, device)

                    # Update endpoints to get UUIDs
                    device_id = context_client.SetDevice(device)
                    device = context_client.GetDevice(device_id)

                if request.device_operational_status != DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED:
                    device.device_operational_status = request.device_operational_status

                # TODO: use of datastores (might be virtual ones) to enable rollbacks
                resources_to_set, resources_to_delete = compute_rules_to_add_delete(device, request)

                errors = []
                errors.extend(configure_rules(device, driver, resources_to_set))
                errors.extend(deconfigure_rules(device, driver, resources_to_delete))

                if len(errors) > 0:
                    for error in errors:
                        LOGGER.error(error)
                    raise OperationFailedException('ConfigureDevice', extra_details=errors)

                # Context Performance+Scalability enhancement:
                # This method, besides P4 logic, does not add/update/delete endpoints.
                # Remove endpoints to reduce number of inserts done by Context.
                # TODO: Add logic to inspect endpoints and keep only those ones modified with respect to Context.
                del device.device_endpoints[:]

                # Note: Rules are updated by configure_rules() and deconfigure_rules() methods.
                device_id = context_client.SetDevice(device)
                return device_id
            finally:
                # Release the Context client on every path.
                context_client.close()
        finally:
            self.mutex_queues.signal_done(mutex_key)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
        """Remove a device from the monitoring loops, the driver cache and Context.

        Args:
            request: identifier of the device to delete.
            context: gRPC servicer context (not used by this method).

        Returns:
            An Empty message.

        Raises:
            NotFoundException: if the device is not found in Context.
        """
        device_uuid = request.device_uuid.uuid

        # device_uuid is rebound below to the UUID stored in Context; keep the key used to
        # enter the queue so that the very same key is released.
        mutex_key = device_uuid

        self.mutex_queues.wait_my_turn(mutex_key)
        try:
            context_client = ContextClient()
            try:
                device = get_device(
                    context_client, device_uuid, rw_copy=False, include_endpoints=False,
                    include_config_rules=False, include_components=False)
                if device is None:
                    raise NotFoundException('Device', device_uuid, extra_details='loading in DeleteDevice')
                device_uuid = device.device_id.device_uuid.uuid

                self.monitoring_loops.remove_device(device_uuid)
                self.driver_instance_cache.delete(device_uuid)
                context_client.RemoveDevice(request)
                return Empty()
            finally:
                # Release the Context client on every path.
                context_client.close()
        finally:
            self.mutex_queues.signal_done(mutex_key)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
        """Build a DeviceConfig with the initial config rules obtained through the driver.

        Args:
            request: identifier of the device to query.
            context: gRPC servicer context (not used by this method).

        Returns:
            A new DeviceConfig filled by populate_initial_config_rules().

        Raises:
            NotFoundException: if the device is not found in Context.
            OperationFailedException: if no driver is available or populating the initial
                config rules reports errors.
        """
        device_uuid = request.device_uuid.uuid

        self.mutex_queues.wait_my_turn(device_uuid)
        try:
            context_client = ContextClient()
            try:
                device = get_device(
                    context_client, device_uuid, rw_copy=False, include_endpoints=False,
                    include_components=False, include_config_rules=True)
                if device is None:
                    # Report the RPC that actually failed (was mislabelled as DeleteDevice).
                    raise NotFoundException('Device', device_uuid, extra_details='loading in GetInitialConfig')

                driver : _Driver = get_driver(self.driver_instance_cache, device)
                if driver is None:
                    msg = ERROR_MISSING_DRIVER.format(device_uuid=str(device_uuid))
                    raise OperationFailedException('GetInitialConfig', extra_details=msg)

                device_config = DeviceConfig()
                errors = populate_initial_config_rules(device_uuid, device_config, driver)

                if len(errors) > 0:
                    for error in errors:
                        LOGGER.error(error)
                    raise OperationFailedException('GetInitialConfig', extra_details=errors)

                return device_config
            finally:
                # Release the Context client on every path.
                context_client.close()
        finally:
            self.mutex_queues.signal_done(device_uuid)
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
        """Subscribe to, or unsubscribe from, a KPI of a device.

        The request is treated as a subscription when both sampling_duration_s and
        sampling_interval_s are greater than zero; otherwise it is treated as an
        unsubscription and the device UUID is taken from the KPI recorded in the
        monitoring loops.

        Args:
            request: monitoring settings describing the KPI and its sampling parameters.
            context: gRPC servicer context (not used by this method).

        Returns:
            An Empty message.

        Raises:
            NotFoundException: if the device is not found in Context.
            OperationFailedException: if the KPI to unsubscribe is unknown, no driver is
                available, or the (un)subscription reports errors.
        """
        subscribe = (request.sampling_duration_s > 0.0) and (request.sampling_interval_s > 0.0)
        manage_kpi_method = subscribe_kpi if subscribe else unsubscribe_kpi

        if subscribe:
            device_uuid = request.kpi_descriptor.device_id.device_uuid.uuid
        else:
            # unsubscribe only carries kpi_uuid; take device_uuid from recorded KPIs
            kpi_uuid = request.kpi_id.kpi_id.uuid
            kpi_details = self.monitoring_loops.get_kpi_by_uuid(kpi_uuid)
            if kpi_details is None:
                msg = ERROR_MISSING_KPI.format(kpi_uuid=str(kpi_uuid))
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
            device_uuid = kpi_details[0]

        self.mutex_queues.wait_my_turn(device_uuid)
        try:
            context_client = ContextClient()
            try:
                device = get_device(
                    context_client, device_uuid, rw_copy=False, include_endpoints=False,
                    include_components=False, include_config_rules=True)
                if device is None:
                    # Report the RPC that actually failed (was mislabelled as DeleteDevice).
                    raise NotFoundException('Device', device_uuid, extra_details='loading in MonitorDeviceKpi')

                driver : _Driver = get_driver(self.driver_instance_cache, device)
                if driver is None:
                    msg = ERROR_MISSING_DRIVER.format(device_uuid=str(device_uuid))
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)

                errors = manage_kpi_method(request, driver, self.monitoring_loops)
                if len(errors) > 0:
                    # Log each error before raising, as the other RPCs of this servicer do.
                    for error in errors:
                        LOGGER.error(error)
                    raise OperationFailedException('MonitorDeviceKpi', extra_details=errors)

                return Empty()
            finally:
                # Release the Context client on every path.
                context_client.close()
        finally:
            self.mutex_queues.signal_done(device_uuid)