diff --git a/src/slice/Config.py b/src/slice/Config.py index e6d770d000cc249d73cccf17dd17f21ccb001f7d..70a33251242c51f49140e596b8208a19dd5245f7 100644 --- a/src/slice/Config.py +++ b/src/slice/Config.py @@ -1,22 +1,14 @@ -import logging +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. -# General settings -LOG_LEVEL = logging.WARNING - -# gRPC settings -GRPC_SERVICE_PORT = 4040 -GRPC_MAX_WORKERS = 10 -GRPC_GRACE_PERIOD = 60 - -# Prometheus settings -METRICS_PORT = 9192 - -# Dependency micro-service connection settings -CONTEXT_SERVICE_HOST = '127.0.0.1' -CONTEXT_SERVICE_PORT = 1010 - -SERVICE_SERVICE_HOST = '127.0.0.1' -SERVICE_SERVICE_PORT = 3030 - -INTERDOMAIN_SERVICE_HOST = '127.0.0.1' -INTERDOMAIN_SERVICE_PORT = 10010 diff --git a/src/slice/client/SliceClient.py b/src/slice/client/SliceClient.py index 5566108f83a68cbac695a117b17b7a3fd1bd3de1..d1783e882faf2ea0e89fd3d1e034d8cba02dc24b 100644 --- a/src/slice/client/SliceClient.py +++ b/src/slice/client/SliceClient.py @@ -13,6 +13,8 @@ # limitations under the License. import grpc, logging +from common.Constants import ServiceNameEnum +from common.Settings import get_service_host, get_service_port_grpc from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.grpc.Tools import grpc_message_to_json_string from slice.proto.context_pb2 import Empty, Slice, SliceId @@ -24,8 +26,10 @@ DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') class SliceClient: - def __init__(self, address, port): - self.endpoint = '{:s}:{:s}'.format(str(address), str(port)) + def __init__(self, host=None, port=None): + if not host: host = get_service_host(ServiceNameEnum.SLICE) + if not port: port = get_service_port_grpc(ServiceNameEnum.SLICE) + self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) self.channel = None self.stub = None diff --git a/src/slice/service/SliceService.py b/src/slice/service/SliceService.py index a7ad2694698333d0450aef9e670dd2a4fe9b30e5..7121ae4676d63977953bb3f66cb0754c8d89de88 100644 --- a/src/slice/service/SliceService.py +++ b/src/slice/service/SliceService.py @@ -12,65 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging -from concurrent import futures -from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH -from grpc_health.v1.health_pb2 import HealthCheckResponse -from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server -from context.client.ContextClient import ContextClient -from interdomain.client.InterdomainClient import InterdomainClient -from service.client.ServiceClient import ServiceClient +from common.Constants import ServiceNameEnum +from common.Settings import get_service_port_grpc +from common.tools.service.GenericGrpcService import GenericGrpcService from slice.proto.slice_pb2_grpc import add_SliceServiceServicer_to_server from slice.service.SliceServiceServicerImpl import SliceServiceServicerImpl -from slice.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD -BIND_ADDRESS = '0.0.0.0' -LOGGER = logging.getLogger(__name__) +class SliceService(GenericGrpcService): + def __init__(self, cls_name: str = __name__) -> None: + port = get_service_port_grpc(ServiceNameEnum.SLICE) + super().__init__(port, cls_name=cls_name) + self.slice_servicer = SliceServiceServicerImpl() -class SliceService: - def __init__( - self, context_client : ContextClient, interdomain_client : InterdomainClient, service_client : ServiceClient, - address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD): - - self.context_client = context_client - self.interdomain_client = interdomain_client - self.service_client = service_client - self.address = address - self.port = port - self.endpoint = None - self.max_workers = max_workers - self.grace_period = grace_period - self.slice_servicer = None - self.health_servicer = None - self.pool = None - self.server = None - - def start(self): - self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port)) - LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( - str(self.endpoint), str(self.max_workers))) - - self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) - self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) - - self.slice_servicer = SliceServiceServicerImpl( - self.context_client, self.interdomain_client, self.service_client) + def install_servicers(self): add_SliceServiceServicer_to_server(self.slice_servicer, self.server) - - self.health_servicer = HealthServicer( - experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) - add_HealthServicer_to_server(self.health_servicer, self.server) - - port = self.server.add_insecure_port(self.endpoint) - self.endpoint = '{:s}:{:s}'.format(str(self.address), str(port)) - LOGGER.info('Listening on {:s}...'.format(str(self.endpoint))) - self.server.start() - self.health_servicer.set(OVERALL_HEALTH, HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member - - LOGGER.debug('Service started') - - def stop(self): - LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period))) - self.health_servicer.enter_graceful_shutdown() - self.server.stop(self.grace_period) - LOGGER.debug('Service stopped') diff --git a/src/slice/service/SliceServiceServicerImpl.py b/src/slice/service/SliceServiceServicerImpl.py index bd26d19435092bd8ebea5748d49488975d4d675b..eae45240066cb4f88fd095ac3966daa5a270d5f4 100644 --- a/src/slice/service/SliceServiceServicerImpl.py +++ b/src/slice/service/SliceServiceServicerImpl.py @@ -28,17 +28,14 @@ METHOD_NAMES = ['CreateSlice', 'UpdateSlice', 'DeleteSlice'] METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) class SliceServiceServicerImpl(SliceServiceServicer): - def __init__( - self, context_client : ContextClient, interdomain_client : InterdomainClient, service_client : ServiceClient - ): + def __init__(self): LOGGER.debug('Creating Servicer...') - self.context_client = context_client - self.interdomain_client = interdomain_client - self.service_client = service_client LOGGER.debug('Servicer Created') def create_update(self, request : Slice) -> SliceId: - slice_id = self.context_client.SetSlice(request) + context_client = ContextClient() + + slice_id = context_client.SetSlice(request) if len(request.slice_endpoint_ids) != 2: return slice_id domains = set() @@ -48,7 +45,8 @@ class SliceServiceServicerImpl(SliceServiceServicer): is_multi_domain = len(domains) == 2 if is_multi_domain: - slice_id = self.interdomain_client.RequestSlice(request) + interdomain_client = InterdomainClient() + slice_id = interdomain_client.RequestSlice(request) else: # pylint: disable=no-member service_request = Service() @@ -57,7 +55,8 @@ class SliceServiceServicerImpl(SliceServiceServicer): service_request.service_type = ServiceTypeEnum.SERVICETYPE_L3NM service_request.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED - service_reply = self.service_client.CreateService(service_request) + service_client = ServiceClient() + service_reply = service_client.CreateService(service_request) if service_reply != service_request.service_id: # pylint: disable=no-member raise Exception('Service creation failed. Wrong Service Id was returned') @@ -84,7 +83,7 @@ class SliceServiceServicerImpl(SliceServiceServicer): 'address_ip': '0.0.0.0', 'address_prefix': 0}, sort_keys=True) - service_reply = self.service_client.UpdateService(service_request) + service_reply = service_client.UpdateService(service_request) if service_reply != service_request.service_id: # pylint: disable=no-member raise Exception('Service update failed. Wrong Service Id was returned') @@ -92,29 +91,29 @@ class SliceServiceServicerImpl(SliceServiceServicer): reply.CopyFrom(request) slice_service_id = reply.slice_service_ids.add() slice_service_id.CopyFrom(service_reply) - self.context_client.SetSlice(reply) + context_client.SetSlice(reply) slice_id = reply.slice_id - slice_ = self.context_client.GetSlice(slice_id) + slice_ = context_client.GetSlice(slice_id) slice_active = Slice() slice_active.CopyFrom(slice_) slice_active.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_ACTIVE - self.context_client.SetSlice(slice_active) + context_client.SetSlice(slice_active) return slice_id @safe_and_metered_rpc_method(METRICS, LOGGER) def CreateSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: #try: - # slice_ = self.context_client.GetSlice(request.slice_id) + # slice_ = context_client.GetSlice(request.slice_id) # slice_id = slice_.slice_id #except grpc.RpcError: - # slice_id = self.context_client.SetSlice(request) + # slice_id = context_client.SetSlice(request) #return slice_id return self.create_update(request) @safe_and_metered_rpc_method(METRICS, LOGGER) def UpdateSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: - #slice_id = self.context_client.SetSlice(request) + #slice_id = context_client.SetSlice(request) #if len(request.slice_endpoint_ids) != 2: return slice_id # #domains = set() @@ -124,7 +123,8 @@ class SliceServiceServicerImpl(SliceServiceServicer): # #is_multi_domain = len(domains) == 2 #if is_multi_domain: - # return self.interdomain_client.LookUpSlice(request) + # interdomain_client = InterdomainClient() + # return interdomain_client.LookUpSlice(request) #else: # raise NotImplementedError('Slice should create local services for single domain slice') return self.create_update(request) diff --git a/src/slice/service/__main__.py b/src/slice/service/__main__.py index 76bb5fa34eac45c828413d4671e023958a886d1b..f77d86bffe9b722f414be4f85adcaf0ef2cc4a8e 100644 --- a/src/slice/service/__main__.py +++ b/src/slice/service/__main__.py @@ -14,14 +14,8 @@ import logging, signal, sys, threading from prometheus_client import start_http_server -from common.Settings import get_setting, wait_for_environment_variables -from context.client.ContextClient import ContextClient -from interdomain.client.InterdomainClient import InterdomainClient -from service.client.ServiceClient import ServiceClient -from slice.Config import ( - CONTEXT_SERVICE_HOST, CONTEXT_SERVICE_PORT, GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, - INTERDOMAIN_SERVICE_HOST, INTERDOMAIN_SERVICE_PORT, LOG_LEVEL, METRICS_PORT, SERVICE_SERVICE_HOST, - SERVICE_SERVICE_PORT) +from common.Constants import ServiceNameEnum +from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, wait_for_environment_variables from .SliceService import SliceService terminate = threading.Event() @@ -34,58 +28,28 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name def main(): global LOGGER # pylint: disable=global-statement - grpc_service_port = get_setting('SLICESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT ) - max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS ) - grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD ) - log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL ) - metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT ) - + log_level = get_log_level() logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) wait_for_environment_variables([ - 'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC', - 'INTERDOMAINSERVICE_SERVICE_HOST', 'INTERDOMAINSERVICE_SERVICE_PORT_GRPC', - 'SERVICESERVICE_SERVICE_HOST', 'SERVICESERVICE_SERVICE_PORT_GRPC', + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), ]) - context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=CONTEXT_SERVICE_HOST ) - context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=CONTEXT_SERVICE_PORT ) - interdomain_service_host = get_setting('INTERDOMAINSERVICE_SERVICE_HOST', default=INTERDOMAIN_SERVICE_HOST) - interdomain_service_port = get_setting('INTERDOMAINSERVICE_SERVICE_PORT_GRPC', default=INTERDOMAIN_SERVICE_PORT) - service_service_host = get_setting('SERVICESERVICE_SERVICE_HOST', default=SERVICE_SERVICE_HOST ) - service_service_port = get_setting('SERVICESERVICE_SERVICE_PORT_GRPC', default=SERVICE_SERVICE_PORT ) - signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) LOGGER.info('Starting...') # Start metrics server + metrics_port = get_metrics_port() start_http_server(metrics_port) - # Initialize Context Client - if context_service_host is None or context_service_port is None: - raise Exception('Wrong address({:s}):port({:s}) of Context component'.format( - str(context_service_host), str(context_service_port))) - context_client = ContextClient(context_service_host, context_service_port) - - # Initialize Interdomain Client - if interdomain_service_host is None or interdomain_service_port is None: - raise Exception('Wrong address({:s}):port({:s}) of Interdomain component'.format( - str(interdomain_service_host), str(interdomain_service_port))) - interdomain_client = InterdomainClient(interdomain_service_host, interdomain_service_port) - - # Initialize Service Client - if service_service_host is None or service_service_port is None: - raise Exception('Wrong address({:s}):port({:s}) of Service component'.format( - str(service_service_host), str(service_service_port))) - service_client = ServiceClient(service_service_host, service_service_port) - # Starting slice service - grpc_service = SliceService( - context_client, interdomain_client, service_client, port=grpc_service_port, max_workers=max_workers, - grace_period=grace_period) + grpc_service = SliceService() grpc_service.start() # Wait for Ctrl+C or termination signal