Commit 65216531 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Pre-merge cleanup

parent 3d5f1aa3
Loading
Loading
Loading
Loading
+16 −16
Original line number Diff line number Diff line
@@ -22,28 +22,28 @@ stages:
# include the individual .gitlab-ci.yml of each micro-service and tests
include:
  #- local: '/manifests/.gitlab-ci.yml'
  ###- local: '/src/monitoring/.gitlab-ci.yml'
  ###- local: '/src/nbi/.gitlab-ci.yml'
  ###- local: '/src/context/.gitlab-ci.yml'
  ###- local: '/src/device/.gitlab-ci.yml'
  ###- local: '/src/service/.gitlab-ci.yml'
  ###- local: '/src/dbscanserving/.gitlab-ci.yml'
  ###- local: '/src/opticalattackmitigator/.gitlab-ci.yml'
  ###- local: '/src/opticalattackdetector/.gitlab-ci.yml'
  ###- local: '/src/opticalattackmanager/.gitlab-ci.yml'
  ###- local: '/src/ztp/.gitlab-ci.yml'
  ###- local: '/src/policy/.gitlab-ci.yml'
  ###- local: '/src/forecaster/.gitlab-ci.yml'
  - local: '/src/monitoring/.gitlab-ci.yml'
  - local: '/src/nbi/.gitlab-ci.yml'
  - local: '/src/context/.gitlab-ci.yml'
  - local: '/src/device/.gitlab-ci.yml'
  - local: '/src/service/.gitlab-ci.yml'
  - local: '/src/dbscanserving/.gitlab-ci.yml'
  - local: '/src/opticalattackmitigator/.gitlab-ci.yml'
  - local: '/src/opticalattackdetector/.gitlab-ci.yml'
  - local: '/src/opticalattackmanager/.gitlab-ci.yml'
  - local: '/src/ztp/.gitlab-ci.yml'
  - local: '/src/policy/.gitlab-ci.yml'
  - local: '/src/forecaster/.gitlab-ci.yml'
  #- local: '/src/webui/.gitlab-ci.yml'
  #- local: '/src/l3_distributedattackdetector/.gitlab-ci.yml'
  #- local: '/src/l3_centralizedattackdetector/.gitlab-ci.yml'
  #- local: '/src/l3_attackmitigator/.gitlab-ci.yml'
  ###- local: '/src/slice/.gitlab-ci.yml'
  - local: '/src/slice/.gitlab-ci.yml'
  #- local: '/src/interdomain/.gitlab-ci.yml'
  ###- local: '/src/pathcomp/.gitlab-ci.yml'
  - local: '/src/pathcomp/.gitlab-ci.yml'
  #- local: '/src/dlt/.gitlab-ci.yml'
  ###- local: '/src/load_generator/.gitlab-ci.yml'
  ###- local: '/src/bgpls_speaker/.gitlab-ci.yml'
  - local: '/src/load_generator/.gitlab-ci.yml'
  - local: '/src/bgpls_speaker/.gitlab-ci.yml'

  # This should be last one: end-to-end integration tests
  - local: '/src/tests/.gitlab-ci.yml'
+3 −25
Original line number Diff line number Diff line
@@ -34,12 +34,10 @@
#           driver.configure(settings)
#           self.mutex_queues.signal_done(device_uuid)

import logging, threading
import threading
from queue import Queue, Empty
from typing import Dict

LOGGER = logging.getLogger(__name__)

class MutexQueues:
    def __init__(self) -> None:
        # lock to protect dictionary updates
