# Copyright 2022-2024 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging,threading, queue
import logging,threading
import grpc
import logging

from .JavaRunner import JavaRunner

LOGGER = logging.getLogger(__name__)

#_ONE_DAY_IN_SECONDS = 60 * 60 * 24
#SERVER_ADDRESS = 'localhost:2021'


class GrpcServer:
    """Bookkeeping object for the Java PCEP runners launched by this service.

    The active code only creates ``JavaRunner`` instances, starts their
    ``execPcep`` method on a thread and remembers them in a private list.
    The lock, events, sample queue and ``grpc.aio`` server object built in
    the constructor are not used by any active method visible in this file;
    they are referenced only by the disabled legacy methods kept in the
    string literal at the end of the class.
    """

    def __init__(self) -> None:  # pylint: disable=super-init-not-called
        """Create the synchronisation primitives, the server and the runner list."""
        self.__lock = threading.Lock()
        self.__started = threading.Event()
        self.__terminate = threading.Event()
        self.__out_samples = queue.Queue()
        # The server object is only instantiated here; nothing in the active
        # code registers a servicer on it or starts it.
        self.__server = grpc.aio.server()
        self.__runnerList = []

    def connectToJavaPcep(self, address):
        """Launch a ``JavaRunner`` for ``address`` on its own thread.

        Args:
            address: Value handed both to the ``JavaRunner`` constructor and
                to ``JavaRunner.setPeer`` (presumably the peer IP address --
                confirm against ``JavaRunner``).

        Returns:
            The already-started ``threading.Thread`` whose target is the
            runner's ``execPcep`` method.
        """
        java_runner = JavaRunner(address)
        # Per the original note: stores the IP address in the XML config file
        # used by the Java program.
        java_runner.setPeer(address)

        worker = threading.Thread(target=java_runner.execPcep)
        worker.start()

        # Remember the runner so it can be located again later.
        self.__runnerList.append(java_runner)
        return worker

    # NOTE: everything below is disabled legacy code kept as a bare string
    # literal (a no-op expression statement). It refers to names that are not
    # imported in the visible part of this file (futures, time,
    # grpcService_pb2, grpcService_pb2_grpc) and to the commented-out
    # constants above, so it cannot be re-enabled as-is.
    '''
    def ConnectThread(self) -> bool:
        # TODO: Metodos necesarios para conectarte al speaker
        # If started, assume it is already connected
        if self.__started.is_set(): return True
        self.__started.set()
        self.__server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        grpcService_pb2_grpc.add_pceServiceServicer_to_server(self, self.__server)
        self.__server.add_insecure_port(SERVER_ADDRESS)
        LOGGER.info("Starting server on %s", SERVER_ADDRESS)
        self.__server.start()
        try:
            while True:
                time.sleep(_ONE_DAY_IN_SECONDS)
        except KeyboardInterrupt:
            LOGGER.info("DISCONNECT")
            self.Disconnect()
        return True

    def Connect(self):
        grpcThread = threading.Thread(target=self.ConnectThread)
        grpcThread.start()
        LOGGER.info("Calling the JavaRunner")
        return True

    def Disconnect(self) -> bool:
        self.__terminate.set()
        # If not started, assume it is already disconnected
        if not self.__started.is_set(): return True
        LOGGER.info("Keyboard interrupt, stop server")
        self.__server.stop(0)
        # Disconnect triggers deactivation of sampling events
        # self.__scheduler.shutdown()
        # exit(0)
        return True

    def update():
        with grpc.insecure_channel('localhost:10060') as channel:
            #n = "initiate lsp directo 10.95.86.214 1.1.1.1 1.1.1.3 m1228800 na192.168.3.11-192.168.3.13"
            n = "terminate lsp 10.95.86.214 0 nombre"
            #n="create candidatepath 10.95.86.214 1.1.1.1 4 97 m69644288 nn1.1.1.3 m69640192 nn1.1.1.2"
            stub = grpcService_pb2_grpc.pceServiceStub(channel)
            request = grpcService_pb2.commandRequest(command=n)
            print("updateService req: " ,request)
            response = stub.update(request)
            print("updateService client received: " ,response.commandResp)

    def requestToJavaPcep(self,message):
        with grpc.insecure_channel('localhost:10060') as channel:
            #n = "initiate lsp largo2 10.95.86.214 1.1.1.1 1.1.1.2 m69644288 nn1.1.1.3 m69640192 nn1.1.1.2"
            #n = "initiate lsp directo 10.95.86.214 1.1.1.1 1.1.1.3 m1228800 na192.168.3.11-192.168.3.13"
            LOGGER.debug("LLego al request")
            stub = grpcService_pb2_grpc.pceServiceStub(channel)
            LOGGER.debug("updateService req 2: %s" ,message)
            request = grpcService_pb2.commandRequest(command=message)
            LOGGER.debug("updateService req 2: %s" ,request)
            response = stub.update(request)
            LOGGER.debug("updateServide client received: %s" ,response.commandResp)
            LOGGER.debug("updateServide client received IP: %s" ,response.ipAddress)
            return response.ipAddress

    def terminateRunners(self):
        for runner in self.__runnerList:
            runner.endBGPSpeaker()
        return True

    def terminateGrpcServer(self):
        LOGGER.debug("Terminating java programs...")
        self.terminateRunners()
        LOGGER.debug("Disconnecting grpc server...")
        self.Disconnect()
        return True

    def terminateRunnerById(self,speaker_id):
        """
        Disconnect from BGP-LS speaker given an speaker Id. Its the same as the java running proccess PID.
        """
        for runner in self.__runnerList:
            if(runner.getPid()==speaker_id):
                runner.endBGPSpeaker()
                self.__runnerList.remove(runner)
        return True
    '''