Loading src/context/service/ContextServiceServicerImpl.py +5 −7 Original line number Diff line number Diff line Loading @@ -370,13 +370,11 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetOpticalBand(self, request : Empty, context : grpc.ServicerContext) -> OpticalBandList: result = get_optical_band(self.db_engine) return OpticalBandList(opticalbands=result) return get_optical_band(self.db_engine) safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectOpticalBand(self, request : OpticalBandId, context : grpc.ServicerContext) -> OpticalBand: result = select_optical_band(self.db_engine,request ) return result return select_optical_band(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetOpticalBand(self, request : OpticalBand, context : grpc.ServicerContext) -> Empty: Loading src/context/service/database/OpticalBand.py +33 −36 Original line number Diff line number Diff line Loading @@ -13,12 +13,12 @@ # limitations under the License. import logging from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy.orm import Session, sessionmaker from sqlalchemy_cockroachdb import run_transaction from sqlalchemy.dialects.postgresql import insert from typing import Dict, List, Optional from common.method_wrappers.ServiceExceptions import NotFoundException from typing import Dict, List from common.proto.context_pb2 import OpticalBand, OpticalBandId, OpticalBandList from .models.OpticalConfig.OpticalBandModel import OpticalBandModel Loading @@ -26,34 +26,31 @@ from .models.OpticalConfig.OpticalBandModel import OpticalBandModel LOGGER = logging.getLogger(__name__) def get_optical_band(db_engine : Engine): def callback(session:Session): results = session.query(OpticalBandModel).all() return [obj.dump() for obj in results] obj = run_transaction(sessionmaker(bind=db_engine), callback) return obj def get_optical_band(db_engine : Engine) -> OpticalBandList: def callback(session : Session) -> List[Dict]: obj_list : List[OpticalBandModel] = session.query(OpticalBandModel).all() return [obj.dump() for obj in obj_list] optical_bands = run_transaction(sessionmaker(bind=db_engine), callback) return OpticalBandList(opticalbands=optical_bands) def select_optical_band( db_engine : Engine ,request:OpticalBandId): def select_optical_band(db_engine : Engine, request : OpticalBandId) -> OpticalBand: ob_uuid = request.opticalband_uuid.uuid def callback(session : Session) -> OpticalBand: def callback(session : Session) -> Optional[Dict]: stmt = session.query(OpticalBandModel) stmt = stmt.filter_by(ob_uuid=ob_uuid) obj = stmt.first() if obj is not None: return obj.dump() return None result= run_transaction(sessionmaker(bind=db_engine, expire_on_commit=False), callback) if result is None : return result return OpticalBand(**result) obj = stmt.one_or_none() return None if obj is None else obj.dump() obj = run_transaction(sessionmaker(bind=db_engine, expire_on_commit=False), callback) if obj is None: raw_ob_uuid = request.opticalband_uuid.uuid raise NotFoundException('OpticalBand', raw_ob_uuid, extra_details=[ 'opticalband_uuid generated was: {:s}'.format(ob_uuid) ]) return OpticalBand(**obj) def set_optical_band(db_engine : Engine, ob_data): def callback(session : Session) -> List[Dict]: if len(ob_data) > 0: stmt = insert(OpticalBandModel).values(ob_data) Loading Loading
src/context/service/ContextServiceServicerImpl.py +5 −7 Original line number Diff line number Diff line Loading @@ -370,13 +370,11 @@ class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceSer @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetOpticalBand(self, request : Empty, context : grpc.ServicerContext) -> OpticalBandList: result = get_optical_band(self.db_engine) return OpticalBandList(opticalbands=result) return get_optical_band(self.db_engine) safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SelectOpticalBand(self, request : OpticalBandId, context : grpc.ServicerContext) -> OpticalBand: result = select_optical_band(self.db_engine,request ) return result return select_optical_band(self.db_engine, request) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetOpticalBand(self, request : OpticalBand, context : grpc.ServicerContext) -> Empty: Loading
src/context/service/database/OpticalBand.py +33 −36 Original line number Diff line number Diff line Loading @@ -13,12 +13,12 @@ # limitations under the License. import logging from sqlalchemy.dialects.postgresql import insert from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, selectinload, sessionmaker from sqlalchemy.orm import Session, sessionmaker from sqlalchemy_cockroachdb import run_transaction from sqlalchemy.dialects.postgresql import insert from typing import Dict, List, Optional from common.method_wrappers.ServiceExceptions import NotFoundException from typing import Dict, List from common.proto.context_pb2 import OpticalBand, OpticalBandId, OpticalBandList from .models.OpticalConfig.OpticalBandModel import OpticalBandModel Loading @@ -26,34 +26,31 @@ from .models.OpticalConfig.OpticalBandModel import OpticalBandModel LOGGER = logging.getLogger(__name__) def get_optical_band(db_engine : Engine): def callback(session:Session): results = session.query(OpticalBandModel).all() return [obj.dump() for obj in results] obj = run_transaction(sessionmaker(bind=db_engine), callback) return obj def get_optical_band(db_engine : Engine) -> OpticalBandList: def callback(session : Session) -> List[Dict]: obj_list : List[OpticalBandModel] = session.query(OpticalBandModel).all() return [obj.dump() for obj in obj_list] optical_bands = run_transaction(sessionmaker(bind=db_engine), callback) return OpticalBandList(opticalbands=optical_bands) def select_optical_band( db_engine : Engine ,request:OpticalBandId): def select_optical_band(db_engine : Engine, request : OpticalBandId) -> OpticalBand: ob_uuid = request.opticalband_uuid.uuid def callback(session : Session) -> OpticalBand: def callback(session : Session) -> Optional[Dict]: stmt = session.query(OpticalBandModel) stmt = stmt.filter_by(ob_uuid=ob_uuid) obj = stmt.first() if obj is not None: return obj.dump() return None result= run_transaction(sessionmaker(bind=db_engine, expire_on_commit=False), callback) if result is None : return result return OpticalBand(**result) obj = stmt.one_or_none() return None if obj is None else obj.dump() obj = run_transaction(sessionmaker(bind=db_engine, expire_on_commit=False), callback) if obj is None: raw_ob_uuid = request.opticalband_uuid.uuid raise NotFoundException('OpticalBand', raw_ob_uuid, extra_details=[ 'opticalband_uuid generated was: {:s}'.format(ob_uuid) ]) return OpticalBand(**obj) def set_optical_band(db_engine : Engine, ob_data): def callback(session : Session) -> List[Dict]: if len(ob_data) > 0: stmt = insert(OpticalBandModel).values(ob_data) Loading