Commit f66fe9e8 authored by kesnar's avatar kesnar
Browse files

feat: add python scripts for kpis

parent 9b75d8d8
Loading
Loading
Loading
Loading
+162 −0
Original line number Diff line number Diff line
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#import copy, logging, pytest
#from common.tests.EventTools import EVENT_CREATE, EVENT_UPDATE, check_events
#from common.tools.object_factory.Context import json_context_id
#from common.tools.object_factory.Device import json_device_id
#from common.tools.object_factory.Service import json_service_id
#from common.tools.object_factory.Link import json_link_id
#from common.tools.object_factory.Topology import json_topology_id
#from context.client.EventsCollector import EventsCollector
#from common.proto.context_pb2 import Context, ContextId, Device, Empty, Link, Topology, Service, ServiceId
#from monitoring.client.MonitoringClient import MonitoringClient
#from common.proto.context_pb2 import ConfigActionEnum, Device, DeviceId, DeviceOperationalStatusEnum

import os, threading, time, socket
from common.Settings import get_setting
from common.proto.context_pb2 import Empty, Timestamp
from common.proto.monitoring_pb2 import KpiDescriptor, Kpi, KpiId, KpiValue
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from monitoring.client.MonitoringClient import MonitoringClient
from context.client.ContextClient import ContextClient

# ----- If you want to use .env file
#from dotenv import load_dotenv
#load_dotenv()
#def get_setting(key):
#    return os.getenv(key)


#### gRPC Clients
monitoring_client = MonitoringClient(get_setting('MONITORINGSERVICE_SERVICE_HOST'), get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC'))
context_client = ContextClient(get_setting('CONTEXTSERVICE_SERVICE_HOST'), get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC'))

### Locks and common variables
ping_trigger = threading.Lock()
kpi_id_trigger = threading.Lock()
kpi_id = KpiId()
should_ping = True

### Define the path to the Unix socket
socket_path = "./tmp/sock"
if os.path.exists(socket_path):
    os.remove(socket_path)

def thread_context_func():
    while True:
        # Listen to ContextService/GetServiceEvents stream 
        events = context_client.GetServiceEvents(Empty())
        for event in events:
            event_service = event.service_id
            event_service_uuid = event_service.service_uuid.uuid
            event_type = event.event.event_type
            if event_type == 1:
                print(f"stream: New CREATE event:\n{event_service}")
                kpi_descriptor = KpiDescriptor(
                        kpi_id = None,
                        kpi_id_list = [],
                        device_id = None,
                        endpoint_id = None,
                        kpi_description = f"Latency value for service {event_service_uuid}",
                        service_id = event_service,
                        kpi_sample_type = KpiSampleType.KPISAMPLETYPE_UNKNOWN
                        )
                response = monitoring_client.SetKpi(kpi_descriptor)
                print(response)
                with kpi_id_trigger:
                    global kpi_id
                    kpi_id = response
                    print(kpi_id)
                with ping_trigger:
                    should_ping = True
            elif event_type == 3:
                print(f"stream: New REMOVE event:\n{event_service}")
                with ping_trigger:
                    should_ping = False

def thread_kpi_func():
    try:
        # Create socket object
        server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

        # Bind the socket to the socket path
        server_socket.bind(socket_path)

        # Listen for incoming connections
        server_socket.listen(1)
        
        while True:
            print("Awaiting for new connection!")

            # Accept incoming connection
            connection, client_address = server_socket.accept()

            # Read data from the connection
            data = connection.recv(1024)

            if data:
                with ping_trigger:
                    if should_ping: 
                        data = data.decode()
                        print(f"Received: {data}")
                        with kpi_id_trigger:
                            global kpi_id
                            
                            now = time.time()

                            new_timestamp = Timestamp()
                            new_timestamp.timestamp = now

                            new_value = KpiValue()
                            new_value.floatVal = float(data)

                            kpi = Kpi (
                                    kpi_id = kpi_id,
                                    timestamp = new_timestamp,
                                    kpi_value = new_value
                                    )
                            print(kpi)
                            response = monitoring_client.IncludeKpi(kpi) 
                            print(f"response: {response}")

            # Close the connection 
            connection.close()

    
    except Exception as e:
        print(f"Error: {str(e)}")


def main():

    # Start Thread that listens to context events
    thread_context = threading.Thread(target=thread_context_func)
    thread_context.daemon = True
    thread_context.start()

    # Start Thread that listens to socket
    thread_kpi = threading.Thread(target=thread_kpi_func)
    thread_kpi.daemon = True
    thread_kpi.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        os.remove(socket_path)
        print("Script terminated.")

if __name__ == "__main__":
    main()
+32 −0
Original line number Diff line number Diff line
import socket

# Define the path to the Unix socket
socket_path = "./tmp/sock"

try:
    # Create a socket object
    server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    # Bind the socket to the socket path
    server_socket.bind(socket_path)

    # Listen for incoming connections
    server_socket.listen(1)

    print(f"Listening on {socket_path}...")

    while True:
        # Accept incoming connection
        connection, client_address = server_socket.accept()

        # Read data from the connection
        data = connection.recv(1024)

        if data:
            print(f"Received: {data.decode()}")

        # Close the connection
        connection.close()

except Exception as e:
    print(f"Error: {str(e)}")
+26 −0
Original line number Diff line number Diff line
import socket

# Define the path to the Unix socket
socket_path = "./tmp/sock"

# Data to be sent
data = "hello"

try:
    # Create a socket object
    client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    # Connect to the Unix socket
    client_socket.connect(socket_path)

    # Send the data
    client_socket.send(data.encode())

    # Close the socket
    client_socket.close()

    print("Sent to the socket.")
except Exception as e:
    print("Error")

+60 −0
Original line number Diff line number Diff line
import socket
import re
import time
import subprocess

socket_path = "./tmp/sock"

def get_kpi_value():
    hostname = "8.8.8.8"
    count = 1
    wait = 5

    total_pings = 0
    successful_pings = 0
    try:
        # Run the ping command and capture the output
        result = subprocess.check_output(["ping", "-W", str(wait), "-c", str(count), hostname], universal_newlines=True)

        response_time = float(re.findall(r"time=([0-9.]+) ms", result)[0])
        successful_pings += 1

    except subprocess.CalledProcessError as e:
        response_time = -1

    total_pings += 1
    moving_loss_ratio = round(((total_pings - successful_pings) / total_pings * 100), 2)

    print("Packet loss: {}%".format(moving_loss_ratio))
    print("Latency: {} ms".format(response_time))
    return response_time

def main():
    try:
        while True:
            start_time = time.time()

            # Ping and capture latency and packet loss
            data = str(get_kpi_value())

            # Write results in socket
            try:
                client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                client_socket.connect(socket_path)
                client_socket.send(data.encode())
                client_socket.close()
            except Exception as e:
                print(e)

            # Calculate the time taken by get_kpi_value()
            execution_time = time.time() - start_time
            # Wait the rest of the time
            wait_time = max(0, 6 - execution_time)
            time.sleep(wait_time)

    except KeyboardInterrupt:
        print("Script terminated.")

if __name__ == "__main__":
    main()