Newer
Older
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sqlalchemy.dialects.postgresql import insert
from common.message_broker.MessageBroker import MessageBroker
from sqlalchemy_cockroachdb import run_transaction
from common.proto.context_pb2 import OpticalConfig, OpticalConfigId , Empty , EventTypeEnum
from .models.OpticalConfigModel import OpticalConfigModel , TransponderTypeModel ,OpticalChannelModel
from context.service.database.uuids.OpticalConfig import channel_get_uuid , opticalconfig_get_uuid ,transponder_get_uuid
from .Events import notify_event_opticalconfig
results = session.query(OpticalConfigModel).all()
optical_config.config = json.dumps(obj.dump())
optical_config.opticalconfig_id.opticalconfig_uuid = ids_obj["opticalconfig_uuid"]
optical_config.device_id.device_uuid.uuid=ids_obj["device_uuid"]
optical_configs.append(optical_config)
return optical_configs
obj = run_transaction(sessionmaker(bind=db_engine), callback)
def set_opticalconfig(db_engine : Engine, request : OpticalConfig):
device_id = request.device_id
device_uuid = request.device_id.device_uuid.uuid
config_type=None
opticalconfig_uuid =opticalconfig_get_uuid(device_id)
LOGGER.info(f"cofigy_type {request.config}")
if "type" in config:
config_type= config["type"]
if config_type == "optical-transponder":
transceivers = []
if 'transceivers' in config['transponder'] and len(config['transponder']['transceivers']['transceiver']) > 0:
transceivers = [transceiver for transceiver in config['transponder'] ['transceivers']['transceiver']]
if 'channels' in config['transponder'] and len(config['transponder']['channels']) > 0:
#channels = [channel['name']['index'] for channel in config['channels']]
for channel_params in config['transponder']['channels']:
channels.append(
{
# "opticalconfig_uuid":opticalconfig_uuid,
"transponder_uuid":transponder_get_uuid(device_id),
"channel_uuid":channel_get_uuid(channel_params['name']['index']),
"channel_name" : channel_params['name']['index'],
"frequency" : int(channel_params["frequency"]) if "frequency" in channel_params else 0,
"operational_mode" : int(channel_params["operational-mode"]) if "operational-mode" in channel_params else 0,
"target_output_power" : channel_params["target-output-power"] if "target-output-power" in channel_params else '',
"status":channel_params["status"] if "status" in channel_params else ""
}
)
transponder.append({
"transponder_uuid":transponder_get_uuid(device_id),
"transcievers":transceivers,
"interfaces":None,
"opticalconfig_uuid":opticalconfig_uuid,
})
"opticalconfig_uuid":opticalconfig_uuid,
# "transcievers" : transceivers,
# "interfaces" :"",
"channel_namespace" : config.get("channel_namespace",None),
"endpoints" : [json.dumps(endpoint) for endpoint in config.get("endpoints",[])],
"device_uuid": device_uuid,
"type":config_type
}
stmt = insert(OpticalConfigModel).values(OpticalConfig_data)
index_elements=[OpticalConfigModel.opticalconfig_uuid],
set_=dict(
channel_namespace=stmt.excluded.channel_namespace
)
)
stmt = stmt.returning(OpticalConfigModel.opticalconfig_uuid)
opticalconfig_id = session.execute(stmt).fetchone()
if config_type == 'optical-transponder':
if (len(transponder)>0):
stmt = insert(TransponderTypeModel).values(transponder)
stmt = stmt.on_conflict_do_update(
index_elements=[TransponderTypeModel.transponder_uuid],
set_=dict(
transcievers= stmt.excluded.transcievers ,
)
stmt = stmt.returning(TransponderTypeModel.transponder_uuid)
transponder_id = session.execute(stmt).fetchone()
if (len(channels)>0) :
stmt = insert(OpticalChannelModel).values(channels)
stmt = stmt.on_conflict_do_update(
index_elements=[OpticalChannelModel.channel_uuid ],
set_=dict(
channel_name= stmt.excluded.channel_name ,
frequency = stmt.excluded.frequency,
operational_mode=stmt.excluded.operational_mode,
target_output_power=stmt.excluded.target_output_power,
)
)
stmt = stmt.returning(OpticalChannelModel.channel_uuid)
opticalChannel_id = session.execute(stmt).fetchone()
LOGGER.info(f"added successfully {opticalChannel_id}")
opticalconfig_id = run_transaction(sessionmaker(bind=db_engine), callback)
return {'opticalconfig_uuid': opticalconfig_id}
def select_opticalconfig(db_engine:Engine,request:OpticalConfigId):
def callback(session : Session) -> OpticalConfig:
result = OpticalConfig()
stmt = session.query(OpticalConfigModel)
stmt = stmt.filter_by(opticalconfig_uuid=request.opticalconfig_uuid)
obj = stmt.first()
if obj is not None:
result.config = json.dumps(obj.dump())
ids_obj = obj.dump_id()
result.opticalconfig_id.opticalconfig_uuid = ids_obj["opticalconfig_uuid"]
result.device_id.device_uuid.uuid=ids_obj["device_uuid"]
LOGGER.info(f"select_opticalconfig {result}")
return run_transaction(sessionmaker(bind=db_engine, expire_on_commit=False), callback)
def delete_opticalconfig(db_engine : Engine ,messagebroker : MessageBroker, request : OpticalConfigId):
opticalconfig_uuid = request.opticalconfig_uuid
def callback(session : Session):
query = session.query(OpticalConfigModel)
num_deleted = session.query(OpticalConfigModel).filter_by(opticalconfig_uuid=opticalconfig_uuid).delete()
return num_deleted > 0
deleted = run_transaction(sessionmaker(bind=db_engine), callback)
if deleted:
notify_event_opticalconfig(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, opticalconfig_uuid)
return Empty()