Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Commits on Source (2)
from typing import Tuple, List
from sqlalchemy import MetaData
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from context.service.database.Base import Base from context.service.database.Base import Base
import logging import logging
from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.ServiceExceptions import NotFoundException
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -10,7 +16,7 @@ class Database(Session): ...@@ -10,7 +16,7 @@ class Database(Session):
super().__init__() super().__init__()
self.session = session self.session = session
def query_all(self, model): def get_all(self, model):
result = [] result = []
with self.session() as session: with self.session() as session:
for entry in session.query(model).all(): for entry in session.query(model).all():
...@@ -18,11 +24,88 @@ class Database(Session): ...@@ -18,11 +24,88 @@ class Database(Session):
return result return result
def get_object(self): def create_or_update(self, model):
pass with self.session() as session:
att = getattr(model, model.main_pk_name())
filt = {model.main_pk_name(): att}
found = session.query(type(model)).filter_by(**filt).one_or_none()
if found:
found = True
else:
found = False
session.merge(model)
session.commit()
return model, found
def create(self, model):
with self.session() as session:
session.add(model)
session.commit()
return model
def remove(self, model, filter_d):
model_t = type(model)
with self.session() as session:
session.query(model_t).filter_by(**filter_d).delete()
session.commit()
def clear(self): def clear(self):
with self.session() as session: with self.session() as session:
engine = session.get_bind() engine = session.get_bind()
Base.metadata.drop_all(engine) Base.metadata.drop_all(engine)
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
def dump_by_table(self):
with self.session() as session:
engine = session.get_bind()
meta = MetaData()
meta.reflect(engine)
result = {}
for table in meta.sorted_tables:
result[table.name] = [dict(row) for row in engine.execute(table.select())]
LOGGER.info(result)
return result
def dump_all(self):
with self.session() as session:
engine = session.get_bind()
meta = MetaData()
meta.reflect(engine)
result = []
for table in meta.sorted_tables:
for row in engine.execute(table.select()):
result.append((table.name, dict(row)))
LOGGER.info(result)
return result
def get_object(self, model_class: Base, main_key: str, raise_if_not_found=False):
filt = {model_class.main_pk_name(): main_key}
with self.session() as session:
get = session.query(model_class).filter_by(**filt).one_or_none()
if not get:
if raise_if_not_found:
raise NotFoundException(model_class.__name__.replace('Model', ''), main_key)
return get
def get_or_create(self, model_class: Base, key_parts: List[str]
) -> Tuple[Base, bool]:
str_key = key_to_str(key_parts)
filt = {model_class.main_pk_name(): key_parts}
with self.session() as session:
get = session.query(model_class).filter_by(**filt).one_or_none()
if get:
return get, False
else:
obj = model_class()
setattr(obj, model_class.main_pk_name(), str_key)
LOGGER.info(obj.dump())
session.add(obj)
session.commit()
return obj, True
...@@ -52,7 +52,7 @@ def main(): ...@@ -52,7 +52,7 @@ def main():
start_http_server(metrics_port) start_http_server(metrics_port)
# Get database instance # Get database instance
db_uri = 'cockroachdb://root@10.152.183.66:26257/defaultdb?sslmode=disable' db_uri = 'cockroachdb://root@10.152.183.111:26257/defaultdb?sslmode=disable'
LOGGER.debug('Connecting to DB: {}'.format(db_uri)) LOGGER.debug('Connecting to DB: {}'.format(db_uri))
# engine = create_engine(db_uri, echo=False) # engine = create_engine(db_uri, echo=False)
...@@ -65,7 +65,7 @@ def main(): ...@@ -65,7 +65,7 @@ def main():
return 1 return 1
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
session = sessionmaker(bind=engine) session = sessionmaker(bind=engine, expire_on_commit=False)
# Get message broker instance # Get message broker instance
messagebroker = MessageBroker(get_messagebroker_backend()) messagebroker = MessageBroker(get_messagebroker_backend())
......
...@@ -11,26 +11,23 @@ ...@@ -11,26 +11,23 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import enum
import functools, logging, operator import functools, logging, operator
from enum import Enum
from typing import Dict, List, Optional, Tuple, Union from typing import Dict, List, Optional, Tuple, Union
from common.orm.Database import Database
from common.orm.HighLevel import get_object, get_or_create_object, update_or_create_object
from common.orm.backend.Tools import key_to_str from common.orm.backend.Tools import key_to_str
from common.orm.fields.EnumeratedField import EnumeratedField
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.IntegerField import IntegerField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from common.proto.context_pb2 import ConfigActionEnum from common.proto.context_pb2 import ConfigActionEnum
from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json_string
from sqlalchemy import Column, ForeignKey, INTEGER, CheckConstraint, Enum, String
from sqlalchemy.dialects.postgresql import UUID, ARRAY
from context.service.database.Base import Base
from sqlalchemy.orm import relationship
from context.service.Database import Database
from .Tools import fast_hasher, grpc_to_enum, remove_dict_key from .Tools import fast_hasher, grpc_to_enum, remove_dict_key
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class ORM_ConfigActionEnum(Enum): class ORM_ConfigActionEnum(enum.Enum):
UNDEFINED = ConfigActionEnum.CONFIGACTION_UNDEFINED UNDEFINED = ConfigActionEnum.CONFIGACTION_UNDEFINED
SET = ConfigActionEnum.CONFIGACTION_SET SET = ConfigActionEnum.CONFIGACTION_SET
DELETE = ConfigActionEnum.CONFIGACTION_DELETE DELETE = ConfigActionEnum.CONFIGACTION_DELETE
...@@ -38,27 +35,47 @@ class ORM_ConfigActionEnum(Enum): ...@@ -38,27 +35,47 @@ class ORM_ConfigActionEnum(Enum):
grpc_to_enum__config_action = functools.partial( grpc_to_enum__config_action = functools.partial(
grpc_to_enum, ConfigActionEnum, ORM_ConfigActionEnum) grpc_to_enum, ConfigActionEnum, ORM_ConfigActionEnum)
class ConfigModel(Model): # pylint: disable=abstract-method class ConfigModel(Base): # pylint: disable=abstract-method
pk = PrimaryKeyField() __tablename__ = 'Config'
config_uuid = Column(UUID(as_uuid=False), primary_key=True)
# Relationships
config_rule = relationship("ConfigRuleModel", back_populates="config", lazy="dynamic")
def delete(self) -> None: def delete(self) -> None:
db_config_rule_pks = self.references(ConfigRuleModel) db_config_rule_pks = self.references(ConfigRuleModel)
for pk,_ in db_config_rule_pks: ConfigRuleModel(self.database, pk).delete() for pk,_ in db_config_rule_pks: ConfigRuleModel(self.database, pk).delete()
super().delete() super().delete()
def dump(self) -> List[Dict]: def dump(self): # -> List[Dict]:
db_config_rule_pks = self.references(ConfigRuleModel) config_rules = []
config_rules = [ConfigRuleModel(self.database, pk).dump(include_position=True) for pk,_ in db_config_rule_pks] for a in self.config_rule:
config_rules = sorted(config_rules, key=operator.itemgetter('position')) asdf = a.dump()
config_rules.append(asdf)
return [remove_dict_key(config_rule, 'position') for config_rule in config_rules] return [remove_dict_key(config_rule, 'position') for config_rule in config_rules]
class ConfigRuleModel(Model): # pylint: disable=abstract-method @staticmethod
pk = PrimaryKeyField() def main_pk_name():
config_fk = ForeignKeyField(ConfigModel) return 'config_uuid'
position = IntegerField(min_value=0, required=True)
action = EnumeratedField(ORM_ConfigActionEnum, required=True) class ConfigRuleModel(Base): # pylint: disable=abstract-method
key = StringField(required=True, allow_empty=False) __tablename__ = 'ConfigRule'
value = StringField(required=True, allow_empty=False) config_rule_uuid = Column(UUID(as_uuid=False), primary_key=True)
config_uuid = Column(UUID(as_uuid=False), ForeignKey("Config.config_uuid"), primary_key=True)
action = Column(Enum(ORM_ConfigActionEnum, create_constraint=True, native_enum=True), nullable=False)
position = Column(INTEGER, nullable=False)
key = Column(String, nullable=False)
value = Column(String, nullable=False)
__table_args__ = (
CheckConstraint(position >= 0, name='check_position_value'),
{}
)
# Relationships
config = relationship("ConfigModel", back_populates="config_rule")
def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ def dump(self, include_position=True) -> Dict: # pylint: disable=arguments-differ
result = { result = {
...@@ -71,17 +88,23 @@ class ConfigRuleModel(Model): # pylint: disable=abstract-method ...@@ -71,17 +88,23 @@ class ConfigRuleModel(Model): # pylint: disable=abstract-method
if include_position: result['position'] = self.position if include_position: result['position'] = self.position
return result return result
@staticmethod
def main_pk_name():
return 'config_rule_uuid'
def set_config_rule( def set_config_rule(
database : Database, db_config : ConfigModel, position : int, resource_key : str, resource_value : str database : Database, db_config : ConfigModel, position : int, resource_key : str, resource_value : str,
) -> Tuple[ConfigRuleModel, bool]: ): # -> Tuple[ConfigRuleModel, bool]:
str_rule_key_hash = fast_hasher(resource_key) str_rule_key_hash = fast_hasher(resource_key)
str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':') str_config_rule_key = key_to_str([db_config.config_uuid, str_rule_key_hash], separator=':')
result : Tuple[ConfigRuleModel, bool] = update_or_create_object(database, ConfigRuleModel, str_config_rule_key, {
'config_fk': db_config, 'position': position, 'action': ORM_ConfigActionEnum.SET, data = {'config_fk': db_config, 'position': position, 'action': ORM_ConfigActionEnum.SET, 'key': resource_key,
'key': resource_key, 'value': resource_value}) 'value': resource_value}
db_config_rule, updated = result to_add = ConfigRuleModel(**data)
return db_config_rule, updated
result = database.create_or_update(to_add)
return result
def delete_config_rule( def delete_config_rule(
database : Database, db_config : ConfigModel, resource_key : str database : Database, db_config : ConfigModel, resource_key : str
......
...@@ -33,6 +33,9 @@ class ContextModel(Base): ...@@ -33,6 +33,9 @@ class ContextModel(Base):
def dump_id(self) -> Dict: def dump_id(self) -> Dict:
return {'context_uuid': {'uuid': self.context_uuid}} return {'context_uuid': {'uuid': self.context_uuid}}
def main_pk_name(self):
return 'context_uuid'
""" """
def dump_service_ids(self) -> List[Dict]: def dump_service_ids(self) -> List[Dict]:
from .ServiceModel import ServiceModel # pylint: disable=import-outside-toplevel from .ServiceModel import ServiceModel # pylint: disable=import-outside-toplevel
......
...@@ -11,24 +11,22 @@ ...@@ -11,24 +11,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import enum
import functools, logging import functools, logging
from enum import Enum import uuid
from typing import Dict, List from typing import Dict, List
from common.orm.Database import Database from common.orm.Database import Database
from common.orm.backend.Tools import key_to_str from common.orm.backend.Tools import key_to_str
from common.orm.fields.EnumeratedField import EnumeratedField
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from common.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum from common.proto.context_pb2 import DeviceDriverEnum, DeviceOperationalStatusEnum
from .ConfigModel import ConfigModel from sqlalchemy import Column, ForeignKey, String, Enum
from sqlalchemy.dialects.postgresql import UUID, ARRAY
from context.service.database.Base import Base
from sqlalchemy.orm import relationship
from .Tools import grpc_to_enum from .Tools import grpc_to_enum
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class ORM_DeviceDriverEnum(Enum): class ORM_DeviceDriverEnum(enum.Enum):
UNDEFINED = DeviceDriverEnum.DEVICEDRIVER_UNDEFINED UNDEFINED = DeviceDriverEnum.DEVICEDRIVER_UNDEFINED
OPENCONFIG = DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG OPENCONFIG = DeviceDriverEnum.DEVICEDRIVER_OPENCONFIG
TRANSPORT_API = DeviceDriverEnum.DEVICEDRIVER_TRANSPORT_API TRANSPORT_API = DeviceDriverEnum.DEVICEDRIVER_TRANSPORT_API
...@@ -39,7 +37,7 @@ class ORM_DeviceDriverEnum(Enum): ...@@ -39,7 +37,7 @@ class ORM_DeviceDriverEnum(Enum):
grpc_to_enum__device_driver = functools.partial( grpc_to_enum__device_driver = functools.partial(
grpc_to_enum, DeviceDriverEnum, ORM_DeviceDriverEnum) grpc_to_enum, DeviceDriverEnum, ORM_DeviceDriverEnum)
class ORM_DeviceOperationalStatusEnum(Enum): class ORM_DeviceOperationalStatusEnum(enum.Enum):
UNDEFINED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED UNDEFINED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
DISABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED DISABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
...@@ -47,48 +45,51 @@ class ORM_DeviceOperationalStatusEnum(Enum): ...@@ -47,48 +45,51 @@ class ORM_DeviceOperationalStatusEnum(Enum):
grpc_to_enum__device_operational_status = functools.partial( grpc_to_enum__device_operational_status = functools.partial(
grpc_to_enum, DeviceOperationalStatusEnum, ORM_DeviceOperationalStatusEnum) grpc_to_enum, DeviceOperationalStatusEnum, ORM_DeviceOperationalStatusEnum)
class DeviceModel(Model): class DeviceModel(Base):
pk = PrimaryKeyField() __tablename__ = 'Device'
device_uuid = StringField(required=True, allow_empty=False) device_uuid = Column(UUID(as_uuid=False), primary_key=True)
device_type = StringField() device_type = Column(String)
device_config_fk = ForeignKeyField(ConfigModel) device_config_uuid = Column(UUID(as_uuid=False), ForeignKey("Config.config_uuid"))
device_operational_status = EnumeratedField(ORM_DeviceOperationalStatusEnum, required=True) device_operational_status = Column(Enum(ORM_DeviceOperationalStatusEnum, create_constraint=False,
native_enum=False))
def delete(self) -> None:
# pylint: disable=import-outside-toplevel # Relationships
from .EndPointModel import EndPointModel device_config = relationship("ConfigModel", lazy="joined")
from .RelationModels import TopologyDeviceModel driver = relationship("DriverModel", lazy="joined")
endpoints = relationship("EndPointModel", lazy="joined")
for db_endpoint_pk,_ in self.references(EndPointModel):
EndPointModel(self.database, db_endpoint_pk).delete() # def delete(self) -> None:
# # pylint: disable=import-outside-toplevel
for db_topology_device_pk,_ in self.references(TopologyDeviceModel): # from .EndPointModel import EndPointModel
TopologyDeviceModel(self.database, db_topology_device_pk).delete() # from .RelationModels import TopologyDeviceModel
#
for db_driver_pk,_ in self.references(DriverModel): # for db_endpoint_pk,_ in self.references(EndPointModel):
DriverModel(self.database, db_driver_pk).delete() # EndPointModel(self.database, db_endpoint_pk).delete()
#
super().delete() # for db_topology_device_pk,_ in self.references(TopologyDeviceModel):
# TopologyDeviceModel(self.database, db_topology_device_pk).delete()
ConfigModel(self.database, self.device_config_fk).delete() #
# for db_driver_pk,_ in self.references(DriverModel):
# DriverModel(self.database, db_driver_pk).delete()
#
# super().delete()
#
# ConfigModel(self.database, self.device_config_fk).delete()
def dump_id(self) -> Dict: def dump_id(self) -> Dict:
return {'device_uuid': {'uuid': self.device_uuid}} return {'device_uuid': {'uuid': self.device_uuid}}
def dump_config(self) -> Dict: def dump_config(self) -> Dict:
return ConfigModel(self.database, self.device_config_fk).dump() return self.device_config.dump()
def dump_drivers(self) -> List[int]: def dump_drivers(self) -> List[int]:
db_driver_pks = self.references(DriverModel) return self.driver.dump()
return [DriverModel(self.database, pk).dump() for pk,_ in db_driver_pks]
def dump_endpoints(self) -> List[Dict]: def dump_endpoints(self) -> List[Dict]:
from .EndPointModel import EndPointModel # pylint: disable=import-outside-toplevel return self.endpoints.dump()
db_endpoints_pks = self.references(EndPointModel)
return [EndPointModel(self.database, pk).dump() for pk,_ in db_endpoints_pks]
def dump( # pylint: disable=arguments-differ def dump( # pylint: disable=arguments-differ
self, include_config_rules=True, include_drivers=True, include_endpoints=True self, include_config_rules=True, include_drivers=False, include_endpoints=False
) -> Dict: ) -> Dict:
result = { result = {
'device_id': self.dump_id(), 'device_id': self.dump_id(),
...@@ -100,16 +101,27 @@ class DeviceModel(Model): ...@@ -100,16 +101,27 @@ class DeviceModel(Model):
if include_endpoints: result['device_endpoints'] = self.dump_endpoints() if include_endpoints: result['device_endpoints'] = self.dump_endpoints()
return result return result
class DriverModel(Model): # pylint: disable=abstract-method def main_pk_name(self):
pk = PrimaryKeyField() return 'device_uuid'
device_fk = ForeignKeyField(DeviceModel)
driver = EnumeratedField(ORM_DeviceDriverEnum, required=True) class DriverModel(Base): # pylint: disable=abstract-method
__tablename__ = 'Driver'
driver_uuid = Column(UUID(as_uuid=False), primary_key=True)
device_uuid = Column(UUID(as_uuid=False), ForeignKey("Device.device_uuid"), primary_key=True)
driver = Column(Enum(ORM_DeviceDriverEnum, create_constraint=False, native_enum=False))
# Relationships
device = relationship("DeviceModel")
def dump(self) -> Dict: def dump(self) -> Dict:
return self.driver.value return self.driver.value
def main_pk_name(self):
return 'driver_uuid'
def set_drivers(database : Database, db_device : DeviceModel, grpc_device_drivers): def set_drivers(database : Database, db_device : DeviceModel, grpc_device_drivers):
db_device_pk = db_device.pk db_device_pk = db_device.device_uuid
for driver in grpc_device_drivers: for driver in grpc_device_drivers:
orm_driver = grpc_to_enum__device_driver(driver) orm_driver = grpc_to_enum__device_driver(driver)
str_device_driver_key = key_to_str([db_device_pk, orm_driver.name]) str_device_driver_key = key_to_str([db_device_pk, orm_driver.name])
......
...@@ -17,24 +17,25 @@ from typing import Dict, List, Optional, Tuple ...@@ -17,24 +17,25 @@ from typing import Dict, List, Optional, Tuple
from common.orm.Database import Database from common.orm.Database import Database
from common.orm.HighLevel import get_object from common.orm.HighLevel import get_object
from common.orm.backend.Tools import key_to_str from common.orm.backend.Tools import key_to_str
from common.orm.fields.EnumeratedField import EnumeratedField
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from common.proto.context_pb2 import EndPointId from common.proto.context_pb2 import EndPointId
from .DeviceModel import DeviceModel
from .KpiSampleType import ORM_KpiSampleTypeEnum, grpc_to_enum__kpi_sample_type from .KpiSampleType import ORM_KpiSampleTypeEnum, grpc_to_enum__kpi_sample_type
from .TopologyModel import TopologyModel from sqlalchemy import Column, ForeignKey, String, Enum, ForeignKeyConstraint
from sqlalchemy.dialects.postgresql import UUID, ARRAY
from context.service.database.Base import Base
from sqlalchemy.orm import relationship
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class EndPointModel(Model): class EndPointModel(Base):
pk = PrimaryKeyField() __tablename__ = 'EndPoint'
topology_fk = ForeignKeyField(TopologyModel, required=False) endpoint_uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True)
device_fk = ForeignKeyField(DeviceModel) topology_uuid = Column(UUID(as_uuid=False), ForeignKey("Topology.topology_uuid"), primary_key=True)
endpoint_uuid = StringField(required=True, allow_empty=False) device_uuid = Column(UUID(as_uuid=False), ForeignKey("Device.device_uuid"), primary_key=True)
endpoint_type = StringField() endpoint_type = Column(String)
# Relationships
def main_pk_name(self):
return 'endpoint_uuid'
def delete(self) -> None: def delete(self) -> None:
for db_kpi_sample_type_pk,_ in self.references(KpiSampleTypeModel): for db_kpi_sample_type_pk,_ in self.references(KpiSampleTypeModel):
...@@ -42,13 +43,10 @@ class EndPointModel(Model): ...@@ -42,13 +43,10 @@ class EndPointModel(Model):
super().delete() super().delete()
def dump_id(self) -> Dict: def dump_id(self) -> Dict:
device_id = DeviceModel(self.database, self.device_fk).dump_id()
result = { result = {
'device_id': device_id, 'device_uuid': self.device_uuid,
'endpoint_uuid': {'uuid': self.endpoint_uuid}, 'endpoint_uuid': {'uuid': self.endpoint_uuid},
} }
if self.topology_fk is not None:
result['topology_id'] = TopologyModel(self.database, self.topology_fk).dump_id()
return result return result
def dump_kpi_sample_types(self) -> List[int]: def dump_kpi_sample_types(self) -> List[int]:
...@@ -59,20 +57,26 @@ class EndPointModel(Model): ...@@ -59,20 +57,26 @@ class EndPointModel(Model):
self, include_kpi_sample_types=True self, include_kpi_sample_types=True
) -> Dict: ) -> Dict:
result = { result = {
'endpoint_id': self.dump_id(), 'endpoint_uuid': self.dump_id(),
'endpoint_type': self.endpoint_type, 'endpoint_type': self.endpoint_type,
} }
if include_kpi_sample_types: result['kpi_sample_types'] = self.dump_kpi_sample_types() if include_kpi_sample_types: result['kpi_sample_types'] = self.dump_kpi_sample_types()
return result return result
class KpiSampleTypeModel(Model): # pylint: disable=abstract-method class KpiSampleTypeModel(Base): # pylint: disable=abstract-method
pk = PrimaryKeyField() __tablename__ = 'KpiSampleType'
endpoint_fk = ForeignKeyField(EndPointModel) kpi_uuid = Column(UUID(as_uuid=False), primary_key=True)
kpi_sample_type = EnumeratedField(ORM_KpiSampleTypeEnum, required=True) endpoint_uuid = Column(UUID(as_uuid=False), ForeignKey("EndPoint.endpoint_uuid"))
kpi_sample_type = Column(Enum(ORM_KpiSampleTypeEnum, create_constraint=False,
native_enum=False))
# __table_args__ = (ForeignKeyConstraint([endpoint_uuid], [EndPointModel.endpoint_uuid]), {})
def dump(self) -> Dict: def dump(self) -> Dict:
return self.kpi_sample_type.value return self.kpi_sample_type.value
def main_pk_name(self):
return 'kpi_uuid'
"""
def set_kpi_sample_types(database : Database, db_endpoint : EndPointModel, grpc_endpoint_kpi_sample_types): def set_kpi_sample_types(database : Database, db_endpoint : EndPointModel, grpc_endpoint_kpi_sample_types):
db_endpoint_pk = db_endpoint.pk db_endpoint_pk = db_endpoint.pk
for kpi_sample_type in grpc_endpoint_kpi_sample_types: for kpi_sample_type in grpc_endpoint_kpi_sample_types:
...@@ -82,7 +86,7 @@ def set_kpi_sample_types(database : Database, db_endpoint : EndPointModel, grpc_ ...@@ -82,7 +86,7 @@ def set_kpi_sample_types(database : Database, db_endpoint : EndPointModel, grpc_
db_endpoint_kpi_sample_type.endpoint_fk = db_endpoint db_endpoint_kpi_sample_type.endpoint_fk = db_endpoint
db_endpoint_kpi_sample_type.kpi_sample_type = orm_kpi_sample_type db_endpoint_kpi_sample_type.kpi_sample_type = orm_kpi_sample_type
db_endpoint_kpi_sample_type.save() db_endpoint_kpi_sample_type.save()
"""
def get_endpoint( def get_endpoint(
database : Database, grpc_endpoint_id : EndPointId, database : Database, grpc_endpoint_id : EndPointId,
validate_topology_exists : bool = True, validate_device_in_topology : bool = True validate_topology_exists : bool = True, validate_device_in_topology : bool = True
......
...@@ -13,11 +13,11 @@ ...@@ -13,11 +13,11 @@
# limitations under the License. # limitations under the License.
import functools import functools
from enum import Enum import enum
from common.proto.kpi_sample_types_pb2 import KpiSampleType from common.proto.kpi_sample_types_pb2 import KpiSampleType
from .Tools import grpc_to_enum from .Tools import grpc_to_enum
class ORM_KpiSampleTypeEnum(Enum): class ORM_KpiSampleTypeEnum(enum.Enum):
UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN
PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED PACKETS_TRANSMITTED = KpiSampleType.KPISAMPLETYPE_PACKETS_TRANSMITTED
PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED PACKETS_RECEIVED = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
......
...@@ -15,8 +15,9 @@ ...@@ -15,8 +15,9 @@
import hashlib, re import hashlib, re
from enum import Enum from enum import Enum
from typing import Dict, List, Tuple, Union from typing import Dict, List, Tuple, Union
import logging
# Convenient helper function to remove dictionary items in dict/list/set comprehensions. # Convenient helper function to remove dictionary items in dict/list/set comprehensions.
LOGGER = logging.getLogger(__name__)
def remove_dict_key(dictionary : Dict, key : str): def remove_dict_key(dictionary : Dict, key : str):
dictionary.pop(key, None) dictionary.pop(key, None)
......
...@@ -14,11 +14,6 @@ ...@@ -14,11 +14,6 @@
import logging, operator import logging, operator
from typing import Dict, List from typing import Dict, List
from common.orm.fields.ForeignKeyField import ForeignKeyField
from common.orm.fields.PrimaryKeyField import PrimaryKeyField
from common.orm.fields.StringField import StringField
from common.orm.model.Model import Model
from common.orm.HighLevel import get_related_objects
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from sqlalchemy import Column, ForeignKey from sqlalchemy import Column, ForeignKey
from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.dialects.postgresql import UUID
...@@ -28,10 +23,10 @@ LOGGER = logging.getLogger(__name__) ...@@ -28,10 +23,10 @@ LOGGER = logging.getLogger(__name__)
class TopologyModel(Base): class TopologyModel(Base):
__tablename__ = 'Topology' __tablename__ = 'Topology'
context_uuid = Column(UUID(as_uuid=False), ForeignKey("Context.context_uuid"), primary_key=True) context_uuid = Column(UUID(as_uuid=False), ForeignKey("Context.context_uuid"), primary_key=True)
topology_uuid = Column(UUID(as_uuid=False), primary_key=True) topology_uuid = Column(UUID(as_uuid=False), primary_key=True, unique=True)
# Relationships # Relationships
context = relationship("ContextModel", back_populates="topology", lazy="subquery") context = relationship("ContextModel", back_populates="topology", lazy="joined")
def dump_id(self) -> Dict: def dump_id(self) -> Dict:
context_id = self.context.dump_id() context_id = self.context.dump_id()
...@@ -40,6 +35,10 @@ class TopologyModel(Base): ...@@ -40,6 +35,10 @@ class TopologyModel(Base):
'topology_uuid': {'uuid': self.topology_uuid}, 'topology_uuid': {'uuid': self.topology_uuid},
} }
@staticmethod
def main_pk_name() -> str:
return 'topology_uuid'
"""def dump_device_ids(self) -> List[Dict]: """def dump_device_ids(self) -> List[Dict]:
from .RelationModels import TopologyDeviceModel # pylint: disable=import-outside-toplevel from .RelationModels import TopologyDeviceModel # pylint: disable=import-outside-toplevel
db_devices = get_related_objects(self, TopologyDeviceModel, 'device_fk') db_devices = get_related_objects(self, TopologyDeviceModel, 'device_fk')
......
...@@ -11,9 +11,10 @@ ...@@ -11,9 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import uuid
import grpc, json, logging, operator, threading import grpc, json, logging, operator, threading
from typing import Iterator, List, Set, Tuple from typing import Iterator, List, Set, Tuple, Union
from common.message_broker.MessageBroker import MessageBroker from common.message_broker.MessageBroker import MessageBroker
from context.service.Database import Database from context.service.Database import Database
...@@ -25,19 +26,24 @@ from common.proto.context_pb2 import ( ...@@ -25,19 +26,24 @@ from common.proto.context_pb2 import (
Link, LinkEvent, LinkId, LinkIdList, LinkList, Link, LinkEvent, LinkId, LinkIdList, LinkList,
Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
Slice, SliceEvent, SliceId, SliceIdList, SliceList, Slice, SliceEvent, SliceId, SliceIdList, SliceList,
Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList) Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList,
ConfigActionEnum)
from common.proto.context_pb2_grpc import ContextServiceServicer from common.proto.context_pb2_grpc import ContextServiceServicer
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException
from sqlalchemy.orm import Session, contains_eager, selectinload from sqlalchemy.orm import Session, contains_eager, selectinload
from common.rpc_method_wrapper.ServiceExceptions import NotFoundException from common.rpc_method_wrapper.ServiceExceptions import NotFoundException
from context.service.database.ConfigModel import grpc_config_rules_to_raw
from context.service.database.DeviceModel import DeviceModel, grpc_to_enum__device_operational_status, set_drivers, grpc_to_enum__device_driver, DriverModel
from context.service.database.ConfigModel import ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel
from common.orm.backend.Tools import key_to_str
from ..database.KpiSampleType import grpc_to_enum__kpi_sample_type
""" """
from context.service.database.ConfigModel import grpc_config_rules_to_raw, update_config
from context.service.database.ConnectionModel import ConnectionModel, set_path from context.service.database.ConnectionModel import ConnectionModel, set_path
from context.service.database.ConstraintModel import set_constraints from context.service.database.ConstraintModel import set_constraints
from context.service.database.DeviceModel import DeviceModel, grpc_to_enum__device_operational_status, set_drivers
from context.service.database.EndPointModel import EndPointModel, set_kpi_sample_types from context.service.database.EndPointModel import EndPointModel, set_kpi_sample_types
from context.service.database.Events import notify_event from context.service.database.Events import notify_event
from context.service.database.LinkModel import LinkModel from context.service.database.LinkModel import LinkModel
...@@ -51,8 +57,9 @@ from context.service.database.TopologyModel import TopologyModel ...@@ -51,8 +57,9 @@ from context.service.database.TopologyModel import TopologyModel
""" """
from context.service.database.ContextModel import ContextModel from context.service.database.ContextModel import ContextModel
from context.service.database.TopologyModel import TopologyModel from context.service.database.TopologyModel import TopologyModel
# from context.service.database.TopologyModel import TopologyModel
from context.service.database.Events import notify_event from context.service.database.Events import notify_event
from context.service.database.EndPointModel import EndPointModel
from context.service.database.EndPointModel import KpiSampleTypeModel
from .Constants import ( from .Constants import (
CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE, CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE,
...@@ -201,10 +208,10 @@ class ContextServiceServicerImpl(ContextServiceServicer): ...@@ -201,10 +208,10 @@ class ContextServiceServicerImpl(ContextServiceServicer):
with self.session() as session: with self.session() as session:
result = session.query(TopologyModel).join(TopologyModel.context).filter(TopologyModel.topology_uuid==topology_uuid).options(contains_eager(TopologyModel.context)).one_or_none() result = session.query(TopologyModel).join(TopologyModel.context).filter(TopologyModel.topology_uuid==topology_uuid).options(contains_eager(TopologyModel.context)).one_or_none()
if not result: if not result:
raise NotFoundException(TopologyModel.__name__.replace('Model', ''), topology_uuid) raise NotFoundException(TopologyModel.__name__.replace('Model', ''), topology_uuid)
return Topology(**result.dump()) return Topology(**result.dump())
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
...@@ -247,97 +254,201 @@ class ContextServiceServicerImpl(ContextServiceServicer): ...@@ -247,97 +254,201 @@ class ContextServiceServicerImpl(ContextServiceServicer):
def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]: def GetTopologyEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[TopologyEvent]:
for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT): for message in self.messagebroker.consume({TOPIC_TOPOLOGY}, consume_timeout=CONSUME_TIMEOUT):
yield TopologyEvent(**json.loads(message.content)) yield TopologyEvent(**json.loads(message.content))
"""
# ----- Device ----------------------------------------------------------------------------------------------------- # ----- Device -----------------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList: def ListDeviceIds(self, request: Empty, context : grpc.ServicerContext) -> DeviceIdList:
with self.lock: with self.session() as session:
db_devices : List[DeviceModel] = get_all_objects(self.database, DeviceModel) result = session.query(DeviceModel).all()
db_devices = sorted(db_devices, key=operator.attrgetter('pk')) return DeviceIdList(device_ids=[device.dump_id() for device in result])
return DeviceIdList(device_ids=[db_device.dump_id() for db_device in db_devices])
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList: def ListDevices(self, request: Empty, context : grpc.ServicerContext) -> DeviceList:
with self.lock: with self.session() as session:
db_devices : List[DeviceModel] = get_all_objects(self.database, DeviceModel) result = session.query(DeviceModel).all()
db_devices = sorted(db_devices, key=operator.attrgetter('pk')) return DeviceList(devices=[device.dump_id() for device in result])
return DeviceList(devices=[db_device.dump() for db_device in db_devices])
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device: def GetDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Device:
with self.lock: device_uuid = request.device_uuid.uuid
device_uuid = request.device_uuid.uuid with self.session() as session:
db_device : DeviceModel = get_object(self.database, DeviceModel, device_uuid) result = session.query(DeviceModel).filter(DeviceModel.device_uuid == device_uuid).one_or_none()
return Device(**db_device.dump( if not result:
include_config_rules=True, include_drivers=True, include_endpoints=True)) raise NotFoundException(DeviceModel.__name__.replace('Model', ''), device_uuid)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId:
with self.lock:
device_uuid = request.device_id.device_uuid.uuid
for i,endpoint in enumerate(request.device_endpoints):
endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
if device_uuid != endpoint_device_uuid:
raise InvalidArgumentException(
'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid,
['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)])
config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
running_config_result = update_config(self.database, device_uuid, 'running', config_rules)
db_running_config = running_config_result[0][0]
result : Tuple[DeviceModel, bool] = update_or_create_object(self.database, DeviceModel, device_uuid, {
'device_uuid' : device_uuid,
'device_type' : request.device_type,
'device_operational_status': grpc_to_enum__device_operational_status(request.device_operational_status),
'device_config_fk' : db_running_config,
})
db_device, updated = result
set_drivers(self.database, db_device, request.device_drivers)
for i,endpoint in enumerate(request.device_endpoints):
endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
endpoint_attributes = {
'device_fk' : db_device,
'endpoint_uuid': endpoint_uuid,
'endpoint_type': endpoint.endpoint_type,
}
endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid
endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid
if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
db_topology : TopologyModel = get_object(self.database, TopologyModel, str_topology_key)
str_topology_device_key = key_to_str([str_topology_key, device_uuid], separator='--')
result : Tuple[TopologyDeviceModel, bool] = get_or_create_object(
self.database, TopologyDeviceModel, str_topology_device_key, {
'topology_fk': db_topology, 'device_fk': db_device})
#db_topology_device, topology_device_created = result
str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':') rd = result.dump()
endpoint_attributes['topology_fk'] = db_topology rt = Device(**rd)
result : Tuple[EndPointModel, bool] = update_or_create_object( return rt
self.database, EndPointModel, str_endpoint_key, endpoint_attributes)
db_endpoint, endpoint_updated = result
set_kpi_sample_types(self.database, db_endpoint, endpoint.kpi_sample_types) @safe_and_metered_rpc_method(METRICS, LOGGER)
def SetDevice(self, request: Device, context : grpc.ServicerContext) -> DeviceId:
device_uuid = request.device_id.device_uuid.uuid
event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE for i,endpoint in enumerate(request.device_endpoints):
dict_device_id = db_device.dump_id() endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id}) if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
return DeviceId(**dict_device_id) if device_uuid != endpoint_device_uuid:
raise InvalidArgumentException(
'request.device_endpoints[{:d}].device_id.device_uuid.uuid'.format(i), endpoint_device_uuid,
['should be == {:s}({:s})'.format('request.device_id.device_uuid.uuid', device_uuid)])
config_rules = grpc_config_rules_to_raw(request.device_config.config_rules)
running_config_result = self.update_config(device_uuid, 'running', config_rules)
db_running_config = running_config_result[0][0]
config_uuid = db_running_config.config_uuid
new_obj = DeviceModel(**{
'device_uuid' : device_uuid,
'device_type' : request.device_type,
'device_operational_status' : grpc_to_enum__device_operational_status(request.device_operational_status),
'device_config_uuid' : config_uuid,
})
result: Tuple[DeviceModel, bool] = self.database.create_or_update(new_obj)
db_device, updated = result
self.set_drivers(db_device, request.device_drivers)
for i,endpoint in enumerate(request.device_endpoints):
endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
endpoint_device_uuid = endpoint.endpoint_id.device_id.device_uuid.uuid
if len(endpoint_device_uuid) == 0: endpoint_device_uuid = device_uuid
str_endpoint_key = key_to_str([device_uuid, endpoint_uuid])
endpoint_attributes = {
'device_uuid' : db_device.device_uuid,
'endpoint_uuid': endpoint_uuid,
'endpoint_type': endpoint.endpoint_type,
}
endpoint_topology_context_uuid = endpoint.endpoint_id.topology_id.context_id.context_uuid.uuid
endpoint_topology_uuid = endpoint.endpoint_id.topology_id.topology_uuid.uuid
if len(endpoint_topology_context_uuid) > 0 and len(endpoint_topology_uuid) > 0:
str_topology_key = key_to_str([endpoint_topology_context_uuid, endpoint_topology_uuid])
db_topology : TopologyModel = self.database.get_object(TopologyModel, endpoint_topology_uuid)
str_endpoint_key = key_to_str([str_endpoint_key, str_topology_key], separator=':')
endpoint_attributes['topology_uuid'] = db_topology.topology_uuid
new_endpoint = EndPointModel(**endpoint_attributes)
result : Tuple[EndPointModel, bool] = self.database.create_or_update(new_endpoint)
db_endpoint, updated = result
self.set_kpi_sample_types(db_endpoint, endpoint.kpi_sample_types)
# event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
dict_device_id = db_device.dump_id()
# notify_event(self.messagebroker, TOPIC_DEVICE, event_type, {'device_id': dict_device_id})
return DeviceId(**dict_device_id)
def set_kpi_sample_types(self, db_endpoint: EndPointModel, grpc_endpoint_kpi_sample_types):
db_endpoint_pk = db_endpoint.endpoint_uuid
for kpi_sample_type in grpc_endpoint_kpi_sample_types:
orm_kpi_sample_type = grpc_to_enum__kpi_sample_type(kpi_sample_type)
# str_endpoint_kpi_sample_type_key = key_to_str([db_endpoint_pk, orm_kpi_sample_type.name])
data = {'endpoint_uuid': db_endpoint_pk,
'kpi_sample_type': orm_kpi_sample_type.name,
'kpi_uuid': str(uuid.uuid4())}
db_endpoint_kpi_sample_type = KpiSampleTypeModel(**data)
self.database.create(db_endpoint_kpi_sample_type)
def set_drivers(self, db_device: DeviceModel, grpc_device_drivers):
db_device_pk = db_device.device_uuid
for driver in grpc_device_drivers:
orm_driver = grpc_to_enum__device_driver(driver)
str_device_driver_key = key_to_str([db_device_pk, orm_driver.name])
driver_config = {
"driver_uuid": str(uuid.uuid4()),
"device_uuid": db_device_pk,
"driver": orm_driver.name
}
db_device_driver = DriverModel(**driver_config)
db_device_driver.device_fk = db_device
db_device_driver.driver = orm_driver
self.database.create_or_update(db_device_driver)
def update_config(
self, db_parent_pk: str, config_name: str,
raw_config_rules: List[Tuple[ORM_ConfigActionEnum, str, str]]
) -> List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]]:
str_config_key = key_to_str([db_parent_pk, config_name], separator=':')
result = self.database.get_or_create(ConfigModel, db_parent_pk)
db_config, created = result
LOGGER.info('UPDATED-CONFIG: {}'.format(db_config.dump()))
db_objects: List[Tuple[Union[ConfigModel, ConfigRuleModel], bool]] = [(db_config, created)]
for position, (action, resource_key, resource_value) in enumerate(raw_config_rules):
if action == ORM_ConfigActionEnum.SET:
result : Tuple[ConfigRuleModel, bool] = self.set_config_rule(
db_config, position, resource_key, resource_value)
db_config_rule, updated = result
db_objects.append((db_config_rule, updated))
elif action == ORM_ConfigActionEnum.DELETE:
self.delete_config_rule(db_config, resource_key)
else:
msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})'
raise AttributeError(
msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value)))
return db_objects
def set_config_rule(self, db_config: ConfigModel, position: int, resource_key: str, resource_value: str,
): # -> Tuple[ConfigRuleModel, bool]:
from src.context.service.database.Tools import fast_hasher
str_rule_key_hash = fast_hasher(resource_key)
str_config_rule_key = key_to_str([db_config.config_uuid, str_rule_key_hash], separator=':')
pk = str(uuid.uuid5(uuid.UUID('9566448d-e950-425e-b2ae-7ead656c7e47'), str_config_rule_key))
data = {'config_rule_uuid': pk, 'config_uuid': db_config.config_uuid, 'position': position,
'action': ORM_ConfigActionEnum.SET, 'key': resource_key, 'value': resource_value}
to_add = ConfigRuleModel(**data)
result, updated = self.database.create_or_update(to_add)
return result, updated
def delete_config_rule(
self, db_config: ConfigModel, resource_key: str
) -> None:
from src.context.service.database.Tools import fast_hasher
str_rule_key_hash = fast_hasher(resource_key)
str_config_rule_key = key_to_str([db_config.pk, str_rule_key_hash], separator=':')
db_config_rule = self.database.get_object(ConfigRuleModel, str_config_rule_key, raise_if_not_found=False)
if db_config_rule is None:
return
db_config_rule.delete()
def delete_all_config_rules(self, db_config: ConfigModel) -> None:
db_config_rule_pks = db_config.references(ConfigRuleModel)
for pk, _ in db_config_rule_pks: ConfigRuleModel(self.database, pk).delete()
"""
for position, (action, resource_key, resource_value) in enumerate(raw_config_rules):
if action == ORM_ConfigActionEnum.SET:
result: Tuple[ConfigRuleModel, bool] = set_config_rule(
database, db_config, position, resource_key, resource_value)
db_config_rule, updated = result
db_objects.append((db_config_rule, updated))
elif action == ORM_ConfigActionEnum.DELETE:
delete_config_rule(database, db_config, resource_key)
else:
msg = 'Unsupported action({:s}) for resource_key({:s})/resource_value({:s})'
raise AttributeError(
msg.format(str(ConfigActionEnum.Name(action)), str(resource_key), str(resource_value)))
return db_objects
"""
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty: def RemoveDevice(self, request: DeviceId, context : grpc.ServicerContext) -> Empty:
...@@ -360,6 +471,9 @@ class ContextServiceServicerImpl(ContextServiceServicer): ...@@ -360,6 +471,9 @@ class ContextServiceServicerImpl(ContextServiceServicer):
yield DeviceEvent(**json.loads(message.content)) yield DeviceEvent(**json.loads(message.content))
"""
# ----- Link ------------------------------------------------------------------------------------------------------- # ----- Link -------------------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
......
...@@ -45,12 +45,17 @@ PACKET_PORT_SAMPLE_TYPES = [ ...@@ -45,12 +45,17 @@ PACKET_PORT_SAMPLE_TYPES = [
# ----- Device --------------------------------------------------------------------------------------------------------- # ----- Device ---------------------------------------------------------------------------------------------------------
DEVICE_R1_UUID = 'R1' EP2 = '7eb80584-2587-4e71-b10c-f3a5c48e84ab'
EP3 = '368baf47-0540-4ab4-add8-a19b5167162c'
EP100 = '6a923121-36e1-4b5e-8cd6-90aceca9b5cf'
DEVICE_R1_UUID = 'fe83a200-6ded-47b4-b156-3bb3556a10d6'
DEVICE_R1_ID = json_device_id(DEVICE_R1_UUID) DEVICE_R1_ID = json_device_id(DEVICE_R1_UUID)
DEVICE_R1_EPS = [ DEVICE_R1_EPS = [
json_endpoint(DEVICE_R1_ID, 'EP2', '10G', topology_id=TOPOLOGY_ID, kpi_sample_types=PACKET_PORT_SAMPLE_TYPES), json_endpoint(DEVICE_R1_ID, EP2, '10G', topology_id=TOPOLOGY_ID, kpi_sample_types=PACKET_PORT_SAMPLE_TYPES),
json_endpoint(DEVICE_R1_ID, 'EP3', '10G', topology_id=TOPOLOGY_ID, kpi_sample_types=PACKET_PORT_SAMPLE_TYPES), json_endpoint(DEVICE_R1_ID, EP3, '10G', topology_id=TOPOLOGY_ID, kpi_sample_types=PACKET_PORT_SAMPLE_TYPES),
json_endpoint(DEVICE_R1_ID, 'EP100', '10G', topology_id=TOPOLOGY_ID, kpi_sample_types=PACKET_PORT_SAMPLE_TYPES), json_endpoint(DEVICE_R1_ID, EP100, '10G', topology_id=TOPOLOGY_ID, kpi_sample_types=PACKET_PORT_SAMPLE_TYPES),
] ]
DEVICE_R1_RULES = [ DEVICE_R1_RULES = [
json_config_rule_set('dev/rsrc1/value', 'value1'), json_config_rule_set('dev/rsrc1/value', 'value1'),
......
...@@ -20,7 +20,6 @@ from common.Settings import ( ...@@ -20,7 +20,6 @@ from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name,
get_service_baseurl_http, get_service_port_grpc, get_service_port_http) get_service_baseurl_http, get_service_port_grpc, get_service_port_http)
from context.service.Database import Database from context.service.Database import Database
from common.orm.Factory import get_database_backend, BackendEnum as DatabaseBackendEnum
from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum from common.message_broker.Factory import get_messagebroker_backend, BackendEnum as MessageBrokerBackendEnum
from common.message_broker.MessageBroker import MessageBroker from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import ( from common.proto.context_pb2 import (
...@@ -84,7 +83,7 @@ def context_s_mb(request) -> Tuple[Session, MessageBroker]: ...@@ -84,7 +83,7 @@ def context_s_mb(request) -> Tuple[Session, MessageBroker]:
msg = 'Running scenario {:s} db_session={:s}, mb_backend={:s}, mb_settings={:s}...' msg = 'Running scenario {:s} db_session={:s}, mb_backend={:s}, mb_settings={:s}...'
LOGGER.info(msg.format(str(name), str(db_session), str(mb_backend.value), str(mb_settings))) LOGGER.info(msg.format(str(name), str(db_session), str(mb_backend.value), str(mb_settings)))
db_uri = 'cockroachdb://root@10.152.183.66:26257/defaultdb?sslmode=disable' db_uri = 'cockroachdb://root@10.152.183.111:26257/defaultdb?sslmode=disable'
LOGGER.debug('Connecting to DB: {}'.format(db_uri)) LOGGER.debug('Connecting to DB: {}'.format(db_uri))
try: try:
...@@ -95,7 +94,7 @@ def context_s_mb(request) -> Tuple[Session, MessageBroker]: ...@@ -95,7 +94,7 @@ def context_s_mb(request) -> Tuple[Session, MessageBroker]:
return 1 return 1
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
_session = sessionmaker(bind=engine) _session = sessionmaker(bind=engine, expire_on_commit=False)
_message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings)) _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings))
yield _session, _message_broker yield _session, _message_broker
...@@ -164,7 +163,7 @@ def test_grpc_context( ...@@ -164,7 +163,7 @@ def test_grpc_context(
assert len(response.contexts) == 0 assert len(response.contexts) == 0
# ----- Dump state of database before create the object ------------------------------------------------------------ # ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = database.query_all(ContextModel) db_entries = database.get_all(ContextModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries: for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
...@@ -214,7 +213,7 @@ def test_grpc_context( ...@@ -214,7 +213,7 @@ def test_grpc_context(
assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Dump state of database after create/update the object ------------------------------------------------------ # ----- Dump state of database after create/update the object ------------------------------------------------------
db_entries = database.query_all(ContextModel) db_entries = database.get_all(ContextModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries: # for db_entry in db_entries:
...@@ -252,7 +251,7 @@ def test_grpc_context( ...@@ -252,7 +251,7 @@ def test_grpc_context(
events_collector.stop() events_collector.stop()
# ----- Dump state of database after remove the object ------------------------------------------------------------- # ----- Dump state of database after remove the object -------------------------------------------------------------
db_entries = database.query_all(ContextModel) db_entries = database.get_all(ContextModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries: # for db_entry in db_entries:
...@@ -295,7 +294,7 @@ def test_grpc_topology( ...@@ -295,7 +294,7 @@ def test_grpc_topology(
assert len(response.topologies) == 0 assert len(response.topologies) == 0
# ----- Dump state of database before create the object ------------------------------------------------------------ # ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = database.query_all(TopologyModel) db_entries = database.get_all(TopologyModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries: # for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover # LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
...@@ -337,7 +336,7 @@ def test_grpc_topology( ...@@ -337,7 +336,7 @@ def test_grpc_topology(
# assert event.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID # assert event.topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Dump state of database after create/update the object ------------------------------------------------------ # ----- Dump state of database after create/update the object ------------------------------------------------------
db_entries = database.query_all(TopologyModel) db_entries = database.get_all(TopologyModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries: # for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover # LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
...@@ -384,22 +383,22 @@ def test_grpc_topology( ...@@ -384,22 +383,22 @@ def test_grpc_topology(
# events_collector.stop() # events_collector.stop()
# ----- Dump state of database after remove the object ------------------------------------------------------------- # ----- Dump state of database after remove the object -------------------------------------------------------------
db_entries = database.query_all(TopologyModel) db_entries = database.get_all(TopologyModel)
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
# for db_entry in db_entries: # for db_entry in db_entries:
# LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover # LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------') LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0 assert len(db_entries) == 0
"""
def test_grpc_device( def test_grpc_device(
context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name context_s_mb: Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name
context_database = context_db_mb[0] session = context_s_mb[0]
database = Database(session)
# ----- Clean the database ----------------------------------------------------------------------------------------- # ----- Clean the database -----------------------------------------------------------------------------------------
context_database.clear_all() database.clear()
# ----- Initialize the EventsCollector ----------------------------------------------------------------------------- # ----- Initialize the EventsCollector -----------------------------------------------------------------------------
events_collector = EventsCollector(context_client_grpc) events_collector = EventsCollector(context_client_grpc)
...@@ -438,49 +437,49 @@ def test_grpc_device( ...@@ -438,49 +437,49 @@ def test_grpc_device(
assert len(response.devices) == 0 assert len(response.devices) == 0
# ----- Dump state of database before create the object ------------------------------------------------------------ # ----- Dump state of database before create the object ------------------------------------------------------------
db_entries = context_database.dump() db_entries = database.dump_all()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries: # for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover # LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------') LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 5 assert len(db_entries) == 2
# ----- Create the object ------------------------------------------------------------------------------------------ # ----- Create the object ------------------------------------------------------------------------------------------
with pytest.raises(grpc.RpcError) as e: with pytest.raises(grpc.RpcError) as e:
WRONG_DEVICE = copy.deepcopy(DEVICE_R1) WRONG_DEVICE = copy.deepcopy(DEVICE_R1)
WRONG_DEVICE['device_endpoints'][0]['endpoint_id']['device_id']['device_uuid']['uuid'] = 'wrong-device-uuid' WRONG_DEVICE_UUID = '3f03c76d-31fb-47f5-9c1d-bc6b6bfa2d08'
WRONG_DEVICE['device_endpoints'][0]['endpoint_id']['device_id']['device_uuid']['uuid'] = WRONG_DEVICE_UUID
context_client_grpc.SetDevice(Device(**WRONG_DEVICE)) context_client_grpc.SetDevice(Device(**WRONG_DEVICE))
assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT
msg = 'request.device_endpoints[0].device_id.device_uuid.uuid(wrong-device-uuid) is invalid; '\ msg = 'request.device_endpoints[0].device_id.device_uuid.uuid({}) is invalid; '\
'should be == request.device_id.device_uuid.uuid({:s})'.format(DEVICE_R1_UUID) 'should be == request.device_id.device_uuid.uuid({})'.format(WRONG_DEVICE_UUID, DEVICE_R1_UUID)
assert e.value.details() == msg assert e.value.details() == msg
response = context_client_grpc.SetDevice(Device(**DEVICE_R1)) response = context_client_grpc.SetDevice(Device(**DEVICE_R1))
assert response.device_uuid.uuid == DEVICE_R1_UUID assert response.device_uuid.uuid == DEVICE_R1_UUID
# ----- Check create event ----------------------------------------------------------------------------------------- # ----- Check create event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True) # event = events_collector.get_event(block=True)
assert isinstance(event, DeviceEvent) # assert isinstance(event, DeviceEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE # assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID # assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID
# ----- Update the object ------------------------------------------------------------------------------------------ # ----- Update the object ------------------------------------------------------------------------------------------
response = context_client_grpc.SetDevice(Device(**DEVICE_R1)) response = context_client_grpc.SetDevice(Device(**DEVICE_R1))
assert response.device_uuid.uuid == DEVICE_R1_UUID assert response.device_uuid.uuid == DEVICE_R1_UUID
# ----- Check update event ----------------------------------------------------------------------------------------- # ----- Check update event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True) # event = events_collector.get_event(block=True)
assert isinstance(event, DeviceEvent) # assert isinstance(event, DeviceEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE # assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID # assert event.device_id.device_uuid.uuid == DEVICE_R1_UUID
# ----- Dump state of database after create/update the object ------------------------------------------------------ # ----- Dump state of database after create/update the object ------------------------------------------------------
db_entries = context_database.dump() db_entries = database.dump_all()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries: # for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover # LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------') LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 40 assert len(db_entries) == 36
# ----- Get when the object exists --------------------------------------------------------------------------------- # ----- Get when the object exists ---------------------------------------------------------------------------------
response = context_client_grpc.GetDevice(DeviceId(**DEVICE_R1_ID)) response = context_client_grpc.GetDevice(DeviceId(**DEVICE_R1_ID))
...@@ -513,11 +512,11 @@ def test_grpc_device( ...@@ -513,11 +512,11 @@ def test_grpc_device(
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Check update event ----------------------------------------------------------------------------------------- # ----- Check update event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True) # event = events_collector.get_event(block=True)
assert isinstance(event, TopologyEvent) # assert isinstance(event, TopologyEvent)
assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE # assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID # assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID # assert response.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
# ----- Check relation was created --------------------------------------------------------------------------------- # ----- Check relation was created ---------------------------------------------------------------------------------
response = context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID)) response = context_client_grpc.GetTopology(TopologyId(**TOPOLOGY_ID))
...@@ -528,12 +527,12 @@ def test_grpc_device( ...@@ -528,12 +527,12 @@ def test_grpc_device(
assert len(response.link_ids) == 0 assert len(response.link_ids) == 0
# ----- Dump state of database after creating the object relation -------------------------------------------------- # ----- Dump state of database after creating the object relation --------------------------------------------------
db_entries = context_database.dump() db_entries = database.dump_all()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries: # for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover # LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------') LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 40 assert len(db_entries) == 33
# ----- Remove the object ------------------------------------------------------------------------------------------ # ----- Remove the object ------------------------------------------------------------------------------------------
context_client_grpc.RemoveDevice(DeviceId(**DEVICE_R1_ID)) context_client_grpc.RemoveDevice(DeviceId(**DEVICE_R1_ID))
...@@ -541,33 +540,33 @@ def test_grpc_device( ...@@ -541,33 +540,33 @@ def test_grpc_device(
context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID)) context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))
# ----- Check remove event ----------------------------------------------------------------------------------------- # ----- Check remove event -----------------------------------------------------------------------------------------
events = events_collector.get_events(block=True, count=3) # events = events_collector.get_events(block=True, count=3)
assert isinstance(events[0], DeviceEvent) # assert isinstance(events[0], DeviceEvent)
assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE # assert events[0].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[0].device_id.device_uuid.uuid == DEVICE_R1_UUID # assert events[0].device_id.device_uuid.uuid == DEVICE_R1_UUID
assert isinstance(events[1], TopologyEvent) # assert isinstance(events[1], TopologyEvent)
assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE # assert events[1].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID # assert events[1].topology_id.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID # assert events[1].topology_id.topology_uuid.uuid == DEFAULT_TOPOLOGY_UUID
assert isinstance(events[2], ContextEvent) # assert isinstance(events[2], ContextEvent)
assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE # assert events[2].event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
assert events[2].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID # assert events[2].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Stop the EventsCollector ----------------------------------------------------------------------------------- # ----- Stop the EventsCollector -----------------------------------------------------------------------------------
events_collector.stop() # events_collector.stop()
# ----- Dump state of database after remove the object ------------------------------------------------------------- # ----- Dump state of database after remove the object -------------------------------------------------------------
db_entries = context_database.dump() db_entries = database.dump_all()
LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries)))
for db_entry in db_entries: # for db_entry in db_entries:
LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover # LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover
LOGGER.info('-----------------------------------------------------------') LOGGER.info('-----------------------------------------------------------')
assert len(db_entries) == 0 assert len(db_entries) == 0
"""
def test_grpc_link( def test_grpc_link(
context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name
context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name
......