Loading .DS_Store 0 → 100644 +6 KiB File added.No diff preview for this file type. View file src/.DS_Store 0 → 100644 +6 KiB File added.No diff preview for this file type. View file src/api/main.py +30 −9 Original line number Diff line number Diff line Loading @@ -334,10 +334,14 @@ class Api: result = self.slice_service.nsc(intent) if result: create_data_store(intent) slos = safe_get(intent, ['ietf-network-slice-service:network-slice-services', 'slo-sle-templates', 'slo-sle-template'], []) slosDict = { slo["id"]: { "slo-policy": slo["slo-policy"], "sle-policy": slo["sle-policy"] } for slo in slos } for slice in result["slices"]: current_app.config["slices"][slice["id"]] = { "src": slice["source"], "dst": slice["destination"] "dst": slice["destination"], "slo-sle-policy": slosDict.get(slice.get("slo-sle-template", ""), {}), } logging.info(f"Network Slice created successfully") return send_response( Loading Loading @@ -854,10 +858,12 @@ class Api: logging.info(f"JSON detected: {data}") telemetry = data["notification"]["push-update"]["datastore-contents"]["simap-telemetry:simap-telemetry"] bw = telemetry.get("bandwidth-utilization", "N/A") latency = telemetry.get("latency", "N/A") self.telemetry_cache.setdefault(slice_id, {})[link_id] = { "timestamp": data["notification"]["eventTime"], "bandwidth": float(telemetry.get("bandwidth-utilization", 0)), "latency": float(telemetry.get("latency", 0)), "bandwidth": float(bw) if bw != "N/A" else bw, "latency": float(latency) if latency != "N/A" else latency, "services": telemetry.get("related-service-ids", []) } except json.JSONDecodeError: logging.error("Error parsing the JSON from field 'data'") Loading Loading @@ -1094,7 +1100,7 @@ class Api: ] if True else (await response.json()) #""" network = next((n["ietf-network:networks"]["network"][0] for n in networkRaw if n["ietf-network:networks"]["network"][0]["network-id"] == sliceId), None) network = next((n["ietf-network:networks"]["network"][0] for n in networkRaw if n["ietf-network:networks"]["network"][0]["network-id"] == "admin"), None) logging.info(f"Retrieved topology for slice '{sliceId}': {network}") return network, 200 Loading Loading @@ -1129,13 +1135,24 @@ class Api: "services": List[any] } """ slo = current_app.config["slices"][sliceId]["slo-sle-policy"].get("slo-policy", {}) sloBandwith = [ s["bound"] for s in slo.get("metric-bound", []) if s.get("metric-type", "") == "one-way-bandwidth" ][0] if slo else 0 sloLatency = [ s["bound"] for s in slo.get("metric-bound", []) if s.get("metric-type", "") == "one-way-delay-maximum" ][0] if slo else "N/A" bw = min([link["bandwidth"] for _, link in self.telemetry_cache[sliceId].items()]) lt = sum([link["latency"] for _, link in self.telemetry_cache[sliceId].items()]) warningJson = { "warning": {} } if sloLatency != "N/A" and lt > float(sloLatency): warningJson["warning"]["latency"] = f"High latency: {lt}ms exceeds SLO of {sloLatency}ms" if sloBandwith != 0 and bw > float(sloBandwith): warningJson["warning"]["bandwidth"] = f"High bandwidth utilization: {bw}Kb/s exceeds SLO of {sloBandwith}Kb/s" return { "sliceId": sliceId, "telemetry": { "desc": f"Aggregated telemetry of network slice {sliceId}. Other additional fields should be included in the real version.", "latency": f"{sum([link["latency"] for _, link in self.telemetry_cache[sliceId].items()])}ms", "bandwidth": f"{sum([link["bandwidth"] for _, link in self.telemetry_cache[sliceId].items()])}Mb/s" }, "desc": f"Aggregated telemetry of network slice {sliceId}. Other additional fields could be included in the future.", "latency": f"{lt}ms", "bandwidth": f"{bw}Mb/s" } + warningJson, "timestamp": time.time() } else: Loading Loading @@ -1278,8 +1295,12 @@ class Api: while okToProceed: try: data, code = await self.getTelemetrySubscriptions(clientId, sliceId, True) if code == 200: yield f"data: {json.dumps(data)}\n\n" if "warning" in data["sliceSubscription"]["telemetry"]: logging.warning(f"Warning for client '{clientId}' in slice '{sliceId}': {data['sliceSubscription']['telemetry']['warning']}") yield f"event: warning\ndata: {json.dumps(data)}\n\n" else: yield f"data: {json.dumps(data)}\n\n" await asyncio.sleep(frequency) else: yield f"event: error\ndata: {json.dumps(data)}\n\n" Loading Loading
src/api/main.py +30 −9 Original line number Diff line number Diff line Loading @@ -334,10 +334,14 @@ class Api: result = self.slice_service.nsc(intent) if result: create_data_store(intent) slos = safe_get(intent, ['ietf-network-slice-service:network-slice-services', 'slo-sle-templates', 'slo-sle-template'], []) slosDict = { slo["id"]: { "slo-policy": slo["slo-policy"], "sle-policy": slo["sle-policy"] } for slo in slos } for slice in result["slices"]: current_app.config["slices"][slice["id"]] = { "src": slice["source"], "dst": slice["destination"] "dst": slice["destination"], "slo-sle-policy": slosDict.get(slice.get("slo-sle-template", ""), {}), } logging.info(f"Network Slice created successfully") return send_response( Loading Loading @@ -854,10 +858,12 @@ class Api: logging.info(f"JSON detected: {data}") telemetry = data["notification"]["push-update"]["datastore-contents"]["simap-telemetry:simap-telemetry"] bw = telemetry.get("bandwidth-utilization", "N/A") latency = telemetry.get("latency", "N/A") self.telemetry_cache.setdefault(slice_id, {})[link_id] = { "timestamp": data["notification"]["eventTime"], "bandwidth": float(telemetry.get("bandwidth-utilization", 0)), "latency": float(telemetry.get("latency", 0)), "bandwidth": float(bw) if bw != "N/A" else bw, "latency": float(latency) if latency != "N/A" else latency, "services": telemetry.get("related-service-ids", []) } except json.JSONDecodeError: logging.error("Error parsing the JSON from field 'data'") Loading Loading @@ -1094,7 +1100,7 @@ class Api: ] if True else (await response.json()) #""" network = next((n["ietf-network:networks"]["network"][0] for n in networkRaw if n["ietf-network:networks"]["network"][0]["network-id"] == sliceId), None) network = next((n["ietf-network:networks"]["network"][0] for n in networkRaw if n["ietf-network:networks"]["network"][0]["network-id"] == "admin"), None) logging.info(f"Retrieved topology for slice '{sliceId}': {network}") return network, 200 Loading Loading @@ -1129,13 +1135,24 @@ class Api: "services": List[any] } """ slo = current_app.config["slices"][sliceId]["slo-sle-policy"].get("slo-policy", {}) sloBandwith = [ s["bound"] for s in slo.get("metric-bound", []) if s.get("metric-type", "") == "one-way-bandwidth" ][0] if slo else 0 sloLatency = [ s["bound"] for s in slo.get("metric-bound", []) if s.get("metric-type", "") == "one-way-delay-maximum" ][0] if slo else "N/A" bw = min([link["bandwidth"] for _, link in self.telemetry_cache[sliceId].items()]) lt = sum([link["latency"] for _, link in self.telemetry_cache[sliceId].items()]) warningJson = { "warning": {} } if sloLatency != "N/A" and lt > float(sloLatency): warningJson["warning"]["latency"] = f"High latency: {lt}ms exceeds SLO of {sloLatency}ms" if sloBandwith != 0 and bw > float(sloBandwith): warningJson["warning"]["bandwidth"] = f"High bandwidth utilization: {bw}Kb/s exceeds SLO of {sloBandwith}Kb/s" return { "sliceId": sliceId, "telemetry": { "desc": f"Aggregated telemetry of network slice {sliceId}. Other additional fields should be included in the real version.", "latency": f"{sum([link["latency"] for _, link in self.telemetry_cache[sliceId].items()])}ms", "bandwidth": f"{sum([link["bandwidth"] for _, link in self.telemetry_cache[sliceId].items()])}Mb/s" }, "desc": f"Aggregated telemetry of network slice {sliceId}. Other additional fields could be included in the future.", "latency": f"{lt}ms", "bandwidth": f"{bw}Mb/s" } + warningJson, "timestamp": time.time() } else: Loading Loading @@ -1278,8 +1295,12 @@ class Api: while okToProceed: try: data, code = await self.getTelemetrySubscriptions(clientId, sliceId, True) if code == 200: yield f"data: {json.dumps(data)}\n\n" if "warning" in data["sliceSubscription"]["telemetry"]: logging.warning(f"Warning for client '{clientId}' in slice '{sliceId}': {data['sliceSubscription']['telemetry']['warning']}") yield f"event: warning\ndata: {json.dumps(data)}\n\n" else: yield f"data: {json.dumps(data)}\n\n" await asyncio.sleep(frequency) else: yield f"event: error\ndata: {json.dumps(data)}\n\n" Loading