diff --git a/manifests/simap_connectorservice.yaml b/manifests/simap_connectorservice.yaml index 09796f6f8bc0fafe9867add81974859d1d1575a7..91fa65af3742c2a5c51ed8dc40f1241b1bd930cb 100644 --- a/manifests/simap_connectorservice.yaml +++ b/manifests/simap_connectorservice.yaml @@ -37,21 +37,21 @@ spec: env: - name: LOG_LEVEL value: "INFO" - - name: SIMAP_SERVER_SCHEME + - name: SIMAP_DATASTORE_SCHEME value: "http" - - name: SIMAP_SERVER_ADDRESS + - name: SIMAP_DATASTORE_ADDRESS # Assuming SIMAP Server is deployed in a local Docker container, as per: - # - ./src/tests/tools/simap_server/build.sh - # - ./src/tests/tools/simap_server/deploy.sh + # - ./src/tests/tools/simap_datastore/build.sh + # - ./src/tests/tools/simap_datastore/deploy.sh value: "172.17.0.1" - - name: SIMAP_SERVER_PORT + - name: SIMAP_DATASTORE_PORT # Assuming SIMAP Server is deployed in a local Docker container, as per: - # - ./src/tests/tools/simap_server/build.sh - # - ./src/tests/tools/simap_server/deploy.sh + # - ./src/tests/tools/simap_datastore/build.sh + # - ./src/tests/tools/simap_datastore/deploy.sh value: "8080" - - name: SIMAP_SERVER_USERNAME + - name: SIMAP_DATASTORE_USERNAME value: "admin" - - name: SIMAP_SERVER_PASSWORD + - name: SIMAP_DATASTORE_PASSWORD value: "admin" - name: CRDB_DATABASE value: "tfs_simap_connector" diff --git a/src/common/tools/rest_conf/client/README.md b/src/common/tools/rest_conf/client/README.md index 9605fc751ecd44c0404cfd4586012c91544890d5..ecfb4993b17c1fab5a961ad32ce3b705d6bbad39 100644 --- a/src/common/tools/rest_conf/client/README.md +++ b/src/common/tools/rest_conf/client/README.md @@ -2,4 +2,4 @@ This server implements a basic RESTCONF Client that can be potentially used for any case. 
-See a simple working example in folder `src/tests/tools/simap_server` +See a simple working example in folder `src/tests/tools/simap_datastore` diff --git a/src/common/tools/rest_conf/server/README.md b/src/common/tools/rest_conf/server/README.md index 542d836173a11bf65cdfb5dc52a8fdf8657725c5..761c07aed03a75abeee5caaf26e24754cbb90082 100644 --- a/src/common/tools/rest_conf/server/README.md +++ b/src/common/tools/rest_conf/server/README.md @@ -12,7 +12,7 @@ The server can be configured using the following environment variables: - `SECRET_KEY`, defaults to `secrets.token_hex(64)` -See a simple working example in folder `src/tests/tools/simap_server` +See a simple working example in folder `src/tests/tools/simap_datastore` ## Build the RESTCONF Server Docker image diff --git a/src/common/tools/rest_conf/server/restconf_server/app.py b/src/common/tools/rest_conf/server/restconf_server/app.py index de35b25240487d9213ae1aa33c0691c1fc399dca..a00548da412c9bc2ff3fbb19c75c0d9cb16ed7bb 100644 --- a/src/common/tools/rest_conf/server/restconf_server/app.py +++ b/src/common/tools/rest_conf/server/restconf_server/app.py @@ -18,8 +18,8 @@ from .RestConfServerApplication import RestConfServerApplication logging.basicConfig( - level=logging.INFO, - format='[Worker-%(process)d][%(asctime)s] %(levelname)s:%(name)s:%(message)s', + level = logging.INFO, + format = '[Worker-%(process)d][%(asctime)s] %(levelname)s:%(name)s:%(message)s', ) LOGGER = logging.getLogger(__name__) diff --git a/src/simap_connector/Config.py b/src/simap_connector/Config.py index 656e9a87584e14ea73b0cee7e75f3dec11852d9d..659b4a72dcf20fd456a28aead7363bb1826d6d8d 100644 --- a/src/simap_connector/Config.py +++ b/src/simap_connector/Config.py @@ -15,8 +15,8 @@ from common.Settings import get_setting -SIMAP_SERVER_SCHEME = str(get_setting('SIMAP_SERVER_SCHEME', default='http' )) -SIMAP_SERVER_ADDRESS = str(get_setting('SIMAP_SERVER_ADDRESS', default='127.0.0.1')) -SIMAP_SERVER_PORT = 
int(get_setting('SIMAP_SERVER_PORT', default='80' )) -SIMAP_SERVER_USERNAME = str(get_setting('SIMAP_SERVER_USERNAME', default='admin' )) -SIMAP_SERVER_PASSWORD = str(get_setting('SIMAP_SERVER_PASSWORD', default='admin' )) +SIMAP_DATASTORE_SCHEME = str(get_setting('SIMAP_DATASTORE_SCHEME', default='http' )) +SIMAP_DATASTORE_ADDRESS = str(get_setting('SIMAP_DATASTORE_ADDRESS', default='127.0.0.1')) +SIMAP_DATASTORE_PORT = int(get_setting('SIMAP_DATASTORE_PORT', default='80' )) +SIMAP_DATASTORE_USERNAME = str(get_setting('SIMAP_DATASTORE_USERNAME', default='admin' )) +SIMAP_DATASTORE_PASSWORD = str(get_setting('SIMAP_DATASTORE_PASSWORD', default='admin' )) diff --git a/src/simap_connector/service/__main__.py b/src/simap_connector/service/__main__.py index e52768f8e7808572368d4839549c1f98a6508bcb..d5df1bc30c4471855bae553fc033367f422611ff 100644 --- a/src/simap_connector/service/__main__.py +++ b/src/simap_connector/service/__main__.py @@ -21,8 +21,8 @@ from common.Settings import ( get_log_level, get_metrics_port, wait_for_environment_variables ) from simap_connector.Config import ( - SIMAP_SERVER_SCHEME, SIMAP_SERVER_ADDRESS, SIMAP_SERVER_PORT, - SIMAP_SERVER_USERNAME, SIMAP_SERVER_PASSWORD, + SIMAP_DATASTORE_SCHEME, SIMAP_DATASTORE_ADDRESS, SIMAP_DATASTORE_PORT, + SIMAP_DATASTORE_USERNAME, SIMAP_DATASTORE_PASSWORD, ) from .database.Engine import Engine from .database.models._Base import rebuild_database @@ -76,12 +76,12 @@ def main(): rebuild_database(db_engine) restconf_client = RestConfClient( - scheme=SIMAP_SERVER_SCHEME, address=SIMAP_SERVER_ADDRESS, - port=SIMAP_SERVER_PORT, username=SIMAP_SERVER_USERNAME, - password=SIMAP_SERVER_PASSWORD, + scheme = SIMAP_DATASTORE_SCHEME, address = SIMAP_DATASTORE_ADDRESS, + port = SIMAP_DATASTORE_PORT, username = SIMAP_DATASTORE_USERNAME, + password = SIMAP_DATASTORE_PASSWORD, ) - simap_client = SimapClient(restconf_client) + simap_client = SimapClient(restconf_client) telemetry_pool = TelemetryPool(simap_client, 
terminate=TERMINATE) grpc_service = SimapConnectorService(db_engine, restconf_client, telemetry_pool) diff --git a/src/simap_connector/service/simap_updater/AllowedLinks.py b/src/simap_connector/service/simap_updater/AllowedLinks.py index 4758eebbf343f55e5bafba7f8adf568eb8dfb8ea..e01d78451897a14b9994ba5a748432d90f0b6b6b 100644 --- a/src/simap_connector/service/simap_updater/AllowedLinks.py +++ b/src/simap_connector/service/simap_updater/AllowedLinks.py @@ -13,8 +13,8 @@ # limitations under the License. ALLOWED_LINKS_PER_CONTROLLER = { - 'e2e' : {'L1', 'L2', 'L3', 'L4'}, - 'agg' : {'L7ab', 'L7ba', 'L8ab', 'L8ba', 'L11ab', 'L11ba', - 'L12ab', 'L12ba', 'L13', 'L14'}, - 'trans-pkt': {'L5', 'L6', 'L9', 'L10'}, + 'e2e' : { 'L1', 'L2', 'L3', 'L4' }, + 'agg' : { 'L7ab', 'L7ba', 'L8ab', 'L8ba', 'L11ab', + 'L11ba', 'L12ab', 'L12ba', 'L13', 'L14' }, + 'trans-pkt': { 'L5', 'L6', 'L9', 'L10' }, } diff --git a/src/tests/.gitlab-ci.yml b/src/tests/.gitlab-ci.yml index 267d7ac23af5942bdec5065292e33b0b3b537d9d..144488cbd64c21719ea0845fa6fc2f064bed74db 100644 --- a/src/tests/.gitlab-ci.yml +++ b/src/tests/.gitlab-ci.yml @@ -32,4 +32,4 @@ include: - local: '/src/tests/tools/mock_tfs_nbi_dependencies/.gitlab-ci.yml' - local: '/src/tests/tools/mock_qkd_node/.gitlab-ci.yml' - local: '/src/tests/tools/mock_osm_nbi/.gitlab-ci.yml' - - local: '/src/tests/tools/simap_server/.gitlab-ci.yml' + - local: '/src/tests/tools/simap_datastore/.gitlab-ci.yml' diff --git a/src/tests/tools/mock_nce_t_ctrl/redeploy.sh b/src/tests/tools/mock_nce_t_ctrl/redeploy.sh index ed5c254576ba94955b8d090c7b623f27881ecc0c..c1878fe9fceb512fb7e1406c22a67a66b4b4f28e 100755 --- a/src/tests/tools/mock_nce_t_ctrl/redeploy.sh +++ b/src/tests/tools/mock_nce_t_ctrl/redeploy.sh @@ -14,20 +14,20 @@ # limitations under the License. -echo "Building SIMAP Server..." +echo "Building SIMAP Datastore..." cd ~/tfs-ctrl/ -docker buildx build -t simap-server:mock -f ./src/tests/tools/simap_server/Dockerfile . 
+docker buildx build -t simap-datastore:mock -f ./src/tests/tools/simap_datastore/Dockerfile . echo "Building NCE-T Controller..." cd ~/tfs-ctrl/ docker buildx build -t nce-t-ctrl:mock -f ./src/tests/tools/mock_nce_t_ctrl/Dockerfile . echo "Cleaning up..." -docker rm --force simap-server +docker rm --force simap-datastore docker rm --force nce-t-ctrl echo "Deploying support services..." -docker run --detach --name simap-server --publish 8080:8080 simap-server:mock +docker run --detach --name simap-datastore --publish 8080:8080 simap-datastore:mock docker run --detach --name nce-t-ctrl --publish 8081:8080 --env SIMAP_ADDRESS=172.17.0.1 --env SIMAP_PORT=8080 nce-t-ctrl:mock sleep 2 diff --git a/src/tests/tools/simap_ai_engine/ai_engine/Dockerfile b/src/tests/tools/simap_ai_engine/ai_engine/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..b0041a2109a54062d2cd8432d028d69816ba4dd2 --- /dev/null +++ b/src/tests/tools/simap_ai_engine/ai_engine/Dockerfile @@ -0,0 +1,56 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +FROM python:3.13-slim + +# Install system dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + && rm -rf /var/lib/apt/lists/* + +# Set working directory +WORKDIR /app + +# Copy common requirements and AI engine requirements +COPY common_requirements_py313.in /app/common_requirements.in +COPY src/tests/tools/simap_ai_engine/ai_engine/requirements.in /app/ai_requirements.in + +# Install Python dependencies +RUN pip install --no-cache-dir pip-tools && \ + pip-compile --output-file=requirements.txt common_requirements.in ai_requirements.in && \ + pip install --no-cache-dir -r requirements.txt + +# Copy minimal TFS common module (required for Settings, etc.) +COPY src/common /app/src/common + +# Copy AI Engine module +COPY src/tests/tools/simap_ai_engine/ai_engine /app/ai_engine + +# Set PYTHONPATH to include repo modules and the AI engine package +ENV PYTHONPATH=/app/src + +# Expose default REST API port +EXPOSE 8080 + +# Set default environment variables (can be overridden at runtime) +ENV AI_ENGINE_REST_HOST=0.0.0.0 +ENV AI_ENGINE_REST_PORT=8080 +ENV SIMAP_DATASTORE_SCHEME=http +ENV LOG_LEVEL=INFO + +# Note: SIMAP_DATASTORE_ADDRESS, SIMAP_DATASTORE_PORT, SIMAP_DATASTORE_USERNAME, +# and SIMAP_DATASTORE_PASSWORD should be provided at runtime via --env flags + +# Run the AI Engine +CMD ["python", "-m", "ai_engine"] diff --git a/src/tests/tools/simap_ai_engine/ai_engine/__init__.py b/src/tests/tools/simap_ai_engine/ai_engine/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..288ad6ef9510fda28274b144036855949e019253 --- /dev/null +++ b/src/tests/tools/simap_ai_engine/ai_engine/__init__.py @@ -0,0 +1,54 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +AI Engine module. + +Provides REST API for AI-driven SLA policy analysis and violation detection +using data from SIMAP (network topology/devices) and InfluxDB (telemetry metrics). + +Module Structure: + - ai_model: AI/ML processing logic and SLA policy definitions + - api: Flask REST API endpoints + - clients: External service clients (SIMAP, InfluxDB, Decision Engine) + - config: Configuration management + - tests: Test suite + +Public API: + - AIEngineAPI: Main orchestrator and Flask application + - AIModelProcessor: AI/ML analysis engine + - SLAPolicyConfig: SLA policy configuration data model + - SimapDataFetcher: SIMAP client for device/topology data + - InfluxDBFetcher: InfluxDB client for telemetry metrics + - DecisionEngineClient: Decision engine notification client + - create_ai_engine_blueprint: Flask blueprint factory +""" + +from .ai_model.ai_processor import AIModelProcessor +from .api.api_blueprint import create_ai_engine_blueprint +from .engine import AIEngineAPI +from .clients.decision_client import DecisionEngineClient +from .clients.influxdb_fetcher import InfluxDBFetcher +from .clients.simap_fetcher import SimapDataFetcher +from .ai_model.sla_policy import SLAPolicyConfig + +__all__ = [ + 'AIEngineAPI', + 'AIModelProcessor', + 'DecisionEngineClient', + 'InfluxDBFetcher', + 'SimapDataFetcher', + 'SLAPolicyConfig', + 'create_ai_engine_blueprint', +] diff --git a/src/tests/tools/simap_ai_engine/ai_engine/__main__.py b/src/tests/tools/simap_ai_engine/ai_engine/__main__.py new file mode 100644 index 
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
AI Engine main entry point.

This module provides the entry point for running the AI Engine REST API.
"""

# Standard library imports
import logging
import sys

# Local imports
from .engine import AIEngineAPI

# Module-level logger
LOGGER = logging.getLogger(__name__)


def main() -> None:
    """Configure logging, start the AI Engine REST API, and block until it stops.

    Exit codes:
        0: stopped by the operator (KeyboardInterrupt).
        1: AIEngineAPI construction or run() raised unexpectedly.
    """
    logging.basicConfig(
        level=logging.DEBUG,
        format="[%(asctime)s] %(levelname)s:%(name)s:[%(funcName)s:%(lineno)d] - %(message)s"
    )
    # Raise werkzeug (Flask dev server) verbosity once.
    # BUGFIX: the original executed this same setLevel call a second time
    # between constructing the engine and running it — the duplicate is removed.
    logging.getLogger('werkzeug').setLevel(logging.DEBUG)

    try:
        LOGGER.info("Starting AI Engine")
        engine = AIEngineAPI()
        engine.run()
    except KeyboardInterrupt:
        LOGGER.info("AI Engine stopped by user")
        sys.exit(0)
    except Exception as e:
        # LOGGER.exception already appends the traceback; lazy %-args avoid
        # formatting work when the handler is filtered out.
        LOGGER.exception("AI Engine failed to start: %s", e)
        sys.exit(1)


if __name__ == '__main__':
    main()
/dev/null +++ b/src/tests/tools/simap_ai_engine/ai_engine/ai_model/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/tests/tools/simap_ai_engine/ai_engine/ai_model/ai_processor.py b/src/tests/tools/simap_ai_engine/ai_engine/ai_model/ai_processor.py new file mode 100644 index 0000000000000000000000000000000000000000..ace48a1bfe4ace1a901f79b3d3155b2536ed8e2e --- /dev/null +++ b/src/tests/tools/simap_ai_engine/ai_engine/ai_model/ai_processor.py @@ -0,0 +1,84 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +AI Model Processor module. +Provides AI/ML processing functionality for SLA analysis. 
+""" + +import logging +from datetime import datetime, UTC +from random import Random +from typing import Any, Dict + +from .sla_policy import SLAPolicyConfig + +LOGGER = logging.getLogger(__name__) + + +class AIModelProcessor: + """ + Processes data through AI models for SLA analysis. + + This class implements the AI/ML processing logic to analyze device + and performance data against SLA policies, detecting violations and + generating recommendations. + """ + + def __init__(self) -> None: + """ + Initialize the AIModelProcessor. + + Loads AI models and prepares the processor for data analysis. + """ + LOGGER.info("AIModelProcessor initialized") + # TODO: Load AI/ML models here + # Example: self.model = load_model('sla_violation_detector.h5') + + def process_data( + self, + device_data: Dict[str, Any], + performance_data: Dict[str, Any], + sla_policy: SLAPolicyConfig + ) -> Dict[str, Any]: + """ + Process device and performance data through AI models. + + Analyzes the collected data against the SLA policy thresholds, + detects violations, and generates recommendations for remediation. + + Args: + device_data: Device and topology data from SIMAP. + performance_data: Performance metrics from InfluxDB. + sla_policy: The SLA policy configuration with thresholds. + + Returns: + Dictionary containing: + - 'violations': List of detected SLA violations with details. + - 'recommendations': List of recommended actions. + - 'summary': Dictionary with analysis summary statistics. 
+ """ + LOGGER.debug("Processing data through AI models") + # TODO: Implement actual AI/ML processing logic + # Example: + # violations = self.model.predict(features) + # recommendations = self.generate_recommendations(violations) + return { + 'violations': [], + 'confidence_scores': round(Random().random(), 2), + 'summary': { + 'sla_policy': sla_policy.to_dict(), + 'timestamp': datetime.now(UTC).isoformat() + } + } diff --git a/src/tests/tools/simap_ai_engine/ai_engine/ai_model/sla_policy.py b/src/tests/tools/simap_ai_engine/ai_engine/ai_model/sla_policy.py new file mode 100644 index 0000000000000000000000000000000000000000..478725823e5812d156efe91bb0a99a3373cb93c5 --- /dev/null +++ b/src/tests/tools/simap_ai_engine/ai_engine/ai_model/sla_policy.py @@ -0,0 +1,84 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +SLA Policy Configuration dataclass. + +Defines the structure for SLA policy configurations used in AI analysis. +""" + +from dataclasses import asdict, dataclass +from typing import Any, Dict + + +@dataclass +class SLAPolicyConfig: + """ + Configuration for an SLA policy to be analyzed. + + Attributes: + simap_id: Unique identifier for the SIMAP entity. + latency_threshold_ms: Maximum acceptable latency in milliseconds. + bandwidth_utilization_threshold_pct: Maximum acceptable bandwidth + utilization as a percentage (0-100). 
+ time_window_seconds: Time window in seconds for data analysis. + """ + simap_id: str + latency_threshold_ms: float + bandwidth_utilization_threshold_pct: float + time_window_seconds: int + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'SLAPolicyConfig': + """ + Create an SLAPolicyConfig instance from a dictionary. + + Args: + data: Dictionary containing the SLA policy configuration fields. + Required keys: 'simap_id', 'latency_threshold_ms', + 'bandwidth_utilization_threshold_pct', 'time_window_seconds'. + Supports nested 'sla_metrics' structure. + + Returns: + A new SLAPolicyConfig instance. + + Raises: + KeyError: If a required field is missing from the data dictionary. + TypeError: If a field has an invalid type. + ValueError: If a field has an invalid value. + """ + try: + simap_id = str(data['simap_id']) + metrics = data['sla_metrics'] + latency_threshold_ms = float(metrics['latency_threshold_ms']) + bandwidth_threshold = float(metrics.get('bandwidth_utilization_threshold_pct')) + time_window = int(data.get('window_size_sec', 300)) + + return cls( + simap_id = simap_id, + latency_threshold_ms = latency_threshold_ms, + bandwidth_utilization_threshold_pct = bandwidth_threshold, + time_window_seconds = time_window + ) + except KeyError as e: + raise KeyError(f"Missing required field: {e.args[0]}") from e + + def to_dict(self) -> Dict[str, Any]: + """ + Convert the SLAPolicyConfig to a dictionary. + + Returns: + Dictionary representation of the SLA policy configuration. 
+ """ + return asdict(self) diff --git a/src/tests/tools/simap_ai_engine/ai_engine/api/__init__.py b/src/tests/tools/simap_ai_engine/ai_engine/api/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..7363515f07a52d996229bcbd72932ce1423258d7 --- /dev/null +++ b/src/tests/tools/simap_ai_engine/ai_engine/api/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/tests/tools/simap_ai_engine/ai_engine/api/api_blueprint.py b/src/tests/tools/simap_ai_engine/ai_engine/api/api_blueprint.py new file mode 100644 index 0000000000000000000000000000000000000000..062151f7e841b7e977dc66d7411bc92a3313f46d --- /dev/null +++ b/src/tests/tools/simap_ai_engine/ai_engine/api/api_blueprint.py @@ -0,0 +1,328 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Flask Blueprint for AI Engine REST API. + +Defines the REST API endpoints for the AI Engine. +""" + +import logging +from datetime import datetime, UTC + + +from flask import Blueprint, jsonify, request + +from ..config import Config +from ..ai_model.ai_processor import AIModelProcessor +from ..clients.decision_client import DecisionEngineClient +from ..clients.influxdb_fetcher import InfluxDBFetcher +from ..clients.simap_fetcher import SimapDataFetcher +from ..ai_model.sla_policy import SLAPolicyConfig + +LOGGER = logging.getLogger(__name__) + + +def create_ai_engine_blueprint( + simap_fetcher: SimapDataFetcher, + influxdb_fetcher: InfluxDBFetcher, + ai_processor: AIModelProcessor, + decision_client: DecisionEngineClient +) -> Blueprint: + """ + Create the Flask Blueprint for the AI Engine REST API. + + This function creates and configures a Flask Blueprint with all the + REST API endpoints for the AI Engine. + + Args: + simap_fetcher: Initialized SimapDataFetcher instance. + influxdb_fetcher: Initialized InfluxDBFetcher instance. + ai_processor: Initialized AIModelProcessor instance. + decision_client: Initialized DecisionEngineClient instance. + + Returns: + Configured Flask Blueprint with routes: + - POST /api/v1/analyze: Run SLA policy analysis + - GET /api/v1/health: Health check endpoint + - GET /api/v1/config: Get current configuration + """ + blueprint = Blueprint('ai_engine', __name__, url_prefix='/api/v1') + + @blueprint.route('/analyze', methods=['POST']) + def analyze(): + """ + Run SLA policy analysis. + + Expects JSON payload with SLA policy configuration. + Orchestrates the full analysis workflow: fetch data from SIMAP, + fetch metrics from InfluxDB, process through AI models, and + send results to Decision Engine. + + Returns: + JSON response with analysis results or error message. 
+ """ + LOGGER.info("Received analysis request") + + # Parse and validate request JSON + try: + data = request.get_json() + if data is None: + LOGGER.error("Request body is empty or not valid JSON") + return jsonify({ + 'status': 'error', + 'message': 'Request body must be valid JSON' + }), 400 + except Exception as e: + LOGGER.error(f"Failed to parse request JSON: {e}") + return jsonify({ + 'status': 'error', + 'message': f'Invalid JSON: {str(e)}' + }), 400 + + # Validate and create SLAPolicyConfig + try: + sla_policy = SLAPolicyConfig.from_dict(data) + LOGGER.info(f"Processing SLA policy for SIMAP ID: {sla_policy.simap_id}") + except KeyError as e: + LOGGER.error(f"Missing required field in request: {e}") + return jsonify({ + 'status': 'error', + 'message': f'Missing required field: {str(e)}' + }), 400 + except (TypeError, ValueError) as e: + LOGGER.error(f"Invalid field value in request: {e}") + return jsonify({ + 'status': 'error', + 'message': f'Invalid field value: {str(e)}' + }), 400 + + # Execute analysis workflow + try: + # Step 1: Fetch device data from SIMAP + LOGGER.debug("Step 1: Fetching device data from SIMAP") + device_data = simap_fetcher.fetch_device_data(sla_policy) + + # Step 2: Fetch performance data from InfluxDB + LOGGER.debug("Step 2: Fetching performance data from InfluxDB") + performance_data = influxdb_fetcher.fetch_performance_data( + sla_policy, device_data + ) + + # Step 3: Process data through AI models + LOGGER.debug("Step 3: Processing data through AI models") + results = ai_processor.process_data( + device_data, performance_data, sla_policy + ) + + # Step 4: Send results to Decision Engine + LOGGER.debug("Step 4: Sending results to Decision Engine") + if not decision_client.send_results(results): + LOGGER.error("Failed to send results to Decision Engine") + return jsonify({ + 'status': 'error', + 'message': 'Failed to send results to Decision Engine' + }), 500 + + LOGGER.info("Analysis completed successfully") + return jsonify({ + 
'status': 'success', + 'data': results, + 'message': 'Analysis completed successfully' + }), 200 + + except Exception as e: + # Check if this is a retry failure (service unavailable) + error_msg = str(e) + if 'Giving up' in error_msg or 'unavailable' in error_msg.lower(): + LOGGER.error(f"External service unavailable: {e}") + return jsonify({ + 'status': 'error', + 'message': f'External service unavailable: {error_msg}' + }), 503 + else: + LOGGER.exception(f"Unexpected error during analysis: {e}") + return jsonify({ + 'status': 'error', + 'message': f'Internal server error: {error_msg}' + }), 500 + + @blueprint.route('/health', methods=['GET']) + def health(): + """ + Health check endpoint. + + Returns: + JSON response with service health status. + """ + LOGGER.debug("Health check requested") + return jsonify({ + 'status': 'healthy', + 'service': 'AI Engine', + 'timestamp': datetime.now(UTC).isoformat() + }), 200 + + @blueprint.route('/config', methods=['GET']) + def config(): + """ + Get current configuration. + + Returns: + JSON response with SIMAP and InfluxDB connection details. + """ + LOGGER.debug("Configuration requested") + return jsonify({ + 'simap': { + 'scheme': Config.SIMAP_DATASTORE_SCHEME, + 'address': Config.SIMAP_DATASTORE_ADDRESS, + 'port': Config.SIMAP_DATASTORE_PORT, + 'username': Config.SIMAP_DATASTORE_USERNAME, + 'password': Config.SIMAP_DATASTORE_PASSWORD + }, + 'influxdb': { + 'host': Config.INFLUXDB_HOST, + 'port': Config.INFLUXDB_PORT, + 'token': Config.INFLUXDB_TOKEN, + 'database': Config.INFLUXDB_DATABASE + }, + 'api': { + 'host': Config.AI_ENGINE_REST_HOST, + 'port': Config.AI_ENGINE_REST_PORT + } + }), 200 + + @blueprint.route('/notify', methods=['POST']) + def notify(): + """ + Handle telemetry update notifications. + + Accepts status notifications (UPGRADE or DOWNGRADE) with optional + timestamp, validates the payload structure, and forwards to + InfluxDBFetcher for storage. 
+ + Expected JSON payload: + { + "status": "UPGRADE" | "DOWNGRADE", # Required, uppercase only + "timestamp": "" # Optional + } + + Returns: + JSON response with success/error status. + """ + LOGGER.info("Received telemetry notification request") + + # Parse and validate request JSON + try: + data = request.get_json() + if data is None: + LOGGER.error("Request body is empty or not valid JSON") + return jsonify({ + 'status': 'error', + 'message': 'Request body must be valid JSON' + }), 400 + except Exception as e: + LOGGER.error(f"Failed to parse request JSON: {e}") + return jsonify({ + 'status': 'error', + 'message': f'Invalid JSON: {str(e)}' + }), 400 + + # Validate payload structure + try: + # Check for unexpected keys + allowed_keys = {'status', 'timestamp'} + received_keys = set(data.keys()) + unexpected_keys = received_keys - allowed_keys + if unexpected_keys: + LOGGER.error(f"Unexpected keys in payload: {unexpected_keys}") + return jsonify({ + 'status': 'error', + 'message': f'Unexpected keys: {", ".join(unexpected_keys)}' + }), 400 + + # Check for required 'status' key + if 'status' not in data: + LOGGER.error("Missing required field: status") + return jsonify({ + 'status': 'error', + 'message': 'Missing required field: status' + }), 400 + + # Validate status value + status_value = data['status'] + if status_value not in {'UPGRADE', 'DOWNGRADE'}: + LOGGER.error(f"Invalid status value: {status_value}") + return jsonify({ + 'status': 'error', + 'message': 'Invalid status value: must be UPGRADE or DOWNGRADE' + }), 400 + + # Validate timestamp if present (accept any string) + if 'timestamp' in data and not isinstance(data['timestamp'], str): + LOGGER.error(f"Invalid timestamp type: {type(data['timestamp'])}") + return jsonify({ + 'status': 'error', + 'message': 'Timestamp must be a string' + }), 400 + + LOGGER.debug(f"Payload validation passed for status: {status_value}") + + except Exception as e: + LOGGER.error(f"Payload validation error: {e}") + return 
jsonify({ + 'status': 'error', + 'message': f'Validation error: {str(e)}' + }), 400 + + # Forward to InfluxDBFetcher + try: + result = influxdb_fetcher.notify_telemetry_update(data) + if result: + LOGGER.info("Telemetry notification processed successfully") + return jsonify({ + 'status': 'success', + 'message': 'Notification processed successfully' + }), 200 + else: + LOGGER.error("InfluxDBFetcher returned False") + return jsonify({ + 'status': 'error', + 'message': 'Failed to process notification' + }), 500 + + except ValueError as e: + # Validation errors from InfluxDBFetcher + LOGGER.error(f"Validation error from InfluxDBFetcher: {e}") + return jsonify({ + 'status': 'error', + 'message': str(e) + }), 400 + + except Exception as e: + # Check if this is a retry failure (service unavailable) + error_msg = str(e) + if 'Giving up' in error_msg or 'unavailable' in error_msg.lower(): + LOGGER.error(f"InfluxDB unavailable: {e}") + return jsonify({ + 'status': 'error', + 'message': f'InfluxDB service unavailable: {error_msg}' + }), 503 + else: + LOGGER.exception(f"Unexpected error processing notification: {e}") + return jsonify({ + 'status': 'error', + 'message': f'Internal server error: {error_msg}' + }), 500 + + return blueprint diff --git a/src/tests/tools/simap_ai_engine/ai_engine/clients/__init__.py b/src/tests/tools/simap_ai_engine/ai_engine/clients/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..7363515f07a52d996229bcbd72932ce1423258d7 --- /dev/null +++ b/src/tests/tools/simap_ai_engine/ai_engine/clients/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Decision Engine Client module.

Provides client functionality for sending analysis results to the Decision Engine.
"""

import json
import logging
from typing import Any, Dict

LOGGER = logging.getLogger(__name__)


class DecisionEngineClient:
    """Client used to forward AI analysis results to the Decision Engine.

    Handles communication with the downstream Decision Engine service that
    acts on the AI analysis results.
    """

    def __init__(self) -> None:
        """Initialize the client.

        Connection parameters for the Decision Engine service are not yet
        configured; see the TODO below.
        """
        LOGGER.info("DecisionEngineClient initialized")
        # TODO: Configure Decision Engine connection parameters, e.g.:
        #   self.decision_engine_url = get_setting('DECISION_ENGINE_URL')

    def send_results(self, results: Dict[str, Any]) -> bool:
        """Send analysis results to the Decision Engine.

        Placeholder implementation: serializes *results* to JSON and writes
        them to stdout instead of invoking a real Decision Engine endpoint
        (the final version should use gRPC or REST).

        Args:
            results: Dictionary containing analysis results including
                violations, recommendations, and summary.

        Returns:
            True if the results were emitted successfully, False otherwise
            (e.g. when *results* is not JSON-serializable).
        """
        LOGGER.info("Sending results to Decision Engine")
        try:
            print(json.dumps(results, indent=2))
        except Exception as error:
            LOGGER.error(f"Failed to send results to Decision Engine: {error}")
            return False
        return True
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
InfluxDB Fetcher module.

Provides functionality to fetch performance metrics from InfluxDB.
"""

import logging
from datetime import datetime, timezone
from typing import Any, Dict

from common.tools.client.RetryDecorator import delay_exponential, retry

from ..ai_model.sla_policy import SLAPolicyConfig

LOGGER = logging.getLogger(__name__)

# Retry decorator for external service calls
RETRY_DECORATOR = retry(
    max_retries=15,
    delay_function=delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
)


class InfluxDBFetcher:
    """
    Fetches performance metrics from InfluxDB.

    This class handles communication with InfluxDB to retrieve time-series
    performance data for network devices and services.
    """

    def __init__(
        self,
        influxdb_host: str,
        influxdb_port: int,
        influxdb_token: str,
        influxdb_database: str
    ) -> None:
        """
        Initialize the InfluxDBFetcher.

        Args:
            influxdb_host: InfluxDB server hostname or IP address.
            influxdb_port: InfluxDB server port number.
            influxdb_token: Authentication token for InfluxDB.
            influxdb_database: Name of the InfluxDB database to query.
        """
        self.influxdb_host = influxdb_host
        self.influxdb_port = influxdb_port
        self.influxdb_token = influxdb_token
        self.influxdb_database = influxdb_database
        LOGGER.info(
            f"InfluxDBFetcher initialized for database '{influxdb_database}' "
            f"at {influxdb_host}:{influxdb_port}"
        )

    @RETRY_DECORATOR
    def fetch_performance_data(
        self,
        sla_policy: SLAPolicyConfig,
        device_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Fetch performance metrics from InfluxDB.

        Queries InfluxDB for time-series performance data based on the
        SLA policy parameters and device information. The retry decorator
        ensures resilience against transient failures.

        Args:
            sla_policy: The SLA policy configuration containing time window
                and threshold parameters.
            device_data: Device and topology data from SIMAP used to
                identify which metrics to query.

        Returns:
            Dictionary containing:
                - 'metrics': List of performance metric records.
                - 'timestamp_range': Dictionary with 'start' and 'end'
                  timestamps for the queried data.

        Raises:
            Exception: If InfluxDB is unavailable after all retries,
                or if the query fails.
        """
        LOGGER.debug(
            f"Fetching performance data for time window: "
            f"{sla_policy.time_window_seconds}s"
        )
        # TODO: Implement actual InfluxDB query
        # Example implementation:
        # from influxdb_client_3 import InfluxDBClient3
        # client = InfluxDBClient3(
        #     host=self.influxdb_host,
        #     token=self.influxdb_token,
        #     database=self.influxdb_database
        # )
        # query = f'''
        #     SELECT * FROM metrics
        #     WHERE time >= now() - {sla_policy.time_window_seconds}s
        # '''
        # return client.query(query)

        # Capture the timestamp ONCE so 'start' == 'end' in the stub result
        # (two utcnow() calls could differ by microseconds), and use a
        # timezone-aware value: datetime.utcnow() is deprecated since
        # Python 3.12 and produced naive timestamps.
        now_iso = datetime.now(timezone.utc).isoformat()
        return {
            'metrics': [],
            'timestamp_range': {
                'start': now_iso,
                'end': now_iso
            }
        }

    @RETRY_DECORATOR
    def notify_telemetry_update(
        self,
        notification_data: Dict[str, Any]
    ) -> bool:
        """
        Process telemetry update notifications.

        Validates and stores status notifications (UPGRADE or DOWNGRADE)
        in InfluxDB for telemetry tracking. The retry decorator ensures
        resilience against transient failures.

        Args:
            notification_data: Dictionary containing:
                - 'status': Required. Must be "UPGRADE" or "DOWNGRADE".
                - 'timestamp': Optional. Any string value.

        Returns:
            True if notification was processed and stored successfully.

        Raises:
            ValueError: If status is not "UPGRADE" or "DOWNGRADE".
            Exception: If InfluxDB is unavailable after all retries.
        """
        status = notification_data.get('status')
        timestamp = notification_data.get('timestamp', 'N/A')

        # Validate status value
        if status not in {'UPGRADE', 'DOWNGRADE'}:
            raise ValueError(
                f"Invalid status value '{status}': must be UPGRADE or DOWNGRADE"
            )

        LOGGER.info(
            f"Storing telemetry notification in InfluxDB: "
            f"status={status}, timestamp={timestamp}"
        )

        # TODO: Implement actual InfluxDB write
        # Example implementation:
        # from influxdb_client_3 import InfluxDBClient3, Point
        # client = InfluxDBClient3(
        #     host=self.influxdb_host,
        #     token=self.influxdb_token,
        #     database=self.influxdb_database
        # )
        # point = Point("telemetry_notifications") \
        #     .tag("status", status) \
        #     .field("timestamp", timestamp)
        # client.write(point)

        LOGGER.info("Telemetry notification stored successfully in InfluxDB")
        return True
+""" + +import logging +from typing import Any, Dict + +from common.tools.client.RetryDecorator import delay_exponential, retry + +from ..ai_model.sla_policy import SLAPolicyConfig + +LOGGER = logging.getLogger(__name__) + +# Retry decorator for external service calls +RETRY_DECORATOR = retry( + max_retries=5, + delay_function=delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +) + + +class SimapDataFetcher: + """ + Fetches device and topology data from the SIMAP server. + + This class handles communication with the SIMAP server to retrieve + device configurations and network topology information needed for + SLA policy analysis. + """ + + def __init__( + self, + simap_scheme: str, + simap_address: str, + simap_port: int, + simap_username: str, + simap_password: str + ) -> None: + """ + Initialize the SimapDataFetcher. + + Args: + simap_scheme: URL scheme for SIMAP server (http/https). + simap_address: SIMAP server hostname or IP address. + simap_port: SIMAP server port number. + simap_username: Username for SIMAP authentication. + simap_password: Password for SIMAP authentication. + """ + self.simap_scheme = simap_scheme + self.simap_address = simap_address + self.simap_port = simap_port + self.simap_username = simap_username + self.simap_password = simap_password + self.base_url = f"{simap_scheme}://{simap_address}:{simap_port}" + LOGGER.info(f"SimapDataFetcher initialized with base URL: {self.base_url}") + + @RETRY_DECORATOR + def fetch_device_data(self, sla_policy: SLAPolicyConfig) -> Dict[str, Any]: + """ + Fetch device and topology data from the SIMAP server. + + Communicates with the SIMAP server to retrieve device configurations + and network topology information relevant to the given SLA policy. + The retry decorator ensures resilience against transient failures. + + Args: + sla_policy: The SLA policy configuration containing the SIMAP ID + and parameters for data retrieval. 
+ + Returns: + Dictionary containing: + - 'devices': List of device configurations. + - 'topology': Network topology information. + + Raises: + Exception: If the SIMAP server is unavailable after all retries, + or if the response is invalid. + """ + LOGGER.debug(f"Fetching device data for SIMAP ID: {sla_policy.simap_id}") + # TODO: Implement actual SIMAP server communication + # Example implementation: + # url = f"{self.base_url}/api/devices/{sla_policy.simap_id}" + # response = requests.get(url, auth=(self.simap_username, self.simap_password)) + # response.raise_for_status() + # return response.json() + return { + 'devices': [], + 'topology': {}, + 'simap_id': sla_policy.simap_id + } diff --git a/src/tests/tools/simap_ai_engine/ai_engine/config/Config.py b/src/tests/tools/simap_ai_engine/ai_engine/config/Config.py new file mode 100644 index 0000000000000000000000000000000000000000..2f7569fa460c40bc26e073f1478007725cbf767a --- /dev/null +++ b/src/tests/tools/simap_ai_engine/ai_engine/config/Config.py @@ -0,0 +1,39 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Configuration module for the AI Engine. + +This module defines environment variables for SIMAP server, InfluxDB, +and REST API configuration using the TFS get_setting pattern. 
"""
Configuration module for the AI Engine.

This module defines environment variables for SIMAP server, InfluxDB,
and REST API configuration using the TFS get_setting pattern.
"""

from common.Settings import get_setting

# SIMAP Datastore Configuration
# Credentials default to admin/admin, matching the local test deployment.
SIMAP_DATASTORE_SCHEME = get_setting('SIMAP_DATASTORE_SCHEME', default='http')
SIMAP_DATASTORE_ADDRESS = get_setting('SIMAP_DATASTORE_ADDRESS', default='0.0.0.0')
SIMAP_DATASTORE_PORT = int(get_setting('SIMAP_DATASTORE_PORT', default='80'))
SIMAP_DATASTORE_USERNAME = get_setting('SIMAP_DATASTORE_USERNAME', default='admin')
SIMAP_DATASTORE_PASSWORD = get_setting('SIMAP_DATASTORE_PASSWORD', default='admin')

# InfluxDB Configuration
# NOTE(review): the token defaults to an empty string — presumably meaning
# unauthenticated access in local setups; confirm before production use.
INFLUXDB_HOST = get_setting('INFLUXDB_HOST', default='localhost')
INFLUXDB_PORT = int(get_setting('INFLUXDB_PORT', default='8181'))
INFLUXDB_TOKEN = get_setting('INFLUXDB_TOKEN', default='')
INFLUXDB_DATABASE = get_setting('INFLUXDB_DATABASE', default='simap_telemetry')

# AI Engine REST API Configuration
# Bind address/port for the Flask application served by AIEngineAPI.run().
AI_ENGINE_REST_HOST = get_setting('AI_ENGINE_REST_HOST', default='0.0.0.0')
AI_ENGINE_REST_PORT = int(get_setting('AI_ENGINE_REST_PORT', default='8080'))
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
AI Engine orchestrator.

Main class that initializes and coordinates all AI Engine components.
"""

import logging

from flask import Flask

from .config import Config
from .api.api_blueprint import create_ai_engine_blueprint
from .clients.decision_client import DecisionEngineClient
from .clients.influxdb_fetcher import InfluxDBFetcher
from .clients.simap_fetcher import SimapDataFetcher
from .ai_model.ai_processor import AIModelProcessor

LOGGER = logging.getLogger(__name__)


class AIEngineAPI:
    """Top-level orchestrator for the AI Engine REST API.

    Builds every collaborator (data fetchers, AI processor, decision client),
    wires them into a Flask application and manages its lifecycle.
    """

    def __init__(self) -> None:
        """Instantiate all components and the Flask application."""
        LOGGER.info("Initializing AI Engine API")

        # Data sources: SIMAP topology/devices and InfluxDB telemetry.
        self.simap_fetcher = SimapDataFetcher(
            simap_scheme=Config.SIMAP_DATASTORE_SCHEME,
            simap_address=Config.SIMAP_DATASTORE_ADDRESS,
            simap_port=Config.SIMAP_DATASTORE_PORT,
            simap_username=Config.SIMAP_DATASTORE_USERNAME,
            simap_password=Config.SIMAP_DATASTORE_PASSWORD,
        )
        self.influxdb_fetcher = InfluxDBFetcher(
            influxdb_host=Config.INFLUXDB_HOST,
            influxdb_port=Config.INFLUXDB_PORT,
            influxdb_token=Config.INFLUXDB_TOKEN,
            influxdb_database=Config.INFLUXDB_DATABASE,
        )

        # Analysis and downstream actioning components.
        self.ai_processor = AIModelProcessor()
        self.decision_client = DecisionEngineClient()

        # HTTP front-end exposing the REST API.
        self.app = self.create_app()

    def create_app(self) -> Flask:
        """Create the Flask application and register the API blueprint.

        Returns:
            Configured Flask application instance.
        """
        flask_app = Flask(__name__)
        flask_app.register_blueprint(
            create_ai_engine_blueprint(
                simap_fetcher=self.simap_fetcher,
                influxdb_fetcher=self.influxdb_fetcher,
                ai_processor=self.ai_processor,
                decision_client=self.decision_client,
            )
        )
        LOGGER.info("Flask application created and blueprint registered")
        return flask_app

    def run(self) -> None:
        """Start the Flask development server on the configured host/port."""
        LOGGER.info(
            f"Starting AI Engine API on "
            f"{Config.AI_ENGINE_REST_HOST}:{Config.AI_ENGINE_REST_PORT}"
        )
        self.app.run(
            host=Config.AI_ENGINE_REST_HOST,
            port=Config.AI_ENGINE_REST_PORT,
            debug=False,
        )
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Test suite for AI Engine REST API.

This module tests the /api/v1/config, /api/v1/analyze and /api/v1/notify
endpoints by sending HTTP requests to a locally running AI Engine instance.
"""

import logging

import pytest
import requests

# Configure logging for tests
logging.basicConfig(
    level=logging.DEBUG,
    format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s"
)
LOGGER = logging.getLogger(__name__)

# Test server configuration
TEST_HOST = '127.0.0.1'
TEST_PORT = 8084  # host port published by deploy_ai_engine.sh (container port 8080)

BASE_URL = f'http://{TEST_HOST}:{TEST_PORT}'

# NOTE(review): these tests require an already-running AI Engine (see
# deploy_ai_engine.sh); consider migrating to an `ai_engine_server` fixture
# so the suite becomes self-contained.


def test_config_endpoint():
    """
    Test GET /api/v1/config returns valid configuration.

    Validates that the config endpoint returns:
    - HTTP 200 status code
    - JSON response with 'simap', 'influxdb', and 'api' sections
    - Expected configuration values
    """
    LOGGER.info(">>>>>> Starting test_case test_config_endpoint: /api/v1/config endpoint")
    # Send request to config endpoint
    response = requests.get(f'{BASE_URL}/api/v1/config', timeout=5)

    # Validate response status
    assert response.status_code == 200, f"Expected 200, got {response.status_code}"

    # Parse JSON response
    data = response.json()
    LOGGER.info(f"Config response: {data}")

    # Validate response structure
    assert 'simap' in data, "Response missing 'simap' section"
    assert 'influxdb' in data, "Response missing 'influxdb' section"
    assert 'api' in data, "Response missing 'api' section"

    # Validate SIMAP configuration fields
    simap = data['simap']
    assert 'scheme' in simap, "SIMAP config missing 'scheme'"
    assert 'address' in simap, "SIMAP config missing 'address'"
    assert 'port' in simap, "SIMAP config missing 'port'"
    assert 'username' in simap, "SIMAP config missing 'username'"
    assert 'password' in simap, "SIMAP config missing 'password'"

    # Validate InfluxDB configuration fields
    influxdb = data['influxdb']
    assert 'host' in influxdb, "InfluxDB config missing 'host'"
    assert 'port' in influxdb, "InfluxDB config missing 'port'"
    assert 'token' in influxdb, "InfluxDB config missing 'token'"
    assert 'database' in influxdb, "InfluxDB config missing 'database'"

    # Validate API configuration fields: the engine reports its configured
    # bind host/port (Config defaults '0.0.0.0':8080), not the address the
    # test client connects to.
    api = data['api']
    assert 'host' in api, "API config missing 'host'"
    assert 'port' in api, "API config missing 'port'"
    assert api['host'] == '0.0.0.0', f"Expected host '0.0.0.0', got {api['host']}"
    assert api['port'] == 8080, f"Expected port 8080, got {api['port']}"

    LOGGER.info("Config endpoint test passed!")
    LOGGER.info("<<<<<< Finished test_case test_config_endpoint")


def test_analyze_endpoint():
    """
    Test POST /api/v1/analyze endpoint.

    Validates that the analyze endpoint:
    - Accepts valid SLA policy JSON payload
    - Returns appropriate status codes (200 for success, 503 for service unavailable)
    - Returns JSON response with status and message fields
    """
    LOGGER.info(">>>>>> Starting test_case test_analyze_endpoint: POST /api/v1/analyze endpoint")

    # Prepare test payload with SLA policy configuration
    payload = {
        "simap_id": "test-slice-123",
        "sla_metrics": {
            "latency_threshold_ms": 10,
            "bandwidth_utilization_threshold_pct": 80.0
        },
        "window_size_sec": 300
    }

    LOGGER.info(f"Sending analyze request with payload: {payload}")

    # Send POST request to analyze endpoint
    response = requests.post(
        f'{BASE_URL}/api/v1/analyze',
        json=payload,
        timeout=10
    )

    LOGGER.info(f"Analyze response status: {response.status_code}")

    # Parse JSON response
    data = response.json()
    LOGGER.info(f"Analyze response body: {data}")

    # Validate response structure
    assert 'status' in data, "Response missing 'status' field"
    assert 'message' in data, "Response missing 'message' field"

    # Accept either success (200) or service unavailable (503)
    # 503 is expected if SIMAP server or InfluxDB are not running
    if response.status_code == 200:
        LOGGER.info("Analysis completed successfully")
        assert data['status'] == 'success', f"Expected status 'success', got '{data['status']}'"
        assert 'data' in data, "Successful response missing 'data' field"
    elif response.status_code == 503:
        LOGGER.warning("External service unavailable (expected if SIMAP/InfluxDB not running)")
        assert data['status'] == 'error', f"Expected status 'error' for 503, got '{data['status']}'"
    elif response.status_code == 400:
        LOGGER.error(f"Bad request: {data['message']}")
        assert data['status'] == 'error', f"Expected status 'error' for 400, got '{data['status']}'"
    else:
        pytest.fail(f"Unexpected status code: {response.status_code}")

    LOGGER.info("Analyze endpoint test passed!")
    LOGGER.info("<<<<<< Finished test_case test_analyze_endpoint")


def test_notify_endpoint():
    """
    Test POST /api/v1/notify endpoint with valid status-only payload.

    Validates that the notify endpoint:
    - Accepts valid notification payload with only "status" key
    - Returns appropriate status codes (200 for success, 503 for service unavailable)
    - Returns JSON response with status and message fields
    """
    LOGGER.info(">>>>>> Starting test_case test_notify_endpoint: POST /api/v1/notify endpoint")

    payload = {"status": "UPGRADE"}
    LOGGER.info(f"Sending notify request with payload: {payload}")

    # Send POST request to notify endpoint
    response = requests.post(
        f'{BASE_URL}/api/v1/notify',
        json=payload,
        timeout=10
    )

    LOGGER.info(f"Notify response status: {response.status_code}")

    # Parse JSON response
    data = response.json()
    LOGGER.info(f"Notify response body: {data}")

    # Validate response structure
    assert 'status' in data, "Response missing 'status' field"
    assert 'message' in data, "Response missing 'message' field"

    # Accept either success (200) or service unavailable (503)
    # 503 is expected if InfluxDB is not running
    if response.status_code == 200:
        LOGGER.info("Notification processed successfully")
        assert data['status'] == 'success', f"Expected status 'success', got '{data['status']}'"
    elif response.status_code == 503:
        LOGGER.warning("InfluxDB unavailable (expected if InfluxDB not running)")
        assert data['status'] == 'error', f"Expected status 'error' for 503, got '{data['status']}'"
    else:
        pytest.fail(f"Unexpected status code: {response.status_code}")

    LOGGER.info("Notify endpoint test passed!")
    LOGGER.info("<<<<<< Finished test_case test_notify_endpoint")
#!/bin/bash
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -euo pipefail

# Resolve the repository root relative to this script so it can be invoked
# from any working directory.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "${SCRIPT_DIR}/../../../.." && pwd)"

# Remove any previous instance; ignore the error when none exists.
docker rm --force ai-engine 2>/dev/null || true

echo "Building AI Engine..."
cd "${REPO_ROOT}"
docker buildx build -t ai-engine:latest -f ./src/tests/tools/simap_ai_engine/ai_engine/Dockerfile .

echo "Deploying AI Engine..."
# Publish container port 8080 on host port 8084 and point the engine at the
# SIMAP datastore published on the Docker host bridge (172.17.0.1:8080).
docker run --detach --name ai-engine \
    --publish 8084:8080 \
    --env SIMAP_DATASTORE_ADDRESS=172.17.0.1 \
    --env SIMAP_DATASTORE_PORT=8080 \
    --env SIMAP_DATASTORE_USERNAME=admin \
    --env SIMAP_DATASTORE_PASSWORD=admin \
    ai-engine:latest

sleep 2
docker ps -a
echo "Deployment complete."
+ - docker push "$CI_REGISTRY_IMAGE/simap-datastore:test" after_script: - docker image prune --force rules: @@ -29,8 +29,8 @@ build simap_server: - changes: - src/common/**/*.py - proto/*.proto - - src/src/tests/tools/simap_server/**/*.{py,in,yml,yaml,yang,sh,json} - - src/src/tests/tools/simap_server/Dockerfile + - src/src/tests/tools/simap_datastore/**/*.{py,in,yml,yaml,yang,sh,json} + - src/src/tests/tools/simap_datastore/Dockerfile - src/src/tests/.gitlab-ci.yml #- src/device/**/*.{py,in,yml} #- src/device/Dockerfile diff --git a/src/tests/tools/simap_server/Dockerfile b/src/tests/tools/simap_datastore/Dockerfile similarity index 73% rename from src/tests/tools/simap_server/Dockerfile rename to src/tests/tools/simap_datastore/Dockerfile index c6bb56f431f40feefd4e49f0865fa0f92d38c2be..2d02adff6dc93211a83483f89bf09cfaac96053b 100644 --- a/src/tests/tools/simap_server/Dockerfile +++ b/src/tests/tools/simap_datastore/Dockerfile @@ -43,18 +43,29 @@ RUN python3 -m pip install --upgrade 'setuptools==79.0.0' 'wheel==0.45.1' RUN python3 -m pip install --upgrade 'pip-tools==7.3.0' # Create component sub-folders, get specific Python packages -RUN mkdir -p /var/teraflow/simap_server/ -WORKDIR /var/teraflow/simap_server/ +RUN mkdir -p /var/teraflow/ +WORKDIR /var/teraflow/ COPY src/common/tools/rest_conf/server/requirements.in ./requirements.in RUN pip-compile --quiet --output-file=requirements.txt requirements.in RUN python3 -m pip install -r requirements.txt # Add component files into working directory -COPY src/common/tools/rest_conf/server/restconf_server/ ./simap_server/ -COPY src/tests/tools/simap_server/yang/*.yang ./yang/ -COPY src/tests/tools/simap_server/startup.json ./startup.json +RUN mkdir -p /var/teraflow/common/tools/ +WORKDIR /var/teraflow/ +COPY src/common/tools/rest_api/ ./common/tools/rest_api/ +COPY src/common/tools/rest_conf/ ./common/tools/rest_conf/ -# Configure RESTCONF Server +# Get generic Specific packages +COPY 
src/tests/tools/simap_datastore/requirements.in ./requirements2.in +RUN pip-compile --quiet --output-file=requirements2.txt requirements2.in +RUN python3 -m pip install -r requirements2.txt + +# Add code files +COPY src/tests/tools/simap_datastore/simap_datastore/*.py ./simap_datastore/ +COPY src/tests/tools/simap_datastore/yang/. ./yang/ +COPY src/tests/tools/simap_datastore/startup.json ./startup.json + +# Configure RESTCONF Datastore ENV RESTCONF_PREFIX="/restconf" ENV YANG_SEARCH_PATH="./yang" ENV STARTUP_FILE="./startup.json" @@ -63,4 +74,4 @@ ENV STARTUP_FILE="./startup.json" ENV FLASK_ENV="production" # Start the service -ENTRYPOINT ["gunicorn", "--workers", "1", "--worker-class", "eventlet", "--bind", "0.0.0.0:8080", "simap_server.app:app"] +ENTRYPOINT ["gunicorn", "--workers", "1", "--worker-class", "eventlet", "--bind", "0.0.0.0:8080", "simap_datastore.app:app"] diff --git a/src/tests/tools/simap_datastore/README.md b/src/tests/tools/simap_datastore/README.md new file mode 100644 index 0000000000000000000000000000000000000000..13cf01d4388100f66f7d3a4c7c2353b247afcb5c --- /dev/null +++ b/src/tests/tools/simap_datastore/README.md @@ -0,0 +1,25 @@ +# RESTCONF/SIMAP Datastore + +This component implements a basic RESTCONF datastore that can load, potentially, any YANG data model. +In this case, it is prepared to load a SIMAP datastore based on IETF Network Topology + custom SIMAP Telemetry extensions. 
+ + +## Build the RESTCONF/SIMAP Datastore Docker image +```bash +./build.sh +``` + +## Deploy the RESTCONF/SIMAP Datastore +```bash +./deploy.sh +``` + +## Run the RESTCONF/SIMAP Client for testing: +```bash +./run_client.sh +``` + +## Destroy the RESTCONF/SIMAP Datastore +```bash +./destroy.sh +``` diff --git a/src/tests/tools/simap_server/build.sh b/src/tests/tools/simap_datastore/build.sh similarity index 74% rename from src/tests/tools/simap_server/build.sh rename to src/tests/tools/simap_datastore/build.sh index 7ec0e0c917cd9870ae4f01fa81519b99a6bfa271..213d253c6a7611d5b1e335b295bc3fbc0bcf4594 100755 --- a/src/tests/tools/simap_server/build.sh +++ b/src/tests/tools/simap_datastore/build.sh @@ -16,7 +16,7 @@ # Make folder containing the script the root folder for its execution cd $(dirname $0)/../../../../ -# Build image SIMAP Server -docker buildx build -t simap-server:test -f ./src/tests/tools/simap_server/Dockerfile . -#docker tag simap-server:test localhost:32000/tfs/simap-server:test -#docker push localhost:32000/tfs/simap-server:test +# Build image SIMAP Datastore +docker buildx build -t simap-datastore:test -f ./src/tests/tools/simap_datastore/Dockerfile . +#docker tag simap-datastore:test localhost:32000/tfs/simap-datastore:test +#docker push localhost:32000/tfs/simap-datastore:test diff --git a/src/tests/tools/simap_datastore/deploy.sh b/src/tests/tools/simap_datastore/deploy.sh new file mode 100755 index 0000000000000000000000000000000000000000..bb15461ffb3dad12e2cfe193936a2ecb37ff8f83 --- /dev/null +++ b/src/tests/tools/simap_datastore/deploy.sh @@ -0,0 +1,40 @@ +#!/bin/bash +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Cleanup +docker rm --force simap-datastore + + +# Create SIMAP Datastore with InfluxDB configuration +# INFLUXDB_HOST points to the host machine where InfluxDB is running +# Use --add-host to make the host accessible from inside the container +docker run --detach --name simap-datastore \ + --publish 8080:8080 \ + --add-host=host.docker.internal:host-gateway \ + --env INFLUXDB_HOST="host.docker.internal" \ + --env INFLUXDB_PORT=8181 \ + --env INFLUXDB_DATABASE=simap_telemetry \ + simap-datastore:test + + +sleep 2 + + +# Dump SIMAP Datastore container +docker ps -a + + +echo "Bye!" 
diff --git a/src/tests/tools/simap_server/destroy.sh b/src/tests/tools/simap_datastore/destroy.sh similarity index 95% rename from src/tests/tools/simap_server/destroy.sh rename to src/tests/tools/simap_datastore/destroy.sh index 51edb6bca0696ae4a25328d9149cae30add57b33..251363ffdd89546fb21335a6d26d10e434253441 100755 --- a/src/tests/tools/simap_server/destroy.sh +++ b/src/tests/tools/simap_datastore/destroy.sh @@ -15,7 +15,7 @@ # Cleanup -docker rm --force simap-server +docker rm --force simap-datastore # Dump Docker containers diff --git a/src/tests/tools/simap_datastore/requirements.in b/src/tests/tools/simap_datastore/requirements.in new file mode 100644 index 0000000000000000000000000000000000000000..c5731b66c7e007647daf30edd08de4e16c1c9d05 --- /dev/null +++ b/src/tests/tools/simap_datastore/requirements.in @@ -0,0 +1,15 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +influxdb3-python==0.16.0 diff --git a/src/tests/tools/simap_server/run_client.sh b/src/tests/tools/simap_datastore/run_client.sh similarity index 93% rename from src/tests/tools/simap_server/run_client.sh rename to src/tests/tools/simap_datastore/run_client.sh index 76aced85547c4fb2d884b69e9e1a45a52cf469b5..c8b614838b4643afcbb3f06dcaf40e4fbc427b77 100755 --- a/src/tests/tools/simap_server/run_client.sh +++ b/src/tests/tools/simap_datastore/run_client.sh @@ -16,4 +16,4 @@ # Make folder containing the script the root folder for its execution cd $(dirname $0)/../../../ -python -m tests.tools.simap_server.simap_client +python -m tests.tools.simap_datastore.simap_client diff --git a/src/tests/tools/simap_server/simap_client/SimapClient.py b/src/tests/tools/simap_datastore/simap_client/SimapClient.py similarity index 98% rename from src/tests/tools/simap_server/simap_client/SimapClient.py rename to src/tests/tools/simap_datastore/simap_client/SimapClient.py index 725b08bd47e0bd127cf0f7c4131cb744313b149d..7c4834511f0c93cb568be1e402a03e0b604f62cc 100644 --- a/src/tests/tools/simap_server/simap_client/SimapClient.py +++ b/src/tests/tools/simap_datastore/simap_client/SimapClient.py @@ -210,8 +210,8 @@ class LinkTelemetry: 'latency' : '{:.3f}'.format(latency), } if len(related_service_ids) > 0: telemetry['related-service-ids'] = related_service_ids - link = {'link-id': self._link_id, 'simap-telemetry:simap-telemetry': telemetry} - network = {'network-id': self._network_id, 'ietf-network-topology:link': [link]} + link = {'link-id': self._link_id, 'simap-telemetry:simap-telemetry': telemetry} + network = {'network-id': self._network_id, 'ietf-network-topology:link': [link]} payload = {'ietf-network:networks': {'network': [network]}} self._restconf_client.patch(endpoint, payload) diff --git a/src/tests/tools/simap_datastore/simap_client/SimapMetricsGenerator.py b/src/tests/tools/simap_datastore/simap_client/SimapMetricsGenerator.py new file mode 100644 index 
0000000000000000000000000000000000000000..7fa5bcaeda42b1bdfe93880ac30e1a9c6db877e1 --- /dev/null +++ b/src/tests/tools/simap_datastore/simap_client/SimapMetricsGenerator.py @@ -0,0 +1,173 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +import math +import logging +from typing import Dict, List, Tuple + +LOGGER = logging.getLogger(__name__) + +# Congestion curve types +CURVE_LINEAR = 'linear' # x - steady increase +CURVE_EXPONENTIAL = 'exponential' # exp(x)-1 - slow start, rapid end +CURVE_LOGARITHMIC = 'logarithmic' # log(1+x) - fast start, plateau + +# Link profiles: (base_bw%, base_latency_ms, sensitivity, curve_type) +# - sensitivity: 1.0 = highly affected by load, 0.3 = minimally affected +# - curve_type: how congestion scales with load +LINK_PROFILES = { + 'L1' : (15.0, 1.0, 1.0, CURVE_EXPONENTIAL), + 'L3' : (10.0, 0.8, 0.7, CURVE_EXPONENTIAL), + 'L5' : ( 8.0, 0.3, 0.3, CURVE_LINEAR), + 'L9' : ( 8.0, 0.3, 0.3, CURVE_LINEAR), + 'L13': (12.0, 0.5, 0.5, CURVE_LOGARITHMIC), +} + +MAX_SERVICES = 5 + + +class SimapMetricsGenerator: + """ + Generates realistic SIMAP telemetry metrics based on service count. + Higher service counts cause non-linear congestion effects. + Access links are more sensitive to load than core links. 
+ """ + + def __init__(self, service_count: int = 0): + LOGGER.info("Initiating SimapMetricsGenerator") + self._service_count = 0 + self._service_ids: Dict[str, List[str]] = { + 'te' : [], + 'trans' : [], + 'agg' : [], + 'e2e' : [], + } + self.set_service_count(service_count) + + @property + def service_count(self) -> int: + return self._service_count + + def set_service_count(self, count: int) -> None: + """Update service count and regenerate domain-specific service IDs.""" + if count < 0 or count > MAX_SERVICES: + raise ValueError(f"Service count must be 0-{MAX_SERVICES}, got {count}") + self._service_count = count + # Each domain has its own service IDs + self._service_ids = { + 'te' : [f'te-svc-{i+1}' for i in range(count)], + 'trans' : [f'trans-svc-{i+1}' for i in range(count)], + 'agg' : [f'agg-svc-{i+1}' for i in range(count)], + 'e2e' : [f'e2e-svc-{i+1}' for i in range(count)], + } + LOGGER.info(f"Service count set to {count}, IDs per domain: {self._service_ids}") + + def get_service_ids(self, domain: str = 'e2e') -> List[str]: + """Return current list of active service IDs for a specific domain.""" + if domain not in self._service_ids: + raise ValueError(f"Unknown domain: {domain}. Valid: {list(self._service_ids.keys())}") + return self._service_ids[domain].copy() + + def get_all_service_ids(self) -> Dict[str, List[str]]: + """Return all domain service IDs.""" + return {k: v.copy() for k, v in self._service_ids.items()} + + def _compute_congestion_factor(self, curve_type: str, load_ratio: float) -> float: + """ + Compute congestion factor based on curve type and load ratio (0-1). 
+ """ + if curve_type == CURVE_LINEAR: + return load_ratio + elif curve_type == CURVE_EXPONENTIAL: + # Exponential: slow start, rapid increase at high load + return (math.exp(load_ratio * 2) - 1) / (math.e ** 2 - 1) + elif curve_type == CURVE_LOGARITHMIC: + # Logarithmic: fast initial increase, then plateau + return math.log1p(load_ratio * 2.7) / math.log1p(2.7) + else: + return load_ratio # Default to linear + + def generate_link_metrics(self, link_id: str) -> Tuple[float, float]: + """ + Generate BW and latency for a specific TE link using distinct congestion patterns. + Returns: + Tuple of (bandwidth_utilization%, latency_ms) + """ + if link_id not in LINK_PROFILES: + raise ValueError(f"Unknown link ID: {link_id}") + + base_bw, base_latency, sensitivity, curve_type = LINK_PROFILES[link_id] + + # Load ratio (0 to 1) + load_ratio = self._service_count / MAX_SERVICES + + # Compute congestion factor using link-specific curve + congestion_factor = self._compute_congestion_factor(curve_type, load_ratio) + + # Calculate base metrics with congestion + bw_utilization = base_bw + (congestion_factor * sensitivity * 60.0) + latency = base_latency * (1.0 + congestion_factor * sensitivity * 4.0) + + # Add uniform noise (5%) + bw_noise = random.uniform(-0.05, 0.05) * bw_utilization + lat_noise = random.uniform(-0.05, 0.05) * latency + + bw_utilization = max(0.0, min(100.0, bw_utilization + bw_noise)) + latency = max(0.1, latency + lat_noise) + + return (bw_utilization, latency) + + def generate_all_te_metrics(self) -> Dict[str, Tuple[float, float]]: + """ + Generate metrics for all TE links in the path. + + Returns: + Dict mapping link_id to (bandwidth%, latency_ms) + """ + return {link_id: self.generate_link_metrics(link_id) for link_id in LINK_PROFILES} + + def aggregate_abstract_metrics( + self, te_metrics: Dict[str, Tuple[float, float]] + ) -> Dict[str, Tuple[float, float]]: + """ + Aggregate TE metrics into abstract layer metrics. 
+ BW: average, Latency: sum + + Returns: + Dict with 'Trans-L1', 'AggNet-L1', 'E2E-L1' metrics + """ + bw_L1, lat_L1 = te_metrics['L1'] + bw_L3, lat_L3 = te_metrics['L3'] + bw_L5, lat_L5 = te_metrics['L5'] + bw_L9, lat_L9 = te_metrics['L9'] + bw_L13, lat_L13 = te_metrics['L13'] + + # Trans-L1: L5 + L9 + bw_trans = (bw_L5 + bw_L9) / 2 + lat_trans = lat_L5 + lat_L9 + + # AggNet-L1: L3 + Trans-L1 + L13 + bw_aggnet = (bw_L3 + bw_trans + bw_L13) / 3 + lat_aggnet = lat_L3 + lat_trans + lat_L13 + + # E2E-L1: L1 + AggNet-L1 + bw_e2e = (bw_L1 + bw_aggnet) / 2 + lat_e2e = lat_L1 + lat_aggnet + + return { + 'Trans-L1' : (bw_trans, lat_trans), + 'AggNet-L1': (bw_aggnet, lat_aggnet), + 'E2E-L1' : (bw_e2e, lat_e2e), + } diff --git a/src/tests/tools/simap_server/simap_client/Tools.py b/src/tests/tools/simap_datastore/simap_client/Tools.py similarity index 100% rename from src/tests/tools/simap_server/simap_client/Tools.py rename to src/tests/tools/simap_datastore/simap_client/Tools.py diff --git a/src/tests/tools/simap_server/simap_client/__init__.py b/src/tests/tools/simap_datastore/simap_client/__init__.py similarity index 100% rename from src/tests/tools/simap_server/simap_client/__init__.py rename to src/tests/tools/simap_datastore/simap_client/__init__.py diff --git a/src/tests/tools/simap_datastore/simap_client/__main__.py b/src/tests/tools/simap_datastore/simap_client/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..bcedacabdc416129a0a3ea26b82c817aae92da71 --- /dev/null +++ b/src/tests/tools/simap_datastore/simap_client/__main__.py @@ -0,0 +1,104 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import json, logging, random, time +from common.tools.rest_conf.client.RestConfClient import RestConfClient + +from .SimapClient import SimapClient +from .SimapMetricsGenerator import SimapMetricsGenerator +from .Tools import (create_simap_aggnet, create_simap_e2enet, + create_simap_te, create_simap_trans) + +logging.basicConfig(level=logging.INFO) +logging.getLogger('RestConfClient').setLevel(logging.WARN) +LOGGER = logging.getLogger(__name__) + + +def main() -> None: + restconf_client = RestConfClient( + '127.0.0.1', port=8080, + logger=logging.getLogger('RestConfClient') + ) + simap_client = SimapClient(restconf_client) + generator = SimapMetricsGenerator(service_count=3) + + # ---> Only need to be created once in the lifetime of the SIMAP datastore <--- # + create_simap_te(simap_client) + create_simap_trans(simap_client) + create_simap_aggnet(simap_client) + create_simap_e2enet(simap_client) + + print('networks=', json.dumps(simap_client.networks())) + + # TE links for path: ONT1 -> OLT -> PE1 -> P1 -> PE2 -> POP1 + te_network = simap_client.network('te') + te_links = { + 'L1' : te_network.link('L1'), # ONT1 -> OLT + 'L3' : te_network.link('L3'), # OLT -> PE1 + 'L5' : te_network.link('L5'), # PE1 -> P1 + 'L9' : te_network.link('L9'), # P1 -> PE2 + 'L13': te_network.link('L13'), # PE2 -> POP1 + } + + # Abstract layer links + abstract_links = { + 'Trans-L1' : simap_client.network('simap-trans').link('Trans-L1'), # L5 + L9 + 'AggNet-L1': simap_client.network('simap-aggnet').link('AggNet-L1'), # L3 + Trans-L1 + L13 + 'E2E-L1' : 
simap_client.network('simap-e2e').link('E2E-L1'), # L1 + AggNet-L1 + } + + # Initialize metrics generator with service count (0-5) + generator = SimapMetricsGenerator(service_count=4) + + for i in range(1000): + # Randomly change service count (1-5) every 5 iterations + if i % 5 == 0: + generator.set_service_count(random.randint(1, 5)) + + # Generate TE link metrics based on current service count + te_metrics = generator.generate_all_te_metrics() + + # Get domain-specific service IDs + te_service_ids = generator.get_service_ids('te') + trans_service_ids = generator.get_service_ids('trans') + agg_service_ids = generator.get_service_ids('agg') + e2e_service_ids = generator.get_service_ids('e2e') + + # Update TE link telemetry with TE domain service IDs + for link_id, (bw, lat) in te_metrics.items(): + te_links[link_id].telemetry.update(bw, lat, related_service_ids=te_service_ids) + + # Aggregate and update abstract layer telemetry with domain-specific service IDs + abstract_metrics = generator.aggregate_abstract_metrics(te_metrics) + domain_service_map = { + 'Trans-L1' : trans_service_ids, + 'AggNet-L1': agg_service_ids, + 'E2E-L1' : e2e_service_ids, + } + for link_id, (bw, lat) in abstract_metrics.items(): + abstract_links[link_id].telemetry.update(bw, lat, related_service_ids=domain_service_map[link_id]) + + # Print telemetry summary + print(f'--- Iteration {i} | Services: {generator.service_count} ---') + for link_id, (bw, lat) in te_metrics.items(): + print(f'TE {link_id:4s}: BW={bw:5.2f}%, Lat={lat:.3f}ms SvcIDs: {te_service_ids}') + for link_id, (bw, lat) in abstract_metrics.items(): + print(f'{link_id:10s}: BW={bw:5.2f}%, Lat={lat:.3f}ms SvcIDs: {domain_service_map[link_id]}') + + time.sleep(5) + + +if __name__ == '__main__': + main() diff --git a/src/tests/tools/simap_datastore/simap_datastore/Config.py b/src/tests/tools/simap_datastore/simap_datastore/Config.py new file mode 100644 index 
0000000000000000000000000000000000000000..6625f79ff60b40122bb8e8126bf97239c61533e3 --- /dev/null +++ b/src/tests/tools/simap_datastore/simap_datastore/Config.py @@ -0,0 +1,22 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os + +# InfluxDB Configuration +INFLUXDB_HOST = os.environ.get('INFLUXDB_HOST', 'localhost' ) +INFLUXDB_PORT = int(os.environ.get('INFLUXDB_PORT', '8181') ) +INFLUXDB_DATABASE = os.environ.get('INFLUXDB_DATABASE', 'simap_telemetry') +INFLUXDB_TOKEN = os.environ.get('INFLUXDB_TOKEN', 'apiv3_xSq6xD1wBvZ21Pc3uPnqqmgLeSn-kZbz1y4S4u8GtSbJebEly8wl9WUjGJ0Ja_bQuwKB_lpDcHrNJUBCDJMSCw') diff --git a/src/tests/tools/simap_datastore/simap_datastore/TelemetryCallbacks.py b/src/tests/tools/simap_datastore/simap_datastore/TelemetryCallbacks.py new file mode 100644 index 0000000000000000000000000000000000000000..ad646b9a63722f6c88e0edb2fa76b4ac80d15b4a --- /dev/null +++ b/src/tests/tools/simap_datastore/simap_datastore/TelemetryCallbacks.py @@ -0,0 +1,270 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Telemetry callbacks for writing SIMAP telemetry to InfluxDB. +""" + +import json +import logging +import re +from typing import Any, Dict, Optional, Union + +from .....common.tools.rest_conf.server.restconf_server.Callbacks import _Callback +from .influxdb_client import SimapInfluxDBClient + + +LOGGER = logging.getLogger(__name__) + + +def _extract_telemetry_from_nested(data: Dict, link_id: str = None, node_id: str = None) -> Optional[Dict]: + """ + Extract simap-telemetry data from various nested JSON structures returned by YangHandler. + + The YangHandler.update() returns different structures depending on context: + - {"ietf-network:network": [{"network-id": "te", "ietf-network-topology:link": [{"link-id": "L1", "simap-telemetry:simap-telemetry": {...}}]}]} + - {"ietf-network-topology:link": [{"link-id": "L1", "simap-telemetry:simap-telemetry": {...}}]} + - {"simap-telemetry:simap-telemetry": {...}} + - {"ietf-network:node": [...]} for nodes + + Args: + data: The JSON dict to search + link_id: If searching for link telemetry, the link ID to match + node_id: If searching for node telemetry, the node ID to match + + Returns: + The telemetry dict or None if not found + """ + if not isinstance(data, dict): + return None + + # Direct telemetry at top level + if 'simap-telemetry:simap-telemetry' in data: + return data['simap-telemetry:simap-telemetry'] + + # Check for direct telemetry fields (already unwrapped) + if 'bandwidth-utilization' in data or 'latency' in data or 'cpu-utilization' in data: + return data + + # Search in ietf-network:network array 
+    if 'ietf-network:network' in data: +        for network in data['ietf-network:network']: +            if isinstance(network, dict): +                # Look for links +                if link_id and 'ietf-network-topology:link' in network: +                    for link in network['ietf-network-topology:link']: +                        if isinstance(link, dict) and link.get('link-id') == link_id: +                            if 'simap-telemetry:simap-telemetry' in link: +                                return link['simap-telemetry:simap-telemetry'] +                # Look for nodes +                if node_id and 'node' in network: +                    for node in network['node']: +                        if isinstance(node, dict) and node.get('node-id') == node_id: +                            if 'simap-telemetry:simap-telemetry' in node: +                                return node['simap-telemetry:simap-telemetry'] + +    # Search in ietf-network-topology:link array +    if 'ietf-network-topology:link' in data: +        for link in data['ietf-network-topology:link']: +            if isinstance(link, dict): +                if link_id is None or link.get('link-id') == link_id: +                    if 'simap-telemetry:simap-telemetry' in link: +                        return link['simap-telemetry:simap-telemetry'] + +    # Search in ietf-network:node or node array +    for node_key in ['ietf-network:node', 'node']: +        if node_key in data: +            for node in data[node_key]: +                if isinstance(node, dict): +                    if node_id is None or node.get('node-id') == node_id: +                        if 'simap-telemetry:simap-telemetry' in node: +                            return node['simap-telemetry:simap-telemetry'] + +    return None + + +class CallbackOnLinkTelemetry(_Callback): +    """ +    Callback triggered when link telemetry data is updated via RESTCONF. +    Writes telemetry data to InfluxDB. +    """ + +    # Pattern matches: +    # /restconf/data/ietf-network:networks/network=&lt;network-id&gt;/ietf-network-topology:link=&lt;link-id&gt;/simap-telemetry:simap-telemetry +    PATTERN = ( +        r'/restconf/data/ietf-network:networks' +        r'/network=(?P&lt;network_id&gt;[^/]+)' +        r'/ietf-network-topology:link=(?P&lt;link_id&gt;[^/]+)' +        r'/simap-telemetry:simap-telemetry' +    ) + +    def __init__(self, influx_client: SimapInfluxDBClient) -> None: +        """ +        Initialize the callback with an InfluxDB client. 
+ + Args: + influx_client: SimapInfluxDBClient instance for writing telemetry + """ + super().__init__(self.PATTERN) + self._influx_client = influx_client + + def execute( + self, + match: re.Match, + path: str, + old_data: Optional[Union[Dict, str]], + new_data: Optional[Union[Dict, str]] + ) -> bool: + """ + Execute the callback to write link telemetry to InfluxDB. + + Args: + match: Regex match object containing network_id and link_id + path: Original RESTCONF path + old_data: Previous telemetry data (unused) + new_data: New telemetry data to write (can be dict or JSON string) + + Returns: + True to continue executing other callbacks + """ + if new_data is None: + LOGGER.debug("Link telemetry deletion, skipping InfluxDB write") + return True + + # Handle case where new_data is a JSON string instead of a dict + if isinstance(new_data, str): + try: + new_data = json.loads(new_data) + except json.JSONDecodeError: + LOGGER.error("Failed to parse new_data as JSON: %s", new_data) + return True + + network_id = match.group('network_id') + link_id = match.group('link_id') + + # Extract telemetry fields from nested JSON structure + # YangHandler returns various nested formats depending on context + telemetry = _extract_telemetry_from_nested(new_data, link_id=link_id) + if telemetry is None: + LOGGER.warning( + "Could not extract telemetry from data for link=%s: %s", + link_id, new_data + ) + return True + + bandwidth_utilization = telemetry.get('bandwidth-utilization', 0) + latency = telemetry.get('latency', 0) + related_service_ids = telemetry.get('related-service-ids', None) + + LOGGER.info( + "Link telemetry callback: network=%s, link=%s, bw=%s, lat=%s", + network_id, link_id, bandwidth_utilization, latency + ) + + self._influx_client.write_link_telemetry( + network_id=network_id, + link_id=link_id, + bandwidth_utilization=float(bandwidth_utilization), + latency=float(latency), + related_service_ids=related_service_ids + ) + + return True # Continue to other callbacks 
+ + +class CallbackOnNodeTelemetry(_Callback): +    """ +    Callback triggered when node telemetry data is updated via RESTCONF. +    Writes telemetry data to InfluxDB. +    """ + +    # Pattern matches: +    # /restconf/data/ietf-network:networks/network=&lt;network-id&gt;/node=&lt;node-id&gt;/simap-telemetry:simap-telemetry +    PATTERN = ( +        r'/restconf/data/ietf-network:networks' +        r'/network=(?P&lt;network_id&gt;[^/]+)' +        r'/node=(?P&lt;node_id&gt;[^/]+)' +        r'/simap-telemetry:simap-telemetry' +    ) + +    def __init__(self, influx_client: SimapInfluxDBClient) -> None: +        """ +        Initialize the callback with an InfluxDB client. + +        Args: +            influx_client: SimapInfluxDBClient instance for writing telemetry +        """ +        super().__init__(self.PATTERN) +        self._influx_client = influx_client + +    def execute( +        self, +        match: re.Match, +        path: str, +        old_data: Optional[Union[Dict, str]], +        new_data: Optional[Union[Dict, str]] +    ) -> bool: +        """ +        Execute the callback to write node telemetry to InfluxDB. + +        Args: +            match: Regex match object containing network_id and node_id +            path: Original RESTCONF path +            old_data: Previous telemetry data (unused) +            new_data: New telemetry data to write (can be dict or JSON string) + +        Returns: +            True to continue executing other callbacks +        """ +        if new_data is None: +            LOGGER.debug("Node telemetry deletion, skipping InfluxDB write") +            return True + +        # Handle case where new_data is a JSON string instead of a dict +        if isinstance(new_data, str): +            try: +                new_data = json.loads(new_data) +            except json.JSONDecodeError: +                LOGGER.error("Failed to parse new_data as JSON: %s", new_data) +                return True + +        network_id = match.group('network_id') +        node_id = match.group('node_id') + +        # Extract telemetry fields from nested JSON structure +        # YangHandler returns various nested formats depending on context +        telemetry = _extract_telemetry_from_nested(new_data, node_id=node_id) +        if telemetry is None: +            LOGGER.warning( +                "Could not extract telemetry from data for node=%s: %s", +                node_id, new_data +            ) +            return True + +        cpu_utilization = 
telemetry.get('cpu-utilization', 0) + related_service_ids = telemetry.get('related-service-ids', None) + + LOGGER.info( + "Node telemetry callback: network=%s, node=%s, cpu=%s", + network_id, node_id, cpu_utilization + ) + + self._influx_client.write_node_telemetry( + network_id=network_id, + node_id=node_id, + cpu_utilization=float(cpu_utilization), + related_service_ids=related_service_ids + ) + + return True # Continue to other callbacks diff --git a/src/tests/tools/simap_datastore/simap_datastore/__init__.py b/src/tests/tools/simap_datastore/simap_datastore/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..3ccc21c7db78aac26daa1f8c5ff8e1ffd3f35460 --- /dev/null +++ b/src/tests/tools/simap_datastore/simap_datastore/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/tests/tools/simap_datastore/simap_datastore/__main__.py b/src/tests/tools/simap_datastore/simap_datastore/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..2c84d92efd7e33d44237e3a8791771a371e12f3f --- /dev/null +++ b/src/tests/tools/simap_datastore/simap_datastore/__main__.py @@ -0,0 +1,26 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from .app import app + +BIND_ADDRESS = '0.0.0.0' +BIND_PORT = 8080 + +if __name__ == '__main__': + # Only used to run it locally during development stage; + # otherwise, app is directly launched by gunicorn. + app.run( + host=BIND_ADDRESS, port=BIND_PORT, debug=True, use_reloader=False + ) diff --git a/src/tests/tools/simap_datastore/simap_datastore/app.py b/src/tests/tools/simap_datastore/simap_datastore/app.py new file mode 100644 index 0000000000000000000000000000000000000000..2d62f2965705da3b9ff791bf3cb97abe3d79206d --- /dev/null +++ b/src/tests/tools/simap_datastore/simap_datastore/app.py @@ -0,0 +1,64 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# This file overwrites default RestConf Server `app.py` file. 
+ + +import logging +from common.tools.rest_conf.server.restconf_server.RestConfServerApplication import RestConfServerApplication +from .TelemetryCallbacks import CallbackOnLinkTelemetry, CallbackOnNodeTelemetry +from .Config import INFLUXDB_HOST, INFLUXDB_PORT, INFLUXDB_TOKEN, INFLUXDB_DATABASE +from .influxdb_client import SimapInfluxDBClient + + +logging.basicConfig( + level = logging.INFO, + format = '[Worker-%(process)d][%(asctime)s] %(levelname)s:%(name)s:%(message)s', +) +LOGGER = logging.getLogger(__name__) + +LOGGER.info('Starting...') +rcs_app = RestConfServerApplication() +rcs_app.register_host_meta() +rcs_app.register_restconf() +LOGGER.info('All connectors registered') + +# Initialize InfluxDB client and register telemetry callbacks +try: + LOGGER.info('Initializing InfluxDB client (host=%s, port=%d, db=%s)...', INFLUXDB_HOST, INFLUXDB_PORT, INFLUXDB_DATABASE) + influx_client = SimapInfluxDBClient( + host = INFLUXDB_HOST, + port = INFLUXDB_PORT, + token = INFLUXDB_TOKEN, + database = INFLUXDB_DATABASE + ) +except Exception as e: + LOGGER.error('Failed to initialize InfluxDB client: %s', e) + influx_client = None + +if influx_client is not None and influx_client.is_connected(): + try: + rcs_app.callback_dispatcher.register(CallbackOnLinkTelemetry(influx_client)) + rcs_app.callback_dispatcher.register(CallbackOnNodeTelemetry(influx_client)) + LOGGER.info('Telemetry callbacks registered') + except Exception as e: + LOGGER.error('Failed to register telemetry callbacks: %s', e) +else: + LOGGER.warning('InfluxDB client not connected, telemetry callbacks disabled.') + +rcs_app.dump_configuration() +app = rcs_app.get_flask_app() + +LOGGER.info('Initialization completed!') diff --git a/src/tests/tools/simap_datastore/simap_datastore/influxdb_client.py b/src/tests/tools/simap_datastore/simap_datastore/influxdb_client.py new file mode 100644 index 0000000000000000000000000000000000000000..70dc566864c99ff84b3a27646268251ba1307b79 --- /dev/null +++ 
b/src/tests/tools/simap_datastore/simap_datastore/influxdb_client.py @@ -0,0 +1,181 @@ +# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +InfluxDB client wrapper for SIMAP telemetry data storage. +""" + +import json +import logging +from typing import List, Optional + +from influxdb_client_3 import InfluxDBClient3, Point, WritePrecision + + +LOGGER = logging.getLogger(__name__) + + +class SimapInfluxDBClient: + """ + Client wrapper for writing SIMAP telemetry data to InfluxDB 3.x. + """ + + def __init__( + self, + host: str, + port: int, + token: str, + database: str + ) -> None: + """ + Initialize the InfluxDB client. 
+ + Args: + host: InfluxDB server hostname + port: InfluxDB server port + token: Authentication token + database: Database/bucket name + """ + self._host = host + self._port = port + self._database = database + self._client: Optional[InfluxDBClient3] = None + + try: + self._client = InfluxDBClient3( + token = token, + host = f"http://{host}:{port}", + database = database + ) + LOGGER.info("InfluxDB client initialized: host=%s:%d, database=%s", + self._host, self._port, self._database) + except Exception as e: # pylint: disable=broad-except + LOGGER.error("Failed to initialize InfluxDB client: %s", str(e)) + self._client = None + + def is_connected(self) -> bool: + """Check if client is initialized.""" + return self._client is not None + + def write_link_telemetry( + self, + network_id: str, + link_id: str, + bandwidth_utilization: float, + latency: float, + related_service_ids: Optional[List[str]] = None + ) -> bool: + """ + Write link telemetry data to InfluxDB. + + Args: + network_id: Network identifier (e.g., 'te', 'simap-trans') + link_id: Link identifier (e.g., 'L1', 'Trans-L1') + bandwidth_utilization: Bandwidth utilization percentage (0-100) + latency: Latency in milliseconds + related_service_ids: Optional list of related service IDs + + Returns: + True if write succeeded, False otherwise + """ + if self._client is None: + LOGGER.warning("InfluxDB client not initialized, skipping write") + return False + + try: + point = ( + Point("link_telemetry") + .tag("network_id", network_id) + .tag("link_id", link_id) + .field("bandwidth_utilization", float(bandwidth_utilization)) + .field("latency", float(latency)) + ) + + if related_service_ids: + point = point.field("related_service_ids", json.dumps(related_service_ids)) + + self._client.write(record=point, write_precision=WritePrecision.S) + + LOGGER.debug( + "Wrote link telemetry: network=%s, link=%s, bw=%.2f, lat=%.3f", + network_id, link_id, bandwidth_utilization, latency + ) + return True + + except Exception 
as e: # pylint: disable=broad-except + LOGGER.error( + "Failed to write link telemetry (network=%s, link=%s): %s", + network_id, link_id, str(e) + ) + return False + + def write_node_telemetry( + self, + network_id: str, + node_id: str, + cpu_utilization: float, + related_service_ids: Optional[List[str]] = None + ) -> bool: + """ + Write node telemetry data to InfluxDB. + + Args: + network_id: Network identifier + node_id: Node identifier (e.g., 'PE1', 'ONT1') + cpu_utilization: CPU utilization percentage (0-100) + related_service_ids: Optional list of related service IDs + + Returns: + True if write succeeded, False otherwise + """ + if self._client is None: + LOGGER.warning("InfluxDB client not initialized, skipping write") + return False + + try: + point = ( + Point("node_telemetry") + .tag("network_id", network_id) + .tag("node_id", node_id) + .field("cpu_utilization", float(cpu_utilization)) + ) + + if related_service_ids: + point = point.field("related_service_ids", json.dumps(related_service_ids)) + + self._client.write(record=point, write_precision=WritePrecision.S) + + LOGGER.debug( + "Wrote node telemetry: network=%s, node=%s, cpu=%.2f", + network_id, node_id, cpu_utilization + ) + return True + + except Exception as e: # pylint: disable=broad-except + LOGGER.error( + "Failed to write node telemetry (network=%s, node=%s): %s", + network_id, node_id, str(e) + ) + return False + + def close(self) -> None: + """Close the InfluxDB client connection.""" + if self._client is not None: + try: + self._client.close() + LOGGER.info("InfluxDB client closed") + except Exception as e: # pylint: disable=broad-except + LOGGER.error("Error closing InfluxDB client: %s", str(e)) + finally: + self._client = None diff --git a/src/tests/tools/simap_server/startup.json b/src/tests/tools/simap_datastore/startup.json similarity index 100% rename from src/tests/tools/simap_server/startup.json rename to src/tests/tools/simap_datastore/startup.json diff --git 
a/src/tests/tools/simap_server/yang/ietf-inet-types.yang b/src/tests/tools/simap_datastore/yang/ietf-inet-types.yang similarity index 100% rename from src/tests/tools/simap_server/yang/ietf-inet-types.yang rename to src/tests/tools/simap_datastore/yang/ietf-inet-types.yang diff --git a/src/tests/tools/simap_server/yang/ietf-network-topology.yang b/src/tests/tools/simap_datastore/yang/ietf-network-topology.yang similarity index 100% rename from src/tests/tools/simap_server/yang/ietf-network-topology.yang rename to src/tests/tools/simap_datastore/yang/ietf-network-topology.yang diff --git a/src/tests/tools/simap_server/yang/ietf-network.yang b/src/tests/tools/simap_datastore/yang/ietf-network.yang similarity index 100% rename from src/tests/tools/simap_server/yang/ietf-network.yang rename to src/tests/tools/simap_datastore/yang/ietf-network.yang diff --git a/src/tests/tools/simap_server/yang/simap-telemetry.yang b/src/tests/tools/simap_datastore/yang/simap-telemetry.yang similarity index 100% rename from src/tests/tools/simap_server/yang/simap-telemetry.yang rename to src/tests/tools/simap_datastore/yang/simap-telemetry.yang diff --git a/src/tests/tools/simap_server/yang/simap.txt b/src/tests/tools/simap_datastore/yang/simap.txt similarity index 100% rename from src/tests/tools/simap_server/yang/simap.txt rename to src/tests/tools/simap_datastore/yang/simap.txt diff --git a/src/tests/tools/simap_server/README.md b/src/tests/tools/simap_server/README.md deleted file mode 100644 index bdea3b5bf1be4b0c9406be0c9d235daf1ea9533c..0000000000000000000000000000000000000000 --- a/src/tests/tools/simap_server/README.md +++ /dev/null @@ -1,25 +0,0 @@ -# RESTCONF/SIMAP Server - -This server implements a basic RESTCONF Server that can load, potentially, any YANG data model. -In this case, it is prepared to load a SIMAP Server based on IETF Network Topology + custom SIMAP Telemetry extensions. 
- - -## Build the RESTCONF/SIMAP Server Docker image -```bash -./build.sh -``` - -## Deploy the RESTCONF/SIMAP Server -```bash -./deploy.sh -``` - -## Run the RESTCONF/SIMAP Client for testing: -```bash -./run_client.sh -``` - -## Destroy the RESTCONF/SIMAP Server -```bash -./destroy.sh -``` diff --git a/src/tests/tools/simap_server/simap_client/__main__.py b/src/tests/tools/simap_server/simap_client/__main__.py deleted file mode 100644 index 67803b0922f77d1f02a3cf4b5170ca428767d4fc..0000000000000000000000000000000000000000 --- a/src/tests/tools/simap_server/simap_client/__main__.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -import json, logging, time -from common.tools.rest_conf.client.RestConfClient import RestConfClient -from .SimapClient import SimapClient -from .Tools import create_simap_aggnet, create_simap_e2enet, create_simap_te, create_simap_trans - - -logging.basicConfig(level=logging.INFO) -logging.getLogger('RestConfClient').setLevel(logging.WARN) -LOGGER = logging.getLogger(__name__) - - -def main() -> None: - restconf_client = RestConfClient( - '127.0.0.1', port=8080, - logger=logging.getLogger('RestConfClient') - ) - simap_client = SimapClient(restconf_client) - - create_simap_te(simap_client) - create_simap_trans(simap_client) - create_simap_aggnet(simap_client) - create_simap_e2enet(simap_client) - - print('networks=', json.dumps(simap_client.networks())) - - trans_link = simap_client.network('simap-trans').link('Trans-L1') - trans_node_site1 = simap_client.network('simap-trans').node('site1') - trans_node_site2 = simap_client.network('simap-trans').node('site2') - - related_service_ids = ['trans-svc1', 'trans-svc2', 'trans-svc3'] - - for i in range(1000): - trans_link.telemetry.update(float(i), float(i), related_service_ids=related_service_ids) - trans_node_site1.telemetry.update(float(i), related_service_ids=related_service_ids) - trans_node_site2.telemetry.update(float(i), related_service_ids=related_service_ids) - - print('trans link telemetry =', json.dumps(trans_link.telemetry.get())) - print('trans site1 telemetry =', json.dumps(trans_node_site1.telemetry.get())) - print('trans site2 telemetry =', json.dumps(trans_node_site2.telemetry.get())) - - time.sleep(10) - - -if __name__ == '__main__': - main()