Commit a5a3da5d authored by Lluis Gifre Renom

Merge branch 'feat/context-service' into 'master'

Integrate initial release of Context service

See merge request teraflow-h2020/controller!3
parents 29231f0d d014b96b
Showing changed files with 1169 additions and 1 deletion
@@ -49,6 +49,7 @@ coverage.xml
*.py,cover
.hypothesis/
.pytest_cache/
.benchmarks/
cover/
# Translations
@@ -85,7 +86,7 @@ ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
@@ -117,6 +118,9 @@ venv.bak/
.spyderproject
.spyproject
# VSCode project settings
.vscode/
# Rope project settings
.ropeproject
apiVersion: apps/v1
kind: Deployment
metadata:
  name: contextservice
spec:
  selector:
    matchLabels:
      app: contextservice
  template:
    metadata:
      labels:
        app: contextservice
    spec:
      terminationGracePeriodSeconds: 5
      containers:
      - name: server
        image: context_service:develop
        imagePullPolicy: Never
        ports:
        - containerPort: 7070
        env:
        - name: DB_ENGINE
          value: "redis"
        - name: REDISDB_DATABASE_ID
          value: "0"
        - name: LOG_LEVEL
          value: "DEBUG"
        readinessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:7070"]
        livenessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:7070"]
        resources:
          requests:
            cpu: 250m
            memory: 512Mi
          limits:
            cpu: 700m
            memory: 1024Mi
---
apiVersion: v1
kind: Service
metadata:
  name: contextservice
spec:
  type: ClusterIP
  selector:
    app: contextservice
  ports:
  - name: grpc
    port: 7070
    targetPort: 7070
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redisdb
spec:
  selector:
    matchLabels:
      app: redisdb
  replicas: 1
  template:
    metadata:
      labels:
        app: redisdb
        version: v1
    spec:
      containers:
      - name: redisdb
        image: redis:6.2
        ports:
        - containerPort: 6379
---
apiVersion: v1
kind: Service
metadata:
  name: redisdb
  labels:
    app: redisdb
spec:
  type: ClusterIP
  selector:
    app: redisdb
  ports:
  - name: redisdb
    protocol: TCP
    port: 6379
    targetPort: 6379
---
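These manifests can be applied directly with kubectl; a minimal sketch, assuming they are saved as a single file (the file name is illustrative, not part of this commit):

kubectl apply -f contextservice.yaml
kubectl get pods --selector app=contextservice   # readiness gates on grpc_health_probe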
#!/usr/bin/env bash
# Make folder containing the script the root folder for its execution
cd "$(dirname "$0")"
echo "BUILD context"
context/genproto.sh
docker build -t "context_service:develop" -f context/Dockerfile_develop --quiet .
docker build -t "context_service:test" -f context/Dockerfile_test --quiet .
cd monitoring
./genproto.sh
cd ..
@@ -7,3 +15,5 @@ cd ..
echo "BUILD monitoring"
docker build -t "monitoring:dockerfile" -f monitoring/Dockerfile .
echo "Prune unused images"
docker image prune --force
import logging, json

LOGGER = logging.getLogger(__name__)

FILEPATH = 'data/topo_nsfnet.json'

class InMemoryDatabase:
    def __init__(self, filepath=FILEPATH, **parameters):
        with open(filepath, 'r') as f:
            self.json_topology = json.loads(f.read())

    def get_topology(self):
        return self.json_topology
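A minimal usage sketch for the in-memory engine (variable names are illustrative; assumes data/topo_nsfnet.json is reachable from the working directory):

db = InMemoryDatabase()
topology = db.get_topology()   # the parsed JSON topology as a Python dict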
import logging, json, os, redis
#from .context_api.Context import Context

LOGGER = logging.getLogger(__name__)

# 60.0 seconds (approx.) in incremental steps from 0.002 till 29.99 seconds
RETRY_DELAY_INITIAL = 0.002
RETRY_DELAY_INCREMENT = 1.831
MAX_RETRIES = 15

URL_TEMPLATE = 'redis://{host}:{port}/{dbid}'
FILEPATH = 'data/topo_nsfnet.json'

class RedisDatabase:
    def __init__(self, **parameters):
        host = os.environ.get('REDISDB_SERVICE_HOST')
        if host is None: raise Exception('EnvironmentVariable(REDISDB_SERVICE_HOST) not found')
        port = os.environ.get('REDISDB_SERVICE_PORT')
        if port is None: raise Exception('EnvironmentVariable(REDISDB_SERVICE_PORT) not found')
        dbid = os.environ.get('REDISDB_DATABASE_ID')
        if dbid is None: raise Exception('EnvironmentVariable(REDISDB_DATABASE_ID) not found')
        self.redis_url = URL_TEMPLATE.format(host=host, port=port, dbid=dbid)
        self.handler = None
        self.connect() # initialize self.handler and connect to server

    def connect(self):
        self.handler = redis.Redis.from_url(self.redis_url)

    def close(self):
        if self.handler is not None: del self.handler
        self.handler = None

    def get_topology(self):
        str_topology = self.handler.get('topology')
        if str_topology is None:
            # topology not in Redis yet: seed it from the JSON file
            with open(FILEPATH, 'r') as f:
                json_topology = json.loads(f.read())
            str_topology = json.dumps(json_topology)
            self.handler.setnx('topology', str_topology)
            json_topology['source'] = 'redis missing, loaded from file'
        else:
            json_topology = json.loads(str_topology)
            json_topology['source'] = 'redis found!'
        return json_topology

    #def __getattr__(self, method_name):
    #    # Expose all methods in the database engine as owned
    #    def method(*args, **kwargs):
    #        num_try, delay = 1, RETRY_DELAY_INITIAL
    #        while num_try <= MAX_RETRIES:
    #            try:
    #                handler_method = getattr(self.handler, method_name, None)
    #                if handler_method is None: raise Exception('Redis does not support method({})'.format(method_name))
    #                return handler_method(*args, **kwargs)
    #            except ConnectionError: # as e:
    #                #if('Channel is closed' not in str(e)): raise
    #                self.connect() # Try to reconnect
    #                time.sleep(delay)
    #                num_try, delay = num_try + 1, delay * RETRY_DELAY_INCREMENT
    #        raise Exception('Unable to reconnect to Redis after {} tries'.format(MAX_RETRIES))
    #    return method
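A minimal usage sketch for the Redis engine (endpoint values are illustrative; inside Kubernetes, the REDISDB_SERVICE_HOST/PORT variables are injected automatically for the redisdb Service declared above):

import os
os.environ.setdefault('REDISDB_SERVICE_HOST', '127.0.0.1')   # illustrative endpoint
os.environ.setdefault('REDISDB_SERVICE_PORT', '6379')
os.environ.setdefault('REDISDB_DATABASE_ID', '0')
db = RedisDatabase()
topology = db.get_topology()   # 'source' reports whether Redis or the seed file answered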
# This decorator re-executes the decorated function when it raises an exception. It allows controlling the exception
# classes that should trigger the re-execution, the maximum number of retries, the delay between retries, and the
# execution of a preparation method before every retry. The delay is specified by means of user-customizable functions.
#
# Delay functions should return a compute function with a single parameter, the retry number. For instance:
#   delay_linear(initial=0, increment=0):
#       adds a constant delay of 0 seconds between retries
#   delay_linear(initial=1, increment=0):
#       adds a constant delay of 1 second between retries
#   delay_linear(initial=1, increment=0.5, maximum=10):
#       adds an increasing delay between retries, starting with 1 second, and increasing it linearly by steps of 0.5
#       seconds, up to 10 seconds, every time an exception is caught within the current execution.
#       E.g. 1.0, 1.5, 2.0, 2.5, ..., 10.0, 10.0, 10.0, ...
#   delay_exponential(initial=1, increment=1):
#       adds a constant delay of 1 second between retries
#   delay_exponential(initial=1, increment=2, maximum=10):
#       adds an increasing delay between retries, starting with 1 second, and increasing it exponentially by a factor
#       of 2, up to 10 seconds, every time an exception is caught within the current execution.
#       E.g. 1.0, 2.0, 4.0, 8.0, 10.0, 10.0, 10.0, ...
#
# Arguments:
# - exceptions: defines the set of exception classes to be caught for reconnection. Others are re-raised.
#   By default, all exceptions are re-raised.
# - max_retries: defines the maximum number of retries acceptable before giving up. By default, 0 retries are executed.
# - delay_function: defines the delay computation method to be used. By default, delay_linear(initial=0, increment=0)
#   is used, adding no delay.
# - prepare_method_name: if not None, defines the name of the preparation method within the same class to be executed
#   when an exception in exceptions is caught, and before running the next retry. By default, it is None, meaning that
#   no method is executed.
# - prepare_method_args: defines the list of positional arguments to be provided to the preparation method. If no
#   preparation method is specified, the argument is silently ignored. By default, an empty list is defined.
# - prepare_method_kwargs: defines the dictionary of keyword arguments to be provided to the preparation method. If no
#   preparation method is specified, the argument is silently ignored. By default, an empty dictionary is defined.
import time

def delay_linear(initial=0, increment=0, maximum=None):
    def compute(num_try):
        delay = initial + (num_try - 1) * increment
        if maximum is not None: delay = min(delay, maximum)  # cap the delay at 'maximum'
        return delay
    return compute

def delay_exponential(initial=1, increment=1, maximum=None):
    def compute(num_try):
        delay = initial * pow(increment, num_try - 1)  # grows as initial * increment**(num_try - 1)
        if maximum is not None: delay = min(delay, maximum)  # cap the delay at 'maximum'
        return delay
    return compute
def retry(exceptions=set(), max_retries=0, delay_function=delay_linear(initial=0, increment=0),
          prepare_method_name=None, prepare_method_args=[], prepare_method_kwargs={}):
    def _reconnect(func):
        def wrapper(self, *args, **kwargs):
            if prepare_method_name is not None:
                prepare_method = getattr(self, prepare_method_name, None)
                if prepare_method is None: raise Exception('Prepare Method ({}) not found'.format(prepare_method_name))
            num_try, given_up = 0, False
            while not given_up:
                try:
                    return func(self, *args, **kwargs)
                except Exception as e:
                    if not isinstance(e, tuple(exceptions)): raise
                    num_try += 1
                    given_up = num_try > max_retries
                    if given_up: raise Exception('Giving up... {} tries failed'.format(max_retries))
                    if delay_function is not None: time.sleep(delay_function(num_try))
                    if prepare_method_name is not None: prepare_method(*prepare_method_args, **prepare_method_kwargs)
        return wrapper
    return _reconnect
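A minimal sketch of the decorator in action (the class, method, and exception below are illustrative, not part of the service):

class FlakyClient:
    def connect(self):
        pass   # re-connection hook, executed before every retry

    @retry(exceptions={ConnectionError}, max_retries=3,
           delay_function=delay_exponential(initial=0.1, increment=2.0, maximum=1.0),
           prepare_method_name='connect')
    def fetch(self):
        raise ConnectionError('transient failure')   # stands in for a backend call

# FlakyClient().fetch() sleeps 0.1, 0.2, and 0.4 seconds between tries,
# then raises 'Giving up... 3 tries failed'.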
import logging
# gRPC settings
SERVICE_PORT = 7070
MAX_WORKERS = 10
GRACE_PERIOD = 60
LOG_LEVEL = logging.WARNING
# Prometheus settings
METRICS_PORT = 8080
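The service entrypoint (not part of this excerpt) presumably consumes these settings when bootstrapping the gRPC server; a minimal illustrative sketch of that wiring, not the actual __main__ module:

from concurrent import futures
import grpc
from prometheus_client import start_http_server

# constants assumed imported from the Settings module above
start_http_server(METRICS_PORT)   # expose Prometheus metrics
server = grpc.server(futures.ThreadPoolExecutor(max_workers=MAX_WORKERS))
server.add_insecure_port('[::]:{}'.format(SERVICE_PORT))
server.start()
try:
    server.wait_for_termination()
finally:
    server.stop(GRACE_PERIOD)   # allow in-flight RPCs to finish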
FROM python:3-slim
# Install dependencies
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ && \
rm -rf /var/lib/apt/lists/*
# Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=1
# Download the gRPC health probe
RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# Get generic Python packages
RUN python3 -m pip install --upgrade pip setuptools wheel pip-tools
# Set working directory
WORKDIR /var/teraflow
# Create module sub-folders
RUN mkdir -p /var/teraflow/context
# Get Python packages per module
COPY context/requirements.in context/requirements.in
RUN pip-compile --output-file=context/requirements.txt context/requirements.in
RUN python3 -m pip install -r context/requirements.txt
# Add files into working directory
COPY common/. common
COPY context/. context
# Start context service
ENTRYPOINT ["python", "-m", "context.service"]
FROM context_service:develop
# Run integration tests
ENTRYPOINT ["pytest", "-v", "--log-level=DEBUG", "context/tests/test_integration.py"]
import grpc, logging
from google.protobuf.json_format import MessageToDict
from common.tools.RetryDecorator import retry, delay_exponential
from context.proto.context_pb2_grpc import ContextServiceStub

LOGGER = logging.getLogger(__name__)

MAX_RETRIES = 15
DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0)

class ContextClient:
    def __init__(self, address, port):
        self.endpoint = '{}:{}'.format(address, port)
        LOGGER.debug('Creating channel to {}...'.format(self.endpoint))
        self.channel = None
        self.stub = None
        self.connect()
        LOGGER.debug('Channel created')

    def connect(self):
        self.channel = grpc.insecure_channel(self.endpoint)
        self.stub = ContextServiceStub(self.channel)

    def close(self):
        if self.channel is not None: self.channel.close()
        self.channel = None
        self.stub = None

    @retry(exceptions=set(), max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect')
    def GetTopology(self, request):
        LOGGER.debug('GetTopology request: {}'.format(request))
        response = self.stub.GetTopology(request)
        LOGGER.debug('GetTopology result: {}'.format(response))
        return MessageToDict(
            response, including_default_value_fields=True, preserving_proto_field_name=True,
            use_integers_for_enums=False)
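A minimal usage sketch for the client (it assumes the service is reachable at the given endpoint and that context.proto defines an Empty request message for GetTopology; both are assumptions about code outside this excerpt):

from context.proto.context_pb2 import Empty   # assumed request message
client = ContextClient('127.0.0.1', 7070)
topology = client.GetTopology(Empty())        # returns a plain dict via MessageToDict
client.close()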
#!/bin/bash -eu
#
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/bin/bash -e
# Make folder containing the script the root folder for its execution
cd "$(dirname "$0")"
rm -rf proto/*.py
touch proto/__init__.py
python -m grpc_tools.protoc -I../../proto --python_out=proto --grpc_python_out=proto context.proto
sed -i -E 's/(import\ .*)_pb2/from context.proto \1_pb2/g' proto/context_pb2.py
sed -i -E 's/(import\ .*)_pb2/from context.proto \1_pb2/g' proto/context_pb2_grpc.py
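# For reference, the sed rules rewrite the flat imports emitted by grpc_tools
# into package-qualified ones; an illustrative line from the generated
# context_pb2_grpc.py:
#   before: import context_pb2 as context__pb2
#   after:  from context.proto import context_pb2 as context__pb2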