Skip to content
Snippets Groups Projects
Commit a8e2c9b3 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Context component:

- updated manifest
- corrected README.md notes
- corrected script run-tests-locally
- partial code implementation
parent 6cf2056a
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!34Context Scalability extensions using CockroachDB + Removal of Stateful database inside Device + other
......@@ -12,7 +12,7 @@ kubectl apply -f "${DEPLOY_PATH}/crds.yaml"
# Deploy CockroachDB Operator
curl -o "${DEPLOY_PATH}/operator.yaml" "${OPERATOR_BASE_URL}/install/operator.yaml"
# edit "${DEPLOY_PATH}/operator.yaml"
# - add env var: WATCH_NAMESPACE='tfs-ccdb'
# - add env var: WATCH_NAMESPACE='tfs-crdb'
kubectl apply -f "${DEPLOY_PATH}/operator.yaml"
# Deploy CockroachDB
......@@ -20,21 +20,21 @@ curl -o "${DEPLOY_PATH}/cluster.yaml" "${OPERATOR_BASE_URL}/examples/example.yam
# edit "${DEPLOY_PATH}/cluster.yaml"
# - set version
# - set number of replicas
kubectl create namespace tfs-ccdb
kubectl apply --namespace tfs-ccdb -f "${DEPLOY_PATH}/cluster.yaml"
kubectl create namespace tfs-crdb
kubectl apply --namespace tfs-crdb -f "${DEPLOY_PATH}/cluster.yaml"
# Deploy CockroachDB Client
curl -o "${DEPLOY_PATH}/client-secure-operator.yaml" "${OPERATOR_BASE_URL}/examples/client-secure-operator.yaml"
kubectl create --namespace tfs-ccdb -f "${DEPLOY_PATH}/client-secure-operator.yaml"
kubectl create --namespace tfs-crdb -f "${DEPLOY_PATH}/client-secure-operator.yaml"
# Add tfs user with admin rights
$ kubectl exec -it ccdb-client-secure --namespace tfs-ccdb -- ./cockroach sql --certs-dir=/cockroach/cockroach-certs --host=cockroachdb-public
$ kubectl exec -it cockroachdb-client-secure --namespace tfs-crdb -- ./cockroach sql --certs-dir=/cockroach/cockroach-certs --host=cockroachdb-public
-- CREATE USER tfs WITH PASSWORD 'tfs123';
-- GRANT admin TO tfs;
# Expose CockroachDB SQL port (26257)
PORT=$(kubectl --namespace cockroachdb get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
PATCH='{"data": {"'${PORT}'": "cockroachdb/cockroachdb-public:'${PORT}'"}}'
PORT=$(kubectl --namespace tfs-crdb get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="sql")].port}')
PATCH='{"data": {"'${PORT}'": "tfs-crdb/cockroachdb-public:'${PORT}'"}}'
kubectl patch configmap nginx-ingress-tcp-microk8s-conf --namespace ingress --patch "${PATCH}"
PORT_MAP='{"containerPort": '${PORT}', "hostPort": '${PORT}'}'
......@@ -43,8 +43,8 @@ PATCH='{"spec": {"template": {"spec": {"containers": ['${CONTAINER}']}}}}'
kubectl patch daemonset nginx-ingress-microk8s-controller --namespace ingress --patch "${PATCH}"
# Expose CockroachDB Console port (8080)
PORT=$(kubectl --namespace cockroachdb get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="http")].port}')
PATCH='{"data": {"'${PORT}'": "cockroachdb/cockroachdb-public:'${PORT}'"}}'
PORT=$(kubectl --namespace tfs-crdb get service cockroachdb-public -o 'jsonpath={.spec.ports[?(@.name=="http")].port}')
PATCH='{"data": {"'${PORT}'": "tfs-crdb/cockroachdb-public:'${PORT}'"}}'
kubectl patch configmap nginx-ingress-tcp-microk8s-conf --namespace ingress --patch "${PATCH}"
PORT_MAP='{"containerPort": '${PORT}', "hostPort": '${PORT}'}'
......
......@@ -47,7 +47,7 @@ spec:
- containerPort: 8080
env:
- name: CCDB_URL
value: "cockroachdb://tfs:tfs123@cockroachdb-public.cockroachdb.svc.cluster.local:26257/tfs?sslmode=require"
value: "cockroachdb://tfs:tfs123@10.1.7.195:26257/tfs?sslmode=require"
- name: DB_BACKEND
value: "redis"
- name: MB_BACKEND
......
......@@ -36,13 +36,13 @@ cd $PROJECTDIR/src
#export REDIS_SERVICE_HOST=$(kubectl get node $TFS_K8S_HOSTNAME -o 'jsonpath={.status.addresses[?(@.type=="InternalIP")].address}')
#export REDIS_SERVICE_PORT=$(kubectl --namespace $TFS_K8S_NAMESPACE get service redis-tests -o 'jsonpath={.spec.ports[?(@.port==6379)].nodePort}')
export CRDB_URI="cockroachdb://tfs:tfs123@127.0.0.1:26257/tfs?sslmode=require"
export CRDB_URI="cockroachdb://tfs:tfs123@10.1.7.195:26257/tfs?sslmode=require"
export PYTHONPATH=/home/tfs/tfs-ctrl/src
# Run unitary tests and analyze coverage of code at same time
#coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose --maxfail=1 \
# context/tests/test_unitary.py
source tfs_runtime_env_vars.sh
pytest --log-level=INFO --verbose -o log_cli=true --maxfail=1 \
context/tests/test_unitary.py
......
......@@ -62,7 +62,7 @@ from context.service.database.ContextModel import ContextModel
#from .Constants import (
# CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE,
# TOPIC_TOPOLOGY)
from .ChangeFeedClient import ChangeFeedClient
#from .ChangeFeedClient import ChangeFeedClient
LOGGER = logging.getLogger(__name__)
......@@ -178,37 +178,54 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
pass
#for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
# yield ContextEvent(**json.loads(message.content))
cf = ChangeFeedClient()
ready = cf.initialize()
if not ready: raise OperationFailedException('Initialize ChangeFeed')
for timestamp, _, primary_key, is_delete, after in cf.get_changes('context'):
if is_delete:
event_type = EventTypeEnum.EVENTTYPE_REMOVE
else:
is_create = (timestamp - after.get('created_at')) < 1.0
event_type = EventTypeEnum.EVENTTYPE_CREATE if is_create else EventTypeEnum.EVENTTYPE_UPDATE
event = {
'event': {'timestamp': {'timestamp': timestamp}, 'event_type': event_type},
'context_id': json_context_id(primary_key[0]),
}
yield ContextEvent(**event)
#cf = ChangeFeedClient()
#ready = cf.initialize()
#if not ready: raise OperationFailedException('Initialize ChangeFeed')
#for timestamp, _, primary_key, is_delete, after in cf.get_changes('context'):
# if is_delete:
# event_type = EventTypeEnum.EVENTTYPE_REMOVE
# else:
# is_create = (timestamp - after.get('created_at')) < 1.0
# event_type = EventTypeEnum.EVENTTYPE_CREATE if is_create else EventTypeEnum.EVENTTYPE_UPDATE
# event = {
# 'event': {'timestamp': {'timestamp': timestamp}, 'event_type': event_type},
# 'context_id': json_context_id(primary_key[0]),
# }
# yield ContextEvent(**event)
# ----- Topology ---------------------------------------------------------------------------------------------------
# @safe_and_metered_rpc_method(METRICS, LOGGER)
# def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList:
# context_uuid = request.context_uuid.uuid
#
# with self.session() as session:
# result = session.query(ContextModel).options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
# if not result:
# raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
#
# db_topologies = result.topology
# return TopologyIdList(topology_ids=[db_topology.dump_id() for db_topology in db_topologies])
#
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList:
context_uuid = request.context_uuid.uuid
with self.session() as session:
result = session.query(ContextModel).options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
if not result:
raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
db_topologies = result.topology
return TopologyIdList(topology_ids=[db_topology.dump_id() for db_topology in db_topologies])
return ContextIdList(context_ids=run_transaction(sessionmaker(bind=self.db_engine), callback))
def callback(session : Session) -> List[Dict]:
obj_list : List[ContextModel] = session.query(ContextModel).all()
return [obj.dump_id() for obj in obj_list]
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
def callback(session : Session) -> List[Dict]:
obj_list : List[ContextModel] = session.query(ContextModel).all()
return [obj.dump() for obj in obj_list]
return ContextList(contexts=run_transaction(sessionmaker(bind=self.db_engine), callback))
# @safe_and_metered_rpc_method(METRICS, LOGGER)
# def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList:
# context_uuid = request.context_uuid.uuid
......
......@@ -145,12 +145,12 @@ def test_grpc_context(
rebuild_database(db_engine, drop_if_exists=True)
# ----- Initialize the EventsCollector -----------------------------------------------------------------------------
events_collector = EventsCollector(
context_client_grpc, log_events_received=True,
activate_context_collector = True, activate_topology_collector = False, activate_device_collector = False,
activate_link_collector = False, activate_service_collector = False, activate_slice_collector = False,
activate_connection_collector = False)
events_collector.start()
#events_collector = EventsCollector(
# context_client_grpc, log_events_received=True,
# activate_context_collector = True, activate_topology_collector = False, activate_device_collector = False,
# activate_link_collector = False, activate_service_collector = False, activate_slice_collector = False,
# activate_connection_collector = False)
#events_collector.start()
# ----- Get when the object does not exist -------------------------------------------------------------------------
with pytest.raises(grpc.RpcError) as e:
......@@ -207,8 +207,8 @@ def test_grpc_context(
assert e.value.details() == msg
# ----- Check create event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True, timeout=10.0)
assert isinstance(event, ContextEvent)
#event = events_collector.get_event(block=True, timeout=10.0)
#assert isinstance(event, ContextEvent)
#assert event.event.event_type == EventTypeEnum.EVENTTYPE_CREATE
#assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
......@@ -241,8 +241,8 @@ def test_grpc_context(
assert response.context_uuid.uuid == DEFAULT_CONTEXT_UUID
# ----- Check update event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True, timeout=10.0)
assert isinstance(event, ContextEvent)
#event = events_collector.get_event(block=True, timeout=10.0)
#assert isinstance(event, ContextEvent)
#assert event.event.event_type == EventTypeEnum.EVENTTYPE_UPDATE
#assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
......@@ -279,8 +279,8 @@ def test_grpc_context(
context_client_grpc.RemoveContext(ContextId(**CONTEXT_ID))
# ----- Check remove event -----------------------------------------------------------------------------------------
event = events_collector.get_event(block=True, timeout=10.0)
assert isinstance(event, ContextEvent)
#event = events_collector.get_event(block=True, timeout=10.0)
#assert isinstance(event, ContextEvent)
#assert event.event.event_type == EventTypeEnum.EVENTTYPE_REMOVE
#assert event.context_id.context_uuid.uuid == DEFAULT_CONTEXT_UUID
......@@ -292,7 +292,7 @@ def test_grpc_context(
assert len(response.contexts) == 0
# ----- Stop the EventsCollector -----------------------------------------------------------------------------------
events_collector.stop()
#events_collector.stop()
# ----- Dump state of database after remove the object -------------------------------------------------------------
#db_entries = database.dump_all()
......@@ -302,8 +302,6 @@ def test_grpc_context(
#LOGGER.info('-----------------------------------------------------------')
#assert len(db_entries) == 0
raise Exception()
"""
def test_grpc_topology(
context_client_grpc: ContextClient, # pylint: disable=redefined-outer-name
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment