# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime, json, logging
from sqlalchemy import delete
#from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from typing import Dict, List, Optional, Set
from common.proto.context_pb2 import Component
from common.proto.context_pb2 import ConfigRule
from common.tools.grpc.Tools import grpc_message_to_json_string
from .models.ComponentModel import ComponentModel
from .uuids._Builder import get_uuid_from_string
from .uuids.EndPoint import endpoint_get_uuid
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, selectinload, sessionmaker
from sqlalchemy_cockroachdb import run_transaction
from common.proto.context_pb2 import ComponentIdList
from .models.ComponentModel import ComponentModel
from .uuids.Component import component_get_uuid
from .ConfigRule import compose_config_rules_data

LOGGER = logging.getLogger(__name__)

def compose_components_data(
    components : List[Component], now : datetime.datetime,
    device_uuid : Optional[str] = None, service_uuid : Optional[str] = None, slice_uuid : Optional[str] = None
) -> List[Dict]:
    """Build the component rows for every inventory rule found in `components`.

    Each item is serialized to JSON through `grpc_message_to_json_string` and
    kept only when its 'resource_key' contains '/inventory'. For every kept
    item a dict is produced with the keys 'data' (the rule's
    'resource_value'), 'created_at', 'updated_at', the parent column
    ('device_uuid', 'service_uuid' or 'slice_uuid') and 'component_uuid'.

    Args:
        components: messages exposing a 'config_rule' oneof.
        now: timestamp stored in both 'created_at' and 'updated_at'.
        device_uuid: parent device; takes precedence over the other parents.
        service_uuid: parent service; used only when `device_uuid` is None.
        slice_uuid: parent slice; used only when the other two are None.

    Returns:
        One dict per inventory rule, in input order. Items with no
        'config_rule' set, or whose JSON form has no 'resource_key'
        (presumably non-custom rule kinds), are skipped.

    Raises:
        Exception: an inventory rule was found but none of `device_uuid`,
            `service_uuid` or `slice_uuid` was provided.
    """
    # The parent does not depend on the individual rule, so resolve it once.
    parent_field : Optional[str] = None
    parent_kind, parent_uuid = '', None
    if device_uuid is not None:
        parent_field, parent_kind, parent_uuid = 'device_uuid', 'device', device_uuid
    elif service_uuid is not None:
        parent_field, parent_kind, parent_uuid = 'service_uuid', 'service', service_uuid
    elif slice_uuid is not None:
        parent_field, parent_kind, parent_uuid = 'slice_uuid', 'slice', slice_uuid

    dict_components : List[Dict] = list()
    for component in components:
        str_kind = component.WhichOneof('config_rule')
        LOGGER.info("DATA")
        if str_kind is None:
            # No member of the oneof is set: there is nothing to inspect, and
            # getattr() would fail on a None attribute name.
            continue

        message = grpc_message_to_json_string(getattr(component, str_kind, {}))
        data = json.loads(message)

        # Rules without a resource key cannot be inventory entries; skip them
        # instead of failing with KeyError before the filter is applied.
        resource_key = data.get('resource_key')
        if not isinstance(resource_key, str) or '/inventory' not in resource_key:
            continue

        resource_value = data['resource_value']
        # Lazy %-style arguments; without placeholders logging cannot format the record.
        LOGGER.info('Parametros: KEY %s Value %s', resource_key, resource_value)

        if parent_field is None:
            # Raised only once an inventory rule actually needs a parent.
            MSG = 'Parent for Component({:s}) cannot be identified '+\
                  '(device_uuid={:s}, service_uuid={:s}, slice_uuid={:s})'
            str_component = grpc_message_to_json_string(component)
            raise Exception(MSG.format(str_component, str(device_uuid), str(service_uuid), str(slice_uuid)))

        dict_component = {
            'data'      : resource_value,
            'created_at': now,
            'updated_at': now,
        }
        dict_component[parent_field] = parent_uuid

        # The UUID is derived from '<parent_kind>:custom:<resource_key>', prefixed by the parent UUID.
        component_name = '{:s}:{:s}:{:s}'.format(parent_kind, 'custom', component.custom.resource_key)
        dict_component['component_uuid'] = get_uuid_from_string(component_name, prefix_for_name=parent_uuid)

        dict_components.append(dict_component)

    LOGGER.info('Parametros: %s', dict_components)
    return dict_components

# NOTE: everything below is disabled code kept as a bare module-level string
# literal; it is never executed.
'''
def upsert_components(
    session : Session, components : List[Dict], is_delete : bool = False,
    device_uuid : Optional[str] = None, service_uuid : Optional[str] = None, slice_uuid : Optional[str] = None,
) -> bool:

    if device_uuid is not None and service_uuid is None and slice_uuid is None:
        klass = ComponentModel
    else:
        MSG = 'DataModel cannot be identified (device_uuid={:s})'
        raise Exception(MSG.format(str(device_uuid)))

    uuids_to_upsert : Dict[str, int] = dict()
    rules_to_upsert : List[Dict] = list()
    for component in components:
        component_uuid = component['component_uuid']
        position = uuids_to_upsert.get(component_uuid)
        if position is None:
            # if not added, add it
            rules_to_upsert.append(component)
            uuids_to_upsert[component_uuid] = len(rules_to_upsert) - 1
        else:
            # if already added, update occurrence
            rules_to_upsert[position] = component

    upsert_affected = False
    if len(rules_to_upsert) > 0:
        stmt = insert(klass).values(rules_to_upsert)
        stmt = stmt.on_conflict_do_update(
            index_elements=[klass.component_uuid],
            set_=dict(
                data = stmt.excluded.data,
                updated_at = stmt.excluded.updated_at,
            )
        )
        stmt = stmt.returning(klass.created_at, klass.updated_at)
        #str_stmt = stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True})
        #LOGGER.warning('upsert stmt={:s}'.format(str(str_stmt)))
        components_updates = session.execute(stmt).fetchall()
        upsert_affected = any([(updated_at > created_at) for created_at,updated_at in components_updates])

    return upsert_affected

def component_list_names(db_engine: Engine, request: ComponentIdList) -> List[Dict]:
    component_uuids = {
        component_get_uuid(component_id, allow_random=False)[-1]
        for component_id in request.component_ids
    }

    def callback(session: Session) -> List[Dict]:
        obj_list: List[ComponentModel] = session.query(ComponentModel)\
            .options(selectinload(ComponentModel.device))\
            .filter(ComponentModel.component_uuid.in_(component_uuids)).all()
        return [obj.dump_name() for obj in obj_list]

    return run_transaction(sessionmaker(bind=db_engine), callback)

def compose_components_data(data: Dict[str, any]) -> List[Dict]:
    filtered_data = []

    for item in data:
        for key, value in item:
            if any("inventory" in key):
                filtered_data.append(item)

    LOGGER.info("Filtered Data:")
    LOGGER.info(filtered_data)

    # Return the result
    return filtered_data
'''