Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy, grpc, json, logging, random, uuid
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.method_wrappers.ServiceExceptions import (
AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException,
OperationFailedException)
from common.proto.context_pb2 import (
Connection, Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ConstraintActionEnum)
from common.proto.pathcomp_pb2 import PathCompRequest
from common.proto.service_pb2_grpc import ServiceServiceServicer
from common.tools.context_queries.Service import get_service_by_id
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string

Lluis Gifre Renom
committed
from context.client.ContextClient import ContextClient
from pathcomp.frontend.client.PathCompClient import PathCompClient
from service.service.tools.ConnectionToString import connection_to_string

Lluis Gifre Renom
committed
from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from .task_scheduler.TaskScheduler import TasksScheduler
class ServiceServiceServicerImpl(ServiceServiceServicer):
    """gRPC servicer exposing the CreateService, UpdateService, DeleteService and RecomputeConnections RPCs.

    Each RPC talks to the Context component through a ContextClient created per call; path computation is
    delegated to PathComp and the resulting (de)configuration work is scheduled through a TasksScheduler built
    from the ServiceHandlerFactory received at construction time.
    """

    def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None:
        """Keep the factory that is handed to every TasksScheduler created by the RPC methods."""
        self.service_handler_factory = service_handler_factory

    @staticmethod
    def _compute_paths(pathcomp_request : PathCompRequest):
        """Send `pathcomp_request` to PathComp and return its reply.

        The client is closed in a `finally` clause so it is released even when Compute() raises.
        """
        LOGGER.debug('pathcomp_request={:s}'.format(grpc_message_to_json_string(pathcomp_request)))
        pathcomp = PathCompClient()
        try:
            pathcomp_reply = pathcomp.Compute(pathcomp_request)
        finally:
            pathcomp.close()
        LOGGER.debug('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply)))
        return pathcomp_reply

    @staticmethod
    def _find_closest_endpoint_id(devices, gps_position):
        """Return the endpoint_id of the geolocated endpoint closest to `gps_position`.

        Endpoints whose endpoint_location has no `gps_position` are ignored. Returns None when no endpoint of
        `devices` is geolocated.
        """
        closest_endpoint_id = None
        closest_distance = None
        for device in devices:
            for endpoint in device.device_endpoints:
                if not endpoint.endpoint_location.HasField('gps_position'): continue
                distance = gps_distance(gps_position, endpoint.endpoint_location.gps_position)
                # "<=" preserves the tie-breaking of the former distance-keyed dict: on equal distances the
                # endpoint visited last is the one selected.
                if closest_distance is None or distance <= closest_distance:
                    closest_distance = distance
                    closest_endpoint_id = endpoint.endpoint_id
        return closest_endpoint_id

    # Decorated like RecomputeConnections so that every RPC method goes through the same wrapper.
    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def CreateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        """Create an empty service entry in the Context database.

        The request must not carry endpoints, constraints or config rules; as the error messages state, those
        are expected to be configured after the service has been created.

        Returns:
            The ServiceId returned by ContextClient.SetService().

        Raises:
            InvalidArgumentException: the request contains endpoints, constraints or config rules.
            AlreadyExistsException: a service with the requested service_id is already stored in Context.
        """
        if len(request.service_endpoint_ids) > 0:
            unexpected_endpoints = [
                grpc_message_to_json(service_endpoint_id)
                for service_endpoint_id in request.service_endpoint_ids
            ]
            str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_endpoint_ids', str_unexpected_endpoints,
                extra_details='RPC method CreateService does not accept Endpoints. '\
                              'Endpoints should be configured after creating the service.')

        if len(request.service_constraints) > 0:
            unexpected_constraints = [
                grpc_message_to_json(service_constraint)
                for service_constraint in request.service_constraints
            ]
            str_unexpected_constraints = json.dumps(unexpected_constraints, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_constraints', str_unexpected_constraints,
                extra_details='RPC method CreateService does not accept Constraints. '\
                              'Constraints should be configured after creating the service.')

        if len(request.service_config.config_rules) > 0:
            unexpected_config_rules = grpc_message_to_json(request.service_config)
            unexpected_config_rules = unexpected_config_rules['config_rules']
            str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
            raise InvalidArgumentException(
                'service.service_config.config_rules', str_unexpected_config_rules,
                extra_details='RPC method CreateService does not accept Config Rules. '\
                              'Config Rules should be configured after creating the service.')

        # check that service does not exist
        context_client = ContextClient()
        current_service = get_service_by_id(
            context_client, request.service_id, rw_copy=False,
            include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
        if current_service is not None:
            context_uuid = request.service_id.context_id.context_uuid.uuid
            service_uuid = request.service_id.service_uuid.uuid
            raise AlreadyExistsException(
                'Service', service_uuid, extra_details='context_uuid={:s}'.format(str(context_uuid)))

        # just create the service in the Context database to lock the service_id
        # update will perform changes on the resources
        service_id = context_client.SetService(request)
        return service_id

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        """Store the endpoints, constraints and config rules of `request` and schedule its (re)configuration.

        Steps, in order:
          1. Start from the service stored in Context (or from `request` when none is stored), mark it as
             SERVICESTATUS_PLANNED and replace its endpoints by those in the request.
          2. For every SET `endpoint_location` constraint lacking an `endpoint_id`, pick the closest
             geolocated device endpoint, write it into the constraint (the request is mutated) and make sure
             it is part of the service endpoints.
          3. Replace constraints and config rules by those in the request and store the service in Context.
          4. When the stored service has at least 2 endpoints per requested disjoint path, ask PathComp for
             paths and feed the reply to the TasksScheduler.
          5. Execute the scheduled tasks.

        Returns:
            The ServiceId returned by ContextClient.SetService().

        Raises:
            OperationFailedException: an `endpoint_location` constraint cannot be resolved because no device
                endpoint carries a GPS position.
        """
        context_client = ContextClient()

        # Set service status to "SERVICESTATUS_PLANNED" to ensure rest of components are aware the service is
        # being modified.
        _service : Optional[Service] = get_service_by_id(
            context_client, request.service_id, rw_copy=False,
            include_config_rules=False, include_constraints=False, include_endpoint_ids=False)
        service = Service()
        service.CopyFrom(request if _service is None else _service)
        # pylint: disable=no-member
        if service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN:
            service.service_type = request.service_type
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED

        del service.service_endpoint_ids[:]
        for endpoint_id in request.service_endpoint_ids:
            service.service_endpoint_ids.add().CopyFrom(endpoint_id)

        LOGGER.warning('[before] request={:s}'.format(grpc_message_to_json_string(request)))
        device_list = None  # fetched lazily, and only once, when a constraint actually needs it
        for constraint in request.service_constraints:
            if constraint.action == ConstraintActionEnum.CONSTRAINTACTION_UNDEFINED:
                # Field action is new; assume if not set, it means SET
                constraint.action = ConstraintActionEnum.CONSTRAINTACTION_SET

            if constraint.action != ConstraintActionEnum.CONSTRAINTACTION_SET: continue
            if constraint.WhichOneof('constraint') != 'endpoint_location': continue
            if constraint.endpoint_location.HasField('endpoint_id'): continue

            if device_list is None:
                device_list = context_client.ListDevices(Empty())
            service_location = constraint.endpoint_location.location
            closer_endpoint_id = self._find_closest_endpoint_id(
                device_list.devices, service_location.gps_position)
            if closer_endpoint_id is None:
                # Previously surfaced as a bare ValueError raised by min() over an empty dict.
                MSG = 'No device endpoint with a GPS position found for Constraint({:s})'
                extra_details = MSG.format(grpc_message_to_json_string(constraint))
                raise OperationFailedException('endpoint-location-resolution', extra_details=extra_details)
            constraint.endpoint_location.endpoint_id.CopyFrom(closer_endpoint_id)

            # Compare complete EndpointId messages. The former check compared the EndpointId against a list of
            # endpoint_uuid fields, which never matched and therefore always appended a duplicate.
            already_present = any(
                known_endpoint_id == closer_endpoint_id
                for known_endpoint_id in service.service_endpoint_ids
            )
            if not already_present:
                service.service_endpoint_ids.add().CopyFrom(closer_endpoint_id)

        LOGGER.warning('[after] request={:s}'.format(grpc_message_to_json_string(request)))
        LOGGER.warning('[after] service={:s}'.format(grpc_message_to_json_string(service)))

        del service.service_constraints[:]
        for constraint in request.service_constraints:
            service.service_constraints.add().CopyFrom(constraint)

        del service.service_config.config_rules[:]
        for config_rule in request.service_config.config_rules:
            service.service_config.config_rules.add().CopyFrom(config_rule)

        service_id_with_uuids = context_client.SetService(service)

        # PathComp requires endpoints, constraints and config rules
        service_with_uuids = get_service_by_id(
            context_client, service_id_with_uuids, rw_copy=False,
            include_config_rules=True, include_constraints=True, include_endpoint_ids=True)

        # The first sla_availability constraint decides the number of disjoint paths; default is a single path.
        num_disjoint_paths = None
        for constraint in request.service_constraints:
            if constraint.WhichOneof('constraint') == 'sla_availability':
                num_disjoint_paths = constraint.sla_availability.num_disjoint_paths
                break
        if num_disjoint_paths is None or num_disjoint_paths == 0:
            num_disjoint_paths = 1
        num_expected_endpoints = num_disjoint_paths * 2

        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        if len(service_with_uuids.service_endpoint_ids) >= num_expected_endpoints:
            pathcomp_request = PathCompRequest()
            pathcomp_request.services.append(service_with_uuids)
            # NOTE(review): shortest_path and k_disjoint_path are presumably alternative algorithm selectors
            # of PathCompRequest; select exactly one of them -- confirm against pathcomp.proto.
            if num_disjoint_paths == 1:
                pathcomp_request.shortest_path.Clear()
            else:
                pathcomp_request.k_disjoint_path.num_disjoint = num_disjoint_paths

            pathcomp_reply = self._compute_paths(pathcomp_request)

            # Feed TaskScheduler with this path computation reply. TaskScheduler identifies inter-dependencies
            # among the services and connections retrieved and produces a schedule of tasks (an ordered list of
            # tasks to be executed) to implement the requested create/update operation.
            tasks_scheduler.compose_from_pathcompreply(pathcomp_reply, is_delete=False)

        tasks_scheduler.execute_all()
        return service_id_with_uuids

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
        """Mark the service as pending removal and schedule the tasks that delete it.

        Returns:
            An Empty message once the scheduled tasks have been executed.

        Raises:
            NotFoundException: no service with the given ServiceId is stored in Context.
        """
        context_client = ContextClient()

        # Set service status to "SERVICESTATUS_PENDING_REMOVAL" to ensure rest of components are aware the
        # service is being modified.
        service : Optional[Service] = get_service_by_id(context_client, request, rw_copy=True)
        if service is None:
            # Same exception type RecomputeConnections raises for a missing service.
            raise NotFoundException('Service', request.service_uuid.uuid)
        # pylint: disable=no-member
        service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL
        context_client.SetService(service)

        # Feed TaskScheduler with this service and the sub-services and sub-connections related to this service.
        # TaskScheduler identifies inter-dependencies among them and produces a schedule of tasks (an ordered
        # list of tasks to be executed) to implement the requested delete operation.
        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        tasks_scheduler.compose_from_service(service, is_delete=True)
        tasks_scheduler.execute_all()
        return Empty()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RecomputeConnections(self, request : Service, context : grpc.ServicerContext) -> Empty:
        """Replace the single connection of a service by a different path computed by PathComp.

        Only `request.service_id` is used; the request must not carry endpoints, constraints or config rules.
        The service must have exactly one connection and that connection must have no sub-services. A new
        connection is picked at random among the PathComp candidates that differ from the current one.

        Returns:
            An Empty message once the scheduled tasks have been executed.

        Raises:
            NotImplementedException: the request carries endpoints/constraints/config rules, the service has
                zero or several connections, its connection has sub-services, or the PathComp reply does not
                contain exactly one service and at least one connection.
            NotFoundException: the service is not stored in Context.
            InvalidArgumentException: the stored service has type SERVICETYPE_UNKNOWN.
            OperationFailedException: every candidate path equals the current connection.
        """
        if len(request.service_endpoint_ids) > 0:
            raise NotImplementedException('update-endpoints')

        if len(request.service_constraints) > 0:
            raise NotImplementedException('update-constraints')

        if len(request.service_config.config_rules) > 0:
            raise NotImplementedException('update-config-rules')

        context_client = ContextClient()

        updated_service : Optional[Service] = get_service_by_id(
            context_client, request.service_id, rw_copy=True,
            include_config_rules=False, include_constraints=False, include_endpoint_ids=False)

        if updated_service is None:
            raise NotFoundException('service', request.service_id.service_uuid.uuid)

        # pylint: disable=no-member
        if updated_service.service_type == ServiceTypeEnum.SERVICETYPE_UNKNOWN:
            raise InvalidArgumentException(
                'request.service_type', ServiceTypeEnum.Name(updated_service.service_type)
            )

        # Set service status to "SERVICESTATUS_UPDATING" to ensure rest of components are aware the service is
        # being modified. Endpoints, constraints and config rules are left as stored: requests carrying any of
        # them are rejected above.
        updated_service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_UPDATING

        updated_service_id_with_uuids = context_client.SetService(updated_service)

        # PathComp requires endpoints, constraints and config rules
        updated_service_with_uuids = get_service_by_id(
            context_client, updated_service_id_with_uuids, rw_copy=True,
            include_config_rules=True, include_constraints=True, include_endpoint_ids=True)

        # Get active connection
        connections = context_client.ListConnections(updated_service_id_with_uuids)
        if len(connections.connections) == 0:
            MSG = 'Service({:s}) has no connections'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_extra_details = MSG.format(str_service_id)
            raise NotImplementedException('service-with-no-connections', extra_details=str_extra_details)
        if len(connections.connections) > 1:
            MSG = 'Service({:s}) has multiple ({:d}) connections({:s})'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            num_connections = len(connections.connections)
            str_connections = grpc_message_to_json_string(connections)
            str_extra_details = MSG.format(str_service_id, num_connections, str_connections)
            raise NotImplementedException('service-with-multiple-connections', extra_details=str_extra_details)

        old_connection = connections.connections[0]
        if len(old_connection.sub_service_ids) > 0:
            MSG = 'Service({:s})/Connection({:s}) has sub-services: {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_connection_id = grpc_message_to_json_string(old_connection.connection_id)
            str_connection = grpc_message_to_json_string(old_connection)
            str_extra_details = MSG.format(str_service_id, str_connection_id, str_connection)
            raise NotImplementedException('service-connection-with-subservices', extra_details=str_extra_details)

        # Find alternative connections
        pathcomp_request = PathCompRequest()
        pathcomp_request.services.append(updated_service_with_uuids)
        pathcomp_request.k_shortest_path.k_inspection = 100
        pathcomp_request.k_shortest_path.k_return = 3

        pathcomp_reply = self._compute_paths(pathcomp_request)

        if len(pathcomp_reply.services) == 0:
            MSG = 'KDisjointPath reported no services for Service({:s}): {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
            raise NotImplementedException('kdisjointpath-no-services', extra_details=str_extra_details)

        if len(pathcomp_reply.services) > 1:
            MSG = 'KDisjointPath reported subservices for Service({:s}): {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
            raise NotImplementedException('kdisjointpath-subservices', extra_details=str_extra_details)

        if len(pathcomp_reply.connections) == 0:
            MSG = 'KDisjointPath reported no connections for Service({:s}): {:s}'
            str_service_id = grpc_message_to_json_string(updated_service_id_with_uuids)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_extra_details = MSG.format(str_service_id, str_pathcomp_reply)
            raise NotImplementedException('kdisjointpath-no-connections', extra_details=str_extra_details)

        # compute a string representing the old connection
        str_old_connection = connection_to_string(old_connection)
        LOGGER.debug('old_connection={:s}'.format(grpc_message_to_json_string(old_connection)))

        # Keep only the candidates whose string representation differs from the connection in use.
        candidate_new_connections = [
            candidate_new_connection
            for candidate_new_connection in pathcomp_reply.connections
            if connection_to_string(candidate_new_connection) != str_old_connection
        ]

        if len(candidate_new_connections) == 0:
            MSG = 'Unable to find a new suitable path: pathcomp_request={:s} pathcomp_reply={:s} old_connection={:s}'
            str_pathcomp_request = grpc_message_to_json_string(pathcomp_request)
            str_pathcomp_reply = grpc_message_to_json_string(pathcomp_reply)
            str_old_connection = grpc_message_to_json_string(old_connection)
            extra_details = MSG.format(str_pathcomp_request, str_pathcomp_reply, str_old_connection)
            raise OperationFailedException('no-new-path-found', extra_details=extra_details)

        str_candidate_new_connections = [
            grpc_message_to_json_string(candidate_new_connection)
            for candidate_new_connection in candidate_new_connections
        ]
        LOGGER.debug('candidate_new_connections={:s}'.format(str(str_candidate_new_connections)))

        new_connection = random.choice(candidate_new_connections)
        LOGGER.debug('new_connection={:s}'.format(grpc_message_to_json_string(new_connection)))

        # Change UUID of new connection to prevent collisions
        tmp_connection = Connection()
        tmp_connection.CopyFrom(new_connection)
        tmp_connection.connection_id.connection_uuid.uuid = str(uuid.uuid4())
        new_connection = tmp_connection

        # Feed TaskScheduler with the service to update, the old connection to
        # deconfigure and the new connection to configure. It will produce a
        # schedule of tasks (an ordered list of tasks to be executed) to
        # implement the requested changes.
        tasks_scheduler = TasksScheduler(self.service_handler_factory)
        tasks_scheduler.compose_service_connection_update(
            updated_service_with_uuids, old_connection, new_connection)
        tasks_scheduler.execute_all()
        return Empty()