Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
P4 service handler for the TeraFlowSDN controller.
"""
import anytree, json, logging
from typing import Any, Dict, List, Optional, Tuple, Union
from common.proto.context_pb2 import ConfigActionEnum, ConfigRule, DeviceId, Service
from common.tools.object_factory.ConfigRule import json_config_rule, json_config_rule_delete, json_config_rule_set
from common.tools.object_factory.Device import json_device_id
from common.type_checkers.Checkers import chk_type, chk_length
from service.service.service_handler_api._ServiceHandler import _ServiceHandler
from service.service.service_handler_api.AnyTreeTools import TreeNode, delete_subnode, get_subnode, set_subnode_value
from service.service.task_scheduler.TaskExecutor import TaskExecutor
LOGGER = logging.getLogger(__name__)
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def create_rule_set(endpoint_a, endpoint_b):
    """ Build the 'set' config rule forwarding traffic from one port to another.

        The rule targets the 'IngressPipeImpl.l2_exact_table' table: it matches
        on 'standard_metadata.ingress_port' and applies the
        'IngressPipeImpl.set_egress_port' action.

        Parameters:
            endpoint_a
                Value used as the ingress-port match value.
            endpoint_b
                Value used as the 'port' action parameter.
        Returns:
            The result of json_config_rule_set() for resource key 'table'
            and the table entry described above.
    """
    ingress_match = {
        'match-field': 'standard_metadata.ingress_port',
        'match-value': endpoint_a,
    }
    egress_param = {
        'action-param': 'port',
        'action-value': endpoint_b,
    }
    table_entry = {
        'table-name': 'IngressPipeImpl.l2_exact_table',
        'match-fields': [ingress_match],
        'action-name': 'IngressPipeImpl.set_egress_port',
        'action-params': [egress_param],
    }
    return json_config_rule_set('table', table_entry)
def create_rule_del(endpoint_a, endpoint_b):
    """ Build the 'delete' config rule for a port-to-port forwarding entry.

        The rule targets the 'IngressPipeImpl.l2_exact_table' table: it matches
        on 'standard_metadata.ingress_port' and names the
        'IngressPipeImpl.set_egress_port' action.

        Parameters:
            endpoint_a
                Value used as the ingress-port match value.
            endpoint_b
                Value used as the 'port' action parameter.
        Returns:
            The result of json_config_rule_delete() for resource key 'table'
            and the table entry described above.
    """
    ingress_match = {
        'match-field': 'standard_metadata.ingress_port',
        'match-value': endpoint_a,
    }
    egress_param = {
        'action-param': 'port',
        'action-value': endpoint_b,
    }
    table_entry = {
        'table-name': 'IngressPipeImpl.l2_exact_table',
        'match-fields': [ingress_match],
        'action-name': 'IngressPipeImpl.set_egress_port',
        'action-params': [egress_param],
    }
    return json_config_rule_delete('table', table_entry)
class P4ServiceHandler(_ServiceHandler):
    """ Service handler that installs/removes L2 forwarding rules on devices.

        Endpoints are processed in pairs per device: two endpoints that share
        a device_uuid produce two table rules on that device, one for each
        direction. Constraint and config operations are not implemented and
        only emit a warning.
    """

    def __init__(self,
                 service: Service,
                 task_executor : TaskExecutor,
                 **settings) -> None:
        """ Initialize Driver.
            Parameters:
                service
                    The service instance (gRPC message) to be managed.
                task_executor
                    An instance of Task Executor providing access to the
                    service handlers factory, the context and device clients,
                    and an internal cache of already-loaded gRPC entities.
                **settings
                    Extra settings required by the service handler.
        """
        self.__service = service
        self.__task_executor = task_executor # pylint: disable=unused-private-member

    def _configure_endpoint_pairs(self, endpoints, rule_factory, operation):
        """ Apply a pair of mirrored rules for every two endpoints of a device.

            Shared implementation of SetEndpoint and DeleteEndpoint.

            Parameters:
                endpoints: List[Tuple[str, str, Optional[str]]]
                    List of tuples; only the first two items (device_uuid,
                    endpoint_uuid) of each tuple are used.
                rule_factory
                    Callable taking (endpoint_a, endpoint_b) and returning the
                    keyword arguments of a ConfigRule (create_rule_set or
                    create_rule_del).
                operation: str
                    Name of the public method, used in the error log message.
            Returns:
                results: List[Union[bool, Exception]]
                    One entry per requested endpoint, in the same order.
                    The first endpoint seen for a device is recorded as False
                    until a second endpoint of the same device is configured
                    successfully, at which point both entries become True.
                    If configuring the pair fails, the second endpoint's entry
                    is the raised Exception and the first one stays False.
        """
        chk_type('endpoints', endpoints, list)
        if not endpoints:
            return []

        # device_uuid -> (position of the first endpoint in results, its endpoint_uuid)
        pending = {}
        results = []
        for position, endpoint in enumerate(endpoints):
            device_uuid, endpoint_uuid = endpoint[0:2] # ignore topology_uuid by now

            if device_uuid not in pending:
                # First endpoint of this device: wait for its counterpart.
                pending[device_uuid] = (position, endpoint_uuid)
                results.append(False)
                continue

            first_position, matched_endpoint_uuid = pending.pop(device_uuid)
            try:
                device = self.__task_executor.get_device(DeviceId(**json_device_id(device_uuid)))

                # Start from an empty rule list so only the new rules are sent.
                del device.device_config.config_rules[:]

                # One rule per direction: first -> second, then second -> first.
                directions = (
                    (matched_endpoint_uuid, endpoint_uuid),
                    (endpoint_uuid, matched_endpoint_uuid),
                )
                for ingress, egress in directions:
                    rule = rule_factory(ingress, egress)
                    device.device_config.config_rules.append(ConfigRule(**rule))

                self.__task_executor.configure_device(device)
            except Exception as e: # pylint: disable=broad-except
                # The API contract requires returning the exception, not raising it.
                LOGGER.exception('Unable to %s(%s)', operation, str(endpoint))
                results.append(e)
            else:
                results.append(True)
                results[first_position] = True

        return results

    def SetEndpoint(
        self, endpoints : List[Tuple[str, str, Optional[str]]],
        connection_uuid : Optional[str] = None
    ) -> List[Union[bool, Exception]]:
        """ Create/Update service endpoints form a list.
            Parameters:
                endpoints: List[Tuple[str, str, Optional[str]]]
                    List of tuples, each containing a device_uuid,
                    endpoint_uuid and, optionally, the topology_uuid
                    of the endpoint to be added.
                connection_uuid : Optional[str]
                    If specified, is the UUID of the connection this endpoint is associated to.
                    Currently not used by this handler.
            Returns:
                results: List[Union[bool, Exception]]
                    List of results for endpoint changes requested.
                    Return values must be in the same order as the requested
                    endpoints. If an endpoint is properly added, True must be
                    returned; otherwise, the Exception that is raised during
                    the processing must be returned.
                    An endpoint whose device has no second endpoint in the
                    list is reported as False.
        """
        return self._configure_endpoint_pairs(endpoints, create_rule_set, 'SetEndpoint')

    def DeleteEndpoint(
        self, endpoints : List[Tuple[str, str, Optional[str]]],
        connection_uuid : Optional[str] = None
    ) -> List[Union[bool, Exception]]:
        """ Delete service endpoints form a list.
            Parameters:
                endpoints: List[Tuple[str, str, Optional[str]]]
                    List of tuples, each containing a device_uuid,
                    endpoint_uuid, and the topology_uuid of the endpoint
                    to be removed.
                connection_uuid : Optional[str]
                    If specified, is the UUID of the connection this endpoint is associated to.
                    Currently not used by this handler.
            Returns:
                results: List[Union[bool, Exception]]
                    List of results for endpoint deletions requested.
                    Return values must be in the same order as the requested
                    endpoints. If an endpoint is properly deleted, True must be
                    returned; otherwise, the Exception that is raised during
                    the processing must be returned.
                    An endpoint whose device has no second endpoint in the
                    list is reported as False.
        """
        return self._configure_endpoint_pairs(endpoints, create_rule_del, 'DeleteEndpoint')

    def SetConstraint(self, constraints: List[Tuple[str, Any]]) \
            -> List[Union[bool, Exception]]:
        """ Create/Update service constraints.
            Not implemented: the constraints are ignored, a warning is
            logged and every constraint is reported as True.
            Parameters:
                constraints: List[Tuple[str, Any]]
                    List of tuples, each containing a constraint_type and the
                    new constraint_value to be set.
            Returns:
                results: List[Union[bool, Exception]]
                    List of results for constraint changes requested.
                    Return values must be in the same order as the requested
                    constraints. If a constraint is properly set, True must be
                    returned; otherwise, the Exception that is raised during
                    the processing must be returned.
        """
        chk_type('constraints', constraints, list)
        if not constraints:
            return []

        msg = '[SetConstraint] Method not implemented. Constraints({:s}) are being ignored.'
        LOGGER.warning(msg.format(str(constraints)))
        return [True] * len(constraints)

    def DeleteConstraint(self, constraints: List[Tuple[str, Any]]) \
            -> List[Union[bool, Exception]]:
        """ Delete service constraints.
            Not implemented: the constraints are ignored, a warning is
            logged and every constraint is reported as True.
            Parameters:
                constraints: List[Tuple[str, Any]]
                    List of tuples, each containing a constraint_type pointing
                    to the constraint to be deleted, and a constraint_value
                    containing possible additionally required values to locate
                    the constraint to be removed.
            Returns:
                results: List[Union[bool, Exception]]
                    List of results for constraint deletions requested.
                    Return values must be in the same order as the requested
                    constraints. If a constraint is properly deleted, True must
                    be returned; otherwise, the Exception that is raised during
                    the processing must be returned.
        """
        chk_type('constraints', constraints, list)
        if not constraints:
            return []

        msg = '[DeleteConstraint] Method not implemented. Constraints({:s}) are being ignored.'
        LOGGER.warning(msg.format(str(constraints)))
        return [True] * len(constraints)

    def SetConfig(self, resources: List[Tuple[str, Any]]) \
            -> List[Union[bool, Exception]]:
        """ Create/Update configuration for a list of service resources.
            Not implemented: the resources are ignored, a warning is
            logged and every resource is reported as True.
            Parameters:
                resources: List[Tuple[str, Any]]
                    List of tuples, each containing a resource_key pointing to
                    the resource to be modified, and a resource_value
                    containing the new value to be set.
            Returns:
                results: List[Union[bool, Exception]]
                    List of results for resource key changes requested.
                    Return values must be in the same order as the requested
                    resource keys. If a resource is properly set, True must be
                    returned; otherwise, the Exception that is raised during
                    the processing must be returned.
        """
        chk_type('resources', resources, list)
        if not resources:
            return []

        msg = '[SetConfig] Method not implemented. Resources({:s}) are being ignored.'
        LOGGER.warning(msg.format(str(resources)))
        return [True] * len(resources)

    def DeleteConfig(self, resources: List[Tuple[str, Any]]) \
            -> List[Union[bool, Exception]]:
        """ Delete configuration for a list of service resources.
            Not implemented: the resources are ignored, a warning is
            logged and every resource is reported as True.
            Parameters:
                resources: List[Tuple[str, Any]]
                    List of tuples, each containing a resource_key pointing to
                    the resource to be modified, and a resource_value containing
                    possible additionally required values to locate the value
                    to be removed.
            Returns:
                results: List[Union[bool, Exception]]
                    List of results for resource key deletions requested.
                    Return values must be in the same order as the requested
                    resource keys. If a resource is properly deleted, True must
                    be returned; otherwise, the Exception that is raised during
                    the processing must be returned.
        """
        chk_type('resources', resources, list)
        if not resources:
            return []

        # Log prefix names this method (it previously said '[SetConfig]').
        msg = '[DeleteConfig] Method not implemented. Resources({:s}) are being ignored.'
        LOGGER.warning(msg.format(str(resources)))
        return [True] * len(resources)