Commit d9750f58 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

feat: Enhance CollectorWorker with HTTP Basic Auth and update request handling...

feat: Enhance CollectorWorker with HTTP Basic Auth and update request handling in stream and simap polling
parent 3a6f95e9
Loading
Loading
Loading
Loading
+3 −3
Original line number Original line Diff line number Diff line
@@ -160,7 +160,7 @@ def set_simap_network(context_client: ContextClient, simap_client: SimapClient,
            link.update(
            link.update(
                'sdp1', endpoints[0], 'sdp2', endpoints[1],
                'sdp1', endpoints[0], 'sdp2', endpoints[1],
                supporting_link_ids=[
                supporting_link_ids=[
                    ('admin', 'L1'), ('admin', 'L3'), ('agg', 'AggNet-L1')
                    ('admin', 'L1'), ('agg', 'AggNet-L1')
                ]
                ]
            )
            )
        except (KeyError, IndexError, ValueError) as e:
        except (KeyError, IndexError, ValueError) as e:
@@ -194,7 +194,7 @@ def set_simap_network(context_client: ContextClient, simap_client: SimapClient,
            link.update(
            link.update(
                'sdp1', endpoints[0], 'sdp2', endpoints[1],
                'sdp1', endpoints[0], 'sdp2', endpoints[1],
                supporting_link_ids=[
                supporting_link_ids=[
                    ('trans-pkt', 'Trans-L1'), ('admin', 'L13')
                    ('trans-pkt', 'Trans-L1'), ('admin', 'L13'), ('admin', 'L3')
                ]
                ]
            )
            )
        except (KeyError, IndexError, ValueError) as e:
        except (KeyError, IndexError, ValueError) as e:
+4 −2
Original line number Original line Diff line number Diff line
@@ -14,6 +14,7 @@




import json, math, requests, threading, time
import json, math, requests, threading, time
from requests.auth import HTTPBasicAuth
from requests.exceptions import ReadTimeout
from requests.exceptions import ReadTimeout
from typing import Optional
from typing import Optional
from .data.AggregationCache import AggregationCache, LinkSample
from .data.AggregationCache import AggregationCache, LinkSample
@@ -31,6 +32,7 @@ CONTROLLER_TO_ADDRESS_PORT = {


WAIT_LOOP_GRANULARITY = 0.5
WAIT_LOOP_GRANULARITY = 0.5


AUTH = HTTPBasicAuth('admin', 'admin')


class CollectorWorker(_Worker):
class CollectorWorker(_Worker):
    def __init__(
    def __init__(
@@ -73,7 +75,7 @@ class CollectorWorker(_Worker):
            # NOTE: Trick: we set 1-second read_timeout to force the loop to give control
            # NOTE: Trick: we set 1-second read_timeout to force the loop to give control
            # back and be able to check termination events.
            # back and be able to check termination events.
            # , timeout=(10, 1)
            # , timeout=(10, 1)
            with session.get(stream_url, stream=True) as reply:
            with session.get(stream_url, stream=True, auth=AUTH) as reply:
                reply.raise_for_status()
                reply.raise_for_status()


                it_lines = reply.iter_lines(decode_unicode=True, chunk_size=1024)
                it_lines = reply.iter_lines(decode_unicode=True, chunk_size=1024)
@@ -140,7 +142,7 @@ class CollectorWorker(_Worker):
            MSG = '[direct_simap_polling] Requesting "{:s}"...'
            MSG = '[direct_simap_polling] Requesting "{:s}"...'
            self._logger.info(MSG.format(str(simap_url)))
            self._logger.info(MSG.format(str(simap_url)))


            with requests.get(simap_url, timeout=10) as reply:
            with requests.get(simap_url, timeout=10, auth=AUTH) as reply:
                reply.raise_for_status()
                reply.raise_for_status()
                data = reply.json()
                data = reply.json()