Skip to content
Snippets Groups Projects
Commit bce8d76a authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Service component:

- partial integration of PathComp with Service
- ongoing work
parent 68a97daa
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!4Compute component:
......@@ -21,4 +21,5 @@ RCFILE=$PROJECTDIR/coverage/.coveragerc
# Run unitary tests and analyze coverage of code at same time
coverage run --rcfile=$RCFILE --append -m pytest --log-level=INFO --verbose \
service/tests/test_unitary_dependency_resolver.py \
service/tests/test_unitary.py
......@@ -56,3 +56,11 @@ class OperationFailedException(ServiceException):
details = 'Operation({:s}) failed'.format(str(operation))
super().__init__(grpc.StatusCode.INTERNAL, details, extra_details=extra_details)
class NotImplementedException(ServiceException):
def __init__(
self, operation : str, extra_details : Union[str, Iterable[str]] = None
) -> None:
details = 'Operation({:s}) not implemented'.format(str(operation))
super().__init__(grpc.StatusCode.UNIMPLEMENTED, details, extra_details=extra_details)
......@@ -64,6 +64,7 @@ RUN python3 -m pip install -r requirements.txt
WORKDIR /var/teraflow
COPY src/context/. context/
COPY src/device/. device/
COPY src/pathcomp/frontend/. pathcomp/frontend/
COPY src/service/. service/
# Start the service
......
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import graphlib
from enum import Enum
from typing import Dict, List, Optional, Tuple, Union
from common.proto.context_pb2 import Connection, ConnectionId, Service, ServiceId
from common.proto.pathcomp_pb2 import PathCompReply
# Compose Directed Acyclic Graph of dependencies between connections and services
# retrieved by PathComp to create them in the appropriate order.
class ObjectType(Enum):
CONNECTION = 'connection'
SERVICE = 'service'
ObjectKey = Tuple[ObjectType, str]
ObjectId = Union[ConnectionId, ServiceId]
ObjectData = Union[Connection, Service]
ObjectItem = Tuple[ObjectId, Optional[ObjectData]]
ObjectDict = Dict[ObjectKey, ObjectItem]
Resolution = List[Tuple[ObjectKey, ObjectItem]]
def get_connection_key(connection_id : ConnectionId) -> ObjectKey:
connection_uuid = connection_id.connection_uuid.uuid
return ObjectType.CONNECTION.value, connection_uuid
def get_service_key(service_id : ServiceId) -> ObjectKey:
context_uuid = service_id.context_id.context_uuid.uuid
service_uuid = service_id.service_uuid.uuid
return ObjectType.SERVICE.value, '/'.join([context_uuid, service_uuid])
def resolve_dependencies(pathcomp_reply : PathCompReply) -> Resolution:
dag = graphlib.TopologicalSorter()
objects : ObjectDict = dict()
for service in pathcomp_reply.services:
service_key = get_service_key(service.service_id)
objects[service_key] = (service.service_id, service)
for connection in pathcomp_reply.connections:
connection_key = get_connection_key(connection.connection_id)
objects[connection_key] = (connection.connection_id, connection)
# the connection's service depends on the connection
service_key = get_service_key(connection.service_id)
dag.add(service_key, connection_key)
if service_key not in objects: objects[service_key] = (connection.service_id, None)
# the connection depends on these sub-services
for sub_service_id in connection.sub_service_ids:
sub_service_key = get_service_key(sub_service_id)
dag.add(connection_key, sub_service_key)
if sub_service_key not in objects: objects[sub_service_key] = (sub_service_id, None)
resolution : Resolution = list()
for item_key in dag.static_order():
item_tuple = objects.get(item_key)
resolution.append((item_key, item_tuple))
return resolution
......@@ -13,24 +13,29 @@
# limitations under the License.
from typing import Dict, List
import grpc, json, logging
import graphlib, grpc, json, logging
from common.orm.Database import Database
from common.orm.HighLevel import get_object
from common.orm.backend.Tools import key_to_str
from common.proto.context_pb2 import Empty, Service, ServiceId
from common.proto.context_pb2 import ConnectionId, Empty, Service, ServiceId
from common.proto.pathcomp_pb2 import PathCompRequest
from common.proto.service_pb2_grpc import ServiceServiceServicer
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.rpc_method_wrapper.ServiceExceptions import AlreadyExistsException, InvalidArgumentException, NotFoundException, NotImplementedException
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from common.tools.object_factory.Connection import json_connection_id
from common.tools.object_factory.Service import json_service_id
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from pathcomp.frontend.client.PathCompClient import PathCompClient
from service.service.DependencyResolver import ObjectType, resolve_dependencies
from service.service.database.DeviceModel import DeviceModel
from .database.DatabaseServiceTools import (
sync_service_from_context, sync_service_to_context, update_service_in_local_database)
from .database.ServiceModel import ServiceModel
from .path_computation_element.PathComputationElement import PathComputationElement, dump_connectivity
from .service_handler_api.ServiceHandlerFactory import ServiceHandlerFactory
from .Tools import delete_service, sync_devices_from_context, update_service
from .Tools import delete_service, get_connection, get_service, sync_devices_from_context, update_service
LOGGER = logging.getLogger(__name__)
......@@ -41,8 +46,6 @@ METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class ServiceServiceServicerImpl(ServiceServiceServicer):
def __init__(self, database : Database, service_handler_factory : ServiceHandlerFactory) -> None:
LOGGER.debug('Creating Servicer...')
self.context_client = ContextClient()
self.device_client = DeviceClient()
self.database = database
self.service_handler_factory = service_handler_factory
LOGGER.debug('Servicer Created')
......@@ -84,14 +87,19 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
extra_details='RPC method CreateService does not accept Config Rules. '\
'Config Rules should be configured after creating the service.')
sync_service_from_context(service_context_uuid, service_uuid, self.context_client, self.database)
db_service,_ = update_service_in_local_database(self.database, request)
# check that service does not exist
context_client = ContextClient()
current_service = get_service(context_client, request.service_id)
if current_service is not None:
context_uuid = request.service_id.context_id.context_uuid.uuid
service_uuid = request.service_id.service_uuid.uuid
raise AlreadyExistsException(
'Service', service_uuid, extra_details='context_uuid={:s}'.format(str(context_uuid)))
LOGGER.info('[CreateService] db_service = {:s}'.format(str(db_service.dump(
include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))
sync_service_to_context(db_service, self.context_client)
return ServiceId(**db_service.dump_id())
# just create the service in the database to lock the service_id
# update will perform changes on the resources
service_id = context_client.SetService(request)
return service_id
@safe_and_metered_rpc_method(METRICS, LOGGER)
def UpdateService(self, request : Service, context : grpc.ServicerContext) -> ServiceId:
......@@ -101,54 +109,53 @@ class ServiceServiceServicerImpl(ServiceServiceServicer):
service_uuid = service_id.service_uuid.uuid
service_context_uuid = service_id.context_id.context_uuid.uuid
pce = PathComputationElement()
pce.load_topology(self.context_client)
pce.load_connectivity(self.context_client, service_id)
#pce.dump_topology_to_file('../data/topo.dot')
#pce.dump_connectivity_to_file('../data/conn-before.txt')
connectivity = pce.route_service(request)
#pce.dump_connectivity_to_file('../data/conn-after.txt')
LOGGER.info('[UpdateService] connectivity = {:s}'.format(str(dump_connectivity(connectivity))))
if connectivity is None:
# just update local database and context
str_service_key = key_to_str([service_context_uuid, service_uuid])
db_service = get_object(self.database, ServiceModel, str_service_key, raise_if_not_found=False)
LOGGER.info('[UpdateService] before db_service = {:s}'.format(str(db_service.dump(
include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))
db_devices : Dict[str, DeviceModel] = sync_devices_from_context(
self.context_client, self.database, db_service, request.service_endpoint_ids)
LOGGER.info('[UpdateService] db_devices[{:d}] = {:s}'.format(
len(db_devices), str({
device_uuid:db_device.dump(include_config_rules=True, include_drivers=True, include_endpoints=True)
for device_uuid,db_device in db_devices.items()
})))
sync_service_from_context(service_context_uuid, service_uuid, self.context_client, self.database)
db_service,_ = update_service_in_local_database(self.database, request)
LOGGER.info('[UpdateService] after db_service = {:s}'.format(str(db_service.dump(
include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))
sync_service_to_context(db_service, self.context_client)
else:
for sub_service, sub_connections in connectivity.get('requirements', []):
for sub_connection in sub_connections:
update_service(
self.database, self.context_client, self.device_client, self.service_handler_factory,
sub_service, sub_connection)
for connection in connectivity.get('connections'):
db_service = update_service(
self.database, self.context_client, self.device_client, self.service_handler_factory,
request, connection)
str_service_key = key_to_str([service_context_uuid, service_uuid])
db_service = get_object(self.database, ServiceModel, str_service_key, raise_if_not_found=False)
if db_service is None: raise NotFoundException('Service', str_service_key)
LOGGER.info('[UpdateService] db_service = {:s}'.format(str(db_service.dump(
include_endpoint_ids=True, include_constraints=True, include_config_rules=True))))
return ServiceId(**db_service.dump_id())
pathcomp_request = PathCompRequest()
pathcomp_request.services.append(request)
pathcomp_request.services.k_shortest_path.k_inspection = 5
pathcomp_request.services.k_shortest_path.k_return = 5
pathcomp = PathCompClient()
pathcomp_response = pathcomp.Compute(pathcomp_request)
# convert from a unordered lists of services and connections to a list of ordered items
# that fulfill interdependencies among them. E.g., a service cannot be created if connections
# supporting that service still does not exist.
resolution = resolve_dependencies(pathcomp_response)
# implement changes
context_client = ContextClient()
device_client = DeviceClient()
for (obj_type, obj_key), (grpc_objid, grpc_obj) in resolution:
if grpc_obj is None:
# check if the resource already exists
if obj_type == ObjectType.CONNECTION.value:
connection = get_connection(context_client, grpc_objid)
if connection is None: raise NotFoundException('Connection', obj_key)
elif obj_type == ObjectType.SERVICE.value:
service = get_service(context_client, grpc_objid)
if service is None: raise NotFoundException('Service', obj_key)
else:
MSG_EXTRA_DETAILS = 'obj_type={:s} obj_key={:s} grpc_objid={:s} grpc_obj={:s}'
str_grpc_obj = 'None' if grpc_obj is None else grpc_message_to_json_string(grpc_obj)
str_grpc_objid = 'None' if grpc_objid is None else grpc_message_to_json_string(grpc_objid)
msg_extra_details = MSG_EXTRA_DETAILS.format(obj_type, obj_key, str_grpc_objid, str_grpc_obj)
raise NotImplementedException('Empty Dependency', extra_details=msg_extra_details)
else:
# create/update the resource
if obj_type == ObjectType.CONNECTION.value:
update_connection(context_client, device_client, self.service_handler_factory, grpc_obj)
context_client.SetConnection(grpc_obj)
elif obj_type == ObjectType.SERVICE.value:
update_service(context_client, device_client, self.service_handler_factory, grpc_obj)
context_client.SetService(grpc_obj)
else:
MSG_EXTRA_DETAILS = 'obj_type={:s} obj_key={:s} grpc_objid={:s} grpc_obj={:s}'
str_grpc_obj = 'None' if grpc_obj is None else grpc_message_to_json_string(grpc_obj)
str_grpc_objid = 'None' if grpc_objid is None else grpc_message_to_json_string(grpc_objid)
msg_extra_details = MSG_EXTRA_DETAILS.format(obj_type, obj_key, str_grpc_objid, str_grpc_obj)
raise NotImplementedException('Specified Dependency', extra_details=msg_extra_details)
return request.service_id
@safe_and_metered_rpc_method(METRICS, LOGGER)
def DeleteService(self, request : ServiceId, context : grpc.ServicerContext) -> Empty:
......
......@@ -12,13 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import grpc, logging
from typing import Any, Dict, List, Optional, Tuple
from common.orm.Database import Database
from common.orm.HighLevel import get_object, get_related_objects
from common.orm.backend.Tools import key_to_str
from common.proto.context_pb2 import (
ConfigRule, Connection, Constraint, EndPointId, Service, ServiceId, ServiceStatusEnum)
ConfigRule, Connection, ConnectionId, Constraint, EndPointId, Service, ServiceId, ServiceStatusEnum)
from common.rpc_method_wrapper.ServiceExceptions import (
InvalidArgumentException, NotFoundException, OperationFailedException)
from context.client.ContextClient import ContextClient
......@@ -42,6 +42,22 @@ from .service_handler_api.Tools import (
LOGGER = logging.getLogger(__name__)
def get_connection(context_client : ContextClient, connection_id : ConnectionId) -> Optional[Connection]:
try:
connection : Connection = context_client.GetConnection(connection_id)
return connection
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member
return None
def get_service(context_client : ContextClient, service_id : ServiceId) -> Optional[Service]:
try:
service : Service = context_client.GetService(service_id)
return service
except grpc.RpcError as e:
if e.code() != grpc.StatusCode.NOT_FOUND: raise # pylint: disable=no-member
return None
def sync_devices_from_context(
context_client : ContextClient, database : Database, db_service : Optional[ServiceModel],
service_endpoint_ids : List[EndPointId]
......
......@@ -37,10 +37,12 @@ def main():
LOGGER = logging.getLogger(__name__)
wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.PATHCOMP, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler)
......
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, operator
from common.proto.context_pb2 import Connection, Service
from common.proto.pathcomp_pb2 import PathCompReply
from common.tools.grpc.Tools import grpc_message_to_json_string
from service.service.DependencyResolver import resolve_dependencies
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
def test_dependency_resolver():
# test: add services and connections that depend on each other
# then, check if they are properly resolved.
# - service MAIN, depends on connection PKT-1, TAPI, and PKT-2
# - connection PKT-1, depends on nothing
# - connection TAPI, depends on service TAPI-1 and TAPI-2
# - connection PKT-2, depends on nothing
# - service TAPI-1, depends on connection TAPI-1
# - service TAPI-2, depends on connection TAPI-2
pathcomp_reply = PathCompReply()
service_main = pathcomp_reply.services.add()
service_main.service_id.context_id.context_uuid.uuid = 'admin'
service_main.service_id.service_uuid.uuid = 'MAIN'
service_tapi1 = pathcomp_reply.services.add()
service_tapi1.service_id.context_id.context_uuid.uuid = 'admin'
service_tapi1.service_id.service_uuid.uuid = 'TAPI-1'
service_tapi2 = pathcomp_reply.services.add()
service_tapi2.service_id.context_id.context_uuid.uuid = 'admin'
service_tapi2.service_id.service_uuid.uuid = 'TAPI-2'
connection_pkt1 = pathcomp_reply.connections.add()
connection_pkt1.connection_id.connection_uuid.uuid = 'PKT-1'
connection_pkt1.service_id.CopyFrom(service_main.service_id)
connection_tapi = pathcomp_reply.connections.add()
connection_tapi.connection_id.connection_uuid.uuid = 'TAPI'
connection_tapi.service_id.CopyFrom(service_main.service_id)
connection_pkt2 = pathcomp_reply.connections.add()
connection_pkt2.connection_id.connection_uuid.uuid = 'PKT-2'
connection_pkt2.service_id.CopyFrom(service_main.service_id)
connection_tapi1 = pathcomp_reply.connections.add()
connection_tapi1.connection_id.connection_uuid.uuid = 'TAPI-1'
connection_tapi1.service_id.CopyFrom(service_tapi1.service_id)
connection_tapi.sub_service_ids.append(service_tapi1.service_id)
connection_tapi2 = pathcomp_reply.connections.add()
connection_tapi2.connection_id.connection_uuid.uuid = 'TAPI-2'
connection_tapi2.service_id.CopyFrom(service_tapi2.service_id)
connection_tapi.sub_service_ids.append(service_tapi2.service_id)
LOGGER.info('pathcomp_reply={:s}'.format(grpc_message_to_json_string(pathcomp_reply)))
resolution = resolve_dependencies(pathcomp_reply)
LOGGER.info('resolution={:s}'.format(str(list(map(operator.itemgetter(0), resolution)))))
CORRECT_RESOLUTION_KEYS = [
('connection', 'PKT-1' ),
('connection', 'PKT-2' ),
('connection', 'TAPI-1' ),
('connection', 'TAPI-2' ),
('service' , 'admin/TAPI-1'),
('service' , 'admin/TAPI-2'),
('connection', 'TAPI' ),
('service' , 'admin/MAIN' ),
]
for (resolved_key,(resolved_objid, resolved_obj)),correct_key in zip(resolution, CORRECT_RESOLUTION_KEYS):
assert resolved_key == correct_key
assert resolved_obj is not None
if resolved_key[0] == 'connection':
assert isinstance(resolved_obj, Connection)
assert resolved_objid == resolved_obj.connection_id
connection_key = resolved_obj.connection_id.connection_uuid.uuid
assert resolved_key[1] == connection_key
elif resolved_key[0] == 'service':
assert isinstance(resolved_obj, Service)
assert resolved_objid == resolved_obj.service_id
context_uuid = resolved_obj.service_id.context_id.context_uuid.uuid
service_uuid = resolved_obj.service_id.service_uuid.uuid
service_key = '/'.join([context_uuid, service_uuid])
assert resolved_key[1] == service_key
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment