Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: tfs/controller
Showing 542 additions and 64 deletions
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
LOGGER = logging.getLogger(__name__)
def test_device_emulated_add_error_cases():
    LOGGER.info("Start Tests")
    LOGGER.info("Second log Tests")
    assert True
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
LOGGER = logging.getLogger(__name__)
def test_device_emulated_add_error_cases():
    LOGGER.info("Start Tests")
    assert True
@@ -36,6 +36,7 @@ INTERDOMAIN_TOPOLOGY_NAME = 'inter' # contains the abstract inter-domain top
# Default service names
class ServiceNameEnum(Enum):
    AUTOMATION = 'automation'
    CONTEXT = 'context'
    DEVICE = 'device'
    SERVICE = 'service'
@@ -65,10 +66,11 @@ class ServiceNameEnum(Enum):
    KPIMANAGER = 'kpi-manager'
    KPIVALUEAPI = 'kpi-value-api'
    KPIVALUEWRITER = 'kpi-value-writer'
    TELEMETRYFRONTEND = 'telemetry-frontend'
    TELEMETRY = 'telemetry'
    TELEMETRYBACKEND = 'telemetry-backend'
    ANALYTICSFRONTEND = 'analytics-frontend'
    ANALYTICS = 'analytics'
    ANALYTICSBACKEND = 'analytics-backend'
    QOSPROFILE = 'qos-profile'

    # Used for test and debugging only
    DLT_GATEWAY = 'dltgateway'
@@ -99,13 +101,15 @@ DEFAULT_SERVICE_GRPC_PORTS = {
    ServiceNameEnum.OPTICALCONTROLLER .value : 10060,
    ServiceNameEnum.QKD_APP           .value : 10070,
    ServiceNameEnum.BGPLS             .value : 20030,
    ServiceNameEnum.QOSPROFILE        .value : 20040,
    ServiceNameEnum.KPIMANAGER        .value : 30010,
    ServiceNameEnum.KPIVALUEAPI       .value : 30020,
    ServiceNameEnum.KPIVALUEWRITER    .value : 30030,
    ServiceNameEnum.TELEMETRYFRONTEND .value : 30050,
    ServiceNameEnum.TELEMETRY         .value : 30050,
    ServiceNameEnum.TELEMETRYBACKEND  .value : 30060,
    ServiceNameEnum.ANALYTICSFRONTEND .value : 30080,
    ServiceNameEnum.ANALYTICS         .value : 30080,
    ServiceNameEnum.ANALYTICSBACKEND  .value : 30090,
    ServiceNameEnum.AUTOMATION        .value : 30200,

    # Used for test and debugging only
    ServiceNameEnum.DLT_GATEWAY       .value : 50051,
@@ -114,15 +118,12 @@ DEFAULT_SERVICE_GRPC_PORTS = {
# Default HTTP/REST-API service ports
DEFAULT_SERVICE_HTTP_PORTS = {
    ServiceNameEnum.CONTEXT .value : 8080,
    ServiceNameEnum.NBI     .value : 8080,
    ServiceNameEnum.WEBUI   .value : 8004,
    ServiceNameEnum.QKD_APP .value : 8005,
    ServiceNameEnum.NBI     .value : 8080,
    ServiceNameEnum.WEBUI.value : 8004,
}
# Default HTTP/REST-API service base URLs
DEFAULT_SERVICE_HTTP_BASEURLS = {
    ServiceNameEnum.NBI     .value : None,
    ServiceNameEnum.WEBUI   .value : None,
    ServiceNameEnum.QKD_APP .value : None,
    ServiceNameEnum.NBI     .value : None,
    ServiceNameEnum.WEBUI.value : None,
}
@@ -15,9 +15,11 @@
import logging, os, re, time
from typing import Dict, List
from common.Constants import (
    DEFAULT_GRPC_BIND_ADDRESS, DEFAULT_GRPC_GRACE_PERIOD, DEFAULT_GRPC_MAX_WORKERS, DEFAULT_HTTP_BIND_ADDRESS,
    DEFAULT_LOG_LEVEL, DEFAULT_METRICS_PORT, DEFAULT_SERVICE_GRPC_PORTS, DEFAULT_SERVICE_HTTP_BASEURLS,
    DEFAULT_SERVICE_HTTP_PORTS, ServiceNameEnum
    DEFAULT_GRPC_BIND_ADDRESS, DEFAULT_GRPC_GRACE_PERIOD,
    DEFAULT_GRPC_MAX_WORKERS, DEFAULT_HTTP_BIND_ADDRESS,
    DEFAULT_LOG_LEVEL, DEFAULT_METRICS_PORT, DEFAULT_SERVICE_GRPC_PORTS,
    DEFAULT_SERVICE_HTTP_BASEURLS, DEFAULT_SERVICE_HTTP_PORTS,
    ServiceNameEnum
)
LOGGER = logging.getLogger(__name__)
@@ -33,20 +33,25 @@
# # do test ...
# descriptor_loader.unload()
import concurrent.futures, json, logging, operator
import concurrent.futures, copy, json, logging, operator
from typing import Any, Dict, List, Optional, Tuple, Union
from common.proto.context_pb2 import (
    Connection, Context, ContextId, Device, DeviceId, Empty, Link, LinkId, Service, ServiceId, Slice, SliceId,
    Topology, TopologyId)
    Connection, Context, ContextId, Device, DeviceId, Empty,
    Link, LinkId, Service, ServiceId, Slice, SliceId,
    Topology, TopologyId
)
from common.tools.object_factory.Context import json_context_id
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.client.ServiceClient import ServiceClient
from slice.client.SliceClient import SliceClient
from .Tools import (
    format_device_custom_config_rules, format_service_custom_config_rules, format_slice_custom_config_rules,
    get_descriptors_add_contexts, get_descriptors_add_services, get_descriptors_add_slices,
    get_descriptors_add_topologies, split_controllers_and_network_devices, split_devices_by_rules)
    format_device_custom_config_rules, format_service_custom_config_rules,
    format_slice_custom_config_rules, get_descriptors_add_contexts,
    get_descriptors_add_services, get_descriptors_add_slices,
    get_descriptors_add_topologies, split_controllers_and_network_devices,
    split_devices_by_rules
)
LOGGER = logging.getLogger(__name__)
LOGGERS = {
@@ -78,6 +83,30 @@ TypeResults = List[Tuple[str, str, int, List[str]]] # entity_name, action, num_o
TypeNotification = Tuple[str, str] # message, level
TypeNotificationList = List[TypeNotification]
SLICE_TEMPLATE = {
    "slice_id": {
        "context_id": {"context_uuid": {"uuid": "admin"}},
        "slice_uuid": {"uuid": None}
    },
    "name": {},
    "slice_config": {"config_rules": [
        {"action": 1, "custom": {"resource_key": "/settings", "resource_value": {
            "address_families": ["IPV4"], "bgp_as": 65000,
            "bgp_route_target": "65000:333", "mtu": 1512
        }}}
    ]},
    "slice_constraints": [
        {"sla_capacity": {"capacity_gbps": 20.0}},
        {"sla_availability": {"availability": 20.0, "num_disjoint_paths": 1, "all_active": True}},
        {"sla_isolation": {"isolation_level": [0]}}
    ],
    "slice_endpoint_ids": [
    ],
    "slice_status": {"slice_status": 1}
}
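
# For reference, a minimal sketch of the IETF network-slice input that the
# loader below consumes. Key names follow the ietf-network-slice-service
# model referenced in the parsing code; the concrete values here are
# illustrative only, not taken from a real descriptor:
#
# {"ietf-network-slice-service:network-slice-services": {"slice-service": [
#     {"id": "slice-1", "description": "example slice",
#      "sdps": {"sdp": [
#          {"id": "R1", "node-id": "10.0.0.1",
#           "attachment-circuits": {"attachment-circuit": [
#               {"ac-tp-id": "1/1", "ac-tags": {"ac-tag": [
#                   {"tag-type": "ietf-nss:vlan-id", "value": 100}]}}]}}]}}
# ]}}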
class DescriptorLoader:
    def __init__(
        self, descriptors : Optional[Union[str, Dict]] = None, descriptors_file : Optional[str] = None,
@@ -106,8 +135,53 @@ class DescriptorLoader:
        self.__links       = self.__descriptors.get('links'      , [])
        self.__services    = self.__descriptors.get('services'   , [])
        self.__slices      = self.__descriptors.get('slices'     , [])
        self.__ietf_slices = self.__descriptors.get('ietf-network-slice-service:network-slice-services', {})
        self.__connections = self.__descriptors.get('connections', [])

        if len(self.__ietf_slices) > 0:
            for slice_service in self.__ietf_slices["slice-service"]:
                tfs_slice = copy.deepcopy(SLICE_TEMPLATE)
                tfs_slice["slice_id"]["slice_uuid"]["uuid"] = slice_service["id"]
                tfs_slice["name"] = slice_service["description"]
                for sdp in slice_service["sdps"]["sdp"]:
                    sdp_id = sdp["id"]
                    for attcircuit in sdp["attachment-circuits"]["attachment-circuit"]:
                        att_cir_tp_id = attcircuit["ac-tp-id"]
                        RESOURCE_KEY = "/device[{:s}]/endpoint[{:s}]/settings"
                        resource_key = RESOURCE_KEY.format(str(sdp_id), str(att_cir_tp_id))
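                        # e.g., for a hypothetical SDP "R1" with attachment circuit
                        # "1/1", this yields "/device[R1]/endpoint[1/1]/settings"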
                        # default VLAN id; overridden when an 'ietf-nss:vlan-id' ac-tag is present
                        vlan_id = 0
                        for tag in attcircuit['ac-tags']['ac-tag']:
                            if tag.get('tag-type') == 'ietf-nss:vlan-id':
                                vlan_id = tag.get('value')
tfs_slice["slice_config"]["config_rules"].append({
"action": 1, "custom": {
"resource_key": resource_key, "resource_value": {
"router_id": sdp.get("node-id",[]),
"sub_interface_index": 0,
"vlan_id": vlan_id
}
}
})
tfs_slice["slice_endpoint_ids"].append({
"device_id": {"device_uuid": {"uuid": sdp_id}},
"endpoint_uuid": {"uuid": att_cir_tp_id},
"topology_id": {"context_id": {"context_uuid": {"uuid": "admin"}},
"topology_uuid": {"uuid": "admin"}}
})
#tfs_slice["slice_constraints"].append({
# "endpoint_location": {
# "endpoint_id": {
# "device_id": {"device_uuid": {"uuid": sdp["id"]}},
# "endpoint_uuid": {"uuid": attcircuit["ac-tp-id"]}
# },
# "location": {"region": "4"}
# }
#})
self.__slices.append(tfs_slice)
self.__contexts_add = None
self.__topologies_add = None
self.__devices_add = None
@@ -232,7 +306,9 @@ class DescriptorLoader:
    def _load_dummy_mode(self) -> None:
        # Dummy Mode: used to pre-load databases (WebUI debugging purposes) with no smart or automated tasks.
        controllers, network_devices = split_controllers_and_network_devices(self.__devices)

        self.__ctx_cli.connect()
        self._process_descr('context',  'add', self.__ctx_cli.SetContext,  Context,  self.__contexts_add  )
        self._process_descr('topology', 'add', self.__ctx_cli.SetTopology, Topology, self.__topologies_add)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# See usage example below
import grpc, logging, queue, threading, time
from typing import Any, Callable, List, Optional
from common.proto.context_pb2 import Empty
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
class CollectorThread(threading.Thread):
    def __init__(
        self, subscription_func : Callable, events_queue : queue.PriorityQueue,
        terminate : threading.Event, log_events_received : bool = False
    ) -> None:
        super().__init__(daemon=False)
        self._subscription_func = subscription_func
        self._events_queue = events_queue
        self._terminate = terminate
        self._log_events_received = log_events_received
        self._stream = None

    def cancel(self) -> None:
        if self._stream is None: return
        self._stream.cancel()

    def run(self) -> None:
        while not self._terminate.is_set():
            self._stream = self._subscription_func()
            try:
                for event in self._stream:
                    if self._log_events_received:
                        str_event = grpc_message_to_json_string(event)
                        LOGGER.info('[_collect] event: {:s}'.format(str_event))
                    timestamp = event.event.timestamp.timestamp
                    self._events_queue.put_nowait((timestamp, event))
            except grpc.RpcError as e:
                if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member
                    LOGGER.info('[_collect] UNAVAILABLE... retrying...')
                    time.sleep(0.5)
                    continue
                elif e.code() == grpc.StatusCode.CANCELLED: # pylint: disable=no-member
                    break
                else:
                    raise # pragma: no cover
class BaseEventCollector:
    def __init__(
        self, terminate : Optional[threading.Event] = None
    ) -> None:
        self._events_queue = queue.PriorityQueue()
        self._terminate = threading.Event() if terminate is None else terminate
        self._collector_threads : List[CollectorThread] = list()

    def install_collector(
        self, subscription_method : Callable, request_message : Any,
        log_events_received : bool = False
    ) -> None:
        self._collector_threads.append(CollectorThread(
            lambda: subscription_method(request_message),
            self._events_queue, self._terminate, log_events_received
        ))

    def start(self):
        self._terminate.clear()
        for collector_thread in self._collector_threads:
            collector_thread.start()

    def stop(self):
        self._terminate.set()
        for collector_thread in self._collector_threads:
            collector_thread.cancel()
        for collector_thread in self._collector_threads:
            collector_thread.join()

    def get_events_queue(self) -> queue.PriorityQueue:
        return self._events_queue

    def get_event(self, block : bool = True, timeout : float = 0.1):
        try:
            _,event = self._events_queue.get(block=block, timeout=timeout)
            return event
        except queue.Empty: # pylint: disable=catching-non-exception
            return None
    def get_events(self, block : bool = True, timeout : float = 0.1, count : Optional[int] = None):
        events = []
        if count is None:
            while not self._terminate.is_set():
                event = self.get_event(block=block, timeout=timeout)
                if event is None: break
                events.append(event)
        else:
            while len(events) < count:
                if self._terminate.is_set(): break
                event = self.get_event(block=block, timeout=timeout)
                if event is None: continue
                events.append(event)
        return sorted(events, key=lambda e: e.event.timestamp.timestamp)
def main() -> None:
    logging.basicConfig(level=logging.INFO)

    context_client = ContextClient()
    context_client.connect()

    event_collector = BaseEventCollector()
    event_collector.install_collector(context_client.GetDeviceEvents,  Empty(), log_events_received=True)
    event_collector.install_collector(context_client.GetLinkEvents,    Empty(), log_events_received=True)
    event_collector.install_collector(context_client.GetServiceEvents, Empty(), log_events_received=True)
    event_collector.start()

    time.sleep(60)

    event_collector.stop()
    context_client.close()

if __name__ == '__main__':
    main()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# See usage example below
import logging, queue, threading, time
from typing import Any, Callable, Optional
from common.proto.context_pb2 import DeviceEvent, Empty, EventTypeEnum, LinkEvent
from common.tools.grpc.BaseEventCollector import BaseEventCollector
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)
class BaseEventDispatcher(threading.Thread):
    def __init__(
        self, events_queue : queue.PriorityQueue,
        terminate : Optional[threading.Event] = None
    ) -> None:
        super().__init__(daemon=True)
        self._events_queue = events_queue
        self._terminate = threading.Event() if terminate is None else terminate

    def stop(self):
        self._terminate.set()

    def _get_event(self, block : bool = True, timeout : Optional[float] = 0.5) -> Optional[Any]:
        try:
            _, event = self._events_queue.get(block=block, timeout=timeout)
            return event
        except queue.Empty:
            return None

    def _get_dispatcher(self, event : Any) -> Optional[Callable]:
        object_name = str(event.__class__.__name__).lower().replace('event', '')
        event_type = EventTypeEnum.Name(event.event.event_type).lower().replace('eventtype_', '')

        method_name = 'dispatch_{:s}_{:s}'.format(object_name, event_type)
        dispatcher = getattr(self, method_name, None)
        if dispatcher is not None: return dispatcher

        method_name = 'dispatch_{:s}'.format(object_name)
        dispatcher = getattr(self, method_name, None)
        if dispatcher is not None: return dispatcher

        method_name = 'dispatch'
        dispatcher = getattr(self, method_name, None)
        if dispatcher is not None: return dispatcher

        return None
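
    # Name resolution example (illustrative): a DeviceEvent whose event_type is
    # EVENTTYPE_CREATE is dispatched to 'dispatch_device_create' if defined,
    # falling back to 'dispatch_device', and finally to the generic 'dispatch'.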
    def run(self) -> None:
        while not self._terminate.is_set():
            event = self._get_event()
            if event is None: continue

            dispatcher = self._get_dispatcher(event)
            if dispatcher is None:
                MSG = 'No dispatcher available for Event({:s})'
                LOGGER.warning(MSG.format(grpc_message_to_json_string(event)))
                continue

            dispatcher(event)
class MyEventDispatcher(BaseEventDispatcher):
    def dispatch_device_create(self, device_event : DeviceEvent) -> None:
        MSG = 'Processing Device Create: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))

    def dispatch_device_update(self, device_event : DeviceEvent) -> None:
        MSG = 'Processing Device Update: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))

    def dispatch_device_remove(self, device_event : DeviceEvent) -> None:
        MSG = 'Processing Device Remove: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))

    def dispatch_link(self, link_event : LinkEvent) -> None:
        MSG = 'Processing Link Create/Update/Remove: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(link_event)))

    def dispatch(self, event : Any) -> None:
        MSG = 'Processing any other Event: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
def main() -> None:
    logging.basicConfig(level=logging.INFO)

    context_client = ContextClient()
    context_client.connect()

    event_collector = BaseEventCollector()
    event_collector.install_collector(context_client.GetDeviceEvents,  Empty(), log_events_received=True)
    event_collector.install_collector(context_client.GetLinkEvents,    Empty(), log_events_received=True)
    event_collector.install_collector(context_client.GetServiceEvents, Empty(), log_events_received=True)
    event_collector.start()

    event_dispatcher = MyEventDispatcher(event_collector.get_events_queue())
    event_dispatcher.start()

    time.sleep(60)

    event_dispatcher.stop()
    event_collector.stop()
    context_client.close()

if __name__ == '__main__':
    main()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, threading, time
from typing import Optional
from common.proto.context_pb2 import DeviceEvent, Empty, ServiceEvent
from common.tools.grpc.BaseEventCollector import BaseEventCollector
from common.tools.grpc.BaseEventDispatcher import BaseEventDispatcher
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)
class EventCollector(BaseEventCollector):
    pass
class EventDispatcher(BaseEventDispatcher):
    def dispatch_device_create(self, device_event : DeviceEvent) -> None:
        MSG = 'Processing Device Create: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))

    def dispatch_device_update(self, device_event : DeviceEvent) -> None:
        MSG = 'Processing Device Update: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))

    def dispatch_device_remove(self, device_event : DeviceEvent) -> None:
        MSG = 'Processing Device Remove: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))

    def dispatch_service_create(self, service_event : ServiceEvent) -> None:
        MSG = 'Processing Service Create: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))

    def dispatch_service_update(self, service_event : ServiceEvent) -> None:
        MSG = 'Processing Service Update: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))

    def dispatch_service_remove(self, service_event : ServiceEvent) -> None:
        MSG = 'Processing Service Remove: {:s}'
        LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))
class ExampleEventEngine:
    def __init__(
        self, terminate : Optional[threading.Event] = None
    ) -> None:
        self._terminate = threading.Event() if terminate is None else terminate

        self._context_client = ContextClient()
        self._event_collector = EventCollector(terminate=self._terminate)
        self._event_collector.install_collector(
            self._context_client.GetDeviceEvents, Empty(),
            log_events_received=True
        )
        self._event_collector.install_collector(
            self._context_client.GetLinkEvents, Empty(),
            log_events_received=True
        )
        self._event_collector.install_collector(
            self._context_client.GetServiceEvents, Empty(),
            log_events_received=True
        )

        self._event_dispatcher = EventDispatcher(
            self._event_collector.get_events_queue(),
            terminate=self._terminate
        )

    def start(self) -> None:
        self._context_client.connect()
        self._event_collector.start()
        self._event_dispatcher.start()

    def stop(self) -> None:
        self._terminate.set()
        self._event_dispatcher.stop()
        self._event_collector.stop()
        self._context_client.close()
def main() -> None:
    logging.basicConfig(level=logging.INFO)

    event_engine = ExampleEventEngine()
    event_engine.start()

    time.sleep(60)

    event_engine.stop()

if __name__ == '__main__':
    main()
@@ -46,6 +46,7 @@ class KafkaTopic(Enum):
    RAW = 'topic_raw'
    LABELED = 'topic_labeled'
    VALUE = 'topic_value'
    ALARMS = 'topic_alarms'
    ANALYTICS_REQUEST = 'topic_request_analytics'
    ANALYTICS_RESPONSE = 'topic_response_analytics'
@@ -89,4 +90,4 @@ class KafkaTopic(Enum):
                return False
        return True
# create all topics after the deployments (Telemetry and Analytics)
# TODO: create all topics after the deployments (Telemetry and Analytics)
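
# A minimal sketch of the pending topic creation, assuming confluent-kafka is
# the Kafka client in use and 'kafka:9092' stands in for the real broker
# address (both are assumptions, not taken from this change):
#
# from confluent_kafka.admin import AdminClient, NewTopic
# admin_client = AdminClient({'bootstrap.servers': 'kafka:9092'})
# new_topics = [NewTopic(topic.value, num_partitions=1, replication_factor=1) for topic in KafkaTopic]
# admin_client.create_topics(new_topics)  # returns a dict of {topic_name: future}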
@@ -112,9 +112,9 @@ unit_test context:
    - manifests/${IMAGE_NAME}service.yaml
    - .gitlab-ci.yml
  artifacts:
    when: always
    reports:
      junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml
    when: always
    reports:
      junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml
## Deployment of the service in Kubernetes Cluster
#deploy context:
@@ -69,7 +69,8 @@ def compose_constraints_data(
        constraint_name = '{:s}:{:s}:{:s}'.format(parent_kind, kind.value, endpoint_uuid)
    elif kind in {
        ConstraintKindEnum.SCHEDULE, ConstraintKindEnum.SLA_CAPACITY, ConstraintKindEnum.SLA_LATENCY,
        ConstraintKindEnum.SLA_AVAILABILITY, ConstraintKindEnum.SLA_ISOLATION, ConstraintKindEnum.EXCLUSIONS
        ConstraintKindEnum.SLA_AVAILABILITY, ConstraintKindEnum.SLA_ISOLATION, ConstraintKindEnum.EXCLUSIONS,
        ConstraintKindEnum.QOS_PROFILE
    }:
        constraint_name = '{:s}:{:s}:'.format(parent_kind, kind.value)
    else:
@@ -31,6 +31,7 @@ class ConstraintKindEnum(enum.Enum):
    SLA_LATENCY = 'sla_latency'
    SLA_AVAILABILITY = 'sla_availability'
    SLA_ISOLATION = 'sla_isolation'
    QOS_PROFILE = 'qos_profile'
    EXCLUSIONS = 'exclusions'
class ServiceConstraintModel(_Base):
@@ -18,8 +18,9 @@ from _pytest.terminal import TerminalReporter
from typing import Tuple
from common.Constants import ServiceNameEnum
from common.Settings import (
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name,
    get_service_port_grpc, get_service_port_http)
    ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC,
    get_env_var_name, get_service_port_grpc
)
from common.message_broker.Factory import get_messagebroker_backend
from common.message_broker.MessageBroker import MessageBroker
from common.method_wrappers.Decorator import MetricsPool
@@ -30,11 +31,9 @@ from context.service.database.models._Base import rebuild_database
LOCAL_HOST = '127.0.0.1'
GRPC_PORT = 10000 + int(get_service_port_grpc(ServiceNameEnum.CONTEXT)) # avoid privileged ports
HTTP_PORT = 10000 + int(get_service_port_http(ServiceNameEnum.CONTEXT)) # avoid privileged ports
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(GRPC_PORT)
os.environ[get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(HTTP_PORT)
@pytest.fixture(scope='session')
def context_db_mb(request) -> Tuple[sqlalchemy.engine.Engine, MessageBroker]: # pylint: disable=unused-argument
@@ -251,8 +251,15 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
        device_id = context_client.SetDevice(device)
        device = context_client.GetDevice(device_id)

        if request.device_operational_status != DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED:
            device.device_operational_status = request.device_operational_status

        ztp_service_host = get_env_var_name(ServiceNameEnum.ZTP, ENVVAR_SUFIX_SERVICE_HOST)
        environment_variables = set(os.environ.keys())
        if ztp_service_host in environment_variables:
            # ZTP component is deployed; accept operational status updates
            if request.device_operational_status != DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED:
                device.device_operational_status = request.device_operational_status
        else:
            # ZTP is not deployed; the status was activated during AddDevice and is not modified here
            pass
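        # Note (assumption): in a Kubernetes deployment the variable checked above
        # typically follows the <SERVICE>_SERVICE_HOST convention injected by
        # Kubernetes, e.g. ZTPSERVICE_SERVICE_HOST for the ZTP component.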
        t4 = time.time()
        # TODO: use of datastores (might be virtual ones) to enable rollbacks
@@ -83,6 +83,7 @@ unit_test kpi-manager:
    - docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
  coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
  after_script:
    - docker rm -f $IMAGE_NAME crdb
    - docker volume rm -f crdb
    - docker network rm teraflowbridge
    - docker volume prune --force
@@ -14,39 +14,16 @@
import logging
# from kpi_manager.database.Kpi_DB import KpiDB
from common.proto.kpi_manager_pb2 import KpiDescriptorList
from .test_messages import create_kpi_filter_request
from kpi_manager.database.KpiModel import Kpi as KpiModel
from kpi_manager.database.KpiDB import KpiDB
# from common.tools.database.GenericDatabase import Database
from kpi_manager.database.KpiModel import Kpi as KpiModel
LOGGER = logging.getLogger(__name__)
def test_verify_databases_and_Tables():
    LOGGER.info('>>> test_verify_Tables : START <<< ')
    kpiDBobj = KpiDB()
    kpiDBobj = KpiDB(KpiModel)
    # kpiDBobj.drop_database()
    # kpiDBobj.verify_tables()
    kpiDBobj.create_database()
    kpiDBobj.create_tables()
    kpiDBobj.verify_tables()
# def test_generic_DB_select_method():
# LOGGER.info("--> STARTED-test_generic_DB_select_method")
# kpi_obj = KpiDB()
# _filter = create_kpi_filter_request()
# # response = KpiDescriptorList()
# try:
# kpi_obj.select_with_filter(KpiModel, _filter)
# except Exception as e:
# LOGGER.error('Unable to apply filter on kpi descriptor. {:}'.format(e))
# LOGGER.info("--> FINISHED-test_generic_DB_select_method")
# # try:
# # for row in rows:
# # kpiDescriptor_obj = KpiModel.convert_row_to_KpiDescriptor(row)
# # response.kpi_descriptor_list.append(kpiDescriptor_obj)
# # return response
# # except Exception as e:
# # LOGGER.info('Unable to process filter response {:}'.format(e))
# # assert isinstance(r)
@@ -138,10 +138,3 @@ def test_SelectKpiDescriptor(kpi_manager_client):
    response = kpi_manager_client.SelectKpiDescriptor(create_kpi_filter_request())
    LOGGER.info("Response gRPC message object: {:}".format(response))
    assert isinstance(response, KpiDescriptorList)
# def test_set_list_of_KPIs(kpi_manager_client):
# LOGGER.debug(" >>> test_set_list_of_KPIs: START <<< ")
# KPIs_TO_SEARCH = ["node_in_power_total", "node_in_current_total", "node_out_power_total"]
# # adding KPI
# for kpi in KPIs_TO_SEARCH:
# kpi_manager_client.SetKpiDescriptor(create_kpi_descriptor_request_a(kpi))
@@ -27,6 +27,8 @@ def create_kpi_id_request():
def create_kpi_descriptor_request(descriptor_name: str = "Test_name"):
    _create_kpi_request = kpi_manager_pb2.KpiDescriptor()
    _create_kpi_request.kpi_id.kpi_id.uuid = str(uuid.uuid4())
    # _create_kpi_request.kpi_id.kpi_id.uuid = "6e22f180-ba28-4641-b190-2287bf448888"
    # _create_kpi_request.kpi_id.kpi_id.uuid = "1e22f180-ba28-4641-b190-2287bf446666"
    _create_kpi_request.kpi_description = descriptor_name
    _create_kpi_request.kpi_sample_type = KpiSampleType.KPISAMPLETYPE_PACKETS_RECEIVED
    _create_kpi_request.device_id.device_uuid.uuid = 'DEV2'
@@ -59,9 +59,11 @@ unit_test kpi-value-api:
- docker pull "bitnami/kafka:latest"
- >
docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181
--env ALLOW_ANONYMOUS_LOGIN=yes
bitnami/zookeeper:latest
- sleep 10 # Wait for Zookeeper to start
- docker run --name kafka -d --network=teraflowbridge -p 9092:9092
- >
docker run --name kafka -d --network=teraflowbridge -p 9092:9092
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
--env ALLOW_PLAINTEXT_LISTENER=yes
bitnami/kafka:latest
@@ -84,6 +86,8 @@ unit_test kpi-value-api:
  coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
  after_script:
    - docker rm -f $IMAGE_NAME
    - docker rm -f kafka
    - docker rm -f zookeeper
    - docker network rm teraflowbridge
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'