diff --git a/proto/forecaster.proto b/proto/forecaster.proto index dc2de75eaad355a833348687b62686ad9637c93d..6d1590e1b94b69c1b40da65366f241a6742b9c43 100644 --- a/proto/forecaster.proto +++ b/proto/forecaster.proto @@ -18,52 +18,27 @@ package forecaster; import "context.proto"; service ForecasterService { - rpc ComputeTopologyForecast(ForecastTopology) returns (ListLinkCapacity) {} - rpc ComputeLinkForecast (context.LinkId ) returns (LinkCapacity ) {} + rpc ForecastLinkCapacity (ForecastLinkCapacityRequest ) returns (ForecastLinkCapacityReply ) {} + rpc ForecastTopologyCapacity(ForecastTopologyCapacityRequest) returns (ForecastTopologyCapacityReply) {} } -message ForecastRequest { - oneof uuid { - context.TopologyId topology_id = 1; - context.LinkId link_id = 2; - } - - context.TopogyId topology_id = 1; - float forecast_window_seconds = 2; -} - - -message SingleForecast { - context.Timestamp timestamp= 1; - double value = 2; -} - -message Forecast { - oneof uuid { - context.TopologyId topologyId= 1; - context.LinkId linkId = 2; - } - repeated SingleForecast forecast = 3; -} - -enum AvailabilityPredictionEnum { - FORECASTED_AVAILABILITY = 0; - FORECASTED_UNAVAILABILITY = 1; -} - -message ForecastPrediction { - AvailabilityPredictionEnum prediction = 1; +message ForecastLinkCapacityRequest { + context.LinkId link_id = 1; + double forecast_window_seconds = 2; } - - -message LinkCapacity { +message ForecastLinkCapacityReply { context.LinkId link_id = 1; float total_capacity_gbps = 2; float current_capacity_gbps = 3; float forecasted_capacity_gbps = 4; } -message ListLinkCapacity { - repeated LinkCapacity forecasted_link_capacities = 1; +message ForecastTopologyCapacityRequest { + context.TopologyId topology_id = 1; + double forecast_window_seconds = 2; +} + +message ForecastTopologyCapacityReply { + repeated ForecastLinkCapacityReply link_capacities = 1; } diff --git a/src/common/Constants.py b/src/common/Constants.py index 423f2558b71b189b9e771e5af94968d28f8777c0..5af1d7042a8d00206ab96d59a16e920dee40375a 100644 --- a/src/common/Constants.py +++ b/src/common/Constants.py @@ -57,6 +57,7 @@ class ServiceNameEnum(Enum): OPTICALATTACKMITIGATOR = 'opticalattackmitigator' CACHING = 'caching' TE = 'te' + FORECASTER = 'forecaster' # Used for test and debugging only DLT_GATEWAY = 'dltgateway' @@ -82,6 +83,7 @@ DEFAULT_SERVICE_GRPC_PORTS = { ServiceNameEnum.INTERDOMAIN .value : 10010, ServiceNameEnum.PATHCOMP .value : 10020, ServiceNameEnum.TE .value : 10030, + ServiceNameEnum.FORECASTER .value : 10040, # Used for test and debugging only ServiceNameEnum.DLT_GATEWAY .value : 50051, diff --git a/src/forecaster/Dockerfile b/src/forecaster/Dockerfile index 6566625527f8ceaa8de4639d558c92572c4835cb..a0d4cd5613a44f663c9f329e3bc4b51e3550e594 100644 --- a/src/forecaster/Dockerfile +++ b/src/forecaster/Dockerfile @@ -54,17 +54,17 @@ RUN rm *.proto RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; # Create component sub-folders, get specific Python packages -RUN mkdir -p /var/teraflow/device -WORKDIR /var/teraflow/device -COPY src/device/requirements.in requirements.in +RUN mkdir -p /var/teraflow/forecaster +WORKDIR /var/teraflow/forecaster +COPY src/forecaster/requirements.in requirements.in RUN pip-compile --quiet --output-file=requirements.txt requirements.in RUN python3 -m pip install -r requirements.txt # Add component files into working directory WORKDIR /var/teraflow COPY src/context/. context/ -COPY src/device/. device/ +COPY src/forecaster/. forecaster/ COPY src/monitoring/. monitoring/ # Start the service -ENTRYPOINT ["python", "-m", "device.service"] +ENTRYPOINT ["python", "-m", "forecaster.service"] diff --git a/src/forecaster/client/ForecasterClient.py b/src/forecaster/client/ForecasterClient.py index ed33650689547c2ded4e48ebb442215d2887f542..17e0beb339f5dbe748a211c4286a98a376ed6084 100644 --- a/src/forecaster/client/ForecasterClient.py +++ b/src/forecaster/client/ForecasterClient.py @@ -15,9 +15,11 @@ import grpc, logging from common.Constants import ServiceNameEnum from common.Settings import get_service_host, get_service_port_grpc -from common.proto.context_pb2 import Device, DeviceConfig, DeviceId, Empty -from common.proto.device_pb2 import MonitoringSettings -from common.proto.device_pb2_grpc import DeviceServiceStub +from common.proto.forecaster_pb2 import ( + ForecastLinkCapacityReply, ForecastLinkCapacityRequest, + ForecastTopologyCapacityReply, ForecastTopologyCapacityRequest +) +from common.proto.forecaster_pb2_grpc import ForecasterServiceStub from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string @@ -28,8 +30,8 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, class ForecasterClient: def __init__(self, host=None, port=None): - if not host: host = get_service_host(ServiceNameEnum.DEVICE) - if not port: port = get_service_port_grpc(ServiceNameEnum.DEVICE) + if not host: host = get_service_host(ServiceNameEnum.FORECASTER) + if not port: port = get_service_port_grpc(ServiceNameEnum.FORECASTER) self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) self.channel = None @@ -39,7 +41,7 @@ class ForecasterClient: def connect(self): self.channel = grpc.insecure_channel(self.endpoint) - self.stub = DeviceServiceStub(self.channel) + self.stub = ForecasterServiceStub(self.channel) def close(self): if self.channel is not None: self.channel.close() @@ -47,36 +49,15 @@ class ForecasterClient: self.stub = None @RETRY_DECORATOR - def AddDevice(self, request : Device) -> DeviceId: - LOGGER.debug('AddDevice request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.AddDevice(request) - LOGGER.debug('AddDevice result: {:s}'.format(grpc_message_to_json_string(response))) + def ForecastLinkCapacity(self, request : ForecastLinkCapacityRequest) -> ForecastLinkCapacityReply: + LOGGER.debug('ForecastLinkCapacity request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.ForecastLinkCapacity(request) + LOGGER.debug('ForecastLinkCapacity result: {:s}'.format(grpc_message_to_json_string(response))) return response @RETRY_DECORATOR - def ConfigureDevice(self, request : Device) -> DeviceId: - LOGGER.debug('ConfigureDevice request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.ConfigureDevice(request) - LOGGER.debug('ConfigureDevice result: {:s}'.format(grpc_message_to_json_string(response))) - return response - - @RETRY_DECORATOR - def DeleteDevice(self, request : DeviceId) -> Empty: - LOGGER.debug('DeleteDevice request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.DeleteDevice(request) - LOGGER.debug('DeleteDevice result: {:s}'.format(grpc_message_to_json_string(response))) - return response - - @RETRY_DECORATOR - def GetInitialConfig(self, request : DeviceId) -> DeviceConfig: - LOGGER.debug('GetInitialConfig request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.GetInitialConfig(request) - LOGGER.debug('GetInitialConfig result: {:s}'.format(grpc_message_to_json_string(response))) - return response - - @RETRY_DECORATOR - def MonitorDeviceKpi(self, request : MonitoringSettings) -> Empty: - LOGGER.debug('MonitorDeviceKpi request: {:s}'.format(grpc_message_to_json_string(request))) - response = self.stub.MonitorDeviceKpi(request) - LOGGER.debug('MonitorDeviceKpi result: {:s}'.format(grpc_message_to_json_string(response))) + def ForecastTopologyCapacity(self, request : ForecastTopologyCapacityRequest) -> ForecastTopologyCapacityReply: + LOGGER.debug('ForecastTopologyCapacity request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.ForecastTopologyCapacity(request) + LOGGER.debug('ForecastTopologyCapacity result: {:s}'.format(grpc_message_to_json_string(response))) return response diff --git a/src/forecaster/greet_client.py b/src/forecaster/greet_client.py deleted file mode 100644 index e5c42439d86e574b562c9df7b78f8e5a89feda08..0000000000000000000000000000000000000000 --- a/src/forecaster/greet_client.py +++ /dev/null @@ -1,23 +0,0 @@ -import greet_pb2_grpc -import greet_pb2 -import grpc - -#from tests import greet_pb2 -#from tests import greet_pb2_grpc - -import datetime as dt - -def run(): - with grpc.insecure_channel('localhost:50051') as channel: - stub = greet_pb2_grpc.GreeterStub(channel) - print("The stub has been created") - forecastRequest = greet_pb2.ForecastTopology(forecast_window_seconds = 36000, topology_id = "1" ) - print("The request has been sent to the client") - hello_reply = stub.ComputeTopologyForecast(forecastRequest) - print("The response has been received") - print(hello_reply) - - - -if __name__ == "__main__": - run() \ No newline at end of file diff --git a/src/forecaster/greet_pb2.py b/src/forecaster/greet_pb2.py deleted file mode 100644 index 800b3edd823b8cd6eb938c62d76b70e45649bb3b..0000000000000000000000000000000000000000 --- a/src/forecaster/greet_pb2.py +++ /dev/null @@ -1,33 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: greet.proto -"""Generated protocol buffer code.""" -from google.protobuf import descriptor as _descriptor -from google.protobuf import descriptor_pool as _descriptor_pool -from google.protobuf import symbol_database as _symbol_database -from google.protobuf.internal import builder as _builder -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - - - -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0bgreet.proto\x12\x05greet\"H\n\x10\x46orecastTopology\x12\x1f\n\x17\x66orecast_window_seconds\x18\x01 \x01(\x02\x12\x13\n\x0btopology_id\x18\x02 \x01(\t\">\n\rForecastReply\x12-\n\x10LinkCapacityList\x18\x01 \x03(\x0b\x32\x13.greet.LinkCapacity\"Q\n\x14\x46orecastDelayedReply\x12\x0f\n\x07message\x18\x01 \x01(\t\x12(\n\x07request\x18\x02 \x03(\x0b\x32\x17.greet.ForecastTopology\"Z\n\x0cLinkCapacity\x12\x10\n\x08linkuuid\x18\x01 \x01(\t\x12\x1b\n\x13\x66orecasted_capacity\x18\x02 \x01(\x02\x12\x1b\n\x13total_capacity_gbps\x18\x03 \x01(\x02\x32S\n\x07Greeter\x12H\n\x17\x43omputeTopologyForecast\x12\x17.greet.ForecastTopology\x1a\x14.greet.ForecastReplyb\x06proto3') - -_globals = globals() -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'greet_pb2', _globals) -if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - _globals['_FORECASTTOPOLOGY']._serialized_start=22 - _globals['_FORECASTTOPOLOGY']._serialized_end=94 - _globals['_FORECASTREPLY']._serialized_start=96 - _globals['_FORECASTREPLY']._serialized_end=158 - _globals['_FORECASTDELAYEDREPLY']._serialized_start=160 - _globals['_FORECASTDELAYEDREPLY']._serialized_end=241 - _globals['_LINKCAPACITY']._serialized_start=243 - _globals['_LINKCAPACITY']._serialized_end=333 - _globals['_GREETER']._serialized_start=335 - _globals['_GREETER']._serialized_end=418 -# @@protoc_insertion_point(module_scope) diff --git a/src/forecaster/greet_pb2.pyi b/src/forecaster/greet_pb2.pyi deleted file mode 100644 index 5197df5e81bb0d538a0013f4b5b85963cdf3924f..0000000000000000000000000000000000000000 --- a/src/forecaster/greet_pb2.pyi +++ /dev/null @@ -1,30 +0,0 @@ -from google.protobuf.internal import containers as _containers -from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Mapping, Optional as _Optional, Union as _Union - -DESCRIPTOR: _descriptor.FileDescriptor - -class ForecastRequest(_message.Message): - __slots__ = ["linkid"] - LINKID_FIELD_NUMBER: _ClassVar[int] - linkid: str - def __init__(self, linkid: _Optional[str] = ...) -> None: ... - -class ForecastReply(_message.Message): - __slots__ = ["message", "errorRate", "calculationTime"] - MESSAGE_FIELD_NUMBER: _ClassVar[int] - ERRORRATE_FIELD_NUMBER: _ClassVar[int] - CALCULATIONTIME_FIELD_NUMBER: _ClassVar[int] - message: str - errorRate: float - calculationTime: float - def __init__(self, message: _Optional[str] = ..., errorRate: _Optional[float] = ..., calculationTime: _Optional[float] = ...) -> None: ... - -class ForecastDelayedReply(_message.Message): - __slots__ = ["message", "request"] - MESSAGE_FIELD_NUMBER: _ClassVar[int] - REQUEST_FIELD_NUMBER: _ClassVar[int] - message: str - request: _containers.RepeatedCompositeFieldContainer[ForecastRequest] - def __init__(self, message: _Optional[str] = ..., request: _Optional[_Iterable[_Union[ForecastRequest, _Mapping]]] = ...) -> None: ... diff --git a/src/forecaster/greet_pb2_grpc.py b/src/forecaster/greet_pb2_grpc.py deleted file mode 100644 index f0a69cca320d518eb750e6a4d36c9e4ef5792d1a..0000000000000000000000000000000000000000 --- a/src/forecaster/greet_pb2_grpc.py +++ /dev/null @@ -1,70 +0,0 @@ -# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! -"""Client and server classes corresponding to protobuf-defined services.""" -import grpc - -import greet_pb2 as greet__pb2 - - -class GreeterStub(object): - """The greeting service definition. - """ - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.ComputeTopologyForecast = channel.unary_unary( - '/greet.Greeter/ComputeTopologyForecast', - request_serializer=greet__pb2.ForecastTopology.SerializeToString, - response_deserializer=greet__pb2.ForecastReply.FromString, - ) - - -class GreeterServicer(object): - """The greeting service definition. - """ - - def ComputeTopologyForecast(self, request, context): - """Unary - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_GreeterServicer_to_server(servicer, server): - rpc_method_handlers = { - 'ComputeTopologyForecast': grpc.unary_unary_rpc_method_handler( - servicer.ComputeTopologyForecast, - request_deserializer=greet__pb2.ForecastTopology.FromString, - response_serializer=greet__pb2.ForecastReply.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'greet.Greeter', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) - - - # This class is part of an EXPERIMENTAL API. -class Greeter(object): - """The greeting service definition. - """ - - @staticmethod - def ComputeTopologyForecast(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/greet.Greeter/ComputeTopologyForecast', - greet__pb2.ForecastTopology.SerializeToString, - greet__pb2.ForecastReply.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/src/forecaster/service/DeviceService.py b/src/forecaster/service/DeviceService.py deleted file mode 100644 index 6d27ef96eef4b93fa7d6ca294d1fd645e815af03..0000000000000000000000000000000000000000 --- a/src/forecaster/service/DeviceService.py +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from common.Constants import ServiceNameEnum -from common.Settings import get_service_port_grpc -from common.proto.device_pb2_grpc import add_DeviceServiceServicer_to_server -from common.tools.service.GenericGrpcService import GenericGrpcService -from .driver_api.DriverInstanceCache import DriverInstanceCache -from .DeviceServiceServicerImpl import DeviceServiceServicerImpl -from .monitoring.MonitoringLoops import MonitoringLoops - -# Custom gRPC settings -# Multiple clients might keep connections alive waiting for RPC methods to be executed. -# Requests needs to be serialized to ensure correct device configurations -GRPC_MAX_WORKERS = 200 - -class DeviceService(GenericGrpcService): - def __init__(self, driver_instance_cache : DriverInstanceCache, cls_name: str = __name__) -> None: - port = get_service_port_grpc(ServiceNameEnum.DEVICE) - super().__init__(port, max_workers=GRPC_MAX_WORKERS, cls_name=cls_name) - self.monitoring_loops = MonitoringLoops() - self.device_servicer = DeviceServiceServicerImpl(driver_instance_cache, self.monitoring_loops) - - def install_servicers(self): - self.monitoring_loops.start() - add_DeviceServiceServicer_to_server(self.device_servicer, self.server) - - def stop(self): - super().stop() - self.monitoring_loops.stop() diff --git a/src/forecaster/service/DeviceServiceServicerImpl.py b/src/forecaster/service/DeviceServiceServicerImpl.py deleted file mode 100644 index 8b7855be177bbb11b46e8d21809b7f83a0a64c86..0000000000000000000000000000000000000000 --- a/src/forecaster/service/DeviceServiceServicerImpl.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import grpc, logging, os, time -from typing import Dict -from prometheus_client import Histogram -from common.Constants import ServiceNameEnum -from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, get_env_var_name -from common.method_wrappers.Decorator import MetricTypeEnum, MetricsPool, safe_and_metered_rpc_method -from common.method_wrappers.ServiceExceptions import NotFoundException, OperationFailedException -from common.proto.context_pb2 import ( - Device, DeviceConfig, DeviceDriverEnum, DeviceId, DeviceOperationalStatusEnum, Empty, Link) -from common.proto.device_pb2 import MonitoringSettings -from common.proto.device_pb2_grpc import DeviceServiceServicer -from common.tools.context_queries.Device import get_device -from common.tools.mutex_queues.MutexQueues import MutexQueues -from context.client.ContextClient import ContextClient - -LOGGER = logging.getLogger(__name__) - -METRICS_POOL = MetricsPool('Device', 'RPC') - -METRICS_POOL_DETAILS = MetricsPool('Device', 'execution', labels={ - 'driver': '', 'operation': '', 'step': '', -}) - -class DeviceServiceServicerImpl(DeviceServiceServicer): - def __init__(self, driver_instance_cache : DriverInstanceCache, monitoring_loops : MonitoringLoops) -> None: - LOGGER.debug('Creating Servicer...') - self.driver_instance_cache = driver_instance_cache - self.monitoring_loops = monitoring_loops - self.mutex_queues = MutexQueues() - LOGGER.debug('Servicer Created') - - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def AddDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId: - return DeviceId() - - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId: - return DeviceId() - - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty: - return Empty() diff --git a/src/forecaster/service/ForecasterService.py b/src/forecaster/service/ForecasterService.py new file mode 100644 index 0000000000000000000000000000000000000000..944ceb01e1429df4e124d28993cf001bb683aeb5 --- /dev/null +++ b/src/forecaster/service/ForecasterService.py @@ -0,0 +1,28 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from common.Constants import ServiceNameEnum +from common.Settings import get_service_port_grpc +from common.proto.forecaster_pb2_grpc import add_ForecasterServiceServicer_to_server +from common.tools.service.GenericGrpcService import GenericGrpcService +from .ForecasterServiceServicerImpl import ForecasterServiceServicerImpl + +class ForecasterService(GenericGrpcService): + def __init__(self, cls_name: str = __name__) -> None: + port = get_service_port_grpc(ServiceNameEnum.FORECASTER) + super().__init__(port, cls_name=cls_name) + self.forecaster_servicer = ForecasterServiceServicerImpl() + + def install_servicers(self): + add_ForecasterServiceServicer_to_server(self.forecaster_servicer, self.server) diff --git a/src/forecaster/service/ForecasterServiceServicerImpl.py b/src/forecaster/service/ForecasterServiceServicerImpl.py new file mode 100644 index 0000000000000000000000000000000000000000..8466198a4cea6bca7c3b093349158361d911ab2d --- /dev/null +++ b/src/forecaster/service/ForecasterServiceServicerImpl.py @@ -0,0 +1,44 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc, logging +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method +from common.proto.forecaster_pb2 import ( + ForecastLinkCapacityReply, ForecastLinkCapacityRequest, + ForecastTopologyCapacityReply, ForecastTopologyCapacityRequest +) +from common.proto.forecaster_pb2_grpc import ForecasterServiceServicer +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient + +LOGGER = logging.getLogger(__name__) + +METRICS_POOL = MetricsPool('Forecaster', 'RPC') + +class ForecasterServiceServicerImpl(ForecasterServiceServicer): + def __init__(self) -> None: + LOGGER.debug('Creating Servicer...') + LOGGER.debug('Servicer Created') + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def ForecastLinkCapacity( + self, request : ForecastLinkCapacityRequest, context : grpc.ServicerContext + ) -> ForecastLinkCapacityReply: + return ForecastLinkCapacityReply() + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def ForecastTopologyCapacity( + self, request : ForecastTopologyCapacityRequest, context : grpc.ServicerContext + ) -> ForecastTopologyCapacityReply: + return ForecastTopologyCapacityReply() diff --git a/src/forecaster/service/__main__.py b/src/forecaster/service/__main__.py index 84dee26905fc404cf1de96f7a1e11e4a59a034b6..780fe5f8f3386b571c37ce64a0b95578e9641110 100644 --- a/src/forecaster/service/__main__.py +++ b/src/forecaster/service/__main__.py @@ -18,10 +18,7 @@ from common.Constants import ServiceNameEnum from common.Settings import ( ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, wait_for_environment_variables) -from .DeviceService import DeviceService -from .driver_api.DriverFactory import DriverFactory -from .driver_api.DriverInstanceCache import DriverInstanceCache, preload_drivers -from .drivers import DRIVERSfff +from .ForecasterService import ForecasterService terminate = threading.Event() LOGGER : logging.Logger = None @@ -41,8 +38,10 @@ def main(): LOGGER = logging.getLogger(__name__) wait_for_environment_variables([ - get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), - get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) signal.signal(signal.SIGINT, signal_handler) @@ -54,24 +53,15 @@ def main(): metrics_port = get_metrics_port() start_http_server(metrics_port) - # Initialize Driver framework - driver_factory = DriverFactory(DRIVERS) - driver_instance_cache = DriverInstanceCache(driver_factory) - - # Starting device service - grpc_service = DeviceService(driver_instance_cache) + # Starting Forecaster service + grpc_service = ForecasterService() grpc_service.start() - # Initialize drivers with existing devices in context - LOGGER.info('Pre-loading drivers...') - preload_drivers(driver_instance_cache) - # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=1.0): pass LOGGER.info('Terminating...') grpc_service.stop() - driver_instance_cache.terminate() LOGGER.info('Bye') return 0 diff --git a/src/forecaster/tests/MockService_Dependencies.py b/src/forecaster/tests/MockService_Dependencies.py new file mode 100644 index 0000000000000000000000000000000000000000..858db17a9e35e30ea93c965815b39a068c696b4b --- /dev/null +++ b/src/forecaster/tests/MockService_Dependencies.py @@ -0,0 +1,49 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from typing import Union +from common.Constants import ServiceNameEnum +from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name +from common.proto.context_pb2_grpc import add_ContextServiceServicer_to_server +from common.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server +from common.tests.MockServicerImpl_Context import MockServicerImpl_Context +from common.tests.MockServicerImpl_Monitoring import MockServicerImpl_Monitoring +from common.tools.service.GenericGrpcService import GenericGrpcService + +LOCAL_HOST = '127.0.0.1' + +SERVICE_CONTEXT = ServiceNameEnum.CONTEXT +SERVICE_MONITORING = ServiceNameEnum.MONITORING + +class MockService_Dependencies(GenericGrpcService): + # Mock Service implementing Context, Device, and Service to simplify unitary tests of PathComp + + def __init__(self, bind_port: Union[str, int]) -> None: + super().__init__(bind_port, LOCAL_HOST, enable_health_servicer=False, cls_name='MockService') + + # pylint: disable=attribute-defined-outside-init + def install_servicers(self): + self.context_servicer = MockServicerImpl_Context() + add_ContextServiceServicer_to_server(self.context_servicer, self.server) + + self.monitoring_servicer = MockServicerImpl_Monitoring() + add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server) + + def configure_env_vars(self): + os.environ[get_env_var_name(SERVICE_CONTEXT, ENVVAR_SUFIX_SERVICE_HOST )] = str(self.bind_address) + os.environ[get_env_var_name(SERVICE_CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(self.bind_port) + + os.environ[get_env_var_name(SERVICE_MONITORING, ENVVAR_SUFIX_SERVICE_HOST )] = str(self.bind_address) + os.environ[get_env_var_name(SERVICE_MONITORING, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(self.bind_port) diff --git a/src/forecaster/tests/PrepareTestScenario.py b/src/forecaster/tests/PrepareTestScenario.py new file mode 100644 index 0000000000000000000000000000000000000000..7d383f616cce90532228efad515ef5a12509403e --- /dev/null +++ b/src/forecaster/tests/PrepareTestScenario.py @@ -0,0 +1,66 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest, os +from common.Constants import ServiceNameEnum +from common.Settings import ( + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_service_port_grpc) +from context.client.ContextClient import ContextClient +from forecaster.client.ForecasterClient import ForecasterClient +from forecaster.service.ForecasterService import ForecasterService +from monitoring.client.MonitoringClient import MonitoringClient +from .MockService_Dependencies import MockService_Dependencies + +LOCAL_HOST = '127.0.0.1' +MOCKSERVICE_PORT = 10000 +# avoid privileged ports +FORECASTER_SERVICE_PORT = MOCKSERVICE_PORT + int(get_service_port_grpc(ServiceNameEnum.FORECASTER)) +os.environ[get_env_var_name(ServiceNameEnum.FORECASTER, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.FORECASTER, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(FORECASTER_SERVICE_PORT) + +@pytest.fixture(scope='session') +def mock_service(): + _service = MockService_Dependencies(MOCKSERVICE_PORT) + _service.configure_env_vars() + _service.start() + yield _service + _service.stop() + +@pytest.fixture(scope='session') +def context_client(mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name + _client = ContextClient() + yield _client + _client.close() + +@pytest.fixture(scope='session') +def monitoring_client(mock_service : MockService_Dependencies): # pylint: disable=redefined-outer-name + _client = MonitoringClient() + yield _client + _client.close() + +@pytest.fixture(scope='session') +def forecaster_service( + context_client : ContextClient, # pylint: disable=redefined-outer-name + monitoring_client : MonitoringClient, # pylint: disable=redefined-outer-name +): + _service = ForecasterService() + _service.start() + yield _service + _service.stop() + +@pytest.fixture(scope='session') +def forecaster_client(forecaster_service : ForecasterService): # pylint: disable=redefined-outer-name + _client = ForecasterClient() + yield _client + _client.close() diff --git a/src/forecaster/tests/test_unitary.py b/src/forecaster/tests/test_unitary.py index 38d04994fb0fa1951fb465bc127eb72659dc2eaf..29ebcb340cdcdc4d63814b55ffc50fdccec0f6dd 100644 --- a/src/forecaster/tests/test_unitary.py +++ b/src/forecaster/tests/test_unitary.py @@ -11,3 +11,78 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import logging +from common.Constants import DEFAULT_CONTEXT_NAME +from common.proto.context_pb2 import ContextId +from common.proto.forecaster_pb2 import ForecastLinkCapacityRequest, ForecastTopologyCapacityRequest +from common.tools.descriptor.Loader import DescriptorLoader, check_descriptor_load_results, validate_empty_scenario +from common.tools.object_factory.Context import json_context_id +from context.client.ContextClient import ContextClient +from forecaster.client.ForecasterClient import ForecasterClient +from monitoring.client.MonitoringClient import MonitoringClient + +from .PrepareTestScenario import ( # pylint: disable=unused-import + # be careful, order of symbols is important here! + mock_service, forecaster_service, context_client, monitoring_client, forecaster_client) + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +ADMIN_CONTEXT_ID = ContextId(**json_context_id(DEFAULT_CONTEXT_NAME)) +DESCRIPTORS_FILE = '' # use dummy descriptor here + +def test_prepare_environment( + context_client : ContextClient, # pylint: disable=redefined-outer-name +) -> None: + validate_empty_scenario(context_client) + + descriptor_loader = DescriptorLoader(descriptors_file=DESCRIPTORS_FILE, context_client=context_client) + results = descriptor_loader.process() + check_descriptor_load_results(results, descriptor_loader) + descriptor_loader.validate() + + # Verify the scenario has no services/slices + response = context_client.GetContext(ADMIN_CONTEXT_ID) + assert len(response.service_ids) == 0 + assert len(response.slice_ids) == 0 + +def test_forecast_link( + context_client : ContextClient, + forecaster_client : ForecasterClient, +): # pylint: disable=redefined-outer-name + + # TODO: select link + + forecast_request = ForecastLinkCapacityRequest() + forecast_request.forecast_window_seconds = 10 * 24 * 60 * 60 # 10 days in seconds + # TODO: populate request + forecast_reply = forecaster_client.ForecastLinkCapacity(forecast_request) + # TODO: validate reply + +def test_forecast_topology( + context_client : ContextClient, + forecaster_client : ForecasterClient, +): # pylint: disable=redefined-outer-name + + # TODO: get topology id + + forecast_request = ForecastTopologyCapacityRequest() + forecast_request.forecast_window_seconds = 10 * 24 * 60 * 60 # 10 days in seconds + # TODO: populate request + forecast_reply = forecaster_client.ForecastTopologyCapacity(forecast_request) + # TODO: validate reply + +def test_cleanup_environment( + context_client : ContextClient, # pylint: disable=redefined-outer-name +) -> None: + # Verify the scenario has no services/slices + response = context_client.GetContext(ADMIN_CONTEXT_ID) + assert len(response.service_ids) == 0 + assert len(response.slice_ids) == 0 + + # Load descriptors and validate the base scenario + descriptor_loader = DescriptorLoader(descriptors_file=DESCRIPTORS_FILE, context_client=context_client) + descriptor_loader.validate() + descriptor_loader.unload() + validate_empty_scenario(context_client)