From 9f4f27cdccb3ec947c04f6aaa9166df8f52cb406 Mon Sep 17 00:00:00 2001 From: gifrerenom <lluis.gifre@cttc.es> Date: Tue, 20 Feb 2024 15:57:07 +0000 Subject: [PATCH] Service component: - Pre-merge cleanup --- src/service/requirements.in | 2 +- .../service/ServiceServiceServicerImpl.py | 107 ++++++------- .../service_handlers/oc/OCServiceHandler.py | 144 ++---------------- .../service/service_handlers/oc/OCTools.py | 9 +- .../service/task_scheduler/TaskExecutor.py | 81 +++------- 5 files changed, 92 insertions(+), 251 deletions(-) diff --git a/src/service/requirements.in b/src/service/requirements.in index ea4d41cc4..86720da19 100644 --- a/src/service/requirements.in +++ b/src/service/requirements.in @@ -19,4 +19,4 @@ netaddr==0.9.0 networkx==2.6.3 pydot==1.4.2 redis==4.1.2 -requests==2.31.0 +requests==2.27.1 diff --git a/src/service/service/ServiceServiceServicerImpl.py b/src/service/service/ServiceServiceServicerImpl.py index f11a60ade..b06295dee 100644 --- a/src/service/service/ServiceServiceServicerImpl.py +++ b/src/service/service/ServiceServiceServicerImpl.py @@ -248,65 +248,66 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints: pathcomp_request = PathCompRequest() pathcomp_request.services.append(service_with_uuids) # pylint: disable=no-member + if service.service_type == ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY: - context_id_x = json_context_id(DEFAULT_CONTEXT_NAME) - topology_id_x = json_topology_id( - DEFAULT_TOPOLOGY_NAME, context_id_x) - topology_details = context_client.GetTopologyDetails( - TopologyId(**topology_id_x)) - # devices = get_devices_in_topology(context_client, TopologyId(**topology_id_x), ContextId(**context_id_x)) - devices = topology_details.devices - context_uuid_x = topology_details.topology_id.context_id.context_uuid.uuid - topology_uuid_x = topology_details.topology_id.topology_uuid.uuid - devs = [] - ports = [] - for endpoint_id in service.service_endpoint_ids: - devs.append(endpoint_id.device_id.device_uuid.uuid) - ports.append(endpoint_id.endpoint_uuid.uuid) - src = devs[0] - dst = devs[1] - bidir = None - ob_band = None - bitrate = 100 - for constraint in service.service_constraints: - if "bandwidth" in constraint.custom.constraint_type: - bitrate = int(float(constraint.custom.constraint_value)) - elif "bidirectionality" in constraint.custom.constraint_type: - bidir = int(constraint.custom.constraint_value) - elif "optical-band-width" in constraint.custom.constraint_type: - ob_band = int(constraint.custom.constraint_value) - - - # to get the reply form the optical module - reply_txt = add_lightpath(src, dst, bitrate, bidir, ob_band) - - # reply with 2 transponders and 2 roadms - reply_json = json.loads(reply_txt) - optical_band_txt = "" - if "new_optical_band" in reply_json.keys(): - if reply_json["new_optical_band"] == 1: - if reply_json["parent_opt_band"]: - if "parent_opt_band" in reply_json.keys(): - parent_ob = reply_json["parent_opt_band"] - LOGGER.debug('Parent optical-band={}'.format(parent_ob)) - optical_band_txt = get_optical_band(parent_ob) - LOGGER.debug('optical-band details={}'.format(optical_band_txt)) - else: - LOGGER.debug('expected optical band not found') + context_id_x = json_context_id(DEFAULT_CONTEXT_NAME) + topology_id_x = json_topology_id( + DEFAULT_TOPOLOGY_NAME, context_id_x) + topology_details = context_client.GetTopologyDetails( + TopologyId(**topology_id_x)) + devices = topology_details.devices + context_uuid_x = topology_details.topology_id.context_id.context_uuid.uuid + topology_uuid_x = topology_details.topology_id.topology_uuid.uuid + devs = [] + ports = [] + for endpoint_id in service.service_endpoint_ids: + devs.append(endpoint_id.device_id.device_uuid.uuid) + ports.append(endpoint_id.endpoint_uuid.uuid) + src = devs[0] + dst = devs[1] + bidir = None + ob_band = None + bitrate = 100 + for constraint in service.service_constraints: + if "bandwidth" in constraint.custom.constraint_type: + bitrate = int(float(constraint.custom.constraint_value)) + elif "bidirectionality" in constraint.custom.constraint_type: + bidir = int(constraint.custom.constraint_value) + elif "optical-band-width" in constraint.custom.constraint_type: + ob_band = int(constraint.custom.constraint_value) + + # to get the reply form the optical module + reply_txt = add_lightpath(src, dst, bitrate, bidir, ob_band) + + # reply with 2 transponders and 2 roadms + reply_json = json.loads(reply_txt) + optical_band_txt = "" + if "new_optical_band" in reply_json.keys(): + if reply_json["new_optical_band"] == 1: + if reply_json["parent_opt_band"]: + if "parent_opt_band" in reply_json.keys(): + parent_ob = reply_json["parent_opt_band"] + LOGGER.debug('Parent optical-band={}'.format(parent_ob)) + optical_band_txt = get_optical_band(parent_ob) + LOGGER.debug('optical-band details={}'.format(optical_band_txt)) else: LOGGER.debug('expected optical band not found') else: - LOGGER.debug('Using existing optical band') + LOGGER.debug('expected optical band not found') else: LOGGER.debug('Using existing optical band') - if reply_txt is not None: - optical_reply = adapt_reply(devices, _service, reply_json, context_uuid_x, topology_uuid_x, optical_band_txt) - LOGGER.debug('optical_reply={:s}'.format( - grpc_message_to_json_string(optical_reply))) - - tasks_scheduler.compose_from_pathcompreply( - optical_reply, is_delete=False) - + else: + LOGGER.debug('Using existing optical band') + if reply_txt is not None: + optical_reply = adapt_reply( + devices, _service, reply_json, context_uuid_x, topology_uuid_x, optical_band_txt + ) + LOGGER.debug('optical_reply={:s}'.format( + grpc_message_to_json_string(optical_reply))) + + tasks_scheduler.compose_from_pathcompreply( + optical_reply, is_delete=False) + if num_disjoint_paths is None or num_disjoint_paths in {0, 1}: pathcomp_request.shortest_path.Clear() # pylint: disable=no-member else: diff --git a/src/service/service/service_handlers/oc/OCServiceHandler.py b/src/service/service/service_handlers/oc/OCServiceHandler.py index 89e9d23ed..8ae426393 100644 --- a/src/service/service/service_handlers/oc/OCServiceHandler.py +++ b/src/service/service/service_handlers/oc/OCServiceHandler.py @@ -15,20 +15,19 @@ import json, logging from typing import Any, List, Optional, Tuple, Union from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method -from common.proto.context_pb2 import ConfigRule, DeviceId, Service +from common.proto.context_pb2 import ConfigRule, DeviceId, EndPointId, Service from common.tools.object_factory.Device import json_device_id from common.type_checkers.Checkers import chk_type from service.service.service_handler_api.Tools import get_device_endpoint_uuids, get_endpoint_matching from service.service.service_handler_api._ServiceHandler import _ServiceHandler from service.service.service_handler_api.SettingsHandler import SettingsHandler from service.service.task_scheduler.TaskExecutor import TaskExecutor -from .ConfigRules import setup_config_rules, teardown_config_rules - -from common.proto.context_pb2 import EndPointId +#from .ConfigRules import setup_config_rules, teardown_config_rules from .OCTools import convert_endpoints_to_flows, handle_flows_names + LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('Service', 'Handler', labels={'handler': 'l3nm_emulated'}) +METRICS_POOL = MetricsPool('Service', 'Handler', labels={'handler': 'oc'}) class OCServiceHandler(_ServiceHandler): def __init__( # pylint: disable=super-init-not-called @@ -38,73 +37,6 @@ class OCServiceHandler(_ServiceHandler): self.__task_executor = task_executor self.__settings_handler = SettingsHandler(service.service_config, **settings) - - - - ''' - @metered_subclass_method(METRICS_POOL) - def SetEndpoint( - self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None - ) -> List[Union[bool, Exception]]: - - chk_type('endpoints', endpoints, list) - if len(endpoints) == 0: return [] - - service_uuid = self.__service.service_id.service_uuid.uuid - if self.__settings_handler.get('/settings-ob_{}'.format(connection_uuid)): - settings = self.__settings_handler.get('/settings-ob_{}'.format(connection_uuid)) - else: - settings = self.__settings_handler.get('/settings') - LOGGER.info("AAAAAAAAAAAAAAAAAAAA settings={}".format(settings)) - - settings = self.__settings_handler.get('/settings') - #new structure - #in, dev, out, topo(opt) - entries = List[Tuple[str, str, str Optional[str]]] - entry_tuple = device_uuid, endpoint_uuid, topology_uuid - entries.append(endpoint_id_tuple) - for i in range (1, len(endpoints)): - endpoint_x = endpoints[i] - dev_x = endpoint_x[0] - if_x = endpoint_x[1] - - - - - - results = [] - for endpoint in endpoints: - try: - device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint) - - device_obj = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid))) - endpoint_obj = get_endpoint_matching(device_obj, endpoint_uuid) - endpoint_settings = self.__settings_handler.get_endpoint_settings(device_obj, endpoint_obj) - endpoint_name = endpoint_obj.name - - json_config_rules = setup_config_rules( - service_uuid, connection_uuid, device_uuid, endpoint_uuid, endpoint_name, - settings, endpoint_settings) - LOGGER.info("Start configuring device %s",settings) - if (settings): - self.__task_executor.configure_optical_device(device_obj,settings) - if len(json_config_rules) > 0: - LOGGER.info("Start configuring device") - del device_obj.device_config.config_rules[:] - for json_config_rule in json_config_rules: - device_obj.device_config.config_rules.append(ConfigRule(**json_config_rule)) - self.__task_executor.configure_optical_device(device_obj) - #self.__task_executor.configure_device(device_obj) - - results.append(True) - except Exception as e: # pylint: disable=broad-except - LOGGER.exception('Unable to SetEndpoint({:s})'.format(str(endpoint))) - results.append(e) - - return results - ''' - - @metered_subclass_method(METRICS_POOL) def SetEndpoint( self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None @@ -121,68 +53,29 @@ class OCServiceHandler(_ServiceHandler): else: settings = self.__settings_handler.get('/settings') - LOGGER.debug("Andrea111 settings={}".format(settings)) + LOGGER.debug("settings={}".format(settings)) # settings = self.__settings_handler.get('/settings') #flow is the new variable that stores input-output relationship flows = convert_endpoints_to_flows(endpoints) #handled_flows=handle_flows_names(flows=flows,task_executor=self.__task_executor) - LOGGER.debug("AndreaXXX dict of flows= {}".format(flows)) + LOGGER.debug("dict of flows= {}".format(flows)) #LOGGER.info("Handled Flows %s",handled_flows) results = [] #new cycle for setting optical devices - for device_uuid in flows.keys(): + for device_uuid, dev_flows in flows.items(): try: - dev_flows = flows[device_uuid] device_obj = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid))) - LOGGER.debug("Andrea567 device_obj={}".format(device_obj)) - ''' - #to be imported in the device handler - endpoint_obj = get_endpoint_matching(device_obj, endpoint_uuid) - endpoint_settings = self.__settings_handler.get_endpoint_settings(device_obj, endpoint_obj) - endpoint_name = endpoint_obj.name - ''' + LOGGER.debug("device_obj={}".format(device_obj)) if (settings): - LOGGER.debug("Andrea234 settings={}".format(settings)) + LOGGER.debug("settings={}".format(settings)) self.__task_executor.configure_optical_device(device_obj, settings, dev_flows, is_opticalband) results.append(True) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Unable to configure Device({:s})'.format(str(device_uuid))) results.append(e) - - ''' - for endpoint in endpoints: - try: - device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint) - - device_obj = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid))) - endpoint_obj = get_endpoint_matching(device_obj, endpoint_uuid) - endpoint_settings = self.__settings_handler.get_endpoint_settings(device_obj, endpoint_obj) - endpoint_name = endpoint_obj.name - - # json_config_rules = setup_config_rules( - # service_uuid, connection_uuid, device_uuid, endpoint_uuid, endpoint_name, - # settings, endpoint_settings) - - if (settings): - LOGGER.debug("Andrea234 settings={}".format(settings)) - self.__task_executor.configure_optical_device(device_obj,settings,handled_flows,is_opticalband) - #we don't use config_rules - # if len(json_config_rules) > 0: - # LOGGER.debug("VBNMHGStart configuring device") - # del device_obj.device_config.config_rules[:] - # for json_config_rule in json_config_rules: - # device_obj.device_config.config_rules.append(ConfigRule(**json_config_rule)) - # self.__task_executor.configure_optical_device(device_obj) - #self.__task_executor.configure_device(device_obj) - - results.append(True) - except Exception as e: # pylint: disable=broad-except - LOGGER.exception('Unable to SetEndpoint({:s})'.format(str(endpoint))) - results.append(e) - ''' return results @@ -193,29 +86,14 @@ class OCServiceHandler(_ServiceHandler): chk_type('endpoints', endpoints, list) if len(endpoints) == 0: return [] - service_uuid = self.__service.service_id.service_uuid.uuid - settings = self.__settings_handler.get('/settings') + # TODO: to be checked and elaborated results = [] for endpoint in endpoints: try: device_uuid, endpoint_uuid = get_device_endpoint_uuids(endpoint) - device_obj = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid))) - endpoint_obj = get_endpoint_matching(device_obj, endpoint_uuid) - endpoint_settings = self.__settings_handler.get_endpoint_settings(device_obj, endpoint_obj) - endpoint_name = endpoint_obj.name - - json_config_rules = teardown_config_rules( - service_uuid, connection_uuid, device_uuid, endpoint_uuid, endpoint_name, - settings, endpoint_settings) - - if len(json_config_rules) > 0: - del device_obj.device_config.config_rules[:] - for json_config_rule in json_config_rules: - device_obj.device_config.config_rules.append(ConfigRule(**json_config_rule)) - self.__task_executor.configure_device(device_obj) - + self.__task_executor.configure_device(device_obj) results.append(True) except Exception as e: # pylint: disable=broad-except LOGGER.exception('Unable to DeleteEndpoint({:s})'.format(str(endpoint))) diff --git a/src/service/service/service_handlers/oc/OCTools.py b/src/service/service/service_handlers/oc/OCTools.py index c8bddefb9..f40cb65be 100644 --- a/src/service/service/service_handlers/oc/OCTools.py +++ b/src/service/service/service_handlers/oc/OCTools.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from service.service.service_handler_api.Tools import get_device_endpoint_uuids, get_endpoint_matching from typing import Dict, Any, List, Optional, Tuple import logging @@ -114,14 +115,10 @@ def convert_endpoints_to_flows(endpoints : List[Tuple[str, str, Optional[str]]]) def get_device_endpint_name (endpoint_uuid:str,device_uuid:str,task_executor)->Tuple: - - - device_obj = task_executor.get_device(DeviceId(**json_device_id(device_uuid))) endpoint_obj = get_endpoint_matching(device_obj, endpoint_uuid) endpoint_name = endpoint_obj.name - - return (device_obj.name,endpoint_name) + return (device_obj.name, endpoint_name) def handle_flows_names (task_executor,flows:dict)->Dict : new_flows={} @@ -140,4 +137,4 @@ def handle_flows_names (task_executor,flows:dict)->Dict : if (device_name not in new_flows): new_flows[device_name]=[] new_flows[device_name].append((source_port,destination_port)) - return new_flows \ No newline at end of file + return new_flows diff --git a/src/service/service/task_scheduler/TaskExecutor.py b/src/service/service/task_scheduler/TaskExecutor.py index 3718c01f4..6d24da4c0 100644 --- a/src/service/service/task_scheduler/TaskExecutor.py +++ b/src/service/service/task_scheduler/TaskExecutor.py @@ -16,7 +16,9 @@ import logging #, json from enum import Enum from typing import TYPE_CHECKING, Any, Dict, Optional, Union from common.method_wrappers.ServiceExceptions import NotFoundException -from common.proto.context_pb2 import Connection, ConnectionId, Device, DeviceDriverEnum, DeviceId, Service, ServiceId,MyConfig,MyConfigId +from common.proto.context_pb2 import ( + Connection, ConnectionId, Device, DeviceDriverEnum, DeviceId, Service, ServiceId, MyConfig, MyConfigId +) from common.tools.context_queries.Connection import get_connection_by_id from common.tools.context_queries.Device import get_device from common.tools.context_queries.Service import get_service_by_id @@ -113,69 +115,32 @@ class TaskExecutor: self._device_client.ConfigureDevice(device) self._store_grpc_object(CacheableObjectType.DEVICE, device_key, device) - # New function Andrea for Optical Devices - def configure_optical_device(self,device:Device,settings:str,flows:list,is_opticalband:bool): - - + # New function Andrea for Optical Devices + def configure_optical_device(self, device : Device, settings : str, flows : list, is_opticalband : bool): device_key = get_device_key(device.device_id) - myid=MyConfigId() - myid.myconfig_uuid=device.device_id.device_uuid.uuid - myConfig=MyConfig() + myid = MyConfigId() + myid.myconfig_uuid = device.device_id.device_uuid.uuid + myConfig = MyConfig() - setting =settings.value if settings else "" + setting = settings.value if settings else "" - new_config={} + new_config = {} try: - result=self._context_client.SelectMyConfig(myid) - new_config=eval(result.config) - LOGGER.info("result %s",result) - if result is not None : - - new_config["new_config"]=setting - new_config["is_opticalband"]=is_opticalband - new_config["flow"]=flows - result.config = str(new_config) - myConfig.CopyFrom(result) - - self._device_client.ConfigureOpticalDevice(myConfig) - - self._store_grpc_object(CacheableObjectType.DEVICE, device_key, device) + result = self._context_client.SelectMyConfig(myid) + new_config = eval(result.config) + LOGGER.info("result %s",result) + if result is not None : + new_config["new_config"]=setting + new_config["is_opticalband"]=is_opticalband + new_config["flow"]=flows + result.config = str(new_config) + myConfig.CopyFrom(result) + self._device_client.ConfigureOpticalDevice(myConfig) + + self._store_grpc_object(CacheableObjectType.DEVICE, device_key, device) except Exception as e: LOGGER.debug("error in config my config %s",e) - - ''' - # For Optical Devices - def configure_optical_device (self,device:Device,settings:str,flows:dict,is_opticalband:bool) : - - - device_key = get_device_key(device.device_id) - myid=MyConfigId() - myid.myconfig_uuid=device.device_id.device_uuid.uuid - myConfig=MyConfig() - - setting =settings.value if settings else "" - - new_config={} - try: - result=self._context_client.SelectMyConfig(myid) - new_config=eval(result.config) - - if result is not None : - - new_config["new_config"]=setting - new_config["is_opticalband"]=is_opticalband - new_config["flow"]=flows - result.config = str(new_config) - myConfig.CopyFrom(result) - - self._device_client.ConfigureOpticalDevice(myConfig) - - self._store_grpc_object(CacheableObjectType.DEVICE, device_key, device) - except Exception as e: - LOGGER.debug("error in config my config %s",e) - ''' - - + def get_device_controller(self, device : Device) -> Optional[Device]: #json_controller = None #for config_rule in device.device_config.config_rules: -- GitLab