Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List

Lluis Gifre Renom
committed
import grpc, json, logging
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from common.orm.backend.Tools import key_to_str
from common.proto.context_pb2 import Empty, Service, ServiceId
from common.proto.service_pb2_grpc import ServiceServiceServicer
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string

Lluis Gifre Renom
committed
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.service.database.DeviceModel import DeviceModel

Lluis Gifre Renom
committed
from .database.DatabaseServiceTools import (
sync_service_from_context, sync_service_to_context, update_service_in_local_database)

Lluis Gifre Renom
committed
from .database.ServiceModel import ServiceModel
from .path_computation_element.PathComputationElement import PathComputationElement, dump_connectivity

Lluis Gifre Renom
committed
from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from .Tools import delete_service, sync_devices_from_context, update_service
METHOD_NAMES = ['CreateService', 'UpdateService', 'DeleteService']
class ServiceServiceServicerImpl(ServiceServiceServicer):
def __init__(self, database : Database, service_handler_factory : ServiceHandlerFactory) -> None:
self.context_client = ContextClient()
self.device_client = DeviceClient()

Lluis Gifre Renom
committed
self.database = database
self.service_handler_factory = service_handler_factory
@safe_and_metered_rpc_method(METRICS, LOGGER)
def CreateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
LOGGER.info('[CreateService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

Lluis Gifre Renom
committed
service_id = request.service_id
service_uuid = service_id.service_uuid.uuid
service_context_uuid = service_id.context_id.context_uuid.uuid
if len(request.service_endpoint_ids) > 0:
unexpected_endpoints = []
for service_endpoint_id in request.service_endpoint_ids:
unexpected_endpoints.append(grpc_message_to_json(service_endpoint_id))

Lluis Gifre Renom
committed
str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
raise InvalidArgumentException(
'service.service_endpoint_ids', str_unexpected_endpoints,
extra_details='RPC method CreateService does not accept Endpoints. '\
'Endpoints should be configured after creating the service.')
if len(request.service_constraints) > 0:
unexpected_constraints = []
for service_constraint in request.service_constraints:
unexpected_constraints.append(grpc_message_to_json(service_constraint))

Lluis Gifre Renom
committed
str_unexpected_constraints = json.dumps(unexpected_constraints, sort_keys=True)
raise InvalidArgumentException(
'service.service_constraints', str_unexpected_constraints,
extra_details='RPC method CreateService does not accept Constraints. '\
'Constraints should be configured after creating the service.')
if len(request.service_config.config_rules) > 0:
unexpected_config_rules = grpc_message_to_json(request.service_config)

Lluis Gifre Renom
committed
unexpected_config_rules = unexpected_config_rules['config_rules']
str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
raise InvalidArgumentException(
'service.service_config.config_rules', str_unexpected_config_rules,
extra_details='RPC method CreateService does not accept Config Rules. '\
'Config Rules should be configured after creating the service.')
sync_service_from_context(service_context_uuid, service_uuid, self.context_client, self.database)
db_service,_ = update_service_in_local_database(self.database, request)
LOGGER.info('[CreateService] db_service = {:s}'.format(str(db_service.dump(
include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))
sync_service_to_context(db_service, self.context_client)
return ServiceId(**db_service.dump_id())
@safe_and_metered_rpc_method(METRICS, LOGGER)
def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
LOGGER.info('[UpdateService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))

Lluis Gifre Renom
committed
service_id = request.service_id
service_uuid = service_id.service_uuid.uuid
service_context_uuid = service_id.context_id.context_uuid.uuid
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
pce = PathComputationElement()
pce.load_topology(self.context_client)
pce.load_connectivity(self.context_client, service_id)
#pce.dump_topology_to_file('../data/topo.dot')
#pce.dump_connectivity_to_file('../data/conn-before.txt')
connectivity = pce.route_service(request)
#pce.dump_connectivity_to_file('../data/conn-after.txt')
LOGGER.info('[UpdateService] connectivity = {:s}'.format(str(dump_connectivity(connectivity))))
if connectivity is None:
# just update local database and context
str_service_key = key_to_str([service_context_uuid, service_uuid])
db_service = get_object(self.database, ServiceModel, str_service_key, raise_if_not_found=False)
LOGGER.info('[UpdateService] before db_service = {:s}'.format(str(db_service.dump(
include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))
db_devices : Dict[str, DeviceModel] = sync_devices_from_context(
self.context_client, self.database, db_service, request.service_endpoint_ids)
LOGGER.info('[UpdateService] db_devices[{:d}] = {:s}'.format(
len(db_devices), str({
device_uuid:db_device.dump(include_config_rules=True, include_drivers=True, include_endpoints=True)
for device_uuid,db_device in db_devices.items()
})))
sync_service_from_context(service_context_uuid, service_uuid, self.context_client, self.database)
db_service,_ = update_service_in_local_database(self.database, request)
LOGGER.info('[UpdateService] after db_service = {:s}'.format(str(db_service.dump(
include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))
sync_service_to_context(db_service, self.context_client)
else:
for sub_service, sub_connections in connectivity.get('requirements', []):
for sub_connection in sub_connections:
update_service(
self.database, self.context_client, self.device_client, self.service_handler_factory,
sub_service, sub_connection)
for connection in connectivity.get('connections'):
db_service = update_service(
self.database, self.context_client, self.device_client, self.service_handler_factory,
request, connection)
str_service_key = key_to_str([service_context_uuid, service_uuid])
db_service = get_object(self.database, ServiceModel, str_service_key, raise_if_not_found=False)
if db_service is None: raise NotFoundException('Service', str_service_key)

Lluis Gifre Renom
committed
LOGGER.info('[UpdateService] db_service = {:s}'.format(str(db_service.dump(
include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))
return ServiceId(**db_service.dump_id())
@safe_and_metered_rpc_method(METRICS, LOGGER)
def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
LOGGER.info('[DeleteService] begin ; request = {:s}'.format(grpc_message_to_json_string(request)))
pce = PathComputationElement()
pce.load_topology(self.context_client)
pce.load_connectivity(self.context_client, request)
#pce.dump_topology_to_file('../data/topo.dot')
#pce.dump_connectivity_to_file('../data/conn-before.txt')
connectivity = pce.get_connectivity_from_service_id(request)
if connectivity is None: return Empty()
#pce.dump_connectivity_to_file('../data/conn-after.txt')
LOGGER.info('[DeleteService] connectivity = {:s}'.format(str(dump_connectivity(connectivity))))
for connection in connectivity.get('connections'):
delete_service(
self.database, self.context_client, self.device_client, self.service_handler_factory,
request, connection)
for sub_service, sub_connections in connectivity.get('requirements', []):
for sub_connection in sub_connections:
delete_service(
self.database, self.context_client, self.device_client, self.service_handler_factory,
sub_service.service_id, sub_connection)

Lluis Gifre Renom
committed
return Empty()