@@ -59,54 +57,34 @@ class MutexQueues:
                self.mutex_queues[queue_name_b] = self.mutex_queues.setdefault(queue_name_a, Queue())

    def wait_my_turn(self, queue_name : str) -> None:
        LOGGER.warning('[wait_my_turn] begin queue_name={:s}'.format(str(queue_name)))
        # create my mutex and enqueue it
        mutex = threading.Event()
        LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} mutex={:s}'.format(str(queue_name), str(mutex)))
        with self.lock:
            LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} mutex_queues={:s}'.format(str(queue_name), str(self.mutex_queues)))
            queue : Queue = self.mutex_queues.setdefault(queue_name, Queue())
            first_in_queue = (queue.qsize() == 0)
            LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} first_in_queue={:s}'.format(str(queue_name), str(first_in_queue)))
            queue.put_nowait(mutex)

        # if I'm the first in the queue upon addition, means there are no running tasks
        # directly return without waiting
        if first_in_queue:
            LOGGER.warning('[wait_my_turn] end first_in_queue queue_name={:s}'.format(str(queue_name)))
            return
        if first_in_queue: return

        # otherwise, wait for my turn in the queue
        LOGGER.warning('[wait_my_turn] waiting queue_name={:s}'.format(str(queue_name)))
        mutex.wait()
        LOGGER.warning('[wait_my_turn] end wait queue_name={:s}'.format(str(queue_name)))

    def signal_done(self, queue_name : str) -> None:
        LOGGER.warning('[signal_done] begin queue_name={:s}'.format(str(queue_name)))
        # I'm done with my work
        with self.lock:
            LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} mutex_queues={:s}'.format(str(queue_name), str(self.mutex_queues)))
            queue : Queue = self.mutex_queues.setdefault(queue_name, Queue())
            LOGGER.warning('[wait_my_turn] [lock] queue_name={:s} queue={:s}'.format(str(queue_name), str(queue)))
            
            # remove myself from the queue
            try:
                LOGGER.warning('[wait_my_turn] [lock] before get queue_name={:s}'.format(str(queue_name)))
                mutex = queue.get(block=True, timeout=0.1)
                LOGGER.warning('[wait_my_turn] [lock] after get queue_name={:s} mutex={:s}'.format(str(queue_name), str(mutex)))
            except Empty:
                LOGGER.warning('[wait_my_turn] [lock] empty queue_name={:s}'.format(str(queue_name)))
                pass

            # if there are no other tasks queued, return
            if queue.qsize() == 0:
                LOGGER.warning('[wait_my_turn] end queue.qsize==0 queue_name={:s}'.format(str(queue_name)))
                return
            if queue.qsize() == 0: return

            # otherwise, signal the next task in the queue to start
            next_mutex : threading.Event = queue.queue[0]
            LOGGER.warning('[wait_my_turn] [lock] before set queue_name={:s} next_mutex={:s}'.format(str(queue_name), str(next_mutex)))
            next_mutex.set()
            LOGGER.warning('[wait_my_turn] [lock] after set queue_name={:s} next_mutex={:s}'.format(str(queue_name), str(next_mutex)))

            LOGGER.warning('[signal_done] end set queue_name={:s}'.format(str(queue_name)))
+0 −23
Original line number Diff line number Diff line
@@ -92,13 +92,8 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):

        t2 = time.time()

        LOGGER.warning('[AddDevice] before add_alias {:s} {:s}'.format(str(device_uuid), str(device_name)))
        self.mutex_queues.add_alias(device_uuid, device_name)
        LOGGER.warning('[AddDevice] after add_alias {:s} {:s}'.format(str(device_uuid), str(device_name)))

        LOGGER.warning('[AddDevice] before wait_my_turn {:s}'.format(str(device_uuid)))
        self.mutex_queues.wait_my_turn(device_uuid)
        LOGGER.warning('[AddDevice] after wait_my_turn {:s}'.format(str(device_uuid)))
        t3 = time.time()
        try:
            driver : _Driver = get_driver(self.driver_instance_cache, device)
@@ -209,9 +204,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):

            return device_id
        finally:
            LOGGER.warning('[AddDevice] before signal_done {:s}'.format(str(device_uuid)))
            self.mutex_queues.signal_done(device_uuid)
            LOGGER.warning('[AddDevice] after signal_done {:s}'.format(str(device_uuid)))

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def ConfigureDevice(self, request : Device, context : grpc.ServicerContext) -> DeviceId:
@@ -219,9 +212,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
        device_id = request.device_id
        device_uuid = device_id.device_uuid.uuid

        LOGGER.warning('[ConfigureDevice] before wait_my_turn {:s}'.format(str(device_uuid)))
        self.mutex_queues.wait_my_turn(device_uuid)
        LOGGER.warning('[ConfigureDevice] after wait_my_turn {:s}'.format(str(device_uuid)))
        t1 = time.time()
        try:
            context_client = ContextClient()
