Newer
Older

Lluis Gifre Renom
committed
import grpc, json, logging
from typing import Any, List, Optional, Tuple
from google.protobuf.json_format import MessageToDict
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method

Lluis Gifre Renom
committed
from common.rpc_method_wrapper.ServiceExceptions import (
InvalidArgumentException, NotFoundException, OperationFailedException)
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.proto.context_pb2 import ConnectionList, Empty, Service, ServiceId
from service.proto.service_pb2_grpc import ServiceServiceServicer

Lluis Gifre Renom
committed
from .database.ConfigModel import ConfigModel, ConfigRuleModel
from .database.ConstraintModel import ConstraintModel, ConstraintsModel
from .database.DatabaseServiceTools import (
delete_service_from_context, sync_service_from_context, sync_service_to_context, update_service_in_local_database)
from .database.RelationModels import ServiceEndPointModel
from .database.ServiceModel import ServiceModel
from .service_handler_api._ServiceHandler import _ServiceHandler
from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from .service_handler_api.Tools import (
check_errors_deleteconfig, check_errors_deleteconstraint, check_errors_deleteendpoint, check_errors_setconfig,
check_errors_setconstraint, check_errors_setendpoint)
from .Tools import (
classify_config_rules, classify_constraints, classify_endpointids, get_service_handler_class,
sync_devices_from_context)
SERVICE_NAME = 'Service'
METHOD_NAMES = ['CreateService', 'UpdateService', 'DeleteService', 'GetConnectionList']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class ServiceServiceServicerImpl(ServiceServiceServicer):

Lluis Gifre Renom
committed
def __init__(
self, context_client : ContextClient, device_client : DeviceClient, database : Database,
service_handler_factory : ServiceHandlerFactory):

Lluis Gifre Renom
committed
self.context_client = context_client
self.device_client = device_client
self.database = database
self.service_handler_factory = service_handler_factory
@safe_and_metered_rpc_method(METRICS, LOGGER)
def CreateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:

