# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json, logging from sqlalchemy.dialects.postgresql import insert from common.message_broker.MessageBroker import MessageBroker from sqlalchemy.engine import Engine from sqlalchemy.orm import Session, sessionmaker from sqlalchemy_cockroachdb import run_transaction from common.proto.context_pb2 import OpticalConfig, OpticalConfigId , Empty , EventTypeEnum from .models.OpticalConfigModel import OpticalConfigModel , OpticalChannelModel from context.service.database.uuids.OpticalConfig import channel_get_uuid from .Events import notify_event_opticalconfig LOGGER = logging.getLogger(__name__) def get_opticalconfig(db_engine : Engine): def callback(session:Session): optical_configs = list() results = session.query(OpticalConfigModel).all() for obj in results: optical_config = OpticalConfig() optical_config.config = json.dumps(obj.dump()) optical_config.opticalconfig_id.opticalconfig_uuid = obj.dump_id()["opticalconfig_uuid"] optical_configs.append(optical_config) return optical_configs obj = run_transaction(sessionmaker(bind=db_engine), callback) return obj def set_opticalconfig(db_engine : Engine, request : OpticalConfig): opticalconfig_id = OpticalConfigId() opticalconfig_id.opticalconfig_uuid = request.opticalconfig_id.opticalconfig_uuid OpticalConfig_data = [] if request.config: channels = [] transceivers = [] config = json.loads(request.config) if 'transceivers' in config and len(config['transceivers']['transceiver']) > 0: transceivers = [transceiver for transceiver in config['transceivers']['transceiver']] if 'channels' in config and len(config['channels']) > 0: #channels = [channel['name']['index'] for channel in config['channels']] for channel_params in config['channels']: channels.append( { "channel_uuid":channel_get_uuid(channel_params['name']['index']), "opticalconfig_uuid": request.opticalconfig_id.opticalconfig_uuid, "channel_name" : channel_params['name']['index'], "frequency" : int(channel_params["frequency"]) if "frequency" in channel_params else 0, "operational_mode" : int(channel_params["operational-mode"]) if "operational-mode" in channel_params else 0, "target_output_power" : channel_params["target-output-power"] if "target-output-power" in channel_params else '', } ) OpticalConfig_data.append( { "opticalconfig_uuid": request.opticalconfig_id.opticalconfig_uuid, "transcievers" : transceivers, "interfaces" :"", "channel_namespace" : config.get("channel_namespace",None), "endpoints" : [json.dumps(endpoint) for endpoint in config.get("endpoints",[])],} ) def callback(session:Session)->bool: stmt = insert(OpticalConfigModel).values(OpticalConfig_data) stmt = stmt.on_conflict_do_update( index_elements=[OpticalConfigModel.opticalconfig_uuid], set_=dict( channel_namespace=stmt.excluded.channel_namespace ) ) stmt = stmt.returning(OpticalConfigModel.opticalconfig_uuid) opticalconfig_id = session.execute(stmt).fetchone() if (len(channels)>0) : stmt = insert(OpticalChannelModel).values(channels) stmt = stmt.on_conflict_do_update( index_elements=[OpticalChannelModel.channel_uuid , OpticalConfigModel.opticalconfig_uuid], set_=dict( channel_name= stmt.excluded.channel_name , frequency = stmt.excluded.frequency, operational_mode=stmt.excluded.operational_mode, target_output_power=stmt.excluded.target_output_power, ) ) stmt = stmt.returning(OpticalChannelModel.channel_uuid) opticalChannel_id = session.execute(stmt).fetchone() opticalconfig_id = run_transaction(sessionmaker(bind=db_engine), callback) return {'opticalconfig_uuid': opticalconfig_id} def select_opticalconfig(db_engine:Engine,request:OpticalConfigId): def callback(session : Session) -> OpticalConfig: result = OpticalConfig() stmt = session.query(OpticalConfigModel) stmt = stmt.filter_by(opticalconfig_uuid=request.opticalconfig_uuid) obj = stmt.first() if obj is not None: result.config = json.dumps(obj.dump()) result.opticalconfig_id.opticalconfig_uuid = obj.opticalconfig_uuid return result return run_transaction(sessionmaker(bind=db_engine, expire_on_commit=False), callback) def delete_opticalconfig(db_engine : Engine ,messagebroker : MessageBroker, request : OpticalConfigId): opticalconfig_uuid = request.opticalconfig_uuid def callback(session : Session): query = session.query(OpticalConfigModel) num_deleted = session.query(OpticalConfigModel).filter_by(opticalconfig_uuid=opticalconfig_uuid).delete() return num_deleted > 0 deleted = run_transaction(sessionmaker(bind=db_engine), callback) if deleted: notify_event_opticalconfig(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, opticalconfig_uuid) return Empty()