@@ -305,17 +296,13 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):

            return device_id
        finally:
            LOGGER.warning('[ConfigureDevice] before signal_done {:s}'.format(str(device_uuid)))
            self.mutex_queues.signal_done(device_uuid)
            LOGGER.warning('[ConfigureDevice] after signal_done {:s}'.format(str(device_uuid)))

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def DeleteDevice(self, request : DeviceId, context : grpc.ServicerContext) -> Empty:
        device_uuid = request.device_uuid.uuid

        LOGGER.warning('[DeleteDevice] before wait_my_turn {:s}'.format(str(device_uuid)))
        self.mutex_queues.wait_my_turn(device_uuid)
        LOGGER.warning('[DeleteDevice] after wait_my_turn {:s}'.format(str(device_uuid)))
        try:
            context_client = ContextClient()
            device = get_device(
@@ -330,17 +317,13 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
            context_client.RemoveDevice(request)
            return Empty()
        finally:
            LOGGER.warning('[DeleteDevice] before signal_done {:s}'.format(str(device_uuid)))
            self.mutex_queues.signal_done(device_uuid)
            LOGGER.warning('[DeleteDevice] after signal_done {:s}'.format(str(device_uuid)))

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def GetInitialConfig(self, request : DeviceId, context : grpc.ServicerContext) -> DeviceConfig:
        device_uuid = request.device_uuid.uuid

        LOGGER.warning('[GetInitialConfig] before wait_my_turn {:s}'.format(str(device_uuid)))
        self.mutex_queues.wait_my_turn(device_uuid)
        LOGGER.warning('[GetInitialConfig] after wait_my_turn {:s}'.format(str(device_uuid)))
        try:
            context_client = ContextClient()
            device = get_device(
@@ -363,9 +346,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):

            return device_config
        finally:
            LOGGER.warning('[GetInitialConfig] before signal_done {:s}'.format(str(device_uuid)))
            self.mutex_queues.signal_done(device_uuid)
            LOGGER.warning('[GetInitialConfig] after signal_done {:s}'.format(str(device_uuid)))

    @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
    def MonitorDeviceKpi(self, request : MonitoringSettings, context : grpc.ServicerContext) -> Empty:
@@ -383,9 +364,7 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):
                raise OperationFailedException('MonitorDeviceKpi', extra_details=msg)
            device_uuid = kpi_details[0]

        LOGGER.warning('[MonitorDeviceKpi] before wait_my_turn {:s}'.format(str(device_uuid)))
        self.mutex_queues.wait_my_turn(device_uuid)
        LOGGER.warning('[MonitorDeviceKpi] after wait_my_turn {:s}'.format(str(device_uuid)))
        try:
            context_client = ContextClient()
            device = get_device(
@@ -404,6 +383,4 @@ class DeviceServiceServicerImpl(DeviceServiceServicer):

            return Empty()
        finally:
            LOGGER.warning('[MonitorDeviceKpi] before signal_done {:s}'.format(str(device_uuid)))
            self.mutex_queues.signal_done(device_uuid)
            LOGGER.warning('[MonitorDeviceKpi] after signal_done {:s}'.format(str(device_uuid)))
+1 −1
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@
include:
  - local: '/src/tests/ofc22/.gitlab-ci.yml'
  #- local: '/src/tests/oeccpsc22/.gitlab-ci.yml'
  #- local: '/src/tests/ecoc22/.gitlab-ci.yml'
  - local: '/src/tests/ecoc22/.gitlab-ci.yml'
  #- local: '/src/tests/nfvsdn22/.gitlab-ci.yml'
  #- local: '/src/tests/ofc23/.gitlab-ci.yml'
  #- local: '/src/tests/ofc24/.gitlab-ci.yml'
+6 −5
Original line number Diff line number Diff line
@@ -62,10 +62,10 @@ end2end_test ecoc22:
    #- yq -i '((select(.kind=="Deployment").spec.template.spec.containers.[] | select(.name=="server").env.[]) | select(.name=="LOG_LEVEL").value) |= "DEBUG"' manifests/sliceservice.yaml
    #- yq -i '((select(.kind=="Deployment").spec.template.spec.containers.[] | select(.name=="server").env.[]) | select(.name=="LOG_LEVEL").value) |= "DEBUG"' manifests/nbiservice.yaml
    - source src/tests/${TEST_NAME}/deploy_specs.sh
    #- export TFS_REGISTRY_IMAGES="${CI_REGISTRY_IMAGE}"
    #- export TFS_SKIP_BUILD=""
    #- export TFS_IMAGE_TAG="latest"
    #- echo "TFS_REGISTRY_IMAGES=${CI_REGISTRY_IMAGE}"
    - export TFS_REGISTRY_IMAGES="${CI_REGISTRY_IMAGE}"
    - export TFS_SKIP_BUILD="YES"
    - export TFS_IMAGE_TAG="latest"
    - echo "TFS_REGISTRY_IMAGES=${CI_REGISTRY_IMAGE}"

    # Deploy TeraFlowSDN
    - ./deploy/crdb.sh
@@ -86,13 +86,14 @@ end2end_test ecoc22:
      --volume "$PWD/tfs_runtime_env_vars.sh:/var/teraflow/tfs_runtime_env_vars.sh"
      --volume "$PWD/src/tests/${TEST_NAME}:/opt/results"
      $CI_REGISTRY_IMAGE/${TEST_NAME}:latest
  after_script:
    - source src/tests/${TEST_NAME}/deploy_specs.sh
    - kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/contextservice -c server
    - kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/deviceservice -c server
    - kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/pathcompservice -c frontend
    - kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/serviceservice -c server
    - kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/sliceservice -c server
    - kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/nbiservice -c server
  after_script:
    - if docker ps -a | grep ${TEST_NAME}; then docker rm -f ${TEST_NAME}; fi
    - docker images --filter="dangling=true" --quiet | xargs -r docker rmi
  #coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
Loading