Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Commits on Source (2)
from typing import Tuple, List
from sqlalchemy import MetaData
from sqlalchemy.orm import Session
from context.service.database.Base import Base
import logging
from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.ServiceExceptions import NotFoundException
LOGGER = logging.getLogger(__name__)
......@@ -10,7 +16,7 @@ class Database(Session):
super().__init__()
self.session = session
def query_all(self, model):
def get_all(self, model):
result = []
with self.session() as session:
for entry in session.query(model).all():
......@@ -18,11 +24,88 @@ class Database(Session):
return result
def get_object(self):
pass
def create_or_update(self, model):
with self.session() as session:
att = getattr(model, model.main_pk_name())
filt = {model.main_pk_name(): att}
found = session.query(type(model)).filter_by(**filt).one_or_none()
if found:
found = True
else:
found = False
session.merge(model)
session.commit()
return model, found
def create(self, model):
with self.session() as session:
session.add(model)
session.commit()
return model
def remove(self, model, filter_d):
model_t = type(model)
with self.session() as session:
session.query(model_t).filter_by(**filter_d).delete()
session.commit()
def clear(self):
with self.session() as session:
engine = session.get_bind()
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
def dump_by_table(self):
with self.session() as session:
engine = session.get_bind()
meta = MetaData()
meta.reflect(engine)
result = {}
for table in meta.sorted_tables:
result[table.name] = [dict(row) for row in engine.execute(table.select())]
LOGGER.info(result)
return result
def dump_all(self):
with self.session() as session:
engine = session.get_bind()
meta = MetaData()
meta.reflect(engine)
result = []
for table in meta.sorted_tables:
for row in engine.execute(table.select()):
result.append((table.name, dict(row)))
LOGGER.info(result)
return result
def get_object(self, model_class: Base, main_key: str, raise_if_not_found=False):
filt = {model_class.main_pk_name(): main_key}
with self.session() as session:
get = session.query(model_class).filter_by(**filt).one_or_none()
if not get:
if raise_if_not_found:
raise NotFoundException(model_class.__name__.replace('Model', ''), main_key)
return get
def get_or_create(self, model_class: Base, key_parts: List[str]
) -> Tuple[Base, bool]:
str_key = key_to_str(key_parts)
filt = {model_class.main_pk_name(): key_parts}
with self.session() as session:
get = session.query(model_class).filter_by(**filt).one_or_none()
if get:
return get, False
else:
obj = model_class()
setattr(obj, model_class.main_pk_name(), str_key)
LOGGER.info(obj.dump())
session.add(obj)
session.commit()
return obj, True
......@@ -52,7 +52,7 @@ def main():
start_http_server(metrics_port)
# Get database instance
db_uri = 'cockroachdb://root@10.152.183.66:26257/defaultdb?sslmode=disable'
db_uri = 'cockroachdb://root@10.152.183.111:26257/defaultdb?sslmode=disable'
LOGGER.debug('Connecting to DB: {}'.format(db_uri))
# engine = create_engine(db_uri, echo=False)
......@@ -65,7 +65,7 @@ def main():
return 1
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)
session = sessionmaker(bind=engine, expire_on_commit=False)
# Get message broker instance
messagebroker = MessageBroker(get_messagebroker_backend())
......
......@@ -11,26 +11,23 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import functools, logging, operator
from enum import Enum
from typing import Dict, List, Optional, Tuple, Union
from common.orm.Database import Database
from common.orm.HighLevel import get_object, get_or_create_object, update_or_create_object
from common.orm.backend.Tools import key_to_str
from common.orm.fields.EnumeratedField import EnumeratedField
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.IntegerField import IntegerField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from common.proto.context_pb2 import ConfigActionEnum
from common.tools.grpc.Tools import grpc_message_to_json_string
from sqlalchemy import Column, ForeignKey, INTEGER, CheckConstraint, Enum, String
from sqlalchemy.dialects.postgresql import UUID, ARRAY
from context.service.database.Base import Base
from sqlalchemy.orm import relationship
from context.service.Database import Database
from .Tools import fast_hasher, grpc_to_enum, remove_dict_key
LOGGER = logging.getLogger(__name__)
class ORM_ConfigActionEnum(Enum):
class ORM_ConfigActionEnum(enum.Enum):
UNDEFINED = ConfigActionEnum.CONFIGACTION_UNDEFINED
SET = ConfigActionEnum.CONFIGACTION_SET
DELETE = ConfigActionEnum.CONFIGACTION_DELETE
......@@ -38,27 +35,47 @@ class ORM_ConfigActionEnum(Enum):
grpc_to_enum__config_action = functools.partial(
grpc_to_enum, ConfigActionEnum, ORM_ConfigActionEnum)
class ConfigModel(Model): # pylint: disable=abstract-method
pk = PrimaryKeyField()
class ConfigModel(Base): # pylint: disable=abstract-method
__tablename__ = 'Config'
config_uuid = Column(UUID(as_uuid=False), primary_key=True)
# Relationships
config_rule = relationship("ConfigRuleModel", back_populates="config", lazy="dynamic")
def delete(self) -> None:
db_config_rule_pks = self.references(ConfigRuleModel)
for pk,_ in db_config_rule_pks: ConfigRuleModel(self.database, pk).delete()
super().delete()
def dump(self) -> List[Dict]:
db_config_rule_pks = self.references(ConfigRuleModel)
config_rules = [ConfigRuleModel(self.database, pk).dump(include_position=True) for pk,_ in db_config_rule_pks]
config_rules = sorted(config_rules, key=operator.itemgetter('position'))
def dump(self): # -> List[Dict]:
config_rules = []
for a in self.config_rule:
asdf = a.dump()
config_rules.append(asdf)
return [remove_dict_key(config_rule, 'position') for config_rule in config_rules]
class ConfigRuleModel(Model): # pylint: disable=abstract-method
pk = PrimaryKeyField()
config_fk = ForeignKeyField(ConfigModel)
position = IntegerField(min_value=0, required=True)
action = EnumeratedField(ORM_ConfigActionEnum, required=True)
key = StringField(required=True, allow_empty=False)
value = StringField(required=True, allow_empty=False)
@staticmethod
def main_pk_name():
return 'config_uuid'
class ConfigRuleModel(Base): # pylint: disable=abstract-method
__tablename__ = 'ConfigRule'
config_rule_uuid = Column(UUID(as_uuid=False), primary_key=True)
config_uuid = Column(UUID(as_uuid=False), ForeignKey("Config.config_uuid"), primary_key=True)
action = Column(Enum(ORM_ConfigActionEnum, create_constraint=True, native_enum=True), nullable=False)
position = Column(INTEGER, nullable=False)
key = Column(String, nullable=False)
value = Column(String, nullable=False)
__table_args__ = (
CheckConstraint(position >= 0, name='check_position_value'),
{}
)
# Relationships
config = relationship("ConfigModel", back_populates="config_rule")
def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ
result = {
......@@ -71,17 +88,23 @@ class ConfigRuleModel(Model): # pylint: disable=abstract-method
if include_position: result['position'] = self.position
return result
@staticmethod
def main_pk_name():
return 'config_rule_uuid'
def set_config_rule(
database : Database, db_config : ConfigModel, position : int, resource_key : str, resource_value : str
) -> Tuple[ConfigRuleModel, bool]:
database : Database, db_config : ConfigModel, position : int, resource_key : str, resource_value : str,
): # -> Tuple[ConfigRuleModel, bool]:
str_rule_key_hash = fast_hasher(resource_key)
str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':')
result : Tuple[ConfigRuleModel, bool] = update_or_create_object(database, ConfigRuleModel, str_config_rule_key, {
'config_fk': db_config, 'position': position, 'action': ORM_ConfigActionEnum.SET,
'key': resource_key, 'value': resource_value})
db_config_rule, updated = result
return db_config_rule, updated
str_config_rule_key = key_to_str([db_config.config_uuid, str_rule_key_hash], separator=':')
data = {'config_fk': db_config, 'position': position, 'action': ORM_ConfigActionEnum.SET, 'key': resource_key,
'value': resource_value}
to_add = ConfigRuleModel(**data)
result = database.create_or_update(to_add)
return result
def delete_config_rule(
database : Database, db_config : ConfigModel, resource_key : str
......
......@@ -33,6 +33,9 @@ class ContextModel(Base):
def dump_id(self) -> Dict:
return {'context_uuid': {'uuid': self.context_uuid}}
def main_pk_name(self):
return 'context_uuid'
"""
def dump_service_ids(self) -> List[Dict]:
from .ServiceModel import ServiceModel # pylint: disable=import-outside-toplevel
......
......@@ -11,24 +11,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import functools, logging
from enum import Enum
import uuid
from typing import Dict, List
from common.orm.Database import Database
from common.orm.backend.Tools import key_to_str
from common.orm.fields.EnumeratedField import EnumeratedField
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from common.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum
from .ConfigModel import ConfigModel
from sqlalchemy import Column, ForeignKey, String, Enum
from sqlalchemy.dialects.postgresql import UUID, ARRAY
from context.service.database.Base import Base
from sqlalchemy.orm import relationship
from .Tools import grpc_to_enum
LOGGER = logging.getLogger(__name__)
class ORM_DeviceDriverEnum(Enum):
class ORM_DeviceDriverEnum(enum.Enum):
UNDEFINED = DeviceDriverEnum.DEVICEDRIVER_UNDEFINED
OPENCONFIG = DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG
TRANSPORT_API = DeviceDriverEnum.DEVICEDRIVER_TRANSPORT_API
......@@ -39,7 +37,7 @@ class ORM_DeviceDriverEnum(Enum):
grpc_to_enum__device_driver = functools.partial(
grpc_to_enum, DeviceDriverEnum, ORM_DeviceDriverEnum)
class ORM_DeviceOperationalStatusEnum(Enum):
class ORM_DeviceOperationalStatusEnum(enum.Enum):
UNDEFINED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
DISABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
......@@ -47,48 +45,51 @@ class ORM_DeviceOperationalStatusEnum(Enum):
grpc_to_enum__device_operational_status = functools.partial(
grpc_to_enum, DeviceOperationalStatusEnum, ORM_DeviceOperationalStatusEnum)
class DeviceModel(Model):
pk = PrimaryKeyField()
device_uuid = StringField(required=True, allow_empty=False)
device_type = StringField()
device_config_fk = ForeignKeyField(ConfigModel)
device_operational_status = EnumeratedField(ORM_DeviceOperationalStatusEnum, required=True)
def delete(self) -> None:
# pylint: disable=import-outside-toplevel
from .EndPointModel import EndPointModel
from .RelationModels import TopologyDeviceModel
for db_endpoint_pk,_ in self.references(EndPointModel):
EndPointModel(self.database, db_endpoint_pk).delete()
for db_topology_device_pk,_ in self.references(TopologyDeviceModel):
TopologyDeviceModel(self.database, db_topology_device_pk).delete()
for db_driver_pk,_ in self.references(DriverModel):
DriverModel(self.database, db_driver_pk).delete()
super().delete()
ConfigModel(self.database, self.device_config_fk).delete()
class DeviceModel(Base):
__tablename__ = 'Device'
device_uuid = Column(UUID(as_uuid=False), primary_key=True)
device_type = Column(String)
device_config_uuid = Column(UUID(as_uuid=False), ForeignKey("Config.config_uuid"))
device_operational_status = Column(Enum(ORM_DeviceOperationalStatusEnum, create_constraint=False,
native_enum=False))
# Relationships
device_config = relationship("ConfigModel", lazy="joined")
driver = relationship("DriverModel", lazy="joined")
endpoints = relationship("EndPointModel", lazy="joined")
# def delete(self) -> None:
# # pylint: disable=import-outside-toplevel
# from .EndPointModel import EndPointModel
# from .RelationModels import TopologyDeviceModel
#
# for db_endpoint_pk,_ in self.references(EndPointModel):
# EndPointModel(self.database, db_endpoint_pk).delete()
#
# for db_topology_device_pk,_ in self.references(TopologyDeviceModel):
# TopologyDeviceModel(self.database, db_topology_device_pk).delete()
#
# for db_driver_pk,_ in self.references(DriverModel):
# DriverModel(self.database, db_driver_pk).delete()
#
# super().delete()
#
# ConfigModel(self.database, self.device_config_fk).delete()
def dump_id(self) -> Dict:
return {'device_uuid': {'uuid': self.device_uuid}}
def dump_config(self) -> Dict:
return ConfigModel(self.database, self.device_config_fk).dump()
return self.device_config.dump()
def dump_drivers(self) -> List[int]:
db_driver_pks = self.references(DriverModel)
return [DriverModel(self.database, pk).dump() for pk,_ in db_driver_pks]
return self.driver.dump()
def dump_endpoints(self) -> List[Dict]:
from .EndPointModel import EndPointModel # pylint: disable=import-outside-toplevel
db_endpoints_pks = self.references(EndPointModel)
return [EndPointModel(self.database, pk).dump() for pk,_ in db_endpoints_pks]
return self.endpoints.dump()
def dump( # pylint: disable=arguments-differ
self, include_config_rules=True, include_drivers=True, include_endpoints=True
self, include_config_rules=True, include_drivers=False, include_endpoints=False
) -> Dict:
result = {
'device_id': self.dump_id(),
......@@ -100,16 +101,27 @@ class DeviceModel(Model):
if include_endpoints: result['device_endpoints'] = self.dump_endpoints()
return result
class DriverModel(Model): # pylint: disable=abstract-method
pk = PrimaryKeyField()
device_fk = ForeignKeyField(DeviceModel)
driver = EnumeratedField(ORM_DeviceDriverEnum, required=True)
def main_pk_name(self):
return 'device_uuid'
class DriverModel(Base): # pylint: disable=abstract-method
__tablename__ = 'Driver'
driver_uuid = Column(UUID(as_uuid=False), primary_key=True)
device_uuid = Column(UUID(as_uuid=False), ForeignKey("Device.device_uuid"), primary_key=True)
driver = Column(Enum(ORM_DeviceDriverEnum, create_constraint=False, native_enum=False))
# Relationships
device = relationship("DeviceModel")
def dump(self) -> Dict:
return self.driver.value
def main_pk_name(self):
return 'driver_uuid'
def set_drivers(database : Database, db_device : DeviceModel, grpc_device_drivers):
db_device_pk = db_device.pk
db_device_pk = db_device.device_uuid
for driver in grpc_device_drivers:
orm_driver = grpc_to_enum__device_driver(driver)
str_device_driver_key = key_to_str([db_device_pk, orm_driver.name])
......
......@@ -17,24 +17,25 @@ from typing import Dict, List, Optional, Tuple
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from common.orm.backend.Tools import key_to_str
from common.orm.fields.EnumeratedField import EnumeratedField
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from common.proto.context_pb2 import EndPointId
from .DeviceModel import DeviceModel
from .KpiSampleType import ORM_KpiSampleTypeEnum, grpc_to_enum__kpi_sample_type
from .TopologyModel import TopologyModel
from sqlalchemy import Column, ForeignKey, String, Enum, ForeignKeyConstraint
from sqlalchemy.dialects.postgresql import UUID, ARRAY
from context.service.database.Base import Base
from sqlalchemy.orm import relationship
LOGGER = logging.getLogger(__name__)
class EndPointModel(Model):
pk = PrimaryKeyField()
topology_fk = ForeignKeyField(TopologyModel, required=False)
device_fk = ForeignKeyField(DeviceModel)
endpoint_uuid = StringField(required=True, allow_empty=False)
endpoint_type = StringField()
class EndPointModel(Base):
__tablename__ = 'EndPoint'
endpoint_uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True)
topology_uuid = Column(UUID(as_uuid=False), ForeignKey("Topology.topology_uuid"), primary_key=True)
device_uuid = Column(UUID(as_uuid=False), ForeignKey("Device.device_uuid"), primary_key=True)
endpoint_type = Column(String)
# Relationships
def main_pk_name(self):
return 'endpoint_uuid'
def delete(self) -> None:
for db_kpi_sample_type_pk,_ in self.references(KpiSampleTypeModel):
......@@ -42,13 +43,10 @@ class EndPointModel(Model):
super().delete()
def dump_id(self) -> Dict:
device_id = DeviceModel(self.database, self.device_fk).dump_id()
result = {
'device_id': device_id,
'device_uuid': self.device_uuid,
'endpoint_uuid': {'uuid': self.endpoint_uuid},
}
if self.topology_fk is not None:
result['topology_id'] = TopologyModel(self.database, self.topology_fk).dump_id()
return result
def dump_kpi_sample_types(self) -> List[int]:
......@@ -59,20 +57,26 @@ class EndPointModel(Model):
self, include_kpi_sample_types=True
) -> Dict:
result = {
'endpoint_id': self.dump_id(),
'endpoint_uuid': self.dump_id(),
'endpoint_type': self.endpoint_type,
}
if include_kpi_sample_types: result['kpi_sample_types'] = self.dump_kpi_sample_types()
return result
class KpiSampleTypeModel(Model): # pylint: disable=abstract-method
pk = PrimaryKeyField()
endpoint_fk = ForeignKeyField(EndPointModel)
kpi_sample_type = EnumeratedField(ORM_KpiSampleTypeEnum, required=True)
class KpiSampleTypeModel(Base): # pylint: disable=abstract-method
__tablename__ = 'KpiSampleType'
kpi_uuid = Column(UUID(as_uuid=False), primary_key=True)
endpoint_uuid = Column(UUID(as_uuid=False), ForeignKey("EndPoint.endpoint_uuid"))
kpi_sample_type = Column(Enum(ORM_KpiSampleTypeEnum, create_constraint=False,
native_enum=False))
# __table_args__ = (ForeignKeyConstraint([endpoint_uuid], [EndPointModel.endpoint_uuid]), {})
def dump(self) -> Dict:
return self.kpi_sample_type.value
def main_pk_name(self):
return 'kpi_uuid'
"""
def set_kpi_sample_types(database : Database, db_endpoint : EndPointModel, grpc_endpoint_kpi_sample_types):
db_endpoint_pk = db_endpoint.pk
for kpi_sample_type in grpc_endpoint_kpi_sample_types:
......@@ -82,7 +86,7 @@ def set_kpi_sample_types(database : Database, db_endpoint : EndPointModel, grpc_
db_endpoint_kpi_sample_type.endpoint_fk = db_endpoint
db_endpoint_kpi_sample_type.kpi_sample_type = orm_kpi_sample_type
db_endpoint_kpi_sample_type.save()
"""
def get_endpoint(
database : Database, grpc_endpoint_id : EndPointId,
validate_topology_exists : bool = True, validate_device_in_topology : bool = True
......
......@@ -13,11 +13,11 @@
# limitations under the License.
import functools
from enum import Enum
import enum
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from .Tools import grpc_to_enum
class ORM_KpiSampleTypeEnum(Enum):
class ORM_KpiSampleTypeEnum(enum.Enum):
UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN
PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED
PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
......
......@@ -15,8 +15,9 @@
import hashlib, re
from enum import Enum
from typing import Dict, List, Tuple, Union
import logging
# Convenient helper function to remove dictionary items in dict/list/set comprehensions.
LOGGER = logging.getLogger(__name__)
def remove_dict_key(dictionary : Dict, key : str):
dictionary.pop(key, None)
......
......@@ -14,11 +14,6 @@
import logging, operator
from typing import Dict, List
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from common.orm.HighLevel import get_related_objects
from sqlalchemy.orm import relationship
from sqlalchemy import Column, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
......@@ -28,10 +23,10 @@ LOGGER = logging.getLogger(__name__)
class TopologyModel(Base):
__tablename__ = 'Topology'
context_uuid = Column(UUID(as_uuid=False), ForeignKey("Context.context_uuid"), primary_key=True)
topology_uuid = Column(UUID(as_uuid=False), primary_key=True)
topology_uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True)
# Relationships
context = relationship("ContextModel", back_populates="topology", lazy="subquery")
context = relationship("ContextModel", back_populates="topology", lazy="joined")
def dump_id(self) -> Dict:
context_id = self.context.dump_id()
......@@ -40,6 +35,10 @@ class TopologyModel(Base):
'topology_uuid': {'uuid': self.topology_uuid},
}
@staticmethod
def main_pk_name() -> str:
return 'topology_uuid'
"""def dump_device_ids(self) -> List[Dict]:
from .RelationModels import TopologyDeviceModel # pylint: disable=import-outside-toplevel
db_devices = get_related_objects(self, TopologyDeviceModel, 'device_fk')
......
......@@ -11,9 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import grpc, json, logging, operator, threading
from typing import Iterator, List, Set, Tuple
from typing import Iterator, List, Set, Tuple, Union
from common.message_broker.MessageBroker import MessageBroker
from context.service.Database import Database
......@@ -25,19 +26,24 @@ from common.proto.context_pb2 import (
Link, LinkEvent, LinkId, LinkIdList, LinkList,
Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
Slice, SliceEvent, SliceId, SliceIdList, SliceList,
Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList)
Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList,
ConfigActionEnum)
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException
from sqlalchemy.orm import Session, contains_eager, selectinload
from common.rpc_method_wrapper.ServiceExceptions import NotFoundException
from context.service.database.ConfigModel import grpc_config_rules_to_raw
from context.service.database.DeviceModel import DeviceModel, grpc_to_enum__device_operational_status, set_drivers, grpc_to_enum__device_driver, DriverModel
from context.service.database.ConfigModel import ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel
from common.orm.backend.Tools import key_to_str
from ..database.KpiSampleType import grpc_to_enum__kpi_sample_type
"""
from context.service.database.ConfigModel import grpc_config_rules_to_raw, update_config
from context.service.database.ConnectionModel import ConnectionModel, set_path
from context.service.database.ConstraintModel import set_constraints
from context.service.database.DeviceModel import DeviceModel, grpc_to_enum__device_operational_status, set_drivers
from context.service.database.EndPointModel import EndPointModel, set_kpi_sample_types
from context.service.database.Events import notify_event
from context.service.database.LinkModel import LinkModel
......@@ -51,8 +57,9 @@ from context.service.database.TopologyModel import TopologyModel
"""
from context.service.database.ContextModel import ContextModel
from context.service.database.TopologyModel import TopologyModel
# from context.service.database.TopologyModel import TopologyModel
from context.service.database.Events import notify_event
from context.service.database.EndPointModel import EndPointModel
from context.service.database.EndPointModel import KpiSampleTypeModel
from .Constants import (
CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE,
......@@ -201,10 +208,10 @@ class ContextServiceServicerImpl(ContextServiceServicer):
with self.session() as session:
result = session.query(TopologyModel).join(TopologyModel.context).filter(TopologyModel.topology_uuid==topology_uuid).options(contains_eager(TopologyModel.context)).one_or_none()
if not result:
raise NotFoundException(TopologyModel.__name__.replace('Model', ''), topology_uuid)
if not result:
raise NotFoundException(TopologyModel.__name__.replace('Model', ''), topology_uuid)
return Topology(**result.dump())
return Topology(**result.dump())
@safe_and_metered_rpc_method(METRICS, LOGGER)
......@@ -247,97 +254,201 @@ class ContextServiceServicerImpl(ContextServiceServicer):
def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT):
yield TopologyEvent(**json.loads(message.content))
"""
# ----- Device -----------------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList:
with self.lock:
db_devices : List[DeviceModel] = get_all_objects(self.database, DeviceModel)
db_devices = sorted(db_devices, key=operator.attrgetter('pk'))
return DeviceIdList(device_ids=[db_device.dump_id() for db_device in db_devices])
with self.session() as session:
result = session.query(DeviceModel).all()
return DeviceIdList(device_ids=[device.dump_id() for device in result])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList:
with self.lock:
db_devices : List[DeviceModel] = get_all_objects(self.database, DeviceModel)
db_devices = sorted(db_devices, key=operator.attrgetter('pk'))
return DeviceList(devices=[db_device.dump() for db_device in db_devices])
with self.session() as session:
result = session.query(DeviceModel).all()
return DeviceList(devices=[device.dump_id() for device in result])
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device:
with self.lock:
device_uuid = request.device_uuid.uuid
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid)
return Device(**db_device.dump(
include_config_rules=True, include_drivers=True, include_endpoints=True))
@safe_and_metered_rpc_method(METRICS, LOGGER)
def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId:
with self.lock:
device_uuid = request.device_id.device_uuid.uuid
for i,endpoint in enumerate(request.device_endpoints):
endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
if device_uuid != endpoint_device_uuid:
raise InvalidArgumentException(
'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid,
['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)])
config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
running_config_result = update_config(self.database, device_uuid, 'running', config_rules)
db_running_config = running_config_result[0][0]
result : Tuple[DeviceModel, bool] = update_or_create_object(self.database, DeviceModel, device_uuid, {
'device_uuid' : device_uuid,
'device_type' : request.device_type,
'device_operational_status': grpc_to_enum__device_operational_status(request.device_operational_status),
'device_config_fk' : db_running_config,
})
db_device, updated = result
set_drivers(self.database, db_device, request.device_drivers)
for i,endpoint in enumerate(request.device_endpoints):
endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
endpoint_attributes = {
'device_fk' : db_device,
'endpoint_uuid': endpoint_uuid,
'endpoint_type': endpoint.endpoint_type,
}
endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid
endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid
if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key)
str_topology_device_key = key_to_str([str_topology_key, device_uuid], separator='--')
result : Tuple[TopologyDeviceModel, bool] = get_or_create_object(
self.database, TopologyDeviceModel, str_topology_device_key, {
'topology_fk': db_topology, 'device_fk': db_device})
#db_topology_device, topology_device_created = result
device_uuid = request.device_uuid.uuid
with self.session() as session:
result = session.query(DeviceModel).filter(DeviceModel.device_uuid == device_uuid).one_or_none()
if not result:
raise NotFoundException(DeviceModel.__name__.replace('Model', ''), device_uuid)
str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
endpoint_attributes['topology_fk'] = db_topology
rd = result.dump()
rt = Device(**rd)
result : Tuple[EndPointModel, bool] = update_or_create_object(
self.database, EndPointModel, str_endpoint_key, endpoint_attributes)
db_endpoint, endpoint_updated = result
return rt
set_kpi_sample_types(self.database, db_endpoint, endpoint.kpi_sample_types)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId:
device_uuid = request.device_id.device_uuid.uuid
event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
dict_device_id = db_device.dump_id()
notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id})
return DeviceId(**dict_device_id)
for i,endpoint in enumerate(request.device_endpoints):
endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
if device_uuid != endpoint_device_uuid:
raise InvalidArgumentException(
'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid,
['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)])
config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
running_config_result = self.update_config(device_uuid, 'running', config_rules)
db_running_config = running_config_result[0][0]
config_uuid = db_running_config.config_uuid
new_obj = DeviceModel(**{
'device_uuid' : device_uuid,
'device_type' : request.device_type,
'device_operational_status' : grpc_to_enum__device_operational_status(request.device_operational_status),
'device_config_uuid' : config_uuid,
})
result: Tuple[DeviceModel, bool] = self.database.create_or_update(new_obj)
db_device, updated = result
self.set_drivers(db_device, request.device_drivers)
for i,endpoint in enumerate(request.device_endpoints):
endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
endpoint_attributes = {
'device_uuid' : db_device.device_uuid,
'endpoint_uuid': endpoint_uuid,
'endpoint_type': endpoint.endpoint_type,
}
endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid
endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid
if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
db_topology : TopologyModel = self.database.get_object(TopologyModel, endpoint_topology_uuid)
str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
endpoint_attributes['topology_uuid'] = db_topology.topology_uuid
new_endpoint = EndPointModel(**endpoint_attributes)
result : Tuple[EndPointModel, bool] = self.database.create_or_update(new_endpoint)
db_endpoint, updated = result
self.set_kpi_sample_types(db_endpoint, endpoint.kpi_sample_types)
# event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
dict_device_id = db_device.dump_id()
# notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id})
return DeviceId(**dict_device_id)
def set_kpi_sample_types(self, db_endpoint: EndPointModel, grpc_endpoint_kpi_sample_types):
db_endpoint_pk = db_endpoint.endpoint_uuid
for kpi_sample_type in grpc_endpoint_kpi_sample_types:
orm_kpi_sample_type = grpc_to_enum__kpi_sample_type(kpi_sample_type)
# str_endpoint_kpi_sample_type_key = key_to_str([db_endpoint_pk, orm_kpi_sample_type.name])
data = {'endpoint_uuid': db_endpoint_pk,
'kpi_sample_type': orm_kpi_sample_type.name,
'kpi_uuid': str(uuid.uuid4())}
db_endpoint_kpi_sample_type = KpiSampleTypeModel(**data)
self.database.create(db_endpoint_kpi_sample_type)
def set_drivers(self, db_device: DeviceModel, grpc_device_drivers):
db_device_pk = db_device.device_uuid
for driver in grpc_device_drivers:
orm_driver = grpc_to_enum__device_driver(driver)
str_device_driver_key = key_to_str([db_device_pk, orm_driver.name])
driver_config = {
"driver_uuid": str(uuid.uuid4()),
"device_uuid": db_device_pk,
"driver": orm_driver.name
}
db_device_driver = DriverModel(**driver_config)
db_device_driver.device_fk = db_device
db_device_driver.driver = orm_driver
self.database.create_or_update(db_device_driver)
def update_config(
self, db_parent_pk: str, config_name: str,
raw_config_rules: List[Tuple[ORM_ConfigActionEnum, str, str]]
) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]:
str_config_key = key_to_str([db_parent_pk, config_name], separator=':')
result = self.database.get_or_create(ConfigModel, db_parent_pk)
db_config, created = result
LOGGER.info('UPDATED-CONFIG: {}'.format(db_config.dump()))
db_objects: List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]] = [(db_config, created)]
for position, (action, resource_key, resource_value) in enumerate(raw_config_rules):
if action == ORM_ConfigActionEnum.SET:
result : Tuple[ConfigRuleModel, bool] = self.set_config_rule(
db_config, position, resource_key, resource_value)
db_config_rule, updated = result
db_objects.append((db_config_rule, updated))
elif action == ORM_ConfigActionEnum.DELETE:
self.delete_config_rule(db_config, resource_key)
else:
msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})'
raise AttributeError(
msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value)))
return db_objects
def set_config_rule(self, db_config: ConfigModel, position: int, resource_key: str, resource_value: str,
): # -> Tuple[ConfigRuleModel, bool]:
from src.context.service.database.Tools import fast_hasher
str_rule_key_hash = fast_hasher(resource_key)
str_config_rule_key = key_to_str([db_config.config_uuid, str_rule_key_hash], separator=':')
pk = str(uuid.uuid5(uuid.UUID('9566448d-e950-425e-b2ae-7ead656c7e47'), str_config_rule_key))
data = {'config_rule_uuid': pk, 'config_uuid': db_config.config_uuid, 'position': position,
'action': ORM_ConfigActionEnum.SET, 'key': resource_key, 'value': resource_value}
to_add = ConfigRuleModel(**data)
result, updated = self.database.create_or_update(to_add)
return result, updated
def delete_config_rule(
self, db_config: ConfigModel, resource_key: str
) -> None:
from src.context.service.database.Tools import fast_hasher
str_rule_key_hash = fast_hasher(resource_key)
str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':')
db_config_rule = self.database.get_object(ConfigRuleModel, str_config_rule_key, raise_if_not_found=False)
if db_config_rule is None:
return
db_config_rule.delete()
def delete_all_config_rules(self, db_config: ConfigModel) -> None:
db_config_rule_pks = db_config.references(ConfigRuleModel)
for pk, _ in db_config_rule_pks: ConfigRuleModel(self.database, pk).delete()
"""
for position, (action, resource_key, resource_value) in enumerate(raw_config_rules):
if action == ORM_ConfigActionEnum.SET:
result: Tuple[ConfigRuleModel, bool] = set_config_rule(
database, db_config, position, resource_key, resource_value)
db_config_rule, updated = result
db_objects.append((db_config_rule, updated))
elif action == ORM_ConfigActionEnum.DELETE:
delete_config_rule(database, db_config, resource_key)
else:
msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})'
raise AttributeError(
msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value)))
return db_objects
"""
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty:
......@@ -360,6 +471,9 @@ class ContextServiceServicerImpl(ContextServiceServicer):
yield DeviceEvent(**json.loads(message.content))
"""
# ----- Link -------------------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS, LOGGER)
......
......@@ -45,12 +45,17 @@ PACKET_PORT_SAMPLE_TYPES = [
# ----- Device ---------------------------------------------------------------------------------------------------------
DEVICE_R1_UUID = 'R1'
EP2 = '7eb80584-2587-4e71-b10c-f3a5c48e84ab'
EP3 = '368baf47-0540-4ab4-add8-a19b5167162c'
EP100 = '6a923121-36e1-4b5e-8cd6-90aceca9b5cf'
DEVICE_R1_UUID = 'fe83a200-6ded-47b4-b156-3bb3556a10d6'
DEVICE_R1_ID = json_device_id(DEVICE_R1_UUID)
DEVICE_R1_EPS = [
json_endpoint(DEVICE_R1_ID, 'EP2', '10G', topology_id=TOPOLOGY_ID, kpi_sample_types=PACKET_PORT_SAMPLE_TYPES),
json_endpoint(DEVICE_R1_ID, 'EP3', '10G', topology_id=TOPOLOGY_ID, kpi_sample_types=PACKET_PORT_SAMPLE_TYPES),
json_endpoint(DEVICE_R1_ID, 'EP100', '10G', topology_id=TOPOLOGY_ID, kpi_sample_types=PACKET_PORT_SAMPLE_TYPES),
json_endpoint(DEVICE_R1_ID, EP2, '10G', topology_id=TOPOLOGY_ID, kpi_sample_types=PACKET_PORT_SAMPLE_TYPES),
json_endpoint(DEVICE_R1_ID, EP3, '10G', topology_id=TOPOLOGY_ID, kpi_sample_types=PACKET_PORT_SAMPLE_TYPES),
json_endpoint(DEVICE_R1_ID, EP100, '10G', topology_id=TOPOLOGY_ID, kpi_sample_types=PACKET_PORT_SAMPLE_TYPES),
]
DEVICE_R1_RULES = [
json_config_rule_set('dev/rsrc1/value', 'value1'),
......
......@@ -20,7 +20,6 @@ from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name,
get_service_baseurl_http, get_service_port_grpc, get_service_port_http)
from context.service.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import (
......@@ -84,7 +83,7 @@ def context_s_mb(request) -> Tuple[Session, MessageBroker]:
msg = 'Running scenario {:s} db_session={:s}, mb_backend={:s}, mb_settings={:s}...'
LOGGER.info(msg.format(str(name), str(db_session), str(mb_backend.value), str(mb_settings)))
db_uri = 'cockroachdb://root@10.152.183.66:26257/defaultdb?sslmode=disable'
db_uri = 'cockroachdb://root@10.152.183.111:26257/defaultdb?sslmode=disable'
LOGGER.debug('Connecting to DB: {}'.format(db_uri))
try:
......@@ -95,7 +94,7 @@ def context_s_mb(request) -> Tuple[Session, MessageBroker]:
return 1
Base.metadata.create_all(engine)
_session = sessionmaker(bind=engine)
_session = sessionmaker(bind=engine, expire_on_commit=False)
_message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))
yield _session, _message_broker
......@@ -164,7 +163,7 @@ def test_grpc_context(
assert len(response.contexts) == 0
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = database.query_all(ContextModel)
db_entries = database.get_all(ContextModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
......@@ -214,7 +213,7 @@ def test_grpc_context(
assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Dump state of database after create/update the object ------------------------------------------------------
db_entries = database.query_all(ContextModel)
db_entries = database.get_all(ContextModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries:
......@@ -252,7 +251,7 @@ def test_grpc_context(
events_collector.stop()
# ----- Dump state of database after remove the object -------------------------------------------------------------
db_entries = database.query_all(ContextModel)
db_entries = database.get_all(ContextModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries:
......@@ -295,7 +294,7 @@ def test_grpc_topology(
assert len(response.topologies) == 0
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = database.query_all(TopologyModel)
db_entries = database.get_all(TopologyModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
......@@ -337,7 +336,7 @@ def test_grpc_topology(
# assert event.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Dump state of database after create/update the object ------------------------------------------------------
db_entries = database.query_all(TopologyModel)
db_entries = database.get_all(TopologyModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
......@@ -384,22 +383,22 @@ def test_grpc_topology(
# events_collector.stop()
# ----- Dump state of database after remove the object -------------------------------------------------------------
db_entries = database.query_all(TopologyModel)
db_entries = database.get_all(TopologyModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
"""
def test_grpc_device(
context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
context_database = context_db_mb[0]
context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name
context_s_mb: Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name
session = context_s_mb[0]
database = Database(session)
# ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all()
database.clear()
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
events_collector = EventsCollector(context_client_grpc)
......@@ -438,49 +437,49 @@ def test_grpc_device(
assert len(response.devices) == 0
# ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump()
db_entries = database.dump_all()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 5
assert len(db_entries) == 2
# ----- Create the object ------------------------------------------------------------------------------------------
with pytest.raises(grpc.RpcError) as e:
WRONG_DEVICE = copy.deepcopy(DEVICE_R1)
WRONG_DEVICE['device_endpoints'][0]['endpoint_id']['device_id']['device_uuid']['uuid'] = 'wrong-device-uuid'
WRONG_DEVICE_UUID = '3f03c76d-31fb-47f5-9c1d-bc6b6bfa2d08'
WRONG_DEVICE['device_endpoints'][0]['endpoint_id']['device_id']['device_uuid']['uuid'] = WRONG_DEVICE_UUID
context_client_grpc.SetDevice(Device(**WRONG_DEVICE))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'request.device_endpoints[0].device_id.device_uuid.uuid(wrong-device-uuid) is invalid; '\
'should be == request.device_id.device_uuid.uuid({:s})'.format(DEVICE_R1_UUID)
msg = 'request.device_endpoints[0].device_id.device_uuid.uuid({}) is invalid; '\
'should be == request.device_id.device_uuid.uuid({})'.format(WRONG_DEVICE_UUID, DEVICE_R1_UUID)
assert e.value.details() == msg
response = context_client_grpc.SetDevice(Device(**DEVICE_R1))
assert response.device_uuid.uuid == DEVICE_R1_UUID
# ----- Check create event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True)
assert isinstance(event, DeviceEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID
# event = events_collector.get_event(block=True)
# assert isinstance(event, DeviceEvent)
# assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
# assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID
# ----- Update the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetDevice(Device(**DEVICE_R1))
assert response.device_uuid.uuid == DEVICE_R1_UUID
# ----- Check update event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True)
assert isinstance(event, DeviceEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID
# event = events_collector.get_event(block=True)
# assert isinstance(event, DeviceEvent)
# assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
# assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID
# ----- Dump state of database after create/update the object ------------------------------------------------------
db_entries = context_database.dump()
db_entries = database.dump_all()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 40
assert len(db_entries) == 36
# ----- Get when the object exists ---------------------------------------------------------------------------------
response = context_client_grpc.GetDevice(DeviceId(**DEVICE_R1_ID))
......@@ -513,11 +512,11 @@ def test_grpc_device(
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Check update event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True)
assert isinstance(event, TopologyEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# event = events_collector.get_event(block=True)
# assert isinstance(event, TopologyEvent)
# assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
# assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Check relation was created ---------------------------------------------------------------------------------
response = context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID))
......@@ -528,12 +527,12 @@ def test_grpc_device(
assert len(response.link_ids) == 0
# ----- Dump state of database after creating the object relation --------------------------------------------------
db_entries = context_database.dump()
db_entries = database.dump_all()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 40
assert len(db_entries) == 33
# ----- Remove the object ------------------------------------------------------------------------------------------
context_client_grpc.RemoveDevice(DeviceId(**DEVICE_R1_ID))
......@@ -541,33 +540,33 @@ def test_grpc_device(
context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))
# ----- Check remove event -----------------------------------------------------------------------------------------
events = events_collector.get_events(block=True, count=3)
# events = events_collector.get_events(block=True, count=3)
assert isinstance(events[0], DeviceEvent)
assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[0].device_id.device_uuid.uuid == DEVICE_R1_UUID
# assert isinstance(events[0], DeviceEvent)
# assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
# assert events[0].device_id.device_uuid.uuid == DEVICE_R1_UUID
assert isinstance(events[1], TopologyEvent)
assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# assert isinstance(events[1], TopologyEvent)
# assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
# assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
assert isinstance(events[2], ContextEvent)
assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[2].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# assert isinstance(events[2], ContextEvent)
# assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
# assert events[2].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Stop the EventsCollector -----------------------------------------------------------------------------------
events_collector.stop()
# events_collector.stop()
# ----- Dump state of database after remove the object -------------------------------------------------------------
db_entries = context_database.dump()
db_entries = database.dump_all()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
# for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0
"""
def test_grpc_link(
context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
......