Commit 39b9f3f9 authored by Lluis Gifre Renom

Several changes:

- Initial (unfinished) version of the Slice component
- Factored out and improved the check methods of the "device" and "service" components to align them with the "slice" component
parent f2287700
Merge request !54: Release 2.0.0
Showing 316 additions and 44 deletions
......@@ -15,5 +15,6 @@ include:
- local: '/src/context/.gitlab-ci.yml'
- local: '/src/device/.gitlab-ci.yml'
- local: '/src/service/.gitlab-ci.yml'
#- local: '/src/slice/.gitlab-ci.yml'
- local: '/src/tester_integration/.gitlab-ci.yml'
- local: '/src/tester_functional/.gitlab-ci.yml'
apiVersion: apps/v1
kind: Deployment
metadata:
name: sliceservice
spec:
selector:
matchLabels:
app: sliceservice
template:
metadata:
labels:
app: sliceservice
spec:
terminationGracePeriodSeconds: 5
containers:
- name: server
image: registry.gitlab.com/teraflow-h2020/controller/slice:latest
imagePullPolicy: Always
ports:
- containerPort: 4040
env:
- name: DB_ENGINE
value: "redis"
- name: REDIS_DATABASE_ID
value: "0"
- name: LOG_LEVEL
value: "DEBUG"
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:4040"]
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:4040"]
resources:
requests:
cpu: 250m
memory: 512Mi
limits:
cpu: 700m
memory: 1024Mi
---
apiVersion: v1
kind: Service
metadata:
name: sliceservice
spec:
type: ClusterIP
selector:
app: sliceservice
ports:
- name: grpc
protocol: TCP
port: 4040
targetPort: 4040
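The readiness and liveness probes above invoke /bin/grpc_health_probe against port 4040; they only succeed if the server registers the standard grpc.health.v1 service. A minimal sketch of that registration, assuming the grpcio-health-checking package is available (not part of this commit; the actual Slice servicer is omitted):

# Sketch only: expose the grpc.health.v1 service so grpc_health_probe can succeed.
from concurrent import futures

import grpc
from grpc_health.v1 import health, health_pb2, health_pb2_grpc

def serve(port=4040, max_workers=10):
    # Hypothetical bootstrap: start a gRPC server and register the health-checking servicer.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    health_servicer = health.HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
    # Report SERVING for the overall server (empty service name) once it is ready.
    health_servicer.set('', health_pb2.HealthCheckResponse.SERVING)
    server.add_insecure_port('[::]:{}'.format(port))
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()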
#!/bin/bash
./report_coverage_all.sh | grep --color -E -i "^slice/.*$|$"
......@@ -22,6 +22,9 @@ coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
service/tests/test_unitary.py
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
slice/tests/test_unitary.py
# Run integration tests and analyze coverage of code at same time
export DB_ENGINE='redis'
export REDIS_SERVICE_HOST='10.1.7.194'
......
from enum import Enum
class SliceStatus(Enum):
PLANNED = 0
INIT = 1
ACTIVE = 2
DEINIT = 3
ANY_TO_ENUM = {
0: SliceStatus.PLANNED,
1: SliceStatus.INIT,
2: SliceStatus.ACTIVE,
3: SliceStatus.DEINIT,
'0': SliceStatus.PLANNED,
'1': SliceStatus.INIT,
'2': SliceStatus.ACTIVE,
'3': SliceStatus.DEINIT,
'planned': SliceStatus.PLANNED,
'init': SliceStatus.INIT,
'active': SliceStatus.ACTIVE,
'deinit': SliceStatus.DEINIT,
}
def slicestatus_enum_values():
return {m.value for m in SliceStatus.__members__.values()}
def to_slicestatus_enum(int_or_str):
if isinstance(int_or_str, str): int_or_str = int_or_str.lower()
return ANY_TO_ENUM.get(int_or_str)
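The mapping accepts plain integers, numeric strings, and case-insensitive status names, and returns None for anything unmapped. A few illustrative checks, runnable alongside the definitions above:

# Illustrative checks only; the values are chosen for demonstration.
assert to_slicestatus_enum(2) is SliceStatus.ACTIVE          # plain integer
assert to_slicestatus_enum('2') is SliceStatus.ACTIVE        # numeric string
assert to_slicestatus_enum('Active') is SliceStatus.ACTIVE   # name, lower-cased before lookup
assert to_slicestatus_enum('unknown') is None                # unmapped values yield None
assert slicestatus_enum_values() == {0, 1, 2, 3}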
import grpc, logging
from typing import Dict, List, Tuple
from common.Checkers import chk_string
from common.exceptions.ServiceException import ServiceException
from service.proto.context_pb2 import Constraint
def check_constraint(
logger : logging.Logger, constraint_number : int, parent_name : str, constraint : Constraint,
add_constraints : Dict[str, str]) -> Tuple[str, str]:
try:
constraint_type = chk_string('constraint[#{}].constraint_type'.format(constraint_number),
constraint.constraint_type,
allow_empty=False)
constraint_value = chk_string('constraint[#{}].constraint_value'.format(constraint_number),
constraint.constraint_value,
allow_empty=False)
except Exception as e:
logger.exception('Invalid arguments:')
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))
if constraint_type in add_constraints:
msg = 'Duplicated ConstraintType({}) in {}.'
msg = msg.format(constraint_type, parent_name)
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
add_constraints[constraint_type] = constraint_value
return constraint_type, constraint_value
def check_constraints(logger : logging.Logger, parent_name : str, constraints):
add_constraints : Dict[str, str] = {}
constraint_tuples : List[Tuple[str, str]] = []
for constraint_number,constraint in enumerate(constraints):
constraint_name = 'Constraint(#{}) of {}'.format(constraint_number, parent_name)
constraint_type, constraint_value = check_constraint(
logger, constraint_number, constraint_name, constraint, add_constraints)
constraint_tuples.append((constraint_type, constraint_value))
return constraint_tuples
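A hypothetical call to the factored-out checker is shown below; the namedtuple merely stands in for the Constraint protobuf message, since only the constraint_type and constraint_value fields are read:

# Hypothetical usage of check_constraints; FakeConstraint and the constraint values are illustrative.
import logging
from collections import namedtuple

FakeConstraint = namedtuple('FakeConstraint', ['constraint_type', 'constraint_value'])

logger = logging.getLogger(__name__)
constraints = [
    FakeConstraint('latency_ms', '20'),
    FakeConstraint('bandwidth_gbps', '10'),
]
# Returns [('latency_ms', '20'), ('bandwidth_gbps', '10')]; a repeated constraint_type or an
# empty field raises ServiceException with grpc.StatusCode.INVALID_ARGUMENT.
constraint_tuples = check_constraints(logger, 'Context(admin)/Slice(slice-1)', constraints)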
import grpc
from common.database.api.Database import Database
from common.database.api.context.topology.device.Device import Device
from common.database.api.context.topology.device.Endpoint import Endpoint
from common.exceptions.ServiceException import ServiceException
- def check_device_exists(database : Database, context_id : str, topology_id : str, device_id : str):
+ def check_device_exists(database : Database, context_id : str, topology_id : str, device_id : str) -> Device:
db_context = database.context(context_id).create()
db_topology = db_context.topology(topology_id).create()
- if db_topology.devices.contains(device_id): return
+ if db_topology.devices.contains(device_id): return db_topology.device(device_id)
msg = 'Context({})/Topology({})/Device({}) does not exist in the database.'
msg = msg.format(context_id, topology_id, device_id)
raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)
......
......@@ -3,9 +3,10 @@ from typing import Dict, Set, Tuple, Union
from common.Checkers import chk_string
from common.exceptions.ServiceException import ServiceException
from common.database.api.context.Constants import DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID
from context.proto.context_pb2 import EndPointId
def check_endpoint_id(
- logger : logging.Logger, endpoint_number : int, parent_name : str, endpoint_id : 'EndpointId',
+ logger : logging.Logger, endpoint_number : int, parent_name : str, endpoint_id : 'EndPointId',
add_topology_devices_endpoints : Dict[str, Dict[str, Set[str]]],
predefined_context_id : str = DEFAULT_CONTEXT_ID, acceptable_context_ids : Set[str] = set([DEFAULT_CONTEXT_ID]),
predefined_topology_id : str = DEFAULT_TOPOLOGY_ID, acceptable_topology_ids : Set[str] = set([DEFAULT_TOPOLOGY_ID]),
......
import grpc
from common.database.api.Database import Database
from common.database.api.context.topology.link.Link import Link
from common.exceptions.ServiceException import ServiceException
- def check_link_exists(database : Database, context_id : str, topology_id : str, link_id : str):
+ def check_link_exists(database : Database, context_id : str, topology_id : str, link_id : str) -> Link:
db_context = database.context(context_id).create()
db_topology = db_context.topology(topology_id).create()
- if db_topology.links.contains(link_id): return
+ if db_topology.links.contains(link_id): return db_topology.link(link_id)
msg = 'Context({})/Topology({})/Link({}) does not exist in the database.'
msg = msg.format(context_id, topology_id, link_id)
raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)
......
......@@ -9,7 +9,8 @@ def check_service_exists(database : Database, context_id : str, service_id : str
raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)
db_context = database.context(context_id)
- if db_context.services.contains(service_id): return
+ if db_context.services.contains(service_id):
+     return db_context.service(service_id)
msg = 'Context({})/Service({}) does not exist in the database.'
msg = msg.format(context_id, service_id)
......
import grpc
from common.database.api.Database import Database
from common.database.api.context.slice.Slice import Slice
from common.exceptions.ServiceException import ServiceException
def check_slice_exists(database : Database, context_id : str, slice_id : str) -> Slice:
db_context = database.context(context_id).create()
if db_context.slices.contains(slice_id): return db_context.slice(slice_id)
msg = 'Context({})/Slice({}) does not exist in the database.'
msg = msg.format(context_id, slice_id)
raise ServiceException(grpc.StatusCode.NOT_FOUND, msg)
def check_slice_not_exists(database : Database, context_id : str, slice_id : str):
db_context = database.context(context_id).create()
if not db_context.slices.contains(slice_id): return
msg = 'Context({})/Slice({}) already exists in the database.'
msg = msg.format(context_id, slice_id)
raise ServiceException(grpc.StatusCode.ALREADY_EXISTS, msg)
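For context, a dispatch helper in the style of the _check_device_exists and _check_service_exists wrappers touched below could combine these checkers per RPC method. The sketch is illustrative only (it is not part of this commit) and reuses the method names exposed by the SliceClient further down:

# Illustrative only: dispatch slice existence checks per RPC method name,
# mirroring the pattern used by the device and service components.
def _check_slice_exists(method_name : str, database : Database, context_id : str, slice_id : str):
    if method_name in ['CreateUpdateSlice']:
        return  # create-or-update accepts both new and existing slices
    elif method_name in ['DeleteSlice']:
        check_slice_exists(database, context_id, slice_id)
    else:
        msg = 'Unexpected condition: _check_slice_exists(method_name={}, slice_id={})'
        msg = msg.format(str(method_name), str(slice_id))
        raise ServiceException(grpc.StatusCode.UNIMPLEMENTED, msg)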
......@@ -25,12 +25,12 @@ def _check_device_exists(method_name : str, database : Database, device_id : str
elif method_name in ['UpdateDevice', 'DeleteDevice']:
check_device_exists(database, DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID, device_id)
else: # pragma: no cover (test requires malforming the code)
- msg = 'Unexpected condition [_check_device_exists(method_name={}, device_id={})]'
+ msg = 'Unexpected condition: _check_device_exists(method_name={}, device_id={})'
msg = msg.format(str(method_name), str(device_id))
raise ServiceException(grpc.StatusCode.UNIMPLEMENTED, msg)
def _check_device_endpoint_exists_or_get_pointer(
- method_name : str, database : Database, parent_name : str, device_id : str, endpoint_id : str):
+ method_name : str, database : Database, parent_name : str, device_id : str, endpoint_id : str) -> Endpoint:
if method_name in ['AddDevice']:
db_context = database.context(DEFAULT_CONTEXT_ID)
......@@ -41,8 +41,9 @@ def _check_device_endpoint_exists_or_get_pointer(
return check_device_endpoint_exists(
database, parent_name, DEFAULT_CONTEXT_ID, DEFAULT_TOPOLOGY_ID, device_id, endpoint_id)
else: # pragma: no cover (test requires malforming the code)
- msg = 'Unexpected condition [_check_device_exists(method_name={}, device_id={})]'
- msg = msg.format(str(method_name), str(device_id))
+ msg = 'Unexpected condition: _check_device_endpoint_exists_or_get_pointer(method_name={}, ' \
+ 'parent_name={}, device_id={}, endpoint_id={})'
+ msg = msg.format(str(method_name), str(parent_name), str(device_id), str(endpoint_id))
raise ServiceException(grpc.StatusCode.UNIMPLEMENTED, msg)
def check_device_operational_status(method_name : str, value : str) -> OperationalStatus:
......
......@@ -2,17 +2,16 @@ import grpc, logging
from typing import Dict, List, Set, Tuple
from common.Checkers import chk_options, chk_string
from common.database.api.Database import Database
from common.database.api.context.Constants import DEFAULT_TOPOLOGY_ID
from common.database.api.context.topology.device.Endpoint import Endpoint
from common.database.api.context.service.ServiceState import ServiceState, servicestate_enum_values, \
to_servicestate_enum
from common.database.api.context.service.ServiceType import ServiceType, servicetype_enum_values, to_servicetype_enum
from common.exceptions.ServiceException import ServiceException
from common.tools.service.ConstraintsChecker import check_constraints
from common.tools.service.DeviceCheckers import check_device_endpoint_exists
from common.tools.service.EndpointIdCheckers import check_endpoint_id
from common.tools.service.EnumCheckers import check_enum
from common.tools.service.ServiceCheckers import check_service_exists, check_service_not_exists
from service.proto.context_pb2 import Constraint
from service.proto.service_pb2 import Service, ServiceId
# For each method name, define acceptable service types. Empty set means accept all.
......@@ -43,29 +42,6 @@ def check_service_type(method_name : str, value : str) -> ServiceType:
def check_service_state(method_name : str, value : str) -> ServiceState:
return check_enum('ServiceState', method_name, value, to_servicestate_enum, ACCEPTED_SERVICE_STATES)
def check_service_constraint(
logger : logging.Logger, constraint_number : int, parent_name : str, constraint : Constraint,
add_constraints : Dict[str, Dict[str, Set[str]]]) -> Tuple[str, str]:
try:
constraint_type = chk_string('constraint[#{}].constraint_type'.format(constraint_number),
constraint.constraint_type,
allow_empty=False)
constraint_value = chk_string('constraint[#{}].constraint_value'.format(constraint_number),
constraint.constraint_value,
allow_empty=False)
except Exception as e:
logger.exception('Invalid arguments:')
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, str(e))
if constraint_type in add_constraints:
msg = 'Duplicated ConstraintType({}) in {}.'
msg = msg.format(constraint_type, parent_name)
raise ServiceException(grpc.StatusCode.INVALID_ARGUMENT, msg)
add_constraints[constraint_type] = constraint_value
return constraint_type, constraint_value
def check_service_request(
method_name : str, request : Service, database : Database, logger : logging.Logger
) -> Tuple[str, str, ServiceType, str, ServiceState, List[Endpoint], List[Tuple[str, str]]]:
......@@ -94,18 +70,13 @@ def check_service_request(
service_type = check_service_type(method_name, service_type)
service_state = check_service_state(method_name, service_state)
+ # ----- Parse constraints ------------------------------------------------------------------------------------------
+ parent_name = 'Context({})/Service({})'.format(context_id, service_id)
+ constraint_tuples : List[Tuple[str, str]] = check_constraints(logger, parent_name, request.constraint)
# ----- Check if service exists in database ------------------------------------------------------------------------
_check_service_exists(method_name, database, context_id, service_id)
- # ----- Parse constraints ------------------------------------------------------------------------------------------
- add_constraints : Dict[str, str] = {}
- constraint_tuples : List[Tuple[str, str]] = []
- for constraint_number,constraint in enumerate(request.constraint):
- parent_name = 'Constraint(#{}) of Context({})/Service({})'.format(constraint_number, context_id, service_id)
- constraint_type, constraint_value = check_service_constraint(
- logger, constraint_number, parent_name, constraint, add_constraints)
- constraint_tuples.append((constraint_type, constraint_value))
# ----- Parse endpoints and check if they exist in the database as device endpoints --------------------------------
add_topology_devices_endpoints : Dict[str, Dict[str, Set[str]]] = {}
db_endpoints : List[Endpoint] = []
......
# Build, tag, and push the Docker images to the GitLab Docker registry
build slice:
variables:
IMAGE_NAME: 'slice' # name of the microservice
IMAGE_NAME_TEST: 'slice-test' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile ./src/
- docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
rules:
- changes:
- src/$IMAGE_NAME/**
- .gitlab-ci.yml
# Pull, execute, and run unitary tests for the Docker image from the GitLab registry
unit_test slice:
variables:
IMAGE_NAME: 'slice' # name of the microservice
IMAGE_NAME_TEST: 'slice-test' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build slice
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker run -d -p 4040:4040 --name $IMAGE_NAME --network=teraflowbridge "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker ps -a
- sleep 5
- docker ps -a
- docker logs $IMAGE_NAME
- docker exec -i $IMAGE_NAME bash -c "pytest --log-level=DEBUG --verbose $IMAGE_NAME/tests/test_unitary.py"
after_script:
- docker stop $IMAGE_NAME
- docker rm $IMAGE_NAME
rules:
- changes:
- src/$IMAGE_NAME/**
- .gitlab-ci.yml
# Deployment of the service in Kubernetes Cluster
deploy slice:
stage: deploy
needs:
- build slice
- unit_test slice
- dependencies all
- integ_test execute
script:
- kubectl version
- kubectl get all
- kubectl apply -f "manifests/sliceservice.yaml"
- kubectl delete pods --selector app=sliceservice
- kubectl get all
import logging
# General settings
LOG_LEVEL = logging.WARNING
# gRPC settings
GRPC_SERVICE_PORT = 4040
GRPC_MAX_WORKERS = 10
GRPC_GRACE_PERIOD = 60
# Prometheus settings
METRICS_PORT = 9192
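A minimal sketch of how these settings are typically consumed when bootstrapping the gRPC server follows; the slice.Config import path and the prometheus_client dependency are assumptions, and the Slice servicer registration is omitted because the component is still unfinished:

# Sketch only: wiring the settings above into a gRPC server bootstrap.
import logging
from concurrent import futures

import grpc
from prometheus_client import start_http_server   # assumed dependency for METRICS_PORT

from slice.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT

logging.basicConfig(level=LOG_LEVEL)

def main():
    start_http_server(METRICS_PORT)    # expose Prometheus metrics
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=GRPC_MAX_WORKERS))
    # register the Slice servicer here (omitted)
    server.add_insecure_port('[::]:{}'.format(GRPC_SERVICE_PORT))
    server.start()
    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        server.stop(GRPC_GRACE_PERIOD)  # allow in-flight RPCs up to the grace period

if __name__ == '__main__':
    main()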
FROM python:3-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip setuptools wheel pip-tools
# Set working directory
WORKDIR /var/teraflow
# Create module sub-folders
RUN mkdir -p /var/teraflow/slice
# Get Python packages per module
COPY slice/requirements.in slice/requirements.in
RUN pip-compile --output-file=slice/requirements.txt slice/requirements.in
RUN python3 -m pip install -r slice/requirements.txt
# Add files into working directory
COPY common/. common
COPY slice/. slice
# Start slice service
ENTRYPOINT ["python", "-m", "slice.service"]
import grpc, logging
from common.tools.client.RetryDecorator import retry, delay_exponential
from slice.proto.context_pb2 import Empty
from slice.proto.slice_pb2 import TransportSlice, SliceStatus
from slice.proto.slice_pb2_grpc import SliceServiceStub
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
class SliceClient:
def __init__(self, address, port):
self.endpoint = '{}:{}'.format(address, port)
LOGGER.debug('Creating channel to {}...'.format(self.endpoint))
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
def connect(self):
self.channel = grpc.insecure_channel(self.endpoint)
self.stub = SliceServiceStub(self.channel)
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
@retry(exceptions=set(), max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
def CreateUpdateSlice(self, request : TransportSlice) -> SliceStatus:
LOGGER.debug('CreateUpdateSlice request: {}'.format(request))
response = self.stub.CreateUpdateSlice(request)
LOGGER.debug('CreateUpdateSlice result: {}'.format(response))
return response
@retry(exceptions=set(), max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
def DeleteSlice(self, request : TransportSlice) -> Empty:
LOGGER.debug('DeleteSlice request: {}'.format(request))
response = self.stub.DeleteSlice(request)
LOGGER.debug('DeleteSlice result: {}'.format(response))
return response
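A usage sketch for the client follows; the import paths, address, and port are assumptions, and the TransportSlice request is left empty because its fields are not shown in this commit:

# Illustrative usage of SliceClient; module path, host, and port are assumptions.
import logging
logging.basicConfig(level=logging.DEBUG)

from slice.proto.slice_pb2 import TransportSlice
from slice.client.SliceClient import SliceClient   # hypothetical module path

client = SliceClient('127.0.0.1', 4040)
try:
    request = TransportSlice()   # populate fields according to the slice proto
    slice_status = client.CreateUpdateSlice(request)
    print('CreateUpdateSlice returned:', slice_status)
finally:
    client.close()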