diff --git a/manifests/e2eorchestratorservice.yaml b/manifests/e2eorchestratorservice.yaml new file mode 100644 index 0000000000000000000000000000000000000000..ba2e4fabddd89df539831ff5b68cc3c8fd77efdb --- /dev/null +++ b/manifests/e2eorchestratorservice.yaml @@ -0,0 +1,96 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: e2eorchestratorservice +spec: + selector: + matchLabels: + app: e2eorchestratorservice + template: + metadata: + labels: + app: e2eorchestratorservice + spec: + terminationGracePeriodSeconds: 5 + containers: + - name: server + image: labs.etsi.org:5050/tfs/controller/e2eorchestrator:latest + imagePullPolicy: Always + ports: + - containerPort: 10040 + - containerPort: 9192 + env: + - name: LOG_LEVEL + value: "INFO" + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-secrets + key: REDIS_PASSWORD + readinessProbe: + exec: + command: ["/bin/grpc_health_probe", "-addr=:10009"] + livenessProbe: + exec: + command: ["/bin/grpc_health_probe", "-addr=:10009"] + resources: + requests: + cpu: 250m + memory: 128Mi + limits: + cpu: 1000m + memory: 1024Mi +--- +apiVersion: v1 +kind: Service +metadata: + name: e2eorchestratorservice + labels: + app: e2eorchestratorservice +spec: + type: ClusterIP + selector: + app: e2eorchestratorservice + ports: + - name: grpc + port: 10040 + targetPort: 10040 + - name: metrics + port: 9192 + targetPort: 9192 +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: e2eorchestratorservice-hpa +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: e2eorchestratorservice + minReplicas: 1 + maxReplicas: 20 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 80 + #behavior: + # scaleDown: + # stabilizationWindowSeconds: 30 diff --git a/my_deploy.sh b/my_deploy.sh index 888fc98903eb665729d7e0843cf9e9fc8b60741d..358ddcf087429e35b058efe1fbf12ab529097d87 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -37,6 +37,10 @@ export TFS_COMPONENTS="context device pathcomp service slice compute webui load_ # Uncomment to activate TE #export TFS_COMPONENTS="${TFS_COMPONENTS} te" +# Uncomment to activate E2EOrchestrator +export TFS_COMPONENTS="${TFS_COMPONENTS} e2eorchestrator" + + # Set the tag you want to use for your images. export TFS_IMAGE_TAG="dev" @@ -102,7 +106,7 @@ export NATS_EXT_PORT_CLIENT="4222" export NATS_EXT_PORT_HTTP="8222" # Disable flag for re-deploying NATS from scratch. -export NATS_REDEPLOY="" +export NATS_REDEPLOY="YES" # ----- QuestDB ---------------------------------------------------------------- diff --git a/proto/context.proto b/proto/context.proto index 22e11bc68b840115a19551958ac322acb71fb9a4..4068bf1f85fc6d4fa74f08dab5434c4a97d2350f 100644 --- a/proto/context.proto +++ b/proto/context.proto @@ -279,6 +279,7 @@ enum ServiceTypeEnum { SERVICETYPE_L2NM = 2; SERVICETYPE_TAPI_CONNECTIVITY_SERVICE = 3; SERVICETYPE_TE = 4; + SERVICETYPE_E2E = 5; } enum ServiceStatusEnum { diff --git a/proto/e2eorchestrator.proto b/proto/e2eorchestrator.proto new file mode 100644 index 0000000000000000000000000000000000000000..9eed8523e52faa32d4397bb4635a1cf0c56aa4d5 --- /dev/null +++ b/proto/e2eorchestrator.proto @@ -0,0 +1,39 @@ +// Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// protocol buffers documentation: https://developers.google.com/protocol-buffers/docs/proto3 +syntax = "proto3"; +package orchestrator; + +import "context.proto"; + + +service E2EOrchestratorService { + rpc Compute(E2EOrchestratorRequest) returns (E2EOrchestratorReply) {} +} + +message E2EOrchestratorRequest { + context.Service service = 1; +} + +message E2EOrchestratorReply { + // Service requested completed with possible missing fields, and + // sub-services required for supporting requested service on the + // underlying layers. + repeated context.Service services = 1; + + // Connections supporting the requested service and sub-services + // required for the underlying layers. + repeated context.Connection connections = 2; +} \ No newline at end of file diff --git a/scripts/show_logs_e2eorchestrator.sh b/scripts/show_logs_e2eorchestrator.sh new file mode 100755 index 0000000000000000000000000000000000000000..84951ed8dc24c145715c971b76cd22b79a920f98 --- /dev/null +++ b/scripts/show_logs_e2eorchestrator.sh @@ -0,0 +1,27 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +######################################################################################################################## +# Define your deployment settings here +######################################################################################################################## + +# If not already set, set the name of the Kubernetes namespace to deploy to. +export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} + +######################################################################################################################## +# Automated steps start here +######################################################################################################################## + +kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/e2eorchestratorservice -c server diff --git a/src/common/Constants.py b/src/common/Constants.py index 423f2558b71b189b9e771e5af94968d28f8777c0..3bf294a50641d378965b682edf7e9105f99add37 100644 --- a/src/common/Constants.py +++ b/src/common/Constants.py @@ -57,6 +57,7 @@ class ServiceNameEnum(Enum): OPTICALATTACKMITIGATOR = 'opticalattackmitigator' CACHING = 'caching' TE = 'te' + E2EORCHESTRATOR = 'e2eorchestrator' # Used for test and debugging only DLT_GATEWAY = 'dltgateway' @@ -82,6 +83,7 @@ DEFAULT_SERVICE_GRPC_PORTS = { ServiceNameEnum.INTERDOMAIN .value : 10010, ServiceNameEnum.PATHCOMP .value : 10020, ServiceNameEnum.TE .value : 10030, + ServiceNameEnum.E2EORCHESTRATOR .value : 10040, # Used for test and debugging only ServiceNameEnum.DLT_GATEWAY .value : 50051, diff --git a/src/device/service/drivers/flexscale/FlexScaleDriver.py b/src/device/service/drivers/flexscale/FlexScaleDriver.py index f512a79233fac5d4e80aa1d06786ebf801e42772..1733f504dcc0bca511a093c9b4b32a48127e8f47 100644 --- a/src/device/service/drivers/flexscale/FlexScaleDriver.py +++ b/src/device/service/drivers/flexscale/FlexScaleDriver.py @@ -12,20 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging, requests, threading +import json, logging, requests, threading from requests.auth import HTTPBasicAuth from typing import Any, Iterator, List, Optional, Tuple, Union from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method from common.type_checkers.Checkers import chk_string, chk_type from device.service.driver_api._Driver import _Driver from . import ALL_RESOURCE_KEYS -from .Tools import find_key +from .Tools import find_key, add_lightpath, del_lightpath, get_lightpaths LOGGER = logging.getLogger(__name__) DRIVER_NAME = 'flexscale' METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME}) + class FlexScaleDriver(_Driver): def __init__(self, address: str, port: int, **settings) -> None: super().__init__(DRIVER_NAME, address, port, **settings) @@ -40,7 +41,7 @@ class FlexScaleDriver(_Driver): self.__timeout = int(self.settings.get('timeout', 120)) def Connect(self) -> bool: - url = self.__flexscale_root + '/TODO' # TODO + url = self.__flexscale_root + '/OpticalTFS/GetLightpaths' with self.__lock: if self.__started.is_set(): return True try: @@ -74,8 +75,8 @@ class FlexScaleDriver(_Driver): for i, resource_key in enumerate(resource_keys): str_resource_name = 'resource_key[#{:d}]'.format(i) chk_string(str_resource_name, resource_key, allow_empty=False) - # TODO results.extend(config_getter( - # self.__tapi_root, resource_key, timeout=self.__timeout, auth=self.__auth)) + results.extend(get_lightpaths( + self.__flexscale_root, resource_key, timeout=self.__timeout, auth=self.__auth)) return results @metered_subclass_method(METRICS_POOL) @@ -84,40 +85,48 @@ class FlexScaleDriver(_Driver): if len(resources) == 0: return results with self.__lock: - for resource in resources: + for _, resource in resources: LOGGER.info('resource = {:s}'.format(str(resource))) - # data = create_connectivity_service( TODO - # self.__tapi_root, uuid, input_sip, output_sip, direction, capacity_value, capacity_unit, - # layer_protocol_name, layer_protocol_qualifier, timeout=self.__timeout, auth=self.__auth) - data = None - results.extend(data) + src_node = find_key(resource, 'src_node') + dst_node = find_key(resource, 'dst_node') + bitrate = find_key(resource, 'bitrate') + + response = add_lightpath(self.__flexscale_root, src_node, dst_node, bitrate, + auth=self.__auth, timeout=self.__timeout) + results.extend(response) return results @metered_subclass_method(METRICS_POOL) def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: results = [] - if len(resources) == 0: return results + if len(resources) == 0: + return results with self.__lock: - for resource in resources: + for _, resource in resources: LOGGER.info('resource = {:s}'.format(str(resource))) - uuid = find_key(resource, 'uuid') - # results.extend(delete_connectivity_service( TODO - # self.__tapi_root, uuid, timeout=self.__timeout, auth=self.__auth)) + flow_id = find_key(resource, 'flow_id') + src_node = find_key(resource, 'src_node') + dst_node = find_key(resource, 'dst_node') + bitrate = find_key(resource, 'bitrate') + + response = del_lightpath(self.__flexscale_root, flow_id, src_node, dst_node, bitrate) + results.extend(response) + return results @metered_subclass_method(METRICS_POOL) def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: - # TODO: TAPI does not support monitoring by now + # FlexScale does not support monitoring by now return [False for _ in subscriptions] @metered_subclass_method(METRICS_POOL) def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]: - # TODO: TAPI does not support monitoring by now + # FlexScale does not support monitoring by now return [False for _ in subscriptions] def GetState( self, blocking=False, terminate : Optional[threading.Event] = None ) -> Iterator[Tuple[float, str, Any]]: - # TODO: TAPI does not support monitoring by now + # FlexScale does not support monitoring by now return [] diff --git a/src/device/service/drivers/flexscale/Tools.py b/src/device/service/drivers/flexscale/Tools.py index 49a01b267bd2707b8888d29f2d4abf3bf146e716..15b60d7656159b0a76fa9493de7af5d76f5aa076 100644 --- a/src/device/service/drivers/flexscale/Tools.py +++ b/src/device/service/drivers/flexscale/Tools.py @@ -26,6 +26,121 @@ HTTP_OK_CODES = { 204, # No Content } -def find_key(resource, key): - return json.loads(resource[1])[key] +def get_lightpaths(root_url : str, resource_key : str,auth : Optional[HTTPBasicAuth] = None, + timeout : Optional[int] = None): + headers = {'accept': 'application/json'} + url = '{:s}/OpticalTFS/GetLightpaths'.format(root_url) + result = [] + try: + response = requests.get(url, timeout=timeout, headers=headers, verify=False, auth=auth) + except requests.exceptions.Timeout: + LOGGER.exception('Timeout connecting {:s}'.format(url)) + return result + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception retrieving {:s}'.format(resource_key)) + result.append((resource_key, e)) + return result + + try: + flows = json.loads(response.content) + except Exception as e: # pylint: disable=broad-except + LOGGER.warning('Unable to decode reply: {:s}'.format(str(response.content))) + result.append((resource_key, e)) + return result + + # if resource_key == RESOURCE_ENDPOINTS: + for flow in flows: + flow_id = flow.get('flow_id') + source = flow.get('src') + destination = flow.get('dst') + bitrate = flow.get('bitrate') + # more TODO + + endpoint_url = '/flows/flow[{:s}]'.format(flow_id) + endpoint_data = {'flow_id': flow_id, 'src': source, 'dst': destination, 'bitrate': bitrate} + result.append((endpoint_url, endpoint_data)) + + return result + + +def add_lightpath(root_url, src_node, dst_node, bitrate, + auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None): + + headers = {'accept': 'application/json'} + url = '{:s}/OpticalTFS/AddLightpath/{:s}/{:s}/{:s}'.format(root_url, src_node, dst_node, bitrate) + + results = [] + try: + response = requests.put(url=url, timeout=timeout, headers=headers, verify=False, auth=auth) + results.extend(response) + LOGGER.info('Lightpath request: {:s} <-> {:s} with {:s} bitrate'.format( + str(src_node), str(dst_node), str(bitrate))) + LOGGER.info('Response: {:s}'.format(str(response))) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception requesting Lightpath: {:s} <-> {:s} with {:s} bitrate'.format( + str(src_node), str(dst_node), str(bitrate))) + results.append(e) + else: + if response.status_code not in HTTP_OK_CODES: + msg = 'Could not create Lightpath(status_code={:s} reply={:s}' + LOGGER.error(msg.format(str(response.status_code), str(response))) + results.append(response.status_code in HTTP_OK_CODES) + + return results + + + +def del_lightpath(root_url, flow_id, src_node, dst_node, bitrate, + auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None): + url = '{:s}/OpticalTFS/DelLightpath/{:s}/{:s}/{:s}/{:s}'.format(root_url, flow_id, src_node, dst_node, bitrate) + headers = {'accept': 'application/json'} + + results = [] + + try: + response = requests.delete(url=url, timeout=timeout, headers=headers, verify=False, auth=auth) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception deleting Lightpath(uuid={:s})'.format(str(flow_id))) + results.append(e) + else: + if response.status_code not in HTTP_OK_CODES: + msg = 'Could not delete Lightpath(flow_id={:s}). status_code={:s} reply={:s}' + LOGGER.error(msg.format(str(flow_id), str(response.status_code), str(response))) + results.append(response.status_code in HTTP_OK_CODES) + + return results + + +def get_topology(root_url : str, resource_key : str,auth : Optional[HTTPBasicAuth] = None, + timeout : Optional[int] = None): + headers = {'accept': 'application/json'} + url = '{:s}/OpticalTFS/GetLinks'.format(root_url) + + result = [] + try: + response = requests.get(url, timeout=timeout, headers=headers, verify=False, auth=auth) + except requests.exceptions.Timeout: + LOGGER.exception('Timeout connecting {:s}'.format(url)) + return result + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception retrieving {:s}'.format(resource_key)) + result.append((resource_key, e)) + return result + + try: + links = json.loads(response.content) + except Exception as e: # pylint: disable=broad-except + LOGGER.warning('Unable to decode reply: {:s}'.format(str(response.content))) + result.append((resource_key, e)) + return result + + # if resource_key == RESOURCE_ENDPOINTS: + for link in links: + # TODO + + # endpoint_url = '/flows/flow[{:s}]'.format(flow_id) + # endpoint_data = {'flow_id': flow_id, 'src': source, 'dst': destination, 'bitrate': bitrate} + # result.append((endpoint_url, endpoint_data)) + + return result diff --git a/src/e2eorchestrator/.gitlab-ci.yml b/src/e2eorchestrator/.gitlab-ci.yml new file mode 100644 index 0000000000000000000000000000000000000000..a14a215afc33067e923a7064b8f4617c61f4de9d --- /dev/null +++ b/src/e2eorchestrator/.gitlab-ci.yml @@ -0,0 +1,38 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# build, tag and push the Docker image to the gitlab registry +build e2eorchestrator: + variables: + IMAGE_NAME: 'e2eorchestrator' # name of the microservice + IMAGE_TAG: 'latest' # tag of the container image (production, development, etc) + stage: build + before_script: + - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY + script: + - docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile . + - docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" + - docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" + after_script: + - docker images --filter="dangling=true" --quiet | xargs -r docker rmi + rules: + - if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)' + - if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"' + - changes: + - src/$IMAGE_NAME/**/*.{py,in,yml} + - src/$IMAGE_NAME/Dockerfile + - src/$IMAGE_NAME/tests/*.py + - src/$IMAGE_NAME/tests/Dockerfile + - manifests/${IMAGE_NAME}service.yaml + - .gitlab-ci.yml diff --git a/src/e2eorchestrator/Config.py b/src/e2eorchestrator/Config.py new file mode 100644 index 0000000000000000000000000000000000000000..38d04994fb0fa1951fb465bc127eb72659dc2eaf --- /dev/null +++ b/src/e2eorchestrator/Config.py @@ -0,0 +1,13 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/e2eorchestrator/Dockerfile b/src/e2eorchestrator/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..52bd806f5c52c4033fb4fa54a0ee4ff773e901b4 --- /dev/null +++ b/src/e2eorchestrator/Dockerfile @@ -0,0 +1,84 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.9-slim + +# Install dependencies +RUN apt-get --yes --quiet --quiet update && \ + apt-get --yes --quiet --quiet install wget g++ && \ + rm -rf /var/lib/apt/lists/* + +# Set Python to show logs as they occur +ENV PYTHONUNBUFFERED=0 +ENV PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python + +# Download the gRPC health probe +RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ + chmod +x /bin/grpc_health_probe + +# Creating a user for security reasons +RUN groupadd -r teraflow && useradd -u 1001 --no-log-init -r -m -g teraflow teraflow +USER teraflow + +# set working directory +RUN mkdir -p /home/teraflow/controller/common/ +WORKDIR /home/teraflow/controller + +# Get Python packages per module +ENV VIRTUAL_ENV=/home/teraflow/venv +RUN python3 -m venv ${VIRTUAL_ENV} +ENV PATH="${VIRTUAL_ENV}/bin:${PATH}" + +# Get generic Python packages +RUN python3 -m pip install --upgrade pip +RUN python3 -m pip install --upgrade setuptools wheel +RUN python3 -m pip install --upgrade pip-tools + +# Get common Python packages +# Note: this step enables sharing the previous Docker build steps among all the Python components +COPY --chown=teraflow:teraflow common_requirements.in common_requirements.in +RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in +RUN python3 -m pip install -r common_requirements.txt + +# Add common files into working directory +WORKDIR /home/teraflow/controller/common +COPY --chown=teraflow:teraflow src/common/. ./ +RUN rm -rf proto + +# Create proto sub-folder, copy .proto files, and generate Python code +RUN mkdir -p /home/teraflow/controller/common/proto +WORKDIR /home/teraflow/controller/common/proto +RUN touch __init__.py +COPY --chown=teraflow:teraflow proto/*.proto ./ +RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto +RUN rm *.proto +RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; + +# Create module sub-folders +RUN mkdir -p /home/teraflow/controller/e2eorchestrator +WORKDIR /home/teraflow/controller + +# Get Python packages per module +COPY --chown=teraflow:teraflow ./src/e2eorchestrator/requirements.in e2eorchestrator/requirements.in +# consider common and specific requirements to avoid inconsistencies with dependencies +RUN pip-compile --quiet --output-file=e2eorchestrator/requirements.txt e2eorchestrator/requirements.in common_requirements.in +RUN python3 -m pip install -r e2eorchestrator/requirements.txt + +# Add component files into working directory +COPY --chown=teraflow:teraflow ./src/context/. context +COPY --chown=teraflow:teraflow ./src/e2eorchestrator/. e2eorchestrator + +# Start the service +ENTRYPOINT ["python", "-m", "e2eorchestrator.service"] diff --git a/src/e2eorchestrator/__init__.py b/src/e2eorchestrator/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..38d04994fb0fa1951fb465bc127eb72659dc2eaf --- /dev/null +++ b/src/e2eorchestrator/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/e2eorchestrator/client/E2EOrchestratorServiceClient.py b/src/e2eorchestrator/client/E2EOrchestratorServiceClient.py new file mode 100644 index 0000000000000000000000000000000000000000..6efb0e3fc0333c00c8dac39d869f8f2104fa7367 --- /dev/null +++ b/src/e2eorchestrator/client/E2EOrchestratorServiceClient.py @@ -0,0 +1,68 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import grpc + +from common.Constants import ServiceNameEnum +from common.proto.context_pb2 import Empty +from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceStub +from common.Settings import get_service_host, get_service_port_grpc +from common.tools.client.RetryDecorator import delay_exponential, retry +from common.tools.grpc.Tools import grpc_message_to_json + +LOGGER = logging.getLogger(__name__) +MAX_RETRIES = 15 +DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) +RETRY_DECORATOR = retry( + max_retries=MAX_RETRIES, + delay_function=DELAY_FUNCTION, + prepare_method_name="connect", +) + + +class E2EOrchestratorServiceClient: + def __init__(self, host=None, port=None): + if not host: + host = get_service_host(ServiceNameEnum.E2EORCHESTRATOR) + if not port: + port = get_service_port_grpc(ServiceNameEnum.E2EORCHESTRATOR) + self.endpoint = "{:s}:{:s}".format(str(host), str(port)) + LOGGER.debug("Creating channel to {:s}...".format(str(self.endpoint))) + self.channel = None + self.stub = None + self.connect() + LOGGER.debug("Channel created") + + def connect(self): + self.channel = grpc.insecure_channel(self.endpoint) + self.stub = E2EOrchestratorServiceStub(self.channel) + + def close(self): + if self.channel is not None: + self.channel.close() + self.channel = None + self.stub = None + + @RETRY_DECORATOR + def Compute(self, request: Empty) -> Empty: + LOGGER.debug( + "Compute request: {:s}".format(str(grpc_message_to_json(request))) + ) + response = self.stub.GetPath(request) + LOGGER.debug( + "Compute result: {:s}".format(str(grpc_message_to_json(response))) + ) + return response diff --git a/src/e2eorchestrator/client/__init__.py b/src/e2eorchestrator/client/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..38d04994fb0fa1951fb465bc127eb72659dc2eaf --- /dev/null +++ b/src/e2eorchestrator/client/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/e2eorchestrator/requirements.in b/src/e2eorchestrator/requirements.in new file mode 100644 index 0000000000000000000000000000000000000000..4c4720a2df4482faeda1ad99f9d383ebb5c0f848 --- /dev/null +++ b/src/e2eorchestrator/requirements.in @@ -0,0 +1,15 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +networkx \ No newline at end of file diff --git a/src/e2eorchestrator/service/E2EOrchestratorService.py b/src/e2eorchestrator/service/E2EOrchestratorService.py new file mode 100644 index 0000000000000000000000000000000000000000..4d6125d4a11210786de5cb83970743f480d4c8cf --- /dev/null +++ b/src/e2eorchestrator/service/E2EOrchestratorService.py @@ -0,0 +1,35 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from common.Constants import ServiceNameEnum +from common.proto.e2eorchestrator_pb2_grpc import add_E2EOrchestratorServiceServicer_to_server +from common.Settings import get_service_port_grpc +from common.tools.service.GenericGrpcService import GenericGrpcService +from .E2EOrchestratorServiceServicerImpl import E2EOrchestratorServiceServicerImpl + +LOGGER = logging.getLogger(__name__) + + +class E2EOrchestratorService(GenericGrpcService): + def __init__(self, cls_name: str = __name__): + port = get_service_port_grpc(ServiceNameEnum.E2EORCHESTRATOR) + super().__init__(port, cls_name=cls_name) + self.e2eorchestrator_servicer = E2EOrchestratorServiceServicerImpl() + + def install_servicers(self): + add_E2EOrchestratorServiceServicer_to_server( + self.e2eorchestrator_servicer, self.server + ) diff --git a/src/e2eorchestrator/service/E2EOrchestratorServiceServicerImpl.py b/src/e2eorchestrator/service/E2EOrchestratorServiceServicerImpl.py new file mode 100644 index 0000000000000000000000000000000000000000..8e5edfd63050d58c0da330ef164fce1240d43a20 --- /dev/null +++ b/src/e2eorchestrator/service/E2EOrchestratorServiceServicerImpl.py @@ -0,0 +1,89 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import networkx as nx +import grpc +import copy + +from common.Constants import ServiceNameEnum +from common.method_wrappers.Decorator import (MetricsPool, MetricTypeEnum, safe_and_metered_rpc_method) +from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply +from common.proto.context_pb2 import Empty, Connection, EndPointId +from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer +from context.client.ContextClient import ContextClient + + +LOGGER = logging.getLogger(__name__) + +METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC") + +context_client: ContextClient = ContextClient() + + +class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): + def __init__(self): + LOGGER.debug("Creating Servicer...") + LOGGER.debug("Servicer Created") + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def Compute(self, request: E2EOrchestratorRequest, context: grpc.ServicerContext) -> E2EOrchestratorReply: + + endpoints_ids = [] + for endpoint_id in request.service_endpoint_ids: + endpoints_ids.append(endpoint_id.endpoint_uuid.uuid) + + graph = nx.Graph() + + devices = context_client.ListDevices(Empty()).devices + + for device in devices: + endpoints_uuids = [endpoint.endpoint_uuid.uuid for endpoint in device.device_endpoints] + for ep in endpoints_uuids: + graph.add_node(ep) + + for ep in endpoints_uuids: + for ep_i in endpoints_uuids: + if ep == ep_i: + continue + graph.add_edge(ep. ep_i) + + links = context_client.ListLinks(Empty()).links + for link in links: + eps = [] + for endpoint_id in link.link_endpoint_ids: + eps.append(endpoint_id.endpoint_uuid.uuid) + graph.add_edge(eps[0], eps[1]) + + + shortest = nx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1]) + + path = E2EOrchestratorReply() + path.services.append(copy.deepcopy(request.service)) + for i in len(shortest): + conn = Connection + conn.connection_id.connection_uuid.uuid = str(shortest[i*2]) + '_->_' + str(shortest[i*2+1]) + + ep0 = EndPointId + ep0.endpoint_uuid.uuid = ep + conn.path_hops_endpoint_ids.append(shortest[i*2]) + + ep1 = EndPointId + ep1.endpoint_uuid.uuid = ep + conn.path_hops_endpoint_ids.append(shortest[i*2+1]) + + path.connections.append(conn) + + return path diff --git a/src/e2eorchestrator/service/__init__.py b/src/e2eorchestrator/service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..38d04994fb0fa1951fb465bc127eb72659dc2eaf --- /dev/null +++ b/src/e2eorchestrator/service/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/e2eorchestrator/service/__main__.py b/src/e2eorchestrator/service/__main__.py new file mode 100644 index 0000000000000000000000000000000000000000..a586543a7078d9b7f868967ad7eea7d228985086 --- /dev/null +++ b/src/e2eorchestrator/service/__main__.py @@ -0,0 +1,80 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import signal +import sys +import threading + +from prometheus_client import start_http_server + +from common.Constants import ServiceNameEnum +from common.Settings import (ENVVAR_SUFIX_SERVICE_HOST, + ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, + get_log_level, get_metrics_port, + wait_for_environment_variables) + +from .E2EOrchestratorService import E2EOrchestratorService + +terminate = threading.Event() +LOGGER = None + + +def signal_handler(signal, frame): # pylint: disable=redefined-outer-name + LOGGER.warning("Terminate signal received") + terminate.set() + + +def main(): + global LOGGER # pylint: disable=global-statement + + log_level = get_log_level() + logging.basicConfig(level=log_level) + LOGGER = logging.getLogger(__name__) + + wait_for_environment_variables( + [ + get_env_var_name(ServiceNameEnum.E2EORCHESTRATOR, ENVVAR_SUFIX_SERVICE_HOST), + get_env_var_name(ServiceNameEnum.E2EORCHESTRATOR, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + ] + ) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + LOGGER.info("Starting...") + + # Start metrics server + metrics_port = get_metrics_port() + start_http_server(metrics_port) + + # Starting CentralizedCybersecurity service + grpc_service = E2EOrchestratorService() + grpc_service.start() + LOGGER.info("Started...") + # Wait for Ctrl+C or termination signal + + while not terminate.wait(timeout=1): + pass + + + LOGGER.info("Terminating...") + grpc_service.stop() + + LOGGER.info("Bye") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/service/Dockerfile b/src/service/Dockerfile index 5988374e07a731c0f99f732a271c767e16281114..23be900e6fe4ef4018a50b0b44f861ecb733ede8 100644 --- a/src/service/Dockerfile +++ b/src/service/Dockerfile @@ -66,6 +66,7 @@ COPY src/context/. context/ COPY src/device/. device/ COPY src/pathcomp/frontend/. pathcomp/frontend/ COPY src/service/. service/ +COPY src/e2eorchestrator/. e2eorchestrator/ # Start the service ENTRYPOINT ["python", "-m", "service.service"] diff --git a/src/service/service/ServiceServiceServicerImpl.py b/src/service/service/ServiceServiceServicerImpl.py index f79e3e5f3aa983aacda3163682737cdf0ff6be03..20b6048af8b48428d46db19ef4231c4f602d2327 100644 --- a/src/service/service/ServiceServiceServicerImpl.py +++ b/src/service/service/ServiceServiceServicerImpl.py @@ -21,10 +21,12 @@ from common.method_wrappers.ServiceExceptions import ( from common.proto.context_pb2 import ( Connection, Empty, Service, ServiceId, ServiceStatusEnum, ServiceTypeEnum, ConstraintActionEnum) from common.proto.pathcomp_pb2 import PathCompRequest +from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply from common.proto.service_pb2_grpc import ServiceServiceServicer from common.tools.context_queries.Service import get_service_by_id from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string from context.client.ContextClient import ContextClient +from e2eorchestrator.client.E2EOrchestratorServiceClient import E2EOrchestratorServiceClient from pathcomp.frontend.client.PathCompClient import PathCompClient from service.service.tools.ConnectionToString import connection_to_string from service.client.TEServiceClient import TEServiceClient @@ -32,6 +34,7 @@ from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory from .task_scheduler.TaskScheduler import TasksScheduler from .tools.GeodesicDistance import gps_distance + LOGGER = logging.getLogger(__name__) METRICS_POOL = MetricsPool('Service', 'RPC') @@ -153,6 +156,32 @@ class ServiceServiceServicerImpl(ServiceServiceServicer): str_service_status = ServiceStatusEnum.Name(service_status.service_status) raise Exception(MSG.format(service_key, str_service_status)) + if service.service_type == ServiceTypeEnum.SERVICETYPE_E2E: + # End-to-End service: + service_id_with_uuids = context_client.SetService(request) + + service_with_uuids = get_service_by_id( + context_client, service_id_with_uuids, rw_copy=False, + include_config_rules=True, include_constraints=True, include_endpoint_ids=True) + + e2e_orch_request = E2EOrchestratorRequest() + e2e_orch_request.service.CopyFrom(service_with_uuids) + + e2e_orch_client = E2EOrchestratorServiceClient() + e2e_orch_reply = e2e_orch_client.Compute(e2e_orch_request) + + # Feed TaskScheduler with this end-to-end orchestrator reply. TaskScheduler identifies + # inter-dependencies among the services and connections retrieved and produces a + # schedule of tasks (an ordered list of tasks to be executed) to implement the + # requested create/update operation. + tasks_scheduler = TasksScheduler(self.service_handler_factory) + # e2e_orch_reply should be compatible with pathcomp_reply + # TODO: if we extend e2e_orch_reply, implement method TasksScheduler::compose_from_e2eorchreply() + tasks_scheduler.compose_from_pathcompreply(e2e_orch_reply, is_delete=False) + tasks_scheduler.execute_all() + return service_with_uuids.service_id + + # Normal service del service.service_endpoint_ids[:] # pylint: disable=no-member for endpoint_id in request.service_endpoint_ids: diff --git a/src/service/service/service_handlers/2e2_orch/E2EOrchServiceHandler.py b/src/service/service/service_handlers/2e2_orch/E2EOrchServiceHandler.py new file mode 100644 index 0000000000000000000000000000000000000000..44272fee8b382cd1f44d22fe4195c3e3273bcbb7 --- /dev/null +++ b/src/service/service/service_handlers/2e2_orch/E2EOrchServiceHandler.py @@ -0,0 +1,176 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json, logging +from typing import Any, Dict, List, Optional, Tuple, Union +from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method +from common.proto.context_pb2 import ConfigRule, DeviceId, Service +from common.tools.object_factory.ConfigRule import json_config_rule_delete, json_config_rule_set +from common.tools.object_factory.Device import json_device_id +from common.type_checkers.Checkers import chk_type +from service.service.service_handler_api.Tools import get_device_endpoint_uuids +from service.service.service_handler_api._ServiceHandler import _ServiceHandler +from service.service.service_handler_api.SettingsHandler import SettingsHandler +from service.service.task_scheduler.TaskExecutor import TaskExecutor + +LOGGER = logging.getLogger(__name__) + +METRICS_POOL = MetricsPool('Service', 'Handler', labels={'handler': 'e2e_orch'}) + +class E2EOrchServiceHandler(_ServiceHandler): + def __init__( # pylint: disable=super-init-not-called + self, service : Service, task_executor : TaskExecutor, **settings + ) -> None: + self.__service = service + self.__task_executor = task_executor + self.__settings_handler = SettingsHandler(service.service_config, **settings) + + @metered_subclass_method(METRICS_POOL) + def SetEndpoint( + self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None + ) -> List[Union[bool, Exception]]: + + chk_type('endpoints', endpoints, list) + if len(endpoints) < 2: return [] + + service_uuid = self.__service.service_id.service_uuid.uuid + settings = self.__settings_handler.get('/settings') + json_settings : Dict = {} if settings is None else settings.value + bitrate = json_settings.get('bitrate', 1000) + + results = [] + try: + src_device_uuid, src_endpoint_uuid = get_device_endpoint_uuids(endpoints[0]) + src_device = self.__task_executor.get_device(DeviceId(**json_device_id(src_device_uuid))) + src_controller = self.__task_executor.get_device_controller(src_device) + if src_controller is None: src_controller = src_device + + dst_device_uuid, dst_endpoint_uuid = get_device_endpoint_uuids(endpoints[-1]) + dst_device = self.__task_executor.get_device(DeviceId(**json_device_id(dst_device_uuid))) + dst_controller = self.__task_executor.get_device_controller(dst_device) + if dst_controller is None: dst_controller = dst_device + + controller = src_controller + + json_config_rule = json_config_rule_set('/services/service[{:s}]'.format(service_uuid), { + 'uuid' : service_uuid, + 'src_node' : src_endpoint_uuid, + 'dst_node' : dst_endpoint_uuid, + 'bitrate' : bitrate + }) + del controller.device_config.config_rules[:] + controller.device_config.config_rules.append(ConfigRule(**json_config_rule)) + self.__task_executor.configure_device(controller) + results.append(True) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Unable to SetEndpoint for Service({:s})'.format(str(service_uuid))) + results.append(e) + + return results + + @metered_subclass_method(METRICS_POOL) + def DeleteEndpoint( + self, endpoints : List[Tuple[str, str, Optional[str]]], connection_uuid : Optional[str] = None + ) -> List[Union[bool, Exception]]: + + chk_type('endpoints', endpoints, list) + if len(endpoints) < 2: return [] + + service_uuid = self.__service.service_id.service_uuid.uuid + settings = self.__settings_handler.get('/settings') + json_settings : Dict = {} if settings is None else settings.value + flow_id = json_settings.get('flow_id', 100) + bitrate = json_settings.get('bitrate', 1000) + + results = [] + try: + src_device_uuid, src_endpoint_uuid = get_device_endpoint_uuids(endpoints[0]) + src_device = self.__task_executor.get_device(DeviceId(**json_device_id(src_device_uuid))) + src_controller = self.__task_executor.get_device_controller(src_device) + if src_controller is None: src_controller = src_device + + dst_device_uuid, dst_endpoint_uuid = get_device_endpoint_uuids(endpoints[1]) + dst_device = self.__task_executor.get_device(DeviceId(**json_device_id(dst_device_uuid))) + dst_controller = self.__task_executor.get_device_controller(dst_device) + if dst_controller is None: dst_controller = dst_device + + controller = src_controller + + json_config_rule = json_config_rule_set('/services/service[{:s}]'.format(service_uuid), { + 'uuid' : service_uuid, + 'flow_id' : flow_id, + 'src_node' : src_endpoint_uuid, + 'dst_node' : dst_endpoint_uuid, + 'bitrate' : bitrate + }) + + del controller.device_config.config_rules[:] + controller.device_config.config_rules.append(ConfigRule(**json_config_rule)) + self.__task_executor.configure_device(controller) + results.append(True) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Unable to DeleteEndpoint for Service({:s})'.format(str(service_uuid))) + results.append(e) + + return results + + @metered_subclass_method(METRICS_POOL) + def SetConstraint(self, constraints : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + chk_type('constraints', constraints, list) + if len(constraints) == 0: return [] + + msg = '[SetConstraint] Method not implemented. Constraints({:s}) are being ignored.' + LOGGER.warning(msg.format(str(constraints))) + return [True for _ in range(len(constraints))] + + @metered_subclass_method(METRICS_POOL) + def DeleteConstraint(self, constraints : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + chk_type('constraints', constraints, list) + if len(constraints) == 0: return [] + + msg = '[DeleteConstraint] Method not implemented. Constraints({:s}) are being ignored.' + LOGGER.warning(msg.format(str(constraints))) + return [True for _ in range(len(constraints))] + + @metered_subclass_method(METRICS_POOL) + def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + chk_type('resources', resources, list) + if len(resources) == 0: return [] + + results = [] + for resource in resources: + try: + resource_value = json.loads(resource[1]) + self.__settings_handler.set(resource[0], resource_value) + results.append(True) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Unable to SetConfig({:s})'.format(str(resource))) + results.append(e) + + return results + + @metered_subclass_method(METRICS_POOL) + def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]: + chk_type('resources', resources, list) + if len(resources) == 0: return [] + + results = [] + for resource in resources: + try: + self.__settings_handler.delete(resource[0]) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Unable to DeleteConfig({:s})'.format(str(resource))) + results.append(e) + + return results diff --git a/src/service/service/service_handlers/2e2_orch/__init__.py b/src/service/service/service_handlers/2e2_orch/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..1549d9811aa5d1c193a44ad45d0d7773236c0612 --- /dev/null +++ b/src/service/service/service_handlers/2e2_orch/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/src/tests/tools/mock_flexscale_opt_ctrl/MockFlexscaleOptCtrl.py b/src/tests/tools/mock_flexscale_opt_ctrl/MockFlexscaleOptCtrl.py new file mode 100644 index 0000000000000000000000000000000000000000..09e1de99548d7f5c2cc173e3131bb9eddeaea25c --- /dev/null +++ b/src/tests/tools/mock_flexscale_opt_ctrl/MockFlexscaleOptCtrl.py @@ -0,0 +1,73 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import functools, logging, sys, time +from flask import Flask, jsonify, make_response, request +from flask_restful import Api, Resource +from data import ADDLIGHTPATH_REPLY + +BIND_ADDRESS = '0.0.0.0' +BIND_PORT = 8443 +BASE_URL = '/OpticalTFS' +STR_ENDPOINT = 'https://{:s}:{:s}{:s}'.format(str(BIND_ADDRESS), str(BIND_PORT), str(BASE_URL)) +LOG_LEVEL = logging.DEBUG + + +logging.basicConfig(level=LOG_LEVEL, format="[%(asctime)s] %(levelname)s:%(name)s:%(message)s") +LOGGER = logging.getLogger(__name__) +logging.getLogger('werkzeug').setLevel(logging.WARNING) + +def log_request(logger : logging.Logger, response): + timestamp = time.strftime('[%Y-%b-%d %H:%M]') + logger.info('%s %s %s %s %s', timestamp, request.remote_addr, request.method, request.full_path, response.status) + return response + +class AddLightpath(Resource): + def put(self): + return make_response(jsonify(ADDLIGHTPATH_REPLY), 200) + +class DelLightpath(Resource): + def delete(self): + return make_response(jsonify({}), 200) + +class GetLightpaths(Resource): + def get(self): + return make_response(jsonify({}), 200) + +class GetLinks(Resource): + def get(self): + return make_response(jsonify({}), 200) + + +def main(): + LOGGER.info('Starting...') + + app = Flask(__name__) + app.after_request(functools.partial(log_request, LOGGER)) + + api = Api(app, prefix=BASE_URL) + api.add_resource(AddLightpath, '/AddLightpath/<string:src_node>/<string:dst_node>/<string:bitrate>') + api.add_resource(DelLightpath, '/DelLightpath/<string:flow_id>/<string:src_node>/<string:dst_node>/<string:bitrate>') + api.add_resource(GetLightpaths, '/GetLightpaths') + api.add_resource(GetLinks, '/GetLinks') + + LOGGER.info('Listening on {:s}...'.format(str(STR_ENDPOINT))) + app.run(debug=True, host=BIND_ADDRESS, port=BIND_PORT) + + LOGGER.info('Bye') + return 0 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/tests/tools/mock_flexscale_opt_ctrl/data.py b/src/tests/tools/mock_flexscale_opt_ctrl/data.py new file mode 100644 index 0000000000000000000000000000000000000000..e89abeb3a99460b52dc55463df29ae8755079fc4 --- /dev/null +++ b/src/tests/tools/mock_flexscale_opt_ctrl/data.py @@ -0,0 +1,98 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +ADDLIGHTPATH_REPLY = """{ + "flow_id": 1, + "src": "t1", + "dst": "t2", + "bitrate": 100, + "bidir": 1, + "flows": { + "t1": [ + { + "in": 0, + "out": "1" + }, + { + "in": "1", + "out": 0 + } + ], + "r1": [ + { + "in": "101R", + "out": "1T" + }, + { + "in": "1R", + "out": "101T" + } + ], + "r2": [ + { + "in": "1R", + "out": "101T" + }, + { + "in": "101R", + "out": "1T" + } + ], + "t2": [ + { + "in": "1", + "out": 0 + }, + { + "in": 0, + "out": "1" + } + ] + }, + "band_type": "c_slots", + "slots": [ + 1, + 2, + 3, + 4 + ], + "fiber_forward": { + "t1-r1": "M1", + "r1-r2": "d1-1", + "r2-t2": "S1" + }, + "fiber_backward": { + "r1-t1": "S1", + "r2-r1": "d1-1", + "t2-r2": "M1" + }, + "op-mode": 1, + "n_slots": 4, + "links": [ + "t1-r1", + "r1-r2", + "r2-t2" + ], + "path": [ + "t1", + "r1", + "r2", + "t2" + ], + "band": 50, + "freq": 192031.25, + "is_active": true +} +""" diff --git a/src/tests/tools/mock_flexscale_opt_ctrl/run.sh b/src/tests/tools/mock_flexscale_opt_ctrl/run.sh new file mode 100755 index 0000000000000000000000000000000000000000..183df7a030dca352ee2bb5fdfd0ac081cdcec960 --- /dev/null +++ b/src/tests/tools/mock_flexscale_opt_ctrl/run.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +python MockFlexscaleOptCtrl.py diff --git a/src/tests/tools/mock_flexscale_opt_ctrl/test_mw.py b/src/tests/tools/mock_flexscale_opt_ctrl/test_mw.py new file mode 100644 index 0000000000000000000000000000000000000000..0329d30ad234398200c0fe29aac46f72f5a2e924 --- /dev/null +++ b/src/tests/tools/mock_flexscale_opt_ctrl/test_mw.py @@ -0,0 +1,84 @@ +import json, logging, requests +from requests.auth import HTTPBasicAuth +from typing import Optional + +LOGGER = logging.getLogger(__name__) + +HTTP_OK_CODES = { + 200, # OK + 201, # Created + 202, # Accepted + 204, # No Content +} + +def create_connectivity_service( + root_url, uuid, node_id_src, tp_id_src, node_id_dst, tp_id_dst, vlan_id, + auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None +): + + url = '{:s}/nmswebs/restconf/data/ietf-eth-tran-service:etht-svc'.format(root_url) + headers = {'content-type': 'application/json'} + data = { + 'etht-svc-instances': [ + { + 'etht-svc-name': uuid, + 'etht-svc-type': 'ietf-eth-tran-types:p2p-svc', + 'etht-svc-end-points': [ + { + 'etht-svc-access-points': [ + {'access-node-id': node_id_src, 'access-ltp-id': tp_id_src, 'access-point-id': '1'} + ], + 'outer-tag': {'vlan-value': vlan_id, 'tag-type': 'ietf-eth-tran-types:classify-c-vlan'}, + 'etht-svc-end-point-name': '{:s}:{:s}'.format(str(node_id_src), str(tp_id_src)), + 'service-classification-type': 'ietf-eth-tran-types:vlan-classification' + }, + { + 'etht-svc-access-points': [ + {'access-node-id': node_id_dst, 'access-ltp-id': tp_id_dst, 'access-point-id': '2'} + ], + 'outer-tag': {'vlan-value': vlan_id, 'tag-type': 'ietf-eth-tran-types:classify-c-vlan'}, + 'etht-svc-end-point-name': '{:s}:{:s}'.format(str(node_id_dst), str(tp_id_dst)), + 'service-classification-type': 'ietf-eth-tran-types:vlan-classification' + } + ] + } + ] + } + results = [] + try: + LOGGER.info('Connectivity service {:s}: {:s}'.format(str(uuid), str(data))) + response = requests.post( + url=url, data=json.dumps(data), timeout=timeout, headers=headers, verify=False, auth=auth) + LOGGER.info('Microwave Driver response: {:s}'.format(str(response))) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception creating ConnectivityService(uuid={:s}, data={:s})'.format(str(uuid), str(data))) + results.append(e) + else: + if response.status_code not in HTTP_OK_CODES: + msg = 'Could not create ConnectivityService(uuid={:s}, data={:s}). status_code={:s} reply={:s}' + LOGGER.error(msg.format(str(uuid), str(data), str(response.status_code), str(response))) + results.append(response.status_code in HTTP_OK_CODES) + return results + +def delete_connectivity_service(root_url, uuid, auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None): + url = '{:s}/nmswebs/restconf/data/ietf-eth-tran-service:etht-svc/etht-svc-instances={:s}' + url = url.format(root_url, uuid) + results = [] + try: + response = requests.delete(url=url, timeout=timeout, verify=False, auth=auth) + except Exception as e: # pylint: disable=broad-except + LOGGER.exception('Exception deleting ConnectivityService(uuid={:s})'.format(str(uuid))) + results.append(e) + else: + if response.status_code not in HTTP_OK_CODES: + msg = 'Could not delete ConnectivityService(uuid={:s}). status_code={:s} reply={:s}' + LOGGER.error(msg.format(str(uuid), str(response.status_code), str(response))) + results.append(response.status_code in HTTP_OK_CODES) + return results + +if __name__ == '__main__': + ROOT_URL = 'https://127.0.0.1:8443' + SERVICE_UUID = 'my-service' + + create_connectivity_service(ROOT_URL, SERVICE_UUID, '172.18.0.1', '1', '172.18.0.2', '2', 300) + delete_connectivity_service(ROOT_URL, SERVICE_UUID)