Skip to content
Snippets Groups Projects
Commit 6cf2056a authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Context component:

- updatd EventsCollector get_events
- added field created_at in ContextModel
- added ChangeFeedClient
- WIP arrangements in unitary tests
- WIP arrangements in ServicerImpl
- arranged run_tests_locally script
parent 817f5f08
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!34Context Scalability extensions using CockroachDB + Removal of Stateful database inside Device + other
......@@ -20,7 +20,7 @@
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
export TFS_K8S_HOSTNAME="tfs-vm"
#export TFS_K8S_HOSTNAME="tfs-vm"
########################################################################################################################
# Automated steps start here
......@@ -29,15 +29,21 @@ export TFS_K8S_HOSTNAME="tfs-vm"
PROJECTDIR=`pwd`
cd $PROJECTDIR/src
RCFILE=$PROJECTDIR/coverage/.coveragerc
#RCFILE=$PROJECTDIR/coverage/.coveragerc
kubectl --namespace $TFS_K8S_NAMESPACE expose deployment contextservice --name=redis-tests --port=6379 --type=NodePort
#kubectl --namespace $TFS_K8S_NAMESPACE expose deployment contextservice --name=redis-tests --port=6379 --type=NodePort
#export REDIS_SERVICE_HOST=$(kubectl --namespace $TFS_K8S_NAMESPACE get service redis-tests -o 'jsonpath={.spec.clusterIP}')
export REDIS_SERVICE_HOST=$(kubectl get node $TFS_K8S_HOSTNAME -o 'jsonpath={.status.addresses[?(@.type=="InternalIP")].address}')
export REDIS_SERVICE_PORT=$(kubectl --namespace $TFS_K8S_NAMESPACE get service redis-tests -o 'jsonpath={.spec.ports[?(@.port==6379)].nodePort}')
#export REDIS_SERVICE_HOST=$(kubectl get node $TFS_K8S_HOSTNAME -o 'jsonpath={.status.addresses[?(@.type=="InternalIP")].address}')
#export REDIS_SERVICE_PORT=$(kubectl --namespace $TFS_K8S_NAMESPACE get service redis-tests -o 'jsonpath={.spec.ports[?(@.port==6379)].nodePort}')
export CRDB_URI="cockroachdb://tfs:tfs123@127.0.0.1:26257/tfs?sslmode=require"
# Run unitary tests and analyze coverage of code at same time
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose --maxfail=1 \
#coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose --maxfail=1 \
# context/tests/test_unitary.py
source tfs_runtime_env_vars.sh
pytest --log-level=INFO --verbose -o log_cli=true --maxfail=1 \
context/tests/test_unitary.py
kubectl --namespace $TFS_K8S_NAMESPACE delete service redis-tests
#kubectl --namespace $TFS_K8S_NAMESPACE delete service redis-tests
......@@ -132,7 +132,7 @@ class EventsCollector:
if event is None: break
events.append(event)
else:
for _ in range(count):
while len(events) < count:
if self._terminate.is_set(): break
event = self.get_event(block=block, timeout=timeout)
if event is None: continue
......
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pip install psycopg==3.1.6
# Ref: https://www.cockroachlabs.com/docs/stable/changefeed-for.html
# (current implementation) Ref: https://www.cockroachlabs.com/docs/v22.1/changefeed-for
# Ref: https://www.psycopg.org/psycopg3/docs/api/crdb.html
import contextlib, json, logging, psycopg, psycopg.conninfo, psycopg.crdb, sys, time
from typing import Any, Dict, Iterator, List, Optional, Tuple
from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
SQL_ACTIVATE_CHANGE_FEED = 'SET CLUSTER SETTING kv.rangefeed.enabled = true'
SQL_START_CHANGE_FEED = 'EXPERIMENTAL CHANGEFEED FOR {:s}.{:s} WITH format=json, no_initial_scan, updated'
class ChangeFeedClient:
def __init__(self) -> None:
self._connection : Optional[psycopg.crdb.CrdbConnection] = None
self._conn_info_dict : Dict = dict()
self._is_crdb : bool = False
def initialize(self) -> bool:
crdb_uri = get_setting('CRDB_URI')
if crdb_uri is None:
LOGGER.error('Connection string not found in EnvVar CRDB_URI')
return False
try:
crdb_uri = crdb_uri.replace('cockroachdb://', 'postgres://')
self._conn_info_dict = psycopg.conninfo.conninfo_to_dict(crdb_uri)
except psycopg.ProgrammingError:
LOGGER.exception('Invalid connection string: {:s}'.format(str(crdb_uri)))
return False
self._connection = psycopg.crdb.connect(**self._conn_info_dict)
self._is_crdb = psycopg.crdb.CrdbConnection.is_crdb(self._connection)
LOGGER.debug('is_crdb = {:s}'.format(str(self._is_crdb)))
# disable multi-statement transactions
self._connection.autocommit = True
# activate change feeds
self._connection.execute(SQL_ACTIVATE_CHANGE_FEED)
return self._is_crdb
def get_changes(self, table_name : str) -> Iterator[Tuple[float, str, List[Any], bool, Dict]]:
db_name = self._conn_info_dict.get('dbname')
if db_name is None: raise Exception('ChangeFeed has not been initialized!')
cur = self._connection.cursor()
str_sql_query = SQL_START_CHANGE_FEED.format(db_name, table_name)
with contextlib.closing(cur.stream(str_sql_query)) as feed:
for change in feed:
LOGGER.info(change)
table_name, primary_key, data = change[0], json.loads(change[1]), json.loads(change[2])
timestamp = data.get('updated') / 1.e9
if timestamp is None: timestamp = time.time()
after = data.get('after')
is_delete = ('after' in data) and (after is None)
yield timestamp, table_name, primary_key, is_delete, after
def main():
logging.basicConfig(level=logging.INFO)
cf = ChangeFeed()
ready = cf.initialize()
if not ready: raise Exception('Unable to initialize ChangeFeed')
for change in cf.get_changes('context'):
LOGGER.info(change)
return 0
if __name__ == '__main__':
sys.exit(main())
......@@ -13,13 +13,13 @@
# limitations under the License.
import grpc, json, logging, operator, sqlalchemy, threading, uuid
import grpc, json, logging, operator, sqlalchemy, threading, time, uuid
from sqlalchemy.orm import Session, contains_eager, selectinload, sessionmaker
from sqlalchemy.dialects.postgresql import UUID, insert
#from sqlalchemy.dialects.postgresql import UUID, insert
from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, Iterator, List, Optional, Set, Tuple, Union
from common.message_broker.MessageBroker import MessageBroker
from common.orm.backend.Tools import key_to_str
#from common.orm.backend.Tools import key_to_str
from common.proto.context_pb2 import (
Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
Context, ContextEvent, ContextId, ContextIdList, ContextList,
......@@ -30,36 +30,39 @@ from common.proto.context_pb2 import (
Slice, SliceEvent, SliceId, SliceIdList, SliceList,
Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList,
ConfigActionEnum, Constraint)
from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
#from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer
from common.tools.object_factory.Context import json_context_id
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from context.service.Database import Database
from context.service.database.ConfigModel import (
ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel, grpc_config_rules_to_raw, update_config)
from context.service.database.ConnectionModel import ConnectionModel, set_path
from context.service.database.ConstraintModel import (
ConstraintModel, ConstraintsModel, Union_ConstraintModel, CONSTRAINT_PARSERS, set_constraints)
from common.rpc_method_wrapper.ServiceExceptions import (
InvalidArgumentException, NotFoundException, OperationFailedException)
#from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
#from context.service.Database import Database
#from context.service.database.ConfigModel import (
# ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel, grpc_config_rules_to_raw, update_config)
#from context.service.database.ConnectionModel import ConnectionModel, set_path
#from context.service.database.ConstraintModel import (
# ConstraintModel, ConstraintsModel, Union_ConstraintModel, CONSTRAINT_PARSERS, set_constraints)
from context.service.database.ContextModel import ContextModel
from context.service.database.DeviceModel import (
DeviceModel, grpc_to_enum__device_operational_status, set_drivers, grpc_to_enum__device_driver, DriverModel)
from context.service.database.EndPointModel import EndPointModel, KpiSampleTypeModel, set_kpi_sample_types
from context.service.database.Events import notify_event
from context.service.database.KpiSampleType import grpc_to_enum__kpi_sample_type
from context.service.database.LinkModel import LinkModel
from context.service.database.PolicyRuleModel import PolicyRuleModel
from context.service.database.RelationModels import (
ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel,
SliceSubSliceModel, TopologyDeviceModel, TopologyLinkModel)
from context.service.database.ServiceModel import (
ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type)
from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status
from context.service.database.TopologyModel import TopologyModel
from .Constants import (
CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE,
TOPIC_TOPOLOGY)
#from context.service.database.DeviceModel import (
# DeviceModel, grpc_to_enum__device_operational_status, set_drivers, grpc_to_enum__device_driver, DriverModel)
#from context.service.database.EndPointModel import EndPointModel, KpiSampleTypeModel, set_kpi_sample_types
#from context.service.database.Events import notify_event
#from context.service.database.KpiSampleType import grpc_to_enum__kpi_sample_type
#from context.service.database.LinkModel import LinkModel
#from context.service.database.PolicyRuleModel import PolicyRuleModel
#from context.service.database.RelationModels import (
# ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel,
# SliceSubSliceModel, TopologyDeviceModel, TopologyLinkModel)
#from context.service.database.ServiceModel import (
# ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type)
#from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status
#from context.service.database.TopologyModel import TopologyModel
#from .Constants import (
# CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE,
# TOPIC_TOPOLOGY)
from .ChangeFeedClient import ChangeFeedClient
LOGGER = logging.getLogger(__name__)
......@@ -106,7 +109,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
context_uuid = str(uuid.uuid5(uuid.NAMESPACE_OID, request.context_uuid.uuid))
context_uuid = request.context_uuid.uuid
def callback(session : Session) -> Optional[Dict]:
obj : Optional[ContextModel] = \
session.query(ContextModel).filter_by(context_uuid=context_uuid).one_or_none()
......@@ -117,8 +120,8 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer
@safe_and_metered_rpc_method(METRICS, LOGGER)
def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
context_uuid = str(uuid.uuid5(uuid.NAMESPACE_OID, request.context_id.context_uuid.uuid))
context_name = request.context_id.context_uuid.uuid
context_uuid = request.context_id.context_uuid.uuid
context_name = request.name
for i, topology_id in enumerate(request.topology_ids):
topology_context_uuid = topology_id.context_id.context_uuid.uuid
......@@ -134,15 +137,24 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer
'request.service_ids[{:d}].context_id.context_uuid.uuid'.format(i), service_context_uuid,
['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])
for i, slice_id in enumerate(request.slice_ids):
slice_context_uuid = slice_id.context_id.context_uuid.uuid
if slice_context_uuid != context_uuid:
raise InvalidArgumentException(
'request.slice_ids[{:d}].context_id.context_uuid.uuid'.format(i), slice_context_uuid,
['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_uuid)])
def callback(session : Session) -> Tuple[Optional[Dict], bool]:
obj : Optional[ContextModel] = \
session.query(ContextModel).with_for_update().filter_by(context_uuid=context_uuid).one_or_none()
updated = obj is not None
obj = ContextModel(context_uuid=context_uuid, context_name=context_name)
session.merge(obj)
session.commit()
is_update = obj is not None
if is_update:
obj.context_name = context_name
session.merge(obj)
else:
session.add(ContextModel(context_uuid=context_uuid, context_name=context_name, created_at=time.time()))
obj = session.get(ContextModel, {'context_uuid': context_uuid})
return (None if obj is None else obj.dump_id()), updated
return (None if obj is None else obj.dump_id()), is_update
obj_id,updated = run_transaction(sessionmaker(bind=self.db_engine), callback)
if obj_id is None: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
......@@ -153,7 +165,7 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
context_uuid = str(uuid.uuid5(uuid.NAMESPACE_OID, request.context_uuid.uuid))
context_uuid = request.context_uuid.uuid
def callback(session : Session) -> bool:
num_deleted = session.query(ContextModel).filter_by(context_uuid=context_uuid).delete()
......@@ -164,11 +176,24 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer
# notify_event(self.messagebroker, TOPIC_CONTEXT, EventTypeEnum.EVENTTYPE_REMOVE, {'context_id': request})
return Empty()
# @safe_and_metered_rpc_method(METRICS, LOGGER)
# def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
# for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
# yield ContextEvent(**json.loads(message.content))
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
#for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
# yield ContextEvent(**json.loads(message.content))
cf = ChangeFeedClient()
ready = cf.initialize()
if not ready: raise OperationFailedException('Initialize ChangeFeed')
for timestamp, _, primary_key, is_delete, after in cf.get_changes('context'):
if is_delete:
event_type = EventTypeEnum.EVENTTYPE_REMOVE
else:
is_create = (timestamp - after.get('created_at')) < 1.0
event_type = EventTypeEnum.EVENTTYPE_CREATE if is_create else EventTypeEnum.EVENTTYPE_UPDATE
event = {
'event': {'timestamp': {'timestamp': timestamp}, 'event_type': event_type},
'context_id': json_context_id(primary_key[0]),
}
yield ContextEvent(**event)
# ----- Topology ---------------------------------------------------------------------------------------------------
......
from typing import Tuple, List
from sqlalchemy import MetaData
from sqlalchemy.orm import Session, joinedload
from context.service.database._Base import Base
import logging
from common.orm.backend.Tools import key_to_str
from sqlalchemy import MetaData
from sqlalchemy.orm import Session #, joinedload
from typing import Tuple #, List
from context.service.database._Base import _Base
#from common.orm.backend.Tools import key_to_str
from common.rpc_method_wrapper.ServiceExceptions import NotFoundException
LOGGER = logging.getLogger(__name__)
class Database(Session):
def __init__(self, session):
super().__init__()
......@@ -62,8 +59,8 @@ class Database(Session):
def clear(self):
with self.session() as session:
engine = session.get_bind()
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
_Base.metadata.drop_all(engine)
_Base.metadata.create_all(engine)
def dump_by_table(self):
with self.session() as session:
......@@ -90,7 +87,7 @@ class Database(Session):
return result
def get_object(self, model_class: Base, main_key: str, raise_if_not_found=False):
def get_object(self, model_class: _Base, main_key: str, raise_if_not_found=False):
filt = {model_class.main_pk_name(): main_key}
with self.session() as session:
get = session.query(model_class).filter_by(**filt).one_or_none()
......@@ -104,7 +101,7 @@ class Database(Session):
dump = get.dump()
return get, dump
def get_object_filter(self, model_class: Base, filt, raise_if_not_found=False):
def get_object_filter(self, model_class: _Base, filt, raise_if_not_found=False):
with self.session() as session:
get = session.query(model_class).filter_by(**filt).all()
......@@ -119,7 +116,7 @@ class Database(Session):
return get, get.dump()
def get_or_create(self, model_class: Base, key_parts: str, filt=None) -> Tuple[Base, bool]:
def get_or_create(self, model_class: _Base, key_parts: str, filt=None) -> Tuple[_Base, bool]:
if not filt:
filt = {model_class.main_pk_name(): key_parts}
with self.session() as session:
......
......@@ -21,20 +21,20 @@ APP_NAME = 'tfs'
class Engine:
def get_engine(self) -> sqlalchemy.engine.Engine:
ccdb_url = get_setting('CCDB_URL')
crdb_uri = get_setting('CRDB_URI')
try:
engine = sqlalchemy.create_engine(
ccdb_url, connect_args={'application_name': APP_NAME}, echo=False, future=True)
crdb_uri, connect_args={'application_name': APP_NAME}, echo=False, future=True)
except: # pylint: disable=bare-except
LOGGER.exception('Failed to connect to database: {:s}'.format(ccdb_url))
LOGGER.exception('Failed to connect to database: {:s}'.format(crdb_uri))
return None
try:
if not sqlalchemy_utils.database_exists(engine.url):
sqlalchemy_utils.create_database(engine.url)
except: # pylint: disable=bare-except
LOGGER.exception('Failed to check/create to database: {:s}'.format(ccdb_url))
LOGGER.exception('Failed to check/create to database: {:s}'.format(crdb_uri))
return None
return engine
......@@ -19,7 +19,7 @@ from common.proto.context_pb2 import ConfigActionEnum
from common.tools.grpc.Tools import grpc_message_to_json_string
from sqlalchemy import Column, ForeignKey, INTEGER, CheckConstraint, Enum, String
from sqlalchemy.dialects.postgresql import UUID, ARRAY
from context.service.database._Base import Base
from context.service.database._Base import _Base
from sqlalchemy.orm import relationship
from context.service.Database import Database
......
......@@ -14,7 +14,7 @@
import logging
from typing import Dict
from sqlalchemy import Column, String
from sqlalchemy import Column, Float, String
from sqlalchemy.dialects.postgresql import UUID
from ._Base import _Base
#from sqlalchemy.orm import relationship
......@@ -25,6 +25,7 @@ class ContextModel(_Base):
__tablename__ = 'context'
context_uuid = Column(UUID(as_uuid=False), primary_key=True)
context_name = Column(String(), nullable=False)
created_at = Column(Float)
#topology = relationship('TopologyModel', back_populates='context')
......
......@@ -13,3 +13,4 @@
# limitations under the License.
from ._Base import _Base, rebuild_database
from .ContextModel import ContextModel
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment