Commit c8fa042e authored by Lluis Gifre Renom
Preliminary unfinished version of device service and Context API based on Redis. To be finished, it is just a backup.
parent 5f749f41
1 merge request: !54 Release 2.0.0
Showing 920 additions and 0 deletions
apiVersion: apps/v1
kind: Deployment
metadata:
name: deviceservice
spec:
selector:
matchLabels:
app: deviceservice
template:
metadata:
labels:
app: deviceservice
spec:
terminationGracePeriodSeconds: 5
containers:
- name: server
image: device:dockerfile
imagePullPolicy: Never
ports:
- containerPort: 7070
env:
- name: DB_ENGINE
value: "redis"
- name: REDISDB_DATABASE_ID
value: "0"
- name: LOG_LEVEL
value: "DEBUG"
readinessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:7070"]
livenessProbe:
exec:
command: ["/bin/grpc_health_probe", "-addr=:7070"]
resources:
requests:
cpu: 250m
memory: 512Mi
limits:
cpu: 700m
memory: 1024Mi
---
apiVersion: v1
kind: Service
metadata:
name: deviceservice
spec:
type: ClusterIP
selector:
app: deviceservice
ports:
- name: grpc
port: 7070
targetPort: 7070
from redis.client import Redis
from .tools._Entity import _Entity
from .tools.EntityAttributes import EntityAttributes
from .tools.EntityCollection import EntityCollection
from .tools.Mutex import Mutex
from .Keys import KEY_CONTEXT, KEY_TOPOLOGIES
from .Topology import Topology
VALIDATORS = {}
class Context(_Entity):
def __init__(self, context_uuid : str, redis_client: Redis):
self.__redis_client = redis_client
self.__mutex = Mutex(self.__redis_client)
self.__acquired = False
self.__owner_key = None
self.__entity_key = KEY_CONTEXT.format(context_uuid=context_uuid)
super().__init__(context_uuid, self)
self.context_uuid = self.get_uuid()
self.attributes = EntityAttributes(self, KEY_CONTEXT, validators=VALIDATORS)
self.topologies = EntityCollection(self, KEY_TOPOLOGIES)
def get_parent(self): return(self)
def get_context(self): return(self)
def get_redis_client(self): return(self.__redis_client)
def __enter__(self):
self.__acquired,self.__owner_key = self.__mutex.acquire(self.__entity_key, owner_key=self.__owner_key)
if not self.__acquired: raise Exception('Unable to acquire {}'.format(self.__entity_key))
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if not self.__acquired: return
self.__mutex.release(self.__entity_key, self.__owner_key)
def topology(self, topology_uuid : str) -> Topology:
return Topology(topology_uuid, self)
from typing import Dict
from .tools._Entity import _Entity
from .tools.EntityAttributes import EntityAttributes
from .tools.EntityCollection import EntityCollection
from .Keys import KEY_DEVICE, KEY_DEVICE_ENDPOINTS
from .Endpoint import Endpoint
from .OperationalStatus import OperationalStatus, to_operationalstatus_enum
VALIDATORS = {
'device_type': lambda v: v is None or isinstance(v, str),
'device_config': lambda v: v is None or isinstance(v, str),
'device_operational_status': lambda v: v is None or isinstance(v, OperationalStatus),
}
TRANSCODERS = {
'device_operational_status': {
OperationalStatus: lambda v: v.value,
str : lambda v: to_operationalstatus_enum(v),
}
}
class Device(_Entity):
def __init__(self, device_uuid : str, parent : 'Topology'): # type: ignore
super().__init__(device_uuid, parent=parent)
self.device_uuid = self.get_uuid()
self.topology_uuid = self._parent.get_uuid()
self.context_uuid = self._parent._parent.get_uuid()
self.attributes = EntityAttributes(self, KEY_DEVICE, VALIDATORS, transcoders=TRANSCODERS)
self.endpoints = EntityCollection(self, KEY_DEVICE_ENDPOINTS)
def endpoint(self, endpoint_uuid : str) -> Endpoint:
return Endpoint(endpoint_uuid, self)
def create(self, type : str, config : str, operational_status : OperationalStatus) -> 'Device':
self.update(update_attributes={
'device_type': type,
'device_config': config,
'device_operational_status': operational_status,
})
self._parent.devices.add(self.get_uuid())
return self
def update(self, update_attributes={}, remove_attributes=[]) -> 'Device':
self.attributes.update(update_attributes=update_attributes, remove_attributes=remove_attributes)
return self
def delete(self) -> None:
remove_attributes = ['device_type', 'device_config', 'device_operational_status']
self.update(remove_attributes=remove_attributes)
self._parent.devices.delete(self.get_uuid())
def dump(self) -> Dict:
attributes = self.attributes.get()
dev_op_status = attributes.get('device_operational_status', None)
if isinstance(dev_op_status, OperationalStatus): dev_op_status = dev_op_status.value
endpoints = [self.endpoint(endpoint_uuid).dump() for endpoint_uuid in self.endpoints.get()]
return {
'device_id': {'device_id': {'uuid': self.device_uuid}},
'device_type': attributes.get('device_type', None),
'device_config': {'device_config': attributes.get('device_config', None)},
'devOperationalStatus': dev_op_status,
'endpointList': endpoints
}
from typing import Dict
from .tools._Entity import _Entity
from .tools.EntityAttributes import EntityAttributes
from .Keys import KEY_ENDPOINT
VALIDATORS = {
'port_type': lambda v: v is None or isinstance(v, str),
}
class Endpoint(_Entity):
def __init__(self, endpoint_uuid : str, parent : 'Device'): # type: ignore
super().__init__(endpoint_uuid, parent=parent)
self.endpoint_uuid = self.get_uuid()
self.device_uuid = self._parent.get_uuid()
self.topology_uuid = self._parent._parent.get_uuid()
self.context_uuid = self._parent._parent._parent.get_uuid()
self.attributes = EntityAttributes(self, KEY_ENDPOINT, VALIDATORS)
def create(self, type : str) -> 'Endpoint':
self.update(update_attributes={
'port_type': type
})
self._parent.endpoints.add(self.get_uuid())
return self
def update(self, update_attributes={}, remove_attributes=[]) -> 'Endpoint':
self.attributes.update(update_attributes=update_attributes, remove_attributes=remove_attributes)
return self
def delete(self) -> None:
remove_attributes = ['port_type']
self.update(remove_attributes=remove_attributes)
self._parent.endpoints.delete(self.get_uuid())
def dump_uuid(self) -> Dict:
return {
'topoId': {'uuid': self.topology_uuid},
'dev_id': {'uuid': self.device_uuid},
'port_id': {'uuid': self.endpoint_uuid},
}
def dump(self) -> Dict:
attributes = self.attributes.get()
return {
'port_id': self.dump_uuid(),
'port_type': attributes.get('port_type', None),
}
KEY_CONTEXT = 'context[{context_uuid}]'
KEY_TOPOLOGIES = KEY_CONTEXT + '/topologies{container_name}'
KEY_TOPOLOGY = KEY_CONTEXT + '/topology[{topology_uuid}]'
KEY_DEVICES = KEY_TOPOLOGY + '/devices{container_name}'
KEY_LINKS = KEY_TOPOLOGY + '/links{container_name}'
KEY_DEVICE = KEY_TOPOLOGY + '/device[{device_uuid}]'
KEY_DEVICE_ENDPOINTS = KEY_DEVICE + '/endpoints{container_name}'
KEY_ENDPOINT = KEY_DEVICE + '/endpoint[{endpoint_uuid}]'
KEY_LINK = KEY_TOPOLOGY + '/link[{link_uuid}]'
KEY_LINK_ENDPOINTS = KEY_LINK + '/endpoints{container_name}'
KEY_LINK_ENDPOINT = KEY_LINK + '/endpoint[{link_endpoint_uuid}]'
from typing import Dict
from .tools._Entity import _Entity
from .tools.EntityCollection import EntityCollection
from .Keys import KEY_LINK_ENDPOINTS
from .LinkEndpoint import LinkEndpoint
class Link(_Entity):
def __init__(self, link_uuid : str, parent : 'Topology'): # type: ignore
super().__init__(link_uuid, parent=parent)
self.link_uuid = self.get_uuid()
self.topology_uuid = self._parent.get_uuid()
self.context_uuid = self._parent._parent.get_uuid()
self.endpoints = EntityCollection(self, KEY_LINK_ENDPOINTS)
def endpoint(self, link_endpoint_uuid : str) -> LinkEndpoint:
return LinkEndpoint(link_endpoint_uuid, self)
def create(self) -> 'Link':
self._parent.links.add(self.get_uuid())
return self
def delete(self) -> None:
self._parent.links.delete(self.get_uuid())
def dump(self) -> Dict:
endpoints = [self.endpoint(link_endpoint_uuid).dump() for link_endpoint_uuid in self.endpoints.get()]
return {
'link_id': {'link_id': {'uuid': self.link_uuid}},
'endpointList': endpoints
}
from typing import Dict
from .tools._Entity import _Entity
from .tools.EntityAttributes import EntityAttributes
from .Endpoint import Endpoint
from .Keys import KEY_LINK_ENDPOINT
VALIDATORS = {
'device_uuid': lambda v: v is None or isinstance(v, str),
'endpoint_uuid': lambda v: v is None or isinstance(v, str),
}
class LinkEndpoint(_Entity):
def __init__(self, link_endpoint_uuid : str, parent : 'Link'): # type: ignore
super().__init__(link_endpoint_uuid, parent=parent)
self.link_endpoint_uuid = self.get_uuid()
self.link_uuid = self._parent.get_uuid()
self.topology_uuid = self._parent._parent.get_uuid()
self.context_uuid = self._parent._parent._parent.get_uuid()
self.attributes = EntityAttributes(self, KEY_LINK_ENDPOINT, VALIDATORS)
def create(self, endpoint : Endpoint) -> 'LinkEndpoint':
self.update(update_attributes={
'device_uuid': endpoint._parent.get_uuid(),
'endpoint_uuid': endpoint.get_uuid(),
})
self._parent.endpoints.add(self.get_uuid())
return self
def update(self, update_attributes={}, remove_attributes=[]) -> 'LinkEndpoint':
self.attributes.update(update_attributes=update_attributes, remove_attributes=remove_attributes)
return self
def delete(self) -> None:
remove_attributes = ['device_uuid', 'endpoint_uuid']
self.update(remove_attributes=remove_attributes)
self._parent.endpoints.delete(self.get_uuid())
def dump(self) -> Dict:
attributes = self.attributes.get()
return {
'topoId': {'uuid': self.topology_uuid},
'dev_id': {'uuid': attributes.get('device_uuid', None)},
'port_id': {'uuid': attributes.get('endpoint_uuid', None)},
}
from enum import Enum
class OperationalStatus(Enum):
ENABLED = 1
DISABLED = 0
TO_ENUM = {
1: OperationalStatus.ENABLED,
0: OperationalStatus.DISABLED,
'1': OperationalStatus.ENABLED,
'0': OperationalStatus.DISABLED,
'enabled': OperationalStatus.ENABLED,
'disabled': OperationalStatus.DISABLED,
}
def to_operationalstatus_enum(int_or_str):
if isinstance(int_or_str, str): int_or_str = int_or_str.lower()
return TO_ENUM.get(int_or_str)
from typing import Dict
from .tools._Entity import _Entity
from .tools.EntityAttributes import EntityAttributes
from .tools.EntityCollection import EntityCollection
from .Keys import KEY_TOPOLOGY, KEY_DEVICES, KEY_LINKS
from .Device import Device
from .Link import Link
VALIDATORS = {}
class Topology(_Entity):
def __init__(self, topology_uuid : str, parent : 'Context'): # type: ignore
super().__init__(topology_uuid, parent=parent)
self.topology_uuid = self.get_uuid()
self.context_uuid = self._parent.get_uuid()
self.attributes = EntityAttributes(self, KEY_TOPOLOGY, validators=VALIDATORS)
self.devices = EntityCollection(self, KEY_DEVICES)
self.links = EntityCollection(self, KEY_LINKS)
def device(self, device_uuid : str) -> Device:
return Device(device_uuid, self)
def link(self, link_uuid : str) -> Link:
return Link(link_uuid, self)
def create(self) -> 'Topology':
self._parent.topologies.add(self.get_uuid())
return self
def delete(self):
self._parent.topologies.delete(self.get_uuid())
def dump(self) -> Dict:
devices = [self.device(device_uuid).dump() for device_uuid in self.devices.get()]
links = [self.link (link_uuid ).dump() for link_uuid in self.links.get()]
return {
'topoId': {
'contextId': {'contextUuid': {'uuid': self.context_uuid}},
'topoId': {'uuid': self.topology_uuid},
},
'device': devices,
'link': links,
}
import json, logging, sys, time
from redis.client import Redis
from .tools.RedisTools import dump_keys
from .Context import Context
from .OperationalStatus import OperationalStatus
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
def main():
redis_client = Redis('127.0.0.1', 31926)
LOGGER.info('Cleaning up...')
LOGGER.info(' keys before = {}'.format(str(redis_client.keys())))
keys = redis_client.keys()
for key in keys:
key_name = key.decode('UTF-8')
if(key_name == 'topology'): continue
redis_client.delete(key_name)
LOGGER.info(' keys after = {}'.format(str(redis_client.keys())))
with Context('ctx-test', redis_client) as context:
topology = context.topology('base-topo').create()
device_1 = topology.device('dev1').create(type='ROADM', config='<config/>', operational_status=OperationalStatus.ENABLED)
endpoint_dev1_to_dev2 = device_1.endpoint('to-dev2').create(type='WDM')
endpoint_dev1_to_dev3 = device_1.endpoint('to-dev3').create(type='WDM')
endpoint_dev1_to_dev4 = device_1.endpoint('to-dev4').create(type='WDM')
device_2 = topology.device('dev2').create(type='ROADM', config='<config/>', operational_status=OperationalStatus.ENABLED)
endpoint_dev2_to_dev1 = device_2.endpoint('to-dev1').create(type='WDM')
endpoint_dev2_to_dev3 = device_2.endpoint('to-dev3').create(type='WDM')
endpoint_dev2_to_dev4 = device_2.endpoint('to-dev4').create(type='WDM')
device_3 = topology.device('dev3').create(type='ROADM', config='<config/>', operational_status=OperationalStatus.ENABLED)
endpoint_dev3_to_dev1 = device_3.endpoint('to-dev1').create(type='WDM')
endpoint_dev3_to_dev2 = device_3.endpoint('to-dev2').create(type='WDM')
endpoint_dev3_to_dev4 = device_3.endpoint('to-dev4').create(type='WDM')
device_4 = topology.device('dev4').create(type='ROADM', config='<config/>', operational_status=OperationalStatus.ENABLED)
endpoint_dev4_to_dev1 = device_4.endpoint('to-dev1').create(type='WDM')
endpoint_dev4_to_dev2 = device_4.endpoint('to-dev2').create(type='WDM')
endpoint_dev4_to_dev3 = device_4.endpoint('to-dev3').create(type='WDM')
link_dev1_to_dev2 = topology.link('dev1/to-dev2 ==> dev2/to-dev1').create()
link_dev1_to_dev2.endpoint('dev1/to-dev2').create(endpoint_dev1_to_dev2)
link_dev1_to_dev2.endpoint('dev2/to-dev1').create(endpoint_dev2_to_dev1)
link_dev1_to_dev3 = topology.link('dev1/to-dev3 ==> dev3/to-dev1').create()
link_dev1_to_dev3.endpoint('dev1/to-dev3').create(endpoint_dev1_to_dev3)
link_dev1_to_dev3.endpoint('dev3/to-dev1').create(endpoint_dev3_to_dev1)
link_dev1_to_dev4 = topology.link('dev1/to-dev4 ==> dev4/to-dev1').create()
link_dev1_to_dev4.endpoint('dev1/to-dev4').create(endpoint_dev1_to_dev4)
link_dev1_to_dev4.endpoint('dev4/to-dev1').create(endpoint_dev4_to_dev1)
link_dev2_to_dev1 = topology.link('dev2/to-dev1 ==> dev1/to-dev2').create()
link_dev2_to_dev1.endpoint('dev2/to-dev1').create(endpoint_dev2_to_dev1)
link_dev2_to_dev1.endpoint('dev1/to-dev2').create(endpoint_dev1_to_dev2)
link_dev2_to_dev3 = topology.link('dev2/to-dev3 ==> dev3/to-dev2').create()
link_dev2_to_dev3.endpoint('dev2/to-dev3').create(endpoint_dev2_to_dev3)
link_dev2_to_dev3.endpoint('dev3/to-dev2').create(endpoint_dev3_to_dev2)
link_dev2_to_dev4 = topology.link('dev2/to-dev4 ==> dev4/to-dev2').create()
link_dev2_to_dev4.endpoint('dev2/to-dev4').create(endpoint_dev2_to_dev4)
link_dev2_to_dev4.endpoint('dev4/to-dev2').create(endpoint_dev4_to_dev2)
link_dev3_to_dev1 = topology.link('dev3/to-dev1 ==> dev1/to-dev3').create()
link_dev3_to_dev1.endpoint('dev3/to-dev1').create(endpoint_dev3_to_dev1)
link_dev3_to_dev1.endpoint('dev1/to-dev3').create(endpoint_dev1_to_dev3)
link_dev3_to_dev2 = topology.link('dev3/to-dev2 ==> dev2/to-dev3').create()
link_dev3_to_dev2.endpoint('dev3/to-dev2').create(endpoint_dev3_to_dev2)
link_dev3_to_dev2.endpoint('dev2/to-dev3').create(endpoint_dev2_to_dev3)
link_dev3_to_dev4 = topology.link('dev3/to-dev4 ==> dev4/to-dev3').create()
link_dev3_to_dev4.endpoint('dev3/to-dev4').create(endpoint_dev3_to_dev4)
link_dev3_to_dev4.endpoint('dev4/to-dev3').create(endpoint_dev4_to_dev3)
link_dev4_to_dev1 = topology.link('dev4/to-dev1 ==> dev1/to-dev4').create()
link_dev4_to_dev1.endpoint('dev4/to-dev1').create(endpoint_dev4_to_dev1)
link_dev4_to_dev1.endpoint('dev1/to-dev4').create(endpoint_dev1_to_dev4)
link_dev4_to_dev2 = topology.link('dev4/to-dev2 ==> dev2/to-dev4').create()
link_dev4_to_dev2.endpoint('dev4/to-dev2').create(endpoint_dev4_to_dev2)
link_dev4_to_dev2.endpoint('dev2/to-dev4').create(endpoint_dev2_to_dev4)
link_dev4_to_dev3 = topology.link('dev4/to-dev3 ==> dev3/to-dev4').create()
link_dev4_to_dev3.endpoint('dev4/to-dev3').create(endpoint_dev4_to_dev3)
link_dev4_to_dev3.endpoint('dev3/to-dev4').create(endpoint_dev3_to_dev4)
dump_keys(redis_client, LOGGER)
with Context('ctx-test', redis_client) as context:
t0 = time.time()
json_topology = context.topology('base-topo').dump()
t1 = time.time()
LOGGER.info(json.dumps(json_topology))
LOGGER.info('Dump from Redis elapsed: {:.3f} ms'.format(1000.0 * (t1-t0)))
return(0)
if __name__=='__main__':
sys.exit(main())
############################
# Redis internal structure #
############################
Note (1): for containers like topologies, devices, links, etc., two containers are defined:
list A sorted list containing the UUIDs of the elements belonging to the parent element. It is used to
define the order of the elements and enables iterating over them deterministically.
set An unordered set containing the UUIDs of the elements belonging to the parent element. It is used to
check the existence of elements within the parent in O(1).
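For illustration, a minimal sketch of this dual-container pattern using redis-py; host, port and key names are illustrative, and EntityCollection in this commit implements the same logic:

from redis.client import Redis

redis_client = Redis('127.0.0.1', 6379)         # illustrative host/port
key_list = 'context[ctx-test]/topologies_list'  # ordered: gives a deterministic iteration order
key_set  = 'context[ctx-test]/topologies_set'   # unordered: O(1) existence checks

def add_to_container(uuid : str) -> None:
    # skip duplicates using the set, then append to the list to preserve insertion order
    if redis_client.sismember(key_set, uuid): return
    redis_client.sadd(key_set, uuid)
    redis_client.rpush(key_list, uuid)

add_to_container('base-topo')
add_to_container('other-topo')
add_to_container('base-topo')  # ignored: already present in the set

print([m.decode('UTF-8') for m in redis_client.lrange(key_list, 0, -1)])  # ['base-topo', 'other-topo']
print(bool(redis_client.sismember(key_set, 'other-topo')))                # True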
Context structure
-----------------
context[<context_uuid>]/lock
String containing the mutex owner key of the user/process/thread that is currently managing the context.
Example:
context[ctx-test]/lock
dc56647a-9539-446e-9a61-cbc67de328e4
context[<context_uuid>]/topologies_<container>
Containers (see Note 1) with the topology UUIDs belonging to the context.
Examples:
context[ctx-test]/topologies_list
['base-topo', 'other-topo']
context[ctx-test]/topologies_set
{'base-topo', 'other-topo'}
Topology structure:
-------------------
context[<context_uuid>]/topology[<topology_uuid>]
Hash set containing the attributes for the topology.
NOTE: Currently not used.
Example: <none>
context[<context_uuid>]/topology[<topology_uuid>]/devices_<container>
Containers (see Note 1) with the device UUIDs belonging to the topology.
Examples:
context[ctx-test]/topology[base-topo]/devices_list
['dev1', 'dev2', 'dev3', 'dev4']
context[ctx-test]/topology[base-topo]/devices_set
{'dev2', 'dev3', 'dev4', 'dev1'}
context[<context_uuid>]/topology[<topology_uuid>]/links_<container>
Containers (see Note 1) with the link UUIDs belonging to the topology.
Examples:
context[ctx-test]/topology[base-topo]/links_list
['dev1/to-dev2 ==> dev2/to-dev1', 'dev1/to-dev3 ==> dev3/to-dev1', 'dev2/to-dev1 ==> dev1/to-dev2', ...]
context[ctx-test]/topology[base-topo]/links_set
{'dev2/to-dev1 ==> dev1/to-dev2', 'dev1/to-dev2 ==> dev2/to-dev1', 'dev1/to-dev3 ==> dev3/to-dev1', ...}
Device structure:
-----------------
context[<context_uuid>]/topology[<topology_uuid>]/device[<device_uuid>]
Hash set containing the attributes for the device.
Defined attributes are:
device_type : string
device_config : string
device_operational_status: string "0"/"1"
Example: {'device_type': 'ROADM', 'device_config': '<config/>', 'device_operational_status': '1'}
context[<context_uuid>]/topology[<topology_uuid>]/device[<device_uuid>]/endpoints_<container>
Containers (see Note 1) with the endpoint UUIDs belonging to the device.
Examples:
context[ctx-test]/topology[base-topo]/device[dev1]/endpoints_list
['to-dev2', 'to-dev3', 'to-dev4']
context[ctx-test]/topology[base-topo]/device[dev1]/endpoints_set
{'to-dev3', 'to-dev2', 'to-dev4'}
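As a hedged illustration (host and port are illustrative), one of these device hashes can be read back with plain redis-py calls, and the stored '0'/'1' string maps back to the OperationalStatus enum defined in this commit:

from redis.client import Redis

redis_client = Redis('127.0.0.1', 6379)  # illustrative host/port
device_key = 'context[ctx-test]/topology[base-topo]/device[dev1]'

# the device attributes are stored as a Redis hash of strings
raw_attributes = redis_client.hgetall(device_key)
attributes = {k.decode('UTF-8'): v.decode('UTF-8') for k, v in raw_attributes.items()}
print(attributes)  # e.g. {'device_type': 'ROADM', 'device_config': '<config/>', 'device_operational_status': '1'}

# '1'/'0' can be converted back to the enum with to_operationalstatus_enum(), e.g.:
#   to_operationalstatus_enum(attributes['device_operational_status']) == OperationalStatus.ENABLED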
Device Endpoint structure:
--------------------------
context[<context_uuid>]/topology[<topology_uuid>]/device[<device_uuid>]/endpoint[<endpoint_uuid>]
Hash set containing the attributes for the endpoint.
Defined attributes are:
port_type: string
Example: {'port_type': 'WDM'}
Link structure:
---------------
context[<context_uuid>]/topology[<topology_uuid>]/link[<link_uuid>]
Hash set containing the attributes for the link.
NOTE: Currently not used.
Example: <none>
context[<context_uuid>]/topology[<topology_uuid>]/link[<link_uuid>]/endpoints_<container>
Containers (see Note 1) with the link endpoint UUIDs belonging to the link.
Examples:
context[ctx-test]/topology[base-topo]/link[dev2/to-dev1 ==> dev1/to-dev2]/endpoints_list
['dev2/to-dev1', 'dev1/to-dev2']
context[ctx-test]/topology[base-topo]/link[dev2/to-dev1 ==> dev1/to-dev2]/endpoints_set
{'dev2/to-dev1', 'dev1/to-dev2'}
Link Endpoint structure:
------------------------
context[<context_uuid>]/topology[<topology_uuid>]/link[<link_uuid>]/endpoint[<link_endpoint_uuid>]
Hash set containing the attributes for the link_endpoint.
Defined attributes are:
device_uuid: string
endpoint_uuid: string
Example:
context[ctx-test]/topology[base-topo]/link[dev1/to-dev2 ==> dev2/to-dev1]/endpoint[dev1/to-dev2]
{'device_uuid': 'dev1', 'endpoint_uuid': 'to-dev2'}
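As a final hedged illustration of the layout above (host and port are illustrative), the sketch below walks a link's endpoints container and the per-endpoint hashes with plain redis-py calls, rebuilding roughly what Link.dump() returns:

from redis.client import Redis

redis_client = Redis('127.0.0.1', 6379)  # illustrative host/port
link_key = 'context[ctx-test]/topology[base-topo]/link[dev1/to-dev2 ==> dev2/to-dev1]'

endpoints = []
for member in redis_client.lrange(link_key + '/endpoints_list', 0, -1):
    link_endpoint_uuid = member.decode('UTF-8')
    endpoint_key = link_key + '/endpoint[' + link_endpoint_uuid + ']'
    # each link endpoint hash stores the device_uuid and endpoint_uuid it points to
    device_uuid, endpoint_uuid = [
        v.decode('UTF-8') if v is not None else None
        for v in redis_client.hmget(endpoint_key, ['device_uuid', 'endpoint_uuid'])]
    endpoints.append({
        'topoId' : {'uuid': 'base-topo'},
        'dev_id' : {'uuid': device_uuid},
        'port_id': {'uuid': endpoint_uuid},
    })

print({'link_id': {'link_id': {'uuid': 'dev1/to-dev2 ==> dev2/to-dev1'}}, 'endpointList': endpoints})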
import copy
from redis.client import Redis
class EntityAttributes:
def __init__(self, parent, entity_key_attributes, validators, transcoders={}):
self.__parent = parent
self.__redis_client : Redis = self.__parent.get_context().get_redis_client()
self.__entity_key_attributes = entity_key_attributes.format(**self.__parent.__dict__)
self.__validators = validators
self.__transcoders = transcoders
def validate(self, update_attributes, remove_attributes, attribute_name):
remove_attributes.discard(attribute_name)
value = update_attributes.pop(attribute_name, None)
validator = self.__validators.get(attribute_name)
if not validator(value): raise AttributeError('{} is invalid'.format(attribute_name))
def transcode(self, attribute_name, attribute_value):
transcoder_set = self.__transcoders.get(attribute_name, {})
transcoder = transcoder_set.get(type(attribute_value))
return attribute_value if transcoder is None else transcoder(attribute_value)
def get(self, attributes=[]):
if len(attributes) == 0: # retrieve all
keys_values = self.__redis_client.hgetall(self.__entity_key_attributes).items()
else: # retrieve selected
names = list(attributes)
keys_values = zip(names, self.__redis_client.hmget(self.__entity_key_attributes, attributes))
attributes = {}
for key,value in keys_values:
str_key = key.decode('UTF-8') if isinstance(key, bytes) else key
str_value = value.decode('UTF-8') if isinstance(value, bytes) else value
attributes[str_key] = self.transcode(str_key, str_value)
return attributes
def update(self, update_attributes={}, remove_attributes=[]):
remove_attributes = set(remove_attributes)
copy_update_attributes = copy.deepcopy(update_attributes)
copy_remove_attributes = copy.deepcopy(remove_attributes)
for attribute_name in self.__validators.keys():
self.validate(copy_update_attributes, copy_remove_attributes, attribute_name)
attribute_value = update_attributes.get(attribute_name)
if attribute_value is None: continue
update_attributes[attribute_name] = self.transcode(attribute_name, attribute_value)
if len(copy_update_attributes) > 0:
raise AttributeError('Unexpected update_attributes: {}'.format(str(copy_update_attributes)))
if len(copy_remove_attributes) > 0:
raise AttributeError('Unexpected remove_attributes: {}'.format(str(copy_remove_attributes)))
if len(remove_attributes) > 0:
self.__redis_client.hdel(self.__entity_key_attributes, *remove_attributes)
if len(update_attributes) > 0:
self.__redis_client.hmset(self.__entity_key_attributes, update_attributes)
return self
def delete(self, attributes=[]):
if len(attributes) == 0: # delete all
self.__redis_client.delete(self.__entity_key_attributes)
else: # delete selected
self.__redis_client.hdel(self.__entity_key_attributes, *attributes)
from redis.client import Redis
class EntityCollection:
def __init__(self, parent, entity_key):
self.__parent = parent
self.__redis_client : Redis = self.__parent.get_context().get_redis_client()
self.__entity_key_list = entity_key.format(container_name='_list', **self.__parent.__dict__)
self.__entity_key_set = entity_key.format(container_name='_set', **self.__parent.__dict__)
def add(self, entity_uuid):
if self.__redis_client.sismember(self.__entity_key_set, entity_uuid) == 1: return
self.__redis_client.sadd(self.__entity_key_set, entity_uuid)
self.__redis_client.rpush(self.__entity_key_list, entity_uuid)
def get(self):
return list(map(lambda m: m.decode('UTF-8'), self.__redis_client.lrange(self.__entity_key_list, 0, -1)))
def contains(self, entity_uuid):
return self.__redis_client.sismember(self.__entity_key_set, entity_uuid) == 1
def delete(self, entity_uuid):
if self.__redis_client.sismember(self.__entity_key_set, entity_uuid) == 0: return
self.__redis_client.srem(self.__entity_key_set, entity_uuid)
self.__redis_client.lrem(self.__entity_key_list, 1, entity_uuid)
import random, time, uuid
from typing import Set, Union
from redis.client import Redis
KEY_LOCK = '{}/lock'
MIN_WAIT_TIME = 0.01
class Mutex:
def __init__(self, redis_client: Redis) -> None:
if not isinstance(redis_client, Redis):
raise AttributeError('redis_client must be an instance of redis.client.Redis')
self.redis_client = redis_client
self.script_release = None
self.script_refresh_expire = None
self.__register_scripts()
def __register_scripts(self):
# Script mutex_release
# Description: atomic script to release a set of mutex keys, only if all mutex keys are owned by the caller.
# if owner_key matches key stored in all mutexes, remove all mutexes and return 1. if some key
# does not match, do nothing and return 0.
# Keys: set of entity_keys to be released
# Args: owner_key
# Ret : 1 if all keys have been released, 0 otherwise (no action performed)
# Use : acquired = (int(self.script_release(keys=['mutex1', 'mutex2'], args=[owner_key])) == 1)
self.script_release = self.redis_client.register_script('\n'.join([
"for _,key in ipairs(KEYS) do",
" local owner_key = redis.call('get', key)",
" if owner_key ~= ARGV[1] then return 0 end",
"end",
"for _,key in ipairs(KEYS) do",
" redis.call('del', key)",
"end",
"return 1",
]))
# Script mutex_refresh_expire
# Description: atomic script to refresh expiracy of a set of mutex keys, only if all of them are owned by the
# caller. if owner_key matches key stored in all mutexes, refresh expiracy on all mutexes and
# return 1. if some key does not match, do nothing and return 0.
# Keys: set of entity_keys to be refreshed
# Args: owner_key, expiracy_seconds
# Ret : 1 if all keys have been refreshed, 0 otherwise (no action performed)
# Use : done = (int(self.script_refresh_expire(keys=['mutex1', 'mutex2'], args=[owner_key, seconds])) == 1)
self.script_refresh_expire = self.redis_client.register_script('\n'.join([
"for _,key in ipairs(KEYS) do",
" local owner_key = redis.call('get', key)",
" if owner_key ~= ARGV[1] then return 0 end",
"end",
"for _,key in ipairs(KEYS) do",
" redis.call('expire', key, ARGV[2])",
"end",
"return 1",
]))
def acquire(self, entity_key_or_keys : Union[str, Set[str]], owner_key : Union[str, None] = None,
blocking : bool = True, timeout : Union[float, int] = 5,
expiracy_seconds : Union[float, int, None] = None):
# Atomically set all entity_keys or none of them.
# entity_key_or_keys contains either a string with a specific entity key or a set with all entity keys to be
# set atomically.
# owner_key allows specifying the key used to mark the mutex as owned. When releasing, the owner_key must be
# correct; otherwise, the key will not be released. It can also be used to check whether the mutex is still
# owned by oneself or was lost and acquired by another party. If set to None, a random key is generated and
# returned together with the acquired boolean value.
# blocking defines whether the acquisition should be blocking, meaning that acquisition is retried with random
# back-off increments until the timeout elapses.
# Optionally, an expiry period can be specified through expiracy_seconds. If the mutex is not released within
# that period of time, it is released automatically.
# If mutex(es) is(are) acquired, the method returns True and the owner_key used to create the lock; otherwise,
# False and None owner_key are returned.
owner_key = owner_key or str(uuid.uuid4())
entity_keys = entity_key_or_keys if isinstance(entity_key_or_keys, set) else {str(entity_key_or_keys)}
entity_key_map = {KEY_LOCK.format(entity_key):owner_key for entity_key in entity_keys}
acquired = False
if blocking:
remaining_wait_time = timeout
while not acquired:
acquired = (self.redis_client.msetnx(entity_key_map) == 1)
if acquired: break
if remaining_wait_time < MIN_WAIT_TIME: return False, None
wait_time = remaining_wait_time * random.random()
remaining_wait_time -= wait_time
time.sleep(wait_time)
else:
acquired = (self.redis_client.msetnx(entity_key_map) == 1)
if not acquired: return False, None
if expiracy_seconds is not None:
pipeline = self.redis_client.pipeline()
for entity_key in entity_key_map.keys(): pipeline.expire(entity_key, expiracy_seconds)
pipeline.execute()
return True, owner_key
def release(self, entity_key_or_keys : Union[str, Set[str]], owner_key : str):
# release mutex keys only if all of them are owned by the caller
# return True if succeeded, False (nothing changed) otherwise
entity_keys = entity_key_or_keys if isinstance(entity_key_or_keys, set) else {str(entity_key_or_keys)}
entity_keys = {KEY_LOCK.format(entity_key) for entity_key in entity_keys}
return int(self.script_release(keys=list(entity_keys), args=[owner_key])) == 1
def acquired(self, entity_key : str, owner_key : str):
# check if a mutex is owned by the owner with owner_key
value = self.redis_client.get(KEY_LOCK.format(entity_key))
if(value is None): return(False)
return value.decode('UTF-8') == owner_key
def get_ttl(self, entity_key : str):
# check a mutex's time to live
return self.redis_client.ttl(KEY_LOCK.format(entity_key))
def refresh_expiracy(self, entity_key_or_keys : Union[str, Set[str]], owner_key : str,
expiracy_seconds : Union[float, int]):
# refresh expiracy on specified mutex keys only if all of them are owned by the caller
# return True if succeeded, False (nothing changed) otherwise
entity_keys = entity_key_or_keys if isinstance(entity_key_or_keys, set) else {str(entity_key_or_keys)}
entity_keys = {KEY_LOCK.format(entity_key) for entity_key in entity_keys}
return int(self.script_refresh_expire(keys=list(entity_keys), args=[owner_key, expiracy_seconds])) == 1
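A hedged usage sketch for the Mutex class above; the import path and host/port are illustrative (inside the package it is imported as 'from .tools.Mutex import Mutex'):

from redis.client import Redis
from Mutex import Mutex  # illustrative import path

redis_client = Redis('127.0.0.1', 6379)
mutex = Mutex(redis_client)

# acquire the lock for a context entity key, with a 60-second expiry as a safety net
acquired, owner_key = mutex.acquire('context[ctx-test]', expiracy_seconds=60)
if not acquired: raise Exception('Unable to acquire context[ctx-test]')
try:
    print(mutex.acquired('context[ctx-test]', owner_key))      # True while we still own the lock
    print(mutex.get_ttl('context[ctx-test]'))                   # remaining seconds before auto-release
    # ... perform changes on the locked entity ...
    mutex.refresh_expiracy('context[ctx-test]', owner_key, 60)  # extend the expiry while still working
finally:
    released = mutex.release('context[ctx-test]', owner_key)    # False if the lock expired or was lost
    print(released)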
def dump_keys(redis_client, logger):
logger.info('Dump keys...')
keys = redis_client.keys()
logger.info(' keys = {}'.format(str(keys)))
for key_name in keys:
key_name = key_name.decode('UTF-8')
if not key_name.startswith('context'): continue
key_type = redis_client.type(key_name)
if key_type is not None: key_type = key_type.decode('UTF-8')
key_content = {
'hash' : lambda key: {k.decode('UTF-8'):v.decode('UTF-8') for k,v in redis_client.hgetall(key).items()},
'list' : lambda key: [m.decode('UTF-8') for m in redis_client.lrange(key, 0, -1)],
'set' : lambda key: {m.decode('UTF-8') for m in redis_client.smembers(key)},
'string': lambda key: redis_client.get(key).decode('UTF-8'),
}.get(key_type, lambda key: 'UNSUPPORTED_TYPE')
logger.info(' key {} {}: {}'.format(key_type, key_name, key_content(key_name)))
from typing import Dict
class _Entity:
def __init__(self, entity_uuid, parent=None):
if (parent is None) or (not isinstance(parent, _Entity)):
raise AttributeError('parent must be an instance of _Entity')
self._entity_uuid = entity_uuid
self._parent = parent
self._context = self._parent.get_context()
def get_uuid(self): return(self._entity_uuid)
def get_parent(self): return(self._parent)
def get_context(self): return(self._context)
def load(self):
raise NotImplementedError()
def create(self):
raise NotImplementedError()
def delete(self):
raise NotImplementedError()
def dump(self) -> Dict:
raise NotImplementedError()
variables:
IMAGE_NAME: 'context' # name of the microservice
IMAGE_NAME_TEST: 'context-test' # name of the microservice test image
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
# build the Docker image
build context:
stage: build
script:
- docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile ./src/
rules:
- changes:
- src/$IMAGE_NAME/**
- .gitlab-ci.yml
# tags the Docker image
tag context:
stage: build
script:
- docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
rules:
- changes:
- src/$IMAGE_NAME/**
- .gitlab-ci.yml
# push the Docker image to the gitlab Docker registry
push context:
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
rules:
- changes:
- src/$IMAGE_NAME/**
- .gitlab-ci.yml
# test if the Docker image can be pulled from the gitlab registry
test context pull:
stage: test
needs:
- push context
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
rules:
- changes:
- src/$IMAGE_NAME/**
- .gitlab-ci.yml
# test if the Docker image can be executed
test context run:
stage: test
needs:
- build context
before_script:
- if docker network list | grep teraflowbridge; then echo "teraflowbridge is already created"; else docker network create -d bridge teraflowbridge; fi
script:
- docker run -d -p 1010:1010 --name context --network=teraflowbridge --rm "$IMAGE_NAME:$IMAGE_TAG"
- docker ps > deploy_test_report.txt
after_script:
- docker stop context
rules:
- changes:
- src/$IMAGE_NAME/**
- .gitlab-ci.yml
artifacts:
when: always
paths:
- deploy_test_report.txt
expire_in: 1 day
# run the unit tests of the context component
test context pytest:
stage: test
needs:
- build context
script:
- docker build -t "$IMAGE_NAME_TEST:$IMAGE_TAG" -f ./src/$IMAGE_NAME/tests/Dockerfile ./src/ > pytest_report.txt
rules:
- changes:
- src/$IMAGE_NAME/**
- .gitlab-ci.yml
artifacts:
when: always
paths:
- pytest_report.txt
expire_in: 1 day
# Deployment of the context service in the Kubernetes Cluster
deploy context:
stage: deploy
needs:
- build context
- test context run
script:
- kubectl apply -f "manifests/contextservice.yaml"
when: manual
import logging
# gRPC settings
SERVICE_PORT = 7070
MAX_WORKERS = 10
GRACE_PERIOD = 60
LOG_LEVEL = logging.WARNING
# Prometheus settings
METRICS_PORT = 8080