# Copyright 2022-2024 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import grpc

from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method
from common.proto.context_pb2 import Empty
from common.proto.pcep_pb2 import (
    LSPdb_Request,
    LSPdb_Response,
    LspDescriptor,
    LspID,
    PceIpRp,
    PceIpRq,
    RequestRp,
    RequestRq,
    Session_Request,
    Session_Response,
)
from common.proto.pcep_pb2_grpc import PcepServiceServicer
from pcep.database.LSP_DB import LspDB
from pcep.database.LSPModel import LspModel
from pcep.service.tools.GrpcServer import GrpcServer

LOGGER = logging.getLogger(__name__)

METRICS_POOL = MetricsPool('Service', 'RPC')


class PcepServiceServicerImpl(PcepServiceServicer):
    """gRPC servicer for the PCEP service.

    Every active RPC in this class delegates its work to the ``GrpcServer``
    instance received at construction time.
    """

    def __init__(self, pcepServer: GrpcServer) -> None:
        """Store the server wrapper and create the LSP database handle.

        Args:
            pcepServer: Object the RPC handlers forward their requests to.
        """
        LOGGER.debug('Creating Servicer...')
        self.pcepServer = pcepServer
        LOGGER.debug('Servicer Created')

        LOGGER.info("Init LSP Database service")
        # Only the disabled RPCs at the bottom of this class reference this
        # handle in the code visible here.
        self.lsp_db_obj = LspDB()

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def configuratePCE(self, request: PceIpRq, context: grpc.ServicerContext) -> PceIpRp:
        """Forward ``request.address`` to ``pcepServer.connectToJavaPcep``.

        Args:
            request: Message carrying the ``address`` to hand to the server.
            context: gRPC call context (unused here).

        Returns:
            A ``PceIpRp`` whose ``addressRp`` is the fixed string below.
        """
        LOGGER.debug("(ConfiguratePCE) Create pce instance %s", request)
        self.pcepServer.connectToJavaPcep(request.address)
        # NOTE(review): the value returned by connectToJavaPcep is discarded
        # and a fixed address is reported instead. The earlier form,
        # `return PceIpRp(addressRp=<connectToJavaPcep result>)`, had been
        # commented out; confirm which reply is intended before changing it.
        return PceIpRp(addressRp="10.95.90.150")

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def sendRequest(self, request: RequestRq, context: grpc.ServicerContext) -> RequestRp:
        """Forward ``request.command`` to ``pcepServer.requestToJavaPcep``.

        Args:
            request: Message carrying the ``command`` to forward.
            context: gRPC call context (unused here).

        Returns:
            A ``RequestRp`` whose ``commandRp`` is the value returned by
            ``requestToJavaPcep``.
        """
        LOGGER.debug("(Send Request) Send: %s", request.command)
        message = self.pcepServer.requestToJavaPcep(request.command)
        return RequestRp(commandRp=message)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def getLSPdb(self, request: LSPdb_Request, context: grpc.ServicerContext) -> LSPdb_Response:
        """Return whatever ``pcepServer.get_lspdb_from_java`` yields for ``request``.

        Args:
            request: Message passed through unchanged to the server wrapper.
            context: gRPC call context (unused here).
        """
        LOGGER.debug("GET LSPDB")
        return self.pcepServer.get_lspdb_from_java(request)

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def getSessionsInfo(self, request: Session_Request, context: grpc.ServicerContext) -> Session_Response:
        """Return whatever ``pcepServer.get_sessions_info_java`` yields for ``request``.

        Args:
            request: Message passed through unchanged to the server wrapper.
            context: gRPC call context (unused here).
        """
        LOGGER.debug("GET SESSIONS INFO")
        return self.pcepServer.get_sessions_info_java(request)

    # ------------------------------------------------------------------
    # Disabled LSP-database RPCs, kept commented out for reference.
    # NOTE(review): the "Kpi" method names and the 'kpi_id' column below
    # look carried over from another component - confirm before re-enabling.
    # ------------------------------------------------------------------

    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    # def GetLSPDescriptor(self, request: LspID, grpc_context: grpc.ServicerContext) -> LspDescriptor:
    #     response = LspDescriptor()
    #     LOGGER.info(f"Received gRPC message object: {request}")
    #     try:
    #         # Get the LSP identifier from the request
    #         lsp_id_to_search = request.lsp_id.uuid  # The LspID carries the lsp_id being looked up
    #         row = self.lsp_db_obj.search_db_row_by_id(LspModel, 'lsp_id', lsp_id_to_search)
    #         if row is None:
    #             LOGGER.info(f"No matching row found for LSP id: {lsp_id_to_search}")
    #             return Empty()  # If the LSP is not found, return an Empty
    #         # Convert the retrieved row to an LspDescriptor and return it
    #         response = LspModel.convert_row_to_LspDescriptor(row)
    #         return response
    #     except Exception as e:
    #         LOGGER.error(f"Unable to search LSP id. Error: {e}")
    #         raise e

    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    # def SetKpiDescriptor(self, request: LspDescriptor, grpc_context: grpc.ServicerContext # type: ignore
    #                     ) -> LspID: # type: ignore
    #     response = LspID()
    #     LOGGER.info("Received gRPC message object: {:}".format(request))
    #     try:
    #         lsp_to_insert = LspModel.convert_LspDescriptor_to_row(request)
    #         if (self.lsp_db_obj.add_row_to_db(lsp_to_insert)):
    #             response.lsp_id.uuid = request.lsp_id.uuid
    #             # LOGGER.info("Added Row: {:}".format(response))
    #         return response
    #     except Exception as e:
    #         LOGGER.info("Unable to create LspModel class object. {:}".format(e))

    # @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    # def DeleteKpiDescriptor(self, request: LspID, grpc_context: grpc.ServicerContext # type: ignore
    #                         ) -> Empty: # type: ignore
    #     LOGGER.info("Received gRPC message object: {:}".format(request))
    #     try:
    #         lsp_id_to_search = request.lsp_id.uuid  # The LspID carries the lsp_id being looked up
    #         self.lsp_db_obj.delete_db_row_by_id(LspModel, 'kpi_id', lsp_id_to_search)
    #     except Exception as e:
    #         LOGGER.info('Unable to search kpi id. {:}'.format(e))
    #     finally:
    #         return Empty()