Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tfs/controller
1 result
Show changes
Showing
with 901 additions and 2 deletions
<acl xmlns="http://www.ipinfusion.com/yang/ocnos/ipi-acl">
<interfaces>
<interface>
<name>{{interface}}</name>
<config>
<name>{{interface}}</name>
</config>
<ingress-acl-sets>
<ingress-acl-set {% if operation == "delete" %}operation="delete"{% endif %}>
{% if type is defined %}<acl-type>{{type}}</acl-type>{% endif %}
<access-groups>
<access-group>
<acl-name>{{name}}</acl-name>
<config>
<acl-name>{{name}}</acl-name>
</config>
</access-group>
</access-groups>
<config>
{% if type is defined %}<acl-type>{{type}}</acl-type>{% endif %}
</config>
</ingress-acl-set>
</ingress-acl-sets>
</interface>
</interfaces>
</acl>
\ No newline at end of file
<profiles xmlns="http://www.ipinfusion.com/yang/ocnos/ipi-platform">
<hardware-profile>
<filters>
<config>
<ingress-ipv4-extended {% if operation == "delete" %}operation="delete"{% endif %}></ingress-ipv4-extended>
</config>
</filters>
</hardware-profile>
</profiles>
\ No newline at end of file
/*
* Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
* Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#numpy==1.23.*
numpy<2.0.0
pandas==1.5.*
#prophet==1.1.*
scikit-learn==1.1.*
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Build, tag, and push the Docker image to the GitLab Docker registry
build kpi-manager:
variables:
IMAGE_NAME: 'kpi_manager' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker buildx build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile .
- docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
after_script:
- docker images --filter="dangling=true" --quiet | xargs -r docker rmi
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
# Apply unit test to the component
unit_test kpi-manager:
variables:
IMAGE_NAME: 'kpi_manager' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build kpi-manager
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
- if docker container ls | grep crdb; then docker rm -f crdb; else echo "CockroachDB container is not in the system"; fi
- if docker volume ls | grep crdb; then docker volume rm -f crdb; else echo "CockroachDB volume is not in the system"; fi
- if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME container is not in the system"; fi
- docker container prune -f
script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker pull "cockroachdb/cockroach:latest-v22.2"
- docker volume create crdb
- >
docker run --name crdb -d --network=teraflowbridge -p 26257:26257 -p 8080:8080
--env COCKROACH_DATABASE=tfs_test --env COCKROACH_USER=tfs --env COCKROACH_PASSWORD=tfs123
--volume "crdb:/cockroach/cockroach-data"
cockroachdb/cockroach:latest-v22.2 start-single-node
- echo "Waiting for initialization..."
- while ! docker logs crdb 2>&1 | grep -q 'finished creating default user \"tfs\"'; do sleep 1; done
- docker logs crdb
- docker ps -a
- CRDB_ADDRESS=$(docker inspect crdb --format "{{.NetworkSettings.Networks.teraflowbridge.IPAddress}}")
- echo $CRDB_ADDRESS
- >
docker run --name $IMAGE_NAME -d -p 30010:30010
--env "CRDB_URI=cockroachdb://tfs:tfs123@${CRDB_ADDRESS}:26257/tfs_test?sslmode=require"
--volume "$PWD/src/$IMAGE_NAME/tests:/opt/results"
--network=teraflowbridge
$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
- docker ps -a
- sleep 5
- docker logs $IMAGE_NAME
- >
docker exec -i $IMAGE_NAME bash -c
"coverage run -m pytest --log-level=INFO --verbose --junitxml=/opt/results/${IMAGE_NAME}_report.xml $IMAGE_NAME/tests/test_*.py"
- docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker volume rm -f crdb
- docker network rm teraflowbridge
- docker volume prune --force
- docker image prune --force
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- src/$IMAGE_NAME/tests/Dockerfile
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml
## Deployment of the service in Kubernetes Cluster
#deploy context:
# variables:
# IMAGE_NAME: 'context' # name of the microservice
# IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
# stage: deploy
# needs:
# - unit test context
# # - integ_test execute
# script:
# - 'sed -i "s/$IMAGE_NAME:.*/$IMAGE_NAME:$IMAGE_TAG/" manifests/${IMAGE_NAME}service.yaml'
# - kubectl version
# - kubectl get all
# - kubectl apply -f "manifests/${IMAGE_NAME}service.yaml"
# - kubectl get all
# # environment:
# # name: test
# # url: https://example.com
# # kubernetes:
# # namespace: test
# rules:
# - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
# when: manual
# - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
# when: manual
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
WORKDIR /var/teraflow
COPY common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory
WORKDIR /var/teraflow/common
COPY src/common/. ./
RUN rm -rf proto
# Create proto sub-folder, copy .proto files, and generate Python code
RUN mkdir -p /var/teraflow/common/proto
WORKDIR /var/teraflow/common/proto
RUN touch __init__.py
COPY proto/*.proto ./
RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/kpi_manager
WORKDIR /var/teraflow/kpi_manager
COPY src/kpi_manager/requirements.in requirements.in
RUN pip-compile --quiet --output-file=requirements.txt requirements.in
RUN python3 -m pip install -r requirements.txt
# Add component files into working directory
WORKDIR /var/teraflow
COPY src/kpi_manager/. kpi_manager/
# Start the service
ENTRYPOINT ["python", "-m", "kpi_manager.service"]
# How to locally run and test KPI manager micro-service
## --- File links need to be updated. ---
### Pre-requisets
The following requirements should be fulfilled before the execuation of KPI management service.
1. Verify that [kpi_management.proto](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/proto/kpi_management.proto) file exists and grpcs file are generated sucessfully.
2. Virtual enviornment exist with all the required packages listed in ["requirements.in"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/requirements.in) are installed sucessfully.
3. Verify the creation of required database and table.
[KPI DB test](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/database/tests/KpiDBtests.py) python file enlist the functions to create tables and database and
[KPI Engine](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/service/database/KpiEngine.py) contains the DB string, update the string as per your deployment.
### Messages format templates
["Messages"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_messages.py) python file enlist the basic gRPC messages format used during the testing.
### Test file
["KPI management test"](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/kpi_management/kpi_manager/tests/test_kpi_manager.py) python file enlist different tests conducted during the experiment.
### Flow of execution (Kpi Maanager Service functions)
1. Call the `create_database()` and `create_tables()` functions from `Kpi_DB` class to create the required database and table if they don't exist. Call `verify_tables` to verify the existence of KPI table.
2. Call the gRPC method `SetKpiDescriptor(KpiDescriptor)->KpiId` to add the KpiDescriptor in `Kpi` DB. `KpiDescriptor` and `KpiId` are both pre-defined gRPC message types.
3. Call `GetKpiDescriptor(KpiId)->KpiDescriptor` to read the `KpiDescriptor` from DB and `DeleteKpiDescriptor(KpiId)` to delete the `KpiDescriptor` from DB.
4. Call `SelectKpiDescriptor(KpiDescriptorFilter)->KpiDescriptorList` to get all `KpiDescriptor` objects that matches the filter criteria. `KpiDescriptorFilter` and `KpiDescriptorList` are pre-defined gRPC message types.
## For KPI composer and KPI writer
The functionalities of KPI composer and writer is heavily dependent upon Telemetery service. Therfore, these services has other pre-requsites that are mention [here](https://labs.etsi.org/rep/tfs/controller/-/blob/feat/71-cttc-separation-of-monitoring/src/telemetry/requirements.in).
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.client.RetryDecorator import retry, delay_exponential
from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceStub
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 10
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class KpiManagerClient:
def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.KPIMANAGER)
if not port: port = get_service_port_grpc(ServiceNameEnum.KPIMANAGER)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
def connect(self):
self.channel = grpc.insecure_channel(self.endpoint)
self.stub = KpiManagerServiceStub(self.channel)
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
@RETRY_DECORATOR
def SetKpiDescriptor(self, request : KpiDescriptor) -> KpiId:
LOGGER.debug('SetKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.SetKpiDescriptor(request)
LOGGER.debug('SetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def DeleteKpiDescriptor(self,request : KpiId) -> Empty:
LOGGER.debug('DeleteKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.DeleteKpiDescriptor(request)
LOGGER.debug('DeleteKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def GetKpiDescriptor(self, request : KpiId) -> KpiDescriptor:
LOGGER.debug('GetKpiDescriptor: {:s}'.format(grpc_message_to_json_string(request)))
response = self.stub.GetKpiDescriptor(request)
LOGGER.debug('GetKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response)))
return response
@RETRY_DECORATOR
def SelectKpiDescriptor(self, filter : KpiDescriptorFilter) -> KpiDescriptorList:
LOGGER.debug('SelectKpiDescriptor: {:s}'.format(grpc_message_to_json_string(filter)))
response = self.stub.SelectKpiDescriptor(filter)
LOGGER.debug('SelectKpiDescriptor result: {:s}'.format(grpc_message_to_json_string(response)))
return response
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, sqlalchemy
from common.Settings import get_setting
LOGGER = logging.getLogger(__name__)
# CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@127.0.0.1:{:s}/{:s}?sslmode={:s}'
CRDB_URI_TEMPLATE = 'cockroachdb://{:s}:{:s}@cockroachdb-public.{:s}.svc.cluster.local:{:s}/{:s}?sslmode={:s}'
class KpiEngine:
@staticmethod
def get_engine() -> sqlalchemy.engine.Engine:
crdb_uri = get_setting('CRDB_URI', default=None)
if crdb_uri is None:
CRDB_NAMESPACE = get_setting('CRDB_NAMESPACE')
CRDB_SQL_PORT = get_setting('CRDB_SQL_PORT')
CRDB_DATABASE = 'tfs_kpi_mgmt' # TODO: define variable get_setting('CRDB_DATABASE_KPI_MGMT')
CRDB_USERNAME = get_setting('CRDB_USERNAME')
CRDB_PASSWORD = get_setting('CRDB_PASSWORD')
CRDB_SSLMODE = get_setting('CRDB_SSLMODE')
crdb_uri = CRDB_URI_TEMPLATE.format(
CRDB_USERNAME, CRDB_PASSWORD, CRDB_NAMESPACE, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
# crdb_uri = CRDB_URI_TEMPLATE.format(
# CRDB_USERNAME, CRDB_PASSWORD, CRDB_SQL_PORT, CRDB_DATABASE, CRDB_SSLMODE)
try:
engine = sqlalchemy.create_engine(crdb_uri, echo=False)
LOGGER.info(' KpiDBmanager initalized with DB URL: {:}'.format(crdb_uri))
except: # pylint: disable=bare-except # pragma: no cover
LOGGER.exception('Failed to connect to database: {:s}'.format(str(crdb_uri)))
return None # type: ignore
return engine
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import Column, Integer, String, Text
from sqlalchemy.orm import registry
from common.proto.kpi_manager_pb2 import KpiDescriptor
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
# Create a base class for declarative models
Base = registry().generate_base()
class Kpi(Base):
__tablename__ = 'kpi'
kpi_id = Column(UUID(as_uuid=False), primary_key=True)
kpi_description = Column(Text , nullable=False)
kpi_sample_type = Column(Integer , nullable=False)
device_id = Column(String , nullable=False)
endpoint_id = Column(String , nullable=False)
service_id = Column(String , nullable=False)
slice_id = Column(String , nullable=False)
connection_id = Column(String , nullable=False)
link_id = Column(String , nullable=False)
# helps in logging the information
def __repr__(self):
return (f"<Kpi(kpi_id='{self.kpi_id}', kpi_description='{self.kpi_description}', "
f"kpi_sample_type='{self.kpi_sample_type}', device_id='{self.device_id}', "
f"endpoint_id='{self.endpoint_id}', service_id='{self.service_id}', "
f"slice_id='{self.slice_id}', connection_id='{self.connection_id}', "
f"link_id='{self.link_id}')>")
@classmethod
def convert_KpiDescriptor_to_row(cls, request):
"""
Create an instance of Kpi from a request object.
Args: request: The request object containing the data.
Returns: An instance of Kpi initialized with data from the request.
"""
return cls(
kpi_id = request.kpi_id.kpi_id.uuid,
kpi_description = request.kpi_description,
kpi_sample_type = request.kpi_sample_type,
device_id = request.device_id.device_uuid.uuid,
endpoint_id = request.endpoint_id.endpoint_uuid.uuid,
service_id = request.service_id.service_uuid.uuid,
slice_id = request.slice_id.slice_uuid.uuid,
connection_id = request.connection_id.connection_uuid.uuid,
link_id = request.link_id.link_uuid.uuid
)
@classmethod
def convert_row_to_KpiDescriptor(cls, row):
"""
Create and return a dictionary representation of a Kpi instance.
Args: row: The Kpi instance (row) containing the data.
Returns: KpiDescriptor object
"""
response = KpiDescriptor()
response.kpi_id.kpi_id.uuid = row.kpi_id
response.kpi_description = row.kpi_description
response.kpi_sample_type = row.kpi_sample_type
response.service_id.service_uuid.uuid = row.service_id
response.device_id.device_uuid.uuid = row.device_id
response.slice_id.slice_uuid.uuid = row.slice_id
response.endpoint_id.endpoint_uuid.uuid = row.endpoint_id
response.connection_id.connection_uuid.uuid = row.connection_id
response.link_id.link_uuid.uuid = row.link_id
return response
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sqlalchemy_utils
from sqlalchemy.orm import sessionmaker
from kpi_manager.database.KpiEngine import KpiEngine
from kpi_manager.database.KpiModel import Kpi as KpiModel
from common.method_wrappers.ServiceExceptions import (
AlreadyExistsException, OperationFailedException , NotFoundException)
LOGGER = logging.getLogger(__name__)
DB_NAME = "tfs_kpi_mgmt"
class KpiDB:
def __init__(self):
self.db_engine = KpiEngine.get_engine()
if self.db_engine is None:
LOGGER.error('Unable to get SQLAlchemy DB Engine...')
return False
self.db_name = DB_NAME
self.Session = sessionmaker(bind=self.db_engine)
def create_database(self) -> None:
if not sqlalchemy_utils.database_exists(self.db_engine.url):
LOGGER.debug("Database created. {:}".format(self.db_engine.url))
sqlalchemy_utils.create_database(self.db_engine.url)
def drop_database(self) -> None:
if sqlalchemy_utils.database_exists(self.db_engine.url):
sqlalchemy_utils.drop_database(self.db_engine.url)
def create_tables(self):
try:
KpiModel.metadata.create_all(self.db_engine) # type: ignore
LOGGER.debug("Tables created in the DB Name: {:}".format(self.db_name))
except Exception as e:
LOGGER.debug("Tables cannot be created in the kpi database. {:s}".format(str(e)))
raise OperationFailedException ("Tables can't be created", extra_details=["unable to create table {:}".format(e)])
def verify_tables(self):
try:
with self.db_engine.connect() as connection:
result = connection.execute("SHOW TABLES;")
tables = result.fetchall() # type: ignore
LOGGER.debug("Tables verified: {:}".format(tables))
except Exception as e:
LOGGER.debug("Unable to fetch Table names. {:s}".format(str(e)))
def add_row_to_db(self, row):
session = self.Session()
try:
session.add(row)
session.commit()
LOGGER.debug(f"Row inserted into {row.__class__.__name__} table.")
return True
except Exception as e:
session.rollback()
if "psycopg2.errors.UniqueViolation" in str(e):
LOGGER.error(f"Unique key voilation: {row.__class__.__name__} table. {str(e)}")
raise AlreadyExistsException(row.__class__.__name__, row,
extra_details=["Unique key voilation: {:}".format(e)] )
else:
LOGGER.error(f"Failed to insert new row into {row.__class__.__name__} table. {str(e)}")
raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
finally:
session.close()
def search_db_row_by_id(self, model, col_name, id_to_search):
session = self.Session()
try:
entity = session.query(model).filter_by(**{col_name: id_to_search}).first()
if entity:
# LOGGER.debug(f"{model.__name__} ID found: {str(entity)}")
return entity
else:
LOGGER.debug(f"{model.__name__} ID not found, No matching row: {str(id_to_search)}")
print("{:} ID not found, No matching row: {:}".format(model.__name__, id_to_search))
return None
except Exception as e:
session.rollback()
LOGGER.debug(f"Failed to retrieve {model.__name__} ID. {str(e)}")
raise OperationFailedException ("search by column id", extra_details=["unable to search row {:}".format(e)])
finally:
session.close()
def delete_db_row_by_id(self, model, col_name, id_to_search):
session = self.Session()
try:
record = session.query(model).filter_by(**{col_name: id_to_search}).first()
if record:
session.delete(record)
session.commit()
LOGGER.debug("Deleted %s with %s: %s", model.__name__, col_name, id_to_search)
else:
LOGGER.debug("%s with %s %s not found", model.__name__, col_name, id_to_search)
return None
except Exception as e:
session.rollback()
LOGGER.error("Error deleting %s with %s %s: %s", model.__name__, col_name, id_to_search, e)
raise OperationFailedException ("Deletion by column id", extra_details=["unable to delete row {:}".format(e)])
finally:
session.close()
def select_with_filter(self, model, filter_object):
session = self.Session()
try:
query = session.query(KpiModel)
# Apply filters based on the filter_object
if filter_object.kpi_id:
query = query.filter(KpiModel.kpi_id.in_([k.kpi_id.uuid for k in filter_object.kpi_id]))
if filter_object.kpi_sample_type:
query = query.filter(KpiModel.kpi_sample_type.in_(filter_object.kpi_sample_type))
if filter_object.device_id:
query = query.filter(KpiModel.device_id.in_([d.device_uuid.uuid for d in filter_object.device_id]))
if filter_object.endpoint_id:
query = query.filter(KpiModel.endpoint_id.in_([e.endpoint_uuid.uuid for e in filter_object.endpoint_id]))
if filter_object.service_id:
query = query.filter(KpiModel.service_id.in_([s.service_uuid.uuid for s in filter_object.service_id]))
if filter_object.slice_id:
query = query.filter(KpiModel.slice_id.in_([s.slice_uuid.uuid for s in filter_object.slice_id]))
if filter_object.connection_id:
query = query.filter(KpiModel.connection_id.in_([c.connection_uuid.uuid for c in filter_object.connection_id]))
if filter_object.link_id:
query = query.filter(KpiModel.link_id.in_([l.link_uuid.uuid for l in filter_object.link_id]))
result = query.all()
if result:
LOGGER.debug(f"Fetched filtered rows from {model.__name__} table with filters: {filter_object}") # - Results: {result}
else:
LOGGER.debug(f"No matching row found in {model.__name__} table with filters: {filter_object}")
return result
except Exception as e:
LOGGER.error(f"Error fetching filtered rows from {model.__name__} table with filters {filter_object} ::: {e}")
raise OperationFailedException ("Select by filter", extra_details=["unable to apply the filter {:}".format(e)])
finally:
session.close()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
psycopg2-binary==2.9.*
SQLAlchemy==1.4.*
sqlalchemy-cockroachdb==1.4.*
SQLAlchemy-Utils==0.38.*
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.Constants import ServiceNameEnum
from common.Settings import get_service_port_grpc
from common.tools.service.GenericGrpcService import GenericGrpcService
from common.proto.kpi_manager_pb2_grpc import add_KpiManagerServiceServicer_to_server
from kpi_manager.service.KpiManagerServiceServicerImpl import KpiManagerServiceServicerImpl
class KpiManagerService(GenericGrpcService):
def __init__(self, cls_name: str = __name__) -> None:
port = get_service_port_grpc(ServiceNameEnum.KPIMANAGER)
super().__init__(port, cls_name=cls_name)
self.kpiManagerService_servicer = KpiManagerServiceServicerImpl()
def install_servicers(self):
add_KpiManagerServiceServicer_to_server(self.kpiManagerService_servicer, self.server)
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, grpc
from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import Empty
from common.proto.kpi_manager_pb2_grpc import KpiManagerServiceServicer
from common.proto.kpi_manager_pb2 import KpiId, KpiDescriptor, KpiDescriptorFilter, KpiDescriptorList
from kpi_manager.database.Kpi_DB import KpiDB
from kpi_manager.database.KpiModel import Kpi as KpiModel
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool('KpiManager', 'NBIgRPC')
class KpiManagerServiceServicerImpl(KpiManagerServiceServicer):
def __init__(self):
LOGGER.info('Init KpiManagerService')
self.kpi_db_obj = KpiDB()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SetKpiDescriptor(self, request: KpiDescriptor, grpc_context: grpc.ServicerContext # type: ignore
) -> KpiId: # type: ignore
response = KpiId()
LOGGER.info("Received gRPC message object: {:}".format(request))
try:
kpi_to_insert = KpiModel.convert_KpiDescriptor_to_row(request)
if(self.kpi_db_obj.add_row_to_db(kpi_to_insert)):
response.kpi_id.uuid = request.kpi_id.kpi_id.uuid
# LOGGER.info("Added Row: {:}".format(response))
return response
except Exception as e:
LOGGER.info("Unable to create KpiModel class object. {:}".format(e))
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore
) -> KpiDescriptor: # type: ignore
response = KpiDescriptor()
print("--> Received gRPC message object: {:}".format(request))
LOGGER.info("Received gRPC message object: {:}".format(request))
try:
kpi_id_to_search = request.kpi_id.uuid
row = self.kpi_db_obj.search_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search)
if row is None:
print ('No matching row found for kpi id: {:}'.format(kpi_id_to_search))
LOGGER.info('No matching row found kpi id: {:}'.format(kpi_id_to_search))
return Empty()
else:
response = KpiModel.convert_row_to_KpiDescriptor(row)
return response
except Exception as e:
print ('Unable to search kpi id. {:}'.format(e))
LOGGER.info('Unable to search kpi id. {:}'.format(e))
raise e
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def DeleteKpiDescriptor(self, request: KpiId, grpc_context: grpc.ServicerContext # type: ignore
) -> Empty: # type: ignore
LOGGER.info("Received gRPC message object: {:}".format(request))
try:
kpi_id_to_search = request.kpi_id.uuid
self.kpi_db_obj.delete_db_row_by_id(KpiModel, 'kpi_id', kpi_id_to_search)
except Exception as e:
LOGGER.info('Unable to search kpi id. {:}'.format(e))
finally:
return Empty()
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def SelectKpiDescriptor(self, filter: KpiDescriptorFilter, grpc_context: grpc.ServicerContext # type: ignore
) -> KpiDescriptorList: # type: ignore
LOGGER.info("Received gRPC message object: {:}".format(filter))
response = KpiDescriptorList()
try:
rows = self.kpi_db_obj.select_with_filter(KpiModel, filter)
except Exception as e:
LOGGER.info('Unable to apply filter on kpi descriptor. {:}'.format(e))
try:
for row in rows:
kpiDescriptor_obj = KpiModel.convert_row_to_KpiDescriptor(row)
response.kpi_descriptor_list.append(kpiDescriptor_obj)
return response
except Exception as e:
LOGGER.info('Unable to process filter response {:}'.format(e))
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, signal, sys, threading
from common.Settings import get_log_level
from .KpiManagerService import KpiManagerService
terminate = threading.Event()
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received')
terminate.set()
def main():
global LOGGER # pylint: disable=global-statement
log_level = get_log_level()
logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.debug('Starting...')
grpc_service = KpiManagerService()
grpc_service.start()
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1.0): pass
LOGGER.debug('Terminating...')
grpc_service.stop()
LOGGER.debug('Bye')
return 0
if __name__ == '__main__':
sys.exit(main())
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from kpi_manager.database.Kpi_DB import KpiDB
LOGGER = logging.getLogger(__name__)
def test_verify_databases_and_Tables():
LOGGER.info('>>> test_verify_Tables : START <<< ')
kpiDBobj = KpiDB()
kpiDBobj.drop_database()
kpiDBobj.verify_tables()
kpiDBobj.create_database()
kpiDBobj.create_tables()
kpiDBobj.verify_tables()