Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, grpc, json, logging, random, uuid
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import (
AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException,
OperationFailedException)
from common.proto.context_pb2 import (
Connection, Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ConstraintActionEnum)
from common.proto.pathcomp_pb2 import PathCompRequest
from common.proto.service_pb2_grpc import ServiceServiceServicer
from common.tools.context_queries.Service import get_service_by_id
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string

Lluis Gifre Renom
committed
from context.client.ContextClient import ContextClient
from pathcomp.frontend.client.PathCompClient import PathCompClient
from service.service.tools.ConnectionToString import connection_to_string
from service.client.TEServiceClient import TEServiceClient

Lluis Gifre Renom
committed
from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from .task_scheduler.TaskScheduler import TasksScheduler
class ServiceServiceServicerImpl(ServiceServiceServicer):
def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None:

Lluis Gifre Renom
committed
self.service_handler_factory = service_handler_factory
def CreateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:

Lluis Gifre Renom
committed
if len(request.service_endpoint_ids) > 0:
unexpected_endpoints = []
for service_endpoint_id in request.service_endpoint_ids:
unexpected_endpoints.append(grpc_message_to_json(service_endpoint_id))

Lluis Gifre Renom
committed
str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
raise InvalidArgumentException(
'service.service_endpoint_ids', str_unexpected_endpoints,
extra_details='RPC method CreateService does not accept Endpoints. '\
'Endpoints should be configured after creating the service.')
if len(request.service_constraints) > 0:
unexpected_constraints = []
for service_constraint in request.service_constraints:
unexpected_constraints.append(grpc_message_to_json(service_constraint))

Lluis Gifre Renom
committed
str_unexpected_constraints = json.dumps(unexpected_constraints, sort_keys=True)
raise InvalidArgumentException(
'service.service_constraints', str_unexpected_constraints,
extra_details='RPC method CreateService does not accept Constraints. '\
'Constraints should be configured after creating the service.')
if len(request.service_config.config_rules) > 0:
unexpected_config_rules = grpc_message_to_json(request.service_config)

Lluis Gifre Renom
committed
unexpected_config_rules = unexpected_config_rules['config_rules']
str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
raise InvalidArgumentException(
'service.service_config.config_rules', str_unexpected_config_rules,
extra_details='RPC method CreateService does not accept Config Rules. '\
'Config Rules should be configured after creating the service.')
# check that service does not exist
context_client = ContextClient()
current_service = get_service_by_id(
context_client, request.service_id, rw_copy=False,
include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
if current_service is not None:
context_uuid = request.service_id.context_id.context_uuid.uuid
service_uuid = request.service_id.service_uuid.uuid
raise AlreadyExistsException(
'Service', service_uuid, extra_details='context_uuid={:s}'.format(str(context_uuid)))

Lluis Gifre Renom
committed
# just create the service in the Context database to lock the service_id
# update will perform changes on the resources
service_id = context_client.SetService(request)
return service_id
def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
# Set service status to "SERVICESTATUS_PLANNED" to ensure rest of components are aware the service is
# being modified.
_service : Optional[Service] = get_service_by_id(
context_client, request.service_id, rw_copy=False,
include_config_rules=True, include_constraints=True, include_endpoint_ids=True)
# Identify service constraints
num_disjoint_paths = None
is_diverse = False

Lluis Gifre Renom
committed
gps_location_aware = False
for constraint in request.service_constraints:
constraint_kind = constraint.WhichOneof('constraint')
if constraint_kind == 'sla_availability':
num_disjoint_paths = constraint.sla_availability.num_disjoint_paths
elif constraint_kind == 'custom':
if constraint.custom.constraint_type == 'diversity': is_diverse = True
elif constraint_kind == 'endpoint_location':
location = constraint.endpoint_location.location
if location.WhichOneof('location') == 'gps_position': gps_location_aware = True
else:
continue
LOGGER.debug('num_disjoint_paths={:s}'.format(str(num_disjoint_paths)))
LOGGER.debug('is_diverse={:s}'.format(str(is_diverse)))

Lluis Gifre Renom
committed
LOGGER.debug('gps_location_aware={:s}'.format(str(gps_location_aware)))
if _service is not None and num_disjoint_paths is None and not is_diverse and gps_location_aware:
LOGGER.debug(' Removing previous service')
tasks_scheduler = TasksScheduler(self.service_handler_factory)
tasks_scheduler.compose_from_service(_service, is_delete=True)
tasks_scheduler.execute_all()
service.CopyFrom(request if _service is None else _service)
if service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN: # pylint: disable=no-member
service.service_type = request.service_type # pylint: disable=no-member
service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED # pylint: disable=no-member
if service.service_type == ServiceTypeEnum.SERVICETYPE_TE:
# TE service:
context_client.SetService(request)
te_service_client = TEServiceClient()
service_status = te_service_client.RequestLSP(service)
if service_status.service_status == ServiceStatusEnum.SERVICESTATUS_ACTIVE:
_service : Optional[Service] = get_service_by_id(
context_client, request.service_id, rw_copy=True,
include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
_service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE
service_id = context_client.SetService(_service)
return service_id
else:
MSG = 'RequestLSP for Service({:s}) returned ServiceStatus({:s})'
context_uuid = request.service_id.context_id.context_uuid.uuid
service_uuid = request.service_id.service_uuid.uuid
service_key = '{:s}/{:s}'.format(context_uuid, service_uuid)
str_service_status = ServiceStatusEnum.Name(service_status.service_status)
raise Exception(MSG.format(service_key, str_service_status))
del service.service_endpoint_ids[:] # pylint: disable=no-member
for endpoint_id in request.service_endpoint_ids:
service.service_endpoint_ids.add().CopyFrom(endpoint_id) # pylint: disable=no-member
device_list = context_client.ListDevices(Empty())
LOGGER.debug('[before] request={:s}'.format(grpc_message_to_json_string(request)))
for constraint in request.service_constraints:
if constraint.action == ConstraintActionEnum.CONSTRAINTACTION_UNDEFINED:
# Field action is new; assume if not set, it means SET
constraint.action = ConstraintActionEnum.CONSTRAINTACTION_SET
if constraint.action != ConstraintActionEnum.CONSTRAINTACTION_SET: continue
if constraint.WhichOneof('constraint') != 'endpoint_location': continue
if constraint.endpoint_location.HasField('endpoint_id'): continue
service_location = constraint.endpoint_location.location
distances = {}
for device in device_list.devices:
for endpoint in device.device_endpoints:
if not endpoint.endpoint_location.HasField('gps_position'): continue
distance = gps_distance(service_location.gps_position, endpoint.endpoint_location.gps_position)
distances[distance] = endpoint.endpoint_id
closer_endpoint_id = distances[min(distances)]
constraint.endpoint_location.endpoint_id.CopyFrom(closer_endpoint_id)
service_endpoint_ids = [
endpoint_id.endpoint_uuid
for endpoint_id in service.service_endpoint_ids
]
if closer_endpoint_id not in service_endpoint_ids:
service.service_endpoint_ids.append(closer_endpoint_id)
LOGGER.debug('[after] request={:s}'.format(grpc_message_to_json_string(request)))
LOGGER.debug('[after] service={:s}'.format(grpc_message_to_json_string(service)))
del service.service_constraints[:] # pylint: disable=no-member
for constraint in request.service_constraints:
service.service_constraints.add().CopyFrom(constraint) # pylint: disable=no-member
del service.service_config.config_rules[:] # pylint: disable=no-member
for config_rule in request.service_config.config_rules:
service.service_config.config_rules.add().CopyFrom(config_rule) # pylint: disable=no-member
service_id_with_uuids = context_client.SetService(service)
# PathComp requires endpoints, constraints and config rules
service_with_uuids = get_service_by_id(
context_client, service_id_with_uuids, rw_copy=False,
include_config_rules=True, include_constraints=True, include_endpoint_ids=True)
num_disjoint_paths = 1 if num_disjoint_paths is None or num_disjoint_paths == 0 else num_disjoint_paths
num_expected_endpoints = num_disjoint_paths * 2
tasks_scheduler = TasksScheduler(self.service_handler_factory)
if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints:
pathcomp_request.services.append(service_with_uuids) # pylint: disable=no-member
if num_disjoint_paths is None or num_disjoint_paths in {0, 1}:
pathcomp_request.shortest_path.Clear() # pylint: disable=no-member
else:
pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths # pylint: disable=no-member
LOGGER.debug('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request)))
pathcomp = PathCompClient()
pathcomp_reply = pathcomp.Compute(pathcomp_request)
pathcomp.close()
LOGGER.debug('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply)))
# Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies among
# the services and connections retrieved and produces a schedule of tasks (an ordered list of tasks to be
# executed) to implement the requested create/update operation.
tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False)
def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:

Lluis Gifre Renom
committed
# Set service status to "SERVICESTATUS_PENDING_REMOVAL" to ensure rest of components are aware the service is
# being modified.
service : Optional[Service] = get_service_by_id(context_client, request, rw_copy=True)
if service is None: raise Exception('Service({:s}) not found'.format(grpc_message_to_json_string(request)))
service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL
context_client.SetService(service)
if service.service_type == ServiceTypeEnum.SERVICETYPE_TE:
# TE service
te_service_client = TEServiceClient()
te_service_client.DeleteLSP(request)
context_client.RemoveService(request)
# Normal service
# Feed TaskScheduler with this service and the sub-services and sub-connections related to this service.
# TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered list of
# tasks to be executed) to implement the requested delete operation.
tasks_scheduler = TasksScheduler(self.service_handler_factory)
tasks_scheduler.compose_from_service(service, is_delete=True)
tasks_scheduler.execute_all()

Lluis Gifre Renom
committed
return Empty()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def RecomputeConnections(self, request : Service, context : grpc.ServicerContext) -> Empty:
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
if len(request.service_endpoint_ids) > 0:
raise NotImplementedException('update-endpoints')
if len(request.service_constraints) > 0:
raise NotImplementedException('update-constraints')
if len(request.service_config.config_rules) > 0:
raise NotImplementedException('update-config-rules')
context_client = ContextClient()
updated_service : Optional[Service] = get_service_by_id(
context_client, request.service_id, rw_copy=True,
include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
if updated_service is None:
raise NotFoundException('service', request.service_id.service_uuid.uuid)
# pylint: disable=no-member
if updated_service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN:
raise InvalidArgumentException(
'request.service_type', ServiceTypeEnum.Name(updated_service.service_type)
)
# Set service status to "SERVICESTATUS_UPDATING" to ensure rest of components are aware the service is
# being modified.
# pylint: disable=no-member
updated_service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_UPDATING
# Update endpoints
# pylint: disable=no-member
#del updated_service.service_endpoint_ids[:]
#updated_service.service_endpoint_ids.extend(request.service_endpoint_ids)
# Update constraints
# pylint: disable=no-member
#del updated_service.service_constraints[:]
#updated_service.service_constraints.extend(request.service_constraints)
# Update config rules
# pylint: disable=no-member
#del updated_service.service_config.config_rules[:]
#updated_service.service_config.config_rules.extend(request.service_config.config_rules)
updated_service_id_with_uuids = context_client.SetService(updated_service)
# PathComp requires endpoints, constraints and config rules
updated_service_with_uuids = get_service_by_id(
context_client, updated_service_id_with_uuids, rw_copy=True,
include_config_rules=True, include_constraints=True, include_endpoint_ids=True)
# Get active connection
connections = context_client.ListConnections(updated_service_id_with_uuids)
if len(connections.connections) == 0:
MSG = 'Service({:s}) has no connections'
str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
str_extra_details = MSG.format(str_service_id)
raise NotImplementedException('service-with-no-connections', extra_details=str_extra_details)
if len(connections.connections) > 1:
MSG = 'Service({:s}) has multiple ({:d}) connections({:s})'
str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
num_connections = len(connections.connections)
str_connections = grpc_message_to_json_string(connections)
str_extra_details = MSG.format(str_service_id, num_connections, str_connections)
raise NotImplementedException('service-with-multiple-connections', extra_details=str_extra_details)
old_connection = connections.connections[0]
if len(old_connection.sub_service_ids) > 0:
MSG = 'Service({:s})/Connection({:s}) has sub-services: {:s}'
str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
str_connection_id = grpc_message_to_json_string(old_connection.connection_id)
str_connection = grpc_message_to_json_string(old_connection)
str_extra_details = MSG.format(str_service_id, str_connection_id, str_connection)
raise NotImplementedException('service-connection-with-subservices', extra_details=str_extra_details)
# Find alternative connections
# pylint: disable=no-member
pathcomp_request = PathCompRequest()
pathcomp_request.services.append(updated_service_with_uuids)
#pathcomp_request.k_disjoint_path.num_disjoint = 100
pathcomp_request.k_shortest_path.k_inspection = 100
pathcomp_request.k_shortest_path.k_return = 3
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
LOGGER.debug('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request)))
pathcomp = PathCompClient()
pathcomp_reply = pathcomp.Compute(pathcomp_request)
pathcomp.close()
LOGGER.debug('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply)))
if len(pathcomp_reply.services) == 0:
MSG = 'KDisjointPath reported no services for Service({:s}): {:s}'
str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
raise NotImplementedException('kdisjointpath-no-services', extra_details=str_extra_details)
if len(pathcomp_reply.services) > 1:
MSG = 'KDisjointPath reported subservices for Service({:s}): {:s}'
str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
raise NotImplementedException('kdisjointpath-subservices', extra_details=str_extra_details)
if len(pathcomp_reply.connections) == 0:
MSG = 'KDisjointPath reported no connections for Service({:s}): {:s}'
str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
raise NotImplementedException('kdisjointpath-no-connections', extra_details=str_extra_details)
# compute a string representing the old connection
str_old_connection = connection_to_string(old_connection)
LOGGER.debug('old_connection={:s}'.format(grpc_message_to_json_string(old_connection)))
for candidate_new_connection in pathcomp_reply.connections:
str_candidate_new_connection = connection_to_string(candidate_new_connection)
if str_candidate_new_connection == str_old_connection: continue
candidate_new_connections.append(candidate_new_connection)
if len(candidate_new_connections) == 0:
MSG = 'Unable to find a new suitable path: pathcomp_request={:s} pathcomp_reply={:s} old_connection={:s}'
str_pathcomp_request = grpc_message_to_json_string(pathcomp_request)
str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
str_old_connection = grpc_message_to_json_string(old_connection)
extra_details = MSG.format(str_pathcomp_request, str_pathcomp_reply, str_old_connection)
raise OperationFailedException('no-new-path-found', extra_details=extra_details)
str_candidate_new_connections = [
grpc_message_to_json_string(candidate_new_connection)
for candidate_new_connection in candidate_new_connections
]
LOGGER.debug('candidate_new_connections={:s}'.format(str(str_candidate_new_connections)))
new_connection = random.choice(candidate_new_connections)
LOGGER.debug('new_connection={:s}'.format(grpc_message_to_json_string(new_connection)))
# Change UUID of new connection to prevent collisions
tmp_connection = Connection()
tmp_connection.CopyFrom(new_connection)
tmp_connection.connection_id.connection_uuid.uuid = str(uuid.uuid4())
new_connection = tmp_connection
# Feed TaskScheduler with the service to update, the old connection to
# deconfigure and the new connection to configure. It will produce a
# schedule of tasks (an ordered list of tasks to be executed) to
# implement the requested changes.
tasks_scheduler = TasksScheduler(self.service_handler_factory)
tasks_scheduler.compose_service_connection_update(
updated_service_with_uuids, old_connection, new_connection)
tasks_scheduler.execute_all()