Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List

Lluis Gifre Renom
committed
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from common.orm.backend.Tools import key_to_str
from common.proto.context_pb2 import ConnectionId, Empty, Service, ServiceId
from common.proto.pathcomp_pb2 import PathCompRequest
from common.proto.service_pb2_grpc import ServiceServiceServicer
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from common.tools.object_factory.Connection import json_connection_id
from common.tools.object_factory.Service import json_service_id

Lluis Gifre Renom
committed
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from pathcomp.frontend.client.PathCompClient import PathCompClient
from service.service.DependencyResolver import ObjectType, resolve_dependencies
from service.service.database.DeviceModel import DeviceModel

Lluis Gifre Renom
committed
from .database.DatabaseServiceTools import (
sync_service_from_context, sync_service_to_context, update_service_in_local_database)

Lluis Gifre Renom
committed
from .database.ServiceModel import ServiceModel
from .path_computation_element.PathComputationElement import PathComputationElement, dump_connectivity

Lluis Gifre Renom
committed
from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from .Tools import delete_service, get_connection, get_service, sync_devices_from_context, update_service
# Names of the RPC methods implemented by ServiceServiceServicerImpl.
METHOD_NAMES = [
    'CreateService',
    'UpdateService',
    'DeleteService',
]
class ServiceServiceServicerImpl(ServiceServiceServicer):
    """gRPC servicer implementing the CreateService, UpdateService and DeleteService RPC methods."""

    def __init__(self, database : Database, service_handler_factory : ServiceHandlerFactory) -> None:
        """Keep the collaborators needed by the RPC methods.

        Args:
            database: local database handle; forwarded to delete_service().
            service_handler_factory: factory forwarded to update_connection(), update_service()
                and delete_service().
        """
        self.database = database
        self.service_handler_factory = service_handler_factory

    @staticmethod
    def _reject_unexpected(field_name : str, unexpected_items : List, kind : str) -> None:
        """Raise InvalidArgumentException for a field that CreateService does not accept.

        Args:
            field_name: dotted name of the offending request field.
            unexpected_items: JSON-serializable items found in that field.
            kind: human-readable plural name of the items (e.g. 'Endpoints').

        Raises:
            InvalidArgumentException: always.
        """
        raise InvalidArgumentException(
            field_name, json.dumps(unexpected_items, sort_keys=True),
            extra_details='RPC method CreateService does not accept {:s}. '\
                          '{:s} should be configured after creating the service.'.format(kind, kind))

    @staticmethod
    def _dependency_details(obj_type, obj_key, grpc_objid, grpc_obj) -> str:
        """Build the extra-details text describing a dependency item that cannot be handled."""
        str_grpc_obj = 'None' if grpc_obj is None else grpc_message_to_json_string(grpc_obj)
        str_grpc_objid = 'None' if grpc_objid is None else grpc_message_to_json_string(grpc_objid)
        # str() keeps the '{:s}' placeholders from failing on a non-string type/key, which would
        # otherwise mask the NotImplementedException being reported.
        return 'obj_type={:s} obj_key={:s} grpc_objid={:s} grpc_obj={:s}'.format(
            str(obj_type), str(obj_key), str_grpc_objid, str_grpc_obj)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def CreateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        """Register a new, empty service through the Context client.

        The request must not carry endpoints, constraints or config rules.

        Args:
            request: service to create.
            context: gRPC servicer context (not used by this method).

        Returns:
            The ServiceId returned by ContextClient.SetService().

        Raises:
            InvalidArgumentException: the request carries endpoints, constraints or config rules.
            AlreadyExistsException: get_service() found a service for request.service_id.
        """
        LOGGER.info('[CreateService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

        service_uuid = request.service_id.service_uuid.uuid
        context_uuid = request.service_id.context_id.context_uuid.uuid

        if len(request.service_endpoint_ids) > 0:
            self._reject_unexpected(
                'service.service_endpoint_ids',
                [grpc_message_to_json(endpoint_id) for endpoint_id in request.service_endpoint_ids],
                'Endpoints')

        if len(request.service_constraints) > 0:
            self._reject_unexpected(
                'service.service_constraints',
                [grpc_message_to_json(constraint) for constraint in request.service_constraints],
                'Constraints')

        if len(request.service_config.config_rules) > 0:
            self._reject_unexpected(
                'service.service_config.config_rules',
                grpc_message_to_json(request.service_config)['config_rules'],
                'Config Rules')

        # check that service does not exist
        context_client = ContextClient()
        current_service = get_service(context_client, request.service_id)
        if current_service is not None:
            raise AlreadyExistsException(
                'Service', service_uuid, extra_details='context_uuid={:s}'.format(str(context_uuid)))

        # just create the service in the database to lock the service_id
        # update will perform changes on the resources
        return context_client.SetService(request)

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
        """Compute paths for the service and apply the resulting services and connections.

        The request is sent to the PathComp client; its reply is ordered with
        resolve_dependencies() and every resulting item is either checked for existence
        (when no object is attached) or created/updated through the Context client.

        Args:
            request: service to update.
            context: gRPC servicer context (not used by this method).

        Returns:
            request.service_id.

        Raises:
            NotFoundException: a dependency without attached object is not found.
            NotImplementedException: a dependency is neither a connection nor a service.
        """
        LOGGER.info('[UpdateService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

        pathcomp_request = PathCompRequest()
        pathcomp_request.services.append(request)
        # The algorithm settings belong to the request message: `services` is the repeated
        # field the request was just appended to and cannot carry them.
        # NOTE(review): assumes PathCompRequest declares `k_shortest_path` at message level;
        # confirm against pathcomp.proto.
        pathcomp_request.k_shortest_path.k_inspection = 5
        pathcomp_request.k_shortest_path.k_return = 5

        pathcomp = PathCompClient()
        pathcomp_response = pathcomp.Compute(pathcomp_request)

        # Convert the unordered lists of services and connections into an ordered list of items
        # that fulfils the interdependencies among them. E.g., a service cannot be created while
        # the connections supporting it do not exist yet.
        resolution = resolve_dependencies(pathcomp_response)

        # implement changes
        context_client = ContextClient()
        device_client = DeviceClient()
        for (obj_type, obj_key), (grpc_objid, grpc_obj) in resolution:
            if grpc_obj is None:
                # no object attached: only check that the resource already exists
                if obj_type == ObjectType.CONNECTION.value:
                    connection = get_connection(context_client, grpc_objid)
                    if connection is None: raise NotFoundException('Connection', obj_key)
                elif obj_type == ObjectType.SERVICE.value:
                    service = get_service(context_client, grpc_objid)
                    if service is None: raise NotFoundException('Service', obj_key)
                else:
                    raise NotImplementedException(
                        'Empty Dependency',
                        extra_details=self._dependency_details(obj_type, obj_key, grpc_objid, grpc_obj))
            else:
                # create/update the resource
                if obj_type == ObjectType.CONNECTION.value:
                    # NOTE(review): update_connection is not among the names visibly imported
                    # from .Tools in this file; confirm it is in scope.
                    update_connection(context_client, device_client, self.service_handler_factory, grpc_obj)
                    context_client.SetConnection(grpc_obj)
                elif obj_type == ObjectType.SERVICE.value:
                    update_service(context_client, device_client, self.service_handler_factory, grpc_obj)
                    context_client.SetService(grpc_obj)
                else:
                    raise NotImplementedException(
                        'Specified Dependency',
                        extra_details=self._dependency_details(obj_type, obj_key, grpc_objid, grpc_obj))

        return request.service_id

    @safe_and_metered_rpc_method(METRICS, LOGGER)
    def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
        """Delete the connections of a service and those of its required sub-services.

        Args:
            request: identifier of the service to delete.
            context: gRPC servicer context (not used by this method).

        Returns:
            Empty, also when no connectivity is found for the service.
        """
        LOGGER.info('[DeleteService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

        # __init__ does not store any client, so create them here as the other RPC methods do.
        context_client = ContextClient()
        device_client = DeviceClient()

        pce = PathComputationElement()
        pce.load_topology(context_client)
        pce.load_connectivity(context_client, request)
        #pce.dump_topology_to_file('../data/topo.dot')
        #pce.dump_connectivity_to_file('../data/conn-before.txt')
        connectivity = pce.get_connectivity_from_service_id(request)
        if connectivity is None: return Empty()
        #pce.dump_connectivity_to_file('../data/conn-after.txt')

        LOGGER.info('[DeleteService] connectivity = {:s}'.format(str(dump_connectivity(connectivity))))

        # A missing 'connections' entry is treated as empty, like 'requirements' below.
        for connection in connectivity.get('connections', []):
            delete_service(
                self.database, context_client, device_client, self.service_handler_factory,
                request, connection)

        for sub_service, sub_connections in connectivity.get('requirements', []):
            for sub_connection in sub_connections:
                delete_service(
                    self.database, context_client, device_client, self.service_handler_factory,
                    sub_service.service_id, sub_connection)

        return Empty()