Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Lluis Gifre Renom
committed
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from typing import Any, List, Optional, Tuple, Union
from common.orm.Database import Database
from context.client.ContextClient import ContextClient
from device.client.DeviceClient import DeviceClient
from service.service.database.ServiceModel import ServiceModel
class _ServiceHandler:
def __init__(
self, db_service : ServiceModel, database : Database, context_client : ContextClient,
device_client : DeviceClient, **settings
) -> None:
""" Initialize Driver.
Parameters:
db_service
The service instance from the local in-memory database.
database
The instance of the local in-memory database.
context_client
An instance of context client to be used to retrieve information from the service and the devices.
device_client
An instance of device client to be used to configure the devices.
**settings
Extra settings required by the service handler.
"""
raise NotImplementedError()
def SetEndpoint(self, endpoints : List[Tuple[str, str, Optional[str]]]) -> List[Union[bool, Exception]]:
""" Set endpoints from a list.
Parameters:
endpoints : List[Tuple[str, str, Optional[str]]]
List of tuples, each containing a device_uuid, endpoint_uuid and, optionally, the topology_uuid
of the endpoint to be added.
Returns:
results : List[Union[bool, Exception]]
List of results for endpoint changes requested. Return values must be in the same order than
endpoints requested. If an endpoint is properly added, True must be retrieved; otherwise, the
Exception that is raised during the processing must be retrieved.
"""
raise NotImplementedError()
def DeleteEndpoint(self, endpoints : List[Tuple[str, str, Optional[str]]]) -> List[Union[bool, Exception]]:
""" Delete endpoints form a list.
Parameters:
endpoints : List[Tuple[str, str, Optional[str]]]
List of tuples, each containing a device_uuid, endpoint_uuid, and the topology_uuid of the endpoint
to be removed.
Returns:
results : List[Union[bool, Exception]]
List of results for endpoint deletions requested. Return values must be in the same order than
endpoints requested. If an endpoint is properly deleted, True must be retrieved; otherwise, the
Exception that is raised during the processing must be retrieved.
"""
raise NotImplementedError()
def SetConstraint(self, constraints : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
""" Create/Update constraints.
Parameters:
constraints : List[Tuple[str, Any]]
List of tuples, each containing a constraint_type and the new constraint_value to be set.
Returns:
results : List[Union[bool, Exception]]
List of results for constraint changes requested. Return values must be in the same order than
constraints requested. If a constraint is properly set, True must be retrieved; otherwise, the
Exception that is raised during the processing must be retrieved.
"""
raise NotImplementedError()
def DeleteConstraint(self, constraints : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
""" Delete constraints.
Parameters:
constraints : List[Tuple[str, Any]]
List of tuples, each containing a constraint_type pointing to the constraint to be deleted, and a
constraint_value containing possible additionally required values to locate the constraint to be
removed.
Returns:
results : List[Union[bool, Exception]]
List of results for constraint deletions requested. Return values must be in the same order than
constraints requested. If a constraint is properly deleted, True must be retrieved; otherwise, the
Exception that is raised during the processing must be retrieved.
"""
raise NotImplementedError()
def SetConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
""" Create/Update configuration for a list of resources.
Parameters:
resources : List[Tuple[str, Any]]
List of tuples, each containing a resource_key pointing the resource to be modified, and a
resource_value containing the new value to be set.
Returns:
results : List[Union[bool, Exception]]
List of results for resource key changes requested. Return values must be in the same order than
resource keys requested. If a resource is properly set, True must be retrieved; otherwise, the
Exception that is raised during the processing must be retrieved.
"""
raise NotImplementedError()
def DeleteConfig(self, resources : List[Tuple[str, Any]]) -> List[Union[bool, Exception]]:
""" Delete configuration for a list of resources.
Parameters:
resources : List[Tuple[str, Any]]
List of tuples, each containing a resource_key pointing the resource to be modified, and a
resource_value containing possible additionally required values to locate the value to be removed.
Returns:
results : List[Union[bool, Exception]]
List of results for resource key deletions requested. Return values must be in the same order than
resource keys requested. If a resource is properly deleted, True must be retrieved; otherwise, the
Exception that is raised during the processing must be retrieved.
"""
raise NotImplementedError()