Commit 7bfdac4f authored by Samuel Santos's avatar Samuel Santos
Browse files

NSC Monitor system v1.0.0

parent 4f098878
Loading
Loading
Loading
Loading
+107 −62
Original line number Diff line number Diff line
@@ -19,6 +19,8 @@ import time
from typing import Dict, List, Tuple
import uuid

import requests

from src.utils.send_response import send_response
import logging
from flask import current_app
@@ -33,7 +35,7 @@ import asyncio
import aiohttp

class Api:
    NSC_BASE_URL = "http://192.168.27.165"
    NSC_BASE_URL = "http://192.168.27.189"
    SDN_TOPOLOGY_URL = f"{NSC_BASE_URL}/restconf/data/ietf-network:networks"
    SDN_SUBSCRIBE_URL = f"{NSC_BASE_URL}/restconf/operations/subscriptions:establish-subscription"
    SDN_SUBSCRIPTION_PERIOD = 10  # seconds
@@ -45,7 +47,16 @@ class Api:
        if "telemetryClients" not in current_app.config: current_app.config["telemetryClients"] = {}
        if "slices" not in current_app.config: current_app.config["slices"] = {}
        self.telemetry_cache = {}
        self.session = None #For connections with SDN
                
    async def get_session(self):
        # Crea la sesión si no existe
        if self.session is None or self.session.closed: self.session = aiohttp.ClientSession()
        return self.session

    async def close_session(self):
        # Llama a esto al apagar tu app
        if self.session: await self.session.close()

    def add_flow(self, intent):
        """
@@ -812,33 +823,51 @@ class Api:
    # --- SDN STREAMS ---

    async def listenStream(self, session, link_id, slice_id, stream_url):
        async with session.get(stream_url) as resp:
        try:
            async with session.get(
                stream_url,
                auth=aiohttp.BasicAuth("admin", "admin"), 
                timeout=None,
                headers = {
                    "Accept": "text/event-stream",
                    "Cache-Control": "no-cache"
                }
            ) as resp:
                buffer = ""

                logging.info(f"Started listening to stream for link {link_id} in slice {slice_id}")
            async for chunk in resp.content:

                async for chunk in resp.content.iter_any():
                    buffer += chunk.decode()
                    
                    # SSE separa los eventos con dos saltos de línea (\n\n)
                    while "\n\n" in buffer:
                        # Extraemos el evento completo y dejamos el resto en el buffer
                        event_block, buffer = buffer.split("\n\n", 1)
                        
                        for line in event_block.split("\n"):
                            if line.startswith("data:"):
                                json_str = line.replace("data:", "").strip()
                                
                                try:
                    # intenta parsear un JSON completo
                    data, index = json.JSONDecoder().raw_decode(buffer)
                    logging.info(f"Received telemetry for link {link_id} in slice {slice_id}: {data}")
                    buffer = buffer[index:].lstrip()
                                    data = json.loads(json_str)
                                    logging.info(f"JSON detected: {data}")

                                    telemetry = data["notification"]["push-update"]["datastore-contents"]["simap-telemetry:simap-telemetry"]

                                    self.telemetry_cache.setdefault(slice_id, {})[link_id] = {
                        "timestamp": data["notification"]["eventTime"], #time.time(),
                        "bandwidth": float(telemetry.get("bandwidth")),
                        "latency": float(telemetry.get("latency")),
                        "services": telemetry.get("services", [])
                                        "timestamp": data["notification"]["eventTime"],
                                        "bandwidth": float(telemetry.get("bandwidth-utilization", 0)),
                                        "latency": float(telemetry.get("latency", 0)),
                                        "services": telemetry.get("related-service-ids", [])
                                    }
                    buffer = ""
                except json.JSONDecodeError: pass # aún no hay suficiente data
                                except json.JSONDecodeError: logging.error("Error parsing the JSON from field 'data'")
                                except Exception as e: logging.error(f"Error processing logic: {e}")
        except aiohttp.ClientError as e: logging.error(f"Connection error with stream for link {link_id}: {e}")
        logging.info(f"Stopped listening to stream for link {link_id} in slice {slice_id}")

    async def startStreams(self, links, slice_id):
        async with aiohttp.ClientSession(timeout = aiohttp.ClientTimeout(total=10)) as session:
        #async with self.get_session() as session: #timeout = aiohttp.ClientTimeout(total=10)
        session = await self.get_session()
        tasks = []

        for link in links:
@@ -869,7 +898,7 @@ class Api:
                        self.listenStream(session, link, slice_id, Api.NSC_BASE_URL + sub_data['uri'])
                    )
                )
            await asyncio.gather(*tasks)
        #await asyncio.gather(*tasks)

    # --- CLIENTS ---

@@ -901,7 +930,10 @@ class Api:

    async def getNetworkTopology(self, sliceId) -> Tuple[Dict[str, any], int]:
        # MOCKED TOPOLOGY
        #topology = requests.get(Api.SDN_TOPOLOGY_URL)
        topology = requests.get(Api.SDN_TOPOLOGY_URL, auth=("admin", "admin"))
        networkRaw = topology.json()

        """
        topology = [
            {
                "ietf-network:networks": {
@@ -1060,9 +1092,11 @@ class Api:
                }
            }
        ] if True else (await response.json())
        #"""

        network = next((n["ietf-network:networks"]["network"][0] for n in networkRaw if n["ietf-network:networks"]["network"][0]["network-id"] == sliceId), None)
        logging.info(f"Retrieved topology for slice '{sliceId}': {network}")

        network = topology[0]["ietf-network:networks"]["network"]
        network = network[0] #next((n for n in network if n["network-id"] == sliceId), None)
        return network, 200

    async def getTelemetry(self, sliceId, subscribe: bool = False) -> Dict[str, any]:
@@ -1082,7 +1116,6 @@ class Api:
                if code != 200: return shortestPath
                config["shortestPath"] = shortestPath
            else: shortestPath = config["shortestPath"]
            logging.info(f"[getTelemetry] Shortest path for slice '{sliceId}': {shortestPath}")

            # Make sure data is already initialized in cache
            # (this will only happen if an already-opened stream with SDN controller has sent data for this slice,
@@ -1099,14 +1132,26 @@ class Api:
                return {
                    "sliceId": sliceId,
                    "telemetry": {
                        "desc": f"Aggregated telemetry of a Network slice {sliceId}. Other additional fields should be included in the real version.",
                        "latency": f"{sum([link.latency for key, link in self.telemetry_cache[sliceId].items()])}ms",
                        "bandwidth": f"{sum([link.bandwidth for key, link in self.telemetry_cache[sliceId].items()])}Mb/s"
                        "desc": f"Aggregated telemetry of network slice {sliceId}. Other additional fields should be included in the real version.",
                        "latency": f"{sum([link["latency"] for _, link in self.telemetry_cache[sliceId].items()])}ms",
                        "bandwidth": f"{sum([link["bandwidth"] for _, link in self.telemetry_cache[sliceId].items()])}Mb/s"
                    },
                    "timestamp": time.time()
                }
            else:
                if subscribe: # Open a stream with SDN controller for this slice:
                    shortestPathLinks = []
                    if sliceId not in self.telemetry_cache: self.telemetry_cache[sliceId] = {}
                    for i in range(len(shortestPath)-1):
                        linkId = f"{shortestPath[i]}-{shortestPath[i+1]}"
                        shortestPathLinks.append(linkId)
                        self.telemetry_cache[sliceId][linkId] = {
                            "timestamp": None, #time.time(),
                            "bandwidth": 0,
                            "latency": 0,
                            "services": []
                        }

                    # Get telemetry of every node in the slice:
                    """
                    self.startStreams(self, [
@@ -1116,7 +1161,7 @@ class Api:
                    """

                    # Get telemetry only of the links in the shortest path:
                    await self.startStreams([ f"{shortestPath[i]}-{shortestPath[i+1]}" for i in range(len(shortestPath)-1) ], sliceId)
                    await self.startStreams(shortestPathLinks, sliceId)
                return { "message": f"No telemetry available for slice '{sliceId}'" }
        else: return { "message": f"There is no slice with id '{sliceId}' registered" }