Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: tfs/controller

Showing 876 additions and 11 deletions.
@@ -16,14 +16,23 @@
package org.etsi.tfs.policy.policy.model;

import java.util.Map;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

@Data
public class AlarmTopicDTO {
    private String startTimestamp;
    private String endTimestamp;

    @JsonProperty("window_start")
    private String windowStart;

    @JsonProperty("THRESHOLD_FALL")
    private boolean thresholdFall;

    @JsonProperty("THRESHOLD_RAISE")
    private boolean thresholdRaise;

    private String value;

    @JsonProperty("kpi_id")
    private String kpiId;

    private Map<String, Boolean> alarms;
}
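For orientation, a hypothetical alarm message as this DTO would deserialize it. The JSON keys follow the @JsonProperty annotations above (un-annotated fields use their Java names); every value is purely illustrative:

    {
      "startTimestamp": "1718000000.0",
      "endTimestamp": "1718000030.0",
      "window_start": "1718000000.0",
      "THRESHOLD_FALL": false,
      "THRESHOLD_RAISE": true,
      "value": "250.0",
      "kpi_id": "example-kpi-uuid",
      "alarms": {"THRESHOLD_RAISE": true, "THRESHOLD_FALL": false}
    }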
@@ -76,10 +76,11 @@ mp:
  messaging:
    incoming:
      topic-alarms:
        failure-strategy: ignore
        connector: smallrye-kafka
        topic: topic-alarms
        value:
          deserializer: org.etsi.tfs.policy.policy.kafka.TopicAlarmDeserializer

kafka:
  bootstrap:
    servers: ${quarkus.kubernetes.env.vars.kafka-broker-host}:9092
\ No newline at end of file
    servers: ${quarkus.kubernetes.env.vars.kafka-broker-host}:9092
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

build app:
  variables:
    IMAGE_NAME: 'qkd_app' # name of the microservice
    IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
  stage: build
  before_script:
    - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
  script:
    - docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile .
    - docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
    - docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
  after_script:
    - docker images --filter="dangling=true" --quiet | xargs -r docker rmi
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
    - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
    - changes:
        - src/common/**/*.py
        - proto/*.proto
        - src/$IMAGE_NAME/**/*.{py,in,yml}
        - src/$IMAGE_NAME/Dockerfile
        - src/$IMAGE_NAME/tests/*.py
        - manifests/${IMAGE_NAME}service.yaml
        - .gitlab-ci.yml

# Apply unit test to the component
unit_test app:
  variables:
    IMAGE_NAME: 'qkd_app' # name of the microservice
    IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
  stage: unit_test
  needs:
    - build app
    - unit_test service
  before_script:
    - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
    - if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
    - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi
  script:
    - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
    - docker run --name $IMAGE_NAME -d -p 10070:10070 -p 8005:8005 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
    - sleep 5
    - docker ps -a
    - docker logs $IMAGE_NAME
    - docker exec -i $IMAGE_NAME bash -c "coverage run -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary.py --junitxml=/opt/results/${IMAGE_NAME}_report.xml"
    - docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
  coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
  after_script:
    - docker rm -f $IMAGE_NAME
    - docker network rm teraflowbridge
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
    - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
    - changes:
        - src/common/**/*.py
        - proto/*.proto
        - src/$IMAGE_NAME/**/*.{py,in,yml}
        - src/$IMAGE_NAME/Dockerfile
        - src/$IMAGE_NAME/tests/*.py
        - src/$IMAGE_NAME/tests/Dockerfile
        - manifests/${IMAGE_NAME}service.yaml
        - .gitlab-ci.yml
  artifacts:
    when: always
    reports:
      junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml
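For reference, the coverage regex in the unit_test job is intended to capture the percentage from the TOTAL line that "coverage report" prints inside the container. An illustrative line it would match (column widths vary) is:

    TOTAL    1234    56    95%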
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory
WORKDIR /var/teraflow/common
COPY src/common/. ./
RUN rm -rf proto
# Create proto sub-folder, copy .proto files, and generate Python code
RUN mkdir -p /var/teraflow/common/proto
WORKDIR /var/teraflow/common/proto
RUN touch __init__.py
COPY proto/*.proto ./
RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
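# Note (illustrative): the sed expression above rewrites the absolute imports emitted by
# grpc_tools.protoc into relative package imports so the generated stubs can be imported
# as a package. For example, a generated line such as
#   import context_pb2 as context__pb2
# becomes
#   from . import context_pb2 as context__pb2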
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/qkd_app
WORKDIR /var/teraflow/qkd_app
COPY src/qkd_app/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/context/. context/
COPY src/service/. service/
COPY src/qkd_app/. qkd_app/
# Start the service
ENTRYPOINT ["python", "-m", "qkd_app.service"]
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty, ContextId
from common.proto.qkd_app_pb2 import App, AppId, AppList
from common.proto.qkd_app_pb2_grpc import AppServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string

LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')

class QKDAppClient:
    def __init__(self, host=None, port=None):
        if not host: host = get_service_host(ServiceNameEnum.QKD_APP)
        if not port: port = get_service_port_grpc(ServiceNameEnum.QKD_APP)
        self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
        LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
        self.channel = None
        self.stub = None
        self.connect()
        LOGGER.debug('Channel created')

    def connect(self):
        self.channel = grpc.insecure_channel(self.endpoint)
        self.stub = AppServiceStub(self.channel)

    def close(self):
        if self.channel is not None: self.channel.close()
        self.channel = None
        self.stub = None

    @RETRY_DECORATOR
    def RegisterApp(self, request : App) -> Empty:
        LOGGER.debug('RegisterApp request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.RegisterApp(request)
        LOGGER.debug('RegisterApp result: {:s}'.format(grpc_message_to_json_string(response)))
        return response

    @RETRY_DECORATOR
    def ListApps(self, request: ContextId) -> AppList:
        LOGGER.debug('ListApps request: {:s}'.format(grpc_message_to_json_string(request)))
        response = self.stub.ListApps(request)
        LOGGER.debug('ListApps result: {:s}'.format(grpc_message_to_json_string(response)))
        return response
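A minimal usage sketch of the client above, assuming the QKD app gRPC service is reachable and that an App protobuf message has already been built by the caller; the import path and variable names are illustrative:

    from common.proto.context_pb2 import ContextId
    from qkd_app.client.QKDAppClient import QKDAppClient

    client = QKDAppClient()              # host/port resolved from the QKD_APP service settings
    client.RegisterApp(app)              # 'app' is a common.proto.qkd_app_pb2.App built elsewhere
    app_list = client.ListApps(ContextId())
    client.close()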
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Flask==2.1.3
Flask-HTTPAuth==4.5.0
Flask-RESTful==0.3.9
jsonschema==4.4.0
requests==2.27.1
werkzeug==2.3.7
nats-py==2.6.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
@@ -4,7 +4,7 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@@ -13,28 +13,25 @@
# limitations under the License.

import logging, sqlalchemy
from common.Settings import get_setting
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.message_broker.MessageBroker import MessageBroker
from common.proto.qkd_app_pb2_grpc import add_AppServiceServicer_to_server
from common.tools.service.GenericGrpcService import GenericGrpcService
from qkd_app.service.QKDAppServiceServicerImpl import AppServiceServicerImpl

# Custom gRPC settings
GRPC_MAX_WORKERS = 200 # multiple clients might keep connections alive for Get*Events() RPC methods

LOGGER = logging.getLogger(__name__)

CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'

class TelemetryEngine:
    @staticmethod
    def get_engine() -> sqlalchemy.engine.Engine:
        crdb_uri = get_setting('CRDB_URI', default=None)
        if crdb_uri is None:
            CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
            CRDB_SQL_PORT  = get_setting('CRDB_SQL_PORT')
            CRDB_DATABASE  = "tfs-telemetry" # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT')
            CRDB_USERNAME  = get_setting('CRDB_USERNAME')
            CRDB_PASSWORD  = get_setting('CRDB_PASSWORD')
            CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
            crdb_uri = CRDB_URI_TEMPLATE.format(
                CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
        try:
            engine = sqlalchemy.create_engine(crdb_uri, echo=False)
            LOGGER.info('TelemetryDB initialized with DB URL: {:}'.format(crdb_uri))
        except: # pylint: disable=bare-except # pragma: no cover
            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
            return None # type: ignore
        return engine

class AppService(GenericGrpcService):
    def __init__(
        self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker, cls_name: str = __name__
    ) -> None:
        port = get_service_port_grpc(ServiceNameEnum.QKD_APP)
        super().__init__(port, max_workers=GRPC_MAX_WORKERS, cls_name=cls_name)
        self.app_servicer = AppServiceServicerImpl(db_engine, messagebroker)

    def install_servicers(self):
        add_AppServiceServicer_to_server(self.app_servicer, self.server)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging, sqlalchemy
#from typing import Iterator, Optional
from common.message_broker.MessageBroker import MessageBroker
import grpc, json, logging #, deepdiff
from common.proto.context_pb2 import (
    Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ContextId)
from common.proto.qkd_app_pb2 import (App, AppId, AppList, QKDAppTypesEnum)
from common.proto.qkd_app_pb2_grpc import AppServiceServicer
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
#from common.tools.context_queries.InterDomain import is_inter_domain #, is_multi_domain
#from common.tools.grpc.ConfigRules import copy_config_rules
#from common.tools.grpc.Constraints import copy_constraints
#from common.tools.grpc.EndPointIds import copy_endpoint_ids
#from common.tools.grpc.ServiceIds import update_service_ids
#from common.tools.grpc.Tools import grpc_message_to_json_string
#from context.client.ContextClient import ContextClient
#from qkd_app.client.QKDAppClient import QKDAppClient
from .database.QKDApp import app_set, app_list_objs, app_get, app_get_by_server
from common.method_wrappers.ServiceExceptions import NotFoundException

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('QkdApp', 'RPC')

# Optare: This file must be edited based on the app's logic.
class AppServiceServicerImpl(AppServiceServicer):
    def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker):
        LOGGER.debug('Creating Servicer...')
        self.db_engine = db_engine
        self.messagebroker = messagebroker
        LOGGER.debug('Servicer Created')

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def RegisterApp(self, request : App, context : grpc.ServicerContext) -> Empty:
        # Optare: This is the main method required for the project.
        # Optare: Internal apps are stored directly. External apps are stored as "pending"
        #         (without a remote device) until the other party also registers them.
        # Optare: Ideally, only the code inside the try block needs to change. It currently
        #         looks up a pending app with the same server_app_id, but additional
        #         restrictions or a different search can be applied, raising
        #         NotFoundException when no match is found.
        if request.app_type == QKDAppTypesEnum.QKDAPPTYPES_INTERNAL:
            app_set(self.db_engine, self.messagebroker, request)
        else:
            try:
                app = app_get_by_server(self.db_engine, request.server_app_id)
            except NotFoundException:
                app = request
                app_set(self.db_engine, self.messagebroker, app)
            else:
                app.remote_device_id.device_uuid.uuid = request.local_device_id.device_uuid.uuid
                app_set(self.db_engine, self.messagebroker, app)
        return Empty()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ListApps(self, request: ContextId, context : grpc.ServicerContext) -> AppList:
        return app_list_objs(self.db_engine)
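To make the RegisterApp behaviour concrete, a hedged sketch of the intended two-party flow for an external (non-internal) app; the variable names are purely illustrative:

    # First party registers the external app. No pending entry with the same
    # server_app_id exists yet, so the app is stored as-is, with remote_device_id
    # still empty (i.e. pending).
    servicer.RegisterApp(app_from_site_a, grpc_context)

    # Second party later registers its view of the same app (same server_app_id).
    # The pending entry is found via app_get_by_server() and completed: its
    # remote_device_id is filled with the second party's local_device_id.
    servicer.RegisterApp(app_from_site_b, grpc_context)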
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading
from prometheus_client import start_http_server
#from common.Constants import ServiceNameEnum
from common.Settings import (
    #ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name,
    get_log_level, get_metrics_port, wait_for_environment_variables)
from qkd_app.service.QKDAppService import AppService
from qkd_app.service.rest_server.RestServer import RestServer
from qkd_app.service.rest_server.qkd_app import register_qkd_app
#from common.message_broker.Factory import get_messagebroker_backend
#from common.message_broker.MessageBroker import MessageBroker
from qkd_app.service.database.Engine import Engine
from qkd_app.service.database.models._Base import rebuild_database

terminate = threading.Event()
LOGGER : logging.Logger = None

def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
    LOGGER.warning('Terminate signal received')
    terminate.set()

def main():
    global LOGGER # pylint: disable=global-statement

    log_level = get_log_level()
    logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
    LOGGER = logging.getLogger(__name__)

    wait_for_environment_variables([
        #get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST     ),
        #get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
    ])

    signal.signal(signal.SIGINT,  signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    LOGGER.info('Starting...')

    # Start metrics server
    metrics_port = get_metrics_port()
    start_http_server(metrics_port)

    # Get Database Engine instance and initialize database, if needed
    LOGGER.info('Getting SQLAlchemy DB Engine...')
    db_engine = Engine.get_engine()
    if db_engine is None:
        LOGGER.error('Unable to get SQLAlchemy DB Engine...')
        return -1

    try:
        Engine.create_database(db_engine)
    except: # pylint: disable=bare-except # pragma: no cover
        LOGGER.exception('Failed to check/create the database: {:s}'.format(str(db_engine.url)))

    rebuild_database(db_engine)

    # Get message broker instance
    messagebroker = None #MessageBroker(get_messagebroker_backend())

    # Start the gRPC service
    grpc_service = AppService(db_engine, messagebroker)
    grpc_service.start()

    rest_server = RestServer()
    register_qkd_app(rest_server)
    rest_server.start()

    # Wait for Ctrl+C or termination signal
    while not terminate.wait(timeout=1.0): pass

    LOGGER.info('Terminating...')
    grpc_service.stop()
    rest_server.shutdown()
    rest_server.join()

    LOGGER.info('Bye')
    return 0

if __name__ == '__main__':
    sys.exit(main())
@@ -12,29 +12,44 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging, sqlalchemy
import logging, sqlalchemy, sqlalchemy_utils
from common.Settings import get_setting

LOGGER = logging.getLogger(__name__)

APP_NAME = 'tfs'
ECHO = False # true: dump SQL commands and transactions executed

CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'

class KpiEngine:
class Engine:
    @staticmethod
    def get_engine() -> sqlalchemy.engine.Engine:
        crdb_uri = get_setting('CRDB_URI', default=None)
        if crdb_uri is None:
            CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
            CRDB_SQL_PORT  = get_setting('CRDB_SQL_PORT')
            CRDB_DATABASE  = 'tfs_kpi_mgmt' # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT')
            CRDB_DATABASE  = get_setting('CRDB_DATABASE_APP')
            CRDB_USERNAME  = get_setting('CRDB_USERNAME')
            CRDB_PASSWORD  = get_setting('CRDB_PASSWORD')
            CRDB_SSLMODE   = get_setting('CRDB_SSLMODE')
            crdb_uri = CRDB_URI_TEMPLATE.format(
                CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
        try:
            engine = sqlalchemy.create_engine(crdb_uri, echo=False)
            LOGGER.info('KpiDBmanager initialized with DB URL: {:}'.format(crdb_uri))
            engine = sqlalchemy.create_engine(
                crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True)
        except: # pylint: disable=bare-except # pragma: no cover
            LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
            return None # type: ignore
            return None
        return engine

    @staticmethod
    def create_database(engine : sqlalchemy.engine.Engine) -> None:
        if not sqlalchemy_utils.database_exists(engine.url):
            sqlalchemy_utils.create_database(engine.url)

    @staticmethod
    def drop_database(engine : sqlalchemy.engine.Engine) -> None:
        if sqlalchemy_utils.database_exists(engine.url):
            sqlalchemy_utils.drop_database(engine.url)
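As an illustration (all values hypothetical), with CRDB_USERNAME=tfs, CRDB_PASSWORD=tfs123, CRDB_NAMESPACE=crdb, CRDB_SQL_PORT=26257, CRDB_DATABASE_APP=qkd_app and CRDB_SSLMODE=require, the template above expands to:

    cockroachdb://tfs:tfs123@cockroachdb-public.crdb.svc.cluster.local:26257/qkd_app?sslmode=require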
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime, logging, uuid
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, selectinload, sessionmaker
from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, List, Optional, Set, Tuple
from common.method_wrappers.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.message_broker.MessageBroker import MessageBroker
from common.proto.context_pb2 import Empty
from common.proto.qkd_app_pb2 import (
    AppList, App, AppId)
from common.tools.grpc.Tools import grpc_message_to_json_string
from .models.QKDAppModel import AppModel
from .models.enums.QKDAppStatus import grpc_to_enum__qkd_app_status
from .models.enums.QKDAppTypes import grpc_to_enum__qkd_app_types
from .uuids.QKDApp import app_get_uuid
from common.tools.object_factory.Context import json_context_id
from common.tools.object_factory.QKDApp import json_app_id
from context.service.database.uuids.Context import context_get_uuid
#from .Events import notify_event_context, notify_event_device, notify_event_topology

LOGGER = logging.getLogger(__name__)

def app_list_objs(db_engine : Engine) -> AppList:
    def callback(session : Session) -> List[Dict]:
        obj_list : List[AppModel] = session.query(AppModel)\
            .all()
        return [obj.dump() for obj in obj_list]
    apps = run_transaction(sessionmaker(bind=db_engine), callback)
    return AppList(apps=apps)

def app_get(db_engine : Engine, request : AppId) -> App:
    app_uuid = app_get_uuid(request, allow_random=False)
    def callback(session : Session) -> Optional[Dict]:
        obj : Optional[AppModel] = session.query(AppModel)\
            .filter_by(app_uuid=app_uuid).one_or_none()
        return None if obj is None else obj.dump()
    obj = run_transaction(sessionmaker(bind=db_engine), callback)
    if obj is None:
        raw_app_uuid = request.app_uuid.uuid
        raise NotFoundException('App', raw_app_uuid, extra_details=[
            'app_uuid generated was: {:s}'.format(app_uuid)
        ])
    return App(**obj)

def app_set(db_engine : Engine, messagebroker : MessageBroker, request : App) -> AppId:
    context_uuid = context_get_uuid(request.app_id.context_id, allow_random=False)
    raw_app_uuid = request.app_id.app_uuid.uuid
    app_uuid = app_get_uuid(request.app_id, allow_random=True)

    app_type = request.app_type
    app_status = grpc_to_enum__qkd_app_status(request.app_status)
    app_type = grpc_to_enum__qkd_app_types(request.app_type)

    now = datetime.datetime.utcnow()

    app_data = [{
        'context_uuid'      : context_uuid,
        'app_uuid'          : app_uuid,
        'app_status'        : app_status,
        'app_type'          : app_type,
        'server_app_id'     : request.server_app_id,
        'client_app_id'     : request.client_app_id,
        'backing_qkdl_uuid' : [qkdl_id.qkdl_uuid.uuid for qkdl_id in request.backing_qkdl_id],
        'local_device_uuid' : request.local_device_id.device_uuid.uuid,
        'remote_device_uuid': request.remote_device_id.device_uuid.uuid or None,
        'created_at'        : now,
        'updated_at'        : now,
    }]

    def callback(session : Session) -> bool:
        stmt = insert(AppModel).values(app_data)
        stmt = stmt.on_conflict_do_update(
            index_elements=[AppModel.app_uuid],
            set_=dict(
                app_status         = stmt.excluded.app_status,
                app_type           = stmt.excluded.app_type,
                server_app_id      = stmt.excluded.server_app_id,
                client_app_id      = stmt.excluded.client_app_id,
                backing_qkdl_uuid  = stmt.excluded.backing_qkdl_uuid,
                local_device_uuid  = stmt.excluded.local_device_uuid,
                remote_device_uuid = stmt.excluded.remote_device_uuid,
                updated_at         = stmt.excluded.updated_at,
            )
        )
        stmt = stmt.returning(AppModel.created_at, AppModel.updated_at)
        created_at, updated_at = session.execute(stmt).fetchone()
        updated = updated_at > created_at
        return updated
    updated = run_transaction(sessionmaker(bind=db_engine), callback)

    context_id = json_context_id(context_uuid)
    app_id = json_app_id(app_uuid, context_id=context_id)
    #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
    #notify_event_app(messagebroker, event_type, app_id)
    #notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id)
    return AppId(**app_id)

def app_get_by_server(db_engine : Engine, request : str) -> App:
    def callback(session : Session) -> Optional[Dict]:
        obj : Optional[AppModel] = session.query(AppModel)\
            .filter_by(server_app_id=request).one_or_none()
        return None if obj is None else obj.dump()
    obj = run_transaction(sessionmaker(bind=db_engine), callback)
    if obj is None:
        raise NotFoundException('No app match found for', request)
    return App(**obj)

"""
def device_delete(db_engine : Engine, messagebroker : MessageBroker, request : DeviceId) -> Empty:
    device_uuid = device_get_uuid(request, allow_random=False)
    def callback(session : Session) -> Tuple[bool, List[Dict]]:
        query = session.query(TopologyDeviceModel)
        query = query.filter_by(device_uuid=device_uuid)
        topology_device_list : List[TopologyDeviceModel] = query.all()
        topology_ids = [obj.topology.dump_id() for obj in topology_device_list]
        num_deleted = session.query(DeviceModel).filter_by(device_uuid=device_uuid).delete()
        return num_deleted > 0, topology_ids
    deleted, updated_topology_ids = run_transaction(sessionmaker(bind=db_engine), callback)
    device_id = json_device_id(device_uuid)
    if deleted:
        notify_event_device(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, device_id)

        context_ids  : Dict[str, Dict] = dict()
        topology_ids : Dict[str, Dict] = dict()
        for topology_id in updated_topology_ids:
            topology_uuid = topology_id['topology_uuid']['uuid']
            topology_ids[topology_uuid] = topology_id
            context_id = topology_id['context_id']
            context_uuid = context_id['context_uuid']['uuid']
            context_ids[context_uuid] = context_id

        for topology_id in topology_ids.values():
            notify_event_topology(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, topology_id)
        for context_id in context_ids.values():
            notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id)

    return Empty()

def device_select(db_engine : Engine, request : DeviceFilter) -> DeviceList:
    device_uuids = [
        device_get_uuid(device_id, allow_random=False)
        for device_id in request.device_ids.device_ids
    ]
    dump_params = dict(
        include_endpoints   =request.include_endpoints,
        include_config_rules=request.include_config_rules,
        include_components  =request.include_components,
    )
    def callback(session : Session) -> List[Dict]:
        query = session.query(DeviceModel)
        if request.include_endpoints   : query = query.options(selectinload(DeviceModel.endpoints))
        if request.include_config_rules: query = query.options(selectinload(DeviceModel.config_rules))
        #if request.include_components : query = query.options(selectinload(DeviceModel.components))
        obj_list : List[DeviceModel] = query.filter(DeviceModel.device_uuid.in_(device_uuids)).all()
        return [obj.dump(**dump_params) for obj in obj_list]
    devices = run_transaction(sessionmaker(bind=db_engine), callback)
    return DeviceList(devices=devices)
"""
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import operator
from sqlalchemy import CheckConstraint, Column, DateTime, Float, Enum, ForeignKey, Integer, String
from sqlalchemy.dialects.postgresql import UUID, ARRAY
from sqlalchemy.orm import relationship
from typing import Dict

from ._Base import _Base
from .enums.QKDAppStatus import ORM_QKDAppStatusEnum
from .enums.QKDAppTypes import ORM_QKDAppTypesEnum

class AppModel(_Base):
    __tablename__ = 'qkd_app'

    app_uuid           = Column(UUID(as_uuid=False), primary_key=True)
    context_uuid       = Column(UUID(as_uuid=False), nullable=False) # Supposed to be a Foreign Key
    app_status         = Column(Enum(ORM_QKDAppStatusEnum), nullable=False)
    app_type           = Column(Enum(ORM_QKDAppTypesEnum), nullable=False)
    server_app_id      = Column(String, nullable=False)
    client_app_id      = Column(ARRAY(String), nullable=False)
    backing_qkdl_uuid  = Column(ARRAY(UUID(as_uuid=False)), nullable=False)
    local_device_uuid  = Column(UUID(as_uuid=False), nullable=False)
    remote_device_uuid = Column(UUID(as_uuid=False), nullable=True)

    # Optare: created_at and updated_at are only used, later in the code, to detect
    #         whether an app was updated; do not change them.
    created_at         = Column(DateTime, nullable=False)
    updated_at         = Column(DateTime, nullable=False)

    #__table_args__ = (
    #    CheckConstraint(... >= 0, name='name_value_...'),
    #)

    def dump_id(self) -> Dict:
        return {
            'context_id': {'context_uuid': {'uuid': self.context_uuid}},
            'app_uuid': {'uuid': self.app_uuid}
        }

    def dump(self) -> Dict:
        result = {
            'app_id'           : self.dump_id(),
            'app_status'       : self.app_status.value,
            'app_type'         : self.app_type.value,
            'server_app_id'    : self.server_app_id,
            'client_app_id'    : self.client_app_id,
            'backing_qkdl_id'  : [{'qkdl_uuid': {'uuid': qkdl_id}} for qkdl_id in self.backing_qkdl_uuid],
            'local_device_id'  : {'device_uuid': {'uuid': self.local_device_uuid}},
            'remote_device_id' : {'device_uuid': {'uuid': self.remote_device_uuid}},
        }
        return result
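For orientation, a hedged illustration of the dictionary that dump() produces for one row; all identifiers are made up, and app_status/app_type carry the numeric values of the corresponding gRPC enums:

    {
        'app_id'           : {'context_id': {'context_uuid': {'uuid': '<context-uuid>'}},
                              'app_uuid': {'uuid': '<app-uuid>'}},
        'app_status'       : <numeric QKDAppStatusEnum value>,
        'app_type'         : <numeric QKDAppTypesEnum value>,
        'server_app_id'    : 'srv-app-001',
        'client_app_id'    : ['cli-app-001'],
        'backing_qkdl_id'  : [{'qkdl_uuid': {'uuid': '<qkdl-uuid>'}}],
        'local_device_id'  : {'device_uuid': {'uuid': '<local-device-uuid>'}},
        'remote_device_id' : {'device_uuid': {'uuid': '<remote-device-uuid>'}},
    }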
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sqlalchemy
from typing import Any, List
from sqlalchemy.orm import Session, sessionmaker, declarative_base
from sqlalchemy.sql import text
from sqlalchemy_cockroachdb import run_transaction

_Base = declarative_base()

'''
def create_performance_enhancers(db_engine : sqlalchemy.engine.Engine) -> None:
    def index_storing(
        index_name : str, table_name : str, index_fields : List[str], storing_fields : List[str]
    ) -> Any:
        str_index_fields = ','.join(['"{:s}"'.format(index_field) for index_field in index_fields])
        str_storing_fields = ','.join(['"{:s}"'.format(storing_field) for storing_field in storing_fields])
        INDEX_STORING = 'CREATE INDEX IF NOT EXISTS {:s} ON "{:s}" ({:s}) STORING ({:s});'
        return text(INDEX_STORING.format(index_name, table_name, str_index_fields, str_storing_fields))

    statements = [
        # In case of relations
    ]
    def callback(session : Session) -> bool:
        for stmt in statements: session.execute(stmt)

    run_transaction(sessionmaker(bind=db_engine), callback)
'''

def rebuild_database(db_engine : sqlalchemy.engine.Engine, drop_if_exists : bool = False):
    if drop_if_exists: _Base.metadata.drop_all(db_engine)
    _Base.metadata.create_all(db_engine)
    #create_performance_enhancers(db_engine)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum, functools
from common.proto.qkd_app_pb2 import QKDAppStatusEnum
from ._GrpcToEnum import grpc_to_enum

class ORM_QKDAppStatusEnum(enum.Enum):
    ON           = QKDAppStatusEnum.QKDAPPSTATUS_ON
    DISCONNECTED = QKDAppStatusEnum.QKDAPPSTATUS_DISCONNECTED
    OUT_OF_TIME  = QKDAppStatusEnum.QKDAPPSTATUS_OUT_OF_TIME
    ZOMBIE       = QKDAppStatusEnum.QKDAPPSTATUS_ZOMBIE

grpc_to_enum__qkd_app_status = functools.partial(
    grpc_to_enum, QKDAppStatusEnum, ORM_QKDAppStatusEnum)
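A hedged usage sketch of the partial defined above, assuming grpc_to_enum (imported from ._GrpcToEnum, not shown here) resolves a gRPC enum value to the ORM member with the matching name, which is how the database layer uses it:

    orm_status = grpc_to_enum__qkd_app_status(QKDAppStatusEnum.QKDAPPSTATUS_ON)
    assert orm_status == ORM_QKDAppStatusEnum.ON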