Skip to content
Snippets Groups Projects
Commit 38659ca6 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Slice component:

- Migrated to use new generic gRPC servicer
- Migrated to use new settings framework
parent 810d16cf
No related branches found
No related tags found
1 merge request!54Release 2.0.0
import logging # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# General settings
LOG_LEVEL = logging.WARNING
# gRPC settings
GRPC_SERVICE_PORT = 4040
GRPC_MAX_WORKERS = 10
GRPC_GRACE_PERIOD = 60
# Prometheus settings
METRICS_PORT = 9192
# Dependency micro-service connection settings
CONTEXT_SERVICE_HOST = '127.0.0.1'
CONTEXT_SERVICE_PORT = 1010
SERVICE_SERVICE_HOST = '127.0.0.1'
SERVICE_SERVICE_PORT = 3030
INTERDOMAIN_SERVICE_HOST = '127.0.0.1'
INTERDOMAIN_SERVICE_PORT = 10010
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
# limitations under the License. # limitations under the License.
import grpc, logging import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json_string
from slice.proto.context_pb2 import Empty, Slice, SliceId from slice.proto.context_pb2 import Empty, Slice, SliceId
...@@ -24,8 +26,10 @@ DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) ...@@ -24,8 +26,10 @@ DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
class SliceClient: class SliceClient:
def __init__(self, address, port): def __init__(self, host=None, port=None):
self.endpoint = '{:s}:{:s}'.format(str(address), str(port)) if not host: host = get_service_host(ServiceNameEnum.SLICE)
if not port: port = get_service_port_grpc(ServiceNameEnum.SLICE)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
self.channel = None self.channel = None
self.stub = None self.stub = None
......
...@@ -12,65 +12,17 @@ ...@@ -12,65 +12,17 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import grpc, logging from common.Constants import ServiceNameEnum
from concurrent import futures from common.Settings import get_service_port_grpc
from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH from common.tools.service.GenericGrpcService import GenericGrpcService
from grpc_health.v1.health_pb2 import HealthCheckResponse
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from context.client.ContextClient import ContextClient
from interdomain.client.InterdomainClient import InterdomainClient
from service.client.ServiceClient import ServiceClient
from slice.proto.slice_pb2_grpc import add_SliceServiceServicer_to_server from slice.proto.slice_pb2_grpc import add_SliceServiceServicer_to_server
from slice.service.SliceServiceServicerImpl import SliceServiceServicerImpl from slice.service.SliceServiceServicerImpl import SliceServiceServicerImpl
from slice.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
BIND_ADDRESS = '0.0.0.0' class SliceService(GenericGrpcService):
LOGGER = logging.getLogger(__name__) def __init__(self, cls_name: str = __name__) -> None:
port = get_service_port_grpc(ServiceNameEnum.SLICE)
super().__init__(port, cls_name=cls_name)
self.slice_servicer = SliceServiceServicerImpl()
class SliceService: def install_servicers(self):
def __init__(
self, context_client : ContextClient, interdomain_client : InterdomainClient, service_client : ServiceClient,
address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD):
self.context_client = context_client
self.interdomain_client = interdomain_client
self.service_client = service_client
self.address = address
self.port = port
self.endpoint = None
self.max_workers = max_workers
self.grace_period = grace_period
self.slice_servicer = None
self.health_servicer = None
self.pool = None
self.server = None
def start(self):
self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port))
LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
str(self.endpoint), str(self.max_workers)))
self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers)
self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,))
self.slice_servicer = SliceServiceServicerImpl(
self.context_client, self.interdomain_client, self.service_client)
add_SliceServiceServicer_to_server(self.slice_servicer, self.server) add_SliceServiceServicer_to_server(self.slice_servicer, self.server)
self.health_servicer = HealthServicer(
experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
add_HealthServicer_to_server(self.health_servicer, self.server)
port = self.server.add_insecure_port(self.endpoint)
self.endpoint = '{:s}:{:s}'.format(str(self.address), str(port))
LOGGER.info('Listening on {:s}...'.format(str(self.endpoint)))
self.server.start()
self.health_servicer.set(OVERALL_HEALTH, HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member
LOGGER.debug('Service started')
def stop(self):
LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period)))
self.health_servicer.enter_graceful_shutdown()
self.server.stop(self.grace_period)
LOGGER.debug('Service stopped')
...@@ -28,17 +28,14 @@ METHOD_NAMES = ['CreateSlice', 'UpdateSlice', 'DeleteSlice'] ...@@ -28,17 +28,14 @@ METHOD_NAMES = ['CreateSlice', 'UpdateSlice', 'DeleteSlice']
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class SliceServiceServicerImpl(SliceServiceServicer): class SliceServiceServicerImpl(SliceServiceServicer):
def __init__( def __init__(self):
self, context_client : ContextClient, interdomain_client : InterdomainClient, service_client : ServiceClient
):
LOGGER.debug('Creating Servicer...') LOGGER.debug('Creating Servicer...')
self.context_client = context_client
self.interdomain_client = interdomain_client
self.service_client = service_client
LOGGER.debug('Servicer Created') LOGGER.debug('Servicer Created')
def create_update(self, request : Slice) -> SliceId: def create_update(self, request : Slice) -> SliceId:
slice_id = self.context_client.SetSlice(request) context_client = ContextClient()
slice_id = context_client.SetSlice(request)
if len(request.slice_endpoint_ids) != 2: return slice_id if len(request.slice_endpoint_ids) != 2: return slice_id
domains = set() domains = set()
...@@ -48,7 +45,8 @@ class SliceServiceServicerImpl(SliceServiceServicer): ...@@ -48,7 +45,8 @@ class SliceServiceServicerImpl(SliceServiceServicer):
is_multi_domain = len(domains) == 2 is_multi_domain = len(domains) == 2
if is_multi_domain: if is_multi_domain:
slice_id = self.interdomain_client.RequestSlice(request) interdomain_client = InterdomainClient()
slice_id = interdomain_client.RequestSlice(request)
else: else:
# pylint: disable=no-member # pylint: disable=no-member
service_request = Service() service_request = Service()
...@@ -57,7 +55,8 @@ class SliceServiceServicerImpl(SliceServiceServicer): ...@@ -57,7 +55,8 @@ class SliceServiceServicerImpl(SliceServiceServicer):
service_request.service_type = ServiceTypeEnum.SERVICETYPE_L3NM service_request.service_type = ServiceTypeEnum.SERVICETYPE_L3NM
service_request.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED service_request.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED
service_reply = self.service_client.CreateService(service_request) service_client = ServiceClient()
service_reply = service_client.CreateService(service_request)
if service_reply != service_request.service_id: # pylint: disable=no-member if service_reply != service_request.service_id: # pylint: disable=no-member
raise Exception('Service creation failed. Wrong Service Id was returned') raise Exception('Service creation failed. Wrong Service Id was returned')
...@@ -84,7 +83,7 @@ class SliceServiceServicerImpl(SliceServiceServicer): ...@@ -84,7 +83,7 @@ class SliceServiceServicerImpl(SliceServiceServicer):
'address_ip': '0.0.0.0', 'address_prefix': 0}, 'address_ip': '0.0.0.0', 'address_prefix': 0},
sort_keys=True) sort_keys=True)
service_reply = self.service_client.UpdateService(service_request) service_reply = service_client.UpdateService(service_request)
if service_reply != service_request.service_id: # pylint: disable=no-member if service_reply != service_request.service_id: # pylint: disable=no-member
raise Exception('Service update failed. Wrong Service Id was returned') raise Exception('Service update failed. Wrong Service Id was returned')
...@@ -92,29 +91,29 @@ class SliceServiceServicerImpl(SliceServiceServicer): ...@@ -92,29 +91,29 @@ class SliceServiceServicerImpl(SliceServiceServicer):
reply.CopyFrom(request) reply.CopyFrom(request)
slice_service_id = reply.slice_service_ids.add() slice_service_id = reply.slice_service_ids.add()
slice_service_id.CopyFrom(service_reply) slice_service_id.CopyFrom(service_reply)
self.context_client.SetSlice(reply) context_client.SetSlice(reply)
slice_id = reply.slice_id slice_id = reply.slice_id
slice_ = self.context_client.GetSlice(slice_id) slice_ = context_client.GetSlice(slice_id)
slice_active = Slice() slice_active = Slice()
slice_active.CopyFrom(slice_) slice_active.CopyFrom(slice_)
slice_active.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_ACTIVE slice_active.slice_status.slice_status = SliceStatusEnum.SLICESTATUS_ACTIVE
self.context_client.SetSlice(slice_active) context_client.SetSlice(slice_active)
return slice_id return slice_id
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def CreateSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: def CreateSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
#try: #try:
# slice_ = self.context_client.GetSlice(request.slice_id) # slice_ = context_client.GetSlice(request.slice_id)
# slice_id = slice_.slice_id # slice_id = slice_.slice_id
#except grpc.RpcError: #except grpc.RpcError:
# slice_id = self.context_client.SetSlice(request) # slice_id = context_client.SetSlice(request)
#return slice_id #return slice_id
return self.create_update(request) return self.create_update(request)
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def UpdateSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId: def UpdateSlice(self, request : Slice, context : grpc.ServicerContext) -> SliceId:
#slice_id = self.context_client.SetSlice(request) #slice_id = context_client.SetSlice(request)
#if len(request.slice_endpoint_ids) != 2: return slice_id #if len(request.slice_endpoint_ids) != 2: return slice_id
# #
#domains = set() #domains = set()
...@@ -124,7 +123,8 @@ class SliceServiceServicerImpl(SliceServiceServicer): ...@@ -124,7 +123,8 @@ class SliceServiceServicerImpl(SliceServiceServicer):
# #
#is_multi_domain = len(domains) == 2 #is_multi_domain = len(domains) == 2
#if is_multi_domain: #if is_multi_domain:
# return self.interdomain_client.LookUpSlice(request) # interdomain_client = InterdomainClient()
# return interdomain_client.LookUpSlice(request)
#else: #else:
# raise NotImplementedError('Slice should create local services for single domain slice') # raise NotImplementedError('Slice should create local services for single domain slice')
return self.create_update(request) return self.create_update(request)
......
...@@ -14,14 +14,8 @@ ...@@ -14,14 +14,8 @@
import logging, signal, sys, threading import logging, signal, sys, threading
from prometheus_client import start_http_server from prometheus_client import start_http_server
from common.Settings import get_setting, wait_for_environment_variables from common.Constants import ServiceNameEnum
from context.client.ContextClient import ContextClient from common.Settings import ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, wait_for_environment_variables
from interdomain.client.InterdomainClient import InterdomainClient
from service.client.ServiceClient import ServiceClient
from slice.Config import (
CONTEXT_SERVICE_HOST, CONTEXT_SERVICE_PORT, GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD,
INTERDOMAIN_SERVICE_HOST, INTERDOMAIN_SERVICE_PORT, LOG_LEVEL, METRICS_PORT, SERVICE_SERVICE_HOST,
SERVICE_SERVICE_PORT)
from .SliceService import SliceService from .SliceService import SliceService
terminate = threading.Event() terminate = threading.Event()
...@@ -34,58 +28,28 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name ...@@ -34,58 +28,28 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
def main(): def main():
global LOGGER # pylint: disable=global-statement global LOGGER # pylint: disable=global-statement
grpc_service_port = get_setting('SLICESERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT ) log_level = get_log_level()
max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS )
grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD )
log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL )
metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT )
logging.basicConfig(level=log_level) logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
wait_for_environment_variables([ wait_for_environment_variables([
'CONTEXTSERVICE_SERVICE_HOST', 'CONTEXTSERVICE_SERVICE_PORT_GRPC', get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
'INTERDOMAINSERVICE_SERVICE_HOST', 'INTERDOMAINSERVICE_SERVICE_PORT_GRPC', get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
'SERVICESERVICE_SERVICE_HOST', 'SERVICESERVICE_SERVICE_PORT_GRPC', get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.SERVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
]) ])
context_service_host = get_setting('CONTEXTSERVICE_SERVICE_HOST', default=CONTEXT_SERVICE_HOST )
context_service_port = get_setting('CONTEXTSERVICE_SERVICE_PORT_GRPC', default=CONTEXT_SERVICE_PORT )
interdomain_service_host = get_setting('INTERDOMAINSERVICE_SERVICE_HOST', default=INTERDOMAIN_SERVICE_HOST)
interdomain_service_port = get_setting('INTERDOMAINSERVICE_SERVICE_PORT_GRPC', default=INTERDOMAIN_SERVICE_PORT)
service_service_host = get_setting('SERVICESERVICE_SERVICE_HOST', default=SERVICE_SERVICE_HOST )
service_service_port = get_setting('SERVICESERVICE_SERVICE_PORT_GRPC', default=SERVICE_SERVICE_PORT )
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info('Starting...') LOGGER.info('Starting...')
# Start metrics server # Start metrics server
metrics_port = get_metrics_port()
start_http_server(metrics_port) start_http_server(metrics_port)
# Initialize Context Client
if context_service_host is None or context_service_port is None:
raise Exception('Wrong address({:s}):port({:s}) of Context component'.format(
str(context_service_host), str(context_service_port)))
context_client = ContextClient(context_service_host, context_service_port)
# Initialize Interdomain Client
if interdomain_service_host is None or interdomain_service_port is None:
raise Exception('Wrong address({:s}):port({:s}) of Interdomain component'.format(
str(interdomain_service_host), str(interdomain_service_port)))
interdomain_client = InterdomainClient(interdomain_service_host, interdomain_service_port)
# Initialize Service Client
if service_service_host is None or service_service_port is None:
raise Exception('Wrong address({:s}):port({:s}) of Service component'.format(
str(service_service_host), str(service_service_port)))
service_client = ServiceClient(service_service_host, service_service_port)
# Starting slice service # Starting slice service
grpc_service = SliceService( grpc_service = SliceService()
context_client, interdomain_client, service_client, port=grpc_service_port, max_workers=max_workers,
grace_period=grace_period)
grpc_service.start() grpc_service.start()
# Wait for Ctrl+C or termination signal # Wait for Ctrl+C or termination signal
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment