Compare revisions: tfs/controller

Changes are shown as if the source revision was being merged into the target revision.

Showing 382 additions and 138 deletions.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum, functools
from common.proto.qkd_app_pb2 import QKDAppTypesEnum
from ._GrpcToEnum import grpc_to_enum

class ORM_QKDAppTypesEnum(enum.Enum):
    INTERNAL = QKDAppTypesEnum.QKDAPPTYPES_INTERNAL
    CLIENT   = QKDAppTypesEnum.QKDAPPTYPES_CLIENT

grpc_to_enum__qkd_app_types = functools.partial(
    grpc_to_enum, QKDAppTypesEnum, ORM_QKDAppTypesEnum)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from enum import Enum
from typing import Optional

# Enumeration classes are redundant with gRPC classes, but gRPC does not provide a programmatic
# method to retrieve the values it expects from strings containing the desired value symbol or its
# integer value, so a kind of mapping is required. Besides, ORM models expect Enum classes in
# EnumeratedFields; we create specific, conveniently defined Enum classes to serve both purposes.

def grpc_to_enum(grpc_enum_class, orm_enum_class : Enum, grpc_enum_value, grpc_enum_prefix : Optional[str] = None):
    enum_name = grpc_enum_class.Name(grpc_enum_value)
    if grpc_enum_prefix is None:
        grpc_enum_prefix = orm_enum_class.__name__.upper()
        #grpc_enum_prefix = re.sub(r'^ORM_(.+)$', r'\1', grpc_enum_prefix)
        #grpc_enum_prefix = re.sub(r'^(.+)ENUM$', r'\1', grpc_enum_prefix)
        #grpc_enum_prefix = grpc_enum_prefix + '_'
        grpc_enum_prefix = re.sub(r'^ORM_(.+)ENUM$', r'\1_', grpc_enum_prefix)
    if len(grpc_enum_prefix) > 0:
        enum_name = enum_name.replace(grpc_enum_prefix, '')
    orm_enum_value = orm_enum_class._member_map_.get(enum_name)
    return orm_enum_value
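
Usage sketch (illustrative, not part of this changeset): converting a gRPC enum value into its ORM counterpart with the helpers defined above.

from common.proto.qkd_app_pb2 import QKDAppTypesEnum

# Names below come from the two snippets above; run this alongside them.
# grpc_to_enum derives the prefix from the ORM class name:
# 'ORM_QKDAPPTYPESENUM' -> 'QKDAPPTYPES_', which it strips before the member lookup.
orm_value = grpc_to_enum__qkd_app_types(QKDAppTypesEnum.QKDAPPTYPES_CLIENT)
assert orm_value is ORM_QKDAppTypesEnum.CLIENT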
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.proto.qkd_app_pb2 import AppId
from common.method_wrappers.ServiceExceptions import InvalidArgumentsException
from ._Builder import get_uuid_from_string, get_uuid_random

def app_get_uuid(
    app_id : AppId, allow_random : bool = False
) -> str:
    app_uuid = app_id.app_uuid.uuid
    if len(app_uuid) > 0:
        return get_uuid_from_string(app_uuid)
    if allow_random: return get_uuid_random()

    raise InvalidArgumentsException([
        ('app_id.app_uuid.uuid', app_uuid),
    ], extra_details=['At least one is required to produce an App UUID'])
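
Behavior sketch (illustrative, not part of this changeset; assumes the compiled qkd_app_pb2 module is importable):

from common.proto.qkd_app_pb2 import AppId

app_id = AppId()
app_id.app_uuid.uuid = 'my-qkd-app'                 # any non-empty string is accepted
print(app_get_uuid(app_id))                         # deterministic UUIDv5 in the TFS namespace

empty_id = AppId()
print(app_get_uuid(empty_id, allow_random=True))    # falls back to a random UUIDv4
# app_get_uuid(empty_id) raises InvalidArgumentsException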
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Union
from uuid import UUID, uuid4, uuid5

# Generate a UUIDv5-like identifier from the SHA-1 of "TFS" with no namespace, to be used as the
# NAMESPACE for all the context UUIDs generated. For efficiency purposes, the UUID is hardcoded;
# however, it is produced using the following code:
#   from hashlib import sha1
#   from uuid import UUID
#   hash = sha1(bytes('TFS', 'utf-8')).digest()
#   NAMESPACE_TFS = UUID(bytes=hash[:16], version=5)
NAMESPACE_TFS = UUID('200e3a1f-2223-534f-a100-758e29c37f40')

def get_uuid_from_string(str_uuid_or_name : Union[str, UUID], prefix_for_name : Optional[str] = None) -> str:
    # if UUID given, assume it is already valid; stringify it to honor the declared return type
    if isinstance(str_uuid_or_name, UUID): return str(str_uuid_or_name)
    if not isinstance(str_uuid_or_name, str):
        MSG = 'Parameter({:s}) cannot be used to produce a UUID'
        raise Exception(MSG.format(str(repr(str_uuid_or_name))))
    try:
        # try to parse as UUID
        return str(UUID(str_uuid_or_name))
    except: # pylint: disable=bare-except
        # produce a UUID within the TFS namespace from the parameter
        if prefix_for_name is not None:
            str_uuid_or_name = '{:s}/{:s}'.format(prefix_for_name, str_uuid_or_name)
        return str(uuid5(NAMESPACE_TFS, str_uuid_or_name))

def get_uuid_random() -> str:
    # Generate a random UUID. No need to use the namespace since "namespace + random = random".
    return str(uuid4())
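
Quick check (illustrative, not part of this changeset) of the namespace derivation and the deterministic mapping:

from hashlib import sha1
from uuid import UUID

# Recompute the hardcoded namespace from the derivation documented above.
assert UUID(bytes=sha1(b'TFS').digest()[:16], version=5) == NAMESPACE_TFS

# Same name yields the same UUID; the optional prefix scopes names per entity type.
assert get_uuid_from_string('dev1') == get_uuid_from_string('dev1')
assert get_uuid_from_string('dev1', prefix_for_name='device') != get_uuid_from_string('dev1')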
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.Constants import ServiceNameEnum
from common.Settings import get_service_baseurl_http, get_service_port_http
from common.tools.service.GenericRestServer import GenericRestServer

class RestServer(GenericRestServer):
    def __init__(self, cls_name: str = __name__) -> None:
        bind_port = get_service_port_http(ServiceNameEnum.QKD_APP)
        base_url = get_service_baseurl_http(ServiceNameEnum.QKD_APP)
        super().__init__(bind_port, base_url, cls_name=cls_name)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from flask import request
from flask_restful import Resource
from common.proto.context_pb2 import Empty
from common.proto.qkd_app_pb2 import App, QKDAppTypesEnum
from common.Constants import DEFAULT_CONTEXT_NAME
from context.client.ContextClient import ContextClient
from qkd_app.client.QKDAppClient import QKDAppClient

class _Resource(Resource):
    def __init__(self) -> None:
        super().__init__()
        self.context_client = ContextClient()
        self.qkd_app_client = QKDAppClient()

class Index(_Resource):
    def get(self):
        return {'hello': 'world'}

class CreateQKDApp(_Resource):
    # Optare: POST request for the QKD node to call TeraFlowSDN. Example requests below.
    def post(self):
        app = request.get_json()['app']
        devices = self.context_client.ListDevices(Empty()).devices
        local_device = None

        # This for-loop won't be necessary if we can guarantee that the Device ID equals the QKDN ID.
        for device in devices:
            for config_rule in device.device_config.config_rules:
                if config_rule.custom.resource_key == '__node__':
                    value = json.loads(config_rule.custom.resource_value)
                    qkdn_id = value['qkdn_id']
                    if app['local_qkdn_id'] == qkdn_id:
                        local_device = device
                    break

        # Optare: TODO: Verify that a service is present for this app
        '''
        requests.post('http://10.211.36.220/app/create_qkd_app', json={'app': {'server_app_id':'1', 'client_app_id':[], 'app_status':'ON', 'local_qkdn_id':'00000001-0000-0000-0000-000000000000', 'backing_qkdl_id':['00000003-0002-0000-0000-000000000000']}})
        requests.post('http://10.211.36.220/app/create_qkd_app', json={'app': {'server_app_id':'1', 'client_app_id':[], 'app_status':'ON', 'local_qkdn_id':'00000003-0000-0000-0000-000000000000', 'backing_qkdl_id':['00000003-0002-0000-0000-000000000000']}})
        '''

        if local_device is None:
            return {"status": "fail"}

        external_app_src_dst = {
            'app_id': {'context_id': {'context_uuid': {'uuid': DEFAULT_CONTEXT_NAME}}, 'app_uuid': {'uuid': ''}},
            'app_status': 'QKDAPPSTATUS_' + app['app_status'],
            'app_type': QKDAppTypesEnum.QKDAPPTYPES_CLIENT,
            'server_app_id': app['server_app_id'],
            'client_app_id': app['client_app_id'],
            'backing_qkdl_id': [{'qkdl_uuid': {'uuid': qkdl_id}} for qkdl_id in app['backing_qkdl_id']],
            'local_device_id': local_device.device_id,
            'remote_device_id': {'device_uuid': {'uuid': ''}},
        }

        # Optare: This calls our internal RegisterApp, which supports the creation of both internal
        # and external apps. The check for two parties requesting the same app is done inside
        # RegisterApp itself.
        self.qkd_app_client.RegisterApp(App(**external_app_src_dst))

        # Optare: TODO: Communicate via SBI with both nodes of the new app
        return {"status": "success"}
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from qkd_app.service.rest_server.RestServer import RestServer
from .Resources import (
    CreateQKDApp, Index)

URL_PREFIX = '/qkd_app'

# Use 'path' type since some identifiers might contain the '/' character, which Flask is unable to
# recognize in 'string'-typed parameters.
RESOURCES = [
    # (endpoint_name, resource_class, resource_url)
    ('api.index',            Index,        '/'),
    ('api.register_qkd_app', CreateQKDApp, '/create_qkd_app'),
]

def register_qkd_app(app_server : RestServer):
    for endpoint_name, resource_class, resource_url in RESOURCES:
        app_server.add_resource(resource_class, URL_PREFIX + resource_url, endpoint=endpoint_name)
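
Wiring sketch (illustrative, not part of this changeset; the import path of register_qkd_app and the start() call are assumptions based on the usual TFS GenericRestServer pattern of a threaded Flask server):

from qkd_app.service.rest_server.RestServer import RestServer
from qkd_app.service.rest_server.qkd_app import register_qkd_app   # assumed module path

rest_server = RestServer()
register_qkd_app(rest_server)   # endpoints served under {base_url}/qkd_app/...
rest_server.start()             # assumed: GenericRestServer runs Flask in a background thread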
@@ -70,6 +70,8 @@ COPY src/pathcomp/frontend/__init__.py pathcomp/frontend/__init__.py
 COPY src/pathcomp/frontend/client/. pathcomp/frontend/client/
 COPY src/e2e_orchestrator/__init__.py e2e_orchestrator/__init__.py
 COPY src/e2e_orchestrator/client/. e2e_orchestrator/client/
+COPY src/qkd_app/__init__.py qkd_app/__init__.py
+COPY src/qkd_app/client/. qkd_app/client/
 COPY src/service/. service/

 # Start the service
@@ -30,6 +30,9 @@ from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
 from common.tools.object_factory.Context import json_context_id
 from common.tools.object_factory.Topology import json_topology_id
 from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME
+from common.Settings import (
+    is_deployed_e2e_orch, is_deployed_optical, is_deployed_te
+)
 from context.client.ContextClient import ContextClient
 from e2e_orchestrator.client.E2EOrchestratorClient import E2EOrchestratorClient
 from pathcomp.frontend.client.PathCompClient import PathCompClient
@@ -142,7 +145,7 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
         service.service_type = request.service_type                                    # pylint: disable=no-member
         service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED # pylint: disable=no-member

-        if service.service_type == ServiceTypeEnum.SERVICETYPE_TE:
+        if is_deployed_te() and service.service_type == ServiceTypeEnum.SERVICETYPE_TE:
             # TE service:
             context_client.SetService(request)
@@ -164,7 +167,7 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
             str_service_status = ServiceStatusEnum.Name(service_status.service_status)
             raise Exception(MSG.format(service_key, str_service_status))

-        if service.service_type == ServiceTypeEnum.SERVICETYPE_E2E:
+        if is_deployed_e2e_orch() and service.service_type == ServiceTypeEnum.SERVICETYPE_E2E:
             # End-to-End service:
             service_id_with_uuids = context_client.SetService(request)
@@ -248,7 +251,7 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
         tasks_scheduler = TasksScheduler(self.service_handler_factory)

-        if service.service_type == ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY:
+        if is_deployed_optical() and service.service_type == ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY:
             context_id_x = json_context_id(DEFAULT_CONTEXT_NAME)
             topology_id_x = json_topology_id(
                 DEFAULT_TOPOLOGY_NAME, context_id_x)
@@ -341,14 +344,14 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
         service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PENDING_REMOVAL
         context_client.SetService(service)

-        if service.service_type == ServiceTypeEnum.SERVICETYPE_TE:
+        if is_deployed_te() and service.service_type == ServiceTypeEnum.SERVICETYPE_TE:
             # TE service
             te_service_client = TEServiceClient()
             te_service_client.DeleteLSP(request)
             context_client.RemoveService(request)
             return Empty()

-        if service.service_type == ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY:
+        if is_deployed_optical() and service.service_type == ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY:
             devs = []

             context_id_x = json_context_id(DEFAULT_CONTEXT_NAME)
@@ -17,7 +17,7 @@ import json, logging, uuid
 from typing import Any, Dict, List, Optional, Tuple, Union
 from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
 from common.proto.context_pb2 import ConfigRule, DeviceId, Service
-from common.proto.app_pb2 import App, QKDAppStatusEnum, QKDAppTypesEnum
+from common.proto.qkd_app_pb2 import App, QKDAppStatusEnum, QKDAppTypesEnum
 from common.tools.object_factory.ConfigRule import json_config_rule_delete, json_config_rule_set
 from common.tools.object_factory.Device import json_device_id
 from common.type_checkers.Checkers import chk_type
@@ -20,6 +20,7 @@ from common.proto.context_pb2 import (
     Connection, ConnectionId, Device, DeviceDriverEnum, DeviceId, Service, ServiceId,
     OpticalConfig, OpticalConfigId
 )
+from common.proto.qkd_app_pb2 import App
 from common.tools.context_queries.Connection import get_connection_by_id
 from common.tools.context_queries.Device import get_device
 from common.tools.context_queries.Service import get_service_by_id
@@ -27,11 +28,12 @@ from common.tools.grpc.Tools import grpc_message_to_json_string
 from common.tools.object_factory.Device import json_device_id
 from context.client.ContextClient import ContextClient
 from device.client.DeviceClient import DeviceClient
+from qkd_app.client.QKDAppClient import QKDAppClient
 from service.service.service_handler_api.Exceptions import (
     UnsatisfiedFilterException, UnsupportedFilterFieldException, UnsupportedFilterFieldValueException
 )
 from service.service.service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory, get_service_handler_class
-from service.service.tools.ObjectKeys import get_connection_key, get_device_key, get_service_key
+from service.service.tools.ObjectKeys import get_connection_key, get_device_key, get_service_key, get_app_key

 if TYPE_CHECKING:
     from service.service.service_handler_api._ServiceHandler import _ServiceHandler
@@ -44,11 +46,14 @@ class CacheableObjectType(Enum):
     CONNECTION = 'connection'
     DEVICE     = 'device'
     SERVICE    = 'service'
+    QKD_APP    = 'qkd-app'

 class TaskExecutor:
     def __init__(self, service_handler_factory : ServiceHandlerFactory) -> None:
         self._service_handler_factory = service_handler_factory
         self._context_client = ContextClient()
+        # DEPENDENCY QKD
+        self._qkd_app_client = QKDAppClient()
         self._device_client = DeviceClient()
         self._grpc_objects_cache : Dict[str, CacheableObject] = dict()
@@ -220,3 +225,12 @@
                 str(dict_connection_devices)
             )
         )
+
+    # ----- QkdApp-related methods ------------------------------------------------------------------
+
+    def register_app(self, app : App) -> None:
+        app_key = get_app_key(app.app_id)
+        self._qkd_app_client.RegisterApp(app)
+        LOGGER.info('QKD app registered: %s', app_key)
+        self._store_grpc_object(CacheableObjectType.QKD_APP, app_key, app)
@@ -13,6 +13,7 @@
 # limitations under the License.

 from common.proto.context_pb2 import ConnectionId, DeviceId, ServiceId
+from common.proto.qkd_app_pb2 import AppId

 def get_connection_key(connection_id : ConnectionId) -> str:
     return connection_id.connection_uuid.uuid
@@ -24,3 +25,7 @@ def get_service_key(service_id : ServiceId) -> str:
     context_uuid = service_id.context_id.context_uuid.uuid
     service_uuid = service_id.service_uuid.uuid
     return '{:s}/{:s}'.format(context_uuid, service_uuid)
+
+def get_app_key(app_id : AppId) -> str:
+    return app_id.app_uuid.uuid
@@ -69,6 +69,7 @@ unit_test telemetry-backend:
     - docker pull "bitnami/kafka:latest"
     - >
      docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181
+     --env ALLOW_ANONYMOUS_LOGIN=yes
      bitnami/zookeeper:latest
     - sleep 10 # Wait for Zookeeper to start
     - >
@@ -94,12 +95,12 @@ unit_test telemetry-backend:
     - docker exec -i ${IMAGE_NAME}-backend bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
   coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
   after_script:
-    - docker rm -f ${IMAGE_NAME}-backend
-    - docker rm -f kafka
-    - docker rm -f zookeeper
     - docker network rm teraflowbridge
     - docker volume prune --force
     - docker image prune --force
+    - docker rm -f ${IMAGE_NAME}-backend
+    - docker rm -f zookeeper
+    - docker rm -f kafka
   rules:
     - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
     - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
@@ -151,19 +152,20 @@ unit_test telemetry-frontend:
     - CRDB_ADDRESS=$(docker inspect crdb --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
     - echo $CRDB_ADDRESS
     - >
-     docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181 \
-     -e ALLOW_ANONYMOUS_LOGIN=yes \
+     docker run --name zookeeper -d --network=teraflowbridge -p 2181:2181
+     --env ALLOW_ANONYMOUS_LOGIN=yes
      bitnami/zookeeper:latest
     - sleep 10 # Wait for Zookeeper to start
-    - docker run --name kafka -d --network=teraflowbridge -p 9092:9092
+    - >
+     docker run --name kafka -d --network=teraflowbridge -p 9092:9092
      --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      --env ALLOW_PLAINTEXT_LISTENER=yes
      bitnami/kafka:latest
     - sleep 20 # Wait for Kafka to start
     - KAFKA_IP=$(docker inspect kafka --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
     - echo $KAFKA_IP
-    # - docker logs zookeeper
-    # - docker logs kafka
+    - docker logs zookeeper
+    - docker logs kafka
     - >
      docker run --name $IMAGE_NAME-frontend -d -p 30050:30050
      --env "CRDB_URI=cockroachdb://tfs:tfs123@${CRDB_ADDRESS}:26257/tfs_test?sslmode=require"
@@ -180,13 +182,13 @@ unit_test telemetry-frontend:
     - docker exec -i ${IMAGE_NAME}-frontend bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
   coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
   after_script:
-    - docker volume rm -f crdb
-    - docker network rm teraflowbridge
-    - docker volume prune --force
-    - docker image prune --force
     - docker rm -f ${IMAGE_NAME}-frontend
+    - docker rm -f zookeeper
+    - docker rm -f kafka
+    - docker volume rm -f crdb
+    - docker volume prune --force
+    - docker image prune --force
+    - docker network rm teraflowbridge
   rules:
     - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
     - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
@@ -201,4 +203,4 @@ unit_test telemetry-frontend:
   artifacts:
     when: always
     reports:
-      junit: src/$IMAGE_NAME/frontend/tests/${IMAGE_NAME}-frontend_report.xml
\ No newline at end of file
+      junit: src/$IMAGE_NAME/frontend/tests/${IMAGE_NAME}-frontend_report.xml
@@ -13,125 +13,32 @@
 # limitations under the License.

 import logging
-import sqlalchemy_utils
-from sqlalchemy import inspect
-from sqlalchemy.orm import sessionmaker
 from telemetry.database.TelemetryModel import Collector as CollectorModel
-from telemetry.database.TelemetryEngine import TelemetryEngine
-from common.method_wrappers.ServiceExceptions import (
-    OperationFailedException, AlreadyExistsException )
+from common.method_wrappers.Decorator import MetricsPool
+from common.tools.database.GenericDatabase import Database
+from common.method_wrappers.ServiceExceptions import OperationFailedException

-LOGGER = logging.getLogger(__name__)
-DB_NAME = "tfs_telemetry"
+LOGGER = logging.getLogger(__name__)
+METRICS_POOL = MetricsPool('TelemetryFrontend', 'Database')

-class TelemetryDB:
-    def __init__(self):
-        self.db_engine = TelemetryEngine.get_engine()
-        if self.db_engine is None:
-            LOGGER.error('Unable to get SQLAlchemy DB Engine...')
-            return False
-        self.db_name = DB_NAME
-        self.Session = sessionmaker(bind=self.db_engine)
-
-    def create_database(self):
-        if not sqlalchemy_utils.database_exists(self.db_engine.url):
-            LOGGER.debug("Database created. {:}".format(self.db_engine.url))
-            sqlalchemy_utils.create_database(self.db_engine.url)
-
-    def drop_database(self) -> None:
-        if sqlalchemy_utils.database_exists(self.db_engine.url):
-            sqlalchemy_utils.drop_database(self.db_engine.url)
-
-    def create_tables(self):
-        try:
-            CollectorModel.metadata.create_all(self.db_engine)     # type: ignore
-            LOGGER.debug("Tables created in the database: {:}".format(self.db_name))
-        except Exception as e:
-            LOGGER.debug("Tables cannot be created in the database. {:s}".format(str(e)))
-            raise OperationFailedException("Tables can't be created", extra_details=["unable to create table {:}".format(e)])
-
-    def verify_tables(self):
-        try:
-            inspect_object = inspect(self.db_engine)
-            if(inspect_object.has_table('collector', None)):
-                LOGGER.info("Table exists in DB: {:}".format(self.db_name))
-        except Exception as e:
-            LOGGER.info("Unable to fetch Table names. {:s}".format(str(e)))
-
-    # ----------------- CRUD METHODs ---------------------
-
-    def add_row_to_db(self, row):
-        session = self.Session()
-        try:
-            session.add(row)
-            session.commit()
-            LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.")
-            return True
-        except Exception as e:
-            session.rollback()
-            if "psycopg2.errors.UniqueViolation" in str(e):
-                LOGGER.error(f"Unique key violation: {row.__class__.__name__} table. {str(e)}")
-                raise AlreadyExistsException(row.__class__.__name__, row,
-                    extra_details=["Unique key violation: {:}".format(e)])
-            else:
-                LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
-                raise OperationFailedException("Insertion failed", extra_details=["unable to insert row {:}".format(e)])
-        finally:
-            session.close()
-
-    def search_db_row_by_id(self, model, col_name, id_to_search):
-        session = self.Session()
-        try:
-            entity = session.query(model).filter_by(**{col_name: id_to_search}).first()
-            if entity:
-                # LOGGER.debug(f"{model.__name__} ID found: {str(entity)}")
-                return entity
-            else:
-                LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}")
-                print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search))
-                return None
-        except Exception as e:
-            session.rollback()
-            LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
-            raise OperationFailedException("search by column id", extra_details=["unable to search row {:}".format(e)])
-        finally:
-            session.close()
-
-    def delete_db_row_by_id(self, model, col_name, id_to_search):
-        session = self.Session()
-        try:
-            record = session.query(model).filter_by(**{col_name: id_to_search}).first()
-            if record:
-                session.delete(record)
-                session.commit()
-                LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search)
-            else:
-                LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search)
-                return None
-        except Exception as e:
-            session.rollback()
-            LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e)
-            raise OperationFailedException("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
-        finally:
-            session.close()
+class TelemetryDB(Database):
+    def __init__(self, model) -> None:
+        LOGGER.info('Init TelemetryDB')
+        super().__init__(model)

     def select_with_filter(self, model, filter_object):
         """
         Generic method to create filters dynamically based on filter_object attributes.
         params:  model:         SQLAlchemy model class to query.
                  filter_object: Object that contains filtering criteria as attributes.
         return:  SQLAlchemy session, query and Model
         """
         session = self.Session()
         try:
-            query = session.query(CollectorModel)
+            # Apply filters based on the filter_object
+            query = session.query(model)
             if filter_object.kpi_id:
-                query = query.filter(CollectorModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
-            result = query.all()
-            # query should be added to return all rows
-            if result:
-                LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result}
-            else:
-                LOGGER.warning(f"No matching row found in {model.__name__} table with filters: {filter_object}")
-            return result
+                query = query.filter(model.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
         except Exception as e:
-            LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
-            raise OperationFailedException("Select by filter", extra_details=["unable to apply the filter {:}".format(e)])
-        finally:
-            session.close()
+            LOGGER.error(f"Error creating filter of {model.__name__} table. ERROR: {e}")
+            raise OperationFailedException("CreateKpiDescriptorFilter", extra_details=["unable to create the filter {:}".format(e)])
+        return super().select_with_filter(query, session, model)
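
Usage sketch (illustrative, not part of this changeset): driving the refactored select_with_filter. The filter object below is a stand-in for the protobuf filter the frontend passes, and the module path of TelemetryDB is an assumption.

from types import SimpleNamespace

from telemetry.database.TelemetryModel import Collector as CollectorModel
from telemetry.database.Telemetry_DB import TelemetryDB   # assumed module path

# Stand-in for the protobuf filter: anything exposing a kpi_id list whose items
# carry a .kpi_id.uuid field matches what select_with_filter expects.
kpi_ref = SimpleNamespace(kpi_id=SimpleNamespace(uuid='1e22f180-ba28-4641-b190-2287bf448888'))
filter_object = SimpleNamespace(kpi_id=[kpi_ref])

tele_db = TelemetryDB(CollectorModel)                     # requires a reachable CRDB/Postgres
rows = tele_db.select_with_filter(CollectorModel, filter_object)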
@@ -40,7 +40,7 @@ ACTIVE_COLLECTORS = [] # keep and can be populated from DB
 class TelemetryFrontendServiceServicerImpl(TelemetryFrontendServiceServicer):
     def __init__(self):
         LOGGER.info('Init TelemetryFrontendService')
-        self.tele_db_obj = TelemetryDB()
+        self.tele_db_obj = TelemetryDB(CollectorModel)
         self.kafka_producer = KafkaProducer({'bootstrap.servers' : KafkaConfig.get_kafka_address()})
         self.kafka_consumer = KafkaConsumer({'bootstrap.servers' : KafkaConfig.get_kafka_address(),
                                              'group.id'          : 'frontend',
@@ -16,6 +16,8 @@ import logging, signal, sys, threading
 from prometheus_client import start_http_server
 from common.Settings import get_log_level, get_metrics_port
 from .TelemetryFrontendService import TelemetryFrontendService
+from telemetry.database.TelemetryModel import Collector as Model
+from common.tools.database.GenericDatabase import Database

 terminate = threading.Event()
 LOGGER = None
@@ -36,6 +38,11 @@ def main():
     LOGGER.info('Starting...')

+    # To create DB
+    kpiDBobj = Database(Model)
+    kpiDBobj.create_database()
+    kpiDBobj.create_tables()
+
     # Start metrics server
     # metrics_port = get_metrics_port()
     # start_http_server(metrics_port)
@@ -21,8 +21,8 @@ LOGGER = logging.getLogger(__name__)

 def test_verify_databases_and_tables():
     LOGGER.info('>>> test_verify_databases_and_tables : START <<< ')
     TelemetryDBobj = TelemetryDB()
-    TelemetryDBobj.drop_database()
-    TelemetryDBobj.verify_tables()
+    # TelemetryDBobj.drop_database()
+    # TelemetryDBobj.verify_tables()
     TelemetryDBobj.create_database()
     TelemetryDBobj.create_tables()
-    TelemetryDBobj.verify_tables()
\ No newline at end of file
+    TelemetryDBobj.verify_tables()
@@ -28,6 +28,7 @@ from service.client.ServiceClient import ServiceClient
 from .Objects import CONTEXT_ID, CONTEXTS, DEVICES, LINKS, TOPOLOGIES, SERVICES
 from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceId,\
     DeviceOperationalStatusEnum
+from common.tools.object_factory.Constraint import json_constraint_sla_latency

 LOGGER = logging.getLogger(__name__)
 LOGGER.setLevel(logging.DEBUG)
@@ -62,4 +63,5 @@ def test_rules_entry(
         service_p4 = copy.deepcopy(service)
         service_client.CreateService(Service(**service_p4))
         service_p4['service_endpoint_ids'].extend(endpoints)
+        service_p4['service_constraints'].extend([json_constraint_sla_latency(3)])
         service_client.UpdateService(Service(**service_p4))