Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Commits on Source (30)
Showing
with 742 additions and 1 deletion
......@@ -62,3 +62,10 @@ spec:
name: nbiservice
port:
number: 8080
- path: /()(qkd_app/.*)
- pathType: Prefix
backend:
service:
name: qkdappservice
port:
number: 8005
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apps/v1
kind: Deployment
metadata:
name: qkdappservice
spec:
selector:
matchLabels:
app: qkdappservice
#replicas: 1
template:
metadata:
labels:
app: qkdappservice
spec:
terminationGracePeriodSeconds: 5
containers:
- name: server
image: labs.etsi.org:5050/tfs/controller/qkd_app:latest
imagePullPolicy: Always
ports:
- containerPort: 10060
- containerPort: 9192
- containerPort: 8005
env:
- name: MB_BACKEND
value: "nats"
- name: LOG_LEVEL
value: "INFO"
- name: CRDB_DATABASE_APP
value: "qkd_app"
envFrom:
- secretRef:
name: crdb-data
- secretRef:
name: nats-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10060"]
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10060"]
resources:
requests:
cpu: 150m
memory: 128Mi
limits:
cpu: 500m
memory: 512Mi
---
apiVersion: v1
kind: Service
metadata:
name: qkdappservice
labels:
app: qkdappservice
spec:
type: ClusterIP
selector:
app: qkdappservice
ports:
- name: grpc
protocol: TCP
port: 10060
targetPort: 10060
- name: metrics
protocol: TCP
port: 9192
targetPort: 9192
- name: http
port: 8005
targetPort: 8005
......@@ -71,7 +71,14 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene
#fi
# Uncomment to activate QKD App
#export TFS_COMPONENTS="${TFS_COMPONENTS} app"
# To manage QKD Apps, "service" requires "qkd_app" to be deployed
# before "service", thus we "hack" the TFS_COMPONENTS environment variable prepending the
# "qkd_app" only if "service" is already in TFS_COMPONENTS, and re-export it.
#if [[ "$TFS_COMPONENTS" == *"service"* ]]; then
# BEFORE="${TFS_COMPONENTS% service*}"
# AFTER="${TFS_COMPONENTS#* service}"
# export TFS_COMPONENTS="${BEFORE} qkd_app service ${AFTER}"
#fi
# Set the tag you want to use for your images.
......
syntax = "proto3";
package qkd_app;
import "context.proto";
// Optare: Change this if you want to change App's structure or enums.
// Optare: If a message (structure) is changed it must be changed in src/app/service/database
enum QKDAppStatusEnum {
QKDAPPSTATUS_ON = 0;
QKDAPPSTATUS_DISCONNECTED = 1;
QKDAPPSTATUS_OUT_OF_TIME = 2;
QKDAPPSTATUS_ZOMBIE = 3;
}
enum QKDAppTypesEnum {
QKDAPPTYPES_INTERNAL = 0;
QKDAPPTYPES_CLIENT = 1;
}
message QKDLId {
context.Uuid qkdl_uuid = 1;
}
message App {
AppId app_id = 1;
QKDAppStatusEnum app_status = 2;
QKDAppTypesEnum app_type = 3;
string server_app_id = 4;
repeated string client_app_id = 5;
repeated QKDLId backing_qkdl_id = 6;
context.DeviceId local_device_id = 7;
context.DeviceId remote_device_id = 8;
}
message AppId {
context.ContextId context_id = 1;
context.Uuid app_uuid = 2;
}
service AppService {
rpc RegisterApp(App) returns (context.Empty) {}
rpc ListApps (context.ContextId ) returns ( AppList ) {}
}
message AppList {
repeated App apps = 1;
}
......@@ -61,6 +61,7 @@ class ServiceNameEnum(Enum):
E2EORCHESTRATOR = 'e2e-orchestrator'
OPTICALCONTROLLER = 'opticalcontroller'
BGPLS = 'bgpls-speaker'
QKD_APP = 'qkd_app'
KPIMANAGER = 'kpi-manager'
KPIVALUEAPI = 'kpi-value-api'
KPIVALUEWRITER = 'kpi-value-writer'
......@@ -96,6 +97,7 @@ DEFAULT_SERVICE_GRPC_PORTS = {
ServiceNameEnum.FORECASTER .value : 10040,
ServiceNameEnum.E2EORCHESTRATOR .value : 10050,
ServiceNameEnum.OPTICALCONTROLLER .value : 10060,
ServiceNameEnum.QKD_APP .value : 10070,
ServiceNameEnum.BGPLS .value : 20030,
ServiceNameEnum.KPIMANAGER .value : 30010,
ServiceNameEnum.KPIVALUEAPI .value : 30020,
......@@ -115,10 +117,12 @@ DEFAULT_SERVICE_HTTP_PORTS = {
ServiceNameEnum.CONTEXT .value : 8080,
ServiceNameEnum.NBI .value : 8080,
ServiceNameEnum.WEBUI .value : 8004,
ServiceNameEnum.QKD_APP .value : 8005,
}
# Default HTTP/REST-API service base URLs
DEFAULT_SERVICE_HTTP_BASEURLS = {
ServiceNameEnum.NBI .value : None,
ServiceNameEnum.WEBUI .value : None,
ServiceNameEnum.QKD_APP .value : None,
}
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from typing import Dict, List, Optional
from common.Constants import DEFAULT_CONTEXT_NAME
from common.tools.object_factory.Context import json_context_id
def json_app_id(app_uuid : str, context_id : Optional[Dict] = None) -> Dict:
result = {'app_uuid': {'uuid': app_uuid}}
if context_id is not None: result['context_id'] = copy.deepcopy(context_id)
return result
......@@ -42,6 +42,16 @@ def json_service(
'service_config' : {'config_rules': copy.deepcopy(config_rules)},
}
def json_service_qkd_planned(
service_uuid : str, endpoint_ids : List[Dict] = [], constraints : List[Dict] = [],
config_rules : List[Dict] = [], context_uuid : str = DEFAULT_CONTEXT_NAME
):
return json_service(
service_uuid, ServiceTypeEnum.SERVICETYPE_QKD, context_id=json_context_id(context_uuid),
status=ServiceStatusEnum.SERVICESTATUS_PLANNED, endpoint_ids=endpoint_ids, constraints=constraints,
config_rules=config_rules)
def json_service_l2nm_planned(
service_uuid : str, endpoint_ids : List[Dict] = [], constraints : List[Dict] = [],
config_rules : List[Dict] = [], context_uuid : str = DEFAULT_CONTEXT_NAME
......
......@@ -38,3 +38,4 @@ def test_qkd_driver_timeout_connection(mock_get, qkd_driver):
mock_get.side_effect = requests.exceptions.Timeout
qkd_driver.timeout = 0.001 # Simulate very short timeout
assert qkd_driver.Connect() is False
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
build app:
variables:
IMAGE_NAME: 'qkd_app' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile .
- docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
after_script:
- docker images --filter="dangling=true" --quiet | xargs -r docker rmi
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
# Apply unit test to the component
unit_test app:
variables:
IMAGE_NAME: 'qkd_app' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build app
- unit_test service
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
- if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi
script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker run --name $IMAGE_NAME -d -p 10070:10070 -p 8005:8005 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
- sleep 5
- docker ps -a
- docker logs $IMAGE_NAME
- docker exec -i $IMAGE_NAME bash -c "coverage run -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary.py --junitxml=/opt/results/${IMAGE_NAME}_report.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker rm -f $IMAGE_NAME
- docker network rm teraflowbridge
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- src/$IMAGE_NAME/tests/Dockerfile
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory
WORKDIR /var/teraflow/common
COPY src/common/. ./
RUN rm -rf proto
# Create proto sub-folder, copy .proto files, and generate Python code
RUN mkdir -p /var/teraflow/common/proto
WORKDIR /var/teraflow/common/proto
RUN touch __init__.py
COPY proto/*.proto ./
RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/qkd_app
WORKDIR /var/teraflow/qkd_app
COPY src/qkd_app/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/context/. context/
COPY src/service/. service/
COPY src/qkd_app/. qkd_app/
# Start the service
ENTRYPOINT ["python", "-m", "qkd_app.service"]
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty, ContextId
from common.proto.qkd_app_pb2 import App, AppId, AppList
from common.proto.qkd_app_pb2_grpc import AppServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class QKDAppClient:
def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.APP)
if not port: port = get_service_port_grpc(ServiceNameEnum.APP)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
def connect(self):
self.channel = grpc.insecure_channel(self.endpoint)
self.stub = AppServiceStub(self.channel)
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
@RETRY_DECORATOR
def RegisterApp(self, request : App) -> Empty:
LOGGER.debug('RegisterApp request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.RegisterApp(request)
LOGGER.debug('RegisterApp result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def ListApps(self, request: ContextId) -> AppList:
LOGGER.debug('ListApps request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.ListApps(request)
LOGGER.debug('ListApps result: {:s}'.format(grpc_message_to_json_string(response)))
return response
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Flask==2.1.3
Flask-HTTPAuth==4.5.0
Flask-RESTful==0.3.9
jsonschema==4.4.0
requests==2.27.1
werkzeug==2.3.7
nats-py==2.6.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, sqlalchemy
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.message_broker.MessageBroker import MessageBroker
from common.proto.qkd_app_pb2_grpc import add_AppServiceServicer_to_server
from common.tools.service.GenericGrpcService import GenericGrpcService
from qkd_app.service.QKDAppServiceServicerImpl import AppServiceServicerImpl
# Custom gRPC settings
GRPC_MAX_WORKERS = 200 # multiple clients might keep connections alive for Get*Events() RPC methods
LOGGER = logging.getLogger(__name__)
class AppService(GenericGrpcService):
def __init__(
self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker, cls_name: str = __name__
) -> None:
port = get_service_port_grpc(ServiceNameEnum.APP)
super().__init__(port, max_workers=GRPC_MAX_WORKERS, cls_name=cls_name)
self.app_servicer = AppServiceServicerImpl(db_engine, messagebroker)
def install_servicers(self):
add_AppServiceServicer_to_server(self.app_servicer, self.server)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging, sqlalchemy
#from typing import Iterator, Optional
from common.message_broker.MessageBroker import MessageBroker
import grpc, json, logging #, deepdiff
from common.proto.context_pb2 import (
Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ContextId)
from common.proto.qkd_app_pb2 import (App, AppId, AppList, QKDAppTypesEnum)
from common.proto.qkd_app_pb2_grpc import AppServiceServicer
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
#from common.tools.context_queries.InterDomain import is_inter_domain #, is_multi_domain
#from common.tools.grpc.ConfigRules import copy_config_rules
#from common.tools.grpc.Constraints import copy_constraints
#from common.tools.grpc.EndPointIds import copy_endpoint_ids
#from common.tools.grpc.ServiceIds import update_service_ids
#from common.tools.grpc.Tools import grpc_message_to_json_string
#from context.client.ContextClient import ContextClient
#from qkd_app.client.QKDAppClient import QKDAppClient
from .database.QKDApp import app_set, app_list_objs, app_get, app_get_by_server
from common.method_wrappers.ServiceExceptions import NotFoundException
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('QkdApp', 'RPC')
# Optare: This file must be edited based on app's logic
class AppServiceServicerImpl(AppServiceServicer):
def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker):
LOGGER.debug('Creating Servicer...')
self.db_engine = db_engine
self.messagebroker = messagebroker
LOGGER.debug('Servicer Created')
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def RegisterApp(self, request : App, context : grpc.ServicerContext) -> Empty:
# Optare: This is the main function required for the project.
# Optare: If it's an internal it will save it directly. If it's an external one it will save it as pending by not providing the remote until the other party requests it too
# Optare: Ideally, the only thing needed to change is the code inside the try block. Currently it just searches by a pending app with the same server_id but you can put more restrictions or different search and raise the NotFoundException
if request.app_type == QKDAppTypesEnum.QKDAPPTYPES_INTERNAL:
app_set(self.db_engine, self.messagebroker, request)
else:
try:
app = app_get_by_server(self.db_engine, request.server_app_id)
except NotFoundException:
app = request
app_set(self.db_engine, self.messagebroker, app)
else:
app.remote_device_id.device_uuid.uuid = request.local_device_id.device_uuid.uuid
app_set(self.db_engine, self.messagebroker, app)
return Empty()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def ListApps(self, request: ContextId, context : grpc.ServicerContext) -> AppList:
return app_list_objs(self.db_engine)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading
from prometheus_client import start_http_server
#from common.Constants import ServiceNameEnum
from common.Settings import (
#ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name,
get_log_level, get_metrics_port, wait_for_environment_variables)
from qkd_app.service.QKDAppService import AppService
from qkd_app.service.rest_server.RestServer import RestServer
from qkd_app.service.rest_server.qkd_app import register_qkd_app
#from common.message_broker.Factory import get_messagebroker_backend
#from common.message_broker.MessageBroker import MessageBroker
from qkd_app.service.database.Engine import Engine
from qkd_app.service.database.models._Base import rebuild_database
terminate = threading.Event()
LOGGER : logging.Logger = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received')
terminate.set()
def main():
global LOGGER # pylint: disable=global-statement
log_level = get_log_level()
logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
LOGGER = logging.getLogger(__name__)
wait_for_environment_variables([
#get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
#get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info('Starting...')
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
# Get Database Engine instance and initialize database, if needed
LOGGER.info('Getting SQLAlchemy DB Engine...')
db_engine = Engine.get_engine()
if db_engine is None:
LOGGER.error('Unable to get SQLAlchemy DB Engine...')
return -1
try:
Engine.create_database(db_engine)
except: # pylint: disable=bare-except # pragma: no cover
LOGGER.exception('Failed to check/create the database: {:s}'.format(str(db_engine.url)))
rebuild_database(db_engine)
# Get message broker instance
messagebroker = None #MessageBroker(get_messagebroker_backend())
# Starting context service
grpc_service = AppService(db_engine, messagebroker)
grpc_service.start()
rest_server = RestServer()
register_qkd_app(rest_server)
rest_server.start()
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass
LOGGER.info('Terminating...')
grpc_service.stop()
rest_server.shutdown()
rest_server.join()
LOGGER.info('Bye')
return 0
if __name__ == '__main__':
sys.exit(main())
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, sqlalchemy, sqlalchemy_utils
from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
APP_NAME = 'tfs'
ECHO = False # true: dump SQL commands and transactions executed
CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
class Engine:
@staticmethod
def get_engine() -> sqlalchemy.engine.Engine:
crdb_uri = get_setting('CRDB_URI', default=None)
if crdb_uri is None:
CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT')
CRDB_DATABASE = get_setting('CRDB_DATABASE_APP')
CRDB_USERNAME = get_setting('CRDB_USERNAME')
CRDB_PASSWORD = get_setting('CRDB_PASSWORD')
CRDB_SSLMODE = get_setting('CRDB_SSLMODE')
crdb_uri = CRDB_URI_TEMPLATE.format(
CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
try:
engine = sqlalchemy.create_engine(
crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True)
except: # pylint: disable=bare-except # pragma: no cover
LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
return None
return engine
@staticmethod
def create_database(engine : sqlalchemy.engine.Engine) -> None:
if not sqlalchemy_utils.database_exists(engine.url):
sqlalchemy_utils.create_database(engine.url)
@staticmethod
def drop_database(engine : sqlalchemy.engine.Engine) -> None:
if sqlalchemy_utils.database_exists(engine.url):
sqlalchemy_utils.drop_database(engine.url)