# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json
import logging
import uuid
from typing import Dict, List, Optional, Set, Tuple

from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, selectinload, sessionmaker
from sqlalchemy_cockroachdb import run_transaction

from common.method_wrappers.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.proto.context_pb2 import (
    Device, DeviceFilter, DeviceId, TopologyId, OpticalConfig, OpticalConfigId, OpticalConfigList,
)
from common.tools.grpc.Tools import grpc_message_to_json_string
from common.tools.object_factory.Device import json_device_id
from context.service.database.uuids.Topology import topology_get_uuid
from .models.DeviceModel import DeviceModel
from .models.EndPointModel import EndPointModel
from .models.TopologyModel import TopologyDeviceModel
from .models.enums.DeviceDriver import grpc_to_enum__device_driver
from .models.enums.DeviceOperationalStatus import grpc_to_enum__device_operational_status
from .models.enums.KpiSampleType import grpc_to_enum__kpi_sample_type
from .uuids.Device import device_get_uuid
from .uuids.EndPoint import endpoint_get_uuid
from .ConfigRule import compose_config_rules_data, upsert_config_rules
from .models.OpticalConfigModel import OpticalConfigModel

LOGGER = logging.getLogger(__name__)


def get_opticalconfig(db_engine: Engine):
    """Return every stored optical configuration.

    Args:
        db_engine: SQLAlchemy engine the transaction is bound to.

    Returns:
        A list with one ``OpticalConfig`` message per ``OpticalConfigModel``
        row; each message carries the row's UUID and its JSON-encoded config.
    """
    def callback(session: Session):
        optical_configs = []
        for obj in session.query(OpticalConfigModel).all():
            # Use a distinct local name: rebinding ``OpticalConfig`` here would
            # make it function-local and the constructor call would raise
            # UnboundLocalError.
            optical_config = OpticalConfig()
            # json.dumps returns the encoded string; json.dump requires a file
            # object and would raise TypeError when called with one argument.
            # NOTE(review): this relies on the model exposing ``config``, while
            # select_opticalconfig serialises ``obj.dump()`` -- confirm which
            # one the model actually provides.
            optical_config.config = json.dumps(obj.config)
            config_id = OpticalConfigId()
            config_id.opticalconfig_uuid = obj.opticalconfig_uuid
            optical_config.opticalconfig_id.CopyFrom(config_id)
            optical_configs.append(optical_config)
        return optical_configs

    return run_transaction(sessionmaker(bind=db_engine), callback)


def set_opticalconfig(db_engine: Engine, request: OpticalConfig):
    """Insert the optical configuration carried by ``request``.

    ``request.config`` is parsed as JSON and flattened into a single row keyed
    by ``request.opticalconfig_id.opticalconfig_uuid``. If a row with that UUID
    already exists, only its ``channel_namespace`` is overwritten.

    Args:
        db_engine: SQLAlchemy engine the transaction is bound to.
        request: ``OpticalConfig`` message holding the UUID and a JSON string.

    Returns:
        ``{'opticalconfig_uuid': <uuid>}`` with the UUID returned by the
        statement, or ``None`` as the value when no row is returned.

    Raises:
        KeyError: if the parsed config lacks ``interfaces``/``interface``,
            ``channel_namespace`` or ``endpoints``.
        json.JSONDecodeError: if ``request.config`` is not valid JSON.
    """
    opticalconfig_uuid = request.opticalconfig_id.opticalconfig_uuid

    my_config_data = []
    if request.config:
        config = json.loads(request.config)

        channels = []
        if "channels" in config and len(config["channels"]) > 0:
            channels = [channel['name']['index'] for channel in config["channels"]]

        transceivers = []
        if "transceivers" in config and len(config["transceivers"]["transceiver"]) > 0:
            transceivers = list(config["transceivers"]["transceiver"])

        my_config_data = [
            {
                "opticalconfig_uuid": opticalconfig_uuid,
                "channels": channels,
                # Key spelling is kept as in the original; presumably it
                # matches the model's column name -- verify before renaming.
                "transcievers": transceivers,
                "interfaces": json.dumps(config["interfaces"]["interface"]),
                "channel_namespace": config["channel_namespace"],
                "endpoints": [json.dumps(endpoint) for endpoint in config["endpoints"]],
                "frequency": config.get("frequency", 0),
                "operational_mode": config.get("operational_mode", 0),
                "output_power": config.get("output_power", ''),
            }
        ]

    # NOTE(review): when request.config is empty, ``my_config_data`` stays []
    # and the INSERT below is issued without any row -- confirm that callers
    # never send an empty config.
    def callback(session: Session) -> Optional[str]:
        stmt = insert(OpticalConfigModel).values(my_config_data)
        # NOTE(review): on conflict only channel_namespace is refreshed; the
        # other columns keep their previous values -- confirm this is intended.
        stmt = stmt.on_conflict_do_update(
            index_elements=[OpticalConfigModel.opticalconfig_uuid],
            set_=dict(
                channel_namespace=stmt.excluded.channel_namespace
            )
        )
        stmt = stmt.returning(OpticalConfigModel.opticalconfig_uuid)
        row = session.execute(stmt).fetchone()
        # Hand the UUID back to run_transaction; without this return the
        # caller always received None.
        return None if row is None else row[0]

    opticalconfig_id = run_transaction(sessionmaker(bind=db_engine), callback)
    return {'opticalconfig_uuid': opticalconfig_id}

def select_opticalconfig(db_engine: Engine, request: OpticalConfigId):
    """Fetch the optical configuration identified by ``request``.

    Args:
        db_engine: SQLAlchemy engine the transaction is bound to.
        request: ``OpticalConfigId`` whose ``opticalconfig_uuid`` selects the row.

    Returns:
        An ``OpticalConfig`` message. When a matching row exists, the message
        carries the row's UUID and the JSON encoding of ``row.dump()``;
        otherwise it is returned with its fields left unset.
    """
    def callback(session: Session) -> OpticalConfig:
        message = OpticalConfig()
        query = session.query(OpticalConfigModel)
        row = query.filter_by(opticalconfig_uuid=request.opticalconfig_uuid).first()

        # No matching row: return the message as constructed.
        if row is None:
            return message

        found_id = OpticalConfigId()
        found_id.opticalconfig_uuid = row.opticalconfig_uuid
        message.config = json.dumps(row.dump())
        message.opticalconfig_id.CopyFrom(found_id)
        return message

    # expire_on_commit=False is passed to the session factory exactly as in
    # the original call.
    session_factory = sessionmaker(bind=db_engine, expire_on_commit=False)
    return run_transaction(session_factory, callback)