Skip to content
Snippets Groups Projects
Commit d01ce7b6 authored by Carlos Manso's avatar Carlos Manso
Browse files

Merge branch 'develop' into...

Merge branch 'develop' into feat/69-cttc-add-service-handler-for-flex-scale-end-to-end-connectivity-services
parents 9a35767b 1cdafbad
No related branches found
No related tags found
2 merge requests!235Release TeraFlowSDN 3.0,!177Resolve "(CTTC) Add Service Handler for FLEX-SCALE End-to-End connectivity services"
...@@ -41,6 +41,10 @@ spec: ...@@ -41,6 +41,10 @@ spec:
value: "nats" value: "nats"
- name: LOG_LEVEL - name: LOG_LEVEL
value: "INFO" value: "INFO"
- name: ALLOW_EXPLICIT_ADD_DEVICE_TO_TOPOLOGY
value: "FALSE"
- name: ALLOW_EXPLICIT_ADD_LINK_TO_TOPOLOGY
value: "FALSE"
envFrom: envFrom:
- secretRef: - secretRef:
name: crdb-data name: crdb-data
......
...@@ -143,17 +143,34 @@ class MockServicerImpl_Context(ContextServiceServicer): ...@@ -143,17 +143,34 @@ class MockServicerImpl_Context(ContextServiceServicer):
def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId: def SetTopology(self, request: Topology, context : grpc.ServicerContext) -> TopologyId:
LOGGER.debug('[SetTopology] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('[SetTopology] request={:s}'.format(grpc_message_to_json_string(request)))
container_name = 'topology[{:s}]'.format(str(request.topology_id.context_id.context_uuid.uuid)) context_uuid = str(request.topology_id.context_id.context_uuid.uuid)
container_name = 'topology[{:s}]'.format(context_uuid)
topology_uuid = request.topology_id.topology_uuid.uuid topology_uuid = request.topology_id.topology_uuid.uuid
reply,_ = self._set(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY) reply,_ = self._set(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY)
context_ = self.obj_db.get_entry('context', context_uuid, context)
for _topology_id in context_.topology_ids:
if _topology_id.topology_uuid.uuid == topology_uuid: break
else:
# topology not found, add it
context_.topology_ids.add().topology_uuid.uuid = topology_uuid
LOGGER.debug('[SetTopology] reply={:s}'.format(grpc_message_to_json_string(reply))) LOGGER.debug('[SetTopology] reply={:s}'.format(grpc_message_to_json_string(reply)))
return reply return reply
def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty: def RemoveTopology(self, request: TopologyId, context : grpc.ServicerContext) -> Empty:
LOGGER.debug('[RemoveTopology] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('[RemoveTopology] request={:s}'.format(grpc_message_to_json_string(request)))
container_name = 'topology[{:s}]'.format(str(request.context_id.context_uuid.uuid)) context_uuid = str(request.context_id.context_uuid.uuid)
container_name = 'topology[{:s}]'.format(context_uuid)
topology_uuid = request.topology_uuid.uuid topology_uuid = request.topology_uuid.uuid
reply = self._del(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY, context) reply = self._del(request, container_name, topology_uuid, 'topology_id', TOPIC_TOPOLOGY, context)
context_ = self.obj_db.get_entry('context', context_uuid, context)
for _topology_id in context_.topology_ids:
if _topology_id.topology_uuid.uuid == topology_uuid:
context_.topology_ids.remove(_topology_id)
break
LOGGER.debug('[RemoveTopology] reply={:s}'.format(grpc_message_to_json_string(reply))) LOGGER.debug('[RemoveTopology] reply={:s}'.format(grpc_message_to_json_string(reply)))
return reply return reply
...@@ -368,17 +385,34 @@ class MockServicerImpl_Context(ContextServiceServicer): ...@@ -368,17 +385,34 @@ class MockServicerImpl_Context(ContextServiceServicer):
def SetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId: def SetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId:
LOGGER.debug('[SetSlice] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('[SetSlice] request={:s}'.format(grpc_message_to_json_string(request)))
container_name = 'slice[{:s}]'.format(str(request.slice_id.context_id.context_uuid.uuid)) context_uuid = str(request.slice_id.context_id.context_uuid.uuid)
container_name = 'slice[{:s}]'.format(context_uuid)
slice_uuid = request.slice_id.slice_uuid.uuid slice_uuid = request.slice_id.slice_uuid.uuid
reply,_ = self._set(request, container_name, slice_uuid, 'slice_id', TOPIC_SLICE) reply,_ = self._set(request, container_name, slice_uuid, 'slice_id', TOPIC_SLICE)
context_ = self.obj_db.get_entry('context', context_uuid, context)
for _slice_id in context_.slice_ids:
if _slice_id.slice_uuid.uuid == slice_uuid: break
else:
# slice not found, add it
context_.slice_ids.add().slice_uuid.uuid = slice_uuid
LOGGER.debug('[SetSlice] reply={:s}'.format(grpc_message_to_json_string(reply))) LOGGER.debug('[SetSlice] reply={:s}'.format(grpc_message_to_json_string(reply)))
return reply return reply
def RemoveSlice(self, request: SliceId, context : grpc.ServicerContext) -> Empty: def RemoveSlice(self, request: SliceId, context : grpc.ServicerContext) -> Empty:
LOGGER.debug('[RemoveSlice] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('[RemoveSlice] request={:s}'.format(grpc_message_to_json_string(request)))
container_name = 'slice[{:s}]'.format(str(request.context_id.context_uuid.uuid)) context_uuid = str(request.slice_id.context_id.context_uuid.uuid)
container_name = 'slice[{:s}]'.format(context_uuid)
slice_uuid = request.slice_uuid.uuid slice_uuid = request.slice_uuid.uuid
reply = self._del(request, container_name, slice_uuid, 'slice_id', TOPIC_SLICE, context) reply = self._del(request, container_name, slice_uuid, 'slice_id', TOPIC_SLICE, context)
context_ = self.obj_db.get_entry('context', context_uuid, context)
for _slice_id in context_.slice_ids:
if _slice_id.slice_uuid.uuid == slice_uuid:
context_.slice_ids.remove(_slice_id)
break
LOGGER.debug('[RemoveSlice] reply={:s}'.format(grpc_message_to_json_string(reply))) LOGGER.debug('[RemoveSlice] reply={:s}'.format(grpc_message_to_json_string(reply)))
return reply return reply
...@@ -443,17 +477,34 @@ class MockServicerImpl_Context(ContextServiceServicer): ...@@ -443,17 +477,34 @@ class MockServicerImpl_Context(ContextServiceServicer):
def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId: def SetService(self, request: Service, context : grpc.ServicerContext) -> ServiceId:
LOGGER.debug('[SetService] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('[SetService] request={:s}'.format(grpc_message_to_json_string(request)))
container_name = 'service[{:s}]'.format(str(request.service_id.context_id.context_uuid.uuid)) context_uuid = str(request.service_id.context_id.context_uuid.uuid)
container_name = 'service[{:s}]'.format(context_uuid)
service_uuid = request.service_id.service_uuid.uuid service_uuid = request.service_id.service_uuid.uuid
reply,_ = self._set(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE) reply,_ = self._set(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE)
context_ = self.obj_db.get_entry('context', context_uuid, context)
for _service_id in context_.service_ids:
if _service_id.service_uuid.uuid == service_uuid: break
else:
# service not found, add it
context_.service_ids.add().service_uuid.uuid = service_uuid
LOGGER.debug('[SetService] reply={:s}'.format(grpc_message_to_json_string(reply))) LOGGER.debug('[SetService] reply={:s}'.format(grpc_message_to_json_string(reply)))
return reply return reply
def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty: def RemoveService(self, request: ServiceId, context : grpc.ServicerContext) -> Empty:
LOGGER.debug('[RemoveService] request={:s}'.format(grpc_message_to_json_string(request))) LOGGER.debug('[RemoveService] request={:s}'.format(grpc_message_to_json_string(request)))
container_name = 'service[{:s}]'.format(str(request.context_id.context_uuid.uuid)) context_uuid = str(request.service_id.context_id.context_uuid.uuid)
container_name = 'service[{:s}]'.format(context_uuid)
service_uuid = request.service_uuid.uuid service_uuid = request.service_uuid.uuid
reply = self._del(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE, context) reply = self._del(request, container_name, service_uuid, 'service_id', TOPIC_SERVICE, context)
context_ = self.obj_db.get_entry('context', context_uuid, context)
for _service_id in context_.service_ids:
if _service_id.service_uuid.uuid == service_uuid:
context_.service_ids.remove(_service_id)
break
LOGGER.debug('[RemoveService] reply={:s}'.format(grpc_message_to_json_string(reply))) LOGGER.debug('[RemoveService] reply={:s}'.format(grpc_message_to_json_string(reply)))
return reply return reply
......
...@@ -65,7 +65,7 @@ def get_topology( ...@@ -65,7 +65,7 @@ def get_topology(
def get_topology_details( def get_topology_details(
context_client : ContextClient, topology_uuid : str, context_uuid : str = DEFAULT_CONTEXT_NAME, context_client : ContextClient, topology_uuid : str, context_uuid : str = DEFAULT_CONTEXT_NAME,
rw_copy : bool = False rw_copy : bool = False
) -> Optional[Topology]: ) -> Optional[TopologyDetails]:
try: try:
# pylint: disable=no-member # pylint: disable=no-member
topology_id = TopologyId() topology_id = TopologyId()
......
...@@ -240,11 +240,16 @@ class DescriptorLoader: ...@@ -240,11 +240,16 @@ class DescriptorLoader:
self._process_descr('slice', 'add', self.__ctx_cli.SetSlice, Slice, self.__slices ) self._process_descr('slice', 'add', self.__ctx_cli.SetSlice, Slice, self.__slices )
self._process_descr('connection', 'add', self.__ctx_cli.SetConnection, Connection, self.__connections ) self._process_descr('connection', 'add', self.__ctx_cli.SetConnection, Connection, self.__connections )
# Update context and topology is useless: # By default the Context component automatically assigns devices and links to topologies based on their
# - devices and links are assigned to topologies automatically by Context component # endpoints, and assigns topologies, services, and slices to contexts based on their identifiers.
# - topologies, services, and slices are assigned to contexts automatically by Context component
# The following statement is useless; up to now, any use case requires assigning a topology, service, or
# slice to a different context.
#self._process_descr('context', 'update', self.__ctx_cli.SetContext, Context, self.__contexts ) #self._process_descr('context', 'update', self.__ctx_cli.SetContext, Context, self.__contexts )
#self._process_descr('topology', 'update', self.__ctx_cli.SetTopology, Topology, self.__topologies )
# In some cases, it might be needed to assign devices and links to multiple topologies; the
# following statement performs that assignment.
self._process_descr('topology', 'update', self.__ctx_cli.SetTopology, Topology, self.__topologies )
#self.__ctx_cli.close() #self.__ctx_cli.close()
...@@ -271,12 +276,17 @@ class DescriptorLoader: ...@@ -271,12 +276,17 @@ class DescriptorLoader:
self._process_descr('service', 'update', self.__svc_cli.UpdateService, Service, self.__services ) self._process_descr('service', 'update', self.__svc_cli.UpdateService, Service, self.__services )
self._process_descr('slice', 'add', self.__slc_cli.CreateSlice, Slice, self.__slices_add ) self._process_descr('slice', 'add', self.__slc_cli.CreateSlice, Slice, self.__slices_add )
self._process_descr('slice', 'update', self.__slc_cli.UpdateSlice, Slice, self.__slices ) self._process_descr('slice', 'update', self.__slc_cli.UpdateSlice, Slice, self.__slices )
# Update context and topology is useless: # By default the Context component automatically assigns devices and links to topologies based on their
# - devices and links are assigned to topologies automatically by Context component # endpoints, and assigns topologies, services, and slices to contexts based on their identifiers.
# - topologies, services, and slices are assigned to contexts automatically by Context component
# The following statement is useless; up to now, any use case requires assigning a topology, service, or
# slice to a different context.
#self._process_descr('context', 'update', self.__ctx_cli.SetContext, Context, self.__contexts ) #self._process_descr('context', 'update', self.__ctx_cli.SetContext, Context, self.__contexts )
#self._process_descr('topology', 'update', self.__ctx_cli.SetTopology, Topology, self.__topologies )
# In some cases, it might be needed to assign devices and links to multiple topologies; the
# following statement performs that assignment.
self._process_descr('topology', 'update', self.__ctx_cli.SetTopology, Topology, self.__topologies )
#self.__slc_cli.close() #self.__slc_cli.close()
#self.__svc_cli.close() #self.__svc_cli.close()
......
...@@ -12,3 +12,15 @@ ...@@ -12,3 +12,15 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from common.Settings import get_setting
TRUE_VALUES = {'Y', 'YES', 'T', 'TRUE', 'E', 'ENABLE', 'ENABLED'}
def is_enabled(setting_name : str, default_value : bool) -> bool:
_is_enabled = get_setting(setting_name, default=None)
if _is_enabled is None: return default_value
str_is_enabled = str(_is_enabled).upper()
return str_is_enabled in TRUE_VALUES
DEFAULT_VALUE = False
ALLOW_EXPLICIT_ADD_DEVICE_TO_TOPOLOGY = is_enabled('ALLOW_EXPLICIT_ADD_DEVICE_TO_TOPOLOGY', DEFAULT_VALUE)
ALLOW_EXPLICIT_ADD_LINK_TO_TOPOLOGY = is_enabled('ALLOW_EXPLICIT_ADD_LINK_TO_TOPOLOGY', DEFAULT_VALUE)
...@@ -17,17 +17,20 @@ from sqlalchemy.dialects.postgresql import insert ...@@ -17,17 +17,20 @@ from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine import Engine from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy.orm import Session, selectinload, sessionmaker
from sqlalchemy_cockroachdb import run_transaction from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, List, Optional from typing import Dict, List, Optional, Set
from common.proto.context_pb2 import ( from common.proto.context_pb2 import (
ContextId, Empty, EventTypeEnum, Topology, TopologyDetails, TopologyId, TopologyIdList, TopologyList) ContextId, Empty, EventTypeEnum, Topology, TopologyDetails, TopologyId, TopologyIdList, TopologyList)
from common.message_broker.MessageBroker import MessageBroker from common.message_broker.MessageBroker import MessageBroker
from common.method_wrappers.ServiceExceptions import NotFoundException from common.method_wrappers.ServiceExceptions import NotFoundException
from common.tools.object_factory.Context import json_context_id from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.Topology import json_topology_id from common.tools.object_factory.Topology import json_topology_id
from context.Config import ALLOW_EXPLICIT_ADD_DEVICE_TO_TOPOLOGY, ALLOW_EXPLICIT_ADD_LINK_TO_TOPOLOGY
from .models.DeviceModel import DeviceModel from .models.DeviceModel import DeviceModel
from .models.LinkModel import LinkModel from .models.LinkModel import LinkModel
from .models.TopologyModel import TopologyDeviceModel, TopologyLinkModel, TopologyModel from .models.TopologyModel import TopologyDeviceModel, TopologyLinkModel, TopologyModel
from .uuids.Context import context_get_uuid from .uuids.Context import context_get_uuid
from .uuids.Device import device_get_uuid
from .uuids.Link import link_get_uuid
from .uuids.Topology import topology_get_uuid from .uuids.Topology import topology_get_uuid
from .Events import notify_event_context, notify_event_topology from .Events import notify_event_context, notify_event_topology
...@@ -94,15 +97,40 @@ def topology_set(db_engine : Engine, messagebroker : MessageBroker, request : To ...@@ -94,15 +97,40 @@ def topology_set(db_engine : Engine, messagebroker : MessageBroker, request : To
if len(topology_name) == 0: topology_name = request.topology_id.topology_uuid.uuid if len(topology_name) == 0: topology_name = request.topology_id.topology_uuid.uuid
context_uuid,topology_uuid = topology_get_uuid(request.topology_id, topology_name=topology_name, allow_random=True) context_uuid,topology_uuid = topology_get_uuid(request.topology_id, topology_name=topology_name, allow_random=True)
# Ignore request.device_ids and request.link_ids. They are used for retrieving devices and links added into the # By default, ignore request.device_ids and request.link_ids. They are used for retrieving
# topology. Explicit addition into the topology is done automatically when creating the devices and links, based # devices and links added into the topology. Explicit addition into the topology is done
# on the topologies specified in the endpoints associated with the devices and links. # automatically when creating the devices and links, based on the topologies specified in
# the endpoints associated with the devices and links.
# In some cases, it might be needed to add them explicitly; to allow that, activate flags
# ALLOW_EXPLICIT_ADD_DEVICE_TO_TOPOLOGY and/or ALLOW_EXPLICIT_ADD_LINK_TO_TOPOLOGY.
if len(request.device_ids) > 0: # pragma: no cover related_devices : List[Dict] = list()
LOGGER.warning('Items in field "device_ids" ignored. This field is used for retrieval purposes only.') if ALLOW_EXPLICIT_ADD_DEVICE_TO_TOPOLOGY:
device_uuids : Set[str] = set()
for device_id in request.device_ids:
device_uuid = device_get_uuid(device_id)
if device_uuid not in device_uuids: continue
related_devices.append({'topology_uuid': topology_uuid, 'device_uuid': device_uuid})
device_uuids.add(device_uuid)
else:
if len(request.device_ids) > 0: # pragma: no cover
MSG = 'ALLOW_EXPLICIT_ADD_DEVICE_TO_TOPOLOGY={:s}; '.format(str(ALLOW_EXPLICIT_ADD_DEVICE_TO_TOPOLOGY))
MSG += 'Items in field "device_ids" ignored. This field is used for retrieval purposes only.'
LOGGER.warning(MSG)
if len(request.link_ids) > 0: # pragma: no cover related_links : List[Dict] = list()
LOGGER.warning('Items in field "link_ids" ignored. This field is used for retrieval purposes only.') if ALLOW_EXPLICIT_ADD_LINK_TO_TOPOLOGY:
link_uuids : Set[str] = set()
for link_id in request.link_ids:
link_uuid = link_get_uuid(link_id)
if link_uuid not in link_uuids: continue
related_links.append({'topology_uuid': topology_uuid, 'link_uuid': link_uuid})
link_uuids.add(link_uuid)
else:
if len(request.link_ids) > 0: # pragma: no cover
MSG = 'ALLOW_EXPLICIT_ADD_LINK_TO_TOPOLOGY={:s}; '.format(str(ALLOW_EXPLICIT_ADD_LINK_TO_TOPOLOGY))
MSG += 'Items in field "link_ids" ignored. This field is used for retrieval purposes only.'
LOGGER.warning(MSG)
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
topology_data = [{ topology_data = [{
...@@ -124,7 +152,28 @@ def topology_set(db_engine : Engine, messagebroker : MessageBroker, request : To ...@@ -124,7 +152,28 @@ def topology_set(db_engine : Engine, messagebroker : MessageBroker, request : To
) )
stmt = stmt.returning(TopologyModel.created_at, TopologyModel.updated_at) stmt = stmt.returning(TopologyModel.created_at, TopologyModel.updated_at)
created_at,updated_at = session.execute(stmt).fetchone() created_at,updated_at = session.execute(stmt).fetchone()
return updated_at > created_at
updated = updated_at > created_at
updated_topology_device = False
if len(related_devices) > 0:
stmt = insert(TopologyDeviceModel).values(related_devices)
stmt = stmt.on_conflict_do_nothing(
index_elements=[TopologyDeviceModel.topology_uuid, TopologyDeviceModel.device_uuid]
)
topology_device_inserts = session.execute(stmt)
updated_topology_device = int(topology_device_inserts.rowcount) > 0
updated_topology_link = False
if len(related_links) > 0:
stmt = insert(TopologyLinkModel).values(related_links)
stmt = stmt.on_conflict_do_nothing(
index_elements=[TopologyLinkModel.topology_uuid, TopologyLinkModel.link_uuid]
)
topology_link_inserts = session.execute(stmt)
updated_topology_link = int(topology_link_inserts.rowcount) > 0
return updated or updated_topology_device or updated_topology_link
updated = run_transaction(sessionmaker(bind=db_engine), callback) updated = run_transaction(sessionmaker(bind=db_engine), callback)
context_id = json_context_id(context_uuid) context_id = json_context_id(context_uuid)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment