-
Lluis Gifre Renom authored
- reorganized code, splitting database models, enumerations, and operation methods - separated unit tests per entity and defined order between them - separated unit test for fasthasher - moved old code to separate folder
Lluis Gifre Renom authored - reorganized code, splitting database models, enumerations, and operation methods - separated unit tests per entity and defined order between them - separated unit test for fasthasher - moved old code to separate folder
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
Device.py 14.54 KiB
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from sqlalchemy import delete
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, List, Optional, Set, Tuple
from common.proto.context_pb2 import Device, DeviceId, DeviceIdList, DeviceList
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.service.database.models.ConfigRuleModel import ConfigRuleKindEnum, ConfigRuleModel
from context.service.database.models.DeviceModel import DeviceModel
from context.service.database.models.EndPointModel import EndPointModel
from context.service.database.models.RelationModels import TopologyDeviceModel
from context.service.database.models.enums.ConfigAction import grpc_to_enum__config_action
from context.service.database.models.enums.DeviceDriver import grpc_to_enum__device_driver
from context.service.database.models.enums.DeviceOperationalStatus import grpc_to_enum__device_operational_status
from context.service.database.models.enums.KpiSampleType import grpc_to_enum__kpi_sample_type
def device_list_ids(db_engine : Engine) -> DeviceIdList:
    """Return the identifiers of all devices stored in the database.

    Every DeviceModel row is queried (no filtering) inside a transaction and
    serialised through its dump_id() method; the resulting list is wrapped in
    a DeviceIdList message.

    Args:
        db_engine: engine the transaction's session factory is bound to.

    Returns:
        A DeviceIdList whose device_ids field holds one entry per stored device.
    """
    def _collect_ids(session : Session) -> List[Dict]:
        db_devices : List[DeviceModel] = session.query(DeviceModel).all()
        #.options(selectinload(DeviceModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
        return [db_device.dump_id() for db_device in db_devices]

    session_factory = sessionmaker(bind=db_engine)
    device_ids = run_transaction(session_factory, _collect_ids)
    return DeviceIdList(device_ids=device_ids)
def device_list_objs(db_engine : Engine) -> DeviceList:
    """Return all devices stored in the database.

    Every DeviceModel row is queried (no filtering) inside a transaction and
    serialised through its dump() method; the resulting list is wrapped in a
    DeviceList message.

    Args:
        db_engine: engine the transaction's session factory is bound to.

    Returns:
        A DeviceList whose devices field holds one entry per stored device.
    """
    def _collect_devices(session : Session) -> List[Dict]:
        db_devices : List[DeviceModel] = session.query(DeviceModel).all()
        #.options(selectinload(DeviceModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
        return [db_device.dump() for db_device in db_devices]

    session_factory = sessionmaker(bind=db_engine)
    devices = run_transaction(session_factory, _collect_devices)
    return DeviceList(devices=devices)
def device_get(db_engine : Engine, request : DeviceId) -> Device:
    """Fetch a single device by its UUID.

    Args:
        db_engine: engine the transaction's session factory is bound to.
        request: identifier of the device; only request.device_uuid.uuid is read.

    Returns:
        A Device message built from the stored row's dump().

    Raises:
        NotFoundException: if no DeviceModel row matches the requested UUID.
    """
    device_uuid = request.device_uuid.uuid

    def _fetch_device(session : Session) -> Optional[Dict]:
        query = session.query(DeviceModel).filter_by(device_uuid=device_uuid)
        db_device : Optional[DeviceModel] = query.one_or_none()
        if db_device is None:
            return None
        return db_device.dump()

    device_data = run_transaction(sessionmaker(bind=db_engine), _fetch_device)
    if device_data is None:
        raise NotFoundException('Device', device_uuid)
    return Device(**device_data)
def device_set(db_engine : Engine, request : Device) -> bool:
    """Create or update a device together with its endpoints, topology links and config rules.

    The request is first flattened into plain row dictionaries; all database
    work is then performed by a single callback handed to run_transaction.

    Args:
        db_engine: engine the transaction's session factory is bound to.
        request: device message to store.

    Returns:
        Always False for now (see the TODO on the return statement).

    Raises:
        InvalidArgumentException: if an endpoint references a device UUID other
            than the one of the request, or if a config rule has none of the
            'config_rule' oneof fields set.
    """
    device_uuid = request.device_id.device_uuid.uuid
    device_name = request.name
    device_type = request.device_type
    oper_status = grpc_to_enum__device_operational_status(request.device_operational_status)
    device_drivers = [grpc_to_enum__device_driver(d) for d in request.device_drivers]

    # Flatten endpoints into rows, and collect the distinct (context, topology)
    # pairs they mention so the device can be linked to those topologies.
    topology_keys : Set[Tuple[str, str]] = set()
    related_topologies : List[Dict] = list()
    endpoints_data : List[Dict] = list()
    for i, endpoint in enumerate(request.device_endpoints):
        endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
        # An endpoint without device UUID is taken to belong to the device being set.
        if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
        if device_uuid != endpoint_device_uuid:
            raise InvalidArgumentException(
                'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid,
                ['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)])

        endpoint_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid
        endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid

        kpi_sample_types = [grpc_to_enum__kpi_sample_type(kst) for kst in endpoint.kpi_sample_types]

        endpoints_data.append({
            'context_uuid'    : endpoint_context_uuid,
            'topology_uuid'   : endpoint_topology_uuid,
            'device_uuid'     : endpoint_device_uuid,
            'endpoint_uuid'   : endpoint.endpoint_id.endpoint_uuid.uuid,
            'endpoint_type'   : endpoint.endpoint_type,
            'kpi_sample_types': kpi_sample_types,
        })

        if len(endpoint_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
            topology_key = (endpoint_context_uuid, endpoint_topology_uuid)
            if topology_key not in topology_keys:
                related_topologies.append({
                    'context_uuid' : endpoint_context_uuid,
                    'topology_uuid': endpoint_topology_uuid,
                    'device_uuid'  : endpoint_device_uuid,
                })
                topology_keys.add(topology_key)

    # Flatten config rules into rows; 'position' preserves the request order.
    config_rules : List[Dict] = list()
    for position, config_rule in enumerate(request.device_config.config_rules):
        str_kind = config_rule.WhichOneof('config_rule')
        if str_kind is None:
            # WhichOneof() yields None when no member of the oneof is set; fail
            # with a meaningful error instead of an AttributeError on .upper().
            raise InvalidArgumentException(
                'request.device_config.config_rules[{:d}].config_rule'.format(position), 'None',
                ['one of the config_rule kinds should be set'])
        config_rules.append({
            'device_uuid': device_uuid,
            'kind'       : ConfigRuleKindEnum._member_map_.get(str_kind.upper()), # pylint: disable=no-member
            'action'     : grpc_to_enum__config_action(config_rule.action),
            'position'   : position,
            'data'       : grpc_message_to_json_string(getattr(config_rule, str_kind, {})),
        })

    def callback(session : Session) -> None:
        # Row is selected FOR UPDATE before deciding between update and insert.
        obj : Optional[DeviceModel] = session.query(DeviceModel).with_for_update()\
            .filter_by(device_uuid=device_uuid).one_or_none()
        if obj is not None:
            obj.device_name = device_name
            obj.device_type = device_type
            obj.device_operational_status = oper_status
            obj.device_drivers = device_drivers
            session.merge(obj)
        else:
            session.add(DeviceModel(
                device_uuid=device_uuid, device_name=device_name, device_type=device_type,
                device_operational_status=oper_status, device_drivers=device_drivers, created_at=time.time()))

        # NOTE(review): the result of this query is not used; it is kept because it
        # presumably triggers the session autoflush that writes the pending device
        # row before the Core inserts below - confirm before removing.
        session.query(DeviceModel).filter_by(device_uuid=device_uuid).one_or_none()

        # Each bulk insert is skipped when there is nothing to insert: an empty
        # sequence passed to values() is not handled as a multi-row insert.
        if len(endpoints_data) > 0:
            stmt = insert(EndPointModel).values(endpoints_data)
            # Upsert: an already existing endpoint gets its type and KPI sample types refreshed.
            stmt = stmt.on_conflict_do_update(
                index_elements=[
                    EndPointModel.context_uuid, EndPointModel.topology_uuid, EndPointModel.device_uuid,
                    EndPointModel.endpoint_uuid
                ],
                set_=dict(
                    endpoint_type = stmt.excluded.endpoint_type,
                    kpi_sample_types = stmt.excluded.kpi_sample_types,
                )
            )
            session.execute(stmt)

        if len(related_topologies) > 0:
            # Already existing topology-device links are left untouched.
            session.execute(insert(TopologyDeviceModel).values(related_topologies).on_conflict_do_nothing(
                index_elements=[
                    TopologyDeviceModel.context_uuid, TopologyDeviceModel.topology_uuid,
                    TopologyDeviceModel.device_uuid
                ]
            ))

        # Config rules are replaced wholesale: drop the device's current rules, then insert the new ones.
        session.execute(delete(ConfigRuleModel).where(ConfigRuleModel.device_uuid == device_uuid))
        if len(config_rules) > 0:
            session.execute(insert(ConfigRuleModel).values(config_rules))

    run_transaction(sessionmaker(bind=db_engine), callback)
    return False # TODO: improve and check if created/updated
def device_delete(db_engine : Engine, request : DeviceId) -> bool:
    """Delete a device and its topology links.

    Within one transaction, the TopologyDeviceModel rows referencing the
    device are deleted first, followed by the DeviceModel row itself.

    Args:
        db_engine: engine the transaction's session factory is bound to.
        request: identifier of the device; only request.device_uuid.uuid is read.

    Returns:
        True if at least one DeviceModel row was deleted, False otherwise.
    """
    device_uuid = request.device_uuid.uuid

    def _remove_device(session : Session) -> bool:
        topology_links = session.query(TopologyDeviceModel).filter_by(device_uuid=device_uuid)
        topology_links.delete()

        devices = session.query(DeviceModel).filter_by(device_uuid=device_uuid)
        deleted_rows = devices.delete()

        # Disabled legacy clean-up, kept for reference:
        #db_device = session.query(DeviceModel).filter_by(device_uuid=device_uuid).one_or_none()
        #session.query(ConfigRuleModel).filter_by(config_uuid=db_device.device_config_uuid).delete()
        #session.query(ConfigModel).filter_by(config_uuid=db_device.device_config_uuid).delete()
        #session.query(DeviceModel).filter_by(device_uuid=device_uuid).delete()
        return deleted_rows > 0

    session_factory = sessionmaker(bind=db_engine)
    return run_transaction(session_factory, _remove_device)
#Union_SpecificConfigRule = Union[
# ConfigRuleCustomModel, ConfigRuleAclModel
#]
#
#def set_config_rule(
# database : Database, db_config : ConfigModel, position : int, resource_key : str, resource_value : str,
#): # -> Tuple[ConfigRuleModel, bool]:
#
# str_rule_key_hash = fast_hasher(resource_key)
# str_config_rule_key = key_to_str([db_config.config_uuid, str_rule_key_hash], separator=':')
#
# data = {'config_fk': db_config, 'position': position, 'action': ORM_ConfigActionEnum.SET, 'key': resource_key,
# 'value': resource_value}
# to_add = ConfigRuleModel(**data)
#
# result = database.create_or_update(to_add)
# return result
#Tuple_ConfigRuleSpecs = Tuple[Type, str, Dict, ConfigRuleKindEnum]
#
#def parse_config_rule_custom(database : Database, grpc_config_rule) -> Tuple_ConfigRuleSpecs:
# config_rule_class = ConfigRuleCustomModel
# str_config_rule_id = grpc_config_rule.custom.resource_key
# config_rule_data = {
# 'key' : grpc_config_rule.custom.resource_key,
# 'value': grpc_config_rule.custom.resource_value,
# }
# return config_rule_class, str_config_rule_id, config_rule_data, ConfigRuleKindEnum.CUSTOM
#
#def parse_config_rule_acl(database : Database, grpc_config_rule) -> Tuple_ConfigRuleSpecs:
# config_rule_class = ConfigRuleAclModel
# grpc_endpoint_id = grpc_config_rule.acl.endpoint_id
# grpc_rule_set = grpc_config_rule.acl.rule_set
# device_uuid = grpc_endpoint_id.device_id.device_uuid.uuid
# endpoint_uuid = grpc_endpoint_id.endpoint_uuid.uuid
# str_endpoint_key = '/'.join([device_uuid, endpoint_uuid])
# #str_endpoint_key, db_endpoint = get_endpoint(database, grpc_endpoint_id)
# str_config_rule_id = ':'.join([str_endpoint_key, grpc_rule_set.name])
# config_rule_data = {
# #'endpoint_fk': db_endpoint,
# 'endpoint_id': grpc_message_to_json_string(grpc_endpoint_id),
# 'acl_data': grpc_message_to_json_string(grpc_rule_set),
# }
# return config_rule_class, str_config_rule_id, config_rule_data, ConfigRuleKindEnum.ACL
#
#CONFIGRULE_PARSERS = {
# 'custom': parse_config_rule_custom,
# 'acl' : parse_config_rule_acl,
#}
#
#Union_ConfigRuleModel = Union[
# ConfigRuleCustomModel, ConfigRuleAclModel,
#]
#
#def set_config_rule(
# database : Database, db_config : ConfigModel, grpc_config_rule : ConfigRule, position : int
#) -> Tuple[Union_ConfigRuleModel, bool]:
# grpc_config_rule_kind = str(grpc_config_rule.WhichOneof('config_rule'))
# parser = CONFIGRULE_PARSERS.get(grpc_config_rule_kind)
# if parser is None:
# raise NotImplementedError('ConfigRule of kind {:s} is not implemented: {:s}'.format(
# grpc_config_rule_kind, grpc_message_to_json_string(grpc_config_rule)))
#
# # create specific ConfigRule
# config_rule_class, str_config_rule_id, config_rule_data, config_rule_kind = parser(database, grpc_config_rule)
# str_config_rule_key_hash = fast_hasher(':'.join([config_rule_kind.value, str_config_rule_id]))
# str_config_rule_key = key_to_str([db_config.pk, str_config_rule_key_hash], separator=':')
# result : Tuple[Union_ConfigRuleModel, bool] = update_or_create_object(
# database, config_rule_class, str_config_rule_key, config_rule_data)
# db_specific_config_rule, updated = result
#
# # create generic ConfigRule
# config_rule_fk_field_name = 'config_rule_{:s}_fk'.format(config_rule_kind.value)
# config_rule_data = {
# 'config_fk': db_config, 'kind': config_rule_kind, 'position': position,
# 'action': ORM_ConfigActionEnum.SET,
# config_rule_fk_field_name: db_specific_config_rule
# }
# result : Tuple[ConfigRuleModel, bool] = update_or_create_object(
# database, ConfigRuleModel, str_config_rule_key, config_rule_data)
# db_config_rule, updated = result
#
# return db_config_rule, updated
#
#def delete_config_rule(
# database : Database, db_config : ConfigModel, grpc_config_rule : ConfigRule
#) -> None:
# grpc_config_rule_kind = str(grpc_config_rule.WhichOneof('config_rule'))
# parser = CONFIGRULE_PARSERS.get(grpc_config_rule_kind)
# if parser is None:
# raise NotImplementedError('ConfigRule of kind {:s} is not implemented: {:s}'.format(
# grpc_config_rule_kind, grpc_message_to_json_string(grpc_config_rule)))
#
# # delete generic config rules; self deletes specific config rule
# _, str_config_rule_id, _, config_rule_kind = parser(database, grpc_config_rule)
# str_config_rule_key_hash = fast_hasher(':'.join([config_rule_kind.value, str_config_rule_id]))
# str_config_rule_key = key_to_str([db_config.pk, str_config_rule_key_hash], separator=':')
# db_config_rule : Optional[ConfigRuleModel] = get_object(
# database, ConfigRuleModel, str_config_rule_key, raise_if_not_found=False)
# if db_config_rule is None: return
# db_config_rule.delete()
#
#def update_config(
# database : Database, db_parent_pk : str, config_name : str, grpc_config_rules
#) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]:
#
# str_config_key = key_to_str([config_name, db_parent_pk], separator=':')
# result : Tuple[ConfigModel, bool] = get_or_create_object(database, ConfigModel, str_config_key)
# db_config, created = result
#
# db_objects = [(db_config, created)]
#
# for position,grpc_config_rule in enumerate(grpc_config_rules):
# action = grpc_to_enum__config_action(grpc_config_rule.action)
#
# if action == ORM_ConfigActionEnum.SET:
# result : Tuple[ConfigRuleModel, bool] = set_config_rule(
# database, db_config, grpc_config_rule, position)
# db_config_rule, updated = result
# db_objects.append((db_config_rule, updated))
# elif action == ORM_ConfigActionEnum.DELETE:
# delete_config_rule(database, db_config, grpc_config_rule)
# else:
# msg = 'Unsupported Action({:s}) for ConfigRule({:s})'
# str_action = str(ConfigActionEnum.Name(action))
# str_config_rule = grpc_message_to_json_string(grpc_config_rule)
# raise AttributeError(msg.format(str_action, str_config_rule))
#
# return db_objects