Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
Select Git revision
  • e177f5226f77cc60de46ed202ebd089ab9a03cf9
  • master default
  • feat/320-cttc-ietf-simap-basic-support-with-kafka-yang-push
  • feat/314-tid-new-service-for-ipowdm-configuration-fron-orchestrator-to-ipowdm-controller
  • feat/327-tid-new-service-to-ipowdm-controller-to-manage-transceivers-configuration-on-external-agent
  • feat/292-cttc-implement-integration-test-for-ryu-openflow
  • cnit_tapi
  • cnit-p2mp-premerge
  • feat/325-tid-nbi-e2e-to-manage-e2e-path-computation
  • feat/307-update-python-version-service
  • feat/326-tid-external-management-of-devices-telemetry-nbi
  • openroadm-flex-grid
  • feat/310-cttc-implement-nbi-connector-to-interface-with-osm-client
  • develop protected
  • feat/324-tid-nbi-ietf_l3vpn-deploy-fail
  • feat/321-add-support-for-gnmi-configuration-via-proto
  • feat/322-add-read-support-for-ipinfusion-devices-via-netconf
  • feat/323-add-support-for-restconf-protocol-in-devices
  • feat/policy-refactor
  • feat/192-cttc-implement-telemetry-backend-collector-gnmi-openconfig
  • feat/307-update-python-version
  • feat/telemetry-collector-int
  • v5.0.0 protected
  • v4.0.0 protected
  • demo-dpiab-eucnc2024
  • v3.0.0 protected
  • v2.1.0 protected
  • v2.0.0 protected
  • v1.0.0 protected
29 results

