diff --git a/src/service/service/ServiceServiceServicerImpl.py b/src/service/service/ServiceServiceServicerImpl.py index a39e72c58a3db6262bb1053267d0395cd2de40df..992b11cf3edaaed8211cd70a08452cd091cdf19a 100644 --- a/src/service/service/ServiceServiceServicerImpl.py +++ b/src/service/service/ServiceServiceServicerImpl.py @@ -17,7 +17,8 @@ from typing import Optional from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.method_wrappers.ServiceExceptions import ( AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException, - OperationFailedException) + OperationFailedException +) from common.proto.context_pb2 import ( Connection, ConstraintActionEnum, Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, TopologyId @@ -42,8 +43,8 @@ from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory from .task_scheduler.TaskScheduler import TasksScheduler from .tools.GeodesicDistance import gps_distance from .tools.OpticalTools import ( - add_lightpath, delete_lightpath, adapt_reply, get_device_name_from_uuid, get_optical_band,refresh_opticalcontroller, - DelFlexLightpath + add_lightpath, delete_lightpath, adapt_reply, get_device_name_from_uuid, + get_optical_band, refresh_opticalcontroller, DelFlexLightpath ) @@ -304,7 +305,7 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): LOGGER.debug('Using existing optical band') else: LOGGER.debug('Using existing optical band') - + if reply_txt is not None: optical_reply = adapt_reply( devices, _service, reply_json, context_uuid_x, topology_uuid_x, optical_band_txt @@ -344,10 +345,10 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): if service is None: raise Exception('Service({:s}) not found'.format(grpc_message_to_json_string(request))) # pylint: disable=no-member service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL - + if service.service_type == ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY: service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE - + context_client.SetService(service) if is_deployed_te() and service.service_type == ServiceTypeEnum.SERVICETYPE_TE: @@ -358,12 +359,12 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): return Empty() if service.service_type == ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY: - params={ - "src":None, - "dst":None, - "bitrate":None, - 'ob_id':None, - 'flow_id':None + params = { + "src" : None, + "dst" : None, + "bitrate" : None, + 'ob_id' : None, + 'flow_id' : None } devs = [] @@ -383,30 +384,30 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): bitrate = int(float(constraint.custom.constraint_value)) break - bitrate = int( - float(service.service_constraints[0].custom.constraint_value)) + bitrate = int(float( + service.service_constraints[0].custom.constraint_value + )) if len(service.service_config.config_rules) > 0: c_rules_dict = json.loads( service.service_config.config_rules[0].custom.resource_value) - ob_id=None - flow_id=None + ob_id = None + flow_id = None if "ob_id" in c_rules_dict: - ob_id=c_rules_dict["ob_id"] + ob_id = c_rules_dict["ob_id"] if ("flow_id" in c_rules_dict): flow_id = c_rules_dict["flow_id"] #if ("ob_id" in c_rules_dict): # ob_id = c_rules_dict["ob_id"] - params['bitrate']=bitrate - params['dst']=dst - params['src']=src - params['ob_id']=ob_id - params['flow_id']=flow_id - - + params['bitrate'] = bitrate + params['dst' ] = dst + params['src' ] = src + params['ob_id' ] = ob_id + params['flow_id'] = flow_id + tasks_scheduler = TasksScheduler(self.service_handler_factory) - tasks_scheduler.compose_from_optical_service(service, params=params,is_delete=True ) + tasks_scheduler.compose_from_optical_service(service, params=params, is_delete=True) tasks_scheduler.execute_all() return Empty() diff --git a/src/service/service/task_scheduler/TaskExecutor.py b/src/service/service/task_scheduler/TaskExecutor.py index d58029afe35b19fd9fc948adfc04d1d81a647aa1..ac06e321d342fc7cddb9f54958a38ed87067c922 100644 --- a/src/service/service/task_scheduler/TaskExecutor.py +++ b/src/service/service/task_scheduler/TaskExecutor.py @@ -21,7 +21,7 @@ from typing import List from common.proto.qkd_app_pb2 import QKDAppStatusEnum from common.proto.context_pb2 import ( Connection, ConnectionId, Device, DeviceDriverEnum, DeviceId, Service, ServiceId, - OpticalConfig, OpticalConfigId,ConnectionList,ServiceConfigRule + OpticalConfig, OpticalConfigId, ConnectionList, ServiceConfigRule ) from common.proto.qkd_app_pb2 import App, AppId from common.proto.context_pb2 import ContextId @@ -39,7 +39,6 @@ from service.service.service_handler_api.Exceptions import ( from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory, get_service_handler_class from service.service.tools.ObjectKeys import get_connection_key, get_device_key, get_service_key, get_qkd_app_key from service.service.tools.object_uuid import opticalconfig_get_uuid -from common.DeviceTypes import DeviceTypeEnum if TYPE_CHECKING: from service.service.service_handler_api._ServiceHandler import _ServiceHandler @@ -128,47 +127,44 @@ class TaskExecutor: self._store_grpc_object(CacheableObjectType.DEVICE, device_key, device) # New function Andrea for Optical Devices - def configure_optical_device(self, device : Device, settings : str, flows : list, is_opticalband : bool): + def configure_optical_device( + self, device : Device, settings : str, flows : list, is_opticalband : bool + ): device_key = get_device_key(device.device_id) optical_config_id = OpticalConfigId() optical_config_id.opticalconfig_uuid = opticalconfig_get_uuid(device.device_id) - + optical_config = OpticalConfig() - + setting = settings.value if settings else "" - config_type=None + config_type = None try: - result = self._context_client.SelectOpticalConfig(optical_config_id) - + new_config = json.loads(result.config) if 'type' in new_config: config_type=new_config['type'] if config_type == 'optical-transponder': setting['status']='ENABLED' if result is not None : - new_config["new_config"] = setting - new_config["is_opticalband"] = is_opticalband new_config["flow"] = flows result.config = json.dumps(new_config) optical_config.CopyFrom(result) - self._device_client.ConfigureOpticalDevice(optical_config) self._store_grpc_object(CacheableObjectType.DEVICE, device_key, device) - except Exception as e: LOGGER.info("error in configure_optical_device %s",e) - - # Deconfiguring Optical Devices ( CNIT ) - def deconfigure_optical_device(self, device : Device, channel_indexes :list,is_opticalband:bool,dev_flow:list): - - errors=[] - dev_flow=dev_flow + + # Deconfiguring Optical Devices ( CNIT ) + def deconfigure_optical_device( + self, device : Device, channel_indexes : list, is_opticalband : bool, dev_flow : list + ): + errors = [] flows = [] - indexes={} + indexes = {} new_config = {} optical_config_id = OpticalConfigId() optical_config_id.opticalconfig_uuid = opticalconfig_get_uuid(device.device_id) @@ -177,24 +173,20 @@ class TaskExecutor: for index in channel_indexes : flows.append(index) # if Roadm the channel index is the flow_id ,or ob_id - else : - for index in channel_indexes: - if ( not is_opticalband): - indexes["flow_id"]=index - else : - indexes["ob_id"]=index - - + else: + for index in channel_indexes: + if not is_opticalband: + indexes["flow_id"] = index + else: + indexes["ob_id"] = index + try: - result = self._context_client.SelectOpticalConfig(optical_config_id) # for extractor in device service to extract the index , dummy data for freq and band required - indexes["frequency"]=None - indexes["band"]=None - if result is not None : + indexes["frequency"] = None + indexes["band"] = None + if result is not None: new_config = json.loads(result.config) - - new_config["new_config"]=indexes new_config["flow"] = flows if len(flows)>0 else dev_flow new_config["is_opticalband"] = is_opticalband @@ -203,56 +195,47 @@ class TaskExecutor: # new_optical_config.opticalconfig_id.CopyFrom (optical_config_id) # new_optical_config.device_id.CopyFrom(device.device_id) self._device_client.DisableOpticalDevice(result) - - except Exception as e: errors.append(e) LOGGER.info("error in deconfigure_optical_device %s",e) return errors - - - def delete_setting (self,service_id:ServiceId, config_key:str,config_value:str) : + + def delete_setting( + self, service_id : ServiceId, config_key : str, config_value : str + ): service_configRule = ServiceConfigRule() service_configRule.service_id.CopyFrom( service_id) - service_configRule.configrule_custom.resource_key=config_key - service_configRule.configrule_custom.resource_value=config_value + service_configRule.configrule_custom.resource_key = config_key + service_configRule.configrule_custom.resource_value = config_value try: ctxt = ContextClient() ctxt.connect() ctxt.DeleteServiceConfigRule(service_configRule) ctxt.close() except Exception as e : - LOGGER.info("error in delete service config rule %s",e) - - - - - def check_service_for_media_channel (self,connections:ConnectionList,item)->bool: - service=item - if (isinstance(item,ServiceId)): - service=self.get_service(item) + LOGGER.info("error in delete service config rule %s",e) + + def check_service_for_media_channel(self, connections : ConnectionList, item) -> bool: + service = item + if (isinstance(item, ServiceId)): + service = self.get_service(item) class_service_handler = None service_handler_settings = {} for connection in connections.connections: connection_uuid=connection.connection_id.connection_uuid if class_service_handler is None: - class_service_handler=self.get_service_handler(connection, service,**service_handler_settings) - + class_service_handler = self.get_service_handler(connection, service,**service_handler_settings) if class_service_handler.check_media_channel(connection_uuid): return True - return False - - def check_connection_for_media_channel (self,connection:Connection,service:Service)->bool: + return False + def check_connection_for_media_channel( + self, connection : Connection, service : Service + ) -> bool: service_handler_settings = {} - connection_uuid=connection.connection_id.connection_uuid - service_handler_class=self.get_service_handler(connection, service,**service_handler_settings) - - + connection_uuid = connection.connection_id.connection_uuid + service_handler_class = self.get_service_handler(connection, service, **service_handler_settings) return service_handler_class.check_media_channel(connection_uuid) - - - def get_device_controller(self, device : Device) -> Optional[Device]: #json_controller = None @@ -403,4 +386,3 @@ class TaskExecutor: except Exception as e: LOGGER.error(f"Failed to delete QKD app with AppId {app_id}: {str(e)}") raise e - diff --git a/src/service/service/task_scheduler/TaskScheduler.py b/src/service/service/task_scheduler/TaskScheduler.py index 47b7b870ede9a049f09b656b559e9df7c5bff412..c2d8423156fd145a262108701860aeb17df3e6f9 100644 --- a/src/service/service/task_scheduler/TaskScheduler.py +++ b/src/service/service/task_scheduler/TaskScheduler.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import graphlib, logging, queue, time ,json +import graphlib, logging, queue, time from typing import TYPE_CHECKING, Dict, Tuple -from common.proto.context_pb2 import (Connection, ConnectionId, Service - , ServiceId, ServiceStatusEnum,ConnectionList - ,DeviceList) +from common.proto.context_pb2 import ( + Connection, ConnectionId, Service, ServiceId, ServiceStatusEnum, ConnectionList +) from common.proto.pathcomp_pb2 import PathCompReply from common.tools.grpc.Tools import grpc_message_to_json_string from context.client.ContextClient import ContextClient @@ -86,12 +86,16 @@ class TasksScheduler: self._dag.add(service_delete_key, service_removing_key) return service_removing_key, service_delete_key - def _optical_service_remove(self, service_id : ServiceId,has_media_channel:bool,has_optical_band=True) -> Tuple[str, str]: - + def _optical_service_remove( + self, service_id : ServiceId, has_media_channel : bool, has_optical_band = True + ) -> Tuple[str, str]: service_removing_key = self._add_task_if_not_exists(Task_ServiceSetStatus( - self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_ACTIVE)) + self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_ACTIVE + )) - service_delete_key = self._add_task_if_not_exists(Task_OpticalServiceDelete(self._executor, service_id,has_media_channel,has_optical_band)) + service_delete_key = self._add_task_if_not_exists(Task_OpticalServiceDelete( + self._executor, service_id, has_media_channel, has_optical_band + )) # deleting a service requires the service is in removing state self._dag.add(service_delete_key, service_removing_key) @@ -114,57 +118,57 @@ class TasksScheduler: return connection_configure_key def _connection_deconfigure(self, connection_id : ConnectionId, service_id : ServiceId) -> str: - connection_deconfigure_key = self._add_task_if_not_exists(Task_ConnectionDeconfigure( - self._executor, connection_id)) + self._executor, connection_id + )) # the connection deconfiguration depends on its connection's service being in removing state service_pending_removal_key = self._add_task_if_not_exists(Task_ServiceSetStatus( - self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL)) + self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL + )) self._dag.add(connection_deconfigure_key, service_pending_removal_key) # the connection's service depends on the connection deconfiguration to transition to delete service_delete_key = self._add_task_if_not_exists(Task_ServiceDelete( - self._executor, service_id)) + self._executor, service_id + )) self._dag.add(service_delete_key, connection_deconfigure_key) return connection_deconfigure_key - - - - def _optical_connection_deconfigure(self, connection_id : ConnectionId, service_id : ServiceId,has_media_channel:bool,has_optical_band=True) -> str: + def _optical_connection_deconfigure( + self, connection_id : ConnectionId, service_id : ServiceId, + has_media_channel : bool, has_optical_band = True + ) -> str: connection_deconfigure_key = self._add_task_if_not_exists(Task_OpticalConnectionDeconfigure( - self._executor, connection_id,has_media_channel=has_media_channel)) + self._executor, connection_id, has_media_channel=has_media_channel + )) # the connection deconfiguration depends on its connection's service being in removing state service_pending_removal_key = self._add_task_if_not_exists(Task_ServiceSetStatus( - self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_ACTIVE)) + self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_ACTIVE + )) self._dag.add(connection_deconfigure_key, service_pending_removal_key) - - + service_delete_key = self._add_task_if_not_exists(Task_OpticalServiceDelete( - self._executor, service_id,has_media_channel,has_optical_band)) + self._executor, service_id, has_media_channel, has_optical_band + )) self._dag.add(service_delete_key, connection_deconfigure_key) return connection_deconfigure_key - def _optical_service_config_remove(self, connection_id : ConnectionId - , service_id : ServiceId - ) -> str: - + def _optical_service_config_remove( + self, connection_id : ConnectionId, service_id : ServiceId + ) -> str: service_config_key = self._add_task_if_not_exists(Task_OpticalServiceConfigDelete( - self._executor,connection_id, service_id - - )) - + self._executor, connection_id, service_id + )) service_pending_removal_key = self._add_task_if_not_exists(Task_ServiceSetStatus( - self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_ACTIVE)) - + self._executor, service_id, ServiceStatusEnum.SERVICESTATUS_ACTIVE + )) self._dag.add(service_config_key, service_pending_removal_key) return service_config_key - - + def compose_from_pathcompreply(self, pathcomp_reply : PathCompReply, is_delete : bool = False) -> None: t0 = time.time() include_service = self._service_remove if is_delete else self._service_create @@ -185,29 +189,32 @@ class TasksScheduler: t1 = time.time() LOGGER.debug('[compose_from_pathcompreply] elapsed_time: {:f} sec'.format(t1-t0)) - - - def check_service_for_media_channel (self,connections:ConnectionList,item)->Tuple[bool,bool]: - service=item - has_media_channel=False - has_optical_band=False - if (isinstance(item,ServiceId)): - service=self._executor.get_service(item) + + def check_service_for_media_channel( + self, connections : ConnectionList, item + )->Tuple[bool, bool]: + service = item + has_media_channel = False + has_optical_band = False + if isinstance(item, ServiceId): + service = self._executor.get_service(item) class_service_handler = None service_handler_settings = {} for connection in connections.connections: connection_uuid = connection.connection_id.connection_uuid.uuid if class_service_handler is None: - class_service_handler=self._executor.get_service_handler(connection, service,**service_handler_settings) - + class_service_handler = self._executor.get_service_handler( + connection, service, **service_handler_settings + ) if class_service_handler.check_media_channel(connection_uuid): - has_media_channel= True + has_media_channel = True else : - has_optical_band=True - - return (has_media_channel,has_optical_band) - - def compose_from_optical_service(self, service : Service,params:dict, is_delete : bool = False) -> None: + has_optical_band = True + return (has_media_channel, has_optical_band) + + def compose_from_optical_service( + self, service : Service, params : dict, is_delete : bool = False + ) -> None: t0 = time.time() include_service = self._optical_service_remove if is_delete else self._service_create include_connection = self._optical_connection_deconfigure if is_delete else self._connection_configure @@ -216,125 +223,103 @@ class TasksScheduler: explored_items = set() pending_items_to_explore = queue.Queue() pending_items_to_explore.put(service) - has_media_channel=None - has_optical_band=None - reply=None - code=0 - reply_not_allowed="DELETE_NOT_ALLOWED" + has_media_channel = None + has_optical_band = None + reply = None + code = 0 + reply_not_allowed = "DELETE_NOT_ALLOWED" while not pending_items_to_explore.empty(): try: item = pending_items_to_explore.get(block=False) - except queue.Empty: break - + if isinstance(item, Service): - str_item_key = grpc_message_to_json_string(item.service_id) if str_item_key in explored_items: continue connections = self._context_client.ListConnections(item.service_id) - has_media_channel,has_optical_band=self.check_service_for_media_channel(connections=connections,item=item.service_id) - + has_media_channel, has_optical_band = self.check_service_for_media_channel( + connections=connections, item=item.service_id + ) + if len(service.service_config.config_rules) > 0: - - - reply,code = delete_lightpath( - params['src'] - ,params ['dst'] - , params['bitrate'] - , params['ob_id'] - ,delete_band=not has_media_channel - , flow_id= params['flow_id'] - ) - - - if code == 400 and reply_not_allowed in reply : + reply, code = delete_lightpath( + params['src'], params ['dst'], params['bitrate'], params['ob_id'], + delete_band=not has_media_channel, flow_id= params['flow_id'] + ) + + if code == 400 and reply_not_allowed in reply: MSG = 'Deleteion for the service is not Allowed , Served Lightpaths is not empty' raise Exception(MSG) - include_service(item.service_id,has_media_channel=has_media_channel,has_optical_band=has_optical_band) + include_service( + item.service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band + ) self._add_service_to_executor_cache(item) - - + for connection in connections.connections: - self._add_connection_to_executor_cache(connection) - pending_items_to_explore.put(connection) + self._add_connection_to_executor_cache(connection) + pending_items_to_explore.put(connection) explored_items.add(str_item_key) - elif isinstance(item, ServiceId): - - if code == 400 and reply_not_allowed in reply:break - + if code == 400 and reply_not_allowed in reply: break str_item_key = grpc_message_to_json_string(item) if str_item_key in explored_items: continue connections = self._context_client.ListConnections(item) - has_media_channel,has_optical_band=self.check_service_for_media_channel(connections=connections,item=item) - - - include_service(item,has_media_channel=has_media_channel,has_optical_band=has_optical_band) - - + has_media_channel, has_optical_band = self.check_service_for_media_channel( + connections=connections, item=item + ) + include_service( + item, has_media_channel=has_media_channel, has_optical_band=has_optical_band + ) self._executor.get_service(item) - + for connection in connections.connections: - self._add_connection_to_executor_cache(connection) pending_items_to_explore.put(connection) - - - - explored_items.add(str_item_key) elif isinstance(item, Connection): - - if code == 400 and reply_not_allowed in reply:break - str_item_key = grpc_message_to_json_string(item.connection_id) if str_item_key in explored_items: continue - - - connection_key = include_connection(item.connection_id, item.service_id,has_media_channel=has_media_channel,has_optical_band=has_optical_band) + connection_key = include_connection( + item.connection_id, item.service_id, has_media_channel=has_media_channel, + has_optical_band=has_optical_band + ) self._add_connection_to_executor_cache(connection) - - if include_service_config is not None : - connections_list = ConnectionList() - connections_list.connections.append(item) - - is_media_channel,_=self.check_service_for_media_channel(connections=connections_list,item=service) - - if has_optical_band and is_media_channel: - include_service_config(item.connection_id - , item.service_id - - ) - + if include_service_config is not None : + connections_list = ConnectionList() + connections_list.connections.append(item) + is_media_channel, _ = self.check_service_for_media_channel( + connections=connections_list, item=service + ) + if has_optical_band and is_media_channel: + include_service_config( + item.connection_id, item.service_id + ) self._executor.get_service(item.service_id) pending_items_to_explore.put(item.service_id) - - + for sub_service_id in item.sub_service_ids: - _,service_key_done = include_service(sub_service_id,has_media_channel=has_media_channel,has_optical_band=has_optical_band) + _,service_key_done = include_service( + sub_service_id, has_media_channel=has_media_channel, + has_optical_band=has_optical_band + ) self._executor.get_service(sub_service_id) self._dag.add(service_key_done, connection_key) pending_items_to_explore.put(sub_service_id) - - explored_items.add(str_item_key) - - else: MSG = 'Unsupported item {:s}({:s})' raise Exception(MSG.format(type(item).__name__, grpc_message_to_json_string(item))) - + t1 = time.time() LOGGER.debug('[compose_from_service] elapsed_time: {:f} sec'.format(t1-t0)) - def compose_from_service(self, service : Service, is_delete : bool = False) -> None: t0 = time.time() include_service = self._service_remove if is_delete else self._service_create @@ -357,12 +342,9 @@ class TasksScheduler: include_service(item.service_id) self._add_service_to_executor_cache(item) connections = self._context_client.ListConnections(item.service_id) - for connection in connections.connections: - self._add_connection_to_executor_cache(connection) pending_items_to_explore.put(connection) - explored_items.add(str_item_key) elif isinstance(item, ServiceId): @@ -375,7 +357,6 @@ class TasksScheduler: for connection in connections.connections: self._add_connection_to_executor_cache(connection) pending_items_to_explore.put(connection) - explored_items.add(str_item_key) elif isinstance(item, Connection): @@ -384,7 +365,6 @@ class TasksScheduler: connection_key = include_connection(item.connection_id, item.service_id) self._add_connection_to_executor_cache(connection) - self._executor.get_service(item.service_id) pending_items_to_explore.put(item.service_id) @@ -413,16 +393,20 @@ class TasksScheduler: self._add_connection_to_executor_cache(new_connection) service_updating_key = self._add_task_if_not_exists(Task_ServiceSetStatus( - self._executor, service.service_id, ServiceStatusEnum.SERVICESTATUS_UPDATING)) + self._executor, service.service_id, ServiceStatusEnum.SERVICESTATUS_UPDATING + )) old_connection_deconfigure_key = self._add_task_if_not_exists(Task_ConnectionDeconfigure( - self._executor, old_connection.connection_id)) + self._executor, old_connection.connection_id + )) new_connection_configure_key = self._add_task_if_not_exists(Task_ConnectionConfigure( - self._executor, new_connection.connection_id)) + self._executor, new_connection.connection_id + )) service_active_key = self._add_task_if_not_exists(Task_ServiceSetStatus( - self._executor, service.service_id, ServiceStatusEnum.SERVICESTATUS_ACTIVE)) + self._executor, service.service_id, ServiceStatusEnum.SERVICESTATUS_ACTIVE + )) # the old connection deconfiguration depends on service being in updating state self._dag.add(old_connection_deconfigure_key, service_updating_key) diff --git a/src/service/service/tools/OpticalTools.py b/src/service/service/tools/OpticalTools.py index f66c64f97db828cd348ffdd9b5bebaf65b84aecd..f62c60ac9d256e277478f1703ed75e992961a4da 100644 --- a/src/service/service/tools/OpticalTools.py +++ b/src/service/service/tools/OpticalTools.py @@ -65,13 +65,6 @@ def get_optical_controller_base_url() -> str: log.debug('Optical Controller: base_url={:s}'.format(str(base_url))) return base_url -opticalcontrollers_url = find_environment_variables([ - VAR_NAME_OPTICAL_CTRL_HOST, - VAR_NAME_OPTICAL_CTRL_PORT, -]) -OPTICAL_IP = opticalcontrollers_url.get(VAR_NAME_OPTICAL_CTRL_HOST) -OPTICAL_PORT = opticalcontrollers_url.get(VAR_NAME_OPTICAL_CTRL_PORT) - def get_uuids_from_names(devices: List[Device], device_name: str, port_name: str): device_uuid = "" @@ -98,6 +91,7 @@ def get_names_from_uuids(devices: List[Device], device_uuid: str, port_uuid: str return device_name, port_name return "", "" + def get_device_name_from_uuid(devices: List[Device], device_uuid: str): device_name = "" @@ -107,19 +101,17 @@ def get_device_name_from_uuid(devices: List[Device], device_uuid: str): return device_name return "" -def refresh_opticalcontroller (topology_id:dict): - topo_id_str= topology_id["topology_uuid"]["uuid"] - cxt_id_str=topology_id["context_id"]["context_uuid"]["uuid"] +def refresh_opticalcontroller(topology_id : dict): + topo_id_str = topology_id["topology_uuid"]["uuid"] + cxt_id_str = topology_id["context_id"]["context_uuid"]["uuid"] headers = {"Content-Type": "application/json"} - urlx = f"http://{OPTICAL_IP}:{OPTICAL_PORT}/OpticalTFS/GetTopology/{cxt_id_str}/{topo_id_str}" - + base_url = get_optical_controller_base_url() + urlx = "{:s}/GetTopology/{:s}/{:s}".format(base_url, cxt_id_str, topo_id_str) res = requests.get(urlx, headers=headers) + if res is not None: + log.debug(f"DELETELIGHTPATH Response {res}") - if (res is not None ): - log.debug(f"DELETELIGHTPATH Response {res} ") - - def add_lightpath(src, dst, bitrate, bidir, ob_band) -> str: if not TESTING: @@ -160,31 +152,29 @@ def get_optical_band(idx) -> str: return optical_band_uni_txt -def delete_lightpath( src, dst, bitrate, ob_id,delete_band,flow_id=None) -> str: +def delete_lightpath( src, dst, bitrate, ob_id, delete_band, flow_id=None) -> str: reply = "200" - delete_band=1 if delete_band else 0 + delete_band = 1 if delete_band else 0 + base_url = get_optical_controller_base_url() if not TESTING: - urlx=None if flow_id is not None: - urlx = "http://{}:{}/OpticalTFS/DelFlexLightpath/{}/{}/{}/{}/{}".format(OPTICAL_IP, OPTICAL_PORT, src, dst, bitrate, ob_id,flow_id) + urlx = "{:s}/DelFlexLightpath/{}/{}/{}/{}/{}".format(base_url, src, dst, bitrate, ob_id, flow_id) else : - urlx = "http://{}:{}/OpticalTFS/DelOpticalBand/{}/{}/{}".format(OPTICAL_IP, OPTICAL_PORT, src, dst, ob_id) - + urlx = "{:s}/DelOpticalBand/{}/{}/{}".format(base_url, src, dst, ob_id) headers = {"Content-Type": "application/json"} r = requests.delete(urlx, headers=headers) reply = r.text code = r.status_code + return (reply, code) - return (reply,code) - -def DelFlexLightpath (flow_id,src,dst,bitrate,o_band_id): +def DelFlexLightpath (flow_id, src, dst, bitrate, o_band_id): reply = "200" + base_url = get_optical_controller_base_url() if not TESTING: - urlx = "http://{}:{}/OpticalTFS/DelFlexLightpath/{}/{}/{}/{}/{}".format(OPTICAL_IP, OPTICAL_PORT, flow_id, src, dst, bitrate,o_band_id) - + urlx = "{:s}/DelFlexLightpath/{}/{}/{}/{}/{}".format(base_url, flow_id, src, dst, bitrate, o_band_id) headers = {"Content-Type": "application/json"} r = requests.delete(urlx, headers=headers) - reply = r.text + reply = r.text return reply def get_lightpaths() -> str: @@ -196,10 +186,12 @@ def get_lightpaths() -> str: reply = r.text return reply - def adapt_reply(devices, service, reply_json, context_id, topology_id, optical_band_txt) -> PathCompReply: opt_reply = PathCompReply() - topo = TopologyId(context_id=ContextId(context_uuid=Uuid(uuid=context_id)),topology_uuid=Uuid(uuid=topology_id)) + topo = TopologyId( + context_id=ContextId(context_uuid=Uuid(uuid=context_id)), + topology_uuid=Uuid(uuid=topology_id) + ) #add optical band connection first rules_ob= [] ob_id = 0 @@ -210,6 +202,8 @@ def adapt_reply(devices, service, reply_json, context_id, topology_id, optical_b ob_id = r["parent_opt_band"] if "bidir" in r.keys(): bidir_f = r["bidir"] + else: + bidir_f = False if optical_band_txt != "": ob_json = json.loads(optical_band_txt) ob = ob_json @@ -232,7 +226,15 @@ def adapt_reply(devices, service, reply_json, context_id, topology_id, optical_b #+1 is added to avoid overlap in the WSS of MGONs lf = int(int(freq)-int(bx/2))+1 uf = int(int(freq)+int(bx/2)) - val_ob = {"band_type": band_type, "low-freq": lf, "up-freq": uf, "frequency": freq, "band": bx, "ob_id": ob_id, "bidir": bidir_f} + val_ob = { + "band_type" : band_type, + "low-freq" : lf, + "up-freq" : uf, + "frequency" : freq, + "band" : bx, + "ob_id" : ob_id, + "bidir" : bidir_f + } rules_ob.append(ConfigRule_Custom(resource_key="/settings-ob_{}".format(uuuid_x), resource_value=json.dumps(val_ob))) bidir_ob = ob["bidir"] for devxb in ob["flows"].keys(): @@ -320,12 +322,13 @@ def adapt_reply(devices, service, reply_json, context_id, topology_id, optical_b end_point = EndPointId(topology_id=topo, device_id=DeviceId(device_uuid=Uuid(uuid=d)), endpoint_uuid=Uuid(uuid=p)) connection_f.path_hops_endpoint_ids.add().CopyFrom(end_point) else: - log.info("no map device port for device {} port {}".format(devx, out_end_point_b)) + log.info("no map device port for device {} port {}".format(devx, out_end_point_b)) + #check that list of endpoints is not empty if connection_ob is not None and len(connection_ob.path_hops_endpoint_ids) == 0: log.debug("deleting empty optical-band connection") opt_reply.connections.remove(connection_ob) - + #inizialize custom optical parameters band = r["band"] if "band" in r else None op_mode = r["op-mode"] if "op-mode" in r else None @@ -348,19 +351,18 @@ def adapt_reply(devices, service, reply_json, context_id, topology_id, optical_b service.service_config.config_rules.add().CopyFrom(rule) if len(rules_ob) > 0: - for rulex in rules_ob: - rule_ob = ConfigRule(action=ConfigActionEnum.CONFIGACTION_SET, custom=rulex) - service.service_config.config_rules.add().CopyFrom(rule_ob) - + for rulex in rules_ob: + rule_ob = ConfigRule(action=ConfigActionEnum.CONFIGACTION_SET, custom=rulex) + service.service_config.config_rules.add().CopyFrom(rule_ob) + opt_reply.services.add().CopyFrom(service) - - return opt_reply + return opt_reply -def add_service_to_reply(reply : PathCompReply, service : Service)-> Service: +def add_service_to_reply(reply : PathCompReply, service : Service) -> Service: service_x = reply.services.add() service_x.CopyFrom(service) return service_x - -def add_connection_to_reply(reply : PathCompReply)-> Connection: + +def add_connection_to_reply(reply : PathCompReply) -> Connection: conn = reply.connections.add() return conn diff --git a/src/service/service/tools/object_uuid.py b/src/service/service/tools/object_uuid.py index e9454048a6afaa0684f282bcd2dfc3e671af4bf6..cbcbc71b9b9d4edeb31858e3c22cd0af508210b9 100644 --- a/src/service/service/tools/object_uuid.py +++ b/src/service/service/tools/object_uuid.py @@ -50,4 +50,4 @@ def opticalconfig_get_uuid(device_id: DeviceId) -> str: raise InvalidArgumentsException([ ('DeviceId ', device_id), - ], extra_details=['device_id is required to produce a OpticalConfig UUID']) \ No newline at end of file + ], extra_details=['device_id is required to produce a OpticalConfig UUID'])