Newer
Older
import grpc, json, logging, operator
from typing import Iterator, List, Set, Tuple
from common.message_broker.MessageBroker import MessageBroker
from common.orm.Database import Database
from common.orm.HighLevel import (
get_all_objects, get_object, get_or_create_object, get_related_objects, update_or_create_object)
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException
Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, Context, ContextEvent, ContextId,
ContextIdList, ContextList, Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, Empty, EventTypeEnum, Link,
LinkEvent, LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Topology,
TopologyEvent, TopologyId, TopologyIdList, TopologyList)
from context.proto.context_pb2_grpc import ContextServiceServicer
from context.service.database.ConfigModel import ConfigModel, ConfigRuleModel, grpc_config_rules_to_raw, update_config
from context.service.database.ConnectionModel import ConnectionModel, PathHopModel, PathModel, set_path
from context.service.database.ConstraintModel import ConstraintModel, ConstraintsModel, set_constraints
from context.service.database.ContextModel import ContextModel
from context.service.database.DeviceModel import (
DeviceModel, DriverModel, grpc_to_enum__device_operational_status, set_drivers)
from context.service.database.EndPointModel import EndPointModel, KpiSampleTypeModel, set_kpi_sample_types
from context.service.database.Events import notify_event
from context.service.database.LinkModel import LinkModel
from context.service.database.RelationModels import (
ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, TopologyDeviceModel, TopologyLinkModel)
from context.service.database.ServiceModel import (
ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type)
from context.service.database.TopologyModel import TopologyModel
CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_TOPOLOGY)
LOGGER = logging.getLogger(__name__)
SERVICE_NAME = 'Context'
METHOD_NAMES = [
'ListConnectionIds', 'ListConnections', 'GetConnection', 'SetConnection', 'RemoveConnection', 'GetConnectionEvents',
'ListContextIds', 'ListContexts', 'GetContext', 'SetContext', 'RemoveContext', 'GetContextEvents',
'ListTopologyIds', 'ListTopologies', 'GetTopology', 'SetTopology', 'RemoveTopology', 'GetTopologyEvents',
'ListDeviceIds', 'ListDevices', 'GetDevice', 'SetDevice', 'RemoveDevice', 'GetDeviceEvents',
'ListLinkIds', 'ListLinks', 'GetLink', 'SetLink', 'RemoveLink', 'GetLinkEvents',
'ListServiceIds', 'ListServices', 'GetService', 'SetService', 'RemoveService', 'GetServiceEvents',
]
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class ContextServiceServicerImpl(ContextServiceServicer):
def __init__(self, database : Database, messagebroker : MessageBroker):
LOGGER.debug('Creating Servicer...')
self.database = database
LOGGER.debug('Servicer Created')
# ----- Context ----------------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
db_contexts : List[ContextModel] = get_all_objects(self.database, ContextModel)
db_contexts = sorted(db_contexts, key=operator.attrgetter('pk'))
return ContextIdList(context_ids=[db_context.dump_id() for db_context in db_contexts])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
db_contexts : List[ContextModel] = get_all_objects(self.database, ContextModel)
db_contexts = sorted(db_contexts, key=operator.attrgetter('pk'))
return ContextList(contexts=[db_context.dump() for db_context in db_contexts])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
context_uuid = request.context_uuid.uuid
db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
return Context(**db_context.dump(include_services=True, include_topologies=True))
@safe_and_metered_rpc_method(METRICS, LOGGER)
def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
for i,topology_id in enumerate(request.topology_ids):
topology_context_uuid = topology_id.context_id.context_uuid.uuid
if topology_context_uuid != context_uuid:
raise InvalidArgumentException(
'request.topology_ids[{:d}].context_id.context_uuid.uuid'.format(i), topology_context_uuid,
['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])
for i,service_id in enumerate(request.service_ids):
service_context_uuid = service_id.context_id.context_uuid.uuid
if service_context_uuid != context_uuid:
raise InvalidArgumentException(
'request.service_ids[{:d}].context_id.context_uuid.uuid'.format(i), service_context_uuid,
['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])
result : Tuple[ContextModel, bool] = update_or_create_object(
self.database, ContextModel, context_uuid, {'context_uuid': context_uuid})
db_context, updated = result
for i,topology_id in enumerate(request.topology_ids):
topology_context_uuid = topology_id.context_id.context_uuid.uuid
topology_uuid = topology_id.topology_uuid.uuid
get_object(self.database, TopologyModel, [context_uuid, topology_uuid]) # just to confirm it exists
for i,service_id in enumerate(request.service_ids):
service_context_uuid = service_id.context_id.context_uuid.uuid
service_uuid = service_id.service_uuid.uuid
get_object(self.database, ServiceModel, [context_uuid, service_uuid]) # just to confirm it exists
event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
dict_context_id = db_context.dump_id()
notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': dict_context_id})
return ContextId(**dict_context_id)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
context_uuid = request.context_uuid.uuid
db_context = ContextModel(self.database, context_uuid, auto_load=False)
found = db_context.load()
if not found: return Empty()
dict_context_id = db_context.dump_id()
event_type = EventTypeEnum.EVENTTYPE_REMOVE
notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': dict_context_id})
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
yield ContextEvent(**json.loads(message.content))
# ----- Topology ---------------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList:
context_uuid = request.context_uuid.uuid
db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
db_topologies : Set[TopologyModel] = get_related_objects(db_context, TopologyModel)
db_topologies = sorted(db_topologies, key=operator.attrgetter('pk'))
return TopologyIdList(topology_ids=[db_topology.dump_id() for db_topology in db_topologies])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList:
context_uuid = request.context_uuid.uuid
db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
db_topologies : Set[TopologyModel] = get_related_objects(db_context, TopologyModel)
db_topologies = sorted(db_topologies, key=operator.attrgetter('pk'))
return TopologyList(topologies=[db_topology.dump() for db_topology in db_topologies])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Topology:
str_key = key_to_str([request.context_id.context_uuid.uuid, request.topology_uuid.uuid])
db_topology : TopologyModel = get_object(self.database, TopologyModel, str_key)
return Topology(**db_topology.dump(include_devices=True, include_links=True))
@safe_and_metered_rpc_method(METRICS, LOGGER)
def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId:
context_uuid = request.topology_id.context_id.context_uuid.uuid
db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
topology_uuid = request.topology_id.topology_uuid.uuid
str_topology_key = key_to_str([context_uuid, topology_uuid])
result : Tuple[TopologyModel, bool] = update_or_create_object(
self.database, TopologyModel, str_topology_key, {'context_fk': db_context, 'topology_uuid': topology_uuid})
db_topology,updated = result
for device_id in request.device_ids:
device_uuid = device_id.device_uuid.uuid
db_device = get_object(self.database, DeviceModel, device_uuid)
str_topology_device_key = key_to_str([str_topology_key, device_uuid], separator='--')
result : Tuple[TopologyDeviceModel, bool] = update_or_create_object(
self.database, TopologyDeviceModel, str_topology_device_key,
{'topology_fk': db_topology, 'device_fk': db_device})
#db_topology_device,topology_device_updated = result
link_uuid = link_id.link_uuid.uuid
db_link = get_object(self.database, LinkModel, link_uuid)
str_topology_link_key = key_to_str([str_topology_key, link_uuid], separator='--')
result : Tuple[TopologyLinkModel, bool] = update_or_create_object(
self.database, TopologyLinkModel, str_topology_link_key,
{'topology_fk': db_topology, 'link_fk': db_link})
#db_topology_link,topology_link_updated = result
event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
dict_topology_id = db_topology.dump_id()
notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id})
return TopologyId(**dict_topology_id)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty:
context_uuid = request.context_id.context_uuid.uuid
topology_uuid = request.topology_uuid.uuid
db_topology = TopologyModel(self.database, key_to_str([context_uuid, topology_uuid]), auto_load=False)
found = db_topology.load()
if not found: return Empty()
dict_topology_id = db_topology.dump_id()
event_type = EventTypeEnum.EVENTTYPE_REMOVE
notify_event(self.messagebroker, TOPIC_TOPOLOGY, event_type, {'topology_id': dict_topology_id})
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT):
yield TopologyEvent(**json.loads(message.content))
# ----- Device -----------------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList:
db_devices : List[DeviceModel] = get_all_objects(self.database, DeviceModel)
db_devices = sorted(db_devices, key=operator.attrgetter('pk'))
return DeviceIdList(device_ids=[db_device.dump_id() for db_device in db_devices])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList:
db_devices : List[DeviceModel] = get_all_objects(self.database, DeviceModel)
db_devices = sorted(db_devices, key=operator.attrgetter('pk'))
return DeviceList(devices=[db_device.dump() for db_device in db_devices])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device:
device_uuid = request.device_uuid.uuid
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid)
return Device(**db_device.dump(
include_config_rules=True, include_drivers=True, include_endpoints=True))
@safe_and_metered_rpc_method(METRICS, LOGGER)
def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId:
for i,endpoint in enumerate(request.device_endpoints):
endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
if device_uuid != endpoint_device_uuid:
raise InvalidArgumentException(
'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid,
['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)])
config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
running_config_result = update_config(self.database, device_uuid, 'running', config_rules)
db_running_config = running_config_result[0][0]
result : Tuple[DeviceModel, bool] = update_or_create_object(self.database, DeviceModel, device_uuid, {
'device_uuid' : device_uuid,
'device_type' : request.device_type,
'device_operational_status': grpc_to_enum__device_operational_status(request.device_operational_status),
'device_config_fk' : db_running_config,
})
db_device, updated = result
set_drivers(self.database, db_device, request.device_drivers)
for i,endpoint in enumerate(request.device_endpoints):
endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
endpoint_attributes = {
'device_fk' : db_device,
'endpoint_uuid': endpoint_uuid,
'endpoint_type': endpoint.endpoint_type,
}
endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid
endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid
if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key)
str_topology_device_key = key_to_str([str_topology_key, device_uuid], separator='--')
result : Tuple[TopologyDeviceModel, bool] = get_or_create_object(
self.database, TopologyDeviceModel, str_topology_device_key, {
'topology_fk': db_topology, 'device_fk': db_device})
#db_topology_device, topology_device_created = result
str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
endpoint_attributes['topology_fk'] = db_topology
result : Tuple[EndPointModel, bool] = update_or_create_object(
self.database, EndPointModel, str_endpoint_key, endpoint_attributes)
db_endpoint, endpoint_updated = result
set_kpi_sample_types(self.database, db_endpoint, endpoint.kpi_sample_types)
event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
dict_device_id = db_device.dump_id()
notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id})
return DeviceId(**dict_device_id)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty:
device_uuid = request.device_uuid.uuid
db_device = DeviceModel(self.database, device_uuid, auto_load=False)
found = db_device.load()
if not found: return Empty()
dict_device_id = db_device.dump_id()
for db_endpoint_pk,_ in db_device.references(EndPointModel):
db_endpoint = EndPointModel(self.database, db_endpoint_pk)
for db_kpi_sample_type_pk,_ in db_endpoint.references(KpiSampleTypeModel):
KpiSampleTypeModel(self.database, db_kpi_sample_type_pk).delete()
db_endpoint.delete()
for db_topology_device_pk,_ in db_device.references(TopologyDeviceModel):
TopologyDeviceModel(self.database, db_topology_device_pk).delete()
for db_driver_pk,_ in db_device.references(DriverModel):
DriverModel(self.database, db_driver_pk).delete()
db_config = ConfigModel(self.database, db_device.device_config_fk)
for db_config_rule_pk,_ in db_config.references(ConfigRuleModel):
ConfigRuleModel(self.database, db_config_rule_pk).delete()
db_device.delete()
db_config.delete()
event_type = EventTypeEnum.EVENTTYPE_REMOVE
notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id})
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetDeviceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[DeviceEvent]:
for message in self.messagebroker.consume({TOPIC_DEVICE}, consume_timeout=CONSUME_TIMEOUT):
yield DeviceEvent(**json.loads(message.content))
# ----- Link -------------------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListLinkIds(self, request: Empty, context : grpc.ServicerContext) -> LinkIdList:
db_links : List[LinkModel] = get_all_objects(self.database, LinkModel)
db_links = sorted(db_links, key=operator.attrgetter('pk'))
return LinkIdList(link_ids=[db_link.dump_id() for db_link in db_links])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListLinks(self, request: Empty, context : grpc.ServicerContext) -> LinkList:
db_links : List[LinkModel] = get_all_objects(self.database, LinkModel)
db_links = sorted(db_links, key=operator.attrgetter('pk'))
return LinkList(links=[db_link.dump() for db_link in db_links])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetLink(self, request: LinkId, context : grpc.ServicerContext) -> Link:
link_uuid = request.link_uuid.uuid
db_link : LinkModel = get_object(self.database, LinkModel, link_uuid)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def SetLink(self, request: Link, context : grpc.ServicerContext) -> LinkId:
link_uuid = request.link_id.link_uuid.uuid
result : Tuple[LinkModel, bool] = update_or_create_object(
self.database, LinkModel, link_uuid, {'link_uuid': link_uuid})
db_link, updated = result
endpoint_uuid = endpoint_id.endpoint_uuid.uuid
endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid
endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
db_topology = None
if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key)
str_topology_device_key = key_to_str([str_topology_key, endpoint_device_uuid], separator='--')
get_object(self.database, TopologyDeviceModel, str_topology_device_key) # check device is in topology
str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)
str_link_endpoint_key = key_to_str([link_uuid, endpoint_device_uuid], separator='--')
result : Tuple[LinkEndPointModel, bool] = get_or_create_object(
self.database, LinkEndPointModel, str_link_endpoint_key, {
'link_fk': db_link, 'endpoint_fk': db_endpoint})
#db_link_endpoint, link_endpoint_created = result
if db_topology is not None:
str_topology_link_key = key_to_str([str_topology_key, link_uuid], separator='--')
result : Tuple[TopologyLinkModel, bool] = get_or_create_object(
self.database, TopologyLinkModel, str_topology_link_key, {
'topology_fk': db_topology, 'link_fk': db_link})
#db_topology_link, topology_link_created = result
event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
dict_link_id = db_link.dump_id()
notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id})
return LinkId(**dict_link_id)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RemoveLink(self, request: LinkId, context : grpc.ServicerContext) -> Empty:
link_uuid = request.link_uuid.uuid
db_link = LinkModel(self.database, link_uuid, auto_load=False)
found = db_link.load()
if not found: return Empty()
dict_link_id = db_link.dump_id()
for db_link_endpoint_pk,_ in db_link.references(LinkEndPointModel):
LinkEndPointModel(self.database, db_link_endpoint_pk).delete()
for db_topology_link_pk,_ in db_link.references(TopologyLinkModel):
TopologyLinkModel(self.database, db_topology_link_pk).delete()
event_type = EventTypeEnum.EVENTTYPE_REMOVE
notify_event(self.messagebroker, TOPIC_LINK, event_type, {'link_id': dict_link_id})
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetLinkEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[LinkEvent]:
for message in self.messagebroker.consume({TOPIC_LINK}, consume_timeout=CONSUME_TIMEOUT):
yield LinkEvent(**json.loads(message.content))
# ----- Service ----------------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListServiceIds(self, request: ContextId, context : grpc.ServicerContext) -> ServiceIdList:
db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
db_services : Set[ServiceModel] = get_related_objects(db_context, ServiceModel)
db_services = sorted(db_services, key=operator.attrgetter('pk'))
return ServiceIdList(service_ids=[db_service.dump_id() for db_service in db_services])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListServices(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList:
db_context : ContextModel = get_object(self.database, ContextModel, request.context_uuid.uuid)
db_services : Set[ServiceModel] = get_related_objects(db_context, ServiceModel)
db_services = sorted(db_services, key=operator.attrgetter('pk'))
return ServiceList(services=[db_service.dump() for db_service in db_services])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetService(self, request: ServiceId, context : grpc.ServicerContext) -> Service:
str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
db_service : ServiceModel = get_object(self.database, ServiceModel, str_key)
return Service(**db_service.dump(
include_endpoint_ids=True, include_constraints=True, include_config_rules=True))
@safe_and_metered_rpc_method(METRICS, LOGGER)
def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId:
context_uuid = request.service_id.context_id.context_uuid.uuid
db_context : ContextModel = get_object(self.database, ContextModel, context_uuid)
for i,endpoint_id in enumerate(request.service_endpoint_ids):
endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
if len(endpoint_topology_context_uuid) > 0 and context_uuid != endpoint_topology_context_uuid:
raise InvalidArgumentException(
'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
endpoint_topology_context_uuid,
['should be == {:s}({:s})'.format('request.service_id.context_id.context_uuid.uuid', context_uuid)])
service_uuid = request.service_id.service_uuid.uuid
str_service_key = key_to_str([context_uuid, service_uuid])
constraints_result = set_constraints(
self.database, str_service_key, 'constraints', request.service_constraints)
db_constraints = constraints_result[0][0]
config_rules = grpc_config_rules_to_raw(request.service_config.config_rules)
running_config_result = update_config(self.database, str_service_key, 'running', config_rules)
result : Tuple[ServiceModel, bool] = update_or_create_object(self.database, ServiceModel, str_service_key, {
'context_fk' : db_context,
'service_uuid' : service_uuid,
'service_type' : grpc_to_enum__service_type(request.service_type),
'service_constraints_fk': db_constraints,
'service_status' : grpc_to_enum__service_status(request.service_status.service_status),
'service_config_fk' : db_running_config,
})
db_service, updated = result
for i,endpoint_id in enumerate(request.service_endpoint_ids):
endpoint_uuid = endpoint_id.endpoint_uuid.uuid
endpoint_device_uuid = endpoint_id.device_id.device_uuid.uuid
endpoint_topology_uuid = endpoint_id.topology_id.topology_uuid.uuid
endpoint_topology_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
str_endpoint_key = key_to_str([endpoint_device_uuid, endpoint_uuid])
if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
db_endpoint : EndPointModel = get_object(self.database, EndPointModel, str_endpoint_key)
str_service_endpoint_key = key_to_str([service_uuid, endpoint_device_uuid], separator='--')
result : Tuple[ServiceEndPointModel, bool] = get_or_create_object(
self.database, ServiceEndPointModel, str_service_endpoint_key, {
'service_fk': db_service, 'endpoint_fk': db_endpoint})
#db_service_endpoint, service_endpoint_created = result
event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
dict_service_id = db_service.dump_id()
notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id})
return ServiceId(**dict_service_id)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty:
context_uuid = request.context_id.context_uuid.uuid
service_uuid = request.service_uuid.uuid
db_service = ServiceModel(self.database, key_to_str([context_uuid, service_uuid]), auto_load=False)
found = db_service.load()
if not found: return Empty()
dict_service_id = db_service.dump_id()
for db_service_endpoint_pk,_ in db_service.references(ServiceEndPointModel):
ServiceEndPointModel(self.database, db_service_endpoint_pk).delete()
db_config = ConfigModel(self.database, db_service.service_config_fk)
for db_config_rule_pk,_ in db_config.references(ConfigRuleModel):
ConfigRuleModel(self.database, db_config_rule_pk).delete()
db_constraints = ConstraintsModel(self.database, db_service.service_constraints_fk)
for db_constraint_pk,_ in db_constraints.references(ConstraintModel):
ConstraintModel(self.database, db_constraint_pk).delete()
event_type = EventTypeEnum.EVENTTYPE_REMOVE
notify_event(self.messagebroker, TOPIC_SERVICE, event_type, {'service_id': dict_service_id})
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetServiceEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ServiceEvent]:
for message in self.messagebroker.consume({TOPIC_SERVICE}, consume_timeout=CONSUME_TIMEOUT):
yield ServiceEvent(**json.loads(message.content))
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
# ----- Connection -------------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListConnectionIds(self, request: ServiceId, context : grpc.ServicerContext) -> ConnectionIdList:
str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
db_service : ServiceModel = get_object(self.database, ServiceModel, str_key)
db_connections : Set[ConnectionModel] = get_related_objects(db_service, ConnectionModel)
db_connections = sorted(db_connections, key=operator.attrgetter('pk'))
return ConnectionIdList(connection_ids=[db_connection.dump_id() for db_connection in db_connections])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListConnections(self, request: ContextId, context : grpc.ServicerContext) -> ServiceList:
str_key = key_to_str([request.context_id.context_uuid.uuid, request.service_uuid.uuid])
db_service : ServiceModel = get_object(self.database, ServiceModel, str_key)
db_connections : Set[ConnectionModel] = get_related_objects(db_service, ConnectionModel)
db_connections = sorted(db_connections, key=operator.attrgetter('pk'))
return ConnectionList(connections=[db_connection.dump() for db_connection in db_connections])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Connection:
db_connection : ConnectionModel = get_object(self.database, ConnectionModel, request.connection_uuid.uuid)
return Connection(**db_connection.dump(include_path=True, include_sub_service_ids=True))
@safe_and_metered_rpc_method(METRICS, LOGGER)
def SetConnection(self, request: Connection, context : grpc.ServicerContext) -> ConnectionId:
connection_uuid = request.connection_id.connection_uuid.uuid
connection_attributes = {'connection_uuid': connection_uuid}
service_context_uuid = request.service_id.context_id.context_uuid.uuid
service_uuid = request.service_id.service_uuid.uuid
if len(service_context_uuid) > 0 and len(service_uuid) > 0:
str_service_key = key_to_str([service_context_uuid, service_uuid])
db_service : ServiceModel = get_object(self.database, ServiceModel, str_service_key)
connection_attributes['service_fk'] = db_service
path_hops_result = set_path(self.database, connection_uuid, request.path_hops_endpoint_ids, path_name = '')
db_path = path_hops_result[0]
connection_attributes['path_fk'] = db_path
result : Tuple[ConnectionModel, bool] = update_or_create_object(
self.database, ConnectionModel, connection_uuid, connection_attributes)
db_connection, updated = result
for sub_service_id in request.sub_service_ids:
sub_service_uuid = sub_service_id.service_uuid.uuid
sub_service_context_uuid = sub_service_id.context_id.context_uuid.uuid
str_sub_service_key = key_to_str([sub_service_context_uuid, sub_service_uuid])
db_service : ServiceModel = get_object(self.database, ServiceModel, str_sub_service_key)
str_connection_sub_service_key = key_to_str([connection_uuid, str_sub_service_key], separator='--')
result : Tuple[ConnectionSubServiceModel, bool] = get_or_create_object(
self.database, ConnectionSubServiceModel, str_connection_sub_service_key, {
'connection_fk': db_connection, 'sub_service_fk': db_service})
#db_connection_sub_service, connection_sub_service_created = result
event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
dict_connection_id = db_connection.dump_id()
notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': dict_connection_id})
return ConnectionId(**dict_connection_id)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RemoveConnection(self, request: ConnectionId, context : grpc.ServicerContext) -> Empty:
db_connection = ConnectionModel(self.database, request.connection_uuid.uuid, auto_load=False)
found = db_connection.load()
if not found: return Empty()
dict_connection_id = db_connection.dump_id()
db_path = PathModel(self.database, db_connection.path_fk)
for db_path_hop_pk,_ in db_path.references(PathHopModel):
PathHopModel(self.database, db_path_hop_pk).delete()
# Do not remove sub-services automatically. They are supported by real services, so Service component should
# deal with the correct removal workflow to deconfigure the devices.
for db_connection_sub_service_pk,_ in db_connection.references(ConnectionSubServiceModel):
db_connection_sub_service : ConnectionSubServiceModel = get_object(
self.database, ConnectionSubServiceModel, db_connection_sub_service_pk)
db_connection_sub_service.delete()
db_connection.delete()
db_path.delete()
event_type = EventTypeEnum.EVENTTYPE_REMOVE
notify_event(self.messagebroker, TOPIC_CONNECTION, event_type, {'connection_id': dict_connection_id})
return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetConnectionEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ConnectionEvent]:
for message in self.messagebroker.consume({TOPIC_CONNECTION}, consume_timeout=CONSUME_TIMEOUT):
yield ConnectionEvent(**json.loads(message.content))