Link.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    Service.py 12.48 KiB
    # Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import datetime, logging
    from sqlalchemy import and_
    from sqlalchemy.dialects.postgresql import insert
    from sqlalchemy.engine import Engine
    from sqlalchemy.orm import Session, selectinload, sessionmaker
    from sqlalchemy_cockroachdb import run_transaction
    from typing import Dict, List, Optional, Set
    from common.proto.context_pb2 import (
        ContextId, Empty, EventTypeEnum, Service, ServiceFilter, ServiceId, ServiceIdList,
        ServiceList, ServiceTypeEnum
    )
    from common.message_broker.MessageBroker import MessageBroker
    from common.method_wrappers.ServiceExceptions import InvalidArgumentException, NotFoundException
    from common.tools.object_factory.Context import json_context_id
    from common.tools.object_factory.Service import json_service_id
    from context.service.database.ConfigRule import compose_config_rules_data, upsert_config_rules
    from context.service.database.Constraint import compose_constraints_data, upsert_constraints
    from .models.enums.ServiceStatus import grpc_to_enum__service_status
    from .models.enums.ServiceType import grpc_to_enum__service_type
    from .models.ServiceModel import ServiceModel, ServiceEndPointModel
    from .uuids.Context import context_get_uuid
    from .uuids.EndPoint import endpoint_get_uuid
    from .uuids.Service import service_get_uuid
    from .Events import notify_event_context, notify_event_service
    
    LOGGER = logging.getLogger(__name__)
    
    def service_list_ids(db_engine : Engine, request : ContextId) -> ServiceIdList:
        context_uuid = context_get_uuid(request, allow_random=False)
        def callback(session : Session) -> List[Dict]:
            obj_list : List[ServiceModel] = session.query(ServiceModel).filter_by(context_uuid=context_uuid).all()
            return [obj.dump_id() for obj in obj_list]
        service_ids = run_transaction(sessionmaker(bind=db_engine), callback)
        return ServiceIdList(service_ids=service_ids)
    
    def service_list_objs(db_engine : Engine, request : ContextId) -> ServiceList:
        context_uuid = context_get_uuid(request, allow_random=False)
        def callback(session : Session) -> List[Dict]:
            obj_list : List[ServiceModel] = session.query(ServiceModel)\
                .options(selectinload(ServiceModel.service_endpoints))\
                .options(selectinload(ServiceModel.constraints))\
                .options(selectinload(ServiceModel.config_rules))\
                .filter_by(context_uuid=context_uuid).all()
            return [obj.dump() for obj in obj_list]
        services = run_transaction(sessionmaker(bind=db_engine), callback)
        return ServiceList(services=services)
    
    def service_get(db_engine : Engine, request : ServiceId) -> Service:
        _,service_uuid = service_get_uuid(request, allow_random=False)
        def callback(session : Session) -> Optional[Dict]:
            obj : Optional[ServiceModel] = session.query(ServiceModel)\
                .options(selectinload(ServiceModel.service_endpoints))\
                .options(selectinload(ServiceModel.constraints))\
                .options(selectinload(ServiceModel.config_rules))\
                .filter_by(service_uuid=service_uuid).one_or_none()
            return None if obj is None else obj.dump()
        obj = run_transaction(sessionmaker(bind=db_engine), callback)
        if obj is None:
            context_uuid = context_get_uuid(request.context_id, allow_random=False)
            raw_service_uuid = '{:s}/{:s}'.format(request.context_id.context_uuid.uuid, request.service_uuid.uuid)
            raise NotFoundException('Service', raw_service_uuid, extra_details=[
                'context_uuid generated was: {:s}'.format(context_uuid),
                'service_uuid generated was: {:s}'.format(service_uuid),
            ])
        return Service(**obj)
    
    def service_set(db_engine : Engine, messagebroker : MessageBroker, request : Service) -> ServiceId:
        raw_context_uuid = request.service_id.context_id.context_uuid.uuid
        raw_service_uuid = request.service_id.service_uuid.uuid
        raw_service_name = request.name
        service_name = raw_service_uuid if len(raw_service_name) == 0 else raw_service_name
        context_uuid,service_uuid = service_get_uuid(request.service_id, service_name=service_name, allow_random=True)
    
        service_type = grpc_to_enum__service_type(request.service_type)
        if service_type is None and request.service_type == ServiceTypeEnum.SERVICETYPE_OPTICAL_CONNECTIVITY:
            service_type = "OPTICAL_CONNECTIVITY"
    
        service_status = grpc_to_enum__service_status(request.service_status.service_status)
    
        now = datetime.datetime.utcnow()
    
        service_endpoints_data : List[Dict] = list()
        for i,endpoint_id in enumerate(request.service_endpoint_ids):
            endpoint_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
            if len(endpoint_context_uuid) == 0:
                endpoint_context_uuid = context_get_uuid(request.service_id.context_id, allow_random=False)
            else:
                endpoint_context_uuid = context_get_uuid(endpoint_id.topology_id.context_id, allow_random=False)
            if endpoint_context_uuid != context_uuid:
                raise InvalidArgumentException(
                    'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
                    endpoint_context_uuid,
                    ['should be == request.service_id.context_id.context_uuid.uuid({:s})'.format(raw_context_uuid)])
    
            _, _, endpoint_uuid = endpoint_get_uuid(endpoint_id, allow_random=False)
            service_endpoints_data.append({
                'service_uuid' : service_uuid,
                'endpoint_uuid': endpoint_uuid,
                'position'     : i,
            })
    
        constraints = compose_constraints_data(request.service_constraints, now, service_uuid=service_uuid)
        config_rules = compose_config_rules_data(request.service_config.config_rules, now, service_uuid=service_uuid)
    
        service_data = [{
            'context_uuid'  : context_uuid,
            'service_uuid'  : service_uuid,
            'service_name'  : service_name,
            'service_type'  : service_type,
            'service_status': service_status,
            'created_at'    : now,
            'updated_at'    : now,
        }]
    
        def callback(session : Session) -> bool:
            stmt = insert(ServiceModel).values(service_data)
            stmt = stmt.on_conflict_do_update(
                index_elements=[ServiceModel.service_uuid],
                set_=dict(
                    service_name   = stmt.excluded.service_name,
                    service_type   = stmt.excluded.service_type,
                    service_status = stmt.excluded.service_status,
                    updated_at     = stmt.excluded.updated_at,
                )
            )
            stmt = stmt.returning(ServiceModel.created_at, ServiceModel.updated_at)
            created_at,updated_at = session.execute(stmt).fetchone()
            updated = updated_at > created_at
    
            # TODO: check if endpoints are changed
            if len(service_endpoints_data) > 0:
                stmt = insert(ServiceEndPointModel).values(service_endpoints_data)
                stmt = stmt.on_conflict_do_nothing(
                    index_elements=[ServiceEndPointModel.service_uuid, ServiceEndPointModel.endpoint_uuid]
                )
                session.execute(stmt)
    
            changed_constraints = upsert_constraints(session, constraints, service_uuid=service_uuid)
            changed_config_rules = upsert_config_rules(session, config_rules, service_uuid=service_uuid)
    
            return updated or changed_constraints or changed_config_rules
    
        updated = run_transaction(sessionmaker(bind=db_engine), callback)
        context_id = json_context_id(context_uuid)
        service_id = json_service_id(service_uuid, context_id=context_id)
        event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
        notify_event_service(messagebroker, event_type, service_id)
        notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id)
        return ServiceId(**service_id)
    
    def service_unset(db_engine : Engine, messagebroker : MessageBroker, request : Service) -> ServiceId:
        raw_context_uuid = request.service_id.context_id.context_uuid.uuid
        raw_service_uuid = request.service_id.service_uuid.uuid
        raw_service_name = request.name
        service_name = raw_service_uuid if len(raw_service_name) == 0 else raw_service_name
        context_uuid,service_uuid = service_get_uuid(request.service_id, service_name=service_name, allow_random=False)
    
        service_endpoint_uuids : Set[str] = set()
        for i,endpoint_id in enumerate(request.service_endpoint_ids):
            endpoint_context_uuid = endpoint_id.topology_id.context_id.context_uuid.uuid
            if len(endpoint_context_uuid) == 0: endpoint_context_uuid = context_uuid
            if endpoint_context_uuid not in {raw_context_uuid, context_uuid}:
                raise InvalidArgumentException(
                    'request.service_endpoint_ids[{:d}].topology_id.context_id.context_uuid.uuid'.format(i),
                    endpoint_context_uuid,
                    ['should be == request.service_id.context_id.context_uuid.uuid({:s})'.format(raw_context_uuid)])
            service_endpoint_uuids.add(endpoint_get_uuid(endpoint_id, allow_random=False)[2])
    
        now = datetime.datetime.utcnow()
        constraints = compose_constraints_data(request.service_constraints, now, service_uuid=service_uuid)
        config_rules = compose_config_rules_data(request.service_config.config_rules, now, service_uuid=service_uuid)
    
        def callback(session : Session) -> bool:
            num_deletes = 0
            if len(service_endpoint_uuids) > 0:
                num_deletes += session.query(ServiceEndPointModel)\
                    .filter(and_(
                        ServiceEndPointModel.service_uuid == service_uuid,
                        ServiceEndPointModel.endpoint_uuid.in_(service_endpoint_uuids)
                    )).delete()
    
            changed_constraints = upsert_constraints(session, constraints, is_delete=True, service_uuid=service_uuid)
            changed_config_rules = upsert_config_rules(session, config_rules, is_delete=True, service_uuid=service_uuid)
    
            return num_deletes > 0 or changed_constraints or changed_config_rules
    
        updated = run_transaction(sessionmaker(bind=db_engine), callback)
        service_id = json_service_id(service_uuid, json_context_id(context_uuid))
        if updated:
            notify_event_service(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, service_id)
        return ServiceId(**service_id)
    
    def service_delete(db_engine : Engine, messagebroker : MessageBroker, request : ServiceId) -> Empty:
        context_uuid,service_uuid = service_get_uuid(request, allow_random=False)
        def callback(session : Session) -> bool:
            num_deleted = session.query(ServiceModel).filter_by(service_uuid=service_uuid).delete()
            return num_deleted > 0
        deleted = run_transaction(sessionmaker(bind=db_engine), callback)
        context_id = json_context_id(context_uuid)
        service_id = json_service_id(service_uuid, context_id=context_id)
        if deleted:
            notify_event_service(messagebroker, EventTypeEnum.EVENTTYPE_REMOVE, service_id)
            notify_event_context(messagebroker, EventTypeEnum.EVENTTYPE_UPDATE, context_id)
        return Empty()
    
    def service_select(db_engine : Engine, request : ServiceFilter) -> ServiceList:
        service_uuids = [
            service_get_uuid(service_id, allow_random=False)[1]
            for service_id in request.service_ids.service_ids
        ]
        dump_params = dict(
            include_endpoint_ids=request.include_endpoint_ids,
            include_constraints =request.include_constraints,
            include_config_rules=request.include_config_rules,
        )
        def callback(session : Session) -> List[Dict]:
            query = session.query(ServiceModel)
            if request.include_endpoint_ids: query = query.options(selectinload(ServiceModel.service_endpoints))
            if request.include_constraints : query = query.options(selectinload(ServiceModel.constraints))
            if request.include_config_rules: query = query.options(selectinload(ServiceModel.config_rules))
            obj_list : List[ServiceModel] = query.filter(ServiceModel.service_uuid.in_(service_uuids)).all()
            return [obj.dump(**dump_params) for obj in obj_list]
        services = run_transaction(sessionmaker(bind=db_engine), callback)
        return ServiceList(services=services)