Skip to content
Snippets Groups Projects
Commit c3e31051 authored by Carlos Manso's avatar Carlos Manso
Browse files

Update

parent 86a9db7b
No related branches found
No related tags found
2 merge requests!235Release TeraFlowSDN 3.0,!155Resolve "(CTTC) Add SBI driver for FLEX-SCALE Optical SDN Controller"
Showing
with 777 additions and 22 deletions
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apps/v1
kind: Deployment
metadata:
name: e2eorchestratorservice
spec:
selector:
matchLabels:
app: e2eorchestratorservice
template:
metadata:
labels:
app: e2eorchestratorservice
spec:
terminationGracePeriodSeconds: 5
containers:
- name: server
image: labs.etsi.org:5050/tfs/controller/e2eorchestrator:latest
imagePullPolicy: Always
ports:
- containerPort: 10040
- containerPort: 9192
env:
- name: LOG_LEVEL
value: "INFO"
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis-secrets
key: REDIS_PASSWORD
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10009"]
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:10009"]
resources:
requests:
cpu: 250m
memory: 128Mi
limits:
cpu: 1000m
memory: 1024Mi
---
apiVersion: v1
kind: Service
metadata:
name: e2eorchestratorservice
labels:
app: e2eorchestratorservice
spec:
type: ClusterIP
selector:
app: e2eorchestratorservice
ports:
- name: grpc
port: 10040
targetPort: 10040
- name: metrics
port: 9192
targetPort: 9192
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: e2eorchestratorservice-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: e2eorchestratorservice
minReplicas: 1
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 80
#behavior:
# scaleDown:
# stabilizationWindowSeconds: 30
......@@ -37,6 +37,10 @@ export TFS_COMPONENTS="context device pathcomp service slice compute webui load_
# Uncomment to activate TE
#export TFS_COMPONENTS="${TFS_COMPONENTS} te"
# Uncomment to activate E2EOrchestrator
export TFS_COMPONENTS="${TFS_COMPONENTS} e2eorchestrator"
# Set the tag you want to use for your images.
export TFS_IMAGE_TAG="dev"
......@@ -102,7 +106,7 @@ export NATS_EXT_PORT_CLIENT="4222"
export NATS_EXT_PORT_HTTP="8222"
# Disable flag for re-deploying NATS from scratch.
export NATS_REDEPLOY=""
export NATS_REDEPLOY="YES"
# ----- QuestDB ----------------------------------------------------------------
......
......@@ -279,6 +279,7 @@ enum ServiceTypeEnum {
SERVICETYPE_L2NM = 2;
SERVICETYPE_TAPI_CONNECTIVITY_SERVICE = 3;
SERVICETYPE_TE = 4;
SERVICETYPE_E2E = 5;
}
enum ServiceStatusEnum {
......
// Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// protocol buffers documentation: https://developers.google.com/protocol-buffers/docs/proto3
syntax = "proto3";
package orchestrator;
import "context.proto";
service E2EOrchestratorService {
rpc Compute(E2EOrchestratorRequest) returns (E2EOrchestratorReply) {}
}
message E2EOrchestratorRequest {
context.Service service = 1;
}
message E2EOrchestratorReply {
// Service requested completed with possible missing fields, and
// sub-services required for supporting requested service on the
// underlying layers.
repeated context.Service services = 1;
// Connections supporting the requested service and sub-services
// required for the underlying layers.
repeated context.Connection connections = 2;
}
\ No newline at end of file
#!/bin/bash
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################################################################
# Define your deployment settings here
########################################################################################################################
# If not already set, set the name of the Kubernetes namespace to deploy to.
export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
########################################################################################################################
# Automated steps start here
########################################################################################################################
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/e2eorchestratorservice -c server
......@@ -57,6 +57,7 @@ class ServiceNameEnum(Enum):
OPTICALATTACKMITIGATOR = 'opticalattackmitigator'
CACHING = 'caching'
TE = 'te'
E2EORCHESTRATOR = 'e2eorchestrator'
# Used for test and debugging only
DLT_GATEWAY = 'dltgateway'
......@@ -82,6 +83,7 @@ DEFAULT_SERVICE_GRPC_PORTS = {
ServiceNameEnum.INTERDOMAIN .value : 10010,
ServiceNameEnum.PATHCOMP .value : 10020,
ServiceNameEnum.TE .value : 10030,
ServiceNameEnum.E2EORCHESTRATOR .value : 10040,
# Used for test and debugging only
ServiceNameEnum.DLT_GATEWAY .value : 50051,
......
......@@ -12,20 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, requests, threading
import json, logging, requests, threading
from requests.auth import HTTPBasicAuth
from typing import Any, Iterator, List, Optional, Tuple, Union
from common.method_wrappers.Decorator import MetricsPool, metered_subclass_method
from common.type_checkers.Checkers import chk_string, chk_type
from device.service.driver_api._Driver import _Driver
from . import ALL_RESOURCE_KEYS
from .Tools import find_key
from .Tools import find_key, add_lightpath, del_lightpath, get_lightpaths
LOGGER = logging.getLogger(__name__)
DRIVER_NAME = 'flexscale'
METRICS_POOL = MetricsPool('Device', 'Driver', labels={'driver': DRIVER_NAME})
class FlexScaleDriver(_Driver):
def __init__(self, address: str, port: int, **settings) -> None:
super().__init__(DRIVER_NAME, address, port, **settings)
......@@ -40,7 +41,7 @@ class FlexScaleDriver(_Driver):
self.__timeout = int(self.settings.get('timeout', 120))
def Connect(self) -> bool:
url = self.__flexscale_root + '/TODO' # TODO
url = self.__flexscale_root + '/OpticalTFS/GetLightpaths'
with self.__lock:
if self.__started.is_set(): return True
try:
......@@ -74,8 +75,8 @@ class FlexScaleDriver(_Driver):
for i, resource_key in enumerate(resource_keys):
str_resource_name = 'resource_key[#{:d}]'.format(i)
chk_string(str_resource_name, resource_key, allow_empty=False)
# TODO results.extend(config_getter(
# self.__tapi_root, resource_key, timeout=self.__timeout, auth=self.__auth))
results.extend(get_lightpaths(
self.__flexscale_root, resource_key, timeout=self.__timeout, auth=self.__auth))
return results
@metered_subclass_method(METRICS_POOL)
......@@ -84,40 +85,48 @@ class FlexScaleDriver(_Driver):
if len(resources) == 0:
return results
with self.__lock:
for resource in resources:
for _, resource in resources:
LOGGER.info('resource = {:s}'.format(str(resource)))
# data = create_connectivity_service( TODO
# self.__tapi_root, uuid, input_sip, output_sip, direction, capacity_value, capacity_unit,
# layer_protocol_name, layer_protocol_qualifier, timeout=self.__timeout, auth=self.__auth)
data = None
results.extend(data)
src_node = find_key(resource, 'src_node')
dst_node = find_key(resource, 'dst_node')
bitrate = find_key(resource, 'bitrate')
response = add_lightpath(self.__flexscale_root, src_node, dst_node, bitrate,
auth=self.__auth, timeout=self.__timeout)
results.extend(response)
return results
@metered_subclass_method(METRICS_POOL)
def DeleteConfig(self, resources: List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
results = []
if len(resources) == 0: return results
if len(resources) == 0:
return results
with self.__lock:
for resource in resources:
for _, resource in resources:
LOGGER.info('resource = {:s}'.format(str(resource)))
uuid = find_key(resource, 'uuid')
# results.extend(delete_connectivity_service( TODO
# self.__tapi_root, uuid, timeout=self.__timeout, auth=self.__auth))
flow_id = find_key(resource, 'flow_id')
src_node = find_key(resource, 'src_node')
dst_node = find_key(resource, 'dst_node')
bitrate = find_key(resource, 'bitrate')
response = del_lightpath(self.__flexscale_root, flow_id, src_node, dst_node, bitrate)
results.extend(response)
return results
@metered_subclass_method(METRICS_POOL)
def SubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
# TODO: TAPI does not support monitoring by now
# FlexScale does not support monitoring by now
return [False for _ in subscriptions]
@metered_subclass_method(METRICS_POOL)
def UnsubscribeState(self, subscriptions : List[Tuple[str, float, float]]) -> List[Union[bool, Exception]]:
# TODO: TAPI does not support monitoring by now
# FlexScale does not support monitoring by now
return [False for _ in subscriptions]
def GetState(
self, blocking=False, terminate : Optional[threading.Event] = None
) -> Iterator[Tuple[float, str, Any]]:
# TODO: TAPI does not support monitoring by now
# FlexScale does not support monitoring by now
return []
......@@ -26,6 +26,121 @@ HTTP_OK_CODES = {
204, # No Content
}
def find_key(resource, key):
return json.loads(resource[1])[key]
def get_lightpaths(root_url : str, resource_key : str,auth : Optional[HTTPBasicAuth] = None,
timeout : Optional[int] = None):
headers = {'accept': 'application/json'}
url = '{:s}/OpticalTFS/GetLightpaths'.format(root_url)
result = []
try:
response = requests.get(url, timeout=timeout, headers=headers, verify=False, auth=auth)
except requests.exceptions.Timeout:
LOGGER.exception('Timeout connecting {:s}'.format(url))
return result
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Exception retrieving {:s}'.format(resource_key))
result.append((resource_key, e))
return result
try:
flows = json.loads(response.content)
except Exception as e: # pylint: disable=broad-except
LOGGER.warning('Unable to decode reply: {:s}'.format(str(response.content)))
result.append((resource_key, e))
return result
# if resource_key == RESOURCE_ENDPOINTS:
for flow in flows:
flow_id = flow.get('flow_id')
source = flow.get('src')
destination = flow.get('dst')
bitrate = flow.get('bitrate')
# more TODO
endpoint_url = '/flows/flow[{:s}]'.format(flow_id)
endpoint_data = {'flow_id': flow_id, 'src': source, 'dst': destination, 'bitrate': bitrate}
result.append((endpoint_url, endpoint_data))
return result
def add_lightpath(root_url, src_node, dst_node, bitrate,
auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None):
headers = {'accept': 'application/json'}
url = '{:s}/OpticalTFS/AddLightpath/{:s}/{:s}/{:s}'.format(root_url, src_node, dst_node, bitrate)
results = []
try:
response = requests.put(url=url, timeout=timeout, headers=headers, verify=False, auth=auth)
results.extend(response)
LOGGER.info('Lightpath request: {:s} <-> {:s} with {:s} bitrate'.format(
str(src_node), str(dst_node), str(bitrate)))
LOGGER.info('Response: {:s}'.format(str(response)))
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Exception requesting Lightpath: {:s} <-> {:s} with {:s} bitrate'.format(
str(src_node), str(dst_node), str(bitrate)))
results.append(e)
else:
if response.status_code not in HTTP_OK_CODES:
msg = 'Could not create Lightpath(status_code={:s} reply={:s}'
LOGGER.error(msg.format(str(response.status_code), str(response)))
results.append(response.status_code in HTTP_OK_CODES)
return results
def del_lightpath(root_url, flow_id, src_node, dst_node, bitrate,
auth : Optional[HTTPBasicAuth] = None, timeout : Optional[int] = None):
url = '{:s}/OpticalTFS/DelLightpath/{:s}/{:s}/{:s}/{:s}'.format(root_url, flow_id, src_node, dst_node, bitrate)
headers = {'accept': 'application/json'}
results = []
try:
response = requests.delete(url=url, timeout=timeout, headers=headers, verify=False, auth=auth)
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Exception deleting Lightpath(uuid={:s})'.format(str(flow_id)))
results.append(e)
else:
if response.status_code not in HTTP_OK_CODES:
msg = 'Could not delete Lightpath(flow_id={:s}). status_code={:s} reply={:s}'
LOGGER.error(msg.format(str(flow_id), str(response.status_code), str(response)))
results.append(response.status_code in HTTP_OK_CODES)
return results
def get_topology(root_url : str, resource_key : str,auth : Optional[HTTPBasicAuth] = None,
timeout : Optional[int] = None):
headers = {'accept': 'application/json'}
url = '{:s}/OpticalTFS/GetLinks'.format(root_url)
result = []
try:
response = requests.get(url, timeout=timeout, headers=headers, verify=False, auth=auth)
except requests.exceptions.Timeout:
LOGGER.exception('Timeout connecting {:s}'.format(url))
return result
except Exception as e: # pylint: disable=broad-except
LOGGER.exception('Exception retrieving {:s}'.format(resource_key))
result.append((resource_key, e))
return result
try:
links = json.loads(response.content)
except Exception as e: # pylint: disable=broad-except
LOGGER.warning('Unable to decode reply: {:s}'.format(str(response.content)))
result.append((resource_key, e))
return result
# if resource_key == RESOURCE_ENDPOINTS:
for link in links:
# TODO
# endpoint_url = '/flows/flow[{:s}]'.format(flow_id)
# endpoint_data = {'flow_id': flow_id, 'src': source, 'dst': destination, 'bitrate': bitrate}
# result.append((endpoint_url, endpoint_data))
return result
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# build, tag and push the Docker image to the gitlab registry
build e2eorchestrator:
variables:
IMAGE_NAME: 'e2eorchestrator' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile .
- docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
after_script:
- docker images --filter="dangling=true" --quiet | xargs -r docker rmi
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- src/$IMAGE_NAME/tests/Dockerfile
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM python:3.9-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
ENV PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Creating a user for security reasons
RUN groupadd -r teraflow && useradd -u 1001 --no-log-init -r -m -g teraflow teraflow
USER teraflow
# set working directory
RUN mkdir -p /home/teraflow/controller/common/
WORKDIR /home/teraflow/controller
# Get Python packages per module
ENV VIRTUAL_ENV=/home/teraflow/venv
RUN python3 -m venv ${VIRTUAL_ENV}
ENV PATH="${VIRTUAL_ENV}/bin:${PATH}"
# Get generic Python packages
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install --upgrade setuptools wheel
RUN python3 -m pip install --upgrade pip-tools
# Get common Python packages
# Note: this step enables sharing the previous Docker build steps among all the Python components
COPY --chown=teraflow:teraflow common_requirements.in common_requirements.in
RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in
RUN python3 -m pip install -r common_requirements.txt
# Add common files into working directory
WORKDIR /home/teraflow/controller/common
COPY --chown=teraflow:teraflow src/common/. ./
RUN rm -rf proto
# Create proto sub-folder, copy .proto files, and generate Python code
RUN mkdir -p /home/teraflow/controller/common/proto
WORKDIR /home/teraflow/controller/common/proto
RUN touch __init__.py
COPY --chown=teraflow:teraflow proto/*.proto ./
RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Create module sub-folders
RUN mkdir -p /home/teraflow/controller/e2eorchestrator
WORKDIR /home/teraflow/controller
# Get Python packages per module
COPY --chown=teraflow:teraflow ./src/e2eorchestrator/requirements.in e2eorchestrator/requirements.in
# consider common and specific requirements to avoid inconsistencies with dependencies
RUN pip-compile --quiet --output-file=e2eorchestrator/requirements.txt e2eorchestrator/requirements.in common_requirements.in
RUN python3 -m pip install -r e2eorchestrator/requirements.txt
# Add component files into working directory
COPY --chown=teraflow:teraflow ./src/context/. context
COPY --chown=teraflow:teraflow ./src/e2eorchestrator/. e2eorchestrator
# Start the service
ENTRYPOINT ["python", "-m", "e2eorchestrator.service"]
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import grpc
from common.Constants import ServiceNameEnum
from common.proto.context_pb2 import Empty
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceStub
from common.Settings import get_service_host, get_service_port_grpc
from common.tools.client.RetryDecorator import delay_exponential, retry
from common.tools.grpc.Tools import grpc_message_to_json
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(
max_retries=MAX_RETRIES,
delay_function=DELAY_FUNCTION,
prepare_method_name="connect",
)
class E2EOrchestratorServiceClient:
def __init__(self, host=None, port=None):
if not host:
host = get_service_host(ServiceNameEnum.E2EORCHESTRATOR)
if not port:
port = get_service_port_grpc(ServiceNameEnum.E2EORCHESTRATOR)
self.endpoint = "{:s}:{:s}".format(str(host), str(port))
LOGGER.debug("Creating channel to {:s}...".format(str(self.endpoint)))
self.channel = None
self.stub = None
self.connect()
LOGGER.debug("Channel created")
def connect(self):
self.channel = grpc.insecure_channel(self.endpoint)
self.stub = E2EOrchestratorServiceStub(self.channel)
def close(self):
if self.channel is not None:
self.channel.close()
self.channel = None
self.stub = None
@RETRY_DECORATOR
def Compute(self, request: Empty) -> Empty:
LOGGER.debug(
"Compute request: {:s}".format(str(grpc_message_to_json(request)))
)
response = self.stub.GetPath(request)
LOGGER.debug(
"Compute result: {:s}".format(str(grpc_message_to_json(response)))
)
return response
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
networkx
\ No newline at end of file
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from common.Constants import ServiceNameEnum
from common.proto.e2eorchestrator_pb2_grpc import add_E2EOrchestratorServiceServicer_to_server
from common.Settings import get_service_port_grpc
from common.tools.service.GenericGrpcService import GenericGrpcService
from .E2EOrchestratorServiceServicerImpl import E2EOrchestratorServiceServicerImpl
LOGGER = logging.getLogger(__name__)
class E2EOrchestratorService(GenericGrpcService):
def __init__(self, cls_name: str = __name__):
port = get_service_port_grpc(ServiceNameEnum.E2EORCHESTRATOR)
super().__init__(port, cls_name=cls_name)
self.e2eorchestrator_servicer = E2EOrchestratorServiceServicerImpl()
def install_servicers(self):
add_E2EOrchestratorServiceServicer_to_server(
self.e2eorchestrator_servicer, self.server
)
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import networkx as nx
import grpc
import copy
from common.Constants import ServiceNameEnum
from common.method_wrappers.Decorator import (MetricsPool, MetricTypeEnum, safe_and_metered_rpc_method)
from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply
from common.proto.context_pb2 import Empty, Connection, EndPointId
from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer
from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)
METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC")
context_client: ContextClient = ContextClient()
class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer):
def __init__(self):
LOGGER.debug("Creating Servicer...")
LOGGER.debug("Servicer Created")
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def Compute(self, request: E2EOrchestratorRequest, context: grpc.ServicerContext) -> E2EOrchestratorReply:
endpoints_ids = []
for endpoint_id in request.service_endpoint_ids:
endpoints_ids.append(endpoint_id.endpoint_uuid.uuid)
graph = nx.Graph()
devices = context_client.ListDevices(Empty()).devices
for device in devices:
endpoints_uuids = [endpoint.endpoint_uuid.uuid for endpoint in device.device_endpoints]
for ep in endpoints_uuids:
graph.add_node(ep)
for ep in endpoints_uuids:
for ep_i in endpoints_uuids:
if ep == ep_i:
continue
graph.add_edge(ep. ep_i)
links = context_client.ListLinks(Empty()).links
for link in links:
eps = []
for endpoint_id in link.link_endpoint_ids:
eps.append(endpoint_id.endpoint_uuid.uuid)
graph.add_edge(eps[0], eps[1])
shortest = nx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1])
path = E2EOrchestratorReply()
path.services.append(copy.deepcopy(request.service))
for i in len(shortest):
conn = Connection
conn.connection_id.connection_uuid.uuid = str(shortest[i*2]) + '_->_' + str(shortest[i*2+1])
ep0 = EndPointId
ep0.endpoint_uuid.uuid = ep
conn.path_hops_endpoint_ids.append(shortest[i*2])
ep1 = EndPointId
ep1.endpoint_uuid.uuid = ep
conn.path_hops_endpoint_ids.append(shortest[i*2+1])
path.connections.append(conn)
return path
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import signal
import sys
import threading
from prometheus_client import start_http_server
from common.Constants import ServiceNameEnum
from common.Settings import (ENVVAR_SUFIX_SERVICE_HOST,
ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name,
get_log_level, get_metrics_port,
wait_for_environment_variables)
from .E2EOrchestratorService import E2EOrchestratorService
terminate = threading.Event()
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning("Terminate signal received")
terminate.set()
def main():
global LOGGER # pylint: disable=global-statement
log_level = get_log_level()
logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__)
wait_for_environment_variables(
[
get_env_var_name(ServiceNameEnum.E2EORCHESTRATOR, ENVVAR_SUFIX_SERVICE_HOST),
get_env_var_name(ServiceNameEnum.E2EORCHESTRATOR, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
]
)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info("Starting...")
# Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port)
# Starting CentralizedCybersecurity service
grpc_service = E2EOrchestratorService()
grpc_service.start()
LOGGER.info("Started...")
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=1):
pass
LOGGER.info("Terminating...")
grpc_service.stop()
LOGGER.info("Bye")
return 0
if __name__ == "__main__":
sys.exit(main())
......@@ -66,6 +66,7 @@ COPY src/context/. context/
COPY src/device/. device/
COPY src/pathcomp/frontend/. pathcomp/frontend/
COPY src/service/. service/
COPY src/e2eorchestrator/. e2eorchestrator/
# Start the service
ENTRYPOINT ["python", "-m", "service.service"]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment