Skip to content
Snippets Groups Projects
Commit 843911d9 authored by Mohammad Ismaeel's avatar Mohammad Ismaeel
Browse files

Merge branch 'develop' of https://labs.etsi.org/rep/tfs/controller into...

Merge branch 'develop' of https://labs.etsi.org/rep/tfs/controller into cnit_related_activity_openroadm
parents 9274f74b 459c6f14
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!285Resolve: "(CNIT) New SBI Driver based on OpenROADM for ROADMs"
Showing
with 742 additions and 1 deletion
......@@ -62,3 +62,10 @@ spec:
name: nbiservice
port:
number: 8080
- path: /()(qkd_app/.*)
- pathType: Prefix
backend:
service:
name: qkdappservice
port:
number: 8005
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apps/v1
kind: Deployment
metadata:
name: qkdappservice
spec:
selector:
matchLabels:
app: qkdappservice
#replicas: 1
template:
metadata:
labels:
app: qkdappservice
spec:
terminationGracePeriodSeconds: 5
containers:
- name: server
image: labs.etsi.org:5050/tfs/controller/qkd_app:latest
imagePullPolicy: Always
ports:
- containerPort: 10060
- containerPort: 9192
- containerPort: 8005
env:
- name: MB_BACKEND
value: "nats"
- name: LOG_LEVEL
value: "INFO"
- name: CRDB_DATABASE_APP
value: "qkd_app"
envFrom:
- secretRef:
name: crdb-data
- secretRef:
name: nats-data
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10060"]
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10060"]
resources:
requests:
cpu: 150m
memory: 128Mi
limits:
cpu: 500m
memory: 512Mi
---
apiVersion: v1
kind: Service
metadata:
name: qkdappservice
labels:
app: qkdappservice
spec:
type: ClusterIP
selector:
app: qkdappservice
ports:
- name: grpc
protocol: TCP
port: 10060
targetPort: 10060
- name: metrics
protocol: TCP
port: 9192
targetPort: 9192
- name: http
port: 8005
targetPort: 8005
......@@ -71,7 +71,14 @@ export TFS_COMPONENTS="context device pathcomp opticalcontroller service slice
#fi
# Uncomment to activate QKD App
#export TFS_COMPONENTS="${TFS_COMPONENTS} app"
# To manage QKD Apps, "service" requires "qkd_app" to be deployed
# before "service", thus we "hack" the TFS_COMPONENTS environment variable prepending the
# "qkd_app" only if "service" is already in TFS_COMPONENTS, and re-export it.
#if [[ "$TFS_COMPONENTS" == *"service"* ]]; then
# BEFORE="${TFS_COMPONENTS% service*}"
# AFTER="${TFS_COMPONENTS#* service}"
# export TFS_COMPONENTS="${BEFORE} qkd_app service ${AFTER}"
#fi
# Set the tag you want to use for your images.
......
syntax = "proto3";
package qkd_app;
import "context.proto";
// Optare: Change this if you want to change App's structure or enums.
// Optare: If a message (structure) is changed it must be changed in src/app/service/database
enum QKDAppStatusEnum {
QKDAPPSTATUS_ON = 0;
QKDAPPSTATUS_DISCONNECTED = 1;
QKDAPPSTATUS_OUT_OF_TIME = 2;
QKDAPPSTATUS_ZOMBIE = 3;
}
enum QKDAppTypesEnum {
QKDAPPTYPES_INTERNAL = 0;
QKDAPPTYPES_CLIENT = 1;
}
message QKDLId {
context.Uuid qkdl_uuid = 1;
}
message App {
AppId app_id = 1;
QKDAppStatusEnum app_status = 2;
QKDAppTypesEnum app_type = 3;
string server_app_id = 4;
repeated string client_app_id = 5;
repeated QKDLId backing_qkdl_id = 6;
context.DeviceId local_device_id = 7;
context.DeviceId remote_device_id = 8;
}
message AppId {
context.ContextId context_id = 1;
context.Uuid app_uuid = 2;
}
service AppService {
rpc RegisterApp(App) returns (context.Empty) {}
rpc ListApps (context.ContextId ) returns ( AppList ) {}
}
message AppList {
repeated App apps = 1;
}
......@@ -61,6 +61,7 @@ class ServiceNameEnum(Enum):
E2EORCHESTRATOR = 'e2e-orchestrator'
OPTICALCONTROLLER = 'opticalcontroller'
BGPLS = 'bgpls-speaker'
QKD_APP = 'qkd_app'
KPIMANAGER = 'kpi-manager'
KPIVALUEAPI = 'kpi-value-api'
KPIVALUEWRITER = 'kpi-value-writer'
......@@ -96,6 +97,7 @@ DEFAULT_SERVICE_GRPC_PORTS = {
ServiceNameEnum.FORECASTER .value : 10040,
ServiceNameEnum.E2EORCHESTRATOR .value : 10050,
ServiceNameEnum.OPTICALCONTROLLER .value : 10060,
ServiceNameEnum.QKD_APP .value : 10070,
ServiceNameEnum.BGPLS .value : 20030,
ServiceNameEnum.KPIMANAGER .value : 30010,
ServiceNameEnum.KPIVALUEAPI .value : 30020,
......@@ -115,10 +117,12 @@ DEFAULT_SERVICE_HTTP_PORTS = {
ServiceNameEnum.CONTEXT .value : 8080,
ServiceNameEnum.NBI .value : 8080,
ServiceNameEnum.WEBUI .value : 8004,
ServiceNameEnum.QKD_APP .value : 8005,
}
# Default HTTP/REST-API service base URLs
DEFAULT_SERVICE_HTTP_BASEURLS = {
ServiceNameEnum.NBI .value : None,
ServiceNameEnum.WEBUI .value : None,
ServiceNameEnum.QKD_APP .value : None,
}
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from typing import Dict, List, Optional
from common.Constants import DEFAULT_CONTEXT_NAME
from common.tools.object_factory.Context import json_context_id
def json_app_id(app_uuid : str, context_id : Optional[Dict] = None) -> Dict:
result = {'app_uuid': {'uuid': app_uuid}}
if context_id is not None: result['context_id'] = copy.deepcopy(context_id)
return result
......@@ -42,6 +42,16 @@ def json_service(
'service_config' : {'config_rules': copy.deepcopy(config_rules)},
}
def json_service_qkd_planned(
service_uuid : str, endpoint_ids : List[Dict] = [], constraints : List[Dict] = [],
config_rules : List[Dict] = [], context_uuid : str = DEFAULT_CONTEXT_NAME
):
return json_service(
service_uuid, ServiceTypeEnum.SERVICETYPE_QKD, context_id=json_context_id(context_uuid),
status=ServiceStatusEnum.SERVICESTATUS_PLANNED, endpoint_ids=endpoint_ids, constraints=constraints,
config_rules=config_rules)
def json_service_l2nm_planned(
service_uuid : str, endpoint_ids : List[Dict] = [], constraints : List[Dict] = [],
config_rules : List[Dict] = [], context_uuid : str = DEFAULT_CONTEXT_NAME
......
......@@ -38,3 +38,4 @@ def test_qkd_driver_timeout_connection(mock_get, qkd_driver):
mock_get.side_effect = requests.exceptions.Timeout
qkd_driver.timeout = 0.001 # Simulate very short timeout
assert qkd_driver.Connect() is False
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
build app:
variables:
IMAGE_NAME: 'qkd_app' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile .
- docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
after_script:
- docker images --filter="dangling=true" --quiet | xargs -r docker rmi
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
# Apply unit test to the component
unit_test app:
variables:
IMAGE_NAME: 'qkd_app' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build app
- unit_test service
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
- if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi
script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker run --name $IMAGE_NAME -d -p 10070:10070 -p 8005:8005 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
- sleep 5
- docker ps -a
- docker logs $IMAGE_NAME
- docker exec -i $IMAGE_NAME bash -c "coverage run -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary.py --junitxml=/opt/results/${IMAGE_NAME}_report.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker rm -f $IMAGE_NAME
- docker network rm teraflowbridge
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- src/$IMAGE_NAME/tests/Dockerfile
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory
WORKDIR /var/teraflow/common
COPY src/common/. ./
RUN rm -rf proto
# Create proto sub-folder, copy .proto files, and generate Python code
RUN mkdir -p /var/teraflow/common/proto
WORKDIR /var/teraflow/common/proto
RUN touch __init__.py
COPY proto/*.proto ./
RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/qkd_app
WORKDIR /var/teraflow/qkd_app
COPY src/qkd_app/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/context/. context/
COPY src/service/. service/
COPY src/qkd_app/. qkd_app/
# Start the service
ENTRYPOINT ["python", "-m", "qkd_app.service"]
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty, ContextId
from common.proto.qkd_app_pb2 import App, AppId, AppList
from common.proto.qkd_app_pb2_grpc import AppServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class QKDAppClient:
def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.APP)
if not port: port = get_service_port_grpc(ServiceNameEnum.APP)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
def connect(self):
self.channel = grpc.insecure_channel(self.endpoint)
self.stub = AppServiceStub(self.channel)
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
@RETRY_DECORATOR
def RegisterApp(self, request : App) -> Empty:
LOGGER.debug('RegisterApp request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.RegisterApp(request)
LOGGER.debug('RegisterApp result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def ListApps(self, request: ContextId) -> AppList:
LOGGER.debug('ListApps request: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.ListApps(request)
LOGGER.debug('ListApps result: {:s}'.format(grpc_message_to_json_string(response)))
return response
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Flask==2.1.3
Flask-HTTPAuth==4.5.0
Flask-RESTful==0.3.9
jsonschema==4.4.0
requests==2.27.1
werkzeug==2.3.7
nats-py==2.6.*
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, sqlalchemy
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.message_broker.MessageBroker import MessageBroker
from common.proto.qkd_app_pb2_grpc import add_AppServiceServicer_to_server
from common.tools.service.GenericGrpcService import GenericGrpcService
from qkd_app.service.QKDAppServiceServicerImpl import AppServiceServicerImpl
# Custom gRPC settings
GRPC_MAX_WORKERS = 200 # multiple clients might keep connections alive for Get*Events() RPC methods
LOGGER = logging.getLogger(__name__)
class AppService(GenericGrpcService):
def __init__(
self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker, cls_name: str = __name__
) -> None:
port = get_service_port_grpc(ServiceNameEnum.APP)
super().__init__(port, max_workers=GRPC_MAX_WORKERS, cls_name=cls_name)
self.app_servicer = AppServiceServicerImpl(db_engine, messagebroker)
def install_servicers(self):
add_AppServiceServicer_to_server(self.app_servicer, self.server)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging, sqlalchemy
#from typing import Iterator, Optional
from common.message_broker.MessageBroker import MessageBroker
import grpc, json, logging #, deepdiff
from common.proto.context_pb2 import (
Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ContextId)
from common.proto.qkd_app_pb2 import (App, AppId, AppList, QKDAppTypesEnum)
from common.proto.qkd_app_pb2_grpc import AppServiceServicer
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
#from common.tools.context_queries.InterDomain import is_inter_domain #, is_multi_domain
#from common.tools.grpc.ConfigRules import copy_config_rules
#from common.tools.grpc.Constraints import copy_constraints
#from common.tools.grpc.EndPointIds import copy_endpoint_ids
#from common.tools.grpc.ServiceIds import update_service_ids
#from common.tools.grpc.Tools import grpc_message_to_json_string
#from context.client.ContextClient import ContextClient
#from qkd_app.client.QKDAppClient import QKDAppClient
from .database.QKDApp import app_set, app_list_objs, app_get, app_get_by_server
from common.method_wrappers.ServiceExceptions import NotFoundException
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('QkdApp', 'RPC')
# Optare: This file must be edited based on app's logic
class AppServiceServicerImpl(AppServiceServicer):
def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker):
LOGGER.debug('Creating Servicer...')
self.db_engine = db_engine
self.messagebroker = messagebroker
LOGGER.debug('Servicer Created')
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def RegisterApp(self, request : App, context : grpc.ServicerContext) -> Empty:
# Optare: This is the main function required for the project.
# Optare: If it's an internal it will save it directly. If it's an external one it will save it as pending by not providing the remote until the other party requests it too
# Optare: Ideally, the only thing needed to change is the code inside the try block. Currently it just searches by a pending app with the same server_id but you can put more restrictions or different search and raise the NotFoundException
if request.app_type == QKDAppTypesEnum.QKDAPPTYPES_INTERNAL:
app_set(self.db_engine, self.messagebroker, request)
else:
try:
app = app_get_by_server(self.db_engine, request.server_app_id)
except NotFoundException:
app = request
app_set(self.db_engine, self.messagebroker, app)
else:
app.remote_device_id.device_uuid.uuid = request.local_device_id.device_uuid.uuid
app_set(self.db_engine, self.messagebroker, app)
return Empty()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def ListApps(self, request: ContextId, context : grpc.ServicerContext) -> AppList:
return app_list_objs(self.db_engine)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading
from prometheus_client import start_http_server
#from common.Constants import ServiceNameEnum
from common.Settings import (
#ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name,
get_log_level, get_metrics_port, wait_for_environment_variables)
from qkd_app.service.QKDAppService import AppService
from qkd_app.service.rest_server.RestServer import RestServer
from qkd_app.service.rest_server.qkd_app import register_qkd_app
#from common.message_broker.Factory import get_messagebroker_backend
#from common.message_broker.MessageBroker import MessageBroker
from qkd_app.service.database.Engine import Engine
from qkd_app.service.database.models._Base import rebuild_database
terminate = threading.Event()
LOGGER : logging.Logger = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received')
terminate.set()
def main():
global LOGGER # pylint: disable=global-statement
log_level = get_log_level()
logging.basicConfig(level=log_level, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s")
LOGGER = logging.getLogger(__name__)
wait_for_environment_variables([
#get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
#get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info('Starting...')
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
# Get Database Engine instance and initialize database, if needed
LOGGER.info('Getting SQLAlchemy DB Engine...')
db_engine = Engine.get_engine()
if db_engine is None:
LOGGER.error('Unable to get SQLAlchemy DB Engine...')
return -1
try:
Engine.create_database(db_engine)
except: # pylint: disable=bare-except # pragma: no cover
LOGGER.exception('Failed to check/create the database: {:s}'.format(str(db_engine.url)))
rebuild_database(db_engine)
# Get message broker instance
messagebroker = None #MessageBroker(get_messagebroker_backend())
# Starting context service
grpc_service = AppService(db_engine, messagebroker)
grpc_service.start()
rest_server = RestServer()
register_qkd_app(rest_server)
rest_server.start()
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass
LOGGER.info('Terminating...')
grpc_service.stop()
rest_server.shutdown()
rest_server.join()
LOGGER.info('Bye')
return 0
if __name__ == '__main__':
sys.exit(main())
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, sqlalchemy, sqlalchemy_utils
from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
APP_NAME = 'tfs'
ECHO = False # true: dump SQL commands and transactions executed
CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
class Engine:
@staticmethod
def get_engine() -> sqlalchemy.engine.Engine:
crdb_uri = get_setting('CRDB_URI', default=None)
if crdb_uri is None:
CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT')
CRDB_DATABASE = get_setting('CRDB_DATABASE_APP')
CRDB_USERNAME = get_setting('CRDB_USERNAME')
CRDB_PASSWORD = get_setting('CRDB_PASSWORD')
CRDB_SSLMODE = get_setting('CRDB_SSLMODE')
crdb_uri = CRDB_URI_TEMPLATE.format(
CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
try:
engine = sqlalchemy.create_engine(
crdb_uri, connect_args={'application_name': APP_NAME}, echo=ECHO, future=True)
except: # pylint: disable=bare-except # pragma: no cover
LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
return None
return engine
@staticmethod
def create_database(engine : sqlalchemy.engine.Engine) -> None:
if not sqlalchemy_utils.database_exists(engine.url):
sqlalchemy_utils.create_database(engine.url)
@staticmethod
def drop_database(engine : sqlalchemy.engine.Engine) -> None:
if sqlalchemy_utils.database_exists(engine.url):
sqlalchemy_utils.drop_database(engine.url)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment