Loading src/pathcompextended/service/PathCompExtendedServiceServicerImpl.py +128 −25 Original line number Diff line number Diff line Loading @@ -185,8 +185,24 @@ class PathCompExtendedServiceServicerImpl(PathCompExtendedServiceServicer): LOGGER.debug('[GetNetworkContext] begin') reply = NetworkContext() # TODO: Implement network context retrieval logic # This may involve querying H-RAT or returning cached context if is_hrat_enabled() and self._hrat_client: try: LOGGER.debug("Getting network context from H-RAT") with self._lock: hrat_response = self._hrat_client.get_network_context() for topology in hrat_response.data: proto_topology = reply.topologies.add() proto_topology.topology_id = topology.topology_id proto_topology.controller_id = topology.controller_id proto_topology.topology_type = topology.topology_type proto_topology.raw_json_topology = topology.raw_json_topology LOGGER.debug( "HRAT network context retrieved: {:d} topologies".format(len(reply.topologies)) ) except Exception as e: LOGGER.error('[GetNetworkContext] Error getting context from H-RAT: {:s}'.format(str(e))) raise LOGGER.debug('[GetNetworkContext] end ; reply = {:s}'.format(grpc_message_to_json_string(reply))) return reply Loading @@ -196,7 +212,24 @@ class PathCompExtendedServiceServicerImpl(PathCompExtendedServiceServicer): LOGGER.debug('[GetSpecificNetworkContext] begin ; request = {:s}'.format(grpc_message_to_json_string(request))) reply = NetworkTopology() # TODO: Implement specific network context retrieval logic if is_hrat_enabled() and self._hrat_client: try: controller_id = request.value LOGGER.debug("Getting network context for controller %s from H-RAT", controller_id) with self._lock: hrat_response = self._hrat_client.get_network_context_by_controller(controller_id) if hrat_response.data: topology = hrat_response.data[0] reply.topology_id = topology.topology_id reply.controller_id = topology.controller_id reply.topology_type = topology.topology_type reply.raw_json_topology = topology.raw_json_topology LOGGER.debug("HRAT network context for controller %s retrieved", controller_id) # else: reply stays empty (no topology found for controller) except Exception as e: LOGGER.error('[GetSpecificNetworkContext] Error getting context from H-RAT: {:s}'.format(str(e))) raise LOGGER.debug('[GetSpecificNetworkContext] end ; reply = {:s}'.format(grpc_message_to_json_string(reply))) return reply Loading Loading @@ -273,19 +306,53 @@ class PathCompExtendedServiceServicerImpl(PathCompExtendedServiceServicer): def GetTransportOpticalSlices(self, request: ProtoEmpty, context: grpc.ServicerContext): LOGGER.debug('[GetTransportOpticalSlices] begin') # TODO: Implement streaming retrieval of transport optical slices # This should yield TransportOpticalSlice messages if is_hrat_enabled() and self._hrat_client: try: LOGGER.debug("Getting all transport optical slices from H-RAT") with self._lock: hrat_response = self._hrat_client.get_all_transport_optical_slices() for item in hrat_response.data: reply = TransportOpticalSlice() reply.transport_optical_slice_uuid.value = item.optical_slice_uuid reply.viability = item.viability reply.raw_optical_slice_json = json.dumps( [action.model_dump(by_alias=True) for action in item.actions] ) yield reply LOGGER.debug( "HRAT transport optical slices retrieved: {:d} items".format(len(hrat_response.data)) ) except Exception as e: LOGGER.error('[GetTransportOpticalSlices] Error getting slices from H-RAT: {:s}'.format(str(e))) raise # else: yield nothing (empty stream) LOGGER.debug('[GetTransportOpticalSlices] end') # Return empty generator for now return iter([]) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetTransportOpticalSlice(self, request: UUID, context: grpc.ServicerContext) -> TransportOpticalSlice: LOGGER.debug('[GetTransportOpticalSlice] begin ; request = {:s}'.format(grpc_message_to_json_string(request))) reply = TransportOpticalSlice() # TODO: Implement transport optical slice retrieval logic if is_hrat_enabled() and self._hrat_client: try: slice_uuid = request.value LOGGER.debug("Getting transport optical slice %s from H-RAT", slice_uuid) with self._lock: slice_data = self._hrat_client.get_transport_optical_slice(slice_uuid) reply.transport_optical_slice_uuid.value = slice_data.data.optical_slice_uuid reply.viability = slice_data.data.viability reply.raw_optical_slice_json = json.dumps( [action.model_dump(by_alias=True) for action in slice_data.data.actions] ) LOGGER.debug("HRAT transport optical slice %s retrieved", slice_uuid) except Exception as e: LOGGER.error('[GetTransportOpticalSlice] Error getting slice from H-RAT: {:s}'.format(str(e))) raise else: LOGGER.warning('[GetTransportOpticalSlice] H-RAT not enabled') raise grpc.RpcError(grpc.StatusCode.UNIMPLEMENTED, "H-RAT integration required") LOGGER.debug('[GetTransportOpticalSlice] end ; reply = {:s}'.format(grpc_message_to_json_string(reply))) return reply Loading Loading @@ -374,19 +441,55 @@ class PathCompExtendedServiceServicerImpl(PathCompExtendedServiceServicer): def GetTransportNetworkSlicesL3(self, request: ProtoEmpty, context: grpc.ServicerContext): LOGGER.debug('[GetTransportNetworkSlicesL3] begin') # TODO: Implement streaming retrieval of transport network slices L3 # This should yield TransportNetworkSliceL3 messages if is_hrat_enabled() and self._hrat_client: try: LOGGER.debug("Getting all transport network slices L3 from H-RAT") with self._lock: hrat_response = self._hrat_client.get_all_transport_network_slices_l3() for item in hrat_response.data: reply = TransportNetworkSliceL3() reply.transport_optical_slice_uuid.value = item.optical_slice_uuid reply.transport_network_slice_l3_uuid.value = item.transport_network_slice_l3_uuid reply.viability = item.viability reply.raw_actions_json = json.dumps( [action.model_dump(by_alias=True) for action in item.actions] ) yield reply LOGGER.debug( "HRAT transport network slices L3 retrieved: {:d} items".format(len(hrat_response.data)) ) except Exception as e: LOGGER.error('[GetTransportNetworkSlicesL3] Error getting slices from H-RAT: {:s}'.format(str(e))) raise # else: yield nothing (empty stream) LOGGER.debug('[GetTransportNetworkSlicesL3] end') # Return empty generator for now return iter([]) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetTransportNetworkSliceL3(self, request: UUID, context: grpc.ServicerContext) -> TransportNetworkSliceL3: LOGGER.debug('[GetTransportNetworkSliceL3] begin ; request = {:s}'.format(grpc_message_to_json_string(request))) reply = TransportNetworkSliceL3() # TODO: Implement transport network slice L3 retrieval logic if is_hrat_enabled() and self._hrat_client: try: slice_uuid = request.value LOGGER.debug("Getting transport network slice L3 %s from H-RAT", slice_uuid) with self._lock: slice_data = self._hrat_client.get_transport_network_slice_l3(slice_uuid) reply.transport_optical_slice_uuid.value = slice_data.data.optical_slice_uuid reply.transport_network_slice_l3_uuid.value = slice_data.data.transport_network_slice_l3_uuid reply.viability = slice_data.data.viability reply.raw_actions_json = json.dumps( [action.model_dump(by_alias=True) for action in slice_data.data.actions] ) LOGGER.debug("HRAT transport network slice L3 %s retrieved", slice_uuid) except Exception as e: LOGGER.error('[GetTransportNetworkSliceL3] Error getting slice from H-RAT: {:s}'.format(str(e))) raise else: LOGGER.warning('[GetTransportNetworkSliceL3] H-RAT not enabled') raise grpc.RpcError(grpc.StatusCode.UNIMPLEMENTED, "H-RAT integration required") LOGGER.debug('[GetTransportNetworkSliceL3] end ; reply = {:s}'.format(grpc_message_to_json_string(reply))) return reply Loading
src/pathcompextended/service/PathCompExtendedServiceServicerImpl.py +128 −25 Original line number Diff line number Diff line Loading @@ -185,8 +185,24 @@ class PathCompExtendedServiceServicerImpl(PathCompExtendedServiceServicer): LOGGER.debug('[GetNetworkContext] begin') reply = NetworkContext() # TODO: Implement network context retrieval logic # This may involve querying H-RAT or returning cached context if is_hrat_enabled() and self._hrat_client: try: LOGGER.debug("Getting network context from H-RAT") with self._lock: hrat_response = self._hrat_client.get_network_context() for topology in hrat_response.data: proto_topology = reply.topologies.add() proto_topology.topology_id = topology.topology_id proto_topology.controller_id = topology.controller_id proto_topology.topology_type = topology.topology_type proto_topology.raw_json_topology = topology.raw_json_topology LOGGER.debug( "HRAT network context retrieved: {:d} topologies".format(len(reply.topologies)) ) except Exception as e: LOGGER.error('[GetNetworkContext] Error getting context from H-RAT: {:s}'.format(str(e))) raise LOGGER.debug('[GetNetworkContext] end ; reply = {:s}'.format(grpc_message_to_json_string(reply))) return reply Loading @@ -196,7 +212,24 @@ class PathCompExtendedServiceServicerImpl(PathCompExtendedServiceServicer): LOGGER.debug('[GetSpecificNetworkContext] begin ; request = {:s}'.format(grpc_message_to_json_string(request))) reply = NetworkTopology() # TODO: Implement specific network context retrieval logic if is_hrat_enabled() and self._hrat_client: try: controller_id = request.value LOGGER.debug("Getting network context for controller %s from H-RAT", controller_id) with self._lock: hrat_response = self._hrat_client.get_network_context_by_controller(controller_id) if hrat_response.data: topology = hrat_response.data[0] reply.topology_id = topology.topology_id reply.controller_id = topology.controller_id reply.topology_type = topology.topology_type reply.raw_json_topology = topology.raw_json_topology LOGGER.debug("HRAT network context for controller %s retrieved", controller_id) # else: reply stays empty (no topology found for controller) except Exception as e: LOGGER.error('[GetSpecificNetworkContext] Error getting context from H-RAT: {:s}'.format(str(e))) raise LOGGER.debug('[GetSpecificNetworkContext] end ; reply = {:s}'.format(grpc_message_to_json_string(reply))) return reply Loading Loading @@ -273,19 +306,53 @@ class PathCompExtendedServiceServicerImpl(PathCompExtendedServiceServicer): def GetTransportOpticalSlices(self, request: ProtoEmpty, context: grpc.ServicerContext): LOGGER.debug('[GetTransportOpticalSlices] begin') # TODO: Implement streaming retrieval of transport optical slices # This should yield TransportOpticalSlice messages if is_hrat_enabled() and self._hrat_client: try: LOGGER.debug("Getting all transport optical slices from H-RAT") with self._lock: hrat_response = self._hrat_client.get_all_transport_optical_slices() for item in hrat_response.data: reply = TransportOpticalSlice() reply.transport_optical_slice_uuid.value = item.optical_slice_uuid reply.viability = item.viability reply.raw_optical_slice_json = json.dumps( [action.model_dump(by_alias=True) for action in item.actions] ) yield reply LOGGER.debug( "HRAT transport optical slices retrieved: {:d} items".format(len(hrat_response.data)) ) except Exception as e: LOGGER.error('[GetTransportOpticalSlices] Error getting slices from H-RAT: {:s}'.format(str(e))) raise # else: yield nothing (empty stream) LOGGER.debug('[GetTransportOpticalSlices] end') # Return empty generator for now return iter([]) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetTransportOpticalSlice(self, request: UUID, context: grpc.ServicerContext) -> TransportOpticalSlice: LOGGER.debug('[GetTransportOpticalSlice] begin ; request = {:s}'.format(grpc_message_to_json_string(request))) reply = TransportOpticalSlice() # TODO: Implement transport optical slice retrieval logic if is_hrat_enabled() and self._hrat_client: try: slice_uuid = request.value LOGGER.debug("Getting transport optical slice %s from H-RAT", slice_uuid) with self._lock: slice_data = self._hrat_client.get_transport_optical_slice(slice_uuid) reply.transport_optical_slice_uuid.value = slice_data.data.optical_slice_uuid reply.viability = slice_data.data.viability reply.raw_optical_slice_json = json.dumps( [action.model_dump(by_alias=True) for action in slice_data.data.actions] ) LOGGER.debug("HRAT transport optical slice %s retrieved", slice_uuid) except Exception as e: LOGGER.error('[GetTransportOpticalSlice] Error getting slice from H-RAT: {:s}'.format(str(e))) raise else: LOGGER.warning('[GetTransportOpticalSlice] H-RAT not enabled') raise grpc.RpcError(grpc.StatusCode.UNIMPLEMENTED, "H-RAT integration required") LOGGER.debug('[GetTransportOpticalSlice] end ; reply = {:s}'.format(grpc_message_to_json_string(reply))) return reply Loading Loading @@ -374,19 +441,55 @@ class PathCompExtendedServiceServicerImpl(PathCompExtendedServiceServicer): def GetTransportNetworkSlicesL3(self, request: ProtoEmpty, context: grpc.ServicerContext): LOGGER.debug('[GetTransportNetworkSlicesL3] begin') # TODO: Implement streaming retrieval of transport network slices L3 # This should yield TransportNetworkSliceL3 messages if is_hrat_enabled() and self._hrat_client: try: LOGGER.debug("Getting all transport network slices L3 from H-RAT") with self._lock: hrat_response = self._hrat_client.get_all_transport_network_slices_l3() for item in hrat_response.data: reply = TransportNetworkSliceL3() reply.transport_optical_slice_uuid.value = item.optical_slice_uuid reply.transport_network_slice_l3_uuid.value = item.transport_network_slice_l3_uuid reply.viability = item.viability reply.raw_actions_json = json.dumps( [action.model_dump(by_alias=True) for action in item.actions] ) yield reply LOGGER.debug( "HRAT transport network slices L3 retrieved: {:d} items".format(len(hrat_response.data)) ) except Exception as e: LOGGER.error('[GetTransportNetworkSlicesL3] Error getting slices from H-RAT: {:s}'.format(str(e))) raise # else: yield nothing (empty stream) LOGGER.debug('[GetTransportNetworkSlicesL3] end') # Return empty generator for now return iter([]) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetTransportNetworkSliceL3(self, request: UUID, context: grpc.ServicerContext) -> TransportNetworkSliceL3: LOGGER.debug('[GetTransportNetworkSliceL3] begin ; request = {:s}'.format(grpc_message_to_json_string(request))) reply = TransportNetworkSliceL3() # TODO: Implement transport network slice L3 retrieval logic if is_hrat_enabled() and self._hrat_client: try: slice_uuid = request.value LOGGER.debug("Getting transport network slice L3 %s from H-RAT", slice_uuid) with self._lock: slice_data = self._hrat_client.get_transport_network_slice_l3(slice_uuid) reply.transport_optical_slice_uuid.value = slice_data.data.optical_slice_uuid reply.transport_network_slice_l3_uuid.value = slice_data.data.transport_network_slice_l3_uuid reply.viability = slice_data.data.viability reply.raw_actions_json = json.dumps( [action.model_dump(by_alias=True) for action in slice_data.data.actions] ) LOGGER.debug("HRAT transport network slice L3 %s retrieved", slice_uuid) except Exception as e: LOGGER.error('[GetTransportNetworkSliceL3] Error getting slice from H-RAT: {:s}'.format(str(e))) raise else: LOGGER.warning('[GetTransportNetworkSliceL3] H-RAT not enabled') raise grpc.RpcError(grpc.StatusCode.UNIMPLEMENTED, "H-RAT integration required") LOGGER.debug('[GetTransportNetworkSliceL3] end ; reply = {:s}'.format(grpc_message_to_json_string(reply))) return reply