# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty, Service, ServiceId, ServiceStatus
from common.proto.te_pb2_grpc import TEServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string

LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
# prepare_method_name='connect' makes the retry decorator re-open the channel
# (via connect()) before each retry attempt.
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')

class TEServiceClient:
    """gRPC client for the Traffic Engineering (TE) service.

    Opens an insecure channel to the TE service endpoint (resolved from the
    environment-based settings when host/port are not given) and exposes the
    LSP management RPCs. Every RPC is wrapped with RETRY_DECORATOR, which
    reconnects the channel between attempts.

    The client can also be used as a context manager so the channel is
    released deterministically:

        with TEServiceClient() as client:
            client.RequestLSP(service)
    """

    def __init__(self, host=None, port=None):
        # Fall back to the service registry settings when host/port are omitted.
        if not host: host = get_service_host(ServiceNameEnum.TE)
        if not port: port = get_service_port_grpc(ServiceNameEnum.TE)
        self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
        LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
        self.channel = None
        self.stub = None
        self.connect()
        LOGGER.debug('Channel created')

    def connect(self):
        # (Re-)create the channel and stub; also invoked by the retry
        # decorator before each retry attempt.
        self.channel = grpc.insecure_channel(self.endpoint)
        self.stub = TEServiceStub(self.channel)

    def close(self):
        # Idempotent: safe to call when the channel is already closed.
        if self.channel is not None: self.channel.close()
        self.channel = None
        self.stub = None

    def __enter__(self):
        # Context-manager support; the channel is already open after __init__.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always release the channel, even when the managed block raised.
        self.close()

    @RETRY_DECORATOR
    def RequestLSP(self, request : Service) -> ServiceStatus:
        """Request creation of an LSP for the given Service; returns its status."""
        LOGGER.debug('RequestLSP request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.RequestLSP(request)
        LOGGER.debug('RequestLSP result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def UpdateLSP(self, request : ServiceId) -> ServiceStatus:
        """Request an update of the LSP identified by ServiceId; returns its status."""
        LOGGER.debug('UpdateLSP request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.UpdateLSP(request)
        LOGGER.debug('UpdateLSP result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def DeleteLSP(self, request : ServiceId) -> Empty:
        """Request deletion of the LSP identified by ServiceId."""
        LOGGER.debug('DeleteLSP request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.DeleteLSP(request)
        LOGGER.debug('DeleteLSP result: {:s}'.format(grpc_message_to_json_string(response)))
        return response
ServiceServiceServicerImpl(ServiceServiceServicer): context_client, service_id_with_uuids, rw_copy=False, context_client, service_id_with_uuids, rw_copy=False, include_config_rules=True, include_constraints=True, include_endpoint_ids=True) include_config_rules=True, include_constraints=True, include_endpoint_ids=True) num_disjoint_paths = 0 if service.service_type == ServiceTypeEnum.SERVICETYPE_TE: # TE service: te_service_client = TEServiceClient() # Note: TE should update the service in Context. # By now we update it manually for debugging purposes service = Service() service.CopyFrom(request) service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED context_client.SetService(request) service_status = te_service_client.RequestLSP(service) if service_status.service_status == ServiceStatusEnum.SERVICESTATUS_ACTIVE: _service : Optional[Service] = get_service(context_client, request.service_id) service = Service() service.CopyFrom(_service) service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE context_client.SetService(service) else: MSG = 'RequestLSP for Service({:s}) returned ServiceStatus({:s})' context_uuid = request.service_id.context_id.context_uuid.uuid service_uuid = request.service_id.service_uuid.uuid service_key = '{:s}/{:s}'.format(context_uuid, service_uuid) str_service_status = ServiceStatusEnum.Name(service_status.service_status) raise Exception(MSG.format(service_key, str_service_status)) else: # Normal service: num_disjoint_paths = None for constraint in request.service_constraints: for constraint in request.service_constraints: if constraint.WhichOneof('constraint') == 'sla_availability': if constraint.WhichOneof('constraint') == 'sla_availability': num_disjoint_paths = constraint.sla_availability.num_disjoint_paths num_disjoint_paths = constraint.sla_availability.num_disjoint_paths Loading @@ -177,6 +206,11 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): num_disjoint_paths = 1 if 
num_disjoint_paths is None or num_disjoint_paths == 0 else num_disjoint_paths num_disjoint_paths = 1 if num_disjoint_paths is None or num_disjoint_paths == 0 else num_disjoint_paths num_expected_endpoints = num_disjoint_paths * 2 num_expected_endpoints = num_disjoint_paths * 2 tasks_scheduler = TasksScheduler(self.service_handler_factory) if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints: pathcomp_request = PathCompRequest() pathcomp_request.services.append(service_with_uuids) # pylint: disable=no-member tasks_scheduler = TasksScheduler(self.service_handler_factory) tasks_scheduler = TasksScheduler(self.service_handler_factory) if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints: if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints: pathcomp_request = PathCompRequest() pathcomp_request = PathCompRequest() Loading @@ -187,11 +221,11 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): else: else: pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths # pylint: disable=no-member pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths # pylint: disable=no-member LOGGER.debug('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request))) LOGGER.info('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request))) pathcomp = PathCompClient() pathcomp = PathCompClient() pathcomp_reply = pathcomp.Compute(pathcomp_request) pathcomp_reply = pathcomp.Compute(pathcomp_request) pathcomp.close() pathcomp.close() LOGGER.debug('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply))) LOGGER.info('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply))) # Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies among # Feed TaskScheduler with this path computation reply. 
TaskScheduler identifies inter-dependencies among # the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be # the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be Loading @@ -199,6 +233,7 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False) tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False) tasks_scheduler.execute_all() tasks_scheduler.execute_all() return service_with_uuids.service_id return service_with_uuids.service_id @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) Loading @@ -213,12 +248,20 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL context_client.SetService(service) context_client.SetService(service) if service.service_type == ServiceTypeEnum.SERVICETYPE_TE: # TE service te_service_client = TEServiceClient() te_service_client.DeleteLSP(request) context_client.RemoveService(request) else: # Normal service # Feed TaskScheduler with this service and the sub-services and sub-connections related to this service. # Feed TaskScheduler with this service and the sub-services and sub-connections related to this service. # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of # tasks to be executed) to implement the requested delete operation. # tasks to be executed) to implement the requested delete operation. 
tasks_scheduler = TasksScheduler(self.service_handler_factory) tasks_scheduler = TasksScheduler(self.service_handler_factory) tasks_scheduler.compose_from_service(service, is_delete=True) tasks_scheduler.compose_from_service(service, is_delete=True) tasks_scheduler.execute_all() tasks_scheduler.execute_all() return Empty() return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) Loading Loading
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty, Service, ServiceId, ServiceStatus
from common.proto.te_pb2_grpc import TEServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string

LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')

class TEServiceClient:
    """Thin gRPC wrapper around the TE service LSP RPCs.

    Resolves the TE service endpoint from settings when host/port are not
    supplied, and keeps an insecure channel plus stub open until close() is
    called. RPC methods retry with exponential backoff, reconnecting first.
    """

    def __init__(self, host=None, port=None):
        # Defaults come from the TeraFlow settings helpers for the TE service.
        te_host = host if host else get_service_host(ServiceNameEnum.TE)
        te_port = port if port else get_service_port_grpc(ServiceNameEnum.TE)
        self.endpoint = '{:s}:{:s}'.format(str(te_host), str(te_port))
        LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
        self.channel = None
        self.stub = None
        self.connect()
        LOGGER.debug('Channel created')

    def connect(self):
        # Called at construction time and by the retry decorator between attempts.
        self.channel = grpc.insecure_channel(self.endpoint)
        self.stub = TEServiceStub(self.channel)

    def close(self):
        # Tear down the channel if one is open; safe to call repeatedly.
        if self.channel is not None:
            self.channel.close()
        self.channel = None
        self.stub = None

    @RETRY_DECORATOR
    def RequestLSP(self, request : Service) -> ServiceStatus:
        # Ask the TE service to establish an LSP for this Service.
        LOGGER.debug('RequestLSP request: {:s}'.format(grpc_message_to_json_string(request)))
        reply = self.stub.RequestLSP(request)
        LOGGER.debug('RequestLSP result: {:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    @RETRY_DECORATOR
    def UpdateLSP(self, request : ServiceId) -> ServiceStatus:
        # Ask the TE service to update the LSP identified by this ServiceId.
        LOGGER.debug('UpdateLSP request: {:s}'.format(grpc_message_to_json_string(request)))
        reply = self.stub.UpdateLSP(request)
        LOGGER.debug('UpdateLSP result: {:s}'.format(grpc_message_to_json_string(reply)))
        return reply

    @RETRY_DECORATOR
    def DeleteLSP(self, request : ServiceId) -> Empty:
        # Ask the TE service to tear down the LSP identified by this ServiceId.
        LOGGER.debug('DeleteLSP request: {:s}'.format(grpc_message_to_json_string(request)))
        reply = self.stub.DeleteLSP(request)
        LOGGER.debug('DeleteLSP result: {:s}'.format(grpc_message_to_json_string(reply)))
        return reply
src/service/service/ServiceServiceServicerImpl.py +73 −30 Original line number Original line Diff line number Diff line Loading @@ -27,6 +27,7 @@ from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_s from context.client.ContextClient import ContextClient from context.client.ContextClient import ContextClient from pathcomp.frontend.client.PathCompClient import PathCompClient from pathcomp.frontend.client.PathCompClient import PathCompClient from service.service.tools.ConnectionToString import connection_to_string from service.service.tools.ConnectionToString import connection_to_string from service.client.TEServiceClient import TEServiceClient from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory from .task_scheduler.TaskScheduler import TasksScheduler from .task_scheduler.TaskScheduler import TasksScheduler from .tools.GeodesicDistance import gps_distance from .tools.GeodesicDistance import gps_distance Loading Loading @@ -168,7 +169,35 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): context_client, service_id_with_uuids, rw_copy=False, context_client, service_id_with_uuids, rw_copy=False, include_config_rules=True, include_constraints=True, include_endpoint_ids=True) include_config_rules=True, include_constraints=True, include_endpoint_ids=True) num_disjoint_paths = 0 if service.service_type == ServiceTypeEnum.SERVICETYPE_TE: # TE service: te_service_client = TEServiceClient() # Note: TE should update the service in Context. 
# By now we update it manually for debugging purposes service = Service() service.CopyFrom(request) service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED context_client.SetService(request) service_status = te_service_client.RequestLSP(service) if service_status.service_status == ServiceStatusEnum.SERVICESTATUS_ACTIVE: _service : Optional[Service] = get_service(context_client, request.service_id) service = Service() service.CopyFrom(_service) service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE context_client.SetService(service) else: MSG = 'RequestLSP for Service({:s}) returned ServiceStatus({:s})' context_uuid = request.service_id.context_id.context_uuid.uuid service_uuid = request.service_id.service_uuid.uuid service_key = '{:s}/{:s}'.format(context_uuid, service_uuid) str_service_status = ServiceStatusEnum.Name(service_status.service_status) raise Exception(MSG.format(service_key, str_service_status)) else: # Normal service: num_disjoint_paths = None for constraint in request.service_constraints: for constraint in request.service_constraints: if constraint.WhichOneof('constraint') == 'sla_availability': if constraint.WhichOneof('constraint') == 'sla_availability': num_disjoint_paths = constraint.sla_availability.num_disjoint_paths num_disjoint_paths = constraint.sla_availability.num_disjoint_paths Loading @@ -177,6 +206,11 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): num_disjoint_paths = 1 if num_disjoint_paths is None or num_disjoint_paths == 0 else num_disjoint_paths num_disjoint_paths = 1 if num_disjoint_paths is None or num_disjoint_paths == 0 else num_disjoint_paths num_expected_endpoints = num_disjoint_paths * 2 num_expected_endpoints = num_disjoint_paths * 2 tasks_scheduler = TasksScheduler(self.service_handler_factory) if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints: pathcomp_request = PathCompRequest() pathcomp_request.services.append(service_with_uuids) # 
pylint: disable=no-member tasks_scheduler = TasksScheduler(self.service_handler_factory) tasks_scheduler = TasksScheduler(self.service_handler_factory) if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints: if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints: pathcomp_request = PathCompRequest() pathcomp_request = PathCompRequest() Loading @@ -187,11 +221,11 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): else: else: pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths # pylint: disable=no-member pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths # pylint: disable=no-member LOGGER.debug('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request))) LOGGER.info('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request))) pathcomp = PathCompClient() pathcomp = PathCompClient() pathcomp_reply = pathcomp.Compute(pathcomp_request) pathcomp_reply = pathcomp.Compute(pathcomp_request) pathcomp.close() pathcomp.close() LOGGER.debug('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply))) LOGGER.info('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply))) # Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies among # Feed TaskScheduler with this path computation reply. 
TaskScheduler identifies inter-dependencies among # the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be # the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be Loading @@ -199,6 +233,7 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False) tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False) tasks_scheduler.execute_all() tasks_scheduler.execute_all() return service_with_uuids.service_id return service_with_uuids.service_id @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) Loading @@ -213,12 +248,20 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL context_client.SetService(service) context_client.SetService(service) if service.service_type == ServiceTypeEnum.SERVICETYPE_TE: # TE service te_service_client = TEServiceClient() te_service_client.DeleteLSP(request) context_client.RemoveService(request) else: # Normal service # Feed TaskScheduler with this service and the sub-services and sub-connections related to this service. # Feed TaskScheduler with this service and the sub-services and sub-connections related to this service. # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of # tasks to be executed) to implement the requested delete operation. # tasks to be executed) to implement the requested delete operation. 
tasks_scheduler = TasksScheduler(self.service_handler_factory) tasks_scheduler = TasksScheduler(self.service_handler_factory) tasks_scheduler.compose_from_service(service, is_delete=True) tasks_scheduler.compose_from_service(service, is_delete=True) tasks_scheduler.execute_all() tasks_scheduler.execute_all() return Empty() return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) Loading