diff --git a/deploy/crdb.sh b/deploy/crdb.sh index 3fcbb5cfaa1cbfcfdef14e372a871a7dd545887b..42b49fe984d08c8fb2cae14e68f0a6d2a7a726dd 100755 --- a/deploy/crdb.sh +++ b/deploy/crdb.sh @@ -175,6 +175,7 @@ function crdb_drop_databases_single() { --execute "SHOW DATABASES;" --format=tsv | awk '{print $1}' | grep "^tfs" ) echo "Found TFS databases: ${DATABASES}" | tr '\n' ' ' + echo for DB_NAME in $DATABASES; do echo "Dropping TFS database: $DB_NAME" @@ -369,6 +370,7 @@ function crdb_drop_databases_cluster() { --execute "SHOW DATABASES;" --format=tsv | awk '{print $1}' | grep "^tfs" ) echo "Found TFS databases: ${DATABASES}" | tr '\n' ' ' + echo for DB_NAME in $DATABASES; do echo "Dropping TFS database: $DB_NAME" diff --git a/deploy/tfs.sh b/deploy/tfs.sh index 65c1e8de28f2045b2ac78938b84d3c33e282025e..6c0ddcb63e71abd2e713e4809aeb7795b43053fa 100755 --- a/deploy/tfs.sh +++ b/deploy/tfs.sh @@ -344,11 +344,10 @@ for COMPONENT in $TFS_COMPONENTS; do VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}-gateway:" "$MANIFEST" | cut -d ":" -f4) sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT-gateway:${VERSION}#image: $IMAGE_URL#g" "$MANIFEST" else + VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}:" "$MANIFEST" | cut -d ":" -f4) if [ "$TFS_SKIP_BUILD" != "YES" ]; then IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g') - VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}:" "$MANIFEST" | cut -d ":" -f4) else - VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}:" "$MANIFEST" | cut -d ":" -f4) IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT:$VERSION" | sed 's,//,/,g' | sed 's,http:/,,g') fi sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT:${VERSION}#image: $IMAGE_URL#g" "$MANIFEST" diff --git a/manifests/analyticsservice.yaml b/manifests/analyticsservice.yaml index 61666ead951c73e4034110b00a51743d33bd4ce2..536bb185286ba5444ad22d17d00706a066172e4c 100644 --- a/manifests/analyticsservice.yaml +++ b/manifests/analyticsservice.yaml @@ -98,11 +98,11 @@ spec: selector: app: analyticsservice ports: - - name: frontend-grpc + - name: grpc protocol: TCP port: 30080 targetPort: 30080 - - name: backend-grpc + - name: grpc-backend protocol: TCP port: 30090 targetPort: 30090 diff --git a/manifests/telemetryservice.yaml b/manifests/telemetryservice.yaml index cd35d2698816bcfc5bc2030506eb2897a85708f6..86d864157838513dd68f10679d44d11b074c422c 100644 --- a/manifests/telemetryservice.yaml +++ b/manifests/telemetryservice.yaml @@ -98,11 +98,11 @@ spec: selector: app: telemetryservice ports: - - name: frontend-grpc + - name: grpc protocol: TCP port: 30050 targetPort: 30050 - - name: backend-grpc + - name: grpc-backend protocol: TCP port: 30060 targetPort: 30060 diff --git a/my_deploy.sh b/my_deploy.sh index 344ca44ee335e73dcc7b8f8c9ca71ead7d90880f..555173cf5ece4833bea8829ae7be593599b3dde2 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -20,7 +20,7 @@ export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" # Set the list of components, separated by spaces, you want to build images for, and deploy. -export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_generator" +export TFS_COMPONENTS="context device pathcomp service slice nbi webui" # Uncomment to activate Monitoring (old) #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" @@ -83,6 +83,9 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene # export TFS_COMPONENTS="${BEFORE} qkd_app service ${AFTER}" #fi +# Uncomment to activate Load Generator +#export TFS_COMPONENTS="${TFS_COMPONENTS} load_generator" + # Set the tag you want to use for your images. export TFS_IMAGE_TAG="dev" diff --git a/scripts/show_logs_automation.sh b/scripts/show_logs_automation.sh index 8a0e417d9a7ddf1ffe0b4e4529606683ae600ecd..26684298091403f4dc737fc0d1ca5b05d82ad374 100755 --- a/scripts/show_logs_automation.sh +++ b/scripts/show_logs_automation.sh @@ -24,4 +24,4 @@ export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} # Automated steps start here ######################################################################################################################## -kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/ztpservice +kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/automationservice diff --git a/src/analytics/frontend/client/AnalyticsFrontendClient.py b/src/analytics/frontend/client/AnalyticsFrontendClient.py index 90e95d661d46f24ae5ffaeb7bcfa19b7e1f36526..809c957ea48a07a657fe1edc244c9c0f125e9058 100644 --- a/src/analytics/frontend/client/AnalyticsFrontendClient.py +++ b/src/analytics/frontend/client/AnalyticsFrontendClient.py @@ -28,8 +28,8 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, class AnalyticsFrontendClient: def __init__(self, host=None, port=None): - if not host: host = get_service_host(ServiceNameEnum.ANALYTICSFRONTEND) - if not port: port = get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND) + if not host: host = get_service_host(ServiceNameEnum.ANALYTICS) + if not port: port = get_service_port_grpc(ServiceNameEnum.ANALYTICS) self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) self.channel = None diff --git a/src/analytics/frontend/service/AnalyticsFrontendService.py b/src/analytics/frontend/service/AnalyticsFrontendService.py index 42a7fc9b60418c1c0fc5af6f320ae5c330ce8871..8d2536fe091459d6026941f4eae52f58f7cd3f3a 100644 --- a/src/analytics/frontend/service/AnalyticsFrontendService.py +++ b/src/analytics/frontend/service/AnalyticsFrontendService.py @@ -20,7 +20,7 @@ from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import Anal class AnalyticsFrontendService(GenericGrpcService): def __init__(self, cls_name: str = __name__): - port = get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND) + port = get_service_port_grpc(ServiceNameEnum.ANALYTICS) super().__init__(port, cls_name=cls_name) self.analytics_frontend_servicer = AnalyticsFrontendServiceServicerImpl() diff --git a/src/analytics/frontend/tests/test_frontend.py b/src/analytics/frontend/tests/test_frontend.py index 48ab4dac5a5dfbdec688fc5c346f95d41e32c81c..74fef6c79cc2328b65671b392220ae86106e9d5d 100644 --- a/src/analytics/frontend/tests/test_frontend.py +++ b/src/analytics/frontend/tests/test_frontend.py @@ -41,9 +41,9 @@ from apscheduler.triggers.interval import IntervalTrigger LOCAL_HOST = '127.0.0.1' -ANALYTICS_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND)) -os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) -os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(ANALYTICS_FRONTEND_PORT) +ANALYTICS_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.ANALYTICS)) +os.environ[get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(ANALYTICS_FRONTEND_PORT) LOGGER = logging.getLogger(__name__) diff --git a/src/automation/service/EventEngine.py b/src/automation/service/EventEngine.py new file mode 100644 index 0000000000000000000000000000000000000000..26c2b28cbe35230beec90dd9df4112d4ad131876 --- /dev/null +++ b/src/automation/service/EventEngine.py @@ -0,0 +1,169 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json, logging, queue, threading +from typing import Dict, Optional +from automation.service.Tools import create_kpi_descriptor, start_collector +from common.proto.context_pb2 import ( + ConfigActionEnum, DeviceEvent, DeviceOperationalStatusEnum, Empty, ServiceEvent +) +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.tools.grpc.BaseEventCollector import BaseEventCollector +from common.tools.grpc.BaseEventDispatcher import BaseEventDispatcher +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient +from kpi_manager.client.KpiManagerClient import KpiManagerClient +from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient + +LOGGER = logging.getLogger(__name__) + +DEVICE_OP_STATUS_UNDEFINED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED +DEVICE_OP_STATUS_DISABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED +DEVICE_OP_STATUS_ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED +DEVICE_OP_STATUS_NOT_ENABLED = {DEVICE_OP_STATUS_UNDEFINED, DEVICE_OP_STATUS_DISABLED} + +KPISAMPLETYPE_UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN + +class EventCollector(BaseEventCollector): + pass + +class EventDispatcher(BaseEventDispatcher): + def __init__( + self, events_queue : queue.PriorityQueue, + terminate : Optional[threading.Event] = None + ) -> None: + super().__init__(events_queue, terminate) + self._context_client = ContextClient() + self._kpi_manager_client = KpiManagerClient() + self._telemetry_client = TelemetryFrontendClient() + self._device_endpoint_monitored : Dict[str, Dict[str, bool]] = dict() + + def dispatch_device_create(self, device_event : DeviceEvent) -> None: + MSG = 'Processing Device Create: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(device_event))) + self._device_activate_monitoring(device_event) + + def dispatch_device_update(self, device_event : DeviceEvent) -> None: + MSG = 'Processing Device Update: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(device_event))) + self._device_activate_monitoring(device_event) + + def dispatch_device_remove(self, device_event : DeviceEvent) -> None: + MSG = 'Processing Device Remove: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(device_event))) + + def dispatch_service_create(self, service_event : ServiceEvent) -> None: + MSG = 'Processing Service Create: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(service_event))) + + def dispatch_service_update(self, service_event : ServiceEvent) -> None: + MSG = 'Processing Service Update: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(service_event))) + + def dispatch_service_remove(self, service_event : ServiceEvent) -> None: + MSG = 'Processing Service Remove: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(service_event))) + + def _device_activate_monitoring(self, device_event : DeviceEvent) -> None: + device_id = device_event.device_id + device_uuid = device_id.device_uuid.uuid + device = self._context_client.GetDevice(device_id) + + device_op_status = device.device_operational_status + if device_op_status != DEVICE_OP_STATUS_ENABLED: + LOGGER.debug('Ignoring Device not enabled: {:s}'.format(grpc_message_to_json_string(device))) + return + + enabled_endpoint_names = set() + for config_rule in device.device_config.config_rules: + if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue + if config_rule.WhichOneof('config_rule') != 'custom': continue + str_resource_key = str(config_rule.custom.resource_key) + if not str_resource_key.startswith('/interface['): continue + json_resource_value = json.loads(config_rule.custom.resource_value) + if 'name' not in json_resource_value: continue + if 'enabled' not in json_resource_value: continue + if not json_resource_value['enabled']: continue + enabled_endpoint_names.add(json_resource_value['name']) + + endpoints_monitored = self._device_endpoint_monitored.setdefault(device_uuid, dict()) + for endpoint in device.device_endpoints: + endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid + endpoint_name_or_uuid = endpoint.name + if endpoint_name_or_uuid is None or len(endpoint_name_or_uuid) == 0: + endpoint_name_or_uuid = endpoint_uuid + + endpoint_was_monitored = endpoints_monitored.get(endpoint_uuid, False) + endpoint_is_enabled = (endpoint_name_or_uuid in enabled_endpoint_names) + + if not endpoint_was_monitored and endpoint_is_enabled: + # activate + for kpi_sample_type in endpoint.kpi_sample_types: + if kpi_sample_type == KPISAMPLETYPE_UNKNOWN: continue + + kpi_id = create_kpi_descriptor( + self._kpi_manager_client, kpi_sample_type, + device_id=device.device_id, + endpoint_id=endpoint.endpoint_id, + ) + + duration_seconds = 86400 + interval_seconds = 10 + collector_id = start_collector( + self._telemetry_client, kpi_id, + duration_seconds, interval_seconds + ) + + endpoints_monitored[endpoint_uuid] = True + else: + MSG = 'Not implemented condition: event={:s} device={:s} endpoint={:s}' + \ + ' endpoint_was_monitored={:s} endpoint_is_enabled={:s}' + LOGGER.warning(MSG.format( + grpc_message_to_json_string(device_event), grpc_message_to_json_string(device), + grpc_message_to_json_string(endpoint), str(endpoint_was_monitored), + str(endpoint_is_enabled) + )) + +class EventEngine: + def __init__( + self, terminate : Optional[threading.Event] = None + ) -> None: + self._terminate = threading.Event() if terminate is None else terminate + + self._context_client = ContextClient() + self._event_collector = EventCollector(terminate=self._terminate) + self._event_collector.install_collector( + self._context_client.GetDeviceEvents, Empty(), + log_events_received=True + ) + self._event_collector.install_collector( + self._context_client.GetServiceEvents, Empty(), + log_events_received=True + ) + + self._event_dispatcher = EventDispatcher( + self._event_collector.get_events_queue(), + terminate=self._terminate + ) + + def start(self) -> None: + self._context_client.connect() + self._event_collector.start() + self._event_dispatcher.start() + + def stop(self) -> None: + self._terminate.set() + self._event_dispatcher.stop() + self._event_collector.stop() + self._context_client.close() diff --git a/src/automation/service/Tools.py b/src/automation/service/Tools.py new file mode 100644 index 0000000000000000000000000000000000000000..6a63475ca23c576a6fe946d6d149b70465ff1e1f --- /dev/null +++ b/src/automation/service/Tools.py @@ -0,0 +1,64 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging, uuid +from typing import Optional +from common.proto.context_pb2 import ConnectionId, DeviceId, EndPointId, LinkId, ServiceId, SliceId +from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId +from common.proto.kpi_sample_types_pb2 import KpiSampleType +from common.proto.telemetry_frontend_pb2 import Collector, CollectorId +from kpi_manager.client.KpiManagerClient import KpiManagerClient +from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient + +LOGGER = logging.getLogger(__name__) + +def create_kpi_descriptor( + kpi_manager_client : KpiManagerClient, + kpi_sample_type : KpiSampleType, + device_id : Optional[DeviceId ] = None, + endpoint_id : Optional[EndPointId ] = None, + service_id : Optional[ServiceId ] = None, + slice_id : Optional[SliceId ] = None, + connection_id : Optional[ConnectionId] = None, + link_id : Optional[LinkId ] = None, +) -> KpiId: + kpi_descriptor = KpiDescriptor() + kpi_descriptor.kpi_id.kpi_id.uuid = str(uuid.uuid4()) + kpi_descriptor.kpi_description = '' + kpi_descriptor.kpi_sample_type = kpi_sample_type + + if device_id is not None: kpi_descriptor.device_id .CopyFrom(device_id ) + if endpoint_id is not None: kpi_descriptor.endpoint_id .CopyFrom(endpoint_id ) + if service_id is not None: kpi_descriptor.service_id .CopyFrom(service_id ) + if slice_id is not None: kpi_descriptor.slice_id .CopyFrom(slice_id ) + if connection_id is not None: kpi_descriptor.connection_id.CopyFrom(connection_id) + if link_id is not None: kpi_descriptor.link_id .CopyFrom(link_id ) + + kpi_id : KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor) + return kpi_id + +def start_collector( + telemetry_client : TelemetryFrontendClient, + kpi_id : KpiId, + duration_seconds : float, + interval_seconds : float +) -> CollectorId: + collector = Collector() + collector.collector_id.collector_id.uuid = str(uuid.uuid4()) + collector.kpi_id.CopyFrom(kpi_id) + collector.duration_s = duration_seconds + collector.interval_s = interval_seconds + collector_id : CollectorId = telemetry_client.StartCollector(collector) + return collector_id diff --git a/src/automation/service/__main__.py b/src/automation/service/__main__.py index 39d8beaffd959744b83d7e1ace78da2b1b800a21..3baa0bd30b19fb624c5dcf0b236642704e42ab9f 100644 --- a/src/automation/service/__main__.py +++ b/src/automation/service/__main__.py @@ -14,7 +14,13 @@ import logging, signal, sys, threading from prometheus_client import start_http_server -from common.Settings import get_log_level, get_metrics_port +from automation.service.EventEngine import EventEngine +from common.Constants import ServiceNameEnum +from common.Settings import ( + ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC, + get_env_var_name, get_log_level, get_metrics_port, + wait_for_environment_variables +) from .AutomationService import AutomationService LOG_LEVEL = get_log_level() @@ -29,6 +35,22 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name,unused def main(): LOGGER.info('Starting...') + + wait_for_environment_variables([ + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + get_env_var_name(ServiceNameEnum.POLICY, ENVVAR_SUFIX_SERVICE_HOST ), + get_env_var_name(ServiceNameEnum.POLICY, ENVVAR_SUFIX_SERVICE_PORT_GRPC), + ]) + signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @@ -36,7 +58,11 @@ def main(): metrics_port = get_metrics_port() start_http_server(metrics_port) - # Starting context service + # Start Event Collection+Dispatching Engine + event_engine = EventEngine(terminate=terminate) + event_engine.start() + + # Starting Automation service grpc_service = AutomationService() grpc_service.start() @@ -45,6 +71,7 @@ def main(): LOGGER.info('Terminating...') grpc_service.stop() + event_engine.stop() LOGGER.info('Bye') return 0 diff --git a/src/common/Constants.py b/src/common/Constants.py index 33a5d5047c8969118417a8b06a13cb8fb6fbf7df..bd4b9031fb1f33a5ad6c46e537961796bd3e2dae 100644 --- a/src/common/Constants.py +++ b/src/common/Constants.py @@ -66,9 +66,9 @@ class ServiceNameEnum(Enum): KPIMANAGER = 'kpi-manager' KPIVALUEAPI = 'kpi-value-api' KPIVALUEWRITER = 'kpi-value-writer' - TELEMETRYFRONTEND = 'telemetry-frontend' + TELEMETRY = 'telemetry' TELEMETRYBACKEND = 'telemetry-backend' - ANALYTICSFRONTEND = 'analytics-frontend' + ANALYTICS = 'analytics' ANALYTICSBACKEND = 'analytics-backend' QOSPROFILE = 'qos-profile' @@ -105,9 +105,9 @@ DEFAULT_SERVICE_GRPC_PORTS = { ServiceNameEnum.KPIMANAGER .value : 30010, ServiceNameEnum.KPIVALUEAPI .value : 30020, ServiceNameEnum.KPIVALUEWRITER .value : 30030, - ServiceNameEnum.TELEMETRYFRONTEND .value : 30050, + ServiceNameEnum.TELEMETRY .value : 30050, ServiceNameEnum.TELEMETRYBACKEND .value : 30060, - ServiceNameEnum.ANALYTICSFRONTEND .value : 30080, + ServiceNameEnum.ANALYTICS .value : 30080, ServiceNameEnum.ANALYTICSBACKEND .value : 30090, ServiceNameEnum.AUTOMATION .value : 30200, diff --git a/src/common/tools/grpc/BaseEventCollector.py b/src/common/tools/grpc/BaseEventCollector.py new file mode 100644 index 0000000000000000000000000000000000000000..04dfb654963da1ae4f83a8a14feaaa8c17d1f128 --- /dev/null +++ b/src/common/tools/grpc/BaseEventCollector.py @@ -0,0 +1,136 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# See usage example below + +import grpc, logging, queue, threading, time +from typing import Any, Callable, List, Optional +from common.proto.context_pb2 import Empty +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +class CollectorThread(threading.Thread): + def __init__( + self, subscription_func : Callable, events_queue = queue.PriorityQueue, + terminate = threading.Event, log_events_received: bool = False + ) -> None: + super().__init__(daemon=False) + self._subscription_func = subscription_func + self._events_queue = events_queue + self._terminate = terminate + self._log_events_received = log_events_received + self._stream = None + + def cancel(self) -> None: + if self._stream is None: return + self._stream.cancel() + + def run(self) -> None: + while not self._terminate.is_set(): + self._stream = self._subscription_func() + try: + for event in self._stream: + if self._log_events_received: + str_event = grpc_message_to_json_string(event) + LOGGER.info('[_collect] event: {:s}'.format(str_event)) + timestamp = event.event.timestamp.timestamp + self._events_queue.put_nowait((timestamp, event)) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member + LOGGER.info('[_collect] UNAVAILABLE... retrying...') + time.sleep(0.5) + continue + elif e.code() == grpc.StatusCode.CANCELLED: # pylint: disable=no-member + break + else: + raise # pragma: no cover + +class BaseEventCollector: + def __init__( + self, terminate : Optional[threading.Event] = None + ) -> None: + self._events_queue = queue.PriorityQueue() + self._terminate = threading.Event() if terminate is None else terminate + self._collector_threads : List[CollectorThread] = list() + + def install_collector( + self, subscription_method : Callable, request_message : Any, + log_events_received : bool = False + ) -> None: + self._collector_threads.append(CollectorThread( + lambda: subscription_method(request_message), + self._events_queue, self._terminate, log_events_received + )) + + def start(self): + self._terminate.clear() + for collector_thread in self._collector_threads: + collector_thread.start() + + def stop(self): + self._terminate.set() + + for collector_thread in self._collector_threads: + collector_thread.cancel() + + for collector_thread in self._collector_threads: + collector_thread.join() + + def get_events_queue(self) -> queue.PriorityQueue: + return self._events_queue + + def get_event(self, block : bool = True, timeout : float = 0.1): + try: + _,event = self._events_queue.get(block=block, timeout=timeout) + return event + except queue.Empty: # pylint: disable=catching-non-exception + return None + + def get_events(self, block : bool = True, timeout : float = 0.1, count : int = None): + events = [] + if count is None: + while not self._terminate.is_set(): + event = self.get_event(block=block, timeout=timeout) + if event is None: break + events.append(event) + else: + while len(events) < count: + if self._terminate.is_set(): break + event = self.get_event(block=block, timeout=timeout) + if event is None: continue + events.append(event) + return sorted(events, key=lambda e: e.event.timestamp.timestamp) + +def main() -> None: + logging.basicConfig(level=logging.INFO) + + context_client = ContextClient() + context_client.connect() + + event_collector = BaseEventCollector() + event_collector.install_collector(context_client.GetDeviceEvents, Empty(), log_events_received=True) + event_collector.install_collector(context_client.GetLinkEvents, Empty(), log_events_received=True) + event_collector.install_collector(context_client.GetServiceEvents, Empty(), log_events_received=True) + event_collector.start() + + time.sleep(60) + + event_collector.stop() + context_client.close() + +if __name__ == '__main__': + main() diff --git a/src/common/tools/grpc/BaseEventDispatcher.py b/src/common/tools/grpc/BaseEventDispatcher.py new file mode 100644 index 0000000000000000000000000000000000000000..c9ec292c994bb4728043bf3bfed73e176f4f748a --- /dev/null +++ b/src/common/tools/grpc/BaseEventDispatcher.py @@ -0,0 +1,119 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# See usage example below + +import logging, queue, threading, time +from typing import Any, Callable, Optional +from common.proto.context_pb2 import DeviceEvent, Empty, EventTypeEnum, LinkEvent +from common.tools.grpc.BaseEventCollector import BaseEventCollector +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient + +LOGGER = logging.getLogger(__name__) + +class BaseEventDispatcher(threading.Thread): + def __init__( + self, events_queue : queue.PriorityQueue, + terminate : Optional[threading.Event] = None + ) -> None: + super().__init__(daemon=True) + self._events_queue = events_queue + self._terminate = threading.Event() if terminate is None else terminate + + def stop(self): + self._terminate.set() + + def _get_event(self, block : bool = True, timeout : Optional[float] = 0.5) -> Optional[Any]: + try: + _, event = self._events_queue.get(block=block, timeout=timeout) + return event + except queue.Empty: + return None + + def _get_dispatcher(self, event : Any) -> Optional[Callable]: + object_name = str(event.__class__.__name__).lower().replace('event', '') + event_type = EventTypeEnum.Name(event.event.event_type).lower().replace('eventtype_', '') + + method_name = 'dispatch_{:s}_{:s}'.format(object_name, event_type) + dispatcher = getattr(self, method_name, None) + if dispatcher is not None: return dispatcher + + method_name = 'dispatch_{:s}'.format(object_name) + dispatcher = getattr(self, method_name, None) + if dispatcher is not None: return dispatcher + + method_name = 'dispatch' + dispatcher = getattr(self, method_name, None) + if dispatcher is not None: return dispatcher + + return None + + def run(self) -> None: + while not self._terminate.is_set(): + event = self._get_event() + if event is None: continue + + dispatcher = self._get_dispatcher(event) + if dispatcher is None: + MSG = 'No dispatcher available for Event({:s})' + LOGGER.warning(MSG.format(grpc_message_to_json_string(event))) + continue + + dispatcher(event) + +class MyEventDispatcher(BaseEventDispatcher): + def dispatch_device_create(self, device_event : DeviceEvent) -> None: + MSG = 'Processing Device Create: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(device_event))) + + def dispatch_device_update(self, device_event : DeviceEvent) -> None: + MSG = 'Processing Device Update: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(device_event))) + + def dispatch_device_remove(self, device_event : DeviceEvent) -> None: + MSG = 'Processing Device Remove: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(device_event))) + + def dispatch_link(self, link_event : LinkEvent) -> None: + MSG = 'Processing Link Create/Update/Remove: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(link_event))) + + def dispatch(self, event : Any) -> None: + MSG = 'Processing any other Event: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(event))) + +def main() -> None: + logging.basicConfig(level=logging.INFO) + + context_client = ContextClient() + context_client.connect() + + event_collector = BaseEventCollector() + event_collector.install_collector(context_client.GetDeviceEvents, Empty(), log_events_received=True) + event_collector.install_collector(context_client.GetLinkEvents, Empty(), log_events_received=True) + event_collector.install_collector(context_client.GetServiceEvents, Empty(), log_events_received=True) + event_collector.start() + + event_dispatcher = MyEventDispatcher(event_collector.get_events_queue()) + event_dispatcher.start() + + time.sleep(60) + + event_dispatcher.stop() + event_collector.stop() + context_client.close() + +if __name__ == '__main__': + main() diff --git a/src/common/tools/grpc/ExampleEventEngine.py b/src/common/tools/grpc/ExampleEventEngine.py new file mode 100644 index 0000000000000000000000000000000000000000..f27792497db09467c0225f07d036adc8c5b5ed84 --- /dev/null +++ b/src/common/tools/grpc/ExampleEventEngine.py @@ -0,0 +1,101 @@ +# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging, threading, time +from typing import Optional +from common.proto.context_pb2 import DeviceEvent, Empty, ServiceEvent +from common.tools.grpc.BaseEventCollector import BaseEventCollector +from common.tools.grpc.BaseEventDispatcher import BaseEventDispatcher +from common.tools.grpc.Tools import grpc_message_to_json_string +from context.client.ContextClient import ContextClient + +LOGGER = logging.getLogger(__name__) + +class EventCollector(BaseEventCollector): + pass + +class EventDispatcher(BaseEventDispatcher): + def dispatch_device_create(self, device_event : DeviceEvent) -> None: + MSG = 'Processing Device Create: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(device_event))) + + def dispatch_device_update(self, device_event : DeviceEvent) -> None: + MSG = 'Processing Device Update: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(device_event))) + + def dispatch_device_remove(self, device_event : DeviceEvent) -> None: + MSG = 'Processing Device Remove: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(device_event))) + + def dispatch_service_create(self, service_event : ServiceEvent) -> None: + MSG = 'Processing Service Create: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(service_event))) + + def dispatch_service_update(self, service_event : ServiceEvent) -> None: + MSG = 'Processing Service Update: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(service_event))) + + def dispatch_service_remove(self, service_event : ServiceEvent) -> None: + MSG = 'Processing Service Remove: {:s}' + LOGGER.info(MSG.format(grpc_message_to_json_string(service_event))) + +class ExampleEventEngine: + def __init__( + self, terminate : Optional[threading.Event] = None + ) -> None: + self._terminate = threading.Event() if terminate is None else terminate + + self._context_client = ContextClient() + self._event_collector = EventCollector(terminate=self._terminate) + self._event_collector.install_collector( + self._context_client.GetDeviceEvents, Empty(), + log_events_received=True + ) + self._event_collector.install_collector( + self._context_client.GetLinkEvents, Empty(), + log_events_received=True + ) + self._event_collector.install_collector( + self._context_client.GetServiceEvents, Empty(), + log_events_received=True + ) + + self._event_dispatcher = EventDispatcher( + self._event_collector.get_events_queue(), + terminate=self._terminate + ) + + def start(self) -> None: + self._context_client.connect() + self._event_collector.start() + self._event_dispatcher.start() + + def stop(self) -> None: + self._terminate.set() + self._event_dispatcher.stop() + self._event_collector.stop() + self._context_client.close() + +def main() -> None: + logging.basicConfig(level=logging.INFO) + + event_engine = ExampleEventEngine() + event_engine.start() + + time.sleep(60) + + event_engine.stop() + +if __name__ == '__main__': + main() diff --git a/src/device/service/DeviceServiceServicerImpl.py b/src/device/service/DeviceServiceServicerImpl.py index ebbf19607a7c591f3414d0a9b276930a6b7b1c00..7546c225e67fd3122ec845b3154606eddb7cd9ff 100644 --- a/src/device/service/DeviceServiceServicerImpl.py +++ b/src/device/service/DeviceServiceServicerImpl.py @@ -251,8 +251,15 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): device_id = context_client.SetDevice(device) device = context_client.GetDevice(device_id) - if request.device_operational_status != DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED: - device.device_operational_status = request.device_operational_status + ztp_service_host = get_env_var_name(ServiceNameEnum.ZTP, ENVVAR_SUFIX_SERVICE_HOST) + environment_variables = set(os.environ.keys()) + if ztp_service_host in environment_variables: + # ZTP component is deployed; accept status updates + if request.device_operational_status != DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED: + device.device_operational_status = request.device_operational_status + else: + # ZTP is not deployed; activated during AddDevice and not modified + pass t4 = time.time() # TODO: use of datastores (might be virtual ones) to enable rollbacks diff --git a/src/nbi/Dockerfile b/src/nbi/Dockerfile index c5fc8d32491d7576e93534c75c264e79c37a36c6..ad111c2834586e0a949ea0396afb80124c7ed2d7 100644 --- a/src/nbi/Dockerfile +++ b/src/nbi/Dockerfile @@ -16,9 +16,24 @@ FROM python:3.9-slim # Install dependencies RUN apt-get --yes --quiet --quiet update && \ - apt-get --yes --quiet --quiet install wget g++ git && \ + apt-get --yes --quiet --quiet install wget g++ git build-essential cmake libpcre2-dev python3-dev python3-cffi && \ rm -rf /var/lib/apt/lists/* +# Download, build and install libyang. Note that APT package is outdated +# - Ref: https://github.com/CESNET/libyang +# - Ref: https://github.com/CESNET/libyang-python/ +RUN mkdir -p /var/libyang +RUN git clone https://github.com/CESNET/libyang.git /var/libyang +WORKDIR /var/libyang +RUN git fetch +RUN git checkout v2.1.148 +RUN mkdir -p /var/libyang/build +WORKDIR /var/libyang/build +RUN cmake -D CMAKE_BUILD_TYPE:String="Release" .. +RUN make +RUN make install +RUN ldconfig + # Set Python to show logs as they occur ENV PYTHONUNBUFFERED=0 @@ -53,24 +68,6 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto RUN rm *.proto RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; -# Download, build and install libyang. Note that APT package is outdated -# - Ref: https://github.com/CESNET/libyang -# - Ref: https://github.com/CESNET/libyang-python/ -RUN apt-get --yes --quiet --quiet update && \ - apt-get --yes --quiet --quiet install build-essential cmake libpcre2-dev python3-dev python3-cffi && \ - rm -rf /var/lib/apt/lists/* -RUN mkdir -p /var/libyang -RUN git clone https://github.com/CESNET/libyang.git /var/libyang -WORKDIR /var/libyang -RUN git fetch -RUN git checkout v2.1.148 -RUN mkdir -p /var/libyang/build -WORKDIR /var/libyang/build -RUN cmake -D CMAKE_BUILD_TYPE:String="Release" .. -RUN make -RUN make install -RUN ldconfig - # Create component sub-folders, get specific Python packages RUN mkdir -p /var/teraflow/nbi WORKDIR /var/teraflow/nbi diff --git a/src/telemetry/backend/service/TelemetryBackendService.py b/src/telemetry/backend/service/TelemetryBackendService.py index 79a35d343860d19992518c0e8b29e427e5cbbef4..81ef24481cffc70c6b33bbfbf19d57b062729891 100755 --- a/src/telemetry/backend/service/TelemetryBackendService.py +++ b/src/telemetry/backend/service/TelemetryBackendService.py @@ -106,7 +106,7 @@ class TelemetryBackendService(GenericGrpcService): Method receives collector request and initiates collecter backend. """ # print("Initiating backend for collector: ", collector_id) - LOGGER.info("Initiating backend for collector: ", collector_id) + LOGGER.info("Initiating backend for collector: {:s}".format(str(collector_id))) start_time = time.time() while not stop_event.is_set(): if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']: # condition to terminate backend @@ -165,9 +165,9 @@ class TelemetryBackendService(GenericGrpcService): Args: err (KafkaError): Kafka error object. msg (Message): Kafka message object. """ - if err: - LOGGER.debug('Message delivery failed: {:}'.format(err)) + if err: + LOGGER.error('Message delivery failed: {:}'.format(err)) # print(f'Message delivery failed: {err}') - else: - LOGGER.info('Message delivered to topic {:}'.format(msg.topic())) - # print(f'Message delivered to topic {msg.topic()}') + #else: + # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic())) + # # print(f'Message delivered to topic {msg.topic()}') diff --git a/src/telemetry/frontend/client/TelemetryFrontendClient.py b/src/telemetry/frontend/client/TelemetryFrontendClient.py index cd36ecd45933ad10758e408cf03c1bf834d27ba6..afcf241530a41f1f4ab1729379a4e5196c25d04f 100644 --- a/src/telemetry/frontend/client/TelemetryFrontendClient.py +++ b/src/telemetry/frontend/client/TelemetryFrontendClient.py @@ -29,8 +29,8 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, class TelemetryFrontendClient: def __init__(self, host=None, port=None): - if not host: host = get_service_host(ServiceNameEnum.TELEMETRYFRONTEND) - if not port: port = get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND) + if not host: host = get_service_host(ServiceNameEnum.TELEMETRY) + if not port: port = get_service_port_grpc(ServiceNameEnum.TELEMETRY) self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) self.channel = None diff --git a/src/telemetry/frontend/service/TelemetryFrontendService.py b/src/telemetry/frontend/service/TelemetryFrontendService.py index abd361aa0082e2de1d1f5fa7e81a336f3091af9a..49def20a1ce3cee1062d1e582fd8ec28308652b7 100644 --- a/src/telemetry/frontend/service/TelemetryFrontendService.py +++ b/src/telemetry/frontend/service/TelemetryFrontendService.py @@ -21,7 +21,7 @@ from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import Tele class TelemetryFrontendService(GenericGrpcService): def __init__(self, cls_name: str = __name__) -> None: - port = get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND) + port = get_service_port_grpc(ServiceNameEnum.TELEMETRY) super().__init__(port, cls_name=cls_name) self.telemetry_frontend_servicer = TelemetryFrontendServiceServicerImpl() diff --git a/src/telemetry/frontend/tests/test_frontend.py b/src/telemetry/frontend/tests/test_frontend.py index c3f8091c83f56fd4a134ec092b1e22723040595d..988d76af0380302cd6351d46eccf6159bf1dc5ab 100644 --- a/src/telemetry/frontend/tests/test_frontend.py +++ b/src/telemetry/frontend/tests/test_frontend.py @@ -36,9 +36,9 @@ from telemetry.frontend.service.TelemetryFrontendServiceServicerImpl import Tele LOCAL_HOST = '127.0.0.1' -TELEMETRY_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND)) -os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) -os.environ[get_env_var_name(ServiceNameEnum.TELEMETRYFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(TELEMETRY_FRONTEND_PORT) +TELEMETRY_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.TELEMETRY)) +os.environ[get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) +os.environ[get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(TELEMETRY_FRONTEND_PORT) LOGGER = logging.getLogger(__name__)