diff --git a/src/compute/Dockerfile b/src/compute/Dockerfile index 6d3cafda9f02cb5e90947eb8b235182644371be2..bb10332d155101f708019cb6b8b99c7c6bef45a6 100644 --- a/src/compute/Dockerfile +++ b/src/compute/Dockerfile @@ -46,6 +46,7 @@ COPY common/. common COPY compute/. compute COPY context/. context COPY service/. service +COPY slice/. slice # Start compute service ENTRYPOINT ["python", "-m", "compute.service"] diff --git a/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/Constants.py b/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/Constants.py index 9420517e1b253e6f9169ccabafe5f441662c9b04..b7f377254f3bec36ce407ee2e25f63adba22baa7 100644 --- a/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/Constants.py +++ b/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/Constants.py @@ -25,4 +25,30 @@ BEARER_MAPPINGS = { 'R2-EMU:13/2/1': ('R2-EMU', '13/2/1', '12.12.12.1', '65000:120', 450, '3.4.2.1', 24), 'R3-INF:13/2/1': ('R3-INF', '13/2/1', '20.20.20.1', '65000:200', 500, '3.3.1.1', 24), 'R4-EMU:13/2/1': ('R4-EMU', '13/2/1', '22.22.22.1', '65000:220', 550, '3.4.1.1', 24), + + 'R1@D1:3/1': ('R1@D1', '3/1', '10.0.1.1', '65001:101', 100, '1.1.3.1', 24), + 'R1@D1:3/2': ('R1@D1', '3/2', '10.0.1.1', '65001:101', 100, '1.1.3.2', 24), + 'R1@D1:3/3': ('R1@D1', '3/3', '10.0.1.1', '65001:101', 100, '1.1.3.3', 24), + 'R2@D1:3/1': ('R2@D1', '3/1', '10.0.1.2', '65001:102', 100, '1.2.3.1', 24), + 'R2@D1:3/2': ('R2@D1', '3/2', '10.0.1.2', '65001:102', 100, '1.2.3.2', 24), + 'R2@D1:3/3': ('R2@D1', '3/3', '10.0.1.2', '65001:102', 100, '1.2.3.3', 24), + 'R3@D1:3/1': ('R3@D1', '3/1', '10.0.1.3', '65001:103', 100, '1.3.3.1', 24), + 'R3@D1:3/2': ('R3@D1', '3/2', '10.0.1.3', '65001:103', 100, '1.3.3.2', 24), + 'R3@D1:3/3': ('R3@D1', '3/3', '10.0.1.3', '65001:103', 100, '1.3.3.3', 24), + 'R4@D1:3/1': ('R4@D1', '3/1', '10.0.1.4', '65001:104', 100, '1.4.3.1', 24), + 'R4@D1:3/2': ('R4@D1', '3/2', '10.0.1.4', '65001:104', 100, '1.4.3.2', 24), + 'R4@D1:3/3': ('R4@D1', '3/3', '10.0.1.4', '65001:104', 100, '1.4.3.3', 24), + + 'R1@D2:3/1': ('R1@D2', '3/1', '10.0.2.1', '65002:101', 100, '2.1.3.1', 24), + 'R1@D2:3/2': ('R1@D2', '3/2', '10.0.2.1', '65002:101', 100, '2.1.3.2', 24), + 'R1@D2:3/3': ('R1@D2', '3/3', '10.0.2.1', '65002:101', 100, '2.1.3.3', 24), + 'R2@D2:3/1': ('R2@D2', '3/1', '10.0.2.2', '65002:102', 100, '2.2.3.1', 24), + 'R2@D2:3/2': ('R2@D2', '3/2', '10.0.2.2', '65002:102', 100, '2.2.3.2', 24), + 'R2@D2:3/3': ('R2@D2', '3/3', '10.0.2.2', '65002:102', 100, '2.2.3.3', 24), + 'R3@D2:3/1': ('R3@D2', '3/1', '10.0.2.3', '65002:103', 100, '2.3.3.1', 24), + 'R3@D2:3/2': ('R3@D2', '3/2', '10.0.2.3', '65002:103', 100, '2.3.3.2', 24), + 'R3@D2:3/3': ('R3@D2', '3/3', '10.0.2.3', '65002:103', 100, '2.3.3.3', 24), + 'R4@D2:3/1': ('R4@D2', '3/1', '10.0.2.4', '65002:104', 100, '2.4.3.1', 24), + 'R4@D2:3/2': ('R4@D2', '3/2', '10.0.2.4', '65002:104', 100, '2.4.3.2', 24), + 'R4@D2:3/3': ('R4@D2', '3/3', '10.0.2.4', '65002:104', 100, '2.4.3.3', 24), } diff --git a/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/L2VPN_Service.py b/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/L2VPN_Service.py index 6a91e6ae1b71bca5c7c26464d0bdb4f187f0381a..440b706ab06e47ca249187ecee3cbe5197a331c3 100644 --- a/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/L2VPN_Service.py +++ b/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/L2VPN_Service.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ctypes import Union import logging from flask import request from flask.json import jsonify @@ -19,10 +20,11 @@ from flask_restful import Resource from common.Constants import DEFAULT_CONTEXT_UUID from common.Settings import get_setting from context.client.ContextClient import ContextClient -from context.proto.context_pb2 import ServiceId +from context.proto.context_pb2 import Service, ServiceId, Slice, SliceStatusEnum from service.client.ServiceClient import ServiceClient from service.proto.context_pb2 import ServiceStatusEnum from .tools.Authentication import HTTP_AUTH +from .tools.ContextMethods import get_service, get_slice from .tools.HttpStatusCodes import HTTP_GATEWAYTIMEOUT, HTTP_NOCONTENT, HTTP_OK, HTTP_SERVERERROR LOGGER = logging.getLogger(__name__) @@ -40,21 +42,30 @@ class L2VPN_Service(Resource): LOGGER.debug('VPN_Id: {:s}'.format(str(vpn_id))) LOGGER.debug('Request: {:s}'.format(str(request))) - # pylint: disable=no-member - service_id_request = ServiceId() - service_id_request.context_id.context_uuid.uuid = DEFAULT_CONTEXT_UUID - service_id_request.service_uuid.uuid = vpn_id + response = jsonify({}) try: - service_reply = self.context_client.GetService(service_id_request) - if service_reply.service_id != service_id_request: # pylint: disable=no-member - raise Exception('Service retrieval failed. Wrong Service Id was returned') - service_ready_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE - service_status = service_reply.service_status.service_status - response = jsonify({}) - response.status_code = HTTP_OK if service_status == service_ready_status else HTTP_GATEWAYTIMEOUT + target = get_service(self.context_client, vpn_id) + if target is not None: + if target.service_id.service_uuid.uuid != vpn_id: # pylint: disable=no-member + raise Exception('Service retrieval failed. Wrong Service Id was returned') + service_ready_status = ServiceStatusEnum.SERVICESTATUS_ACTIVE + service_status = target.service_status.service_status + response.status_code = HTTP_OK if service_status == service_ready_status else HTTP_GATEWAYTIMEOUT + return response + + target = get_slice(self.context_client, vpn_id) + if target is None: + if target.slice_id.slice_uuid.uuid != vpn_id: # pylint: disable=no-member + raise Exception('Slice retrieval failed. Wrong Slice Id was returned') + slice_ready_status = SliceStatusEnum.SLICESTATUS_ACTIVE + slice_status = target.slice_status.slice_status + response.status_code = HTTP_OK if slice_status == slice_ready_status else HTTP_GATEWAYTIMEOUT + return response + + raise Exception('VPN({:s}) not found in database'.format(str(vpn_id))) except Exception as e: # pylint: disable=broad-except - LOGGER.exception('Something went wrong Retrieving Service {:s}'.format(str(request))) + LOGGER.exception('Something went wrong Retrieving VPN({:s})'.format(str(request))) response = jsonify({'error': str(e)}) response.status_code = HTTP_SERVERERROR return response diff --git a/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/L2VPN_Services.py b/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/L2VPN_Services.py index 191166a74e53b5c9ae58d3c8cbb3f0e0b1ca8480..6d39cfe2db2ccba5ca75fe6861599b8ef178a038 100644 --- a/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/L2VPN_Services.py +++ b/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/L2VPN_Services.py @@ -22,6 +22,8 @@ from common.Constants import DEFAULT_CONTEXT_UUID from common.Settings import get_setting from service.client.ServiceClient import ServiceClient from service.proto.context_pb2 import Service, ServiceStatusEnum, ServiceTypeEnum +from slice.client.SliceClient import SliceClient +from slice.proto.context_pb2 import SliceStatusEnum, Slice from .schemas.vpn_service import SCHEMA_VPN_SERVICE from .tools.Authentication import HTTP_AUTH from .tools.HttpStatusCodes import HTTP_CREATED, HTTP_SERVERERROR @@ -34,6 +36,8 @@ class L2VPN_Services(Resource): super().__init__() self.service_client = ServiceClient( get_setting('SERVICESERVICE_SERVICE_HOST'), get_setting('SERVICESERVICE_SERVICE_PORT_GRPC')) + self.slice_client = SliceClient( + get_setting('SLICESERVICE_SERVICE_HOST'), get_setting('SLICESERVICE_SERVICE_PORT_GRPC')) @HTTP_AUTH.login_required def get(self): @@ -48,17 +52,29 @@ class L2VPN_Services(Resource): vpn_services : List[Dict] = request_data['ietf-l2vpn-svc:vpn-service'] for vpn_service in vpn_services: - # pylint: disable=no-member - service_request = Service() - service_request.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_UUID - service_request.service_id.service_uuid.uuid = vpn_service['vpn-id'] - service_request.service_type = ServiceTypeEnum.SERVICETYPE_L3NM - service_request.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED - try: - service_reply = self.service_client.CreateService(service_request) - if service_reply != service_request.service_id: # pylint: disable=no-member - raise Exception('Service creation failed. Wrong Service Id was returned') + vpn_service_type = vpn_service['vpn-svc-type'] + if vpn_service_type == 'vpws': + # pylint: disable=no-member + service_request = Service() + service_request.service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_UUID + service_request.service_id.service_uuid.uuid = vpn_service['vpn-id'] + service_request.service_type = ServiceTypeEnum.SERVICETYPE_L3NM + service_request.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED + + service_reply = self.service_client.CreateService(service_request) + if service_reply != service_request.service_id: # pylint: disable=no-member + raise Exception('Service creation failed. Wrong Service Id was returned') + elif vpn_service_type == 'vpls': + # pylint: disable=no-member + slice_request = Slice() + slice_request.slice_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_UUID + slice_request.slice_id.slice_uuid.uuid = vpn_service['vpn-id'] + slice_request.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_PLANNED + + slice_reply = self.slice_client.CreateSlice(slice_request) + if slice_reply != slice_request.slice_id: # pylint: disable=no-member + raise Exception('Slice creation failed. Wrong Slice Id was returned') response = jsonify({}) response.status_code = HTTP_CREATED diff --git a/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/L2VPN_SiteNetworkAccesses.py b/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/L2VPN_SiteNetworkAccesses.py index 6811dadac8bbc744bc1630adcfb88750765b11b8..2c0245b9ae790dd91509f51d821530c589fedf9c 100644 --- a/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/L2VPN_SiteNetworkAccesses.py +++ b/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/L2VPN_SiteNetworkAccesses.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ctypes import Union import json, logging from typing import Dict from flask import request @@ -19,14 +20,15 @@ from flask.json import jsonify from flask.wrappers import Response from flask_restful import Resource from werkzeug.exceptions import UnsupportedMediaType -from common.Constants import DEFAULT_CONTEXT_UUID from common.Settings import get_setting from common.tools.grpc.Tools import grpc_message_to_json_string from context.client.ContextClient import ContextClient -from context.proto.context_pb2 import ConfigActionEnum, Service, ServiceId +from context.proto.context_pb2 import ConfigActionEnum, Service, Slice from service.client.ServiceClient import ServiceClient +from slice.client.SliceClient import SliceClient from .schemas.site_network_access import SCHEMA_SITE_NETWORK_ACCESS from .tools.Authentication import HTTP_AUTH +from .tools.ContextMethods import get_service, get_slice from .tools.HttpStatusCodes import HTTP_NOCONTENT, HTTP_SERVERERROR from .tools.Validator import validate_message from .Constants import BEARER_MAPPINGS, DEFAULT_ADDRESS_FAMILIES, DEFAULT_BGP_AS, DEFAULT_BGP_ROUTE_TARGET, DEFAULT_MTU @@ -44,26 +46,27 @@ def process_site_network_access(context_client : ContextClient, site_network_acc raise Exception(msg.format(str(bearer_reference))) device_uuid,endpoint_uuid,router_id,route_distinguisher,sub_if_index,address_ip,address_prefix = mapping - # pylint: disable=no-member - service_id = ServiceId() - service_id.context_id.context_uuid.uuid = DEFAULT_CONTEXT_UUID - service_id.service_uuid.uuid = vpn_id + target : Union[Service, Slice, None] = None + if target is None: target = get_service(context_client, vpn_id) + if target is None: target = get_slice (context_client, vpn_id) + if target is None: raise Exception('VPN({:s}) not found in database'.format(str(vpn_id))) - service_readonly = context_client.GetService(service_id) - service = Service() - service.CopyFrom(service_readonly) + # pylint: disable=no-member + endpoint_ids = target.service_endpoint_ids if isinstance(target, Service) else target.slice_endpoint_ids - for endpoint_id in service.service_endpoint_ids: # pylint: disable=no-member + for endpoint_id in endpoint_ids: if endpoint_id.device_id.device_uuid.uuid != device_uuid: continue if endpoint_id.endpoint_uuid.uuid != endpoint_uuid: continue break # found, do nothing else: # not found, add it - endpoint_id = service.service_endpoint_ids.add() # pylint: disable=no-member + endpoint_id = endpoint_ids.add() endpoint_id.device_id.device_uuid.uuid = device_uuid endpoint_id.endpoint_uuid.uuid = endpoint_uuid - for config_rule in service.service_config.config_rules: # pylint: disable=no-member + if isinstance(target, Slice): return target + + for config_rule in target.service_config.config_rules: # pylint: disable=no-member if config_rule.resource_key != '/settings': continue json_settings = json.loads(config_rule.resource_value) @@ -95,7 +98,7 @@ def process_site_network_access(context_client : ContextClient, site_network_acc break else: # not found, add it - config_rule = service.service_config.config_rules.add() # pylint: disable=no-member + config_rule = target.service_config.config_rules.add() # pylint: disable=no-member config_rule.action = ConfigActionEnum.CONFIGACTION_SET config_rule.resource_key = '/settings' config_rule.resource_value = json.dumps({ @@ -106,7 +109,7 @@ def process_site_network_access(context_client : ContextClient, site_network_acc }, sort_keys=True) endpoint_settings_key = '/device[{:s}]/endpoint[{:s}]/settings'.format(device_uuid, endpoint_uuid) - for config_rule in service.service_config.config_rules: # pylint: disable=no-member + for config_rule in target.service_config.config_rules: # pylint: disable=no-member if config_rule.resource_key != endpoint_settings_key: continue json_settings = json.loads(config_rule.resource_value) @@ -154,7 +157,7 @@ def process_site_network_access(context_client : ContextClient, site_network_acc break else: # not found, add it - config_rule = service.service_config.config_rules.add() # pylint: disable=no-member + config_rule = target.service_config.config_rules.add() # pylint: disable=no-member config_rule.action = ConfigActionEnum.CONFIGACTION_SET config_rule.resource_key = endpoint_settings_key config_rule.resource_value = json.dumps({ @@ -166,24 +169,34 @@ def process_site_network_access(context_client : ContextClient, site_network_acc 'address_prefix': address_prefix, }, sort_keys=True) - return service + return target def process_list_site_network_access( - context_client : ContextClient, service_client : ServiceClient, request_data : Dict) -> Response: + context_client : ContextClient, service_client : ServiceClient, slice_client : SliceClient, + request_data : Dict + ) -> Response: LOGGER.debug('Request: {:s}'.format(str(request_data))) validate_message(SCHEMA_SITE_NETWORK_ACCESS, request_data) errors = [] for site_network_access in request_data['ietf-l2vpn-svc:site-network-access']: + sna_request = process_site_network_access(context_client, site_network_access) + LOGGER.debug('sna_request = {:s}'.format(grpc_message_to_json_string(sna_request))) try: - service_request = process_site_network_access(context_client, site_network_access) - LOGGER.debug('service_request = {:s}'.format(grpc_message_to_json_string(service_request))) - service_reply = service_client.UpdateService(service_request) - if service_reply != service_request.service_id: # pylint: disable=no-member - raise Exception('Service update failed. Wrong Service Id was returned') + if isinstance(sna_request, Service): + sna_reply = service_client.UpdateService(sna_request) + if sna_reply != sna_request.service_id: # pylint: disable=no-member + raise Exception('Service update failed. Wrong Service Id was returned') + elif isinstance(sna_request, Slice): + sna_reply = slice_client.UpdateSlice(sna_request) + if sna_reply != sna_request.slice_id: # pylint: disable=no-member + raise Exception('Slice update failed. Wrong Slice Id was returned') + else: + raise NotImplementedError('Support for Class({:s}) not implemented'.format(str(type(sna_request)))) except Exception as e: # pylint: disable=broad-except - LOGGER.exception('Something went wrong Updating Service {:s}'.format(str(request))) + msg = 'Something went wrong Updating Service {:s}' + LOGGER.exception(msg.format(grpc_message_to_json_string(sna_request))) errors.append({'error': str(e)}) response = jsonify(errors) @@ -197,15 +210,19 @@ class L2VPN_SiteNetworkAccesses(Resource): get_setting('CONTEXTSERVICE_SERVICE_HOST'), get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC')) self.service_client = ServiceClient( get_setting('SERVICESERVICE_SERVICE_HOST'), get_setting('SERVICESERVICE_SERVICE_PORT_GRPC')) + self.slice_client = SliceClient( + get_setting('SLICESERVICE_SERVICE_HOST'), get_setting('SLICESERVICE_SERVICE_PORT_GRPC')) @HTTP_AUTH.login_required def post(self, site_id : str): if not request.is_json: raise UnsupportedMediaType('JSON payload is required') LOGGER.debug('Site_Id: {:s}'.format(str(site_id))) - return process_list_site_network_access(self.context_client, self.service_client, request.json) + return process_list_site_network_access( + self.context_client, self.service_client, self.slice_client, request.json) @HTTP_AUTH.login_required def put(self, site_id : str): if not request.is_json: raise UnsupportedMediaType('JSON payload is required') LOGGER.debug('Site_Id: {:s}'.format(str(site_id))) - return process_list_site_network_access(self.context_client, self.service_client, request.json) + return process_list_site_network_access( + self.context_client, self.service_client, self.slice_client, request.json) diff --git a/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/schemas/vpn_service.py b/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/schemas/vpn_service.py index b224b40737914fbf0c7ae87f08d62dc34836f2aa..9dd8eea3d711a2d5ea22ac8e0fc3e4d7140493b1 100644 --- a/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/schemas/vpn_service.py +++ b/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/schemas/vpn_service.py @@ -36,7 +36,7 @@ SCHEMA_VPN_SERVICE = { 'required': ['vpn-id', 'vpn-svc-type', 'svc-topo', 'customer-name'], 'properties': { 'vpn-id': {'type': 'string', 'pattern': REGEX_UUID}, - 'vpn-svc-type': {'enum': ['vpws']}, + 'vpn-svc-type': {'enum': ['vpws', 'vpls']}, 'svc-topo': {'enum': ['any-to-any']}, 'customer-name': {'const': 'osm'}, }, diff --git a/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/tools/ContextMethods.py b/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/tools/ContextMethods.py new file mode 100644 index 0000000000000000000000000000000000000000..79e73a28d7bd68a2e0a7e7636c0c45ca5c4a9043 --- /dev/null +++ b/src/compute/service/rest_server/nbi_plugins/ietf_l2vpn/tools/ContextMethods.py @@ -0,0 +1,39 @@ +import grpc, logging +from typing import Optional +from common.Constants import DEFAULT_CONTEXT_UUID +from context.client.ContextClient import ContextClient +from context.proto.context_pb2 import Service, ServiceId, Slice, SliceId + +LOGGER = logging.getLogger(__name__) + +def get_service( + context_client : ContextClient, service_uuid : str, context_uuid : str = DEFAULT_CONTEXT_UUID + ) -> Optional[Service]: + try: + # pylint: disable=no-member + service_id = ServiceId() + service_id.context_id.context_uuid.uuid = context_uuid + service_id.service_uuid.uuid = service_uuid + service_readonly = context_client.GetService(service_id) + service = Service() + service.CopyFrom(service_readonly) + return service + except grpc.RpcError: + #LOGGER.exception('Unable to get service({:s} / {:s})'.format(str(context_uuid), str(service_uuid))) + return None + +def get_slice( + context_client : ContextClient, slice_uuid : str, context_uuid : str = DEFAULT_CONTEXT_UUID + ) -> Optional[Slice]: + try: + # pylint: disable=no-member + slice_id = SliceId() + slice_id.context_id.context_uuid.uuid = context_uuid + slice_id.slice_uuid.uuid = slice_uuid + slice_readonly = context_client.GetSlice(slice_id) + slice_ = Slice() + slice_.CopyFrom(slice_readonly) + return slice_ + except grpc.RpcError: + #LOGGER.exception('Unable to get slice({:s} / {:s})'.format(str(context_uuid), str(slice_uuid))) + return None diff --git a/src/compute/tests/mock_osm/WimconnectorIETFL2VPN.py b/src/compute/tests/mock_osm/WimconnectorIETFL2VPN.py index d5ce65a1e4fcc5c47f90fb35e21023b28cb3c8ee..b9639e8046593c1dbf4017cff963ceb7c51d0532 100644 --- a/src/compute/tests/mock_osm/WimconnectorIETFL2VPN.py +++ b/src/compute/tests/mock_osm/WimconnectorIETFL2VPN.py @@ -164,178 +164,183 @@ class WimconnectorIETFL2VPN(SdnConnectorBase): Raises: SdnConnectorException: In case of error. """ - if service_type == "ELINE": - if len(connection_points) > 2: - raise SdnConnectorError( - "Connections between more than 2 endpoints are not supported" + SETTINGS = { # min_endpoints, max_endpoints, vpn_service_type + 'ELINE': (2, 2, 'vpws'), # Virtual Private Wire Service + 'ELAN' : (2, None, 'vpls'), # Virtual Private LAN Service + } + settings = SETTINGS.get(service_type) + if settings is None: raise NotImplementedError('Unsupported service_type({:s})'.format(str(service_type))) + min_endpoints, max_endpoints, vpn_service_type = settings + + if max_endpoints is not None and len(connection_points) > max_endpoints: + msg = "Connections between more than {:d} endpoints are not supported for service_type {:s}" + raise SdnConnectorError(msg.format(max_endpoints, service_type)) + + if min_endpoints is not None and len(connection_points) < min_endpoints: + msg = "Connections must be of at least {:d} endpoints for service_type {:s}" + raise SdnConnectorError(msg.format(min_endpoints, service_type)) + + """First step, create the vpn service""" + uuid_l2vpn = str(uuid.uuid4()) + vpn_service = {} + vpn_service["vpn-id"] = uuid_l2vpn + vpn_service["vpn-svc-type"] = vpn_service_type + vpn_service["svc-topo"] = "any-to-any" + vpn_service["customer-name"] = "osm" + vpn_service_list = [] + vpn_service_list.append(vpn_service) + vpn_service_l = {"ietf-l2vpn-svc:vpn-service": vpn_service_list} + response_service_creation = None + conn_info = [] + self.logger.info("Sending vpn-service :{}".format(vpn_service_l)) + + try: + endpoint_service_creation = ( + "{}/restconf/data/ietf-l2vpn-svc:l2vpn-svc/vpn-services".format( + self.wim["wim_url"] ) + ) + response_service_creation = requests.post( + endpoint_service_creation, + headers=self.headers, + json=vpn_service_l, + auth=self.auth, + ) + except requests.exceptions.ConnectionError: + raise SdnConnectorError( + "Request to create service Timeout", http_code=408 + ) + + if response_service_creation.status_code == 409: + raise SdnConnectorError( + "Service already exists", + http_code=response_service_creation.status_code, + ) + elif response_service_creation.status_code != requests.codes.created: + raise SdnConnectorError( + "Request to create service not accepted", + http_code=response_service_creation.status_code, + ) + + """Second step, create the connections and vpn attachments""" + for connection_point in connection_points: + connection_point_wan_info = self.search_mapp(connection_point) + site_network_access = {} + connection = {} + + if connection_point["service_endpoint_encapsulation_type"] != "none": + if ( + connection_point["service_endpoint_encapsulation_type"] + == "dot1q" + ): + """The connection is a VLAN""" + connection["encapsulation-type"] = "dot1q-vlan-tagged" + tagged = {} + tagged_interf = {} + service_endpoint_encapsulation_info = connection_point[ + "service_endpoint_encapsulation_info" + ] + + if service_endpoint_encapsulation_info["vlan"] is None: + raise SdnConnectorError("VLAN must be provided") - if len(connection_points) < 2: - raise SdnConnectorError("Connections must be of at least 2 endpoints") - - """First step, create the vpn service""" - uuid_l2vpn = str(uuid.uuid4()) - vpn_service = {} - vpn_service["vpn-id"] = uuid_l2vpn - vpn_service["vpn-svc-type"] = "vpws" # Rename "vpn-scv-type" -> "vpn-svc-type" - vpn_service["svc-topo"] = "any-to-any" - vpn_service["customer-name"] = "osm" - vpn_service_list = [] - vpn_service_list.append(vpn_service) - vpn_service_l = {"ietf-l2vpn-svc:vpn-service": vpn_service_list} - response_service_creation = None - conn_info = [] - self.logger.info("Sending vpn-service :{}".format(vpn_service_l)) + tagged_interf["cvlan-id"] = service_endpoint_encapsulation_info[ + "vlan" + ] + tagged["dot1q-vlan-tagged"] = tagged_interf + connection["tagged-interface"] = tagged + else: + raise NotImplementedError("Encapsulation type not implemented") + + site_network_access["connection"] = connection + self.logger.info("Sending connection:{}".format(connection)) + vpn_attach = {} + vpn_attach["vpn-id"] = uuid_l2vpn + vpn_attach["site-role"] = vpn_service["svc-topo"] + "-role" + site_network_access["vpn-attachment"] = vpn_attach + self.logger.info("Sending vpn-attachement :{}".format(vpn_attach)) + uuid_sna = str(uuid.uuid4()) + site_network_access["network-access-id"] = uuid_sna + site_network_access["bearer"] = connection_point_wan_info[ + "service_mapping_info" + ]["bearer"] + site_network_accesses = {} + site_network_access_list = [] + site_network_access_list.append(site_network_access) + site_network_accesses[ + "ietf-l2vpn-svc:site-network-access" + ] = site_network_access_list + conn_info_d = {} + conn_info_d["site"] = connection_point_wan_info["service_mapping_info"][ + "site-id" + ] + conn_info_d["site-network-access-id"] = site_network_access[ + "network-access-id" + ] + conn_info_d["mapping"] = None + conn_info.append(conn_info_d) try: - endpoint_service_creation = ( - "{}/restconf/data/ietf-l2vpn-svc:l2vpn-svc/vpn-services".format( - self.wim["wim_url"] + endpoint_site_network_access_creation = ( + "{}/restconf/data/ietf-l2vpn-svc:l2vpn-svc/" + "sites/site={}/site-network-accesses/".format( + self.wim["wim_url"], + connection_point_wan_info["service_mapping_info"][ + "site-id" + ], ) ) - response_service_creation = requests.post( - endpoint_service_creation, + response_endpoint_site_network_access_creation = requests.post( + endpoint_site_network_access_creation, headers=self.headers, - json=vpn_service_l, + json=site_network_accesses, auth=self.auth, ) - except requests.exceptions.ConnectionError: - raise SdnConnectorError( - "Request to create service Timeout", http_code=408 - ) - if response_service_creation.status_code == 409: - raise SdnConnectorError( - "Service already exists", - http_code=response_service_creation.status_code, - ) - elif response_service_creation.status_code != requests.codes.created: - raise SdnConnectorError( - "Request to create service not accepted", - http_code=response_service_creation.status_code, - ) + if ( + response_endpoint_site_network_access_creation.status_code + == 409 + ): + self.delete_connectivity_service(vpn_service["vpn-id"]) + + raise SdnConnectorError( + "Site_Network_Access with ID '{}' already exists".format( + site_network_access["network-access-id"] + ), + http_code=response_endpoint_site_network_access_creation.status_code, + ) + elif ( + response_endpoint_site_network_access_creation.status_code + == 400 + ): + self.delete_connectivity_service(vpn_service["vpn-id"]) - """Second step, create the connections and vpn attachments""" - for connection_point in connection_points: - connection_point_wan_info = self.search_mapp(connection_point) - site_network_access = {} - connection = {} - - if connection_point["service_endpoint_encapsulation_type"] != "none": - if ( - connection_point["service_endpoint_encapsulation_type"] - == "dot1q" - ): - """The connection is a VLAN""" - connection["encapsulation-type"] = "dot1q-vlan-tagged" - tagged = {} - tagged_interf = {} - service_endpoint_encapsulation_info = connection_point[ - "service_endpoint_encapsulation_info" - ] - - if service_endpoint_encapsulation_info["vlan"] is None: - raise SdnConnectorError("VLAN must be provided") - - tagged_interf["cvlan-id"] = service_endpoint_encapsulation_info[ - "vlan" - ] - tagged["dot1q-vlan-tagged"] = tagged_interf - connection["tagged-interface"] = tagged - else: - raise NotImplementedError("Encapsulation type not implemented") - - site_network_access["connection"] = connection - self.logger.info("Sending connection:{}".format(connection)) - vpn_attach = {} - vpn_attach["vpn-id"] = uuid_l2vpn - vpn_attach["site-role"] = vpn_service["svc-topo"] + "-role" - site_network_access["vpn-attachment"] = vpn_attach - self.logger.info("Sending vpn-attachement :{}".format(vpn_attach)) - uuid_sna = str(uuid.uuid4()) - site_network_access["network-access-id"] = uuid_sna - site_network_access["bearer"] = connection_point_wan_info[ - "service_mapping_info" - ]["bearer"] - site_network_accesses = {} - site_network_access_list = [] - site_network_access_list.append(site_network_access) - site_network_accesses[ - "ietf-l2vpn-svc:site-network-access" - ] = site_network_access_list - conn_info_d = {} - conn_info_d["site"] = connection_point_wan_info["service_mapping_info"][ - "site-id" - ] - conn_info_d["site-network-access-id"] = site_network_access[ - "network-access-id" - ] - conn_info_d["mapping"] = None - conn_info.append(conn_info_d) - - try: - endpoint_site_network_access_creation = ( - "{}/restconf/data/ietf-l2vpn-svc:l2vpn-svc/" - "sites/site={}/site-network-accesses/".format( - self.wim["wim_url"], + raise SdnConnectorError( + "Site {} does not exist".format( connection_point_wan_info["service_mapping_info"][ "site-id" - ], - ) - ) - response_endpoint_site_network_access_creation = requests.post( - endpoint_site_network_access_creation, - headers=self.headers, - json=site_network_accesses, - auth=self.auth, + ] + ), + http_code=response_endpoint_site_network_access_creation.status_code, ) - - if ( - response_endpoint_site_network_access_creation.status_code - == 409 - ): - self.delete_connectivity_service(vpn_service["vpn-id"]) - - raise SdnConnectorError( - "Site_Network_Access with ID '{}' already exists".format( - site_network_access["network-access-id"] - ), - http_code=response_endpoint_site_network_access_creation.status_code, - ) - elif ( - response_endpoint_site_network_access_creation.status_code - == 400 - ): - self.delete_connectivity_service(vpn_service["vpn-id"]) - - raise SdnConnectorError( - "Site {} does not exist".format( - connection_point_wan_info["service_mapping_info"][ - "site-id" - ] - ), - http_code=response_endpoint_site_network_access_creation.status_code, - ) - elif ( - response_endpoint_site_network_access_creation.status_code - != requests.codes.created - and response_endpoint_site_network_access_creation.status_code - != requests.codes.no_content - ): - self.delete_connectivity_service(vpn_service["vpn-id"]) - - raise SdnConnectorError( - "Request no accepted", - http_code=response_endpoint_site_network_access_creation.status_code, - ) - except requests.exceptions.ConnectionError: + elif ( + response_endpoint_site_network_access_creation.status_code + != requests.codes.created + and response_endpoint_site_network_access_creation.status_code + != requests.codes.no_content + ): self.delete_connectivity_service(vpn_service["vpn-id"]) - raise SdnConnectorError("Request Timeout", http_code=408) + raise SdnConnectorError( + "Request no accepted", + http_code=response_endpoint_site_network_access_creation.status_code, + ) + except requests.exceptions.ConnectionError: + self.delete_connectivity_service(vpn_service["vpn-id"]) + + raise SdnConnectorError("Request Timeout", http_code=408) - return uuid_l2vpn, conn_info - else: - raise NotImplementedError + return uuid_l2vpn, conn_info def delete_connectivity_service(self, service_uuid, conn_info=None): """Disconnect multi-site endpoints previously connected diff --git a/src/context/client/ContextClient.py b/src/context/client/ContextClient.py index bf58ea45db4ced7496657410f0c941584e4611e0..3206e4a366ef2f6cb7d3aeb366b287572b8d49da 100644 --- a/src/context/client/ContextClient.py +++ b/src/context/client/ContextClient.py @@ -15,11 +15,12 @@ from typing import Iterator import grpc, logging from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string from context.proto.context_pb2 import ( Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList, Context, ContextEvent, ContextId, ContextIdList, ContextList, Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList, Empty, Link, LinkEvent, - LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Topology, - TopologyEvent, TopologyId, TopologyIdList, TopologyList) + LinkId, LinkIdList, LinkList, Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList, Slice, SliceEvent, + SliceId, SliceIdList, SliceList, Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList) from context.proto.context_pb2_grpc import ContextServiceStub LOGGER = logging.getLogger(__name__) @@ -47,252 +48,294 @@ class ContextClient: @RETRY_DECORATOR def ListContextIds(self, request: Empty) -> ContextIdList: - LOGGER.debug('ListContextIds request: {:s}'.format(str(request))) + LOGGER.debug('ListContextIds request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListContextIds(request) - LOGGER.debug('ListContextIds result: {:s}'.format(str(response))) + LOGGER.debug('ListContextIds result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ListContexts(self, request: Empty) -> ContextList: - LOGGER.debug('ListContexts request: {:s}'.format(str(request))) + LOGGER.debug('ListContexts request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListContexts(request) - LOGGER.debug('ListContexts result: {:s}'.format(str(response))) + LOGGER.debug('ListContexts result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetContext(self, request: ContextId) -> Context: - LOGGER.debug('GetContext request: {:s}'.format(str(request))) + LOGGER.debug('GetContext request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetContext(request) - LOGGER.debug('GetContext result: {:s}'.format(str(response))) + LOGGER.debug('GetContext result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SetContext(self, request: Context) -> ContextId: - LOGGER.debug('SetContext request: {:s}'.format(str(request))) + LOGGER.debug('SetContext request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SetContext(request) - LOGGER.debug('SetContext result: {:s}'.format(str(response))) + LOGGER.debug('SetContext result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RemoveContext(self, request: ContextId) -> Empty: - LOGGER.debug('RemoveContext request: {:s}'.format(str(request))) + LOGGER.debug('RemoveContext request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RemoveContext(request) - LOGGER.debug('RemoveContext result: {:s}'.format(str(response))) + LOGGER.debug('RemoveContext result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetContextEvents(self, request: Empty) -> Iterator[ContextEvent]: - LOGGER.debug('GetContextEvents request: {:s}'.format(str(request))) + LOGGER.debug('GetContextEvents request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetContextEvents(request) - LOGGER.debug('GetContextEvents result: {:s}'.format(str(response))) + LOGGER.debug('GetContextEvents result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ListTopologyIds(self, request: ContextId) -> TopologyIdList: - LOGGER.debug('ListTopologyIds request: {:s}'.format(str(request))) + LOGGER.debug('ListTopologyIds request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListTopologyIds(request) - LOGGER.debug('ListTopologyIds result: {:s}'.format(str(response))) + LOGGER.debug('ListTopologyIds result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ListTopologies(self, request: ContextId) -> TopologyList: - LOGGER.debug('ListTopologies request: {:s}'.format(str(request))) + LOGGER.debug('ListTopologies request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListTopologies(request) - LOGGER.debug('ListTopologies result: {:s}'.format(str(response))) + LOGGER.debug('ListTopologies result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetTopology(self, request: TopologyId) -> Topology: - LOGGER.debug('GetTopology request: {:s}'.format(str(request))) + LOGGER.debug('GetTopology request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetTopology(request) - LOGGER.debug('GetTopology result: {:s}'.format(str(response))) + LOGGER.debug('GetTopology result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SetTopology(self, request: Topology) -> TopologyId: - LOGGER.debug('SetTopology request: {:s}'.format(str(request))) + LOGGER.debug('SetTopology request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SetTopology(request) - LOGGER.debug('SetTopology result: {:s}'.format(str(response))) + LOGGER.debug('SetTopology result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RemoveTopology(self, request: TopologyId) -> Empty: - LOGGER.debug('RemoveTopology request: {:s}'.format(str(request))) + LOGGER.debug('RemoveTopology request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RemoveTopology(request) - LOGGER.debug('RemoveTopology result: {:s}'.format(str(response))) + LOGGER.debug('RemoveTopology result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetTopologyEvents(self, request: Empty) -> Iterator[TopologyEvent]: - LOGGER.debug('GetTopologyEvents request: {:s}'.format(str(request))) + LOGGER.debug('GetTopologyEvents request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetTopologyEvents(request) - LOGGER.debug('GetTopologyEvents result: {:s}'.format(str(response))) + LOGGER.debug('GetTopologyEvents result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ListDeviceIds(self, request: Empty) -> DeviceIdList: - LOGGER.debug('ListDeviceIds request: {:s}'.format(str(request))) + LOGGER.debug('ListDeviceIds request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListDeviceIds(request) - LOGGER.debug('ListDeviceIds result: {:s}'.format(str(response))) + LOGGER.debug('ListDeviceIds result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ListDevices(self, request: Empty) -> DeviceList: - LOGGER.debug('ListDevices request: {:s}'.format(str(request))) + LOGGER.debug('ListDevices request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListDevices(request) - LOGGER.debug('ListDevices result: {:s}'.format(str(response))) + LOGGER.debug('ListDevices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetDevice(self, request: DeviceId) -> Device: - LOGGER.debug('GetDevice request: {:s}'.format(str(request))) + LOGGER.debug('GetDevice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetDevice(request) - LOGGER.debug('GetDevice result: {:s}'.format(str(response))) + LOGGER.debug('GetDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SetDevice(self, request: Device) -> DeviceId: - LOGGER.debug('SetDevice request: {:s}'.format(str(request))) + LOGGER.debug('SetDevice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SetDevice(request) - LOGGER.debug('SetDevice result: {:s}'.format(str(response))) + LOGGER.debug('SetDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RemoveDevice(self, request: DeviceId) -> Empty: - LOGGER.debug('RemoveDevice request: {:s}'.format(str(request))) + LOGGER.debug('RemoveDevice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RemoveDevice(request) - LOGGER.debug('RemoveDevice result: {:s}'.format(str(response))) + LOGGER.debug('RemoveDevice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetDeviceEvents(self, request: Empty) -> Iterator[DeviceEvent]: - LOGGER.debug('GetDeviceEvents request: {:s}'.format(str(request))) + LOGGER.debug('GetDeviceEvents request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetDeviceEvents(request) - LOGGER.debug('GetDeviceEvents result: {:s}'.format(str(response))) + LOGGER.debug('GetDeviceEvents result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ListLinkIds(self, request: Empty) -> LinkIdList: - LOGGER.debug('ListLinkIds request: {:s}'.format(str(request))) + LOGGER.debug('ListLinkIds request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListLinkIds(request) - LOGGER.debug('ListLinkIds result: {:s}'.format(str(response))) + LOGGER.debug('ListLinkIds result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ListLinks(self, request: Empty) -> LinkList: - LOGGER.debug('ListLinks request: {:s}'.format(str(request))) + LOGGER.debug('ListLinks request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListLinks(request) - LOGGER.debug('ListLinks result: {:s}'.format(str(response))) + LOGGER.debug('ListLinks result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetLink(self, request: LinkId) -> Link: - LOGGER.debug('GetLink request: {:s}'.format(str(request))) + LOGGER.debug('GetLink request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetLink(request) - LOGGER.debug('GetLink result: {:s}'.format(str(response))) + LOGGER.debug('GetLink result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SetLink(self, request: Link) -> LinkId: - LOGGER.debug('SetLink request: {:s}'.format(str(request))) + LOGGER.debug('SetLink request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SetLink(request) - LOGGER.debug('SetLink result: {:s}'.format(str(response))) + LOGGER.debug('SetLink result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RemoveLink(self, request: LinkId) -> Empty: - LOGGER.debug('RemoveLink request: {:s}'.format(str(request))) + LOGGER.debug('RemoveLink request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RemoveLink(request) - LOGGER.debug('RemoveLink result: {:s}'.format(str(response))) + LOGGER.debug('RemoveLink result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetLinkEvents(self, request: Empty) -> Iterator[LinkEvent]: - LOGGER.debug('GetLinkEvents request: {:s}'.format(str(request))) + LOGGER.debug('GetLinkEvents request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetLinkEvents(request) - LOGGER.debug('GetLinkEvents result: {:s}'.format(str(response))) + LOGGER.debug('GetLinkEvents result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ListServiceIds(self, request: ContextId) -> ServiceIdList: - LOGGER.debug('ListServiceIds request: {:s}'.format(str(request))) + LOGGER.debug('ListServiceIds request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListServiceIds(request) - LOGGER.debug('ListServiceIds result: {:s}'.format(str(response))) + LOGGER.debug('ListServiceIds result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ListServices(self, request: ContextId) -> ServiceList: - LOGGER.debug('ListServices request: {:s}'.format(str(request))) + LOGGER.debug('ListServices request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListServices(request) - LOGGER.debug('ListServices result: {:s}'.format(str(response))) + LOGGER.debug('ListServices result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetService(self, request: ServiceId) -> Service: - LOGGER.debug('GetService request: {:s}'.format(str(request))) + LOGGER.debug('GetService request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetService(request) - LOGGER.debug('GetService result: {:s}'.format(str(response))) + LOGGER.debug('GetService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SetService(self, request: Service) -> ServiceId: - LOGGER.debug('SetService request: {:s}'.format(str(request))) + LOGGER.debug('SetService request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SetService(request) - LOGGER.debug('SetService result: {:s}'.format(str(response))) + LOGGER.debug('SetService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RemoveService(self, request: ServiceId) -> Empty: - LOGGER.debug('RemoveService request: {:s}'.format(str(request))) + LOGGER.debug('RemoveService request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RemoveService(request) - LOGGER.debug('RemoveService result: {:s}'.format(str(response))) + LOGGER.debug('RemoveService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetServiceEvents(self, request: Empty) -> Iterator[ServiceEvent]: - LOGGER.debug('GetServiceEvents request: {:s}'.format(str(request))) + LOGGER.debug('GetServiceEvents request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetServiceEvents(request) - LOGGER.debug('GetServiceEvents result: {:s}'.format(str(response))) + LOGGER.debug('GetServiceEvents result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def ListSliceIds(self, request: ContextId) -> SliceIdList: + LOGGER.debug('ListSliceIds request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.ListSliceIds(request) + LOGGER.debug('ListSliceIds result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def ListSlices(self, request: ContextId) -> SliceList: + LOGGER.debug('ListSlices request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.ListSlices(request) + LOGGER.debug('ListSlices result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetSlice(self, request: SliceId) -> Slice: + LOGGER.debug('GetSlice request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetSlice(request) + LOGGER.debug('GetSlice result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def SetSlice(self, request: Slice) -> SliceId: + LOGGER.debug('SetSlice request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SetSlice(request) + LOGGER.debug('SetSlice result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RemoveSlice(self, request: SliceId) -> Empty: + LOGGER.debug('RemoveSlice request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RemoveSlice(request) + LOGGER.debug('RemoveSlice result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetSliceEvents(self, request: Empty) -> Iterator[SliceEvent]: + LOGGER.debug('GetSliceEvents request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetSliceEvents(request) + LOGGER.debug('GetSliceEvents result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ListConnectionIds(self, request: ServiceId) -> ConnectionIdList: - LOGGER.debug('ListConnectionIds request: {:s}'.format(str(request))) + LOGGER.debug('ListConnectionIds request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListConnectionIds(request) - LOGGER.debug('ListConnectionIds result: {:s}'.format(str(response))) + LOGGER.debug('ListConnectionIds result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def ListConnections(self, request: ServiceId) -> ConnectionList: - LOGGER.debug('ListConnections request: {:s}'.format(str(request))) + LOGGER.debug('ListConnections request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.ListConnections(request) - LOGGER.debug('ListConnections result: {:s}'.format(str(response))) + LOGGER.debug('ListConnections result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetConnection(self, request: ConnectionId) -> Connection: - LOGGER.debug('GetConnection request: {:s}'.format(str(request))) + LOGGER.debug('GetConnection request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetConnection(request) - LOGGER.debug('GetConnection result: {:s}'.format(str(response))) + LOGGER.debug('GetConnection result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def SetConnection(self, request: Connection) -> ConnectionId: - LOGGER.debug('SetConnection request: {:s}'.format(str(request))) + LOGGER.debug('SetConnection request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.SetConnection(request) - LOGGER.debug('SetConnection result: {:s}'.format(str(response))) + LOGGER.debug('SetConnection result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def RemoveConnection(self, request: ConnectionId) -> Empty: - LOGGER.debug('RemoveConnection request: {:s}'.format(str(request))) + LOGGER.debug('RemoveConnection request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.RemoveConnection(request) - LOGGER.debug('RemoveConnection result: {:s}'.format(str(response))) + LOGGER.debug('RemoveConnection result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def GetConnectionEvents(self, request: Empty) -> Iterator[ConnectionEvent]: - LOGGER.debug('GetConnectionEvents request: {:s}'.format(str(request))) + LOGGER.debug('GetConnectionEvents request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.GetConnectionEvents(request) - LOGGER.debug('GetConnectionEvents result: {:s}'.format(str(response))) + LOGGER.debug('GetConnectionEvents result: {:s}'.format(grpc_message_to_json_string(response))) return response diff --git a/src/context/client/EventsCollector.py b/src/context/client/EventsCollector.py index 3022df0a6f4e56cdbf6444b90eeeb41b43277c53..f35b43eab40e7bf41334646ace6b4fa46beb5e10 100644 --- a/src/context/client/EventsCollector.py +++ b/src/context/client/EventsCollector.py @@ -32,6 +32,7 @@ class EventsCollector: self._device_stream = context_client_grpc.GetDeviceEvents(Empty()) self._link_stream = context_client_grpc.GetLinkEvents(Empty()) self._service_stream = context_client_grpc.GetServiceEvents(Empty()) + self._slice_stream = context_client_grpc.GetSliceEvents(Empty()) self._connection_stream = context_client_grpc.GetConnectionEvents(Empty()) self._context_thread = threading.Thread(target=self._collect, args=(self._context_stream ,), daemon=False) @@ -39,6 +40,7 @@ class EventsCollector: self._device_thread = threading.Thread(target=self._collect, args=(self._device_stream ,), daemon=False) self._link_thread = threading.Thread(target=self._collect, args=(self._link_stream ,), daemon=False) self._service_thread = threading.Thread(target=self._collect, args=(self._service_stream ,), daemon=False) + self._slice_thread = threading.Thread(target=self._collect, args=(self._slice_stream ,), daemon=False) self._connection_thread = threading.Thread(target=self._collect, args=(self._connection_stream,), daemon=False) def _collect(self, events_stream) -> None: @@ -57,6 +59,7 @@ class EventsCollector: self._device_thread.start() self._link_thread.start() self._service_thread.start() + self._slice_thread.start() self._connection_thread.start() def get_event(self, block : bool = True, timeout : float = 0.1): @@ -85,6 +88,7 @@ class EventsCollector: self._device_stream.cancel() self._link_stream.cancel() self._service_stream.cancel() + self._slice_stream.cancel() self._connection_stream.cancel() self._context_thread.join() @@ -92,4 +96,5 @@ class EventsCollector: self._device_thread.join() self._link_thread.join() self._service_thread.join() + self._slice_thread.join() self._connection_thread.join() diff --git a/src/context/service/database/SliceModel.py b/src/context/service/database/SliceModel.py index d5c7486f190c3cfe03f40d16466822d38a219885..5b560a94864edb7fa9f501ad03377aa167036efc 100644 --- a/src/context/service/database/SliceModel.py +++ b/src/context/service/database/SliceModel.py @@ -81,5 +81,5 @@ class SliceModel(Model): if include_endpoint_ids: result['slice_endpoint_ids'] = self.dump_endpoint_ids() if include_constraints: result['slice_constraints'] = self.dump_constraints() if include_service_ids: result['slice_service_ids'] = self.dump_service_ids() - if include_subslice_ids: result['sub_subslice_ids'] = self.dump_subslice_ids() + if include_subslice_ids: result['slice_subslice_ids'] = self.dump_subslice_ids() return result diff --git a/src/context/service/grpc_server/ContextServiceServicerImpl.py b/src/context/service/grpc_server/ContextServiceServicerImpl.py index 223c0466a40312ca6ee4b5f7e7515bc90233cf36..9218d550f824b22f3f90b042363acf433dfe0e59 100644 --- a/src/context/service/grpc_server/ContextServiceServicerImpl.py +++ b/src/context/service/grpc_server/ContextServiceServicerImpl.py @@ -609,7 +609,7 @@ class ContextServiceServicerImpl(ContextServiceServicer): @safe_and_metered_rpc_method(METRICS, LOGGER) def SetSlice(self, request: Slice, context : grpc.ServicerContext) -> SliceId: with self.lock: - context_uuid = request.service_id.context_id.context_uuid.uuid + context_uuid = request.slice_id.context_id.context_uuid.uuid db_context : ContextModel = get_object(self.database, ContextModel, context_uuid) for i,endpoint_id in enumerate(request.slice_endpoint_ids): diff --git a/src/interdomain/Config.py b/src/interdomain/Config.py index 2e236fa9f7b8974e2c7add60af3ef7f1a47724aa..ee5cd04118354f4ad2fb36d04eb507aae2412190 100644 --- a/src/interdomain/Config.py +++ b/src/interdomain/Config.py @@ -26,7 +26,8 @@ GRPC_GRACE_PERIOD = 60 METRICS_PORT = 9192 # Dependency micro-service connection settings +CONTEXT_SERVICE_HOST = '127.0.0.1' +CONTEXT_SERVICE_PORT = 1010 + SLICE_SERVICE_HOST = '127.0.0.1' SLICE_SERVICE_PORT = 4040 - - diff --git a/src/interdomain/client/InterdomainClient.py b/src/interdomain/client/InterdomainClient.py index fc61496cf8e20c806fd0d438854891110624a45e..985af9c53edcfd3f3439fbe7681787c99021ed95 100644 --- a/src/interdomain/client/InterdomainClient.py +++ b/src/interdomain/client/InterdomainClient.py @@ -1,7 +1,7 @@ import grpc, logging from common.tools.client.RetryDecorator import retry, delay_exponential -from interdomain.proto.context_pb2 import TeraFlowController, AuthenticationResult -from interdomain.proto.slice_pb2 import TransportSlice, SliceId +from common.tools.grpc.Tools import grpc_message_to_json_string +from interdomain.proto.context_pb2 import AuthenticationResult, Slice, SliceId, SliceStatus, TeraFlowController from interdomain.proto.interdomain_pb2_grpc import InterdomainServiceStub LOGGER = logging.getLogger(__name__) @@ -27,30 +27,37 @@ class InterdomainClient: self.channel = None self.stub = None + @RETRY_DECORATOR + def RequestSlice(self, request : Slice) -> SliceId: + LOGGER.debug('RequestSlice request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RequestSlice(request) + LOGGER.debug('RequestSlice result: {:s}'.format(grpc_message_to_json_string(response))) + return response + @RETRY_DECORATOR def Authenticate(self, request : TeraFlowController) -> AuthenticationResult: - LOGGER.debug('Authenticate request: {:s}'.format(str(request))) + LOGGER.debug('Authenticate request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.Authenticate(request) - LOGGER.debug('Authenticate result: {:s}'.format(str(response))) + LOGGER.debug('Authenticate result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def LookUpSlice(self, request : TransportSlice) -> SliceId: - LOGGER.debug('LookUpSlice request: {:s}'.format(str(request))) + def LookUpSlice(self, request : Slice) -> SliceId: + LOGGER.debug('LookUpSlice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.LookUpSlice(request) - LOGGER.debug('LookUpSlice result: {:s}'.format(str(response))) + LOGGER.debug('LookUpSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def OrderSliceFromCatalog(self, request : TransportSlice) -> SliceStatus: - LOGGER.debug('OrderSliceFromCatalog request: {:s}'.format(str(request))) + def OrderSliceFromCatalog(self, request : Slice) -> SliceStatus: + LOGGER.debug('OrderSliceFromCatalog request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.OrderSliceFromCatalog(request) - LOGGER.debug('OrderSliceFromCatalog result: {:s}'.format(str(response))) + LOGGER.debug('OrderSliceFromCatalog result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def CreateSliceAndAddToCatalog(self, request : TransportSlice) -> SliceStatus: - LOGGER.debug('CreateSliceAndAddToCatalog request: {:s}'.format(str(request))) + def CreateSliceAndAddToCatalog(self, request : Slice) -> SliceStatus: + LOGGER.debug('CreateSliceAndAddToCatalog request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.CreateSliceAndAddToCatalog(request) - LOGGER.debug('CreateSliceAndAddToCatalog result: {:s}'.format(str(response))) + LOGGER.debug('CreateSliceAndAddToCatalog result: {:s}'.format(grpc_message_to_json_string(response))) return response diff --git a/src/interdomain/requirements.in b/src/interdomain/requirements.in index 58c398e7de1c46bac98483f3171b7fd37df98370..162ecde82a076fce597850ac8d71de3880c9a5eb 100644 --- a/src/interdomain/requirements.in +++ b/src/interdomain/requirements.in @@ -1,5 +1,6 @@ grpcio==1.43.0 grpcio-health-checking==1.43.0 prometheus-client==0.13.0 +protobuf==3.19.3 pytest==6.2.5 pytest-benchmark==3.4.1 diff --git a/src/interdomain/service/InterdomainService.py b/src/interdomain/service/InterdomainService.py index b1eaa635fb0a0155c8c791d9156a79a35272a3d3..debc943cf17ef2444b231d9a7f10338e5bc5f5b6 100644 --- a/src/interdomain/service/InterdomainService.py +++ b/src/interdomain/service/InterdomainService.py @@ -17,19 +17,24 @@ from concurrent import futures from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH from grpc_health.v1.health_pb2 import HealthCheckResponse from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server +from context.client.ContextClient import ContextClient from interdomain.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD from interdomain.proto.interdomain_pb2_grpc import add_InterdomainServiceServicer_to_server +from slice.client.SliceClient import SliceClient from .InterdomainServiceServicerImpl import InterdomainServiceServicerImpl +from .RemoteDomainClients import RemoteDomainClients BIND_ADDRESS = '0.0.0.0' LOGGER = logging.getLogger(__name__) class InterdomainService: def __init__( - self, slice_client, - address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD): - + self, context_client : ContextClient, slice_client : SliceClient, remote_domain_clients : RemoteDomainClients, + address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD + ): + self.context_client = context_client self.slice_client = slice_client + self.remote_domain_clients = remote_domain_clients self.address = address self.port = port self.endpoint = None @@ -48,7 +53,8 @@ class InterdomainService: self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) - self.interdomain_servicer = InterdomainServiceServicerImpl() + self.interdomain_servicer = InterdomainServiceServicerImpl( + self.context_client, self.slice_client, self.remote_domain_clients) add_InterdomainServiceServicer_to_server(self.interdomain_servicer, self.server) self.health_servicer = HealthServicer( diff --git a/src/interdomain/service/InterdomainServiceServicerImpl.py b/src/interdomain/service/InterdomainServiceServicerImpl.py index 8570651d7700cdc6628a66130471e1be40104aa5..e76297625e344f3dd46fbdd951e9705e7b171d36 100644 --- a/src/interdomain/service/InterdomainServiceServicerImpl.py +++ b/src/interdomain/service/InterdomainServiceServicerImpl.py @@ -14,36 +14,139 @@ import grpc, logging from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method -from interdomain.proto.context_pb2 import AuthenticationResult, TeraFlowController -from interdomain.proto.slice_pb2 import SliceId, SliceStatus, TransportSlice +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient +from context.proto.context_pb2 import SliceStatusEnum +from interdomain.proto.context_pb2 import AuthenticationResult, Slice, SliceId, SliceStatus, TeraFlowController from interdomain.proto.interdomain_pb2_grpc import InterdomainServiceServicer +from interdomain.service.RemoteDomainClients import RemoteDomainClients +from slice.client.SliceClient import SliceClient LOGGER = logging.getLogger(__name__) SERVICE_NAME = 'Interdomain' -METHOD_NAMES = ['Authenticate', 'LookUpSlice', 'OrderSliceFromCatalog', 'CreateSliceAndAddToCatalog'] +METHOD_NAMES = ['RequestSlice', 'Authenticate', 'LookUpSlice', 'OrderSliceFromCatalog', 'CreateSliceAndAddToCatalog'] METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) class InterdomainServiceServicerImpl(InterdomainServiceServicer): - def __init__(self): + def __init__( + self, context_client : ContextClient, slice_client : SliceClient, + remote_domain_clients : RemoteDomainClients + ): LOGGER.debug('Creating Servicer...') + self.context_client = context_client + self.slice_client = slice_client + self.remote_domain_clients = remote_domain_clients LOGGER.debug('Servicer Created') + @safe_and_metered_rpc_method(METRICS, LOGGER) + def RequestSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: + domains_to_endpoints = {} + local_domain_uuid = None + for slice_endpoint_id in request.slice_endpoint_ids: + device_uuid = slice_endpoint_id.device_id.device_uuid.uuid + domain_uuid = device_uuid.split('@')[1] + endpoints = domains_to_endpoints.setdefault(domain_uuid, []) + endpoints.append(slice_endpoint_id) + if local_domain_uuid is None: local_domain_uuid = domain_uuid + + reply = Slice() + reply.CopyFrom(request) + + # decompose remote slices + for domain_uuid, slice_endpoint_ids in domains_to_endpoints.items(): + if domain_uuid == local_domain_uuid: continue + + remote_slice_request = Slice() + remote_slice_request.slice_id.context_id.context_uuid.uuid = request.slice_id.context_id.context_uuid.uuid + remote_slice_request.slice_id.slice_uuid.uuid = \ + request.slice_id.slice_uuid.uuid + ':subslice@' + local_domain_uuid + remote_slice_request.slice_status.slice_status = request.slice_status.slice_status + for endpoint_id in slice_endpoint_ids: + slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add() + slice_endpoint_id.device_id.device_uuid.uuid = endpoint_id.device_id.device_uuid.uuid + slice_endpoint_id.endpoint_uuid.uuid = endpoint_id.endpoint_uuid.uuid + + # add endpoint connecting to remote domain + if domain_uuid == 'D1': + slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add() + slice_endpoint_id.device_id.device_uuid.uuid = 'R4@D1' + slice_endpoint_id.endpoint_uuid.uuid = '2/1' + elif domain_uuid == 'D2': + slice_endpoint_id = remote_slice_request.slice_endpoint_ids.add() + slice_endpoint_id.device_id.device_uuid.uuid = 'R1@D2' + slice_endpoint_id.endpoint_uuid.uuid = '2/1' + + interdomain_client = self.remote_domain_clients.get_peer('remote-teraflow') + remote_slice_reply = interdomain_client.LookUpSlice(remote_slice_request) + if remote_slice_reply == remote_slice_request.slice_id: # pylint: disable=no-member + # successful case + remote_slice = interdomain_client.OrderSliceFromCatalog(remote_slice_request) + if remote_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE: + raise Exception('Remote Slice creation failed. Wrong Slice status returned') + else: + # not in catalog + remote_slice = interdomain_client.CreateSliceAndAddToCatalog(remote_slice_request) + if remote_slice.slice_status.slice_status != SliceStatusEnum.SLICESTATUS_ACTIVE: + raise Exception('Remote Slice creation failed. Wrong Slice status returned') + + #self.context_client.SetSlice(remote_slice) + #subslice_id = reply.slice_subslice_ids.add() + #subslice_id.CopyFrom(remote_slice.slice_id) + + local_slice_request = Slice() + local_slice_request.slice_id.context_id.context_uuid.uuid = request.slice_id.context_id.context_uuid.uuid + local_slice_request.slice_id.slice_uuid.uuid = request.slice_id.slice_uuid.uuid + ':subslice' + local_slice_request.slice_status.slice_status = request.slice_status.slice_status + for endpoint_id in domains_to_endpoints[local_domain_uuid]: + slice_endpoint_id = local_slice_request.slice_endpoint_ids.add() + slice_endpoint_id.CopyFrom(endpoint_id) + + # add endpoint connecting to remote domain + if local_domain_uuid == 'D1': + slice_endpoint_id = local_slice_request.slice_endpoint_ids.add() + slice_endpoint_id.device_id.device_uuid.uuid = 'R4@D1' + slice_endpoint_id.endpoint_uuid.uuid = '2/1' + elif local_domain_uuid == 'D2': + slice_endpoint_id = local_slice_request.slice_endpoint_ids.add() + slice_endpoint_id.device_id.device_uuid.uuid = 'R1@D2' + slice_endpoint_id.endpoint_uuid.uuid = '2/1' + + local_slice_reply = self.slice_client.CreateSlice(local_slice_request) + if local_slice_reply != local_slice_request.slice_id: # pylint: disable=no-member + raise Exception('Local Slice creation failed. Wrong Slice Id was returned') + + subslice_id = reply.slice_subslice_ids.add() + subslice_id.context_id.context_uuid.uuid = local_slice_request.slice_id.context_id.context_uuid.uuid + subslice_id.slice_uuid.uuid = local_slice_request.slice_id.slice_uuid.uuid + + self.context_client.SetSlice(reply) + return reply.slice_id + @safe_and_metered_rpc_method(METRICS, LOGGER) def Authenticate(self, request : TeraFlowController, context : grpc.ServicerContext) -> AuthenticationResult: auth_result = AuthenticationResult() - #auth_result.context_id = ... + auth_result.context_id.CopyFrom(request.context_id) # pylint: disable=no-member auth_result.authenticated = True return auth_result @safe_and_metered_rpc_method(METRICS, LOGGER) - def LookUpSlice(self, request : TransportSlice, context : grpc.ServicerContext) -> SliceId: - return SliceId() + def LookUpSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: + try: + slice_ = self.context_client.GetSlice(request.slice_id) + return slice_.slice_id + except grpc.RpcError: + #LOGGER.exception('Unable to get slice({:s})'.format(grpc_message_to_json_string(request.slice_id))) + return SliceId() @safe_and_metered_rpc_method(METRICS, LOGGER) - def OrderSliceFromCatalog(self, request : TransportSlice, context : grpc.ServicerContext) -> SliceStatus: - return SliceStatus() + def OrderSliceFromCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice: + raise NotImplementedError('OrderSliceFromCatalog') + #return Slice() @safe_and_metered_rpc_method(METRICS, LOGGER) - def CreateSliceAndAddToCatalog(self, request : TransportSlice, context : grpc.ServicerContext) -> SliceStatus: - return SliceStatus() + def CreateSliceAndAddToCatalog(self, request : Slice, context : grpc.ServicerContext) -> Slice: + reply = self.slice_client.CreateSlice(request) + if reply != request.slice_id: # pylint: disable=no-member + raise Exception('Slice creation failed. Wrong Slice Id was returned') + return self.context_client.GetSlice(request.slice_id) diff --git a/src/interdomain/service/RemoteDomainClients.py b/src/interdomain/service/RemoteDomainClients.py new file mode 100644 index 0000000000000000000000000000000000000000..98c6a37d03caee2d515601d239ff69be2200a810 --- /dev/null +++ b/src/interdomain/service/RemoteDomainClients.py @@ -0,0 +1,42 @@ +import logging, socket +from common.Constants import DEFAULT_CONTEXT_UUID +from common.Settings import get_setting +from interdomain.Config import GRPC_SERVICE_PORT +from interdomain.client.InterdomainClient import InterdomainClient +from interdomain.proto.context_pb2 import TeraFlowController + +LOGGER = logging.getLogger(__name__) + +class RemoteDomainClients: + def __init__(self) -> None: + self.peer_domain = {} + + def add_peer( + self, domain_name : str, address : str, port : int, context_uuid : str = DEFAULT_CONTEXT_UUID + ) -> None: + while True: + try: + remote_teraflow_ip = socket.gethostbyname(address) + if len(remote_teraflow_ip) > 0: break + except socket.gaierror as e: + if str(e) == '[Errno -2] Name or service not known': continue + + interdomain_client = InterdomainClient(address, port) + request = TeraFlowController() + request.context_id.context_uuid.uuid = DEFAULT_CONTEXT_UUID # pylint: disable=no-member + request.ip_address = get_setting('INTERDOMAINSERVICE_SERVICE_HOST', default='0.0.0.0') + request.port = int(get_setting('INTERDOMAINSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT)) + + reply = interdomain_client.Authenticate(request) + if not reply.authenticated: + msg = 'Authentication against {:s}:{:d} rejected' + raise Exception(msg.format(str(remote_teraflow_ip), GRPC_SERVICE_PORT)) + + self.peer_domain[domain_name] = interdomain_client + + def get_peer(self, domain_name : str) -> InterdomainClient: + LOGGER.warning('peers: {:s}'.format(str(self.peer_domain))) + return self.peer_domain.get(domain_name) + + def remove_peer(self, domain_name : str) -> None: + return self.peer_domain.pop(domain_name, None) diff --git a/src/interdomain/service/__main__.py b/src/interdomain/service/__main__.py index 2b919870a705af980654a2f60dcd16b618cf41a1..ff19271ee4fee7fb975294e11b21de18060607f4 100644 --- a/src/interdomain/service/__main__.py +++ b/src/interdomain/service/__main__.py @@ -15,10 +15,12 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_setting, wait_for_environment_variables -from slice.client.SliceClient import SliceClient +from context.client.ContextClient import ContextClient +from interdomain.service.RemoteDomainClients import RemoteDomainClients from interdomain.Config import ( - SLICE_SERVICE_HOST, SLICE_SERVICE_PORT, GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, - METRICS_PORT) + CONTEXT_SERVICE_HOST, CONTEXT_SERVICE_PORT, SLICE_SERVICE_HOST, SLICE_SERVICE_PORT, GRPC_SERVICE_PORT, + GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT) +from slice.client.SliceClient import SliceClient from .InterdomainService import InterdomainService terminate = threading.Event() @@ -40,11 +42,13 @@ def main(): logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) - #wait_for_environment_variables([ - # 'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC', - # 'MONITORINGSERVICE_SERVICE_HOST', 'MONITORINGSERVICE_SERVICE_PORT_GRPC' - #]) + wait_for_environment_variables([ + 'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC', + 'SLICESERVICE_SERVICE_HOST', 'SLICESERVICE_SERVICE_PORT_GRPC', + ]) + context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=CONTEXT_SERVICE_HOST ) + context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=CONTEXT_SERVICE_PORT ) slice_service_host = get_setting('SLICESERVICE_SERVICE_HOST', default=SLICE_SERVICE_HOST ) slice_service_port = get_setting('SLICESERVICE_SERVICE_PORT_GRPC', default=SLICE_SERVICE_PORT ) @@ -56,18 +60,29 @@ def main(): # Start metrics server start_http_server(metrics_port) - ## Initialize Slice Client - #if slice_service_host is None or slice_service_port is None: - # raise Exception('Wrong address({:s}):port({:s}) of Slice component'.format( - # str(slice_service_host), str(slice_service_port))) - #slice_client = SliceClient(slice_service_host, slice_service_port) + # Initialize Context Client + if context_service_host is None or context_service_port is None: + raise Exception('Wrong address({:s}):port({:s}) of Context component'.format( + str(context_service_host), str(context_service_port))) + context_client = ContextClient(context_service_host, context_service_port) + + # Initialize Slice Client + if slice_service_host is None or slice_service_port is None: + raise Exception('Wrong address({:s}):port({:s}) of Slice component'.format( + str(slice_service_host), str(slice_service_port))) + slice_client = SliceClient(slice_service_host, slice_service_port) + + # Define remote domain clients + remote_domain_clients = RemoteDomainClients() # Starting Interdomain service grpc_service = InterdomainService( - slice_client, port=grpc_service_port, max_workers=max_workers, + context_client, slice_client, remote_domain_clients, port=grpc_service_port, max_workers=max_workers, grace_period=grace_period) grpc_service.start() + remote_domain_clients.add_peer('remote-teraflow', 'remote-teraflow', GRPC_SERVICE_PORT) + # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=0.1): pass diff --git a/src/monitoring/requirements.in b/src/monitoring/requirements.in index 77b66b79498d6df89a1d3b8989a02c06078eb1b9..df2f1cf89412e906a3236ac778f300872c8abf70 100644 --- a/src/monitoring/requirements.in +++ b/src/monitoring/requirements.in @@ -1,16 +1,17 @@ -google-api-core -grpcio-health-checking -grpcio -opencensus[stackdriver] +#google-api-core +grpcio==1.43.0 +grpcio-health-checking==1.43.0 +#opencensus[stackdriver] python-json-logger -google-cloud-profiler -numpy -prometheus-client -pytest -pytest-benchmark +#google-cloud-profiler +#numpy +prometheus-client==0.13.0 +protobuf==3.19.3 +pytest==6.2.5 +pytest-benchmark==3.4.1 influxdb -redis -anytree -apscheduler -xmltodict -coverage +#redis==4.1.2 +#anytree==2.8.0 +#APScheduler==3.8.1 +#xmltodict==0.12.0 +coverage==6.3 diff --git a/src/service/client/ServiceClient.py b/src/service/client/ServiceClient.py index af489c0c672ac601939fe907e179cd0c83a3d140..a44842768b9ee112288653f57706c5c4549503e9 100644 --- a/src/service/client/ServiceClient.py +++ b/src/service/client/ServiceClient.py @@ -14,6 +14,7 @@ import grpc, logging from common.tools.client.RetryDecorator import retry, delay_exponential +from common.tools.grpc.Tools import grpc_message_to_json_string from service.proto.context_pb2 import Empty, Service, ServiceId from service.proto.service_pb2_grpc import ServiceServiceStub @@ -42,21 +43,21 @@ class ServiceClient: @RETRY_DECORATOR def CreateService(self, request : Service) -> ServiceId: - LOGGER.debug('CreateService request: {:s}'.format(str(request))) + LOGGER.debug('CreateService request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.CreateService(request) - LOGGER.debug('CreateService result: {:s}'.format(str(response))) + LOGGER.debug('CreateService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def UpdateService(self, request : Service) -> ServiceId: - LOGGER.debug('UpdateService request: {:s}'.format(str(request))) + LOGGER.debug('UpdateService request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.UpdateService(request) - LOGGER.debug('UpdateService result: {:s}'.format(str(response))) + LOGGER.debug('UpdateService result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR def DeleteService(self, request : ServiceId) -> Empty: - LOGGER.debug('DeleteService request: {:s}'.format(str(request))) + LOGGER.debug('DeleteService request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.DeleteService(request) - LOGGER.debug('DeleteService result: {:s}'.format(str(response))) + LOGGER.debug('DeleteService result: {:s}'.format(grpc_message_to_json_string(response))) return response diff --git a/src/service/service/ServiceServiceServicerImpl.py b/src/service/service/ServiceServiceServicerImpl.py index 2506a420617674be04772437e72c8c541c9f384c..7720699321daa4f1b77af60a9059e3fc9d7eec79 100644 --- a/src/service/service/ServiceServiceServicerImpl.py +++ b/src/service/service/ServiceServiceServicerImpl.py @@ -52,6 +52,8 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): @safe_and_metered_rpc_method(METRICS, LOGGER) def CreateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId: + LOGGER.info('[CreateService] begin ; request = {:s}'.format(grpc_message_to_json_string(request))) + service_id = request.service_id service_uuid = service_id.service_uuid.uuid service_context_uuid = service_id.context_id.context_uuid.uuid diff --git a/src/slice/Config.py b/src/slice/Config.py index 6787413a881b4d84ed6b4ecb5e041901399a4ae6..e6d770d000cc249d73cccf17dd17f21ccb001f7d 100644 --- a/src/slice/Config.py +++ b/src/slice/Config.py @@ -4,9 +4,19 @@ import logging LOG_LEVEL = logging.WARNING # gRPC settings -GRPC_SERVICE_PORT = 2020 +GRPC_SERVICE_PORT = 4040 GRPC_MAX_WORKERS = 10 GRPC_GRACE_PERIOD = 60 # Prometheus settings METRICS_PORT = 9192 + +# Dependency micro-service connection settings +CONTEXT_SERVICE_HOST = '127.0.0.1' +CONTEXT_SERVICE_PORT = 1010 + +SERVICE_SERVICE_HOST = '127.0.0.1' +SERVICE_SERVICE_PORT = 3030 + +INTERDOMAIN_SERVICE_HOST = '127.0.0.1' +INTERDOMAIN_SERVICE_PORT = 10010 diff --git a/src/slice/Dockerfile b/src/slice/Dockerfile index 081d60ecde9bdd19d973a058fd1baa4bcf97a29c..d653bb21778adbbd09407c1ca54f0afdc7ae5d81 100644 --- a/src/slice/Dockerfile +++ b/src/slice/Dockerfile @@ -29,6 +29,9 @@ RUN python3 -m pip install -r slice/requirements.in # Add files into working directory COPY common/. common +COPY context/. context +COPY interdomain/. interdomain +COPY service/. service COPY slice/. slice # Start slice service diff --git a/src/slice/_docs/old_conflicting_code.txt b/src/slice/_docs/old_conflicting_code.txt new file mode 100644 index 0000000000000000000000000000000000000000..12bd9bbbcef95c465157dbe678056da95c6175f1 --- /dev/null +++ b/src/slice/_docs/old_conflicting_code.txt @@ -0,0 +1,176 @@ +src/common/tools/service/DeviceCheckers.py + import grpc + from common.database.api.Database import Database + from common.database.api.context.topology.device.Device import Device + from common.database.api.context.topology.device.Endpoint import Endpoint + from common.exceptions.ServiceException import ServiceException + + def check_device_exists(database : Database, context_id : str, topology_id : str, device_id : str) -> Device: + db_context = database.context(context_id).create() + db_topology = db_context.topology(topology_id).create() + if db_topology.devices.contains(device_id): return db_topology.device(device_id) + msg = 'Context({})/Topology({})/Device({}) does not exist in the database.' + msg = msg.format(context_id, topology_id, device_id) + raise ServiceException(grpc.StatusCode.NOT_FOUND, msg) + + +src/common/tools/service/LinkCheckers.py + import grpc + from common.database.api.Database import Database + from common.database.api.context.topology.link.Link import Link + from common.exceptions.ServiceException import ServiceException + + def check_link_exists(database : Database, context_id : str, topology_id : str, link_id : str) -> Link: + db_context = database.context(context_id).create() + db_topology = db_context.topology(topology_id).create() + if db_topology.links.contains(link_id): return db_topology.link(link_id) + msg = 'Context({})/Topology({})/Link({}) does not exist in the database.' + msg = msg.format(context_id, topology_id, link_id) + raise ServiceException(grpc.StatusCode.NOT_FOUND, msg) + + +src/common/tools/service/ServiceCheckers.py + import grpc + from common.database.api.Database import Database + from common.exceptions.ServiceException import ServiceException + + def check_service_exists(database : Database, context_id : str, service_id : str): + if not database.contexts.contains(context_id): + msg = 'Context({}) does not exist in the database.' + msg = msg.format(context_id) + raise ServiceException(grpc.StatusCode.NOT_FOUND, msg) + + db_context = database.context(context_id) + if db_context.services.contains(service_id): + return db_context.service(service_id) + + msg = 'Context({})/Service({}) does not exist in the database.' + msg = msg.format(context_id, service_id) + raise ServiceException(grpc.StatusCode.NOT_FOUND, msg) + + +src/device/service/Tools.py + import grpc, logging + from typing import Dict, List, Set, Tuple + from common.Checkers import chk_options, chk_string + from common.database.api.Database import Database + from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID + from common.database.api.context.topology.device.Endpoint import Endpoint + from common.database.api.context.topology.device.OperationalStatus import OperationalStatus, \ + operationalstatus_enum_values, to_operationalstatus_enum + from common.exceptions.ServiceException import ServiceException + from common.tools.service.DeviceCheckers import check_device_endpoint_exists + from common.tools.service.EndpointIdCheckers import check_endpoint_id + from common.tools.service.EnumCheckers import check_enum + from common.tools.service.DeviceCheckers import check_device_exists, check_device_not_exists + from device.proto.context_pb2 import Device, DeviceId + + # For each method name, define acceptable device operational statuses. Empty set means accept all. + ACCEPTED_DEVICE_OPERATIONAL_STATUSES : Dict[str, Set[OperationalStatus]] = { + 'AddDevice': set([OperationalStatus.ENABLED, OperationalStatus.DISABLED]), + 'UpdateDevice': set([OperationalStatus.KEEP_STATE, OperationalStatus.ENABLED, OperationalStatus.DISABLED]), + } + + def _check_device_exists(method_name : str, database : Database, device_id : str): + if method_name in ['AddDevice']: + check_device_not_exists(database, DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID, device_id) + elif method_name in ['UpdateDevice', 'DeleteDevice']: + check_device_exists(database, DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID, device_id) + else: # pragma: no cover (test requires malforming the code) + msg = 'Unexpected condition: _check_device_exists(method_name={}, device_id={})' + msg = msg.format(str(method_name), str(device_id)) + raise ServiceException(grpc.StatusCode.UNIMPLEMENTED, msg) + + def _check_device_endpoint_exists_or_get_pointer( + method_name : str, database : Database, parent_name : str, device_id : str, endpoint_id : str) -> Endpoint: + + if method_name in ['AddDevice']: + db_context = database.context(DEFAULT_CONTEXT_ID) + db_topology = db_context.topology(DEFAULT_TOPOLOGY_ID) + db_device = db_topology.device(device_id) + return db_device.endpoint(endpoint_id) + elif method_name in ['UpdateDevice', 'DeleteDevice']: + return check_device_endpoint_exists( + database, parent_name, DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID, device_id, endpoint_id) + else: # pragma: no cover (test requires malforming the code) + msg = 'Unexpected condition: _check_device_endpoint_exists_or_get_pointer(method_name={}, ' \ + 'parent_name={}, device_id={}, endpoint_id={})' + msg = msg.format(str(method_name), str(parent_name), str(device_id), str(endpoint_id)) + raise ServiceException(grpc.StatusCode.UNIMPLEMENTED, msg) + + def check_device_operational_status(method_name : str, value : str) -> OperationalStatus: + return check_enum( + 'OperationalStatus', method_name, value, to_operationalstatus_enum, ACCEPTED_DEVICE_OPERATIONAL_STATUSES) + + def check_device_request( + method_name : str, request : Device, database : Database, logger : logging.Logger + ) -> Tuple[str, str, str, OperationalStatus, List[Tuple[Endpoint, str]]]: + + # ----- Parse attributes ------------------------------------------------------------------------------------------- + try: + device_id = chk_string ('device.device_id.device_id.uuid', + request.device_id.device_id.uuid, + allow_empty=False) + device_type = chk_string ('device.device_type', + request.device_type, + allow_empty=False) + device_config = chk_string ('device.device_config.device_config', + request.device_config.device_config, + allow_empty=True) + device_opstat = chk_options('device.devOperationalStatus', + request.devOperationalStatus, + operationalstatus_enum_values()) + except Exception as e: + logger.exception('Invalid arguments:') + raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) + + device_opstat = check_device_operational_status(method_name, device_opstat) + + # ----- Check if device exists in database ------------------------------------------------------------------------- + _check_device_exists(method_name, database, device_id) + + # ----- Parse endpoints and check if they exist in the database as device endpoints -------------------------------- + add_topology_devices_endpoints : Dict[str, Dict[str, Set[str]]] = {} + db_endpoints__port_types : List[Tuple[Endpoint, str]] = [] + for endpoint_number,endpoint in enumerate(request.endpointList): + parent_name = 'Endpoint(#{}) of Context({})/Topology({})/Device({})' + parent_name = parent_name.format(endpoint_number, DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID, device_id) + + _, ep_device_id, ep_port_id = check_endpoint_id( + logger, endpoint_number, parent_name, endpoint.port_id, add_topology_devices_endpoints, + predefined_device_id=device_id, acceptable_device_ids=set([device_id]), + prevent_same_device_multiple_times=False) + + try: + ep_port_type = chk_string('endpoint[#{}].port_type'.format(endpoint_number), + endpoint.port_type, + allow_empty=False) + except Exception as e: + logger.exception('Invalid arguments:') + raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) + + db_endpoint = _check_device_endpoint_exists_or_get_pointer( + method_name, database, parent_name, ep_device_id, ep_port_id) + db_endpoints__port_types.append((db_endpoint, ep_port_type)) + + return device_id, device_type, device_config, device_opstat, db_endpoints__port_types + + def check_device_id_request( + method_name : str, request : DeviceId, database : Database, logger : logging.Logger) -> str: + + # ----- Parse attributes ------------------------------------------------------------------------------------------- + try: + device_id = chk_string('device_id.device_id.uuid', + request.device_id.uuid, + allow_empty=False) + except Exception as e: + logger.exception('Invalid arguments:') + raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e)) + + # ----- Check if device exists in database --------------------------------------------------------------------------- + _check_device_exists(method_name, database, device_id) + + return device_id + + +src/service/service/Tools.py diff --git a/src/slice/client/SliceClient.py b/src/slice/client/SliceClient.py index a056a7101793abd2b4d3ab2aba0ab4f0b063f573..c30c28002e60d05a7f1b02c462ac45c37fe11aef 100644 --- a/src/slice/client/SliceClient.py +++ b/src/slice/client/SliceClient.py @@ -1,7 +1,7 @@ import grpc, logging from common.tools.client.RetryDecorator import retry, delay_exponential -from slice.proto.context_pb2 import Empty -from slice.proto.slice_pb2 import TransportSlice, SliceStatus +from common.tools.grpc.Tools import grpc_message_to_json_string +from slice.proto.context_pb2 import Empty, Slice, SliceId from slice.proto.slice_pb2_grpc import SliceServiceStub LOGGER = logging.getLogger(__name__) @@ -23,20 +23,27 @@ class SliceClient: self.stub = SliceServiceStub(self.channel) def close(self): - if(self.channel is not None): self.channel.close() + if self.channel is not None: self.channel.close() self.channel = None self.stub = None @RETRY_DECORATOR - def CreateUpdateSlice(self, request : TransportSlice) -> SliceStatus: - LOGGER.debug('CreateUpdateSlice request: {:s}'.format(str(request))) - response = self.stub.CreateUpdateSlice(request) - LOGGER.debug('CreateUpdateSlice result: {:s}'.format(str(response))) + def CreateSlice(self, request : Slice) -> SliceId: + LOGGER.debug('CreateSlice request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.CreateSlice(request) + LOGGER.debug('CreateSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def DeleteSlice(self, request : TransportSlice) -> Empty: - LOGGER.debug('DeleteSlice request: {:s}'.format(str(request))) + def UpdateSlice(self, request : Slice) -> SliceId: + LOGGER.debug('UpdateSlice request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.UpdateSlice(request) + LOGGER.debug('UpdateSlice result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def DeleteSlice(self, request : SliceId) -> Empty: + LOGGER.debug('DeleteSlice request: {:s}'.format(grpc_message_to_json_string(request))) response = self.stub.DeleteSlice(request) - LOGGER.debug('DeleteSlice result: {:s}'.format(str(response))) + LOGGER.debug('DeleteSlice result: {:s}'.format(grpc_message_to_json_string(response))) return response diff --git a/src/slice/requirements.in b/src/slice/requirements.in index 58c398e7de1c46bac98483f3171b7fd37df98370..162ecde82a076fce597850ac8d71de3880c9a5eb 100644 --- a/src/slice/requirements.in +++ b/src/slice/requirements.in @@ -1,5 +1,6 @@ grpcio==1.43.0 grpcio-health-checking==1.43.0 prometheus-client==0.13.0 +protobuf==3.19.3 pytest==6.2.5 pytest-benchmark==3.4.1 diff --git a/src/slice/service/SliceService.py b/src/slice/service/SliceService.py index cddc4efe95ca239c54cb403ea6d89ff3a7e269b9..a7ad2694698333d0450aef9e670dd2a4fe9b30e5 100644 --- a/src/slice/service/SliceService.py +++ b/src/slice/service/SliceService.py @@ -17,6 +17,9 @@ from concurrent import futures from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH from grpc_health.v1.health_pb2 import HealthCheckResponse from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server +from context.client.ContextClient import ContextClient +from interdomain.client.InterdomainClient import InterdomainClient +from service.client.ServiceClient import ServiceClient from slice.proto.slice_pb2_grpc import add_SliceServiceServicer_to_server from slice.service.SliceServiceServicerImpl import SliceServiceServicerImpl from slice.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD @@ -26,9 +29,12 @@ LOGGER = logging.getLogger(__name__) class SliceService: def __init__( - self, # database, + self, context_client : ContextClient, interdomain_client : InterdomainClient, service_client : ServiceClient, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD): - #self.database = database + + self.context_client = context_client + self.interdomain_client = interdomain_client + self.service_client = service_client self.address = address self.port = port self.endpoint = None @@ -48,8 +54,7 @@ class SliceService: self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) self.slice_servicer = SliceServiceServicerImpl( - #self.database - ) + self.context_client, self.interdomain_client, self.service_client) add_SliceServiceServicer_to_server(self.slice_servicer, self.server) self.health_servicer = HealthServicer( diff --git a/src/slice/service/SliceServiceServicerImpl.py b/src/slice/service/SliceServiceServicerImpl.py index 702e0ce38ceaf0fd8df80f1f8d41d769eff1bf43..dcc93622af532dfbd7342a6c130d7ef301b265aa 100644 --- a/src/slice/service/SliceServiceServicerImpl.py +++ b/src/slice/service/SliceServiceServicerImpl.py @@ -12,27 +12,124 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging +import grpc, json, logging +from typing import Dict from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method -from slice.proto.context_pb2 import Empty -from slice.proto.slice_pb2 import SliceStatus, TransportSlice +from context.client.ContextClient import ContextClient +from context.proto.context_pb2 import ( + ConfigActionEnum, Empty, Service, ServiceStatusEnum, ServiceTypeEnum, Slice, SliceId, SliceStatusEnum) +from interdomain.client.InterdomainClient import InterdomainClient +from service.client.ServiceClient import ServiceClient from slice.proto.slice_pb2_grpc import SliceServiceServicer LOGGER = logging.getLogger(__name__) SERVICE_NAME = 'Slice' -METHOD_NAMES = ['CreateUpdateSlice', 'DeleteSlice'] +METHOD_NAMES = ['CreateSlice', 'UpdateSlice', 'DeleteSlice'] METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) class SliceServiceServicerImpl(SliceServiceServicer): - def __init__(self): + def __init__( + self, context_client : ContextClient, interdomain_client : InterdomainClient, service_client : ServiceClient + ): LOGGER.debug('Creating Servicer...') + self.context_client = context_client + self.interdomain_client = interdomain_client + self.service_client = service_client LOGGER.debug('Servicer Created') + def create_update(self, request : Slice) -> SliceId: + slice_id = self.context_client.SetSlice(request) + if len(request.slice_endpoint_ids) != 2: return slice_id + + domains = set() + for slice_endpoint_id in request.slice_endpoint_ids: + device_uuid = slice_endpoint_id.device_id.device_uuid.uuid + domains.add(device_uuid.split('@')[1]) + + is_multi_domain = len(domains) == 2 + if is_multi_domain: + slice_id = self.interdomain_client.RequestSlice(request) + else: + # pylint: disable=no-member + service_request = Service() + service_request.service_id.context_id.context_uuid.uuid = request.slice_id.context_id.context_uuid.uuid + service_request.service_id.service_uuid.uuid = request.slice_id.slice_uuid.uuid + service_request.service_type = ServiceTypeEnum.SERVICETYPE_L3NM + service_request.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED + + service_reply = self.service_client.CreateService(service_request) + if service_reply != service_request.service_id: # pylint: disable=no-member + raise Exception('Service creation failed. Wrong Service Id was returned') + + config_rule = service_request.service_config.config_rules.add() + config_rule.action = ConfigActionEnum.CONFIGACTION_SET + config_rule.resource_key = '/settings' + config_rule.resource_value = json.dumps( + {'mtu': 1512, 'address_families': ['IPV4'], 'bgp_as': 65000, 'bgp_route_target': '65000:333'}, + sort_keys=True) + + for slice_endpoint_id in request.slice_endpoint_ids: + device_uuid = slice_endpoint_id.device_id.device_uuid.uuid + endpoint_uuid = slice_endpoint_id.endpoint_uuid.uuid + + endpoint_id = service_request.service_endpoint_ids.add() + endpoint_id.device_id.device_uuid.uuid = device_uuid + endpoint_id.endpoint_uuid.uuid = endpoint_uuid + + config_rule = service_request.service_config.config_rules.add() + config_rule.action = ConfigActionEnum.CONFIGACTION_SET + config_rule.resource_key = '/device[{:s}]/endpoint[{:s}]/settings'.format(device_uuid, endpoint_uuid) + config_rule.resource_value = json.dumps( + {'router_id': '0.0.0.0', 'route_distinguisher': '0:0', 'sub_interface_index': 0, 'vlan_id': 0, + 'address_ip': '0.0.0.0', 'address_prefix': 0}, + sort_keys=True) + + service_reply = self.service_client.UpdateService(service_request) + if service_reply != service_request.service_id: # pylint: disable=no-member + raise Exception('Service update failed. Wrong Service Id was returned') + + reply = Slice() + reply.CopyFrom(request) + slice_service_id = reply.slice_service_ids.add() + slice_service_id.CopyFrom(service_reply) + self.context_client.SetSlice(reply) + slice_id = reply.slice_id + + slice_ = self.context_client.GetSlice(slice_id) + slice_active = Slice() + slice_active.CopyFrom(slice_) + slice_active.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_ACTIVE + self.context_client.SetSlice(slice_active) + return slice_id + + @safe_and_metered_rpc_method(METRICS, LOGGER) + def CreateSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: + #try: + # slice_ = self.context_client.GetSlice(request.slice_id) + # slice_id = slice_.slice_id + #except grpc.RpcError: + # slice_id = self.context_client.SetSlice(request) + #return slice_id + return self.create_update(request) + @safe_and_metered_rpc_method(METRICS, LOGGER) - def CreateUpdateSlice(self, request : TransportSlice, context : grpc.ServicerContext) -> SliceStatus: - return SliceStatus() + def UpdateSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: + #slice_id = self.context_client.SetSlice(request) + #if len(request.slice_endpoint_ids) != 2: return slice_id + # + #domains = set() + #for slice_endpoint_id in request.slice_endpoint_ids: + # device_uuid = slice_endpoint_id.device_id.device_uuid.uuid + # domains.add(device_uuid.split('@')[0]) + # + #is_multi_domain = len(domains) == 2 + #if is_multi_domain: + # return self.interdomain_client.LookUpSlice(request) + #else: + # raise NotImplementedError('Slice should create local services for single domain slice') + return self.create_update(request) @safe_and_metered_rpc_method(METRICS, LOGGER) - def DeleteSlice(self, request : TransportSlice, context : grpc.ServicerContext) -> Empty: + def DeleteSlice(self, request : SliceId, context : grpc.ServicerContext) -> Empty: return Empty() diff --git a/src/slice/service/__main__.py b/src/slice/service/__main__.py index 54bd059d253cd4b878b13c971ff0221ed60f9b0b..76bb5fa34eac45c828413d4671e023958a886d1b 100644 --- a/src/slice/service/__main__.py +++ b/src/slice/service/__main__.py @@ -15,8 +15,13 @@ import logging, signal, sys, threading from prometheus_client import start_http_server from common.Settings import get_setting, wait_for_environment_variables -#from common.database.Factory import get_database -from slice.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT +from context.client.ContextClient import ContextClient +from interdomain.client.InterdomainClient import InterdomainClient +from service.client.ServiceClient import ServiceClient +from slice.Config import ( + CONTEXT_SERVICE_HOST, CONTEXT_SERVICE_PORT, GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, + INTERDOMAIN_SERVICE_HOST, INTERDOMAIN_SERVICE_PORT, LOG_LEVEL, METRICS_PORT, SERVICE_SERVICE_HOST, + SERVICE_SERVICE_PORT) from .SliceService import SliceService terminate = threading.Event() @@ -29,24 +34,27 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name def main(): global LOGGER # pylint: disable=global-statement - grpc_service_port = get_setting('SLICESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT ) - max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS ) - grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD ) - log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL ) - metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT ) + grpc_service_port = get_setting('SLICESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT ) + max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS ) + grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD ) + log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL ) + metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT ) logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) wait_for_environment_variables([ - #'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC', - #'MONITORINGSERVICE_SERVICE_HOST', 'MONITORINGSERVICE_SERVICE_PORT_GRPC' + 'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC', + 'INTERDOMAINSERVICE_SERVICE_HOST', 'INTERDOMAINSERVICE_SERVICE_PORT_GRPC', + 'SERVICESERVICE_SERVICE_HOST', 'SERVICESERVICE_SERVICE_PORT_GRPC', ]) - #context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=CONTEXT_SERVICE_HOST ) - #context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=CONTEXT_SERVICE_PORT ) - #monitoring_service_host = get_setting('MONITORINGSERVICE_SERVICE_HOST', default=MONITORING_SERVICE_HOST) - #monitoring_service_port = get_setting('MONITORINGSERVICE_SERVICE_PORT_GRPC', default=MONITORING_SERVICE_PORT) + context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=CONTEXT_SERVICE_HOST ) + context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=CONTEXT_SERVICE_PORT ) + interdomain_service_host = get_setting('INTERDOMAINSERVICE_SERVICE_HOST', default=INTERDOMAIN_SERVICE_HOST) + interdomain_service_port = get_setting('INTERDOMAINSERVICE_SERVICE_PORT_GRPC', default=INTERDOMAIN_SERVICE_PORT) + service_service_host = get_setting('SERVICESERVICE_SERVICE_HOST', default=SERVICE_SERVICE_HOST ) + service_service_port = get_setting('SERVICESERVICE_SERVICE_PORT_GRPC', default=SERVICE_SERVICE_PORT ) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @@ -56,25 +64,28 @@ def main(): # Start metrics server start_http_server(metrics_port) - ## Initialize Context Client - #if context_service_host is None or context_service_port is None: - # raise Exception('Wrong address({:s}):port({:s}) of Context component'.format( - # str(context_service_host), str(context_service_port))) - #context_client = ContextClient(context_service_host, context_service_port) + # Initialize Context Client + if context_service_host is None or context_service_port is None: + raise Exception('Wrong address({:s}):port({:s}) of Context component'.format( + str(context_service_host), str(context_service_port))) + context_client = ContextClient(context_service_host, context_service_port) - ## Initialize Monitoring Client - #if monitoring_service_host is None or monitoring_service_port is None: - # raise Exception('Wrong address({:s}):port({:s}) of Monitoring component'.format( - # str(monitoring_service_host), str(monitoring_service_port))) - #monitoring_client = MonitoringClient(monitoring_service_host, monitoring_service_port) + # Initialize Interdomain Client + if interdomain_service_host is None or interdomain_service_port is None: + raise Exception('Wrong address({:s}):port({:s}) of Interdomain component'.format( + str(interdomain_service_host), str(interdomain_service_port))) + interdomain_client = InterdomainClient(interdomain_service_host, interdomain_service_port) - ## Get database instance - #database = get_database() + # Initialize Service Client + if service_service_host is None or service_service_port is None: + raise Exception('Wrong address({:s}):port({:s}) of Service component'.format( + str(service_service_host), str(service_service_port))) + service_client = ServiceClient(service_service_host, service_service_port) # Starting slice service grpc_service = SliceService( - #database, - port=grpc_service_port, max_workers=max_workers, grace_period=grace_period) + context_client, interdomain_client, service_client, port=grpc_service_port, max_workers=max_workers, + grace_period=grace_period) grpc_service.start() # Wait for Ctrl+C or termination signal diff --git a/src/tests/oeccpsc22/deploy_in_kubernetes.sh b/src/tests/oeccpsc22/deploy_in_kubernetes.sh index e350b835501400f7aaa6bf99ffe25aa6ae4620a4..426e07e1376207065b02db3205e46dd2cbe9a39d 100755 --- a/src/tests/oeccpsc22/deploy_in_kubernetes.sh +++ b/src/tests/oeccpsc22/deploy_in_kubernetes.sh @@ -15,19 +15,145 @@ # OECC/PSC 22 deployment settings - export REGISTRY_IMAGE="" export COMPONENTS="context device monitoring service slice interdomain compute" # webui export IMAGE_TAG="oeccpsc22" export K8S_HOSTNAME="kubernetes-master" #export GRAFANA_PASSWORD="admin123+" -# Deploy TeraFlow instance 1 -export K8S_NAMESPACE="oeccpsc22-1" -export EXTRA_MANIFESTS="./oeccpsc22/expose_services_teraflow_1.yaml" -./deploy_in_kubernetes.sh +# Constants +GITLAB_REPO_URL="registry.gitlab.com/teraflow-h2020/controller" +TMP_FOLDER="./tmp" + +# Create a tmp folder for files modified during the deployment +TMP_MANIFESTS_FOLDER="$TMP_FOLDER/manifests" +mkdir -p $TMP_MANIFESTS_FOLDER +TMP_LOGS_FOLDER="$TMP_FOLDER/logs" +mkdir -p $TMP_LOGS_FOLDER + +export K8S_NAMESPACE_1="oeccpsc22-1" +export K8S_NAMESPACE_2="oeccpsc22-2" + +export EXTRA_MANIFESTS_1="./oeccpsc22/expose_services_teraflow_1.yaml" +export EXTRA_MANIFESTS_2="./oeccpsc22/expose_services_teraflow_2.yaml" + +echo "Deleting and Creating new namespaces..." +kubectl delete namespace $K8S_NAMESPACE_1 $K8S_NAMESPACE_2 +kubectl create namespace $K8S_NAMESPACE_1 +kubectl create namespace $K8S_NAMESPACE_2 +printf "\n" + +echo "Creating secrets for InfluxDB..." +#TODO: make sure to change this when having a production deployment +kubectl create secret generic influxdb-secrets --namespace=$K8S_NAMESPACE_1 --from-literal=INFLUXDB_DB="monitoring" --from-literal=INFLUXDB_ADMIN_USER="teraflow" --from-literal=INFLUXDB_ADMIN_PASSWORD="teraflow" --from-literal=INFLUXDB_HTTP_AUTH_ENABLED="True" +kubectl create secret generic monitoring-secrets --namespace=$K8S_NAMESPACE_1 --from-literal=INFLUXDB_DATABASE="monitoring" --from-literal=INFLUXDB_USER="teraflow" --from-literal=INFLUXDB_PASSWORD="teraflow" --from-literal=INFLUXDB_HOSTNAME="localhost" + +kubectl create secret generic influxdb-secrets --namespace=$K8S_NAMESPACE_2 --from-literal=INFLUXDB_DB="monitoring" --from-literal=INFLUXDB_ADMIN_USER="teraflow" --from-literal=INFLUXDB_ADMIN_PASSWORD="teraflow" --from-literal=INFLUXDB_HTTP_AUTH_ENABLED="True" +kubectl create secret generic monitoring-secrets --namespace=$K8S_NAMESPACE_2 --from-literal=INFLUXDB_DATABASE="monitoring" --from-literal=INFLUXDB_USER="teraflow" --from-literal=INFLUXDB_PASSWORD="teraflow" --from-literal=INFLUXDB_HOSTNAME="localhost" +printf "\n" + +echo "Pulling/Updating Docker images..." +docker pull redis:6.2 +docker pull influxdb:1.8 +docker pull grafana/grafana:8.2.6 +printf "\n" + +echo "Deploying components..." +for COMPONENT in $COMPONENTS; do + echo "Processing '$COMPONENT' component..." + IMAGE_NAME="$COMPONENT:$IMAGE_TAG" + IMAGE_URL="$REGISTRY_IMAGE/$IMAGE_NAME" + + echo " Building Docker image..." + BUILD_LOG="$TMP_LOGS_FOLDER/build_${COMPONENT}.log" + + if [ "$COMPONENT" == "automation" ] || [ "$COMPONENT" == "policy" ]; then + docker build -t "$IMAGE_NAME" -f ./src/"$COMPONENT"/Dockerfile ./src/"$COMPONENT"/ > "$BUILD_LOG" + else + docker build -t "$IMAGE_NAME" -f ./src/"$COMPONENT"/Dockerfile ./src/ > "$BUILD_LOG" + fi + + if [ -n "$REGISTRY_IMAGE" ]; then + echo "Pushing Docker image to '$REGISTRY_IMAGE'..." + + TAG_LOG="$TMP_LOGS_FOLDER/tag_${COMPONENT}.log" + docker tag "$IMAGE_NAME" "$IMAGE_URL" > "$TAG_LOG" + + PUSH_LOG="$TMP_LOGS_FOLDER/push_${COMPONENT}.log" + docker push "$IMAGE_URL" > "$PUSH_LOG" + fi + + echo " Adapting '$COMPONENT' manifest file..." + MANIFEST="$TMP_MANIFESTS_FOLDER/${COMPONENT}service.yaml" + cp ./manifests/"${COMPONENT}"service.yaml "$MANIFEST" + VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}:" "$MANIFEST" | cut -d ":" -f3) + + if [ -n "$REGISTRY_IMAGE" ]; then + + sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT:${VERSION}#image: $IMAGE_URL#g" "$MANIFEST" + sed -E -i "s#imagePullPolicy: .*#imagePullPolicy: Always#g" "$MANIFEST" + + else + sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT:${VERSION}#image: $IMAGE_NAME#g" "$MANIFEST" + sed -E -i "s#imagePullPolicy: .*#imagePullPolicy: Never#g" "$MANIFEST" + fi + + echo " Deploying '$COMPONENT' component to Kubernetes $K8S_NAMESPACE_1..." + DEPLOY_LOG="$TMP_LOGS_FOLDER/push_${COMPONENT}_1.log" + kubectl --namespace $K8S_NAMESPACE_1 apply -f "$MANIFEST" > "$DEPLOY_LOG" + kubectl --namespace $K8S_NAMESPACE_1 scale deployment --replicas=0 ${COMPONENT}service >> "$DEPLOY_LOG" + kubectl --namespace $K8S_NAMESPACE_1 scale deployment --replicas=1 ${COMPONENT}service >> "$DEPLOY_LOG" + + echo " Deploying '$COMPONENT' component to Kubernetes $K8S_NAMESPACE_2..." + DEPLOY_LOG="$TMP_LOGS_FOLDER/push_${COMPONENT}_2.log" + kubectl --namespace $K8S_NAMESPACE_2 apply -f "$MANIFEST" > "$DEPLOY_LOG" + kubectl --namespace $K8S_NAMESPACE_2 scale deployment --replicas=0 ${COMPONENT}service >> "$DEPLOY_LOG" + kubectl --namespace $K8S_NAMESPACE_2 scale deployment --replicas=1 ${COMPONENT}service >> "$DEPLOY_LOG" + + printf "\n" +done + +echo "Deploying extra manifests to Kubernetes $K8S_NAMESPACE_1..." +for EXTRA_MANIFEST in $EXTRA_MANIFESTS_1; do + echo "Processing manifest '$EXTRA_MANIFEST'..." + kubectl --namespace $K8S_NAMESPACE_1 apply -f $EXTRA_MANIFEST + printf "\n" +done + +echo "Deploying extra manifests to Kubernetes $K8S_NAMESPACE_2..." +for EXTRA_MANIFEST in $EXTRA_MANIFESTS_2; do + echo "Processing manifest '$EXTRA_MANIFEST'..." + kubectl --namespace $K8S_NAMESPACE_2 apply -f $EXTRA_MANIFEST + printf "\n" +done + +# By now, leave this control here. Some component dependencies are not well handled +for COMPONENT in $COMPONENTS; do + echo "Waiting for '$COMPONENT' component in Kubernetes $K8S_NAMESPACE_1..." + kubectl wait --namespace $K8S_NAMESPACE_1 --for='condition=available' --timeout=300s deployment/${COMPONENT}service + printf "\n" + + echo "Waiting for '$COMPONENT' component in Kubernetes $K8S_NAMESPACE_2..." + kubectl wait --namespace $K8S_NAMESPACE_2 --for='condition=available' --timeout=300s deployment/${COMPONENT}service + printf "\n" +done + +if [[ "$COMPONENTS" == *"webui"* ]]; then + echo "Configuring WebUI DataStores and Dashboards..." + ./configure_dashboards.sh + printf "\n\n" +fi + +echo "Removing dangling docker images..." +docker images --filter="dangling=true" --quiet | xargs -r docker rmi +printf "\n" + +echo "Reporting Deployment in Kubernetes $K8S_NAMESPACE_1..." +kubectl --namespace $K8S_NAMESPACE_1 get all +printf "\n" + +echo "Reporting Deployment in Kubernetes $K8S_NAMESPACE_2..." +kubectl --namespace $K8S_NAMESPACE_2 get all +printf "\n" -# Deploy TeraFlow instance 2 -export K8S_NAMESPACE="oeccpsc22-2" -export EXTRA_MANIFESTS="./oeccpsc22/expose_services_teraflow_2.yaml" -./deploy_in_kubernetes.sh +echo "Done!" diff --git a/src/tests/oeccpsc22/dump_logs.sh b/src/tests/oeccpsc22/dump_logs.sh new file mode 100755 index 0000000000000000000000000000000000000000..c3c2e62845666976bb889a491a243b0425b14f38 --- /dev/null +++ b/src/tests/oeccpsc22/dump_logs.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +mkdir -p tmp/exec_logs/ + +kubectl --namespace oeccpsc22-1 logs deployment/computeservice -c server > tmp/exec_logs/d1_compute.log +kubectl --namespace oeccpsc22-1 logs deployment/contextservice -c server > tmp/exec_logs/d1_context.log +kubectl --namespace oeccpsc22-1 logs deployment/deviceservice -c server > tmp/exec_logs/d1_device.log +kubectl --namespace oeccpsc22-1 logs deployment/interdomainservice -c server > tmp/exec_logs/d1_interdomain.log +kubectl --namespace oeccpsc22-1 logs deployment/monitoringservice -c server > tmp/exec_logs/d1_monitoring.log +kubectl --namespace oeccpsc22-1 logs deployment/serviceservice -c server > tmp/exec_logs/d1_service.log +kubectl --namespace oeccpsc22-1 logs deployment/sliceservice -c server > tmp/exec_logs/d1_slice.log + +kubectl --namespace oeccpsc22-2 logs deployment/computeservice -c server > tmp/exec_logs/d2_compute.log +kubectl --namespace oeccpsc22-2 logs deployment/contextservice -c server > tmp/exec_logs/d2_context.log +kubectl --namespace oeccpsc22-2 logs deployment/deviceservice -c server > tmp/exec_logs/d2_device.log +kubectl --namespace oeccpsc22-2 logs deployment/interdomainservice -c server > tmp/exec_logs/d2_interdomain.log +kubectl --namespace oeccpsc22-2 logs deployment/monitoringservice -c server > tmp/exec_logs/d2_monitoring.log +kubectl --namespace oeccpsc22-2 logs deployment/serviceservice -c server > tmp/exec_logs/d2_service.log +kubectl --namespace oeccpsc22-2 logs deployment/sliceservice -c server > tmp/exec_logs/d2_slice.log diff --git a/src/tests/oeccpsc22/expose_services_teraflow_1.yaml b/src/tests/oeccpsc22/expose_services_teraflow_1.yaml index 6d2f0bed72633608a2a0cd80235e7ce4b3d723c5..f2b8de0b1629082eab1a5e638c0e712db47ed0bd 100644 --- a/src/tests/oeccpsc22/expose_services_teraflow_1.yaml +++ b/src/tests/oeccpsc22/expose_services_teraflow_1.yaml @@ -40,6 +40,11 @@ spec: port: 1010 targetPort: 1010 nodePort: 30111 + - name: rest + protocol: TCP + port: 8080 + targetPort: 8080 + nodePort: 30001 - name: redis protocol: TCP port: 6379 diff --git a/src/tests/oeccpsc22/expose_services_teraflow_2.yaml b/src/tests/oeccpsc22/expose_services_teraflow_2.yaml index 32974848ea91560d9fbfe511cae99010a7471a5d..8254c00aa09c9e15a047fd60b6140c68ef1f0e52 100644 --- a/src/tests/oeccpsc22/expose_services_teraflow_2.yaml +++ b/src/tests/oeccpsc22/expose_services_teraflow_2.yaml @@ -18,7 +18,7 @@ metadata: name: remote-teraflow spec: type: ExternalName - externalName: interdomainservice.oeccpsc22-2.svc.cluster.local + externalName: interdomainservice.oeccpsc22-1.svc.cluster.local ports: - name: grpc protocol: TCP @@ -40,6 +40,11 @@ spec: port: 1010 targetPort: 1010 nodePort: 30112 + - name: rest + protocol: TCP + port: 8080 + targetPort: 8080 + nodePort: 30002 - name: redis protocol: TCP port: 6379 diff --git a/src/tests/oeccpsc22/tests/Objects_Domain_1.py b/src/tests/oeccpsc22/tests/Objects_Domain_1.py index 6fefc2d822680bc1173d6d0544eb31d54e4f92ff..8b26348c94b827e4e418a458f21b28a863c4cb68 100644 --- a/src/tests/oeccpsc22/tests/Objects_Domain_1.py +++ b/src/tests/oeccpsc22/tests/Objects_Domain_1.py @@ -40,38 +40,44 @@ D1_DEVICE_ENDPOINT_DEFS = [ ('3/5', '10Gbps', []), ('3/6', '10Gbps', []), ('3/7', '10Gbps', []), ('3/8', '10Gbps', []), ] -D1_DEVICE_D1R1_UUID = 'D1-R1' +D1_DEVICE_D1R1_UUID = 'R1@D1' D1_DEVICE_D1R1_ID = json_device_id(D1_DEVICE_D1R1_UUID) D1_DEVICE_D1R1 = json_device_emulated_packet_router_disabled(D1_DEVICE_D1R1_UUID) D1_DEVICE_D1R1_CONNECT_RULES = json_device_emulated_connect_rules(D1_DEVICE_ENDPOINT_DEFS) -D1_DEVICE_D1R2_UUID = 'D1-R2' +D1_DEVICE_D1R2_UUID = 'R2@D1' D1_DEVICE_D1R2_ID = json_device_id(D1_DEVICE_D1R2_UUID) D1_DEVICE_D1R2 = json_device_emulated_packet_router_disabled(D1_DEVICE_D1R2_UUID) D1_DEVICE_D1R2_CONNECT_RULES = json_device_emulated_connect_rules(D1_DEVICE_ENDPOINT_DEFS) -D1_DEVICE_D1R3_UUID = 'D1-R3' +D1_DEVICE_D1R3_UUID = 'R3@D1' D1_DEVICE_D1R3_ID = json_device_id(D1_DEVICE_D1R3_UUID) D1_DEVICE_D1R3 = json_device_emulated_packet_router_disabled(D1_DEVICE_D1R3_UUID) D1_DEVICE_D1R3_CONNECT_RULES = json_device_emulated_connect_rules(D1_DEVICE_ENDPOINT_DEFS) -D1_DEVICE_D1R4_UUID = 'D1-R4' +D1_DEVICE_D1R4_UUID = 'R4@D1' D1_DEVICE_D1R4_ID = json_device_id(D1_DEVICE_D1R4_UUID) D1_DEVICE_D1R4 = json_device_emulated_packet_router_disabled(D1_DEVICE_D1R4_UUID) D1_DEVICE_D1R4_CONNECT_RULES = json_device_emulated_connect_rules(D1_DEVICE_ENDPOINT_DEFS) # Virtual devices on remote domains -D1_DEVICE_D2R1_UUID = 'D2-R1' +D1_DEVICE_D2R1_UUID = 'R1@D2' D1_DEVICE_D2R1_ID = json_device_id(D1_DEVICE_D2R1_UUID) D1_DEVICE_D2R1 = json_device_emulated_packet_router_disabled(D1_DEVICE_D2R1_UUID) D1_DEVICE_D2R1_CONNECT_RULES = json_device_emulated_connect_rules(D1_DEVICE_ENDPOINT_DEFS) +D1_DEVICE_D2R4_UUID = 'R4@D2' +D1_DEVICE_D2R4_ID = json_device_id(D1_DEVICE_D2R4_UUID) +D1_DEVICE_D2R4 = json_device_emulated_packet_router_disabled(D1_DEVICE_D2R4_UUID) +D1_DEVICE_D2R4_CONNECT_RULES = json_device_emulated_connect_rules(D1_DEVICE_ENDPOINT_DEFS) + D1_ENDPOINT_IDS = {} D1_ENDPOINT_IDS.update(json_endpoint_ids(D1_DEVICE_D1R1_ID, D1_DEVICE_ENDPOINT_DEFS)) D1_ENDPOINT_IDS.update(json_endpoint_ids(D1_DEVICE_D1R2_ID, D1_DEVICE_ENDPOINT_DEFS)) D1_ENDPOINT_IDS.update(json_endpoint_ids(D1_DEVICE_D1R3_ID, D1_DEVICE_ENDPOINT_DEFS)) D1_ENDPOINT_IDS.update(json_endpoint_ids(D1_DEVICE_D1R4_ID, D1_DEVICE_ENDPOINT_DEFS)) D1_ENDPOINT_IDS.update(json_endpoint_ids(D1_DEVICE_D2R1_ID, D1_DEVICE_ENDPOINT_DEFS)) +D1_ENDPOINT_IDS.update(json_endpoint_ids(D1_DEVICE_D2R4_ID, D1_DEVICE_ENDPOINT_DEFS)) # ----- Links ---------------------------------------------------------------------------------------------------------- @@ -118,6 +124,7 @@ D1_DEVICES = [ (D1_DEVICE_D1R3, D1_DEVICE_D1R3_CONNECT_RULES), (D1_DEVICE_D1R4, D1_DEVICE_D1R4_CONNECT_RULES), (D1_DEVICE_D2R1, D1_DEVICE_D2R1_CONNECT_RULES), + (D1_DEVICE_D2R4, D1_DEVICE_D2R4_CONNECT_RULES), ] D1_LINKS = [ diff --git a/src/tests/oeccpsc22/tests/Objects_Domain_2.py b/src/tests/oeccpsc22/tests/Objects_Domain_2.py index ec641dce4675a3965a11b2cacfb08777f8e0aa4f..f9133809243effc0a7d22c953046a4af4d6bad3e 100644 --- a/src/tests/oeccpsc22/tests/Objects_Domain_2.py +++ b/src/tests/oeccpsc22/tests/Objects_Domain_2.py @@ -40,38 +40,44 @@ D2_DEVICE_ENDPOINT_DEFS = [ ('3/5', '10Gbps', []), ('3/6', '10Gbps', []), ('3/7', '10Gbps', []), ('3/8', '10Gbps', []), ] -D2_DEVICE_D2R1_UUID = 'D2-R1' +D2_DEVICE_D2R1_UUID = 'R1@D2' D2_DEVICE_D2R1_ID = json_device_id(D2_DEVICE_D2R1_UUID) D2_DEVICE_D2R1 = json_device_emulated_packet_router_disabled(D2_DEVICE_D2R1_UUID) D2_DEVICE_D2R1_CONNECT_RULES = json_device_emulated_connect_rules(D2_DEVICE_ENDPOINT_DEFS) -D2_DEVICE_D2R2_UUID = 'D2-R2' +D2_DEVICE_D2R2_UUID = 'R2@D2' D2_DEVICE_D2R2_ID = json_device_id(D2_DEVICE_D2R2_UUID) D2_DEVICE_D2R2 = json_device_emulated_packet_router_disabled(D2_DEVICE_D2R2_UUID) D2_DEVICE_D2R2_CONNECT_RULES = json_device_emulated_connect_rules(D2_DEVICE_ENDPOINT_DEFS) -D2_DEVICE_D2R3_UUID = 'D2-R3' +D2_DEVICE_D2R3_UUID = 'R3@D2' D2_DEVICE_D2R3_ID = json_device_id(D2_DEVICE_D2R3_UUID) D2_DEVICE_D2R3 = json_device_emulated_packet_router_disabled(D2_DEVICE_D2R3_UUID) D2_DEVICE_D2R3_CONNECT_RULES = json_device_emulated_connect_rules(D2_DEVICE_ENDPOINT_DEFS) -D2_DEVICE_D2R4_UUID = 'D2-R4' +D2_DEVICE_D2R4_UUID = 'R4@D2' D2_DEVICE_D2R4_ID = json_device_id(D2_DEVICE_D2R4_UUID) D2_DEVICE_D2R4 = json_device_emulated_packet_router_disabled(D2_DEVICE_D2R4_UUID) D2_DEVICE_D2R4_CONNECT_RULES = json_device_emulated_connect_rules(D2_DEVICE_ENDPOINT_DEFS) # Virtual devices on remote domains -D2_DEVICE_D1R1_UUID = 'D1-R1' +D2_DEVICE_D1R1_UUID = 'R1@D1' D2_DEVICE_D1R1_ID = json_device_id(D2_DEVICE_D1R1_UUID) D2_DEVICE_D1R1 = json_device_emulated_packet_router_disabled(D2_DEVICE_D1R1_UUID) D2_DEVICE_D1R1_CONNECT_RULES = json_device_emulated_connect_rules(D2_DEVICE_ENDPOINT_DEFS) +D2_DEVICE_D1R4_UUID = 'R4@D1' +D2_DEVICE_D1R4_ID = json_device_id(D2_DEVICE_D1R4_UUID) +D2_DEVICE_D1R4 = json_device_emulated_packet_router_disabled(D2_DEVICE_D1R4_UUID) +D2_DEVICE_D1R4_CONNECT_RULES = json_device_emulated_connect_rules(D2_DEVICE_ENDPOINT_DEFS) + D2_ENDPOINT_IDS = {} D2_ENDPOINT_IDS.update(json_endpoint_ids(D2_DEVICE_D2R1_ID, D2_DEVICE_ENDPOINT_DEFS)) D2_ENDPOINT_IDS.update(json_endpoint_ids(D2_DEVICE_D2R2_ID, D2_DEVICE_ENDPOINT_DEFS)) D2_ENDPOINT_IDS.update(json_endpoint_ids(D2_DEVICE_D2R3_ID, D2_DEVICE_ENDPOINT_DEFS)) D2_ENDPOINT_IDS.update(json_endpoint_ids(D2_DEVICE_D2R4_ID, D2_DEVICE_ENDPOINT_DEFS)) D2_ENDPOINT_IDS.update(json_endpoint_ids(D2_DEVICE_D1R1_ID, D2_DEVICE_ENDPOINT_DEFS)) +D2_ENDPOINT_IDS.update(json_endpoint_ids(D2_DEVICE_D1R4_ID, D2_DEVICE_ENDPOINT_DEFS)) # ----- Links ---------------------------------------------------------------------------------------------------------- @@ -118,6 +124,7 @@ D2_DEVICES = [ (D2_DEVICE_D2R3, D2_DEVICE_D2R3_CONNECT_RULES), (D2_DEVICE_D2R4, D2_DEVICE_D2R4_CONNECT_RULES), (D2_DEVICE_D1R1, D2_DEVICE_D1R1_CONNECT_RULES), + (D2_DEVICE_D1R4, D2_DEVICE_D1R4_CONNECT_RULES), ] D2_LINKS = [