Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
Select Git revision
  • 1ed5b8301c6e55ec2a65256b0e47832ed6aacf20
  • master default
  • feat/320-cttc-ietf-simap-basic-support-with-kafka-yang-push
  • feat/307-update-python-version-service
  • feat/292-cttc-implement-integration-test-for-ryu-openflow
  • cnit_tapi
  • feat/314-tid-new-service-for-ipowdm-configuration-fron-orchestrator-to-ipowdm-controller
  • feat/327-tid-new-service-to-ipowdm-controller-to-manage-transceivers-configuration-on-external-agent
  • cnit-p2mp-premerge
  • feat/325-tid-nbi-e2e-to-manage-e2e-path-computation
  • feat/326-tid-external-management-of-devices-telemetry-nbi
  • openroadm-flex-grid
  • feat/310-cttc-implement-nbi-connector-to-interface-with-osm-client
  • develop protected
  • feat/324-tid-nbi-ietf_l3vpn-deploy-fail
  • feat/321-add-support-for-gnmi-configuration-via-proto
  • feat/322-add-read-support-for-ipinfusion-devices-via-netconf
  • feat/323-add-support-for-restconf-protocol-in-devices
  • feat/policy-refactor
  • feat/192-cttc-implement-telemetry-backend-collector-gnmi-openconfig
  • feat/307-update-python-version
  • feat/telemetry-collector-int
  • v5.0.0 protected
  • v4.0.0 protected
  • demo-dpiab-eucnc2024
  • v3.0.0 protected
  • v2.1.0 protected
  • v2.0.0 protected
  • v1.0.0 protected
29 results

OpticalCentralizedAttackDetectorServiceServicerImpl.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    OpticalCentralizedAttackDetectorServiceServicerImpl.py 7.06 KiB
    # Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    import os, grpc, logging, random
    from influxdb import InfluxDBClient
    from common.rpc_method_wrapper.Decorator import create_metrics, safe_and_metered_rpc_method
    from context.client.ContextClient import ContextClient
    from context.Config import GRPC_SERVICE_PORT as CONTEXT_GRPC_SERVICE_PORT
    from monitoring.client.MonitoringClient import MonitoringClient
    from monitoring.Config import GRPC_SERVICE_PORT as MONITORING_GRPC_SERVICE_PORT
    from service.client.ServiceClient import ServiceClient
    from service.Config import GRPC_SERVICE_PORT as SERVICE_GRPC_SERVICE_PORT
    from dbscanserving.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse, Sample
    from dbscanserving.client.DbscanServingClient import DbscanServingClient
    from dbscanserving.Config import GRPC_SERVICE_PORT as DBSCANSERVING_GRPC_SERVICE_PORT
    from opticalattackmitigator.client.OpticalAttackMitigatorClient import OpticalAttackMitigatorClient
    from opticalattackmitigator.proto.optical_attack_mitigator_pb2 import AttackDescription, AttackResponse
    from opticalattackmitigator.Config import GRPC_SERVICE_PORT as ATTACK_MITIGATOR_GRPC_SERVICE_PORT
    from opticalcentralizedattackdetector.proto.context_pb2 import (Empty,
        Context,  ContextId,  ContextIdList,  ContextList,
        Service,  ServiceId,  ServiceIdList,  ServiceList
    )
    from opticalcentralizedattackdetector.proto.monitoring_pb2 import KpiList
    from opticalcentralizedattackdetector.proto.optical_centralized_attack_detector_pb2_grpc import (
        OpticalCentralizedAttackDetectorServiceServicer)
    from opticalcentralizedattackdetector.Config import (
        CONTEXT_SERVICE_ADDRESS, SERVICE_SERVICE_ADDRESS, INFERENCE_SERVICE_ADDRESS, MONITORING_SERVICE_ADDRESS,
        ATTACK_MITIGATOR_SERVICE_ADDRESS)
    
    
    LOGGER = logging.getLogger(__name__)
    
    SERVICE_NAME = 'OpticalCentralizedAttackDetector'
    METHOD_NAMES = ['NotifyServiceUpdate', 'DetectAttack', 'ReportSummarizedKpi', 'ReportKpi']
    METRICS = create_metrics(SERVICE_NAME, METHOD_NAMES)
    
    INFLUXDB_HOSTNAME = os.environ.get("INFLUXDB_HOSTNAME")
    INFLUXDB_USER = os.environ.get("INFLUXDB_USER")
    INFLUXDB_PASSWORD = os.environ.get("INFLUXDB_PASSWORD")
    INFLUXDB_DATABASE = os.environ.get("INFLUXDB_DATABASE")
    context_client: ContextClient = ContextClient(address=CONTEXT_SERVICE_ADDRESS, port=CONTEXT_GRPC_SERVICE_PORT)
    influxdb_client: InfluxDBClient = InfluxDBClient(host=MONITORING_SERVICE_ADDRESS, port=8086, username=INFLUXDB_USER, password=INFLUXDB_PASSWORD, database=INFLUXDB_DATABASE)
    monitoring_client: MonitoringClient = MonitoringClient(server=MONITORING_SERVICE_ADDRESS, port=MONITORING_GRPC_SERVICE_PORT)
    dbscanserving_client: DbscanServingClient = DbscanServingClient(address=INFERENCE_SERVICE_ADDRESS, port=DBSCANSERVING_GRPC_SERVICE_PORT)
    service_client: ServiceClient = ServiceClient(SERVICE_SERVICE_ADDRESS, SERVICE_GRPC_SERVICE_PORT)
    attack_mitigator_client: OpticalAttackMitigatorClient = OpticalAttackMitigatorClient(address=ATTACK_MITIGATOR_SERVICE_ADDRESS, port=ATTACK_MITIGATOR_GRPC_SERVICE_PORT)
    
    
    class OpticalCentralizedAttackDetectorServiceServicerImpl(OpticalCentralizedAttackDetectorServiceServicer):
    
        def __init__(self):
            LOGGER.debug('Creating Servicer...')
            LOGGER.debug('Servicer Created')
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def NotifyServiceUpdate(self, request : Service, context : grpc.ServicerContext) -> Empty:
            return Empty()
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def DetectAttack(self, request : Empty, context : grpc.ServicerContext) -> Empty:
            
            # retrieve list with current contexts
            # import pdb; pdb.set_trace()
            context_ids: ContextIdList = context_client.ListContextIds(Empty())
    
            # for each context, retrieve list of current services
            services = []
            for context_id in context_ids.context_ids:
    
                context_services: ServiceIdList = context_client.ListServices(context_id)
                for service in context_services.services:
                    services.append(service)
    
            # get monitoring data for each of the current services
            results = influxdb_client.query('select * from samples;')
    
            for service in services:
                for endpoint in service.service_endpoint_ids:
                    # get instant KPI for this endpoint
                    LOGGER.warning(f'service: {service.service_id.service_uuid.uuid}\t endpoint: {endpoint.endpoint_uuid.uuid}\tdevice: {endpoint.device_id.device_uuid.uuid}')
                    # how to get all KPIs for a particular device?
                    points = results.get_points(tags={'device_id': endpoint.device_id.device_uuid.uuid})
                    print('points:', points)
                    for point in points:
                        print('\t', point)
    
                    # run attack detection for every service
                    request: DetectionRequest = DetectionRequest()
    
                    request.num_samples = 310
                    request.num_features = 100
                    request.eps = 100.5
                    request.min_samples = 50
    
                    for _ in range(200):
                        grpc_sample = Sample()
                        for __ in range(100):
                            grpc_sample.features.append(random.uniform(0., 10.))
                        request.samples.append(grpc_sample)
                        
                    for _ in range(100):
                        grpc_sample = Sample()
                        for __ in range(100):
                            grpc_sample.features.append(random.uniform(50., 60.))
                        request.samples.append(grpc_sample)
                        
                    for _ in range(10):
                        grpc_sample = Sample()
                        for __ in range(100):
                            grpc_sample.features.append(random.uniform(5000., 6000.))
                        request.samples.append(grpc_sample)
    
                    response: DetectionResponse = dbscanserving_client.Detect(request)
    
                    if -1 in response.cluster_indices:  # attack detected
                        attack = AttackDescription()
                        attack.cs_id.uuid = service.service_id.service_uuid.uuid
                        response: AttackResponse = attack_mitigator_client.NotifyAttack(attack)
    
            # if attack is detected, run the attack mitigator
            return Empty()
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ReportSummarizedKpi(self, request : KpiList, context : grpc.ServicerContext) -> Empty:
            return Empty()
    
        @safe_and_metered_rpc_method(METRICS, LOGGER)
        def ReportKpi(self, request : KpiList, context : grpc.ServicerContext) -> Empty:
            return Empty()