Loading src/service/service/task_scheduler/TaskScheduler.py +106 −117 Original line number Diff line number Diff line Loading @@ -309,7 +309,6 @@ class TasksScheduler: return (has_media_channel, has_optical_band) def compose_from_opticalcontroller_reply( self, pathcomp_reply : PathCompReply, is_delete : bool = False ) -> None: Loading @@ -322,25 +321,24 @@ class TasksScheduler: has_optical_band = None for service in pathcomp_reply.services: connections = self._context_client.ListConnections(service.service_id) has_media_channel, has_optical_band = self.check_service_for_media_channel( connections=connections, item=service.service_id ) include_service(service.service_id , has_media_channel=has_media_channel, has_optical_band=has_optical_band) include_service( service.service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._add_service_to_executor_cache(service) for connection in connections.connections: self._add_connection_to_executor_cache(connection) for connection in pathcomp_reply.connections: connection_key = include_connection( connection.connection_id, connection.service_id, has_media_channel=has_media_channel, connection.connection_id, connection.service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._add_connection_to_executor_cache(connection) Loading @@ -358,7 +356,6 @@ class TasksScheduler: LOGGER.debug('[compose_from_service] elapsed_time: {:f} sec'.format(t1-t0)) def compose_from_service_expansion( self, service :Service, ) -> None: Loading @@ -374,16 +371,15 @@ class TasksScheduler: 'service not found ' ]) connections = self._context_client.ListConnections(service.service_id) has_media_channel, has_optical_band = self.check_service_for_media_channel( connections=connections, item=service.service_id ) _,service_key_done= include_service(service.service_id , has_media_channel=has_media_channel, has_optical_band=has_optical_band) _,service_key_done= include_service( service.service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) # self._add_service_to_executor_cache(service) service_updating_key = self._add_task_if_not_exists(Task_ServiceSetStatus( self._executor, service.service_id, ServiceStatusEnum.SERVICESTATUS_UPDATING Loading @@ -391,7 +387,8 @@ class TasksScheduler: self._add_service_to_executor_cache(service) for connection in connections.connections: connection_key = include_connection( connection.connection_id, connection.service_id, has_media_channel=has_media_channel, connection.connection_id, connection.service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._add_connection_to_executor_cache(connection) Loading @@ -399,7 +396,9 @@ class TasksScheduler: t1 = time.time() LOGGER.debug('[compose_from_service] elapsed_time: {:f} sec'.format(t1-t0)) def compose_from_optical_service(self, service : Service, params:dict, is_delete : bool = False) -> None: def compose_from_optical_service( self, service : Service, params:dict, is_delete : bool = False ) -> None: t0 = time.time() include_service = self._optical_service_remove if is_delete else self._service_create include_connection = self._optical_connection_deconfigure if is_delete else self._connection_configure Loading @@ -416,16 +415,16 @@ class TasksScheduler: while not pending_items_to_explore.empty(): try: item = pending_items_to_explore.get(block=False) except queue.Empty: break if isinstance(item, Service): str_item_key = grpc_message_to_json_string(item.service_id) if str_item_key in explored_items: continue connections = self._context_client.ListConnections(item.service_id) has_media_channel,has_optical_band=self.check_service_for_media_channel(connections=connections,item=item.service_id) has_media_channel, has_optical_band = self.check_service_for_media_channel( connections=connections, item=item.service_id ) oc_type = 1 if len(service.service_config.config_rules) > 0: for constraint in service.service_constraints: Loading @@ -433,50 +432,47 @@ class TasksScheduler: oc_type = OpticalServiceType(str(constraint.custom.constraint_value)) if oc_type == 2: reply, code = delete_lightpath( params['src'] , params ['dst'] , params['bitrate'] , flow_id= params['flow_id'] params['src'], params ['dst'], params['bitrate'], flow_id= params['flow_id'] ) else: reply, code = DelFlexLightpath( params['src'] , params ['dst'] , params['bitrate'] , params['ob_id'] , flow_id=params['flow_id'] params['src'], params ['dst'], params['bitrate'], params['ob_id'], flow_id=params['flow_id'] ) if code == 400 and reply_not_allowed in reply : MSG = 'Deleteion for the service is not Allowed , Served Lightpaths is not empty' raise Exception(MSG) include_service(item.service_id,has_media_channel=has_media_channel,has_optical_band=has_optical_band) include_service( item.service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._add_service_to_executor_cache(item) for connection in connections.connections: self._add_connection_to_executor_cache(connection) pending_items_to_explore.put(connection) explored_items.add(str_item_key) elif isinstance(item, ServiceId): if code == 400 and reply_not_allowed in reply: break str_item_key = grpc_message_to_json_string(item) if str_item_key in explored_items: continue connections = self._context_client.ListConnections(item) has_media_channel,has_optical_band=self.check_service_for_media_channel(connections=connections,item=item) include_service(item,has_media_channel=has_media_channel,has_optical_band=has_optical_band) has_media_channel, has_optical_band = self.check_service_for_media_channel( connections=connections, item=item ) include_service( item, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._executor.get_service(item) for connection in connections.connections: self._add_connection_to_executor_cache(connection) pending_items_to_explore.put(connection) Loading @@ -487,48 +483,42 @@ class TasksScheduler: str_item_key = grpc_message_to_json_string(item.connection_id) if str_item_key in explored_items: continue connection_key = include_connection( item.connection_id , item.service_id , has_media_channel=has_media_channel , has_optical_band=has_optical_band ) connection_key = include_connection( item.connection_id, item.service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._add_connection_to_executor_cache(connection) if include_service_config is not None : connections_list = ConnectionList() connections_list.connections.append(item) is_media_channel,_=self.check_service_for_media_channel(connections=connections_list,item=service) is_media_channel,_=self.check_service_for_media_channel( connections=connections_list,item=service ) if has_optical_band and is_media_channel: include_service_config(item.connection_id , item.service_id ) include_service_config(item.connection_id, item.service_id) self._executor.get_service(item.service_id) pending_items_to_explore.put(item.service_id) for sub_service_id in item.sub_service_ids: _,service_key_done = include_service(sub_service_id ,has_media_channel=has_media_channel ,has_optical_band=has_optical_band) _,service_key_done = include_service( sub_service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._executor.get_service(sub_service_id) self._dag.add(service_key_done, connection_key) pending_items_to_explore.put(sub_service_id) explored_items.add(str_item_key) else: MSG = 'Unsupported item {:s}({:s})' raise Exception(MSG.format(type(item).__name__, grpc_message_to_json_string(item))) t1 = time.time() LOGGER.debug('[compose_from_service] elapsed_time: {:f} sec'.format(t1-t0)) LOGGER.debug('[compose_from_optical_service] elapsed_time: {:f} sec'.format(t1-t0)) def compose_from_service(self, service : Service, is_delete : bool = False) -> None: Loading Loading @@ -641,13 +631,12 @@ class TasksScheduler: self._dag.add(service_active_key, new_connection_configure_key) t1 = time.time() LOGGER.debug('[RRERRSF] elapsed_time: {:f} sec'.format(t1-t0)) LOGGER.debug('[compose_optical_service_update] elapsed_time: {:f} sec'.format(t1-t0)) def compose_optical_service_update1( self, service : Service, old_connection : Connection, new_connection : Connection ) -> None: LOGGER.debug('[ttttttttttt] elapsed_time inside update1') t0 = time.time() self._add_service_to_executor_cache(service) Loading Loading @@ -686,7 +675,7 @@ class TasksScheduler: self._dag.add(service_active_key, new_connection_configure_key) t1 = time.time() LOGGER.debug('[RRERRSF] elapsed_time: {:f} sec'.format(t1-t0)) LOGGER.debug('[compose_optical_service_update1] elapsed_time: {:f} sec'.format(t1-t0)) def compose_service_connection_update( Loading Loading @@ -743,7 +732,7 @@ class TasksScheduler: task = self._tasks.get(task_key) succeeded = True if dry_run else task.execute() results.append(succeeded) LOGGER.debug('[execute_allRRRR] finished task {:s} ; succeeded={:s}'.format(str_task_name, str(succeeded))) LOGGER.debug('[execute_all] finished task {:s} ; succeeded={:s}'.format(str_task_name, str(succeeded))) LOGGER.debug('[execute_all] results={:s}'.format(str(results))) return zip(ordered_task_keys, results) Loading
src/service/service/task_scheduler/TaskScheduler.py +106 −117 Original line number Diff line number Diff line Loading @@ -309,7 +309,6 @@ class TasksScheduler: return (has_media_channel, has_optical_band) def compose_from_opticalcontroller_reply( self, pathcomp_reply : PathCompReply, is_delete : bool = False ) -> None: Loading @@ -322,25 +321,24 @@ class TasksScheduler: has_optical_band = None for service in pathcomp_reply.services: connections = self._context_client.ListConnections(service.service_id) has_media_channel, has_optical_band = self.check_service_for_media_channel( connections=connections, item=service.service_id ) include_service(service.service_id , has_media_channel=has_media_channel, has_optical_band=has_optical_band) include_service( service.service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._add_service_to_executor_cache(service) for connection in connections.connections: self._add_connection_to_executor_cache(connection) for connection in pathcomp_reply.connections: connection_key = include_connection( connection.connection_id, connection.service_id, has_media_channel=has_media_channel, connection.connection_id, connection.service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._add_connection_to_executor_cache(connection) Loading @@ -358,7 +356,6 @@ class TasksScheduler: LOGGER.debug('[compose_from_service] elapsed_time: {:f} sec'.format(t1-t0)) def compose_from_service_expansion( self, service :Service, ) -> None: Loading @@ -374,16 +371,15 @@ class TasksScheduler: 'service not found ' ]) connections = self._context_client.ListConnections(service.service_id) has_media_channel, has_optical_band = self.check_service_for_media_channel( connections=connections, item=service.service_id ) _,service_key_done= include_service(service.service_id , has_media_channel=has_media_channel, has_optical_band=has_optical_band) _,service_key_done= include_service( service.service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) # self._add_service_to_executor_cache(service) service_updating_key = self._add_task_if_not_exists(Task_ServiceSetStatus( self._executor, service.service_id, ServiceStatusEnum.SERVICESTATUS_UPDATING Loading @@ -391,7 +387,8 @@ class TasksScheduler: self._add_service_to_executor_cache(service) for connection in connections.connections: connection_key = include_connection( connection.connection_id, connection.service_id, has_media_channel=has_media_channel, connection.connection_id, connection.service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._add_connection_to_executor_cache(connection) Loading @@ -399,7 +396,9 @@ class TasksScheduler: t1 = time.time() LOGGER.debug('[compose_from_service] elapsed_time: {:f} sec'.format(t1-t0)) def compose_from_optical_service(self, service : Service, params:dict, is_delete : bool = False) -> None: def compose_from_optical_service( self, service : Service, params:dict, is_delete : bool = False ) -> None: t0 = time.time() include_service = self._optical_service_remove if is_delete else self._service_create include_connection = self._optical_connection_deconfigure if is_delete else self._connection_configure Loading @@ -416,16 +415,16 @@ class TasksScheduler: while not pending_items_to_explore.empty(): try: item = pending_items_to_explore.get(block=False) except queue.Empty: break if isinstance(item, Service): str_item_key = grpc_message_to_json_string(item.service_id) if str_item_key in explored_items: continue connections = self._context_client.ListConnections(item.service_id) has_media_channel,has_optical_band=self.check_service_for_media_channel(connections=connections,item=item.service_id) has_media_channel, has_optical_band = self.check_service_for_media_channel( connections=connections, item=item.service_id ) oc_type = 1 if len(service.service_config.config_rules) > 0: for constraint in service.service_constraints: Loading @@ -433,50 +432,47 @@ class TasksScheduler: oc_type = OpticalServiceType(str(constraint.custom.constraint_value)) if oc_type == 2: reply, code = delete_lightpath( params['src'] , params ['dst'] , params['bitrate'] , flow_id= params['flow_id'] params['src'], params ['dst'], params['bitrate'], flow_id= params['flow_id'] ) else: reply, code = DelFlexLightpath( params['src'] , params ['dst'] , params['bitrate'] , params['ob_id'] , flow_id=params['flow_id'] params['src'], params ['dst'], params['bitrate'], params['ob_id'], flow_id=params['flow_id'] ) if code == 400 and reply_not_allowed in reply : MSG = 'Deleteion for the service is not Allowed , Served Lightpaths is not empty' raise Exception(MSG) include_service(item.service_id,has_media_channel=has_media_channel,has_optical_band=has_optical_band) include_service( item.service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._add_service_to_executor_cache(item) for connection in connections.connections: self._add_connection_to_executor_cache(connection) pending_items_to_explore.put(connection) explored_items.add(str_item_key) elif isinstance(item, ServiceId): if code == 400 and reply_not_allowed in reply: break str_item_key = grpc_message_to_json_string(item) if str_item_key in explored_items: continue connections = self._context_client.ListConnections(item) has_media_channel,has_optical_band=self.check_service_for_media_channel(connections=connections,item=item) include_service(item,has_media_channel=has_media_channel,has_optical_band=has_optical_band) has_media_channel, has_optical_band = self.check_service_for_media_channel( connections=connections, item=item ) include_service( item, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._executor.get_service(item) for connection in connections.connections: self._add_connection_to_executor_cache(connection) pending_items_to_explore.put(connection) Loading @@ -487,48 +483,42 @@ class TasksScheduler: str_item_key = grpc_message_to_json_string(item.connection_id) if str_item_key in explored_items: continue connection_key = include_connection( item.connection_id , item.service_id , has_media_channel=has_media_channel , has_optical_band=has_optical_band ) connection_key = include_connection( item.connection_id, item.service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._add_connection_to_executor_cache(connection) if include_service_config is not None : connections_list = ConnectionList() connections_list.connections.append(item) is_media_channel,_=self.check_service_for_media_channel(connections=connections_list,item=service) is_media_channel,_=self.check_service_for_media_channel( connections=connections_list,item=service ) if has_optical_band and is_media_channel: include_service_config(item.connection_id , item.service_id ) include_service_config(item.connection_id, item.service_id) self._executor.get_service(item.service_id) pending_items_to_explore.put(item.service_id) for sub_service_id in item.sub_service_ids: _,service_key_done = include_service(sub_service_id ,has_media_channel=has_media_channel ,has_optical_band=has_optical_band) _,service_key_done = include_service( sub_service_id, has_media_channel=has_media_channel, has_optical_band=has_optical_band ) self._executor.get_service(sub_service_id) self._dag.add(service_key_done, connection_key) pending_items_to_explore.put(sub_service_id) explored_items.add(str_item_key) else: MSG = 'Unsupported item {:s}({:s})' raise Exception(MSG.format(type(item).__name__, grpc_message_to_json_string(item))) t1 = time.time() LOGGER.debug('[compose_from_service] elapsed_time: {:f} sec'.format(t1-t0)) LOGGER.debug('[compose_from_optical_service] elapsed_time: {:f} sec'.format(t1-t0)) def compose_from_service(self, service : Service, is_delete : bool = False) -> None: Loading Loading @@ -641,13 +631,12 @@ class TasksScheduler: self._dag.add(service_active_key, new_connection_configure_key) t1 = time.time() LOGGER.debug('[RRERRSF] elapsed_time: {:f} sec'.format(t1-t0)) LOGGER.debug('[compose_optical_service_update] elapsed_time: {:f} sec'.format(t1-t0)) def compose_optical_service_update1( self, service : Service, old_connection : Connection, new_connection : Connection ) -> None: LOGGER.debug('[ttttttttttt] elapsed_time inside update1') t0 = time.time() self._add_service_to_executor_cache(service) Loading Loading @@ -686,7 +675,7 @@ class TasksScheduler: self._dag.add(service_active_key, new_connection_configure_key) t1 = time.time() LOGGER.debug('[RRERRSF] elapsed_time: {:f} sec'.format(t1-t0)) LOGGER.debug('[compose_optical_service_update1] elapsed_time: {:f} sec'.format(t1-t0)) def compose_service_connection_update( Loading Loading @@ -743,7 +732,7 @@ class TasksScheduler: task = self._tasks.get(task_key) succeeded = True if dry_run else task.execute() results.append(succeeded) LOGGER.debug('[execute_allRRRR] finished task {:s} ; succeeded={:s}'.format(str_task_name, str(succeeded))) LOGGER.debug('[execute_all] finished task {:s} ; succeeded={:s}'.format(str_task_name, str(succeeded))) LOGGER.debug('[execute_all] results={:s}'.format(str(results))) return zip(ordered_task_keys, results)