Lluis Gifre Renom
committed
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
service_id = request.service_id
service_uuid = service_id.service_uuid.uuid
service_context_uuid = service_id.context_id.context_uuid.uuid
if len(request.service_endpoint_ids) > 0:
unexpected_endpoints = []
for service_endpoint_id in request.service_endpoint_ids:
unexpected_endpoints.append(MessageToDict(
service_endpoint_id, including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=True))
str_unexpected_endpoints = json.dumps(unexpected_endpoints, sort_keys=True)
raise InvalidArgumentException(
'service.service_endpoint_ids', str_unexpected_endpoints,
extra_details='RPC method CreateService does not accept Endpoints. '\
'Endpoints should be configured after creating the service.')
if len(request.service_constraints) > 0:
unexpected_constraints = []
for service_constraint in request.service_constraints:
unexpected_constraints.append(MessageToDict(
service_constraint, including_default_value_fields=True, preserving_proto_field_name=True,
use_integers_for_enums=True))
str_unexpected_constraints = json.dumps(unexpected_constraints, sort_keys=True)
raise InvalidArgumentException(
'service.service_constraints', str_unexpected_constraints,
extra_details='RPC method CreateService does not accept Constraints. '\
'Constraints should be configured after creating the service.')
if len(request.service_config.config_rules) > 0:
unexpected_config_rules = MessageToDict(
request.service_config, including_default_value_fields=True,
preserving_proto_field_name=True, use_integers_for_enums=True)
unexpected_config_rules = unexpected_config_rules['config_rules']
str_unexpected_config_rules = json.dumps(unexpected_config_rules, sort_keys=True)
raise InvalidArgumentException(
'service.service_config.config_rules', str_unexpected_config_rules,
extra_details='RPC method CreateService does not accept Config Rules. '\
'Config Rules should be configured after creating the service.')
sync_service_from_context(service_context_uuid, service_uuid, self.context_client, self.database)
db_service,_ = update_service_in_local_database(self.database, request)
LOGGER.info('[CreateService] db_service = {:s}'.format(str(db_service.dump(
include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))
sync_service_to_context(db_service, self.context_client)
return ServiceId(**db_service.dump_id())
@safe_and_metered_rpc_method(METRICS, LOGGER)
def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:

Lluis Gifre Renom
committed
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
service_id = request.service_id
service_uuid = service_id.service_uuid.uuid
service_context_uuid = service_id.context_id.context_uuid.uuid
str_service_key = key_to_str([service_context_uuid, service_uuid])
# Sync before updating service to ensure we have devices, endpoints, constraints, and config rules to be
# set/deleted before actuallymodifying them in the local in-memory database.
sync_service_from_context(service_context_uuid, service_uuid, self.context_client, self.database)
db_service = get_object(self.database, ServiceModel, str_service_key, raise_if_not_found=False)
if db_service is None: raise NotFoundException('Service', str_service_key)
LOGGER.info('[UpdateService] db_service = {:s}'.format(str(db_service.dump(
include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))
db_devices = sync_devices_from_context(self.context_client, db_service, request.service_endpoint_ids)
resources_to_set : List[Tuple[str, Any]] = [] # resource_key, resource_value
resources_to_delete : List[Tuple[str, Any]] = [] # resource_key, resource_value
classify_config_rules(db_service, request.service_config.config_rules, resources_to_set, resources_to_delete)
constraints_to_set : List[Tuple[str, str]] = [] # constraint_type, constraint_value
constraints_to_delete : List[Tuple[str, str]] = [] # constraint_type, constraint_value
classify_constraints(db_service, request.service_constraints, constraints_to_set, constraints_to_delete)
endpointids_to_set : List[Tuple[str, str, Optional[str]]] = [] # device_uuid, endpoint_uuid, topology_uuid
endpointids_to_delete : List[Tuple[str, str, Optional[str]]] = [] # device_uuid, endpoint_uuid, topology_uuid
classify_endpointids(db_service, request.service_endpoint_ids, endpointids_to_set, endpointids_to_delete)
service_handler_class = get_service_handler_class(self.service_handler_factory, db_service, db_devices)
service_handler_settings = {}
service_handler : _ServiceHandler = service_handler_class(
db_service, self.database, self.context_client, self.device_client, **service_handler_settings)
errors = []
if len(errors) == 0:
results_deleteendpoint = service_handler.DeleteEndpoint(endpointids_to_delete)
errors.extend(check_errors_deleteendpoint(endpointids_to_delete, results_deleteendpoint))
if len(errors) == 0:
results_deleteconstraint = service_handler.DeleteConstraint(constraints_to_delete)
errors.extend(check_errors_deleteconstraint(constraints_to_delete, results_deleteconstraint))
if len(errors) == 0:
results_deleteconfig = service_handler.DeleteConfig(resources_to_delete)
errors.extend(check_errors_deleteconfig(resources_to_delete, results_deleteconfig))
if len(errors) == 0:
results_setconfig = service_handler.SetConfig(resources_to_set)
errors.extend(check_errors_setconfig(resources_to_set, results_setconfig))
if len(errors) == 0:
results_setconstraint = service_handler.SetConstraint(constraints_to_set)
errors.extend(check_errors_setconstraint(constraints_to_set, results_setconstraint))
if len(errors) == 0:
results_setendpoint = service_handler.SetEndpoint(endpointids_to_set)
errors.extend(check_errors_setendpoint(endpointids_to_set, results_setendpoint))
if len(errors) > 0:
raise OperationFailedException('UpdateService', extra_details=errors)
db_service,_ = update_service_in_local_database(self.database, request)
LOGGER.info('[UpdateService] db_service = {:s}'.format(str(db_service.dump(
include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))
#db_entries = self.database.dump()
#LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
#for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
#LOGGER.info('-----------------------------------------------------------')
sync_service_to_context(db_service, self.context_client)
return ServiceId(**db_service.dump_id())
@safe_and_metered_rpc_method(METRICS, LOGGER)
def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:

Lluis Gifre Renom
committed
service_uuid = request.service_uuid.uuid
service_context_uuid = request.context_id.context_uuid.uuid
sync_service_from_context(service_context_uuid, service_uuid, self.context_client, self.database)
str_service_key = key_to_str([service_context_uuid, service_uuid])
db_service : ServiceModel = get_object(self.database, ServiceModel, str_service_key, raise_if_not_found=False)
if db_service is None: return Empty()
delete_service_from_context(db_service, self.context_client)

Lluis Gifre Renom
committed
for db_service_endpoint_pk,_ in db_service.references(ServiceEndPointModel):
ServiceEndPointModel(self.database, db_service_endpoint_pk).delete()
db_running_config = ConfigModel(self.database, db_service.service_config_fk)
for db_config_rule_pk,_ in db_running_config.references(ConfigRuleModel):
ConfigRuleModel(self.database, db_config_rule_pk).delete()
db_running_constraints = ConstraintsModel(self.database, db_service.service_constraints_fk)
for db_constraint_pk,_ in db_running_constraints.references(ConstraintModel):
ConstraintModel(self.database, db_constraint_pk).delete()
db_service.delete()
db_running_config.delete()
db_running_constraints.delete()
return Empty()

Lluis Gifre Renom
committed
def GetConnectionList(self, request : ServiceId, context : grpc.ServicerContext) -> ConnectionList:
#raise ServiceException(grpc.StatusCode.UNIMPLEMENTED, 'RPC GetConnectionList() not implemented')
return ConnectionList()