Skip to content
Snippets Groups Projects
Commit aa9b3c25 authored by Carlos Manso's avatar Carlos Manso
Browse files

Merge branch 'develop' of https://labs.etsi.org/rep/tfs/controller into...

Merge branch 'develop' of https://labs.etsi.org/rep/tfs/controller into feat/147-integrate-support-for-ip-e2e-optical-sdn-controllers-to-manage-hierarchical-virtual
parents 974e5f74 89020c87
No related branches found
No related tags found
2 merge requests!294Release TeraFlowSDN 4.0,!225Resolve "Integrate Support for IP-E2E-Optical SDN controllers to manage hierarchical virtual topologies"
Showing
with 673 additions and 49 deletions
...@@ -175,6 +175,7 @@ function crdb_drop_databases_single() { ...@@ -175,6 +175,7 @@ function crdb_drop_databases_single() {
--execute "SHOW DATABASES;" --format=tsv | awk '{print $1}' | grep "^tfs" --execute "SHOW DATABASES;" --format=tsv | awk '{print $1}' | grep "^tfs"
) )
echo "Found TFS databases: ${DATABASES}" | tr '\n' ' ' echo "Found TFS databases: ${DATABASES}" | tr '\n' ' '
echo
for DB_NAME in $DATABASES; do for DB_NAME in $DATABASES; do
echo "Dropping TFS database: $DB_NAME" echo "Dropping TFS database: $DB_NAME"
...@@ -369,6 +370,7 @@ function crdb_drop_databases_cluster() { ...@@ -369,6 +370,7 @@ function crdb_drop_databases_cluster() {
--execute "SHOW DATABASES;" --format=tsv | awk '{print $1}' | grep "^tfs" --execute "SHOW DATABASES;" --format=tsv | awk '{print $1}' | grep "^tfs"
) )
echo "Found TFS databases: ${DATABASES}" | tr '\n' ' ' echo "Found TFS databases: ${DATABASES}" | tr '\n' ' '
echo
for DB_NAME in $DATABASES; do for DB_NAME in $DATABASES; do
echo "Dropping TFS database: $DB_NAME" echo "Dropping TFS database: $DB_NAME"
......
...@@ -344,11 +344,10 @@ for COMPONENT in $TFS_COMPONENTS; do ...@@ -344,11 +344,10 @@ for COMPONENT in $TFS_COMPONENTS; do
VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}-gateway:" "$MANIFEST" | cut -d ":" -f4) VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}-gateway:" "$MANIFEST" | cut -d ":" -f4)
sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT-gateway:${VERSION}#image: $IMAGE_URL#g" "$MANIFEST" sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT-gateway:${VERSION}#image: $IMAGE_URL#g" "$MANIFEST"
else else
VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}:" "$MANIFEST" | cut -d ":" -f4)
if [ "$TFS_SKIP_BUILD" != "YES" ]; then if [ "$TFS_SKIP_BUILD" != "YES" ]; then
IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g') IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT:$TFS_IMAGE_TAG" | sed 's,//,/,g' | sed 's,http:/,,g')
VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}:" "$MANIFEST" | cut -d ":" -f4)
else else
VERSION=$(grep -i "${GITLAB_REPO_URL}/${COMPONENT}:" "$MANIFEST" | cut -d ":" -f4)
IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT:$VERSION" | sed 's,//,/,g' | sed 's,http:/,,g') IMAGE_URL=$(echo "$TFS_REGISTRY_IMAGES/$COMPONENT:$VERSION" | sed 's,//,/,g' | sed 's,http:/,,g')
fi fi
sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT:${VERSION}#image: $IMAGE_URL#g" "$MANIFEST" sed -E -i "s#image: $GITLAB_REPO_URL/$COMPONENT:${VERSION}#image: $IMAGE_URL#g" "$MANIFEST"
......
...@@ -98,11 +98,11 @@ spec: ...@@ -98,11 +98,11 @@ spec:
selector: selector:
app: analyticsservice app: analyticsservice
ports: ports:
- name: frontend-grpc - name: grpc
protocol: TCP protocol: TCP
port: 30080 port: 30080
targetPort: 30080 targetPort: 30080
- name: backend-grpc - name: grpc-backend
protocol: TCP protocol: TCP
port: 30090 port: 30090
targetPort: 30090 targetPort: 30090
......
...@@ -98,11 +98,11 @@ spec: ...@@ -98,11 +98,11 @@ spec:
selector: selector:
app: telemetryservice app: telemetryservice
ports: ports:
- name: frontend-grpc - name: grpc
protocol: TCP protocol: TCP
port: 30050 port: 30050
targetPort: 30050 targetPort: 30050
- name: backend-grpc - name: grpc-backend
protocol: TCP protocol: TCP
port: 30060 port: 30060
targetPort: 30060 targetPort: 30060
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/" export TFS_REGISTRY_IMAGES="http://localhost:32000/tfs/"
# Set the list of components, separated by spaces, you want to build images for, and deploy. # Set the list of components, separated by spaces, you want to build images for, and deploy.
export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_generator" export TFS_COMPONENTS="context device pathcomp service slice nbi webui"
# Uncomment to activate Monitoring (old) # Uncomment to activate Monitoring (old)
#export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring" #export TFS_COMPONENTS="${TFS_COMPONENTS} monitoring"
...@@ -86,6 +86,9 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene ...@@ -86,6 +86,9 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene
# export TFS_COMPONENTS="${BEFORE} qkd_app service ${AFTER}" # export TFS_COMPONENTS="${BEFORE} qkd_app service ${AFTER}"
#fi #fi
# Uncomment to activate Load Generator
#export TFS_COMPONENTS="${TFS_COMPONENTS} load_generator"
# Set the tag you want to use for your images. # Set the tag you want to use for your images.
export TFS_IMAGE_TAG="dev" export TFS_IMAGE_TAG="dev"
......
...@@ -24,4 +24,4 @@ export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} ...@@ -24,4 +24,4 @@ export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"}
# Automated steps start here # Automated steps start here
######################################################################################################################## ########################################################################################################################
kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/ztpservice kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/automationservice
...@@ -28,8 +28,8 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, ...@@ -28,8 +28,8 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION,
class AnalyticsFrontendClient: class AnalyticsFrontendClient:
def __init__(self, host=None, port=None): def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.ANALYTICSFRONTEND) if not host: host = get_service_host(ServiceNameEnum.ANALYTICS)
if not port: port = get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND) if not port: port = get_service_port_grpc(ServiceNameEnum.ANALYTICS)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None self.channel = None
......
...@@ -20,7 +20,7 @@ from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import Anal ...@@ -20,7 +20,7 @@ from analytics.frontend.service.AnalyticsFrontendServiceServicerImpl import Anal
class AnalyticsFrontendService(GenericGrpcService): class AnalyticsFrontendService(GenericGrpcService):
def __init__(self, cls_name: str = __name__): def __init__(self, cls_name: str = __name__):
port = get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND) port = get_service_port_grpc(ServiceNameEnum.ANALYTICS)
super().__init__(port, cls_name=cls_name) super().__init__(port, cls_name=cls_name)
self.analytics_frontend_servicer = AnalyticsFrontendServiceServicerImpl() self.analytics_frontend_servicer = AnalyticsFrontendServiceServicerImpl()
......
...@@ -41,9 +41,9 @@ from apscheduler.triggers.interval import IntervalTrigger ...@@ -41,9 +41,9 @@ from apscheduler.triggers.interval import IntervalTrigger
LOCAL_HOST = '127.0.0.1' LOCAL_HOST = '127.0.0.1'
ANALYTICS_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.ANALYTICSFRONTEND)) ANALYTICS_FRONTEND_PORT = str(get_service_port_grpc(ServiceNameEnum.ANALYTICS))
os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST) os.environ[get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.ANALYTICSFRONTEND, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(ANALYTICS_FRONTEND_PORT) os.environ[get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_PORT_GRPC)] = str(ANALYTICS_FRONTEND_PORT)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging, queue, threading
from typing import Dict, Optional
from automation.service.Tools import create_kpi_descriptor, start_collector
from common.proto.context_pb2 import (
ConfigActionEnum, DeviceEvent, DeviceOperationalStatusEnum, Empty, ServiceEvent
)
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.tools.grpc.BaseEventCollector import BaseEventCollector
from common.tools.grpc.BaseEventDispatcher import BaseEventDispatcher
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
LOGGER = logging.getLogger(__name__)
DEVICE_OP_STATUS_UNDEFINED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED
DEVICE_OP_STATUS_DISABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_DISABLED
DEVICE_OP_STATUS_ENABLED = DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_ENABLED
DEVICE_OP_STATUS_NOT_ENABLED = {DEVICE_OP_STATUS_UNDEFINED, DEVICE_OP_STATUS_DISABLED}
KPISAMPLETYPE_UNKNOWN = KpiSampleType.KPISAMPLETYPE_UNKNOWN
class EventCollector(BaseEventCollector):
pass
class EventDispatcher(BaseEventDispatcher):
def __init__(
self, events_queue : queue.PriorityQueue,
terminate : Optional[threading.Event] = None
) -> None:
super().__init__(events_queue, terminate)
self._context_client = ContextClient()
self._kpi_manager_client = KpiManagerClient()
self._telemetry_client = TelemetryFrontendClient()
self._device_endpoint_monitored : Dict[str, Dict[str, bool]] = dict()
def dispatch_device_create(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Create: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
self._device_activate_monitoring(device_event)
def dispatch_device_update(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Update: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
self._device_activate_monitoring(device_event)
def dispatch_device_remove(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Remove: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
def dispatch_service_create(self, service_event : ServiceEvent) -> None:
MSG = 'Processing Service Create: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))
def dispatch_service_update(self, service_event : ServiceEvent) -> None:
MSG = 'Processing Service Update: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))
def dispatch_service_remove(self, service_event : ServiceEvent) -> None:
MSG = 'Processing Service Remove: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))
def _device_activate_monitoring(self, device_event : DeviceEvent) -> None:
device_id = device_event.device_id
device_uuid = device_id.device_uuid.uuid
device = self._context_client.GetDevice(device_id)
device_op_status = device.device_operational_status
if device_op_status != DEVICE_OP_STATUS_ENABLED:
LOGGER.debug('Ignoring Device not enabled: {:s}'.format(grpc_message_to_json_string(device)))
return
enabled_endpoint_names = set()
for config_rule in device.device_config.config_rules:
if config_rule.action != ConfigActionEnum.CONFIGACTION_SET: continue
if config_rule.WhichOneof('config_rule') != 'custom': continue
str_resource_key = str(config_rule.custom.resource_key)
if not str_resource_key.startswith('/interface['): continue
json_resource_value = json.loads(config_rule.custom.resource_value)
if 'name' not in json_resource_value: continue
if 'enabled' not in json_resource_value: continue
if not json_resource_value['enabled']: continue
enabled_endpoint_names.add(json_resource_value['name'])
endpoints_monitored = self._device_endpoint_monitored.setdefault(device_uuid, dict())
for endpoint in device.device_endpoints:
endpoint_uuid = endpoint.endpoint_id.endpoint_uuid.uuid
endpoint_name_or_uuid = endpoint.name
if endpoint_name_or_uuid is None or len(endpoint_name_or_uuid) == 0:
endpoint_name_or_uuid = endpoint_uuid
endpoint_was_monitored = endpoints_monitored.get(endpoint_uuid, False)
endpoint_is_enabled = (endpoint_name_or_uuid in enabled_endpoint_names)
if not endpoint_was_monitored and endpoint_is_enabled:
# activate
for kpi_sample_type in endpoint.kpi_sample_types:
if kpi_sample_type == KPISAMPLETYPE_UNKNOWN: continue
kpi_id = create_kpi_descriptor(
self._kpi_manager_client, kpi_sample_type,
device_id=device.device_id,
endpoint_id=endpoint.endpoint_id,
)
duration_seconds = 86400
interval_seconds = 10
collector_id = start_collector(
self._telemetry_client, kpi_id,
duration_seconds, interval_seconds
)
endpoints_monitored[endpoint_uuid] = True
else:
MSG = 'Not implemented condition: event={:s} device={:s} endpoint={:s}' + \
' endpoint_was_monitored={:s} endpoint_is_enabled={:s}'
LOGGER.warning(MSG.format(
grpc_message_to_json_string(device_event), grpc_message_to_json_string(device),
grpc_message_to_json_string(endpoint), str(endpoint_was_monitored),
str(endpoint_is_enabled)
))
class EventEngine:
def __init__(
self, terminate : Optional[threading.Event] = None
) -> None:
self._terminate = threading.Event() if terminate is None else terminate
self._context_client = ContextClient()
self._event_collector = EventCollector(terminate=self._terminate)
self._event_collector.install_collector(
self._context_client.GetDeviceEvents, Empty(),
log_events_received=True
)
self._event_collector.install_collector(
self._context_client.GetServiceEvents, Empty(),
log_events_received=True
)
self._event_dispatcher = EventDispatcher(
self._event_collector.get_events_queue(),
terminate=self._terminate
)
def start(self) -> None:
self._context_client.connect()
self._event_collector.start()
self._event_dispatcher.start()
def stop(self) -> None:
self._terminate.set()
self._event_dispatcher.stop()
self._event_collector.stop()
self._context_client.close()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, uuid
from typing import Optional
from common.proto.context_pb2 import ConnectionId, DeviceId, EndPointId, LinkId, ServiceId, SliceId
from common.proto.kpi_manager_pb2 import KpiDescriptor, KpiId
from common.proto.kpi_sample_types_pb2 import KpiSampleType
from common.proto.telemetry_frontend_pb2 import Collector, CollectorId
from kpi_manager.client.KpiManagerClient import KpiManagerClient
from telemetry.frontend.client.TelemetryFrontendClient import TelemetryFrontendClient
LOGGER = logging.getLogger(__name__)
def create_kpi_descriptor(
kpi_manager_client : KpiManagerClient,
kpi_sample_type : KpiSampleType,
device_id : Optional[DeviceId ] = None,
endpoint_id : Optional[EndPointId ] = None,
service_id : Optional[ServiceId ] = None,
slice_id : Optional[SliceId ] = None,
connection_id : Optional[ConnectionId] = None,
link_id : Optional[LinkId ] = None,
) -> KpiId:
kpi_descriptor = KpiDescriptor()
kpi_descriptor.kpi_id.kpi_id.uuid = str(uuid.uuid4())
kpi_descriptor.kpi_description = ''
kpi_descriptor.kpi_sample_type = kpi_sample_type
if device_id is not None: kpi_descriptor.device_id .CopyFrom(device_id )
if endpoint_id is not None: kpi_descriptor.endpoint_id .CopyFrom(endpoint_id )
if service_id is not None: kpi_descriptor.service_id .CopyFrom(service_id )
if slice_id is not None: kpi_descriptor.slice_id .CopyFrom(slice_id )
if connection_id is not None: kpi_descriptor.connection_id.CopyFrom(connection_id)
if link_id is not None: kpi_descriptor.link_id .CopyFrom(link_id )
kpi_id : KpiId = kpi_manager_client.SetKpiDescriptor(kpi_descriptor)
return kpi_id
def start_collector(
telemetry_client : TelemetryFrontendClient,
kpi_id : KpiId,
duration_seconds : float,
interval_seconds : float
) -> CollectorId:
collector = Collector()
collector.collector_id.collector_id.uuid = str(uuid.uuid4())
collector.kpi_id.CopyFrom(kpi_id)
collector.duration_s = duration_seconds
collector.interval_s = interval_seconds
collector_id : CollectorId = telemetry_client.StartCollector(collector)
return collector_id
...@@ -14,7 +14,13 @@ ...@@ -14,7 +14,13 @@
import logging, signal, sys, threading import logging, signal, sys, threading
from prometheus_client import start_http_server from prometheus_client import start_http_server
from common.Settings import get_log_level, get_metrics_port from automation.service.EventEngine import EventEngine
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC,
get_env_var_name, get_log_level, get_metrics_port,
wait_for_environment_variables
)
from .AutomationService import AutomationService from .AutomationService import AutomationService
LOG_LEVEL = get_log_level() LOG_LEVEL = get_log_level()
...@@ -29,6 +35,22 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name,unused ...@@ -29,6 +35,22 @@ def signal_handler(signal, frame): # pylint: disable=redefined-outer-name,unused
def main(): def main():
LOGGER.info('Starting...') LOGGER.info('Starting...')
wait_for_environment_variables([
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.CONTEXT, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.DEVICE, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.KPIMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.TELEMETRY, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.ANALYTICS, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
get_env_var_name(ServiceNameEnum.POLICY, ENVVAR_SUFIX_SERVICE_HOST ),
get_env_var_name(ServiceNameEnum.POLICY, ENVVAR_SUFIX_SERVICE_PORT_GRPC),
])
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGTERM, signal_handler)
...@@ -36,7 +58,11 @@ def main(): ...@@ -36,7 +58,11 @@ def main():
metrics_port = get_metrics_port() metrics_port = get_metrics_port()
start_http_server(metrics_port) start_http_server(metrics_port)
# Starting context service # Start Event Collection+Dispatching Engine
event_engine = EventEngine(terminate=terminate)
event_engine.start()
# Starting Automation service
grpc_service = AutomationService() grpc_service = AutomationService()
grpc_service.start() grpc_service.start()
...@@ -45,6 +71,7 @@ def main(): ...@@ -45,6 +71,7 @@ def main():
LOGGER.info('Terminating...') LOGGER.info('Terminating...')
grpc_service.stop() grpc_service.stop()
event_engine.stop()
LOGGER.info('Bye') LOGGER.info('Bye')
return 0 return 0
......
...@@ -67,9 +67,9 @@ class ServiceNameEnum(Enum): ...@@ -67,9 +67,9 @@ class ServiceNameEnum(Enum):
KPIMANAGER = 'kpi-manager' KPIMANAGER = 'kpi-manager'
KPIVALUEAPI = 'kpi-value-api' KPIVALUEAPI = 'kpi-value-api'
KPIVALUEWRITER = 'kpi-value-writer' KPIVALUEWRITER = 'kpi-value-writer'
TELEMETRYFRONTEND = 'telemetry-frontend' TELEMETRY = 'telemetry'
TELEMETRYBACKEND = 'telemetry-backend' TELEMETRYBACKEND = 'telemetry-backend'
ANALYTICSFRONTEND = 'analytics-frontend' ANALYTICS = 'analytics'
ANALYTICSBACKEND = 'analytics-backend' ANALYTICSBACKEND = 'analytics-backend'
QOSPROFILE = 'qos-profile' QOSPROFILE = 'qos-profile'
...@@ -107,9 +107,9 @@ DEFAULT_SERVICE_GRPC_PORTS = { ...@@ -107,9 +107,9 @@ DEFAULT_SERVICE_GRPC_PORTS = {
ServiceNameEnum.KPIMANAGER .value : 30010, ServiceNameEnum.KPIMANAGER .value : 30010,
ServiceNameEnum.KPIVALUEAPI .value : 30020, ServiceNameEnum.KPIVALUEAPI .value : 30020,
ServiceNameEnum.KPIVALUEWRITER .value : 30030, ServiceNameEnum.KPIVALUEWRITER .value : 30030,
ServiceNameEnum.TELEMETRYFRONTEND .value : 30050, ServiceNameEnum.TELEMETRY .value : 30050,
ServiceNameEnum.TELEMETRYBACKEND .value : 30060, ServiceNameEnum.TELEMETRYBACKEND .value : 30060,
ServiceNameEnum.ANALYTICSFRONTEND .value : 30080, ServiceNameEnum.ANALYTICS .value : 30080,
ServiceNameEnum.ANALYTICSBACKEND .value : 30090, ServiceNameEnum.ANALYTICSBACKEND .value : 30090,
ServiceNameEnum.AUTOMATION .value : 30200, ServiceNameEnum.AUTOMATION .value : 30200,
......
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# See usage example below
import grpc, logging, queue, threading, time
from typing import Any, Callable, List, Optional
from common.proto.context_pb2 import Empty
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
class CollectorThread(threading.Thread):
def __init__(
self, subscription_func : Callable, events_queue = queue.PriorityQueue,
terminate = threading.Event, log_events_received: bool = False
) -> None:
super().__init__(daemon=False)
self._subscription_func = subscription_func
self._events_queue = events_queue
self._terminate = terminate
self._log_events_received = log_events_received
self._stream = None
def cancel(self) -> None:
if self._stream is None: return
self._stream.cancel()
def run(self) -> None:
while not self._terminate.is_set():
self._stream = self._subscription_func()
try:
for event in self._stream:
if self._log_events_received:
str_event = grpc_message_to_json_string(event)
LOGGER.info('[_collect] event: {:s}'.format(str_event))
timestamp = event.event.timestamp.timestamp
self._events_queue.put_nowait((timestamp, event))
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE: # pylint: disable=no-member
LOGGER.info('[_collect] UNAVAILABLE... retrying...')
time.sleep(0.5)
continue
elif e.code() == grpc.StatusCode.CANCELLED: # pylint: disable=no-member
break
else:
raise # pragma: no cover
class BaseEventCollector:
def __init__(
self, terminate : Optional[threading.Event] = None
) -> None:
self._events_queue = queue.PriorityQueue()
self._terminate = threading.Event() if terminate is None else terminate
self._collector_threads : List[CollectorThread] = list()
def install_collector(
self, subscription_method : Callable, request_message : Any,
log_events_received : bool = False
) -> None:
self._collector_threads.append(CollectorThread(
lambda: subscription_method(request_message),
self._events_queue, self._terminate, log_events_received
))
def start(self):
self._terminate.clear()
for collector_thread in self._collector_threads:
collector_thread.start()
def stop(self):
self._terminate.set()
for collector_thread in self._collector_threads:
collector_thread.cancel()
for collector_thread in self._collector_threads:
collector_thread.join()
def get_events_queue(self) -> queue.PriorityQueue:
return self._events_queue
def get_event(self, block : bool = True, timeout : float = 0.1):
try:
_,event = self._events_queue.get(block=block, timeout=timeout)
return event
except queue.Empty: # pylint: disable=catching-non-exception
return None
def get_events(self, block : bool = True, timeout : float = 0.1, count : int = None):
events = []
if count is None:
while not self._terminate.is_set():
event = self.get_event(block=block, timeout=timeout)
if event is None: break
events.append(event)
else:
while len(events) < count:
if self._terminate.is_set(): break
event = self.get_event(block=block, timeout=timeout)
if event is None: continue
events.append(event)
return sorted(events, key=lambda e: e.event.timestamp.timestamp)
def main() -> None:
logging.basicConfig(level=logging.INFO)
context_client = ContextClient()
context_client.connect()
event_collector = BaseEventCollector()
event_collector.install_collector(context_client.GetDeviceEvents, Empty(), log_events_received=True)
event_collector.install_collector(context_client.GetLinkEvents, Empty(), log_events_received=True)
event_collector.install_collector(context_client.GetServiceEvents, Empty(), log_events_received=True)
event_collector.start()
time.sleep(60)
event_collector.stop()
context_client.close()
if __name__ == '__main__':
main()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# See usage example below
import logging, queue, threading, time
from typing import Any, Callable, Optional
from common.proto.context_pb2 import DeviceEvent, Empty, EventTypeEnum, LinkEvent
from common.tools.grpc.BaseEventCollector import BaseEventCollector
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)
class BaseEventDispatcher(threading.Thread):
def __init__(
self, events_queue : queue.PriorityQueue,
terminate : Optional[threading.Event] = None
) -> None:
super().__init__(daemon=True)
self._events_queue = events_queue
self._terminate = threading.Event() if terminate is None else terminate
def stop(self):
self._terminate.set()
def _get_event(self, block : bool = True, timeout : Optional[float] = 0.5) -> Optional[Any]:
try:
_, event = self._events_queue.get(block=block, timeout=timeout)
return event
except queue.Empty:
return None
def _get_dispatcher(self, event : Any) -> Optional[Callable]:
object_name = str(event.__class__.__name__).lower().replace('event', '')
event_type = EventTypeEnum.Name(event.event.event_type).lower().replace('eventtype_', '')
method_name = 'dispatch_{:s}_{:s}'.format(object_name, event_type)
dispatcher = getattr(self, method_name, None)
if dispatcher is not None: return dispatcher
method_name = 'dispatch_{:s}'.format(object_name)
dispatcher = getattr(self, method_name, None)
if dispatcher is not None: return dispatcher
method_name = 'dispatch'
dispatcher = getattr(self, method_name, None)
if dispatcher is not None: return dispatcher
return None
def run(self) -> None:
while not self._terminate.is_set():
event = self._get_event()
if event is None: continue
dispatcher = self._get_dispatcher(event)
if dispatcher is None:
MSG = 'No dispatcher available for Event({:s})'
LOGGER.warning(MSG.format(grpc_message_to_json_string(event)))
continue
dispatcher(event)
class MyEventDispatcher(BaseEventDispatcher):
def dispatch_device_create(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Create: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
def dispatch_device_update(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Update: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
def dispatch_device_remove(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Remove: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
def dispatch_link(self, link_event : LinkEvent) -> None:
MSG = 'Processing Link Create/Update/Remove: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(link_event)))
def dispatch(self, event : Any) -> None:
MSG = 'Processing any other Event: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(event)))
def main() -> None:
logging.basicConfig(level=logging.INFO)
context_client = ContextClient()
context_client.connect()
event_collector = BaseEventCollector()
event_collector.install_collector(context_client.GetDeviceEvents, Empty(), log_events_received=True)
event_collector.install_collector(context_client.GetLinkEvents, Empty(), log_events_received=True)
event_collector.install_collector(context_client.GetServiceEvents, Empty(), log_events_received=True)
event_collector.start()
event_dispatcher = MyEventDispatcher(event_collector.get_events_queue())
event_dispatcher.start()
time.sleep(60)
event_dispatcher.stop()
event_collector.stop()
context_client.close()
if __name__ == '__main__':
main()
# Copyright 2022-2024 ETSI OSG/SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging, threading, time
from typing import Optional
from common.proto.context_pb2 import DeviceEvent, Empty, ServiceEvent
from common.tools.grpc.BaseEventCollector import BaseEventCollector
from common.tools.grpc.BaseEventDispatcher import BaseEventDispatcher
from common.tools.grpc.Tools import grpc_message_to_json_string
from context.client.ContextClient import ContextClient
LOGGER = logging.getLogger(__name__)
class EventCollector(BaseEventCollector):
pass
class EventDispatcher(BaseEventDispatcher):
def dispatch_device_create(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Create: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
def dispatch_device_update(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Update: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
def dispatch_device_remove(self, device_event : DeviceEvent) -> None:
MSG = 'Processing Device Remove: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(device_event)))
def dispatch_service_create(self, service_event : ServiceEvent) -> None:
MSG = 'Processing Service Create: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))
def dispatch_service_update(self, service_event : ServiceEvent) -> None:
MSG = 'Processing Service Update: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))
def dispatch_service_remove(self, service_event : ServiceEvent) -> None:
MSG = 'Processing Service Remove: {:s}'
LOGGER.info(MSG.format(grpc_message_to_json_string(service_event)))
class ExampleEventEngine:
def __init__(
self, terminate : Optional[threading.Event] = None
) -> None:
self._terminate = threading.Event() if terminate is None else terminate
self._context_client = ContextClient()
self._event_collector = EventCollector(terminate=self._terminate)
self._event_collector.install_collector(
self._context_client.GetDeviceEvents, Empty(),
log_events_received=True
)
self._event_collector.install_collector(
self._context_client.GetLinkEvents, Empty(),
log_events_received=True
)
self._event_collector.install_collector(
self._context_client.GetServiceEvents, Empty(),
log_events_received=True
)
self._event_dispatcher = EventDispatcher(
self._event_collector.get_events_queue(),
terminate=self._terminate
)
def start(self) -> None:
self._context_client.connect()
self._event_collector.start()
self._event_dispatcher.start()
def stop(self) -> None:
self._terminate.set()
self._event_dispatcher.stop()
self._event_collector.stop()
self._context_client.close()
def main() -> None:
logging.basicConfig(level=logging.INFO)
event_engine = ExampleEventEngine()
event_engine.start()
time.sleep(60)
event_engine.stop()
if __name__ == '__main__':
main()
...@@ -251,8 +251,15 @@ class DeviceServiceServicerImpl(DeviceServiceServicer): ...@@ -251,8 +251,15 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
device_id = context_client.SetDevice(device) device_id = context_client.SetDevice(device)
device = context_client.GetDevice(device_id) device = context_client.GetDevice(device_id)
if request.device_operational_status != DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED: ztp_service_host = get_env_var_name(ServiceNameEnum.ZTP, ENVVAR_SUFIX_SERVICE_HOST)
device.device_operational_status = request.device_operational_status environment_variables = set(os.environ.keys())
if ztp_service_host in environment_variables:
# ZTP component is deployed; accept status updates
if request.device_operational_status != DeviceOperationalStatusEnum.DEVICEOPERATIONALSTATUS_UNDEFINED:
device.device_operational_status = request.device_operational_status
else:
# ZTP is not deployed; activated during AddDevice and not modified
pass
t4 = time.time() t4 = time.time()
# TODO: use of datastores (might be virtual ones) to enable rollbacks # TODO: use of datastores (might be virtual ones) to enable rollbacks
......
...@@ -16,9 +16,24 @@ FROM python:3.9-slim ...@@ -16,9 +16,24 @@ FROM python:3.9-slim
# Install dependencies # Install dependencies
RUN apt-get --yes --quiet --quiet update && \ RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install wget g++ git && \ apt-get --yes --quiet --quiet install wget g++ git build-essential cmake libpcre2-dev python3-dev python3-cffi && \
rm -rf /var/lib/apt/lists/* rm -rf /var/lib/apt/lists/*
# Download, build and install libyang. Note that APT package is outdated
# - Ref: https://github.com/CESNET/libyang
# - Ref: https://github.com/CESNET/libyang-python/
RUN mkdir -p /var/libyang
RUN git clone https://github.com/CESNET/libyang.git /var/libyang
WORKDIR /var/libyang
RUN git fetch
RUN git checkout v2.1.148
RUN mkdir -p /var/libyang/build
WORKDIR /var/libyang/build
RUN cmake -D CMAKE_BUILD_TYPE:String="Release" ..
RUN make
RUN make install
RUN ldconfig
# Set Python to show logs as they occur # Set Python to show logs as they occur
ENV PYTHONUNBUFFERED=0 ENV PYTHONUNBUFFERED=0
...@@ -53,24 +68,6 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto ...@@ -53,24 +68,6 @@ RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto
RUN rm *.proto RUN rm *.proto
RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \;
# Download, build and install libyang. Note that APT package is outdated
# - Ref: https://github.com/CESNET/libyang
# - Ref: https://github.com/CESNET/libyang-python/
RUN apt-get --yes --quiet --quiet update && \
apt-get --yes --quiet --quiet install build-essential cmake libpcre2-dev python3-dev python3-cffi && \
rm -rf /var/lib/apt/lists/*
RUN mkdir -p /var/libyang
RUN git clone https://github.com/CESNET/libyang.git /var/libyang
WORKDIR /var/libyang
RUN git fetch
RUN git checkout v2.1.148
RUN mkdir -p /var/libyang/build
WORKDIR /var/libyang/build
RUN cmake -D CMAKE_BUILD_TYPE:String="Release" ..
RUN make
RUN make install
RUN ldconfig
# Create component sub-folders, get specific Python packages # Create component sub-folders, get specific Python packages
RUN mkdir -p /var/teraflow/nbi RUN mkdir -p /var/teraflow/nbi
WORKDIR /var/teraflow/nbi WORKDIR /var/teraflow/nbi
......
...@@ -106,7 +106,7 @@ class TelemetryBackendService(GenericGrpcService): ...@@ -106,7 +106,7 @@ class TelemetryBackendService(GenericGrpcService):
Method receives collector request and initiates collecter backend. Method receives collector request and initiates collecter backend.
""" """
# print("Initiating backend for collector: ", collector_id) # print("Initiating backend for collector: ", collector_id)
LOGGER.info("Initiating backend for collector: ", collector_id) LOGGER.info("Initiating backend for collector: {:s}".format(str(collector_id)))
start_time = time.time() start_time = time.time()
while not stop_event.is_set(): while not stop_event.is_set():
if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']: # condition to terminate backend if int(collector['duration']) != -1 and time.time() - start_time >= collector['duration']: # condition to terminate backend
...@@ -165,9 +165,9 @@ class TelemetryBackendService(GenericGrpcService): ...@@ -165,9 +165,9 @@ class TelemetryBackendService(GenericGrpcService):
Args: err (KafkaError): Kafka error object. Args: err (KafkaError): Kafka error object.
msg (Message): Kafka message object. msg (Message): Kafka message object.
""" """
if err: if err:
LOGGER.debug('Message delivery failed: {:}'.format(err)) LOGGER.error('Message delivery failed: {:}'.format(err))
# print(f'Message delivery failed: {err}') # print(f'Message delivery failed: {err}')
else: #else:
LOGGER.info('Message delivered to topic {:}'.format(msg.topic())) # LOGGER.debug('Message delivered to topic {:}'.format(msg.topic()))
# print(f'Message delivered to topic {msg.topic()}') # # print(f'Message delivered to topic {msg.topic()}')
...@@ -29,8 +29,8 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, ...@@ -29,8 +29,8 @@ RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION,
class TelemetryFrontendClient: class TelemetryFrontendClient:
def __init__(self, host=None, port=None): def __init__(self, host=None, port=None):
if not host: host = get_service_host(ServiceNameEnum.TELEMETRYFRONTEND) if not host: host = get_service_host(ServiceNameEnum.TELEMETRY)
if not port: port = get_service_port_grpc(ServiceNameEnum.TELEMETRYFRONTEND) if not port: port = get_service_port_grpc(ServiceNameEnum.TELEMETRY)
self.endpoint = '{:s}:{:s}'.format(str(host), str(port)) self.endpoint = '{:s}:{:s}'.format(str(host), str(port))
LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint)))
self.channel = None self.channel = None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment