Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc, json, logging, operator, sqlalchemy, threading, uuid
from sqlalchemy.orm import Session, contains_eager, selectinload, sessionmaker
from sqlalchemy.dialects.postgresql import UUID, insert
from sqlalchemy_cockroachdb import run_transaction
from typing import Dict, Iterator, List, Optional, Set, Tuple, Union
from common.message_broker.MessageBroker import MessageBroker
from common.orm.backend.Tools import key_to_str
from common.proto.context_pb2 import (
Connection, ConnectionEvent, ConnectionId, ConnectionIdList, ConnectionList,
Context, ContextEvent, ContextId, ContextIdList, ContextList,
Device, DeviceEvent, DeviceId, DeviceIdList, DeviceList,
Empty, EventTypeEnum,
Link, LinkEvent, LinkId, LinkIdList, LinkList,
Service, ServiceEvent, ServiceId, ServiceIdList, ServiceList,
Slice, SliceEvent, SliceId, SliceIdList, SliceList,
Topology, TopologyEvent, TopologyId, TopologyIdList, TopologyList,
ConfigActionEnum, Constraint)
from common.proto.policy_pb2 import PolicyRuleIdList, PolicyRuleId, PolicyRuleList, PolicyRule
from common.proto.context_pb2_grpc import ContextServiceServicer
from common.proto.context_policy_pb2_grpc import ContextPolicyServiceServicer
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.rpc_method_wrapper.ServiceExceptions import InvalidArgumentException, NotFoundException
from common.tools.grpc.Tools import grpc_message_to_json, grpc_message_to_json_string
from context.service.Database import Database
from context.service.database.ConfigModel import (
ConfigModel, ORM_ConfigActionEnum, ConfigRuleModel, grpc_config_rules_to_raw, update_config)
from context.service.database.ConnectionModel import ConnectionModel, set_path
from context.service.database.ConstraintModel import (
ConstraintModel, ConstraintsModel, Union_ConstraintModel, CONSTRAINT_PARSERS, set_constraints)
from context.service.database.ContextModel import ContextModel
from context.service.database.DeviceModel import (
DeviceModel, grpc_to_enum__device_operational_status, set_drivers, grpc_to_enum__device_driver, DriverModel)
from context.service.database.EndPointModel import EndPointModel, KpiSampleTypeModel, set_kpi_sample_types
from context.service.database.Events import notify_event
from context.service.database.KpiSampleType import grpc_to_enum__kpi_sample_type
from context.service.database.LinkModel import LinkModel
from context.service.database.PolicyRuleModel import PolicyRuleModel
from context.service.database.RelationModels import (
ConnectionSubServiceModel, LinkEndPointModel, ServiceEndPointModel, SliceEndPointModel, SliceServiceModel,
SliceSubSliceModel, TopologyDeviceModel, TopologyLinkModel)
from context.service.database.ServiceModel import (
ServiceModel, grpc_to_enum__service_status, grpc_to_enum__service_type)
from context.service.database.SliceModel import SliceModel, grpc_to_enum__slice_status
from context.service.database.TopologyModel import TopologyModel
from .Constants import (
CONSUME_TIMEOUT, TOPIC_CONNECTION, TOPIC_CONTEXT, TOPIC_DEVICE, TOPIC_LINK, TOPIC_SERVICE, TOPIC_SLICE,
TOPIC_TOPOLOGY)
# Module-wide logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Service name and RPC method names handed to create_metrics() below.
SERVICE_NAME = 'Context'
METHOD_NAMES = [
    # Connection
    'ListConnectionIds', 'ListConnections',
    'GetConnection', 'SetConnection', 'RemoveConnection', 'GetConnectionEvents',
    # Context
    'ListContextIds', 'ListContexts',
    'GetContext', 'SetContext', 'RemoveContext', 'GetContextEvents',
    # Topology
    'ListTopologyIds', 'ListTopologies',
    'GetTopology', 'SetTopology', 'RemoveTopology', 'GetTopologyEvents',
    # Device
    'ListDeviceIds', 'ListDevices',
    'GetDevice', 'SetDevice', 'RemoveDevice', 'GetDeviceEvents',
    # Link
    'ListLinkIds', 'ListLinks',
    'GetLink', 'SetLink', 'RemoveLink', 'GetLinkEvents',
    # Service
    'ListServiceIds', 'ListServices',
    'GetService', 'SetService', 'RemoveService', 'GetServiceEvents',
    # Slice
    'ListSliceIds', 'ListSlices',
    'GetSlice', 'SetSlice', 'RemoveSlice', 'GetSliceEvents',
    # Policy rules (no event stream entry)
    'ListPolicyRuleIds', 'ListPolicyRules',
    'GetPolicyRule', 'SetPolicyRule', 'RemovePolicyRule',
    # Unset operations
    'UnsetService', 'UnsetSlice',
]
METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class ContextServiceServicerImpl(ContextServiceServicer, ContextPolicyServiceServicer):
def __init__(self, db_engine : sqlalchemy.engine.Engine, messagebroker : MessageBroker) -> None:
    """Store the database engine and the message broker used by the RPC methods.

    :param db_engine: SQLAlchemy engine; the RPC methods bind a session factory to it per call.
    :param messagebroker: broker instance kept for event notification.
    """
    LOGGER.debug('Creating Servicer...')
    # No shared session, lock or Database wrapper is kept here: the RPC methods of this
    # class build their own sessionmaker from the engine on each call.
    self.db_engine, self.messagebroker = db_engine, messagebroker
    LOGGER.debug('Servicer Created')
# ----- Context ----------------------------------------------------------------------------------------------------
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListContextIds(self, request: Empty, context : grpc.ServicerContext) -> ContextIdList:
    """Return the identifiers of all contexts stored in the database.

    :param request: unused (Empty message).
    :param context: gRPC servicer context; not used by this method.
    :return: ContextIdList built from ContextModel.dump_id() of every stored row.
    """
    def collect_ids(session : Session) -> List[Dict]:
        # Dump inside the transaction so the objects are still attached to the session.
        return [context_model.dump_id() for context_model in session.query(ContextModel).all()]

    session_factory = sessionmaker(bind=self.db_engine)
    context_ids = run_transaction(session_factory, collect_ids)
    return ContextIdList(context_ids=context_ids)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def ListContexts(self, request: Empty, context : grpc.ServicerContext) -> ContextList:
    """Return all contexts stored in the database.

    :param request: unused (Empty message).
    :param context: gRPC servicer context; not used by this method.
    :return: ContextList built from ContextModel.dump() of every stored row.
    """
    def collect_contexts(session : Session) -> List[Dict]:
        # Dump inside the transaction so the objects are still attached to the session.
        return [context_model.dump() for context_model in session.query(ContextModel).all()]

    session_factory = sessionmaker(bind=self.db_engine)
    contexts = run_transaction(session_factory, collect_contexts)
    return ContextList(contexts=contexts)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def GetContext(self, request: ContextId, context : grpc.ServicerContext) -> Context:
    """Look up a single context by identifier.

    The stored key is the UUIDv5 (OID namespace) of the identifier given in the request.

    :param request: identifier of the context to retrieve.
    :param context: gRPC servicer context; not used by this method.
    :return: Context message built from ContextModel.dump().
    :raises NotFoundException: if no row matches the derived UUID.
    """
    context_uuid = str(uuid.uuid5(uuid.NAMESPACE_OID, request.context_uuid.uuid))

    def fetch_context(session : Session) -> Optional[Dict]:
        query = session.query(ContextModel).filter_by(context_uuid=context_uuid)
        context_model : Optional[ContextModel] = query.one_or_none()
        if context_model is None:
            return None
        return context_model.dump()

    context_data = run_transaction(sessionmaker(bind=self.db_engine), fetch_context)
    if context_data is None:
        raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
    return Context(**context_data)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def SetContext(self, request: Context, context : grpc.ServicerContext) -> ContextId:
    """Create or update a context and return its identifier.

    The identifier supplied in the request is stored as the context name; the database
    key is the UUIDv5 (OID namespace) derived from it.

    :param request: context to store; every entry of topology_ids / service_ids must
        reference the same context identifier as request.context_id.
    :param context: gRPC servicer context; not used by this method.
    :return: ContextId built from ContextModel.dump_id() of the stored row.
    :raises InvalidArgumentException: if a topology or service id references another context.
    :raises NotFoundException: if the row cannot be read back after being merged.
    """
    context_name = request.context_id.context_uuid.uuid
    context_uuid = str(uuid.uuid5(uuid.NAMESPACE_OID, context_name))

    # The nested ids carry the identifier exactly as sent in the request, so they have to be
    # checked against the raw request identifier. Comparing them with the derived UUIDv5
    # (as done previously) can never match and rejected every non-empty list.
    for i, topology_id in enumerate(request.topology_ids):
        topology_context_uuid = topology_id.context_id.context_uuid.uuid
        if topology_context_uuid != context_name:
            raise InvalidArgumentException(
                'request.topology_ids[{:d}].context_id.context_uuid.uuid'.format(i), topology_context_uuid,
                ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_name)])

    for i, service_id in enumerate(request.service_ids):
        service_context_uuid = service_id.context_id.context_uuid.uuid
        if service_context_uuid != context_name:
            raise InvalidArgumentException(
                'request.service_ids[{:d}].context_id.context_uuid.uuid'.format(i), service_context_uuid,
                ['should be == {:s}({:s})'.format('request.context_id.context_uuid.uuid', context_name)])

    def callback(session : Session) -> Tuple[Optional[Dict], bool]:
        # Lock the row (if any) so the create-vs-update decision is consistent with the write.
        existing : Optional[ContextModel] = \
            session.query(ContextModel).with_for_update().filter_by(context_uuid=context_uuid).one_or_none()
        updated = existing is not None
        session.merge(ContextModel(context_uuid=context_uuid, context_name=context_name))
        # run_transaction() owns commit/rollback (and may retry the callback), so only flush
        # the pending write here instead of committing inside the callback.
        session.flush()
        stored = session.get(ContextModel, {'context_uuid': context_uuid})
        return (None if stored is None else stored.dump_id()), updated

    obj_id, updated = run_transaction(sessionmaker(bind=self.db_engine), callback)
    if obj_id is None: raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)

    #event_type = EventTypeEnum.EVENTTYPE_UPDATE if updated else EventTypeEnum.EVENTTYPE_CREATE
    #notify_event(self.messagebroker, TOPIC_CONTEXT, event_type, {'context_id': obj_id})
    return ContextId(**obj_id)
@safe_and_metered_rpc_method(METRICS, LOGGER)
def RemoveContext(self, request: ContextId, context : grpc.ServicerContext) -> Empty:
    """Delete the context identified by the request, if it exists.

    The stored key is the UUIDv5 (OID namespace) of the identifier given in the request.
    No error is raised when nothing matches.

    :param request: identifier of the context to remove.
    :param context: gRPC servicer context; not used by this method.
    :return: Empty message.
    """
    context_uuid = str(uuid.uuid5(uuid.NAMESPACE_OID, request.context_uuid.uuid))

    def delete_context(session : Session) -> bool:
        matching_rows = session.query(ContextModel).filter_by(context_uuid=context_uuid)
        return matching_rows.delete() > 0

    # The result is only needed by the (currently disabled) event notification below.
    deleted = run_transaction(sessionmaker(bind=self.db_engine), delete_context)
    #if deleted:
    #    notify_event(self.messagebroker, TOPIC_CONTEXT, EventTypeEnum.EVENTTYPE_REMOVE, {'context_id': request})
    return Empty()
# @safe_and_metered_rpc_method(METRICS, LOGGER)
# def GetContextEvents(self, request: Empty, context : grpc.ServicerContext) -> Iterator[ContextEvent]:
# for message in self.messagebroker.consume({TOPIC_CONTEXT}, consume_timeout=CONSUME_TIMEOUT):
# yield ContextEvent(**json.loads(message.content))
# ----- Topology ---------------------------------------------------------------------------------------------------
# @safe_and_metered_rpc_method(METRICS, LOGGER)
# def ListTopologyIds(self, request: ContextId, context : grpc.ServicerContext) -> TopologyIdList:
# context_uuid = request.context_uuid.uuid
#
# with self.session() as session:
# result = session.query(ContextModel).options(selectinload(ContextModel.topology)).filter_by(context_uuid=context_uuid).one_or_none()
# if not result:
# raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
#
# db_topologies = result.topology
# return TopologyIdList(topology_ids=[db_topology.dump_id() for db_topology in db_topologies])
#
# @safe_and_metered_rpc_method(METRICS, LOGGER)
# def ListTopologies(self, request: ContextId, context : grpc.ServicerContext) -> TopologyList:
# context_uuid = request.context_uuid.uuid
#
# with self.session() as session:
# result = session.query(ContextModel).options(selectinload(ContextModel.topology)).filter_by(
# context_uuid=context_uuid).one_or_none()
# if not result:
# raise NotFoundException(ContextModel.__name__.replace('Model', ''), context_uuid)
#
# db_topologies = result.topology
# return TopologyList(topologies=[db_topology.dump() for db_topology in db_topologies])
#
# @safe_and_metered_rpc_method(METRICS, LOGGER)
Loading
Loading full blame…