diff --git a/src/service/client/TEServiceClient.py b/src/service/client/TEServiceClient.py new file mode 100644 index 0000000000000000000000000000000000000000..8bbbe8f9b4da33066961032623f5e377651eebaf --- /dev/null +++ b/src/service/client/TEServiceClient.py @@ -0,0 +1,67 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, logging +from common.Constants import ServiceNameEnum +from common.Settings import get_service_host, get_service_port_grpc +from common.proto.context_pb2 import Empty, Service, ServiceId, ServiceStatus +from common.proto.te_pb2_grpc import TEServiceStub +from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string + +LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 15 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') + +class TEServiceClient: + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.TE) + if not port: port = get_service_port_grpc(ServiceNameEnum.TE) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) + LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) + self.channel = None + self.stub = None + self.connect() + LOGGER.debug('Channel created') + + def connect(self): + self.channel = grpc.insecure_channel(self.endpoint) + self.stub = TEServiceStub(self.channel) + + def close(self): + if self.channel is not None: self.channel.close() + self.channel = None + self.stub = None + + @RETRY_DECORATOR + def RequestLSP(self, request : Service) -> ServiceStatus: + LOGGER.debug('RequestLSP request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RequestLSP(request) + LOGGER.debug('RequestLSP result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def UpdateLSP(self, request : ServiceId) -> ServiceStatus: + LOGGER.debug('UpdateLSP request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.UpdateLSP(request) + LOGGER.debug('UpdateLSP result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def DeleteLSP(self, request : ServiceId) -> Empty: + LOGGER.debug('DeleteLSP request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.DeleteLSP(request) + LOGGER.debug('DeleteLSP result: {:s}'.format(grpc_message_to_json_string(response))) + return response diff --git a/src/service/service/ServiceServiceServicerImpl.py b/src/service/service/ServiceServiceServicerImpl.py index 6d23fd4cee53d1639c9eefbd943d45dab497b253..54303448bff43757adab6df87c89abaec08dc8c8 100644 --- a/src/service/service/ServiceServiceServicerImpl.py +++ b/src/service/service/ServiceServiceServicerImpl.py @@ -26,6 +26,7 @@ from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_s from context.client.ContextClient import ContextClient from pathcomp.frontend.client.PathCompClient import PathCompClient from service.service.tools.ConnectionToString import connection_to_string +from service.client.TEServiceClient import TEServiceClient from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory from .task_scheduler.TaskScheduler import TasksScheduler @@ -119,37 +120,71 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): context_client, service_id_with_uuids, rw_copy=False, include_config_rules=True, include_constraints=True, include_endpoint_ids=True) - num_disjoint_paths = 0 - for constraint in request.service_constraints: - if constraint.WhichOneof('constraint') == 'sla_availability': - num_disjoint_paths = constraint.sla_availability.num_disjoint_paths - break - - num_disjoint_paths = 1 if num_disjoint_paths is None or num_disjoint_paths == 0 else num_disjoint_paths - num_expected_endpoints = num_disjoint_paths * 2 + if service.service_type == ServiceTypeEnum.SERVICETYPE_TE: + # TE service: + te_service_client = TEServiceClient() + + # Note: TE should update the service in Context. + # By now we update it manually for debugging purposes + service = Service() + service.CopyFrom(request) + service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED + context_client.SetService(request) + + service_status = te_service_client.RequestLSP(service) + + if service_status.service_status == ServiceStatusEnum.SERVICESTATUS_ACTIVE: + _service : Optional[Service] = get_service(context_client, request.service_id) + service = Service() + service.CopyFrom(_service) + service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE + context_client.SetService(service) + else: + MSG = 'RequestLSP for Service({:s}) returned ServiceStatus({:s})' + context_uuid = request.service_id.context_id.context_uuid.uuid + service_uuid = request.service_id.service_uuid.uuid + service_key = '{:s}/{:s}'.format(context_uuid, service_uuid) + str_service_status = ServiceStatusEnum.Name(service_status.service_status) + raise Exception(MSG.format(service_key, str_service_status)) + else: + # Normal service: + num_disjoint_paths = None + for constraint in request.service_constraints: + if constraint.WhichOneof('constraint') == 'sla_availability': + num_disjoint_paths = constraint.sla_availability.num_disjoint_paths + break + + num_disjoint_paths = 1 if num_disjoint_paths is None or num_disjoint_paths == 0 else num_disjoint_paths + num_expected_endpoints = num_disjoint_paths * 2 tasks_scheduler = TasksScheduler(self.service_handler_factory) if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints: pathcomp_request = PathCompRequest() pathcomp_request.services.append(service_with_uuids) # pylint: disable=no-member - if num_disjoint_paths is None or num_disjoint_paths in {0, 1}: - pathcomp_request.shortest_path.Clear() # pylint: disable=no-member - else: - pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths # pylint: disable=no-member + tasks_scheduler = TasksScheduler(self.service_handler_factory) + if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints: + pathcomp_request = PathCompRequest() + pathcomp_request.services.append(service_with_uuids) # pylint: disable=no-member - LOGGER.debug('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request))) - pathcomp = PathCompClient() - pathcomp_reply = pathcomp.Compute(pathcomp_request) - pathcomp.close() - LOGGER.debug('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply))) + if num_disjoint_paths is None or num_disjoint_paths in {0, 1}: + pathcomp_request.shortest_path.Clear() # pylint: disable=no-member + else: + pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths # pylint: disable=no-member - # Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies among - # the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be - # executed) to implement the requested create/update operation. - tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False) + LOGGER.info('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request))) + pathcomp = PathCompClient() + pathcomp_reply = pathcomp.Compute(pathcomp_request) + pathcomp.close() + LOGGER.info('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply))) + + # Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies among + # the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be + # executed) to implement the requested create/update operation. + tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False) + + tasks_scheduler.execute_all() - tasks_scheduler.execute_all() return service_with_uuids.service_id @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @@ -164,12 +199,20 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL context_client.SetService(service) - # Feed TaskScheduler with this service and the sub-services and sub-connections related to this service. - # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of - # tasks to be executed) to implement the requested delete operation. - tasks_scheduler = TasksScheduler(self.service_handler_factory) - tasks_scheduler.compose_from_service(service, is_delete=True) - tasks_scheduler.execute_all() + if service.service_type == ServiceTypeEnum.SERVICETYPE_TE: + # TE service + te_service_client = TEServiceClient() + te_service_client.DeleteLSP(request) + context_client.RemoveService(request) + else: + # Normal service + # Feed TaskScheduler with this service and the sub-services and sub-connections related to this service. + # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of + # tasks to be executed) to implement the requested delete operation. + tasks_scheduler = TasksScheduler(self.service_handler_factory) + tasks_scheduler.compose_from_service(service, is_delete=True) + tasks_scheduler.execute_all() + return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @@ -327,4 +370,4 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): updated_service_with_uuids, old_connection, new_connection) tasks_scheduler.execute_all() - return Empty() + return Empty() \ No newline at end of file