Skip to content
Snippets Groups Projects
Commit c6e5c4cc authored by Ricard Vilalta's avatar Ricard Vilalta
Browse files

Working on first implementation of Interdomain service

parent 04d5a299
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!42Interdomain Component
Showing
with 3544 additions and 0 deletions
# Build, tag and push the Docker image to the GitLab registry
build service:
variables:
IMAGE_NAME: 'service' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile ./src/
- docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
after_script:
- docker images --filter="dangling=true" --quiet | xargs -r docker rmi
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
# Apply unit test to the component
unit test service:
variables:
IMAGE_NAME: 'service' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build service
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
- if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi
script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker run --name $IMAGE_NAME -d -p 3030:3030 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
- sleep 5
- docker ps -a
- docker logs $IMAGE_NAME
- docker exec -i $IMAGE_NAME bash -c "coverage run -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary.py --junitxml=/opt/results/${IMAGE_NAME}_report.xml; coverage xml -o /opt/results/${IMAGE_NAME}_coverage.xml; coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker rm -f $IMAGE_NAME
- docker network rm teraflowbridge
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- src/$IMAGE_NAME/tests/Dockerfile
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report.xml
cobertura: src/$IMAGE_NAME/tests/${IMAGE_NAME}_coverage.xml
# Deployment of the service in Kubernetes Cluster
deploy service:
variables:
IMAGE_NAME: 'service' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: deploy
needs:
- unit test service
# - integ_test execute
script:
- 'sed -i "s/$IMAGE_NAME:.*/$IMAGE_NAME:$IMAGE_TAG/" manifests/${IMAGE_NAME}service.yaml'
- kubectl version
- kubectl get all
- kubectl apply -f "manifests/${IMAGE_NAME}service.yaml"
- kubectl get all
# environment:
# name: test
# url: https://example.com
# kubernetes:
# namespace: test
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
when: manual
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
when: manual
import logging
# General settings
LOG_LEVEL = logging.WARNING
# gRPC settings
GRPC_INTERDOMAIN_PORT = 9090
GRPC_MAX_WORKERS = 10
GRPC_GRACE_PERIOD = 60
# Prometheus settings
METRICS_PORT = 9192
# Dependency micro-service connection settings
SLICE_SERVICE_HOST = '127.0.0.1'
SLICE_SERVICE_PORT = 1010
FROM python:3-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip setuptools wheel pip-tools
# Set working directory
WORKDIR /var/teraflow
# Create module sub-folders
RUN mkdir -p /var/teraflow/interdomain
# Get Python packages per module
COPY interdomain/requirements.in interdomain/requirements.in
RUN pip-compile --output-file=interdomain/requirements.txt interdomain/requirements.in
RUN python3 -m pip install -r interdomain/requirements.in
# Add files into working directory
COPY common/. common
COPY context/. context
COPY device/. device
COPY monitoring/. monitoring
COPY service/. service
COPY interdomain/. interdomain
# Start service interdomain
ENTRYPOINT ["python", "-m", "service.interdomain"]
import grpc, logging
from common.tools.client.RetryDecorator import retry, delay_exponential
from interdomain.proto.context_pb2 import TeraFlowController, AuthenticationResult
from interdomain.proto.slice_pb2 import TransportSlice, SliceId
from interdomain.proto.interdomain_pb2_grpc import InterdomainServiceStub
LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)
class InterdomainClient:
def __init__(self, address, port):
self.endpoint = '{:s}:{:s}'.format(str(address), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
self.channel = None
self.stub = None
self.connect()
LOGGER.debug('Channel created')
def connect(self):
self.channel = grpc.insecure_channel(self.endpoint)
self.stub = InterdomainServiceStub(self.channel)
def close(self):
if self.channel is not None: self.channel.close()
self.channel = None
self.stub = None
@retry(exceptions=set(), max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
def Authenticate(self, request : TeraFlowController) -> AuthenticationResult:
LOGGER.debug('Authenticate request: {:s}'.format(str(request)))
response = self.stub.Authenticate(request)
LOGGER.debug('Authenticate result: {:s}'.format(str(response)))
return response
@retry(exceptions=set(), max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
def LookUpSlice(self, request : TransportSlice) -> SliceId:
LOGGER.debug('LookUpSlice request: {:s}'.format(str(request)))
response = self.stub.LookUpSlice(request)
LOGGER.debug('LookUpSlice result: {:s}'.format(str(response)))
return response
@retry(exceptions=set(), max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
def OrderSliceFromCatalog(self, request : TransportSlice) -> SliceStatus:
LOGGER.debug('OrderSliceFromCatalog request: {:s}'.format(str(request)))
response = self.stub.OrderSliceFromCatalog(request)
LOGGER.debug('OrderSliceFromCatalog result: {:s}'.format(str(response)))
return response
@retry(exceptions=set(), max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
def CreateSliceAndAddToCatalog(self, request : TransportSlice) -> SliceStatus:
LOGGER.debug('CreateSliceAndAddToCatalog request: {:s}'.format(str(request)))
response = self.stub.CreateSliceAndAddToCatalog(request)
LOGGER.debug('CreateSliceAndAddToCatalog result: {:s}'.format(str(response)))
return response
#!/bin/bash -eu
#
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/bin/bash -e
# Make folder containing the script the root folder for its execution
cd $(dirname $0)
rm -rf proto/*.py
rm -rf proto/__pycache__
touch proto/__init__.py
python -m grpc_tools.protoc -I../../proto --python_out=proto --grpc_python_out=proto context.proto
python -m grpc_tools.protoc -I../../proto --python_out=proto --grpc_python_out=proto service.proto
python -m grpc_tools.protoc -I../../proto --python_out=proto --grpc_python_out=proto interdomain.proto
rm proto/context_pb2_grpc.py
rm proto/interdomain_pb2_grpc.py
sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' proto/context_pb2.py
sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' proto/service_pb2.py
sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' proto/service_pb2_grpc.py
sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' proto/interdomain_pb2.py
This diff is collapsed.
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: interdomain.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from . import context_pb2 as context__pb2
from . import slice_pb2 as slice__pb2
DESCRIPTOR = _descriptor.FileDescriptor(
name='interdomain.proto',
package='interdomain',
syntax='proto3',
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_pb=b'\n\x11interdomain.proto\x12\x0binterdomain\x1a\rcontext.proto\x1a\x0bslice.proto2\xab\x02\n\x12InterdomainService\x12L\n\x0c\x41uthenticate\x12\x1b.context.TeraFlowController\x1a\x1d.context.AuthenticationResult\"\x00\x12\x36\n\x0bLookUpSlice\x12\x15.slice.TransportSlice\x1a\x0e.slice.SliceId\"\x00\x12\x44\n\x15OrderSliceFromCatalog\x12\x15.slice.TransportSlice\x1a\x12.slice.SliceStatus\"\x00\x12I\n\x1a\x43reateSliceAndAddToCatalog\x12\x15.slice.TransportSlice\x1a\x12.slice.SliceStatus\"\x00\x62\x06proto3'
,
dependencies=[context__pb2.DESCRIPTOR,slice__pb2.DESCRIPTOR,])
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_INTERDOMAINSERVICE = _descriptor.ServiceDescriptor(
name='InterdomainService',
full_name='interdomain.InterdomainService',
file=DESCRIPTOR,
index=0,
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_start=63,
serialized_end=362,
methods=[
_descriptor.MethodDescriptor(
name='Authenticate',
full_name='interdomain.InterdomainService.Authenticate',
index=0,
containing_service=None,
input_type=context__pb2._TERAFLOWCONTROLLER,
output_type=context__pb2._AUTHENTICATIONRESULT,
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='LookUpSlice',
full_name='interdomain.InterdomainService.LookUpSlice',
index=1,
containing_service=None,
input_type=slice__pb2._TRANSPORTSLICE,
output_type=slice__pb2._SLICEID,
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='OrderSliceFromCatalog',
full_name='interdomain.InterdomainService.OrderSliceFromCatalog',
index=2,
containing_service=None,
input_type=slice__pb2._TRANSPORTSLICE,
output_type=slice__pb2._SLICESTATUS,
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='CreateSliceAndAddToCatalog',
full_name='interdomain.InterdomainService.CreateSliceAndAddToCatalog',
index=3,
containing_service=None,
input_type=slice__pb2._TRANSPORTSLICE,
output_type=slice__pb2._SLICESTATUS,
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
])
_sym_db.RegisterServiceDescriptor(_INTERDOMAINSERVICE)
DESCRIPTOR.services_by_name['InterdomainService'] = _INTERDOMAINSERVICE
# @@protoc_insertion_point(module_scope)
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: service.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from . import context_pb2 as context__pb2
DESCRIPTOR = _descriptor.FileDescriptor(
name='service.proto',
package='service',
syntax='proto3',
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_pb=b'\n\rservice.proto\x12\x07service\x1a\rcontext.proto2\xfd\x01\n\x0eServiceService\x12\x37\n\rCreateService\x12\x10.context.Service\x1a\x12.context.ServiceId\"\x00\x12\x37\n\rUpdateService\x12\x10.context.Service\x1a\x12.context.ServiceId\"\x00\x12\x35\n\rDeleteService\x12\x12.context.ServiceId\x1a\x0e.context.Empty\"\x00\x12\x42\n\x11GetConnectionList\x12\x12.context.ServiceId\x1a\x17.context.ConnectionList\"\x00\x62\x06proto3'
,
dependencies=[context__pb2.DESCRIPTOR,])
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_SERVICESERVICE = _descriptor.ServiceDescriptor(
name='ServiceService',
full_name='service.ServiceService',
file=DESCRIPTOR,
index=0,
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_start=42,
serialized_end=295,
methods=[
_descriptor.MethodDescriptor(
name='CreateService',
full_name='service.ServiceService.CreateService',
index=0,
containing_service=None,
input_type=context__pb2._SERVICE,
output_type=context__pb2._SERVICEID,
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='UpdateService',
full_name='service.ServiceService.UpdateService',
index=1,
containing_service=None,
input_type=context__pb2._SERVICE,
output_type=context__pb2._SERVICEID,
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='DeleteService',
full_name='service.ServiceService.DeleteService',
index=2,
containing_service=None,
input_type=context__pb2._SERVICEID,
output_type=context__pb2._EMPTY,
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
_descriptor.MethodDescriptor(
name='GetConnectionList',
full_name='service.ServiceService.GetConnectionList',
index=3,
containing_service=None,
input_type=context__pb2._SERVICEID,
output_type=context__pb2._CONNECTIONLIST,
serialized_options=None,
create_key=_descriptor._internal_create_key,
),
])
_sym_db.RegisterServiceDescriptor(_SERVICESERVICE)
DESCRIPTOR.services_by_name['ServiceService'] = _SERVICESERVICE
# @@protoc_insertion_point(module_scope)
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
from . import context_pb2 as context__pb2
class ServiceServiceStub(object):
"""Missing associated documentation comment in .proto file."""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.CreateService = channel.unary_unary(
'/service.ServiceService/CreateService',
request_serializer=context__pb2.Service.SerializeToString,
response_deserializer=context__pb2.ServiceId.FromString,
)
self.UpdateService = channel.unary_unary(
'/service.ServiceService/UpdateService',
request_serializer=context__pb2.Service.SerializeToString,
response_deserializer=context__pb2.ServiceId.FromString,
)
self.DeleteService = channel.unary_unary(
'/service.ServiceService/DeleteService',
request_serializer=context__pb2.ServiceId.SerializeToString,
response_deserializer=context__pb2.Empty.FromString,
)
self.GetConnectionList = channel.unary_unary(
'/service.ServiceService/GetConnectionList',
request_serializer=context__pb2.ServiceId.SerializeToString,
response_deserializer=context__pb2.ConnectionList.FromString,
)
class ServiceServiceServicer(object):
"""Missing associated documentation comment in .proto file."""
def CreateService(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def UpdateService(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def DeleteService(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def GetConnectionList(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_ServiceServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'CreateService': grpc.unary_unary_rpc_method_handler(
servicer.CreateService,
request_deserializer=context__pb2.Service.FromString,
response_serializer=context__pb2.ServiceId.SerializeToString,
),
'UpdateService': grpc.unary_unary_rpc_method_handler(
servicer.UpdateService,
request_deserializer=context__pb2.Service.FromString,
response_serializer=context__pb2.ServiceId.SerializeToString,
),
'DeleteService': grpc.unary_unary_rpc_method_handler(
servicer.DeleteService,
request_deserializer=context__pb2.ServiceId.FromString,
response_serializer=context__pb2.Empty.SerializeToString,
),
'GetConnectionList': grpc.unary_unary_rpc_method_handler(
servicer.GetConnectionList,
request_deserializer=context__pb2.ServiceId.FromString,
response_serializer=context__pb2.ConnectionList.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'service.ServiceService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class ServiceService(object):
"""Missing associated documentation comment in .proto file."""
@staticmethod
def CreateService(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/service.ServiceService/CreateService',
context__pb2.Service.SerializeToString,
context__pb2.ServiceId.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def UpdateService(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/service.ServiceService/UpdateService',
context__pb2.Service.SerializeToString,
context__pb2.ServiceId.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def DeleteService(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/service.ServiceService/DeleteService',
context__pb2.ServiceId.SerializeToString,
context__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def GetConnectionList(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/service.ServiceService/GetConnectionList',
context__pb2.ServiceId.SerializeToString,
context__pb2.ConnectionList.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
anytree
apscheduler
fastcache
flask-restful
grpcio-health-checking
grpcio
Jinja2
netconf-client #1.7.3
prometheus-client
pytest
pytest-benchmark
python-json-logger
pytz
redis
requests
xmltodict
p4runtime==1.3.0
coverage
from concurrent import futures
import grpc
from interdomain.service.InterdomainServiceServicerImpl import InterdomainServiceServicerImpl
from interdomain.Config import GRPC_SLICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from interdomain.proto.interdomain_pb2_grpc import add_InterdomainServiceServicer_to_server
from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from common.logger import getJSONLogger
LOGGER = getJSONLogger('interdomainservice-server')
LOGGER.setLevel('DEBUG')
BIND_ADDRESS = '0.0.0.0'
class InterdomainService:
def __init__(self, address=BIND_ADDRESS, slice_client=None, port=GRPC_INTERDOMAIN_PORT, max_workers=GRPC_MAX_WORKERS,
grace_period=GRPC_GRACE_PERIOD):
self.address = address
self.slice_client = slice_client
self.port = port
self.endpoint = None
self.max_workers = max_workers
self.grace_period = grace_period
self.monitoring_servicer = None
self.health_servicer = None
self.pool = None
self.server = None
def start(self):
# create gRPC server
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=self.max_workers)) # ,interceptors=(tracer_interceptor,))
# add monitoring servicer class to gRPC server
self.interdomain_servicer = InterdomainServiceServicerImpl()
add_InterdomainServiceServicer_to_server(self.interdomain_servicer, self.server)
# add gRPC health checker servicer class to gRPC server
self.health_servicer = health.HealthServicer(
experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
add_HealthServicer_to_server(self.health_servicer, self.server)
# start server
endpoint = '{}:{}'.format(self.address, self.port)
LOGGER.info('Listening on {}'.format(endpoint))
self.server.add_insecure_port(endpoint)
self.server.start()
self.health_servicer.set('', health_pb2.HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member
LOGGER.debug('Service started')
def stop(self):
LOGGER.debug('Stopping service (grace period {} seconds)...'.format(self.grace_period))
self.health_servicer.enter_graceful_shutdown()
self.server.stop(self.grace_period)
LOGGER.debug('Service stopped')
import os,grpc
from prometheus_client import Summary
from prometheus_client import Counter
from monitoring.service import SqliteTools, InfluxTools
from monitoring.proto import monitoring_pb2
from monitoring.proto import monitoring_pb2_grpc
from common.rpc_method_wrapper.ServiceExceptions import ServiceException
from common.logger import getJSONLogger
from context.proto import context_pb2
from device.Config import GRPC_SERVICE_PORT
from device.client.DeviceClient import DeviceClient
from device.proto import device_pb2
LOGGER = getJSONLogger('monitoringservice-server')
LOGGER.setLevel('DEBUG')
MONITORING_GETINSTANTKPI_REQUEST_TIME = Summary('monitoring_getinstantkpi_processing_seconds', 'Time spent processing monitoring instant kpi request')
MONITORING_INCLUDEKPI_COUNTER = Counter('monitoring_includekpi_counter', 'Monitoring include kpi request counter')
INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
INFLUXDB_USER = os.environ.get("INFLUXDB_USER")
INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")
class MonitoringServiceServicerImpl(monitoring_pb2_grpc.MonitoringServiceServicer):
def __init__(self):
LOGGER.info('Init monitoringService')
# Init sqlite monitoring db
self.sql_db = SqliteTools.SQLite('monitoring.db')
# Create influx_db client
self.influx_db = InfluxTools.Influx(INFLUXDB_HOSTNAME,"8086",INFLUXDB_USER,INFLUXDB_PASSWORD,INFLUXDB_DATABASE)
# CreateKpi (CreateKpiRequest) returns (KpiId) {}
def CreateKpi(self, request : monitoring_pb2.KpiDescriptor, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiId :
# CREATEKPI_COUNTER_STARTED.inc()
LOGGER.info('CreateKpi')
try:
# Here the code to create a sqlite query to crete a KPI and return a KpiID
kpi_id = monitoring_pb2.KpiId()
kpi_description = request.kpi_description
kpi_sample_type = request.kpi_sample_type
kpi_device_id = request.device_id.device_uuid.uuid
kpi_endpoint_id = request.endpoint_id.endpoint_uuid.uuid
kpi_service_id = request.service_id.service_uuid.uuid
data = self.sql_db.insert_KPI(kpi_description, kpi_sample_type, kpi_device_id, kpi_endpoint_id, kpi_service_id)
kpi_id.kpi_id.uuid = str(data)
# CREATEKPI_COUNTER_COMPLETED.inc()
return kpi_id
except ServiceException as e:
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('CreateKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(grpc.StatusCode.INTERNAL, str(e))
# rpc MonitorKpi (MonitorKpiRequest) returns (context.Empty) {}
def MonitorKpi ( self, request : monitoring_pb2.MonitorKpiRequest, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
LOGGER.info('MonitorKpi')
try:
# Creates the request to send to the device service
monitor_device_request = device_pb2.MonitoringSettings()
kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
monitor_device_request.kpi_descriptor.CopyFrom(kpiDescriptor)
monitor_device_request.kpi_id.kpi_id.uuid = request.kpi_id.kpi_id.uuid
monitor_device_request.sampling_duration_s = request.sampling_duration_s
monitor_device_request.sampling_interval_s = request.sampling_interval_s
deviceClient = DeviceClient(address="localhost", port=GRPC_SERVICE_PORT ) # instantiate the client
# deviceClient.MonitorDeviceKpi(monitor_device_request)
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('MonitorKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
# rpc IncludeKpi(IncludeKpiRequest) returns(context.Empty) {}
def IncludeKpi(self, request : monitoring_pb2.Kpi, grpc_context : grpc.ServicerContext) -> context_pb2.Empty:
LOGGER.info('IncludeKpi')
try:
kpiDescriptor = self.GetKpiDescriptor(request.kpi_id, grpc_context)
kpiSampleType = kpiDescriptor.kpi_sample_type
kpiId = request.kpi_id.kpi_id.uuid
deviceId = kpiDescriptor.device_id.device_uuid.uuid
endpointId = kpiDescriptor.endpoint_id.endpoint_uuid.uuid
serviceId = kpiDescriptor.service_id.service_uuid.uuid
time_stamp = request.timestamp
kpi_value = request.kpi_value.intVal
# Build the structure to be included as point in the influxDB
self.influx_db.write_KPI(time_stamp,kpiId,kpiSampleType,deviceId,endpointId,serviceId,kpi_value)
self.influx_db.read_KPI_points()
return context_pb2.Empty()
except ServiceException as e:
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('IncludeKpi exception')
# CREATEKPI_COUNTER_FAILED.inc()
def GetStreamKpi ( self, request, grpc_context : grpc.ServicerContext):
# receives monitoring.KpiId returns stream monitoring.Kpi
LOGGER.info('GetStreamKpi')
yield monitoring_pb2.Kpi()
@MONITORING_GETINSTANTKPI_REQUEST_TIME.time()
def GetInstantKpi ( self, request, grpc_context : grpc.ServicerContext):
# receives monitoring.KpiId returns monitoring.Kpi
LOGGER.info('GetInstantKpi')
return monitoring_pb2.Kpi()
def GetKpiDescriptor(self, request : monitoring_pb2.KpiId, grpc_context : grpc.ServicerContext) -> monitoring_pb2.KpiDescriptor:
LOGGER.info('getting Kpi by KpiID')
try:
kpi_db = self.sql_db.get_KPI(int(request.kpi_id.uuid))
print(self.sql_db.get_KPIS())
kpiDescriptor = monitoring_pb2.KpiDescriptor()
kpiDescriptor.kpi_description = kpi_db[1]
kpiDescriptor.kpi_sample_type = kpi_db[2]
kpiDescriptor.device_id.device_uuid.uuid = str(kpi_db[3])
kpiDescriptor.endpoint_id.endpoint_uuid.uuid = str(kpi_db[4])
kpiDescriptor.service_id.service_uuid.uuid = str(kpi_db[5])
return kpiDescriptor
except ServiceException as e:
LOGGER.exception('GetKpiDescriptor exception')
grpc_context.abort(e.code, e.details)
except Exception as e: # pragma: no cover
LOGGER.exception('GetKpiDescriptor exception')
import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_setting
from slice.client.SliceClient import SliceClient
from interdomain.Config import (
SLICE_SERVICE_HOST, SLICE_SERVICE_PORT, GRPC_INTERDOMAIN_PORT,
GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT)
from .InterdomainService import InterdomainService
terminate = threading.Event()
LOGGER = None
def signal_handler(signal, frame): # pylint: disable=redefined-outer-name
LOGGER.warning('Terminate signal received')
terminate.set()
def main():
global LOGGER # pylint: disable=global-statement
grpc_service_port = get_setting('INTERDOMAINSERVICE_SERVICE_PORT_GRPC', default=GRPC_INTERDOMAIN_PORT )
max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS )
grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD )
log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL )
metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT )
slice_service_host = get_setting('SLICESERVICE_SERVICE_HOST', default=SLICE_SERVICE_HOST)
slice_service_port = get_setting('SLICESERVICE_SERVICE_PORT_GRPC', default=SLICE_SERVICE_PORT)
logging.basicConfig(level=log_level)
LOGGER = logging.getLogger(__name__)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
LOGGER.info('Starting...')
# Start metrics server
start_http_server(metrics_port)
# Initialize Slice Client
if slice_service_host is None or slice_service_port is None:
raise Exception('Wrong address({:s}):port({:s}) of Device component'.format(
str(slice_service_host), str(slice_service_port)))
slice_client = SliceClient(slice_service_host, slice_service_port)
# Starting service service
grpc_interdomain = InterdomainService( slice_client=slice_client, port=grpc_interdomain_port, max_workers=max_workers, grace_period=grace_period)
grpc_interdomain.start()
# Wait for Ctrl+C or termination signal
while not terminate.wait(timeout=0.1): pass
LOGGER.info('Terminating...')
grpc_interdomain.stop()
LOGGER.info('Bye')
return 0
if __name__ == '__main__':
sys.exit(main())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment