Skip to content
Snippets Groups Projects
Commit 416038a7 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

DLT:

- Extended DltGatewayClient to support configuration through env vars
- Added DltEventsCollector to DltConnector
- Implemented RecordDevice in DltConnectorServicer
parent 2444817c
No related branches found
No related tags found
2 merge requests!54Release 2.0.0,!24Integrate NFV-SDN'22 demo
...@@ -30,8 +30,10 @@ DEFAULT_HTTP_BIND_ADDRESS = '0.0.0.0' ...@@ -30,8 +30,10 @@ DEFAULT_HTTP_BIND_ADDRESS = '0.0.0.0'
DEFAULT_METRICS_PORT = 9192 DEFAULT_METRICS_PORT = 9192
# Default context and topology UUIDs # Default context and topology UUIDs
DEFAULT_CONTEXT_UUID = 'admin' DEFAULT_CONTEXT_UUID = 'admin'
DEFAULT_TOPOLOGY_UUID = 'admin' DEFAULT_TOPOLOGY_UUID = 'admin' # contains the detailed local topology
DOMAINS_TOPOLOGY_UUID = 'domains' # contains the abstracted domains (abstracted local + abstracted remotes)
AGGREGATED_TOPOLOGY_UUID = 'aggregated' # contains the aggregated view (detailed local + abstracted remotes)
# Default service names # Default service names
class ServiceNameEnum(Enum): class ServiceNameEnum(Enum):
...@@ -50,7 +52,7 @@ class ServiceNameEnum(Enum): ...@@ -50,7 +52,7 @@ class ServiceNameEnum(Enum):
WEBUI = 'webui' WEBUI = 'webui'
# Used for test and debugging only # Used for test and debugging only
DLT_GATEWAY = 'dlt-gateway' DLT_GATEWAY = 'dltgateway'
# Default gRPC service ports # Default gRPC service ports
DEFAULT_SERVICE_GRPC_PORTS = { DEFAULT_SERVICE_GRPC_PORTS = {
......
...@@ -11,3 +11,14 @@ ...@@ -11,3 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os
DEFAULT_DLT_GATEWAY_HOST = '127.0.0.1'
DEFAULT_DLT_GATEWAY_PORT = '50051'
# Find IP:port of gateway container as follows:
# - first check env vars DLT_GATEWAY_HOST & DLT_GATEWAY_PORT
# - if not set, use DEFAULT_DLT_GATEWAY_HOST & DEFAULT_DLT_GATEWAY_PORT
DLT_GATEWAY_HOST = str(os.environ.get('DLT_GATEWAY_HOST', DEFAULT_DLT_GATEWAY_HOST))
DLT_GATEWAY_PORT = int(os.environ.get('DLT_GATEWAY_PORT', DEFAULT_DLT_GATEWAY_PORT))
...@@ -14,14 +14,13 @@ ...@@ -14,14 +14,13 @@
from typing import Iterator from typing import Iterator
import grpc, logging import grpc, logging
from common.Constants import ServiceNameEnum
from common.Settings import get_service_host, get_service_port_grpc
from common.proto.context_pb2 import Empty, TeraFlowController from common.proto.context_pb2 import Empty, TeraFlowController
from common.proto.dlt_gateway_pb2 import ( from common.proto.dlt_gateway_pb2 import (
DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription) DltPeerStatus, DltPeerStatusList, DltRecord, DltRecordEvent, DltRecordId, DltRecordStatus, DltRecordSubscription)
from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub from common.proto.dlt_gateway_pb2_grpc import DltGatewayServiceStub
from common.tools.client.RetryDecorator import retry, delay_exponential from common.tools.client.RetryDecorator import retry, delay_exponential
from common.tools.grpc.Tools import grpc_message_to_json_string from common.tools.grpc.Tools import grpc_message_to_json_string
from dlt.connector.Config import DLT_GATEWAY_HOST, DLT_GATEWAY_PORT
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
MAX_RETRIES = 15 MAX_RETRIES = 15
...@@ -30,8 +29,8 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, ...@@ -30,8 +29,8 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION,
class DltGatewayClient: class DltGatewayClient:
def __init__(self, host=None, port=None): def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.DLT) if not host: host = DLT_GATEWAY_HOST
if not port: port = get_service_port_grpc(ServiceNameEnum.DLT) if not port: port = DLT_GATEWAY_PORT
self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint)) LOGGER.debug('Creating channel to {:s}...'.format(self.endpoint))
self.channel = None self.channel = None
......
...@@ -12,10 +12,16 @@ ...@@ -12,10 +12,16 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import grpc, logging import copy, grpc, logging
from typing import Optional
from common.Constants import DEFAULT_CONTEXT_UUID
from common.proto.dlt_gateway_pb2 import DltRecord, DltRecordId, DltRecordOperationEnum, DltRecordTypeEnum
from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
from common.proto.context_pb2 import DeviceId, Empty, ServiceId, SliceId from common.proto.context_pb2 import DeviceId, Empty, ServiceId, SliceId
from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceServicer from common.proto.dlt_connector_pb2_grpc import DltConnectorServiceServicer
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from dlt.connector.client.DltGatewayClient import DltGatewayClient
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
...@@ -31,8 +37,26 @@ METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES) ...@@ -31,8 +37,26 @@ METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): class DltConnectorServiceServicerImpl(DltConnectorServiceServicer):
def __init__(self): def __init__(self):
LOGGER.debug('Creating Servicer...') LOGGER.debug('Creating Servicer...')
self._own_domain_uuid : Optional[str] = None
LOGGER.debug('Servicer Created') LOGGER.debug('Servicer Created')
@property
def own_domain_id(self) -> str:
if self._own_domain_uuid is not None: return self._own_domain_uuid
context_client = ContextClient()
existing_context_ids = context_client.ListContextIds(Empty())
existing_context_uuids = {context_id.context_uuid.uuid for context_id in existing_context_ids.context_ids}
# Detect local context name (will be used as abstracted device name); exclude DEFAULT_CONTEXT_UUID
existing_non_admin_context_uuids = copy.deepcopy(existing_context_uuids)
existing_non_admin_context_uuids.discard(DEFAULT_CONTEXT_UUID)
if len(existing_non_admin_context_uuids) != 1:
MSG = 'Unable to identify own domain name. Existing Contexts({:s})'
raise Exception(MSG.format(str(existing_context_uuids)))
self._own_domain_uuid = existing_non_admin_context_uuids.pop()
return self._own_domain_uuid
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def RecordAll(self, request : Empty, context : grpc.ServicerContext) -> Empty: def RecordAll(self, request : Empty, context : grpc.ServicerContext) -> Empty:
return Empty() return Empty()
...@@ -43,6 +67,33 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer): ...@@ -43,6 +67,33 @@ class DltConnectorServiceServicerImpl(DltConnectorServiceServicer):
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
def RecordDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty: def RecordDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
context_client = ContextClient()
device = context_client.GetDevice(request)
dltgateway_client = DltGatewayClient()
dlt_record_id = DltRecordId()
dlt_record_id.domain_uuid.uuid = self.own_domain_id
dlt_record_id.type = DltRecordTypeEnum.DLTRECORDTYPE_DEVICE
dlt_record_id.record_uuid.uuid = device.device_id.device_uuid.uuid
LOGGER.info('[RecordDevice] sent dlt_record_id = {:s}'.format(grpc_message_to_json_string(dlt_record_id)))
dlt_record = dltgateway_client.GetFromDlt(dlt_record_id)
LOGGER.info('[RecordDevice] recv dlt_record = {:s}'.format(grpc_message_to_json_string(dlt_record)))
exists = False
dlt_record = DltRecord()
dlt_record.record_id.CopyFrom(dlt_record_id)
dlt_record.operation = \
DltRecordOperationEnum.DLTRECORDOPERATION_UPDATE \
if exists else \
DltRecordOperationEnum.DLTRECORDOPERATION_ADD
dlt_record.data_json = grpc_message_to_json_string(device)
LOGGER.info('[RecordDevice] sent dlt_record = {:s}'.format(grpc_message_to_json_string(dlt_record)))
dlt_record_status = dltgateway_client.RecordToDlt(dlt_record)
LOGGER.info('[RecordDevice] recv dlt_record_status = {:s}'.format(grpc_message_to_json_string(dlt_record_status)))
return Empty() return Empty()
@safe_and_metered_rpc_method(METRICS, LOGGER) @safe_and_metered_rpc_method(METRICS, LOGGER)
......
...@@ -18,6 +18,8 @@ from common.Constants import ServiceNameEnum ...@@ -18,6 +18,8 @@ from common.Constants import ServiceNameEnum
from common.Settings import ( from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port, ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, get_env_var_name, get_log_level, get_metrics_port,
wait_for_environment_variables) wait_for_environment_variables)
from dlt.connector.client.DltEventsCollector import DltEventsCollector
from dlt.connector.client.DltGatewayClient import DltGatewayClient
from .DltConnectorService import DltConnectorService from .DltConnectorService import DltConnectorService
terminate = threading.Event() terminate = threading.Event()
...@@ -48,6 +50,10 @@ def main(): ...@@ -48,6 +50,10 @@ def main():
metrics_port = get_metrics_port() metrics_port = get_metrics_port()
start_http_server(metrics_port) start_http_server(metrics_port)
dlt_gateway_client = DltGatewayClient()
dlt_events_collector = DltEventsCollector(dlt_gateway_client, log_events_received=True)
dlt_events_collector.start()
# Starting DLT connector service # Starting DLT connector service
grpc_service = DltConnectorService() grpc_service = DltConnectorService()
grpc_service.start() grpc_service.start()
...@@ -56,6 +62,7 @@ def main(): ...@@ -56,6 +62,7 @@ def main():
while not terminate.wait(timeout=0.1): pass while not terminate.wait(timeout=0.1): pass
LOGGER.info('Terminating...') LOGGER.info('Terminating...')
dlt_events_collector.stop()
grpc_service.stop() grpc_service.stop()
LOGGER.info('Bye') LOGGER.info('Bye')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment