Skip to content
Snippets Groups Projects
Commit 7196288b authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Automation:

- Added new EventEngine to activate automatically monitoring of device endpoints
parent 58d5f619
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!282Resolve "(CTTC) Auto-start telemetry collection when a device endpoint is activated"
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, queue, threading
from typing import Dict, Optional
from automation.service.Tools import create_kpi_descriptor, start_collector
from common.proto.context_pb2 import (
ConfigActionEnum, DeviceEvent, DeviceOperationalStatusEnum, Empty, ServiceEvent
)
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.grpc.BaseEventCollector import BaseEventCollector
from common.tools.grpc.BaseEventDispatcher import BaseEventDispatcher
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
LOGGER = logging.getLogger(__name__)
DEVICE_OP_STATUS_UNDEFINED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
DEVICE_OP_STATUS_DISABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
DEVICE_OP_STATUS_ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
DEVICE_OP_STATUS_NOT_ENABLED = {DEVICE_OP_STATUS_UNDEFINED, DEVICE_OP_STATUS_DISABLED}
KPISAMPLETYPE_UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN
class EventCollector(BaseEventCollector):
pass
class EventDispatcher(BaseEventDispatcher):
def __init__(
self, events_queue : queue.PriorityQueue,
terminate : Optional[threading.Event] = None
) -> None:
super().__init__(events_queue, terminate)
self._context_client = ContextClient()
self._kpi_manager_client = KpiManagerClient()
self._telemetry_client = TelemetryFrontendClient()
self._device_endpoint_monitored : Dict[str, Dict[str, bool]] = dict()
def dispatch_device_create(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Create: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
self._device_activate_monitoring(device_event)
def dispatch_device_update(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Update: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
self._device_activate_monitoring(device_event)
def dispatch_device_remove(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Remove: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
def dispatch_service_create(self, service_event : ServiceEvent) -> None:
MSG = 'Processing Service Create: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))
def dispatch_service_update(self, service_event : ServiceEvent) -> None:
MSG = 'Processing Service Update: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))
def dispatch_service_remove(self, service_event : ServiceEvent) -> None:
MSG = 'Processing Service Remove: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))
def _device_activate_monitoring(self, device_event : DeviceEvent) -> None:
device_id = device_event.device_id
device_uuid = device_id.device_uuid.uuid
device = self._context_client.GetDevice(device_id)
device_op_status = device.device_operational_status
if device_op_status != DEVICE_OP_STATUS_ENABLED:
LOGGER.debug('Ignoring Device not enabled: {:s}'.format(grpc_message_to_json_string(device)))
return
enabled_endpoint_names = set()
for config_rule in device.device_config.config_rules:
if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue
if config_rule.WhichOneof('config_rule') != 'custom': continue
str_resource_key = str(config_rule.custom.resource_key)
if not str_resource_key.startswith('/interface['): continue
json_resource_value = json.loads(config_rule.custom.resource_value)
if 'name' not in json_resource_value: continue
if 'enabled' not in json_resource_value: continue
if not json_resource_value['enabled']: continue
enabled_endpoint_names.add(json_resource_value['name'])
endpoints_monitored = self._device_endpoint_monitored.setdefault(device_uuid, dict())
for endpoint in device.device_endpoints:
endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
endpoint_name_or_uuid = endpoint.name
if endpoint_name_or_uuid is None or len(endpoint_name_or_uuid) == 0:
endpoint_name_or_uuid = endpoint_uuid
endpoint_was_monitored = endpoints_monitored.get(endpoint_uuid, False)
endpoint_is_enabled = (endpoint_name_or_uuid in enabled_endpoint_names)
if not endpoint_was_monitored and endpoint_is_enabled:
# activate
for kpi_sample_type in endpoint.kpi_sample_types:
if kpi_sample_type == KPISAMPLETYPE_UNKNOWN: continue
kpi_id = create_kpi_descriptor(
self._kpi_manager_client, kpi_sample_type,
device_id=device.device_id,
endpoint_id=endpoint.endpoint_id,
)
duration_seconds = 86400
interval_seconds = 10
collector_id = start_collector(
self._telemetry_client, kpi_id,
duration_seconds, interval_seconds
)
endpoints_monitored[endpoint_uuid] = True
else:
MSG = 'Not implemented condition: event={:s} device={:s} endpoint={:s}' + \
' endpoint_was_monitored={:s} endpoint_is_enabled={:s}'
LOGGER.warning(MSG.format(
grpc_message_to_json_string(device_event), grpc_message_to_json_string(device),
grpc_message_to_json_string(endpoint), str(endpoint_was_monitored),
str(endpoint_is_enabled)
))
class EventEngine:
def __init__(
self, terminate : Optional[threading.Event] = None
) -> None:
self._terminate = threading.Event() if terminate is None else terminate
self._context_client = ContextClient()
self._event_collector = EventCollector(terminate=self._terminate)
self._event_collector.install_collector(
self._context_client.GetDeviceEvents, Empty(),
log_events_received=True
)
self._event_collector.install_collector(
self._context_client.GetServiceEvents, Empty(),
log_events_received=True
)
self._event_dispatcher = EventDispatcher(
self._event_collector.get_events_queue(),
terminate=self._terminate
)
def start(self) -> None:
self._context_client.connect()
self._event_collector.start()
self._event_dispatcher.start()
def stop(self) -> None:
self._terminate.set()
self._event_dispatcher.stop()
self._event_collector.stop()
self._context_client.close()
import logging, uuid
from typing import Optional
from common.proto.context_pb2 import ConnectionId, DeviceId, EndPointId, LinkId, ServiceId, SliceId
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
LOGGER = logging.getLogger(__name__)
def create_kpi_descriptor(
kpi_manager_client : KpiManagerClient,
kpi_sample_type : KpiSampleType,
device_id : Optional[DeviceId ] = None,
endpoint_id : Optional[EndPointId ] = None,
service_id : Optional[ServiceId ] = None,
slice_id : Optional[SliceId ] = None,
connection_id : Optional[ConnectionId] = None,
link_id : Optional[LinkId ] = None,
) -> KpiId:
kpi_descriptor = KpiDescriptor()
kpi_descriptor.kpi_id.kpi_id.uuid = str(uuid.uuid4())
kpi_descriptor.kpi_description = ''
kpi_descriptor.kpi_sample_type = kpi_sample_type
if device_id is not None: kpi_descriptor.device_id .CopyFrom(device_id )
if endpoint_id is not None: kpi_descriptor.endpoint_id .CopyFrom(endpoint_id )
if service_id is not None: kpi_descriptor.service_id .CopyFrom(service_id )
if slice_id is not None: kpi_descriptor.slice_id .CopyFrom(slice_id )
if connection_id is not None: kpi_descriptor.connection_id.CopyFrom(connection_id)
if link_id is not None: kpi_descriptor.link_id .CopyFrom(link_id )
kpi_id : KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor)
return kpi_id
def start_collector(
telemetry_client : TelemetryFrontendClient,
kpi_id : KpiId,
duration_seconds : float,
interval_seconds : float
) -> CollectorId:
collector = Collector()
collector.collector_id.collector_id.uuid = str(uuid.uuid4())
collector.kpi_id.CopyFrom(kpi_id)
collector.duration_s = duration_seconds
collector.interval_s = interval_seconds
collector_id : CollectorId = telemetry_client.StartCollector(collector)
return collector_id
......@@ -14,7 +14,13 @@
import logging, signal, sys, threading
from prometheus_client import start_http_server
from common.Settings import get_log_level, get_metrics_port
from automation.service.EventEngine import EventEngine
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC,
get_env_var_name, get_log_level, get_metrics_port,
wait_for_environment_variables
)
from .AutomationService import AutomationService
LOG_LEVEL = get_log_level()
......@@ -29,6 +35,22 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name,unused
def main():
LOGGER.info('Starting...')
wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.POLICY, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.POLICY, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
......@@ -36,7 +58,11 @@ def main():
metrics_port = get_metrics_port()
start_http_server(metrics_port)
# Starting context service
# Start Event Collection+Dispatching Engine
event_engine = EventEngine(terminate=terminate)
event_engine.start()
# Starting Automation service
grpc_service = AutomationService()
grpc_service.start()
......@@ -45,6 +71,7 @@ def main():
LOGGER.info('Terminating...')
grpc_service.stop()
event_engine.stop()
LOGGER.info('Bye')
return 0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment