Skip to content
Snippets Groups Projects
GrpcServer.py 5.02 KiB
Newer Older
Pablo Armingol's avatar
Pablo Armingol committed
# Copyright 2022-2024 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging,threading, queue
import logging,threading
import grpc
import logging

from .JavaRunner import JavaRunner

LOGGER = logging.getLogger(__name__)

#_ONE_DAY_IN_SECONDS = 60 * 60 * 24
#SERVER_ADDRESS = 'localhost:2021'

class GrpcServer():

    def __init__(self) -> None: # pylint: disable=super-init-not-called
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        self.__out_samples = queue.Queue()
        self.__server=grpc.aio.server()
        self.__runnerList=[]

    def connectToJavaPcep(self, address):
        runner = JavaRunner(address)
        
        # Sets IpAddress in XML config file for java program
        runner.setPeer(address)

        process_thread = threading.Thread(target=runner.execPcep)
        process_thread.start()
        self.__runnerList.append(runner)

        return process_thread
    
    '''
    def ConnectThread(self) -> bool:
        # TODO: Metodos necesarios para conectarte al speaker
        # If started, assume it is already connected
        if self.__started.is_set(): return True
        self.__started.set() 
        self.__server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        grpcService_pb2_grpc.add_pceServiceServicer_to_server(self, self.__server)
        self.__server.add_insecure_port(SERVER_ADDRESS)
        LOGGER.info("Starting server on %s", SERVER_ADDRESS)
        self.__server.start()
        try:
            while True:
                time.sleep(_ONE_DAY_IN_SECONDS)
        except KeyboardInterrupt:
            LOGGER.info("DISCONNECT")
            self.Disconnect()
        return True

    def Connect(self):
        grpcThread = threading.Thread(target=self.ConnectThread)
        grpcThread.start()
        LOGGER.info("Calling the JavaRunner")
        return True

    def Disconnect(self) -> bool:
        self.__terminate.set()
        # If not started, assume it is already disconnected
        if not self.__started.is_set(): return True
        LOGGER.info("Keyboard interrupt, stop server")
        self.__server.stop(0)
        # Disconnect triggers deactivation of sampling events
        # self.__scheduler.shutdown()
        # exit(0)
        return True

    def update():
        with grpc.insecure_channel('localhost:10060') as channel:
Pablo Armingol's avatar
Pablo Armingol committed
            #n = "initiate lsp directo 10.95.90.56 1.1.1.1 1.1.1.3 m1228800 na192.168.3.11-192.168.3.13"
            n = "terminate lsp 10.95.90.56 0 nombre"
            #n="create candidatepath 10.95.90.56 1.1.1.1 4 97 m69644288 nn1.1.1.3 m69640192 nn1.1.1.2"
            stub = grpcService_pb2_grpc.pceServiceStub(channel)
            request = grpcService_pb2.commandRequest(command=n)
            print("updateService req: " ,request)
            response = stub.update(request)
            print("updateService client received: " ,response.commandResp)

    def requestToJavaPcep(self,message):
        with grpc.insecure_channel('localhost:10060') as channel:
Pablo Armingol's avatar
Pablo Armingol committed
            #n = "initiate lsp largo2 10.95.90.56 1.1.1.1 1.1.1.2 m69644288 nn1.1.1.3 m69640192 nn1.1.1.2"
            #n = "initiate lsp directo 10.95.90.56 1.1.1.1 1.1.1.3 m1228800 na192.168.3.11-192.168.3.13"
            LOGGER.debug("LLego al request")
            stub = grpcService_pb2_grpc.pceServiceStub(channel)
            LOGGER.debug("updateService req 2: %s" ,message)
            request = grpcService_pb2.commandRequest(command=message)
            LOGGER.debug("updateService req 2: %s" ,request)
            response = stub.update(request)
            LOGGER.debug("updateServide client received: %s" ,response.commandResp)
            LOGGER.debug("updateServide client received IP: %s" ,response.ipAddress)
            return response.ipAddress

    def terminateRunners(self):
        for runner in self.__runnerList:
            runner.endBGPSpeaker()
        return True
    
    def terminateGrpcServer(self):
        LOGGER.debug("Terminating java programs...")
        self.terminateRunners()
        LOGGER.debug("Disconnecting grpc server...")
        self.Disconnect() 
        return True

    def terminateRunnerById(self,speaker_id):
        """
        Disconnect from BGP-LS speaker given an speaker Id. Its the same
        as the java running proccess PID.
        """
        for runner in self.__runnerList:
            if(runner.getPid()==speaker_id):
                runner.endBGPSpeaker()
                self.__runnerList.remove(runner)

        return True
    '''