from typing import Dict import grpc, logging from prometheus_client import Counter, Histogram from common.database.api.Database import Database from common.exceptions.ServiceException import ServiceException from service.proto.context_pb2 import Empty from service.proto.service_pb2 import ConnectionList, Service, ServiceId, ServiceList from service.proto.service_pb2_grpc import ServiceServiceServicer from service.service.Tools import check_service_id_request, check_service_request LOGGER = logging.getLogger(__name__) GETSERVICELIST_COUNTER_STARTED = Counter ('service_getservicelist_counter_started', 'Service:GetServiceList counter of requests started' ) GETSERVICELIST_COUNTER_COMPLETED = Counter ('service_getservicelist_counter_completed', 'Service:GetServiceList counter of requests completed') GETSERVICELIST_COUNTER_FAILED = Counter ('service_getservicelist_counter_failed', 'Service:GetServiceList counter of requests failed' ) GETSERVICELIST_HISTOGRAM_DURATION = Histogram('service_getservicelist_histogram_duration', 'Service:GetServiceList histogram of request duration') CREATESERVICE_COUNTER_STARTED = Counter ('service_createservice_counter_started', 'Service:CreateService counter of requests started' ) CREATESERVICE_COUNTER_COMPLETED = Counter ('service_createservice_counter_completed', 'Service:CreateService counter of requests completed') CREATESERVICE_COUNTER_FAILED = Counter ('service_createservice_counter_failed', 'Service:CreateService counter of requests failed' ) CREATESERVICE_HISTOGRAM_DURATION = Histogram('service_createservice_histogram_duration', 'Service:CreateService histogram of request duration') UPDATESERVICE_COUNTER_STARTED = Counter ('service_updateservice_counter_started', 'Service:UpdateService counter of requests started' ) UPDATESERVICE_COUNTER_COMPLETED = Counter ('service_updateservice_counter_completed', 'Service:UpdateService counter of requests completed') UPDATESERVICE_COUNTER_FAILED = Counter ('service_updateservice_counter_failed', 'Service:UpdateService counter of requests failed' ) UPDATESERVICE_HISTOGRAM_DURATION = Histogram('service_updateservice_histogram_duration', 'Service:UpdateService histogram of request duration') DELETESERVICE_COUNTER_STARTED = Counter ('service_deleteservice_counter_started', 'Service:DeleteService counter of requests started' ) DELETESERVICE_COUNTER_COMPLETED = Counter ('service_deleteservice_counter_completed', 'Service:DeleteService counter of requests completed') DELETESERVICE_COUNTER_FAILED = Counter ('service_deleteservice_counter_failed', 'Service:DeleteService counter of requests failed' ) DELETESERVICE_HISTOGRAM_DURATION = Histogram('service_deleteservice_histogram_duration', 'Service:DeleteService histogram of request duration') GETSERVICEBYID_COUNTER_STARTED = Counter ('service_getservicebyid_counter_started', 'Service:GetServiceById counter of requests started' ) GETSERVICEBYID_COUNTER_COMPLETED = Counter ('service_getservicebyid_counter_completed', 'Service:GetServiceById counter of requests completed') GETSERVICEBYID_COUNTER_FAILED = Counter ('service_getservicebyid_counter_failed', 'Service:GetServiceById counter of requests failed' ) GETSERVICEBYID_HISTOGRAM_DURATION = Histogram('service_getservicebyid_histogram_duration', 'Service:GetServiceById histogram of request duration') GETCONNECTIONLIST_COUNTER_STARTED = Counter ('service_getconnectionlist_counter_started', 'Service:GetConnectionList counter of requests started' ) GETCONNECTIONLIST_COUNTER_COMPLETED = Counter ('service_getconnectionlist_counter_completed', 'Service:GetConnectionList counter of requests completed') GETCONNECTIONLIST_COUNTER_FAILED = Counter ('service_getconnectionlist_counter_failed', 'Service:GetConnectionList counter of requests failed' ) GETCONNECTIONLIST_HISTOGRAM_DURATION = Histogram('service_getconnectionlist_histogram_duration', 'Service:GetConnectionList histogram of request duration') class ServiceServiceServicerImpl(ServiceServiceServicer): def __init__(self, database : Database): LOGGER.debug('Creating Servicer...') self.database = database LOGGER.debug('Servicer Created') @GETSERVICELIST_HISTOGRAM_DURATION.time() def GetServiceList(self, request : Empty, grpc_context : grpc.ServicerContext) -> ServiceList: GETSERVICELIST_COUNTER_STARTED.inc() try: LOGGER.debug('GetServiceList request: {}'.format(str(request))) # ----- Validate request data and pre-conditions ----------------------------------------------------------- # ----- Retrieve data from the database -------------------------------------------------------------------- db_context_uuids = self.database.contexts.get() json_services = [] for db_context_uuid in db_context_uuids: db_context = self.database.context(db_context_uuid) json_services.extend(db_context.dump_services()) # ----- Compose reply -------------------------------------------------------------------------------------- reply = ServiceList(cs=json_services) LOGGER.debug('GetServiceList reply: {}'.format(str(reply))) GETSERVICELIST_COUNTER_COMPLETED.inc() return reply except ServiceException as e: # pragma: no cover (ServiceException not thrown) LOGGER.exception('GetServiceList exception') GETSERVICELIST_COUNTER_FAILED.inc() grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('GetServiceList exception') GETSERVICELIST_COUNTER_FAILED.inc() grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) @CREATESERVICE_HISTOGRAM_DURATION.time() def CreateService(self, request : Service, grpc_context : grpc.ServicerContext) -> ServiceId: CREATESERVICE_COUNTER_STARTED.inc() try: LOGGER.debug('CreateService request: {}'.format(str(request))) # ----- Validate request data and pre-conditions ----------------------------------------------------------- context_id, service_id, service_type, service_config, service_state, db_endpoints, constraint_tuples = \ check_service_request('CreateService', request, self.database, LOGGER) # ----- Implement changes in the database ------------------------------------------------------------------ db_context = self.database.context(context_id) db_service = db_context.service(service_id) db_service.create(service_type, service_config, service_state) for db_endpoint in db_endpoints: service_endpoint_id = '{}:{}/{}'.format( db_endpoint.topology_uuid, db_endpoint.device_uuid, db_endpoint.endpoint_uuid) db_service.endpoint(service_endpoint_id).create(db_endpoint) for cons_type,cons_value in constraint_tuples: db_service.constraint(cons_type).create(cons_value) # ----- Compose reply -------------------------------------------------------------------------------------- reply = ServiceId(**db_service.dump_id()) LOGGER.debug('CreateService reply: {}'.format(str(reply))) CREATESERVICE_COUNTER_COMPLETED.inc() return reply except ServiceException as e: LOGGER.exception('CreateService exception') CREATESERVICE_COUNTER_FAILED.inc() grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('CreateService exception') CREATESERVICE_COUNTER_FAILED.inc() grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) @UPDATESERVICE_HISTOGRAM_DURATION.time() def UpdateService(self, request : Service, grpc_context : grpc.ServicerContext) -> ServiceId: UPDATESERVICE_COUNTER_STARTED.inc() try: LOGGER.debug('UpdateService request: {}'.format(str(request))) # ----- Validate request data and pre-conditions ----------------------------------------------------------- context_id, service_id, service_type, service_config, service_state, db_endpoints, constraint_tuples = \ check_service_request('UpdateService', request, self.database, LOGGER) # ----- Implement changes in the database ------------------------------------------------------------------ db_context = self.database.context(context_id) db_service = db_context.service(service_id) # Update service attributes db_service.update(update_attributes={ 'service_type' : service_type, 'service_config': service_config, 'service_state' : service_state, }) # Update service constraints; first add missing, then remove existing, but not added to Service db_service_constraint_types = set(db_service.constraints.get()) for constraint_type,constraint_value in constraint_tuples: if constraint_type in db_service_constraint_types: db_service.constraint(constraint_type).update(update_attributes={ 'constraint_value': constraint_value }) else: db_service.constraint(constraint_type).create(constraint_value) db_service_constraint_types.discard(constraint_type) for constraint_type in db_service_constraint_types: db_service.constraint(constraint_type).delete() # Update service endpoints; first add missing, then remove existing, but not added to Service db_service_endpoint_uuids = set(db_service.endpoints.get()) for db_endpoint in db_endpoints: service_endpoint_id = '{}:{}/{}'.format( db_endpoint.topology_uuid, db_endpoint.device_uuid, db_endpoint.endpoint_uuid) if service_endpoint_id not in db_service_endpoint_uuids: db_service.endpoint(service_endpoint_id).create(db_endpoint) db_service_endpoint_uuids.discard(service_endpoint_id) for db_service_endpoint_uuid in db_service_endpoint_uuids: db_service.endpoint(db_service_endpoint_uuid).delete() # ----- Compose reply -------------------------------------------------------------------------------------- reply = ServiceId(**db_service.dump_id()) LOGGER.debug('UpdateService reply: {}'.format(str(reply))) UPDATESERVICE_COUNTER_COMPLETED.inc() return reply except ServiceException as e: LOGGER.exception('UpdateService exception') UPDATESERVICE_COUNTER_FAILED.inc() grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('UpdateService exception') UPDATESERVICE_COUNTER_FAILED.inc() grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) @DELETESERVICE_HISTOGRAM_DURATION.time() def DeleteService(self, request : ServiceId, grpc_context : grpc.ServicerContext) -> Empty: DELETESERVICE_COUNTER_STARTED.inc() try: LOGGER.debug('DeleteService request: {}'.format(str(request))) # ----- Validate request data and pre-conditions ----------------------------------------------------------- context_id, service_id = check_service_id_request('DeleteService', request, self.database, LOGGER) # ----- Implement changes in the database ------------------------------------------------------------------ db_context = self.database.context(context_id) db_service = db_context.service(service_id) db_service.delete() # ----- Compose reply -------------------------------------------------------------------------------------- reply = Empty() LOGGER.debug('DeleteService reply: {}'.format(str(reply))) DELETESERVICE_COUNTER_COMPLETED.inc() return reply except ServiceException as e: LOGGER.exception('DeleteService exception') DELETESERVICE_COUNTER_FAILED.inc() grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('DeleteService exception') DELETESERVICE_COUNTER_FAILED.inc() grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) @GETSERVICEBYID_HISTOGRAM_DURATION.time() def GetServiceById(self, request : ServiceId, grpc_context : grpc.ServicerContext) -> Service: GETSERVICEBYID_COUNTER_STARTED.inc() try: LOGGER.debug('GetServiceById request: {}'.format(str(request))) # ----- Validate request data and pre-conditions ----------------------------------------------------------- context_id, service_id = check_service_id_request('GetServiceById', request, self.database, LOGGER) # ----- Retrieve data from the database -------------------------------------------------------------------- db_context = self.database.context(context_id) db_service = db_context.service(service_id) # ----- Compose reply -------------------------------------------------------------------------------------- reply = Service(**db_service.dump()) LOGGER.debug('GetServiceById reply: {}'.format(str(reply))) GETSERVICEBYID_COUNTER_COMPLETED.inc() return reply except ServiceException as e: LOGGER.exception('GetServiceById exception') GETSERVICEBYID_COUNTER_FAILED.inc() grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('GetServiceById exception') GETSERVICEBYID_COUNTER_FAILED.inc() grpc_context.abort(grpc.StatusCode.INTERNAL, str(e)) @GETCONNECTIONLIST_HISTOGRAM_DURATION.time() def GetConnectionList(self, request : Empty, grpc_context : grpc.ServicerContext) -> ConnectionList: GETCONNECTIONLIST_COUNTER_STARTED.inc() try: LOGGER.debug('GetConnectionList request: {}'.format(str(request))) # ----- Validate request data and pre-conditions ----------------------------------------------------------- # ----- Retrieve data from the database -------------------------------------------------------------------- raise ServiceException(grpc.StatusCode.UNIMPLEMENTED, 'RPC GetConnectionList() not implemented') # ----- Compose reply -------------------------------------------------------------------------------------- #reply = ConnectionList() #LOGGER.debug('GetConnectionList reply: {}'.format(str(reply))) #GETCONNECTIONLIST_COUNTER_COMPLETED.inc() #return reply except ServiceException as e: LOGGER.exception('GetConnectionList exception') GETCONNECTIONLIST_COUNTER_FAILED.inc() grpc_context.abort(e.code, e.details) except Exception as e: # pragma: no cover LOGGER.exception('GetConnectionList exception') GETCONNECTIONLIST_COUNTER_FAILED.inc() grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))