Skip to content
Snippets Groups Projects

Resolve "(CTTC) Extend NBI's Debug-API plugin to expose a descriptor-like download endpoint"

3 files
+ 80
8
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -12,8 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from flask.json import jsonify
from flask_restful import Resource, request
from common.proto.context_pb2 import Empty
from common.tools.grpc.Tools import grpc_message_to_json
from context.client.ContextClient import ContextClient
from service.client.ServiceClient import ServiceClient
from .Tools import (
@@ -34,6 +37,66 @@ class Contexts(_Resource):
def get(self):
return format_grpc_to_json(self.client.ListContexts(Empty()))
class DummyContexts(_Resource):
def get(self):
contexts = grpc_message_to_json(self.client.ListContexts(Empty()), use_integers_for_enums=True)['contexts']
devices = grpc_message_to_json(self.client.ListDevices(Empty()), use_integers_for_enums=True)['devices']
links = grpc_message_to_json(self.client.ListLinks(Empty()), use_integers_for_enums=True)['links']
topologies = list()
slices = list()
services = list()
connections = list()
for context in contexts:
context_uuid = context['context_id']['context_uuid']['uuid']
context_id = grpc_context_id(context_uuid)
topologies.extend(grpc_message_to_json(
self.client.ListTopologies(context_id),
use_integers_for_enums=True
)['topologies'])
slices.extend(grpc_message_to_json(
self.client.ListSlices(context_id),
use_integers_for_enums=True
)['slices'])
context_services = grpc_message_to_json(
self.client.ListServices(context_id),
use_integers_for_enums=True
)['services']
services.extend(context_services)
for service in context_services:
service_uuid = service['service_id']['service_uuid']['uuid']
service_id = grpc_service_id(context_uuid, service_uuid)
connections.extend(grpc_message_to_json(
self.client.ListConnections(service_id),
use_integers_for_enums=True
)['connections'])
for device in devices:
for config_rule in device['device_config']['config_rules']:
if 'custom' not in config_rule: continue
resource_value = config_rule['custom']['resource_value']
if not isinstance(resource_value, str): continue
try:
resource_value = json.loads(resource_value)
except: # pylint: disable=bare-except
pass
config_rule['custom']['resource_value'] = resource_value
dummy_context = {'dummy_mode': True}
if len(contexts ) > 0: dummy_context['contexts' ] = contexts
if len(topologies ) > 0: dummy_context['topologies' ] = topologies
if len(devices ) > 0: dummy_context['devices' ] = devices
if len(links ) > 0: dummy_context['links' ] = links
if len(slices ) > 0: dummy_context['slices' ] = slices
if len(services ) > 0: dummy_context['services' ] = services
if len(connections) > 0: dummy_context['connections'] = connections
return jsonify(dummy_context)
class Context(_Resource):
def get(self, context_uuid : str):
return format_grpc_to_json(self.client.GetContext(grpc_context_id(context_uuid)))
@@ -62,7 +125,7 @@ class Service(_Resource):
def get(self, context_uuid : str, service_uuid : str):
return format_grpc_to_json(self.client.GetService(grpc_service_id(context_uuid, service_uuid)))
def post(self, context_uuid : str, service_uuid : str):
def post(self, context_uuid : str, service_uuid : str): # pylint: disable=unused-argument
service = request.get_json()['services'][0]
return format_grpc_to_json(self.service_client.CreateService(grpc_service(
service_uuid = service['service_id']['service_uuid']['uuid'],
@@ -70,7 +133,7 @@ class Service(_Resource):
context_uuid = service['service_id']['context_id']['context_uuid']['uuid'],
)))
def put(self, context_uuid : str, service_uuid : str):
def put(self, context_uuid : str, service_uuid : str): # pylint: disable=unused-argument
service = request.get_json()['services'][0]
return format_grpc_to_json(self.service_client.UpdateService(grpc_service(
service_uuid = service['service_id']['service_uuid']['uuid'],
Loading