Newer
Older
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from flask import request
from flask.json import jsonify
from flask_restful import Resource
from common.proto.context_pb2 import SliceStatusEnum
from common.tools.context_queries.Slice import get_slice_by_uuid
from context.client.ContextClient import ContextClient
from nbi.service._tools.Authentication import HTTP_AUTH
from nbi.service._tools.HttpStatusCodes import (
HTTP_GATEWAYTIMEOUT, HTTP_NOCONTENT, HTTP_OK, HTTP_SERVERERROR
)
# Module-level logger named after this module; used by the request handlers below.
LOGGER = logging.getLogger(__name__)
class L2VPN_Service(Resource):
    """Flask-RESTful resource for a single L2VPN service identified by ``vpn_id``.

    The VPN is looked up as a slice whose UUID equals ``vpn_id``. Both handlers
    require HTTP authentication and always return a JSON response; any exception
    raised while processing is logged and reported as HTTP 500 with an
    ``{'error': <message>}`` body.
    """

    @HTTP_AUTH.login_required
    def get(self, vpn_id : str):
        """Report whether the slice backing ``vpn_id`` is active.

        Args:
            vpn_id: identifier of the VPN; used as the slice UUID for the lookup.

        Returns:
            A JSON response with an empty body and status ``HTTP_OK`` when the
            slice status is ``SLICESTATUS_ACTIVE``, or ``HTTP_GATEWAYTIMEOUT``
            for any other status. If the slice is not found, the returned slice
            carries a different UUID, or any other error occurs, the response is
            ``{'error': <message>}`` with status ``HTTP_SERVERERROR``.
        """
        LOGGER.debug('VPN_Id: {:s}'.format(str(vpn_id)))
        LOGGER.debug('Request: {:s}'.format(str(request)))

        try:
            # Instantiate the client inside the try block so that a failure to
            # create it is reported through the same HTTP 500 path as any other
            # error. Without this, `context_client` was an undefined name.
            context_client = ContextClient()

            target = get_slice_by_uuid(context_client, vpn_id, rw_copy=True)
            if target is None:
                raise Exception('VPN({:s}) not found in database'.format(str(vpn_id)))

            # Sanity check: the slice handed back must be the one that was requested.
            if target.slice_id.slice_uuid.uuid != vpn_id: # pylint: disable=no-member
                raise Exception('Slice retrieval failed. Wrong Slice Id was returned')

            slice_ready_status = SliceStatusEnum.SLICESTATUS_ACTIVE
            slice_status = target.slice_status.slice_status # pylint: disable=no-member
            response = jsonify({})
            # Any status other than ACTIVE is reported as a gateway timeout.
            response.status_code = HTTP_OK if slice_status == slice_ready_status else HTTP_GATEWAYTIMEOUT
        except Exception as e: # pylint: disable=broad-except
            LOGGER.exception('Something went wrong Retrieving VPN({:s})'.format(str(vpn_id)))
            response = jsonify({'error': str(e)})
            response.status_code = HTTP_SERVERERROR
        return response

    @HTTP_AUTH.login_required
    def delete(self, vpn_id : str):
        """Delete the slice backing ``vpn_id``, if it exists.

        Args:
            vpn_id: identifier of the VPN; used as the slice UUID for the lookup.

        Returns:
            A JSON response with an empty body and status ``HTTP_NOCONTENT``,
            both when the slice was deleted and when no such slice was found
            (a warning is logged in the latter case). If the returned slice
            carries a different UUID or any other error occurs, the response is
            ``{'error': <message>}`` with status ``HTTP_SERVERERROR``.
        """
        LOGGER.debug('VPN_Id: {:s}'.format(str(vpn_id)))
        LOGGER.debug('Request: {:s}'.format(str(request)))

        try:
            # Instantiate the client inside the try block so that a failure to
            # create it is reported through the same HTTP 500 path as any other
            # error. Without this, `context_client` was an undefined name.
            context_client = ContextClient()

            target = get_slice_by_uuid(context_client, vpn_id)
            if target is None:
                # A missing slice is not treated as an error: the request
                # still completes with HTTP_NOCONTENT.
                LOGGER.warning('VPN({:s}) not found in database. Nothing done.'.format(str(vpn_id)))
            else:
                # Sanity check: the slice handed back must be the one that was requested.
                if target.slice_id.slice_uuid.uuid != vpn_id: # pylint: disable=no-member
                    raise Exception('Slice retrieval failed. Wrong Slice Id was returned')
                # NOTE(review): SliceClient does not appear among the imports
                # visible at the top of this file -- confirm it is brought into
                # scope; otherwise this line raises NameError, which the handler
                # below converts into HTTP 500.
                slice_client = SliceClient()
                slice_client.DeleteSlice(target.slice_id)
            response = jsonify({})
            response.status_code = HTTP_NOCONTENT
        except Exception as e: # pylint: disable=broad-except
            LOGGER.exception('Something went wrong Deleting VPN({:s})'.format(str(vpn_id)))
            response = jsonify({'error': str(e)})
            response.status_code = HTTP_SERVERERROR
        return response