GrpcServer.py 7.34 KB
Newer Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json, logging,threading, queue,time,signal
from datetime import datetime, timedelta
from typing import Any, Iterator, List, Optional, Tuple, Union
# from apscheduler.executors.pool import ThreadPoolExecutor
# from apscheduler.job import Job
# from apscheduler.jobstores.memory import MemoryJobStore
# from apscheduler.schedulers.background import BackgroundScheduler
# from common.method_wrappers.Decorator import MetricTypeEnum, MetricsPool, metered_subclass_method, INF
# from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type


import logging,threading
import grpc

from bgpls_speaker.service.tools.DiscoveredDBManager import DiscoveredDBManager
from .protos import grpcService_pb2_grpc
from .protos import grpcService_pb2
from .Tools import UpdateRequest

from concurrent import futures
import os
import subprocess
from multiprocessing import Pool
import logging

from .JavaRunner import JavaRunner

LOGGER = logging.getLogger(__name__)

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
SERVER_ADDRESS = 'localhost:2021'

class GrpcServer():

    """
    This class gets the current topology from a bgps speaker module in java 
    and updates the posible new devices to add in the context topology.
    Needs the address, port and as_number from the device that will provide the information via bgpls
    to the java module.
    """
    def __init__(self,DiscoveredDB : DiscoveredDBManager) -> None: # pylint: disable=super-init-not-called
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        self.__out_samples = queue.Queue()
        self.__server=grpc.aio.server()
        # self.__address="10.95.86.214"
        # self.__port=179
        # self.__asNumber=65006
        # self.__configFile="TMConfiguration_guillermo.xml"
        # self.__process=0
        self.__javaLocalPort=0 # --> BGP4Port in XML file
        self.__mngPort=0 # Port used in XML config file for management (NOT used in TFS)
        self.__runnerList=[]
        # Data base for saving all new devices discovered
        self.__discoveredDB=DiscoveredDB
        # self.__comms=grpcComms
        # Este tendría que tener la info del runner al que se connecta¿
    
    def ConnectThread(self) -> bool:
        # TODO: Metodos necesarios para conectarte al speaker
        # If started, assume it is already connected
        if self.__started.is_set(): return True
        self.__started.set() #notifyAll -->event.is_set()
        # 10 workers ?
        self.__server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        grpcService_pb2_grpc.add_updateServiceServicer_to_server(self, self.__server)
        self.__server.add_insecure_port(SERVER_ADDRESS)
        # server.add_secure_port(SERVER_ADDRESS)
        LOGGER.info("Starting server on %s", SERVER_ADDRESS)
        self.__server.start()
        try:
            while True:
                time.sleep(_ONE_DAY_IN_SECONDS)
        except KeyboardInterrupt:
            LOGGER.info("DISCONNECT")
            self.Disconnect()
        return True

    def Connect(self):
        grpcThread = threading.Thread(target=self.ConnectThread)
        grpcThread.start()
        return True

    def Disconnect(self) -> bool:
        self.__terminate.set()
        # If not started, assume it is already disconnected
        if not self.__started.is_set(): return True
        LOGGER.info("Keyboard interrupt, stop server")
        self.__server.stop(0)
        # Disconnect triggers deactivation of sampling events
        # self.__scheduler.shutdown()
        # exit(0)
        return True

    def update(self,request, context) -> bool:
        """
        Processes the messages recived by de grpc server
        """
        with self.__lock:
            #TODO: Get update
            LOGGER.info("(server) Update message from bgpls speaker: \n %s" % (request))
            response = grpcService_pb2.updateResponse(ack="OK")
            update_request = UpdateRequest.from_proto(request)
            self.__discoveredDB.AddToDB(update_request)
            # LOGGER.debug("Update class string %s",update_request.toString())
            return response
    


    def connectToJavaBgpls(self, address : str = "10.95.86.214", port : str = "179", asNumber : str = "65006"):

        # Get unused* port
        self.setLocalPort()
        runner = JavaRunner(self.__javaLocalPort,address,self.__mngPort)
        # Sets port in XML config file for java program
        runner.setAsNumber(asNumber)
        runner.setPort(port)
        runner.setPeer()
        process=runner.execBGPLSpeaker()
        self.__runnerList.append(runner)

        return process.pid

    def terminateRunners(self):
        for runner in self.__runnerList:
            runner.endBGPSpeaker()
        return True
    
    def terminateGrpcServer(self):
        LOGGER.debug("Terminating java programs...")
        self.terminateRunners()
        LOGGER.debug("Disconnecting grpc server...")
        self.Disconnect() 
        return True

    def terminateRunnerById(self,speaker_id):
        """
        Disconnect from BGP-LS speaker given an speaker Id. Its the same
        as the java running proccess PID.
        """
        for runner in self.__runnerList:
            if(runner.getPid()==speaker_id):
                runner.endBGPSpeaker()
                self.__runnerList.remove(runner)

        return True
        
    def setLocalPort(self,initPort=12179):
        """
        If java already running add 1 to current used port,
        else initialize port .
        initPort --> BGP4Port, usually 179 corresponding to BGP
        """
        with self.__lock:
            if(self.__runnerList):
                LOGGER.debug("Port exists %s",self.__javaLocalPort)
                lastRunner=self.__runnerList[-1]
                self.__javaLocalPort=lastRunner.getCurrentLocalPort()+1
                self.__mngPort=lastRunner.getCurrentMngPort()+1
            else:
                LOGGER.debug("Port DONT exists %s",self.__javaLocalPort)
                self.__javaLocalPort=initPort
                self.__mngPort=1112 # default management port
            return self.__javaLocalPort
        
    def getSpeakerListIds(self):
        return [runner.getPid() for runner in self.__runnerList]

    def getSpeakerFromId(self,speaker_id):
        """
        Returns address,as_number,peer_port
        """
        for runner in self.__runnerList:
            if(runner.getPid()==speaker_id):
                return runner.getRunnerInfo()
        return None
    
    def getSpeakerIdFromIpAddr(self,addr):
        """
        Returns Id from the speaker IP Address
        """
        for runner in self.__runnerList:
            ip_addr,asN,port=runner.getRunnerInfo()
            if(ip_addr==addr):
                return runner.getPid()
        return