diff --git a/scripts/run_tests_locally-context.sh b/scripts/run_tests_locally-context.sh index 7033fcb01a468731b498708096a80fac8d9a9a85..bf0cccd6b1ba3597595331b52a522da61698caf3 100755 --- a/scripts/run_tests_locally-context.sh +++ b/scripts/run_tests_locally-context.sh @@ -20,7 +20,7 @@ # If not already set, set the name of the Kubernetes namespace to deploy to. export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} -export TFS_K8S_HOSTNAME="tfs-vm" +#export TFS_K8S_HOSTNAME="tfs-vm" ######################################################################################################################## # Automated steps start here @@ -29,15 +29,21 @@ export TFS_K8S_HOSTNAME="tfs-vm" PROJECTDIR=`pwd` cd $PROJECTDIR/src -RCFILE=$PROJECTDIR/coverage/.coveragerc +#RCFILE=$PROJECTDIR/coverage/.coveragerc -kubectl --namespace $TFS_K8S_NAMESPACE expose deployment contextservice --name=redis-tests --port=6379 --type=NodePort +#kubectl --namespace $TFS_K8S_NAMESPACE expose deployment contextservice --name=redis-tests --port=6379 --type=NodePort #export REDIS_SERVICE_HOST=$(kubectl --namespace $TFS_K8S_NAMESPACE get service redis-tests -o 'jsonpath={.spec.clusterIP}') -export REDIS_SERVICE_HOST=$(kubectl get node $TFS_K8S_HOSTNAME -o 'jsonpath={.status.addresses[?(@.type=="InternalIP")].address}') -export REDIS_SERVICE_PORT=$(kubectl --namespace $TFS_K8S_NAMESPACE get service redis-tests -o 'jsonpath={.spec.ports[?(@.port==6379)].nodePort}') +#export REDIS_SERVICE_HOST=$(kubectl get node $TFS_K8S_HOSTNAME -o 'jsonpath={.status.addresses[?(@.type=="InternalIP")].address}') +#export REDIS_SERVICE_PORT=$(kubectl --namespace $TFS_K8S_NAMESPACE get service redis-tests -o 'jsonpath={.spec.ports[?(@.port==6379)].nodePort}') + +export CRDB_URI="cockroachdb://tfs:tfs123@127.0.0.1:26257/tfs?sslmode=require" # Run unitary tests and analyze coverage of code at same time -coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose --maxfail=1 \ +#coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose --maxfail=1 \ +# context/tests/test_unitary.py + +source tfs_runtime_env_vars.sh +pytest --log-level=INFO --verbose -o log_cli=true --maxfail=1 \ context/tests/test_unitary.py -kubectl --namespace $TFS_K8S_NAMESPACE delete service redis-tests +#kubectl --namespace $TFS_K8S_NAMESPACE delete service redis-tests diff --git a/src/context/client/EventsCollector.py b/src/context/client/EventsCollector.py index f5fc3fbc735c2f62b39223b9ed20aa3730ecd11d..9ad6e101b5130d6bbb1e6b33ba926dc4c0c128b0 100644 --- a/src/context/client/EventsCollector.py +++ b/src/context/client/EventsCollector.py @@ -132,7 +132,7 @@ class EventsCollector: if event is None: break events.append(event) else: - for _ in range(count): + while len(events) < count: if self._terminate.is_set(): break event = self.get_event(block=block, timeout=timeout) if event is None: continue diff --git a/src/context/service/ChangeFeedClient.py b/src/context/service/ChangeFeedClient.py new file mode 100644 index 0000000000000000000000000000000000000000..8285dc6c32f2cc2476f157941aa3cfd9318f2afe --- /dev/null +++ b/src/context/service/ChangeFeedClient.py @@ -0,0 +1,87 @@ +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pip install psycopg==3.1.6 +# Ref: https://www.cockroachlabs.com/docs/stable/changefeed-for.html +# (current implementation) Ref: https://www.cockroachlabs.com/docs/v22.1/changefeed-for +# Ref: https://www.psycopg.org/psycopg3/docs/api/crdb.html + +import contextlib, json, logging, psycopg, psycopg.conninfo, psycopg.crdb, sys, time +from typing import Any, Dict, Iterator, List, Optional, Tuple +from common.Settings import get_setting + +LOGGER = logging.getLogger(__name__) + +SQL_ACTIVATE_CHANGE_FEED = 'SET CLUSTER SETTING kv.rangefeed.enabled = true' +SQL_START_CHANGE_FEED = 'EXPERIMENTAL CHANGEFEED FOR {:s}.{:s} WITH format=json, no_initial_scan, updated' + +class ChangeFeedClient: + def __init__(self) -> None: + self._connection : Optional[psycopg.crdb.CrdbConnection] = None + self._conn_info_dict : Dict = dict() + self._is_crdb : bool = False + + def initialize(self) -> bool: + crdb_uri = get_setting('CRDB_URI') + if crdb_uri is None: + LOGGER.error('Connection string not found in EnvVar CRDB_URI') + return False + + try: + crdb_uri = crdb_uri.replace('cockroachdb://', 'postgres://') + self._conn_info_dict = psycopg.conninfo.conninfo_to_dict(crdb_uri) + except psycopg.ProgrammingError: + LOGGER.exception('Invalid connection string: {:s}'.format(str(crdb_uri))) + return False + + self._connection = psycopg.crdb.connect(**self._conn_info_dict) + self._is_crdb = psycopg.crdb.CrdbConnection.is_crdb(self._connection) + LOGGER.debug('is_crdb = {:s}'.format(str(self._is_crdb))) + + # disable multi-statement transactions + self._connection.autocommit = True + + # activate change feeds + self._connection.execute(SQL_ACTIVATE_CHANGE_FEED) + + return self._is_crdb + + def get_changes(self, table_name : str) -> Iterator[Tuple[float, str, List[Any], bool, Dict]]: + db_name = self._conn_info_dict.get('dbname') + if db_name is None: raise Exception('ChangeFeed has not been initialized!') + cur = self._connection.cursor() + str_sql_query = SQL_START_CHANGE_FEED.format(db_name, table_name) + with contextlib.closing(cur.stream(str_sql_query)) as feed: + for change in feed: + LOGGER.info(change) + table_name, primary_key, data = change[0], json.loads(change[1]), json.loads(change[2]) + timestamp = data.get('updated') / 1.e9 + if timestamp is None: timestamp = time.time() + after = data.get('after') + is_delete = ('after' in data) and (after is None) + yield timestamp, table_name, primary_key, is_delete, after + +def main(): + logging.basicConfig(level=logging.INFO) + + cf = ChangeFeed() + ready = cf.initialize() + if not ready: raise Exception('Unable to initialize ChangeFeed') + for change in cf.get_changes('context'): + LOGGER.info(change) + + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/context/service/ContextServiceServicerImpl.py b/src/context/service/ContextServiceServicerImpl.py index b5725f00741a2d24575cd77b210aa41b4343287e..fcb0024d294a0d27ec528fca3bb89f5fc124927b 100644 --- a/src/context/service/ContextServiceServicerImpl.py +++ b/src/context/service/ContextServiceServicerImpl.py @@ -13,13 +13,13 @@ # limitations under the License. -import grpc, json, logging, operator, sqlalchemy, threading, uuid +import grpc, json, logging, operator, sqlalchemy, threading, time, uuid from sqlalchemy.orm import Session, contains_eager, selectinload, sessionmaker -from sqlalchemy.dialects.postgresql import UUID, insert +#from sqlalchemy.dialects.postgresql import UUID, insert from sqlalchemy_cockroachdb import run_transaction from typing import Dict, Iterator, List, Optional, Set, Tuple, Union from common.message_broker.MessageBroker import MessageBroker -from common.orm.backend.Tools import key_to_str +#from common.orm.backend.Tools import key_to_str from common.proto.context_pb2 import ( Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, Context, ContextEvent, ContextId, ContextIdList, ContextList, @@ -30,36 +30,39 @@ from common.proto.context_pb2 import ( Slice, SliceEvent, SliceId, SliceIdList, SliceList, Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList, ConfigActionEnum, Constraint) -from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule +#from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule from common.proto.context_pb2_grpc import ContextServiceServicer from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer +from common.tools.object_factory.Context import json_context_id from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method -from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, NotFoundException -from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string -from context.service.Database import Database -from context.service.database.ConfigModel import ( - ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel, grpc_config_rules_to_raw, update_config) -from context.service.database.ConnectionModel import ConnectionModel, set_path -from context.service.database.ConstraintModel import ( - ConstraintModel, ConstraintsModel, Union_ConstraintModel, CONSTRAINT_PARSERS, set_constraints) +from common.rpc_method_wrapper.ServiceExceptions import ( + InvalidArgumentException, NotFoundException, OperationFailedException) +#from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string +#from context.service.Database import Database +#from context.service.database.ConfigModel import ( +# ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel, grpc_config_rules_to_raw, update_config) +#from context.service.database.ConnectionModel import ConnectionModel, set_path +#from context.service.database.ConstraintModel import ( +# ConstraintModel, ConstraintsModel, Union_ConstraintModel, CONSTRAINT_PARSERS, set_constraints) from context.service.database.ContextModel import ContextModel -from context.service.database.DeviceModel import ( - DeviceModel, grpc_to_enum__device_operational_status, set_drivers, grpc_to_enum__device_driver, DriverModel) -from context.service.database.EndPointModel import EndPointModel, KpiSampleTypeModel, set_kpi_sample_types -from context.service.database.Events import notify_event -from context.service.database.KpiSampleType import grpc_to_enum__kpi_sample_type -from context.service.database.LinkModel import LinkModel -from context.service.database.PolicyRuleModel import PolicyRuleModel -from context.service.database.RelationModels import ( - ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel, - SliceSubSliceModel, TopologyDeviceModel, TopologyLinkModel) -from context.service.database.ServiceModel import ( - ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type) -from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status -from context.service.database.TopologyModel import TopologyModel -from .Constants import ( - CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE, - TOPIC_TOPOLOGY) +#from context.service.database.DeviceModel import ( +# DeviceModel, grpc_to_enum__device_operational_status, set_drivers, grpc_to_enum__device_driver, DriverModel) +#from context.service.database.EndPointModel import EndPointModel, KpiSampleTypeModel, set_kpi_sample_types +#from context.service.database.Events import notify_event +#from context.service.database.KpiSampleType import grpc_to_enum__kpi_sample_type +#from context.service.database.LinkModel import LinkModel +#from context.service.database.PolicyRuleModel import PolicyRuleModel +#from context.service.database.RelationModels import ( +# ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel, +# SliceSubSliceModel, TopologyDeviceModel, TopologyLinkModel) +#from context.service.database.ServiceModel import ( +# ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type) +#from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status +#from context.service.database.TopologyModel import TopologyModel +#from .Constants import ( +# CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE, +# TOPIC_TOPOLOGY) +from .ChangeFeedClient import ChangeFeedClient LOGGER = logging.getLogger(__name__) @@ -106,7 +109,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer @safe_and_metered_rpc_method(METRICS, LOGGER) def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context: - context_uuid = str(uuid.uuid5(uuid.NAMESPACE_OID, request.context_uuid.uuid)) + context_uuid = request.context_uuid.uuid def callback(session : Session) -> Optional[Dict]: obj : Optional[ContextModel] = \ session.query(ContextModel).filter_by(context_uuid=context_uuid).one_or_none() @@ -117,8 +120,8 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer @safe_and_metered_rpc_method(METRICS, LOGGER) def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId: - context_uuid = str(uuid.uuid5(uuid.NAMESPACE_OID, request.context_id.context_uuid.uuid)) - context_name = request.context_id.context_uuid.uuid + context_uuid = request.context_id.context_uuid.uuid + context_name = request.name for i, topology_id in enumerate(request.topology_ids): topology_context_uuid = topology_id.context_id.context_uuid.uuid @@ -134,15 +137,24 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer 'request.service_ids[{:d}].context_id.context_uuid.uuid'.format(i), service_context_uuid, ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)]) + for i, slice_id in enumerate(request.slice_ids): + slice_context_uuid = slice_id.context_id.context_uuid.uuid + if slice_context_uuid != context_uuid: + raise InvalidArgumentException( + 'request.slice_ids[{:d}].context_id.context_uuid.uuid'.format(i), slice_context_uuid, + ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)]) + def callback(session : Session) -> Tuple[Optional[Dict], bool]: obj : Optional[ContextModel] = \ session.query(ContextModel).with_for_update().filter_by(context_uuid=context_uuid).one_or_none() - updated = obj is not None - obj = ContextModel(context_uuid=context_uuid, context_name=context_name) - session.merge(obj) - session.commit() + is_update = obj is not None + if is_update: + obj.context_name = context_name + session.merge(obj) + else: + session.add(ContextModel(context_uuid=context_uuid, context_name=context_name, created_at=time.time())) obj = session.get(ContextModel, {'context_uuid': context_uuid}) - return (None if obj is None else obj.dump_id()), updated + return (None if obj is None else obj.dump_id()), is_update obj_id,updated = run_transaction(sessionmaker(bind=self.db_engine), callback) if obj_id is None: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid) @@ -153,7 +165,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer @safe_and_metered_rpc_method(METRICS, LOGGER) def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty: - context_uuid = str(uuid.uuid5(uuid.NAMESPACE_OID, request.context_uuid.uuid)) + context_uuid = request.context_uuid.uuid def callback(session : Session) -> bool: num_deleted = session.query(ContextModel).filter_by(context_uuid=context_uuid).delete() @@ -164,11 +176,24 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer # notify_event(self.messagebroker, TOPIC_CONTEXT, EventTypeEnum.EVENTTYPE_REMOVE, {'context_id': request}) return Empty() -# @safe_and_metered_rpc_method(METRICS, LOGGER) -# def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]: -# for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT): -# yield ContextEvent(**json.loads(message.content)) - + @safe_and_metered_rpc_method(METRICS, LOGGER) + def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]: + #for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT): + # yield ContextEvent(**json.loads(message.content)) + cf = ChangeFeedClient() + ready = cf.initialize() + if not ready: raise OperationFailedException('Initialize ChangeFeed') + for timestamp, _, primary_key, is_delete, after in cf.get_changes('context'): + if is_delete: + event_type = EventTypeEnum.EVENTTYPE_REMOVE + else: + is_create = (timestamp - after.get('created_at')) < 1.0 + event_type = EventTypeEnum.EVENTTYPE_CREATE if is_create else EventTypeEnum.EVENTTYPE_UPDATE + event = { + 'event': {'timestamp': {'timestamp': timestamp}, 'event_type': event_type}, + 'context_id': json_context_id(primary_key[0]), + } + yield ContextEvent(**event) # ----- Topology --------------------------------------------------------------------------------------------------- diff --git a/src/context/service/Database.py b/src/context/service/Database.py index 8aa5682390e79146f3cec661af645ac277534c88..03598a97fb5df6b1cb6951449d8df2d3a360e0ff 100644 --- a/src/context/service/Database.py +++ b/src/context/service/Database.py @@ -1,16 +1,13 @@ -from typing import Tuple, List - -from sqlalchemy import MetaData -from sqlalchemy.orm import Session, joinedload -from context.service.database._Base import Base import logging -from common.orm.backend.Tools import key_to_str - +from sqlalchemy import MetaData +from sqlalchemy.orm import Session #, joinedload +from typing import Tuple #, List +from context.service.database._Base import _Base +#from common.orm.backend.Tools import key_to_str from common.rpc_method_wrapper.ServiceExceptions import NotFoundException LOGGER = logging.getLogger(__name__) - class Database(Session): def __init__(self, session): super().__init__() @@ -62,8 +59,8 @@ class Database(Session): def clear(self): with self.session() as session: engine = session.get_bind() - Base.metadata.drop_all(engine) - Base.metadata.create_all(engine) + _Base.metadata.drop_all(engine) + _Base.metadata.create_all(engine) def dump_by_table(self): with self.session() as session: @@ -90,7 +87,7 @@ class Database(Session): return result - def get_object(self, model_class: Base, main_key: str, raise_if_not_found=False): + def get_object(self, model_class: _Base, main_key: str, raise_if_not_found=False): filt = {model_class.main_pk_name(): main_key} with self.session() as session: get = session.query(model_class).filter_by(**filt).one_or_none() @@ -104,7 +101,7 @@ class Database(Session): dump = get.dump() return get, dump - def get_object_filter(self, model_class: Base, filt, raise_if_not_found=False): + def get_object_filter(self, model_class: _Base, filt, raise_if_not_found=False): with self.session() as session: get = session.query(model_class).filter_by(**filt).all() @@ -119,7 +116,7 @@ class Database(Session): return get, get.dump() - def get_or_create(self, model_class: Base, key_parts: str, filt=None) -> Tuple[Base, bool]: + def get_or_create(self, model_class: _Base, key_parts: str, filt=None) -> Tuple[_Base, bool]: if not filt: filt = {model_class.main_pk_name(): key_parts} with self.session() as session: diff --git a/src/context/service/Engine.py b/src/context/service/Engine.py index 7944d86012d6d7bd76539aee6dc3b282c718fd03..08e1e4f93f6a67a3fe43b8f672613ab78ffc0c81 100644 --- a/src/context/service/Engine.py +++ b/src/context/service/Engine.py @@ -21,20 +21,20 @@ APP_NAME = 'tfs' class Engine: def get_engine(self) -> sqlalchemy.engine.Engine: - ccdb_url = get_setting('CCDB_URL') + crdb_uri = get_setting('CRDB_URI') try: engine = sqlalchemy.create_engine( - ccdb_url, connect_args={'application_name': APP_NAME}, echo=False, future=True) + crdb_uri, connect_args={'application_name': APP_NAME}, echo=False, future=True) except: # pylint: disable=bare-except - LOGGER.exception('Failed to connect to database: {:s}'.format(ccdb_url)) + LOGGER.exception('Failed to connect to database: {:s}'.format(crdb_uri)) return None try: if not sqlalchemy_utils.database_exists(engine.url): sqlalchemy_utils.create_database(engine.url) except: # pylint: disable=bare-except - LOGGER.exception('Failed to check/create to database: {:s}'.format(ccdb_url)) + LOGGER.exception('Failed to check/create to database: {:s}'.format(crdb_uri)) return None return engine diff --git a/src/context/service/database/ConfigModel.py b/src/context/service/database/ConfigModel.py index 5f71119819a8f72cbb994c2b19f0bb5cbde57da4..d36622e765b6011c0ac49ef382888438e9139979 100644 --- a/src/context/service/database/ConfigModel.py +++ b/src/context/service/database/ConfigModel.py @@ -19,7 +19,7 @@ from common.proto.context_pb2 import ConfigActionEnum from common.tools.grpc.Tools import grpc_message_to_json_string from sqlalchemy import Column, ForeignKey, INTEGER, CheckConstraint, Enum, String from sqlalchemy.dialects.postgresql import UUID, ARRAY -from context.service.database._Base import Base +from context.service.database._Base import _Base from sqlalchemy.orm import relationship from context.service.Database import Database diff --git a/src/context/service/database/ContextModel.py b/src/context/service/database/ContextModel.py index 46f0741e5ce05e3489c36da1fe9a1cd448a075f2..9ad5e0bcb25a227005affb902eb5517666381a87 100644 --- a/src/context/service/database/ContextModel.py +++ b/src/context/service/database/ContextModel.py @@ -14,7 +14,7 @@ import logging from typing import Dict -from sqlalchemy import Column, String +from sqlalchemy import Column, Float, String from sqlalchemy.dialects.postgresql import UUID from ._Base import _Base #from sqlalchemy.orm import relationship @@ -25,6 +25,7 @@ class ContextModel(_Base): __tablename__ = 'context' context_uuid = Column(UUID(as_uuid=False), primary_key=True) context_name = Column(String(), nullable=False) + created_at = Column(Float) #topology = relationship('TopologyModel', back_populates='context') diff --git a/src/context/service/database/__init__.py b/src/context/service/database/__init__.py index 27b5f5dd22d6a16809e219ebaa6526d249e5c2a8..9802657861aa8d2cf99a1e5c64daefbacf587b92 100644 --- a/src/context/service/database/__init__.py +++ b/src/context/service/database/__init__.py @@ -13,3 +13,4 @@ # limitations under the License. from ._Base import _Base, rebuild_database +from .ContextModel import ContextModel diff --git a/src/context/tests/test_unitary.py b/src/context/tests/test_unitary.py index aaa8c7fbd0cad3015c911a77d925c215cf2c61fe..8bf1b4ff1cb322ac416e5520d9e7cf156472d490 100644 --- a/src/context/tests/test_unitary.py +++ b/src/context/tests/test_unitary.py @@ -13,7 +13,7 @@ # limitations under the License. # pylint: disable=too-many-lines -import copy, grpc, logging, os, pytest, requests, time, urllib +import copy, grpc, logging, os, pytest, requests, sqlalchemy, time, urllib, uuid from typing import Tuple from common.Constants import DEFAULT_CONTEXT_UUID, DEFAULT_TOPOLOGY_UUID, ServiceNameEnum from common.Settings import ( @@ -27,6 +27,10 @@ from common.proto.context_pb2 import ( DeviceOperationalStatusEnum, Empty, EventTypeEnum, Link, LinkEvent, LinkId, Service, ServiceEvent, ServiceId, ServiceStatusEnum, ServiceTypeEnum, Topology, TopologyEvent, TopologyId) from common.proto.policy_pb2 import (PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule) +from common.tools.object_factory.Context import json_context, json_context_id +from common.tools.object_factory.Service import json_service_id +from common.tools.object_factory.Slice import json_slice_id +from common.tools.object_factory.Topology import json_topology_id from common.type_checkers.Assertions import ( validate_connection, validate_connection_ids, validate_connections, validate_context, validate_context_ids, validate_contexts, validate_device, validate_device_ids, validate_devices, validate_link, validate_link_ids, @@ -36,14 +40,17 @@ from context.client.ContextClient import ContextClient from context.client.EventsCollector import EventsCollector from context.service.database.Tools import ( FASTHASHER_DATA_ACCEPTED_FORMAT, FASTHASHER_ITEM_ACCEPTED_FORMAT, fast_hasher) -from context.service.grpc_server.ContextService import ContextService -from context.service._old_code.Populate import populate -from context.service.rest_server.RestServer import RestServer -from context.service.rest_server.Resources import RESOURCES +from context.service.ContextService import ContextService +#from context.service._old_code.Populate import populate +#from context.service.rest_server.RestServer import RestServer +#from context.service.rest_server.Resources import RESOURCES from requests import Session from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker -from context.service.database._Base import Base +from context.service.database._Base import _Base +from common.Settings import get_setting +from context.service.Engine import Engine +from context.service.database._Base import rebuild_database from .Objects import ( CONNECTION_R1_R3, CONNECTION_R1_R3_ID, CONNECTION_R1_R3_UUID, CONTEXT, CONTEXT_ID, DEVICE_R1, DEVICE_R1_ID, @@ -63,90 +70,86 @@ os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT) os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(HTTP_PORT) -DEFAULT_REDIS_SERVICE_HOST = LOCAL_HOST -DEFAULT_REDIS_SERVICE_PORT = 6379 -DEFAULT_REDIS_DATABASE_ID = 0 +#DEFAULT_REDIS_SERVICE_HOST = LOCAL_HOST +#DEFAULT_REDIS_SERVICE_PORT = 6379 +#DEFAULT_REDIS_DATABASE_ID = 0 -REDIS_CONFIG = { - 'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST), - 'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT), - 'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID', DEFAULT_REDIS_DATABASE_ID ), -} +#REDIS_CONFIG = { +# 'REDIS_SERVICE_HOST': os.environ.get('REDIS_SERVICE_HOST', DEFAULT_REDIS_SERVICE_HOST), +# 'REDIS_SERVICE_PORT': os.environ.get('REDIS_SERVICE_PORT', DEFAULT_REDIS_SERVICE_PORT), +# 'REDIS_DATABASE_ID' : os.environ.get('REDIS_DATABASE_ID', DEFAULT_REDIS_DATABASE_ID ), +#} -SCENARIOS = [ - ('all_sqlalchemy', {}, MessageBrokerBackendEnum.INMEMORY, {} ), - ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ) +#SCENARIOS = [ +# ('db:cockroach_mb:inmemory', None, {}, None, {}), +# ('all_inmemory', DatabaseBackendEnum.INMEMORY, {}, MessageBrokerBackendEnum.INMEMORY, {} ) # ('all_redis', DatabaseBackendEnum.REDIS, REDIS_CONFIG, MessageBrokerBackendEnum.REDIS, REDIS_CONFIG), -] +#] -@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS) -def context_s_mb(request) -> Tuple[Session, MessageBroker]: - name,db_session,mb_backend,mb_settings = request.param - msg = 'Running scenario {:s} db_session={:s}, mb_backend={:s}, mb_settings={:s}...' - LOGGER.info(msg.format(str(name), str(db_session), str(mb_backend.value), str(mb_settings))) - - db_uri = 'cockroachdb://root@10.152.183.111:26257/defaultdb?sslmode=disable' - LOGGER.debug('Connecting to DB: {}'.format(db_uri)) - - try: - engine = create_engine(db_uri) - except Exception as e: - LOGGER.error("Failed to connect to database.") - LOGGER.error(f"{e}") - return 1 +#@pytest.fixture(scope='session', ids=[str(scenario[0]) for scenario in SCENARIOS], params=SCENARIOS) +@pytest.fixture(scope='session') +def context_db_mb(request) -> Tuple[Session, MessageBroker]: + #name,db_session,mb_backend,mb_settings = request.param + #msg = 'Running scenario {:s} db_session={:s}, mb_backend={:s}, mb_settings={:s}...' + #LOGGER.info(msg.format(str(name), str(db_session), str(mb_backend.value), str(mb_settings))) - Base.metadata.create_all(engine) - _session = sessionmaker(bind=engine, expire_on_commit=False) + _db_engine = Engine().get_engine() - _message_broker = MessageBroker(get_messagebroker_backend(backend=mb_backend, **mb_settings)) - yield _session, _message_broker - _message_broker.terminate() + _msg_broker = MessageBroker(get_messagebroker_backend(backend=MessageBrokerBackendEnum.INMEMORY)) + yield _db_engine, _msg_broker + _msg_broker.terminate() @pytest.fixture(scope='session') -def context_service_grpc(context_s_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name - _service = ContextService(context_s_mb[0], context_s_mb[1]) +def context_service_grpc(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name + _service = ContextService(context_db_mb[0], context_db_mb[1]) _service.start() yield _service _service.stop() -@pytest.fixture(scope='session') -def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name - database = context_db_mb[0] - _rest_server = RestServer() - for endpoint_name, resource_class, resource_url in RESOURCES: - _rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,)) - _rest_server.start() - time.sleep(1) # bring time for the server to start - yield _rest_server - _rest_server.shutdown() - _rest_server.join() + +#@pytest.fixture(scope='session') +#def context_service_rest(context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name +# database = context_db_mb[0] +# _rest_server = RestServer() +# for endpoint_name, resource_class, resource_url in RESOURCES: +# _rest_server.add_resource(resource_class, resource_url, endpoint=endpoint_name, resource_class_args=(database,)) +# _rest_server.start() +# time.sleep(1) # bring time for the server to start +# yield _rest_server +# _rest_server.shutdown() +# _rest_server.join() + @pytest.fixture(scope='session') def context_client_grpc(context_service_grpc : ContextService): # pylint: disable=redefined-outer-name _client = ContextClient() yield _client _client.close() -""" -def do_rest_request(url : str): - base_url = get_service_baseurl_http(ServiceNameEnum.CONTEXT) - request_url = 'http://{:s}:{:s}{:s}{:s}'.format(str(LOCAL_HOST), str(HTTP_PORT), str(base_url), url) - LOGGER.warning('Request: GET {:s}'.format(str(request_url))) - reply = requests.get(request_url) - LOGGER.warning('Reply: {:s}'.format(str(reply.text))) - assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code) - return reply.json() -""" -"""# ----- Test gRPC methods ---------------------------------------------------------------------------------------------- -def test_grpc_context( - context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name - context_s_mb : Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name - Session = context_s_mb[0] +#def do_rest_request(url : str): +# base_url = get_service_baseurl_http(ServiceNameEnum.CONTEXT) +# request_url = 'http://{:s}:{:s}{:s}{:s}'.format(str(LOCAL_HOST), str(HTTP_PORT), str(base_url), url) +# LOGGER.warning('Request: GET {:s}'.format(str(request_url))) +# reply = requests.get(request_url) +# LOGGER.warning('Reply: {:s}'.format(str(reply.text))) +# assert reply.status_code == 200, 'Reply failed with code {}'.format(reply.status_code) +# return reply.json() - database = Database(Session) +# ----- Test gRPC methods ---------------------------------------------------------------------------------------------- + +def test_grpc_context( + context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name + context_db_mb : Tuple[sqlalchemy.engine.Engine, MessageBroker] # pylint: disable=redefined-outer-name +) -> None: + db_engine = context_db_mb[0] # ----- Clean the database ----------------------------------------------------------------------------------------- - database.clear() + rebuild_database(db_engine, drop_if_exists=True) + # ----- Initialize the EventsCollector ----------------------------------------------------------------------------- - events_collector = EventsCollector(context_client_grpc) + events_collector = EventsCollector( + context_client_grpc, log_events_received=True, + activate_context_collector = True, activate_topology_collector = False, activate_device_collector = False, + activate_link_collector = False, activate_service_collector = False, activate_slice_collector = False, + activate_connection_collector = False) events_collector.start() # ----- Get when the object does not exist ------------------------------------------------------------------------- @@ -163,71 +166,95 @@ def test_grpc_context( assert len(response.contexts) == 0 # ----- Dump state of database before create the object ------------------------------------------------------------ - db_entries = database.dump_all() - LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - for db_entry in db_entries: - LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover - LOGGER.info('-----------------------------------------------------------') - assert len(db_entries) == 0 + #db_entries = database.dump_all() + #LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) + #for db_entry in db_entries: + # LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover + #LOGGER.info('-----------------------------------------------------------') + #assert len(db_entries) == 0 # ----- Create the object ------------------------------------------------------------------------------------------ response = context_client_grpc.SetContext(Context(**CONTEXT)) assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID - wrong_uuid = 'c97c4185-e1d1-4ea7-b6b9-afbf76cb61f4' + wrong_context_uuid = str(uuid.uuid4()) + wrong_context_id = json_context_id(wrong_context_uuid) with pytest.raises(grpc.RpcError) as e: - WRONG_TOPOLOGY_ID = copy.deepcopy(TOPOLOGY_ID) - WRONG_TOPOLOGY_ID['context_id']['context_uuid']['uuid'] = wrong_uuid WRONG_CONTEXT = copy.deepcopy(CONTEXT) - WRONG_CONTEXT['topology_ids'].append(WRONG_TOPOLOGY_ID) + WRONG_CONTEXT['topology_ids'].append(json_topology_id(str(uuid.uuid4()), context_id=wrong_context_id)) context_client_grpc.SetContext(Context(**WRONG_CONTEXT)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg = 'request.topology_ids[0].context_id.context_uuid.uuid({}) is invalid; '\ - 'should be == request.context_id.context_uuid.uuid({})'.format(wrong_uuid, DEFAULT_CONTEXT_UUID) + 'should be == request.context_id.context_uuid.uuid({})'.format(wrong_context_uuid, DEFAULT_CONTEXT_UUID) assert e.value.details() == msg with pytest.raises(grpc.RpcError) as e: - WRONG_SERVICE_ID = copy.deepcopy(SERVICE_R1_R2_ID) - WRONG_SERVICE_ID['context_id']['context_uuid']['uuid'] = wrong_uuid WRONG_CONTEXT = copy.deepcopy(CONTEXT) - WRONG_CONTEXT['service_ids'].append(WRONG_SERVICE_ID) + WRONG_CONTEXT['service_ids'].append(json_service_id(str(uuid.uuid4()), context_id=wrong_context_id)) context_client_grpc.SetContext(Context(**WRONG_CONTEXT)) assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT msg = 'request.service_ids[0].context_id.context_uuid.uuid({}) is invalid; '\ - 'should be == request.context_id.context_uuid.uuid({})'.format(wrong_uuid, DEFAULT_CONTEXT_UUID) + 'should be == request.context_id.context_uuid.uuid({})'.format(wrong_context_uuid, DEFAULT_CONTEXT_UUID) + assert e.value.details() == msg + + with pytest.raises(grpc.RpcError) as e: + WRONG_CONTEXT = copy.deepcopy(CONTEXT) + WRONG_CONTEXT['slice_ids'].append(json_slice_id(str(uuid.uuid4()), context_id=wrong_context_id)) + context_client_grpc.SetContext(Context(**WRONG_CONTEXT)) + assert e.value.code() == grpc.StatusCode.INVALID_ARGUMENT + msg = 'request.slice_ids[0].context_id.context_uuid.uuid({}) is invalid; '\ + 'should be == request.context_id.context_uuid.uuid({})'.format(wrong_context_uuid, DEFAULT_CONTEXT_UUID) assert e.value.details() == msg # ----- Check create event ----------------------------------------------------------------------------------------- - event = events_collector.get_event(block=True) + event = events_collector.get_event(block=True, timeout=10.0) assert isinstance(event, ContextEvent) - assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE - assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + #assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE + #assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + + # ----- Get when the object exists --------------------------------------------------------------------------------- + response = context_client_grpc.GetContext(ContextId(**CONTEXT_ID)) + assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + assert response.name == '' + assert len(response.topology_ids) == 0 + assert len(response.service_ids) == 0 + assert len(response.slice_ids) == 0 + + # ----- List when the object exists -------------------------------------------------------------------------------- + response = context_client_grpc.ListContextIds(Empty()) + assert len(response.context_ids) == 1 + assert response.context_ids[0].context_uuid.uuid == DEFAULT_CONTEXT_UUID + + response = context_client_grpc.ListContexts(Empty()) + assert len(response.contexts) == 1 + assert response.contexts[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + assert response.contexts[0].name == '' + assert len(response.contexts[0].topology_ids) == 0 + assert len(response.contexts[0].service_ids) == 0 + assert len(response.contexts[0].slice_ids) == 0 + # ----- Update the object ------------------------------------------------------------------------------------------ - response = context_client_grpc.SetContext(Context(**CONTEXT)) + new_context_name = 'new' + CONTEXT_WITH_NAME = copy.deepcopy(CONTEXT) + CONTEXT_WITH_NAME['name'] = new_context_name + response = context_client_grpc.SetContext(Context(**CONTEXT_WITH_NAME)) assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID # ----- Check update event ----------------------------------------------------------------------------------------- - event = events_collector.get_event(block=True) + event = events_collector.get_event(block=True, timeout=10.0) assert isinstance(event, ContextEvent) - assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE - assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID - - # ----- Dump state of database after create/update the object ------------------------------------------------------ - db_entries = database.dump_all() - - LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - for db_entry in db_entries: - LOGGER.info(db_entry) - LOGGER.info('-----------------------------------------------------------') - assert len(db_entries) == 1 + #assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE + #assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID - # ----- Get when the object exists --------------------------------------------------------------------------------- + # ----- Get when the object is modified ---------------------------------------------------------------------------- response = context_client_grpc.GetContext(ContextId(**CONTEXT_ID)) assert response.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + assert response.name == new_context_name assert len(response.topology_ids) == 0 assert len(response.service_ids) == 0 + assert len(response.slice_ids) == 0 - # ----- List when the object exists -------------------------------------------------------------------------------- + # ----- List when the object is modified --------------------------------------------------------------------------- response = context_client_grpc.ListContextIds(Empty()) assert len(response.context_ids) == 1 assert response.context_ids[0].context_uuid.uuid == DEFAULT_CONTEXT_UUID @@ -235,35 +262,53 @@ def test_grpc_context( response = context_client_grpc.ListContexts(Empty()) assert len(response.contexts) == 1 assert response.contexts[0].context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + assert response.contexts[0].name == new_context_name assert len(response.contexts[0].topology_ids) == 0 assert len(response.contexts[0].service_ids) == 0 + assert len(response.contexts[0].slice_ids) == 0 + + # ----- Dump state of database after create/update the object ------------------------------------------------------ + #db_entries = database.dump_all() + #LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) + #for db_entry in db_entries: + # LOGGER.info(db_entry) + #LOGGER.info('-----------------------------------------------------------') + #assert len(db_entries) == 1 # ----- Remove the object ------------------------------------------------------------------------------------------ context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID)) # ----- Check remove event ----------------------------------------------------------------------------------------- - # event = events_collector.get_event(block=True) - # assert isinstance(event, ContextEvent) - # assert event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE - # assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + event = events_collector.get_event(block=True, timeout=10.0) + assert isinstance(event, ContextEvent) + #assert event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE + #assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID + + # ----- List after deleting the object ----------------------------------------------------------------------------- + response = context_client_grpc.ListContextIds(Empty()) + assert len(response.context_ids) == 0 + + response = context_client_grpc.ListContexts(Empty()) + assert len(response.contexts) == 0 # ----- Stop the EventsCollector ----------------------------------------------------------------------------------- events_collector.stop() # ----- Dump state of database after remove the object ------------------------------------------------------------- - db_entries = database.dump_all() - - LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) - for db_entry in db_entries: - LOGGER.info(db_entry) - LOGGER.info('-----------------------------------------------------------') - assert len(db_entries) == 0 + #db_entries = database.dump_all() + #LOGGER.info('----- Database Dump [{:3d} entries] -------------------------'.format(len(db_entries))) + #for db_entry in db_entries: + # LOGGER.info(db_entry) + #LOGGER.info('-----------------------------------------------------------') + #assert len(db_entries) == 0 + raise Exception() +""" def test_grpc_topology( context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name - context_s_mb: Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name - session = context_s_mb[0] + context_db_mb: Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name + session = context_db_mb[0] database = Database(session) @@ -394,8 +439,8 @@ def test_grpc_topology( def test_grpc_device( context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name - context_s_mb: Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name - session = context_s_mb[0] + context_db_mb: Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name + session = context_db_mb[0] database = Database(session) @@ -571,8 +616,8 @@ def test_grpc_device( def test_grpc_link( context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name - context_s_mb: Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name - session = context_s_mb[0] + context_db_mb: Tuple[Session, MessageBroker]): # pylint: disable=redefined-outer-name + session = context_db_mb[0] database = Database(session) @@ -753,10 +798,11 @@ def test_grpc_link( assert len(db_entries) == 0 """ +""" def test_grpc_service( context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name - context_s_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name - Session = context_s_mb[0] + context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name + Session = context_db_mb[0] # ----- Clean the database ----------------------------------------------------------------------------------------- database = Database(Session) database.clear() @@ -941,14 +987,13 @@ def test_grpc_service( LOGGER.info(' [{:>4s}] {:40s} :: {:s}'.format(*db_entry)) # pragma: no cover LOGGER.info('-----------------------------------------------------------') assert len(db_entries) == 0 - - """ +""" def test_grpc_connection( context_client_grpc : ContextClient, # pylint: disable=redefined-outer-name context_db_mb : Tuple[Database, MessageBroker]): # pylint: disable=redefined-outer-name - Session = context_s_mb[0] + Session = context_db_mb[0] database = Database(Session)