diff --git a/manifests/cachingservice.yaml b/manifests/cachingservice.yaml new file mode 100644 index 0000000000000000000000000000000000000000..81bb001a406dac00d0b4b8b5a2232fac446a95e3 --- /dev/null +++ b/manifests/cachingservice.yaml @@ -0,0 +1,57 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: cachingservice +spec: + selector: + matchLabels: + app: cachingservice + template: + metadata: + labels: + app: cachingservice + spec: + containers: + - name: redis + image: redis:7.0-alpine + env: + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-secrets + key: REDIS_PASSWORD + ports: + - containerPort: 6379 + name: client + command: ["redis-server"] + args: + - --requirepass + - $(REDIS_PASSWORD) +--- +apiVersion: v1 +kind: Service +metadata: + name: cachingservice +spec: + type: ClusterIP + selector: + app: cachingservice + ports: + - name: redis + port: 6379 + targetPort: 6379 diff --git a/manifests/dbscanservingservice.yaml b/manifests/dbscanservingservice.yaml index ae920143454da2c63bccc6eb74ea75670bad6eff..25175cd48af18e6e8137f041a8682279be65ad5a 100644 --- a/manifests/dbscanservingservice.yaml +++ b/manifests/dbscanservingservice.yaml @@ -31,33 +31,61 @@ spec: image: labs.etsi.org:5050/tfs/controller/dbscanserving:latest imagePullPolicy: Always ports: - - containerPort: 10006 + - containerPort: 10008 + - containerPort: 9192 env: - name: LOG_LEVEL value: "DEBUG" readinessProbe: exec: - command: ["/bin/grpc_health_probe", "-addr=:10006"] + command: ["/bin/grpc_health_probe", "-addr=:10008"] livenessProbe: exec: - command: ["/bin/grpc_health_probe", "-addr=:10006"] + command: ["/bin/grpc_health_probe", "-addr=:10008"] resources: requests: cpu: 250m - memory: 512Mi + memory: 128Mi limits: - cpu: 700m + cpu: 1000m memory: 1024Mi --- apiVersion: v1 kind: Service metadata: name: dbscanservingservice + labels: + app: dbscanservingservice spec: type: ClusterIP selector: app: dbscanservingservice ports: - name: grpc - port: 10006 - targetPort: 10006 + port: 10008 + targetPort: 10008 + - name: metrics + port: 9192 + targetPort: 9192 +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: dbscanservingservice-hpa +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: dbscanservingservice + minReplicas: 1 + maxReplicas: 20 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 80 + #behavior: + # scaleDown: + # stabilizationWindowSeconds: 30 diff --git a/manifests/servicemonitors.yaml b/manifests/servicemonitors.yaml index ec929f757cdf5468a7db7a7c1f1e755611d5327b..eaceab4ee875a00a7ba7175e7d6f95733dc784a4 100644 --- a/manifests/servicemonitors.yaml +++ b/manifests/servicemonitors.yaml @@ -359,3 +359,32 @@ spec: any: false matchNames: - tfs # namespace where the app is running +--- +apiVersion: monitoring.coreos.com/v1 +kind: ServiceMonitor +metadata: + namespace: monitoring # namespace where prometheus is running + name: tfs-dbscanservingservice-metric + labels: + app: dbscanservingservice + #release: prometheus + #release: prom # name of the release + # ( VERY IMPORTANT: You need to know the correct release name by viewing + # the servicemonitor of Prometheus itself: Without the correct name, + # Prometheus cannot identify the metrics of the Flask app as the target.) +spec: + selector: + matchLabels: + # Target app service + #namespace: tfs + app: dbscanservingservice # same as above + #release: prometheus # same as above + endpoints: + - port: metrics # named port in target app + scheme: http + path: /metrics # path to scrape + interval: 5s # scrape interval + namespaceSelector: + any: false + matchNames: + - tfs # namespace where the app is running diff --git a/src/dbscanserving/.gitlab-ci.yml b/src/dbscanserving/.gitlab-ci.yml index 0acb3e0a7ffb339e800092020fcb2616b3044b15..ae3aa231214f26f38c5c9f98633bc02898151a52 100644 --- a/src/dbscanserving/.gitlab-ci.yml +++ b/src/dbscanserving/.gitlab-ci.yml @@ -1,4 +1,4 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,7 +21,7 @@ build dbscanserving: before_script: - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY script: - - docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile ./src/ + - docker build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile . - docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" - docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" after_script: @@ -51,11 +51,11 @@ unit test dbscanserving: - if docker container ls | grep $IMAGE_NAME; then docker rm -f $IMAGE_NAME; else echo "$IMAGE_NAME image is not in the system"; fi script: - docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG" - - docker run --name $IMAGE_NAME -d -p 10006:10006 -v "$PWD/src/$IMAGE_NAME/tests:/opt/results" --network=teraflowbridge --rm $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG + - docker run --name $IMAGE_NAME -d -p 10008:10008 -v "$PWD/src/$IMAGE_NAME/tests:/home/${IMAGE_NAME}/results" --network=teraflowbridge --rm $CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG - sleep 5 - docker ps -a - docker logs $IMAGE_NAME - - docker exec -i $IMAGE_NAME bash -c "coverage run -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary.py --junitxml=/opt/results/${IMAGE_NAME}_report.xml" + - docker exec -i $IMAGE_NAME bash -c "coverage run -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary.py --junitxml=/home/${IMAGE_NAME}/results/${IMAGE_NAME}_report.xml" - docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing" coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/' after_script: diff --git a/src/dbscanserving/Config.py b/src/dbscanserving/Config.py index e7f7904b0b12cb58cf82d97d2247e57ed339b247..5e462dac105205a3140ddb7b3a9a95c5feee3478 100644 --- a/src/dbscanserving/Config.py +++ b/src/dbscanserving/Config.py @@ -1,4 +1,4 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,9 +18,7 @@ import logging LOG_LEVEL = logging.DEBUG # gRPC settings -GRPC_SERVICE_PORT = 10006 -GRPC_MAX_WORKERS = 10 -GRPC_GRACE_PERIOD = 60 +GRPC_SERVICE_PORT = 10008 # Prometheus settings METRICS_PORT = 9192 diff --git a/src/dbscanserving/Dockerfile b/src/dbscanserving/Dockerfile index 8328ff4d5ecf8dd373bb5b5e4534c941d6307146..8c57ddb349cf15e307ba833ebb8feb7dc84ee4a8 100644 --- a/src/dbscanserving/Dockerfile +++ b/src/dbscanserving/Dockerfile @@ -1,4 +1,4 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM python:3-slim +FROM python:3.9-slim # Install dependencies RUN apt-get --yes --quiet --quiet update && \ @@ -27,22 +27,56 @@ RUN GRPC_HEALTH_PROBE_VERSION=v0.2.0 && \ wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ chmod +x /bin/grpc_health_probe +# Creating a user for security reasons +RUN groupadd -r teraflow && useradd -u 1001 --no-log-init -r -m -g teraflow teraflow +USER teraflow + +RUN mkdir -p /home/teraflow/controller/common + +# set working directory +WORKDIR /home/teraflow/controller + +# Get Python packages per module +ENV VIRTUAL_ENV=/home/teraflow/venv +RUN python3 -m venv ${VIRTUAL_ENV} +ENV PATH="${VIRTUAL_ENV}/bin:${PATH}" + # Get generic Python packages -RUN python3 -m pip install --upgrade pip setuptools wheel pip-tools +RUN python3 -m pip install --upgrade pip +RUN python3 -m pip install --upgrade setuptools wheel +RUN python3 -m pip install --upgrade pip-tools + +# Get common Python packages +# Note: this step enables sharing the previous Docker build steps among all the Python components +COPY --chown=teraflow:teraflow common_requirements.in common_requirements.in +RUN pip-compile --quiet --output-file=common_requirements.txt common_requirements.in +RUN python3 -m pip install -r common_requirements.txt + +# Add common files into working directory +WORKDIR /home/teraflow/controller/common +COPY --chown=teraflow:teraflow src/common/. ./ +RUN rm -rf proto -# Set working directory -WORKDIR /var/teraflow +# Create proto sub-folder, copy .proto files, and generate Python code +WORKDIR /home/teraflow/controller/common/proto +RUN touch __init__.py +COPY --chown=teraflow:teraflow proto/*.proto ./ +RUN python3 -m grpc_tools.protoc -I=. --python_out=. --grpc_python_out=. *.proto +RUN rm *.proto +RUN find . -type f -exec sed -i -E 's/(import\ .*)_pb2/from . \1_pb2/g' {} \; # Create module sub-folders -RUN mkdir -p /var/teraflow/dbscanserving +RUN mkdir -p /home/teraflow/controller/dbscanserving +WORKDIR /home/teraflow/controller # Get Python packages per module -COPY dbscanserving/requirements.in dbscanserving/requirements.in -RUN pip-compile --output-file=dbscanserving/requirements.txt dbscanserving/requirements.in +COPY --chown=teraflow:teraflow ./src/dbscanserving/requirements.in dbscanserving/requirements.in +# consider common and specific requirements to avoid inconsistencies with dependencies +RUN pip-compile --quiet --output-file=dbscanserving/requirements.txt dbscanserving/requirements.in common_requirements.in RUN python3 -m pip install -r dbscanserving/requirements.txt -COPY common/. common -COPY dbscanserving/. dbscanserving +# Add component files into working directory +COPY --chown=teraflow:teraflow ./src/dbscanserving/. dbscanserving # Start dbscanserving service ENTRYPOINT ["python", "-m", "dbscanserving.service"] diff --git a/src/dbscanserving/__init__.py b/src/dbscanserving/__init__.py index 1549d9811aa5d1c193a44ad45d0d7773236c0612..9953c820575d42fa88351cc8de022d880ba96e6a 100644 --- a/src/dbscanserving/__init__.py +++ b/src/dbscanserving/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,4 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - diff --git a/src/dbscanserving/client/DbscanServingClient.py b/src/dbscanserving/client/DbscanServingClient.py index ce259d0f25dc2515ed49537f5527443cb552c7c1..362f0975a0f8c4e7bfd670ce228a647df91db7af 100644 --- a/src/dbscanserving/client/DbscanServingClient.py +++ b/src/dbscanserving/client/DbscanServingClient.py @@ -1,4 +1,4 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,37 +12,62 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc, logging -from common.tools.client.RetryDecorator import retry, delay_exponential -from dbscanserving.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse -from dbscanserving.proto.dbscanserving_pb2_grpc import DetectorStub +import logging +from typing import Counter +import grpc + +from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse +from common.proto.dbscanserving_pb2_grpc import DetectorStub +from common.Settings import get_log_level, get_setting +from common.tools.client.RetryDecorator import delay_exponential, retry + +log_level = get_log_level() +logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) MAX_RETRIES = 15 DELAY_FUNCTION = delay_exponential(initial=0.01, increment=2.0, maximum=5.0) -RETRY_DECORATOR = retry(max_retries=MAX_RETRIES, delay_function=DELAY_FUNCTION, prepare_method_name='connect') +RETRY_DECORATOR = retry( + max_retries=MAX_RETRIES, + delay_function=DELAY_FUNCTION, + prepare_method_name="connect", +) + class DbscanServingClient: - def __init__(self, address, port): - self.endpoint = '{:s}:{:s}'.format(str(address), str(port)) - LOGGER.debug('Creating channel to {:s}...'.format(str(self.endpoint))) + def __init__(self, host=None, port=None): + if not host: + host = get_setting("DBSCANSERVINGSERVICE_SERVICE_HOST") + if not port: + port = get_setting("DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC") + self.endpoint = "{:s}:{:s}".format(str(host), str(port)) + LOGGER.debug("Creating channel to {:s}...".format(str(self.endpoint))) self.channel = None self.stub = None self.connect() - LOGGER.debug('Channel created') + LOGGER.debug("Channel created") def connect(self): self.channel = grpc.insecure_channel(self.endpoint) self.stub = DetectorStub(self.channel) def close(self): - if(self.channel is not None): self.channel.close() + if self.channel is not None: + self.channel.close() self.channel = None self.stub = None @RETRY_DECORATOR - def Detect(self, request : DetectionRequest) -> DetectionResponse: - LOGGER.debug('Detect request: {:s}'.format(str(request))) - response = self.stub.Detect(request) - LOGGER.debug('Detect result: {:s}'.format(str(response))) + def Detect(self, request: DetectionRequest) -> DetectionResponse: + LOGGER.debug( + "Detect request with {} samples and {} features".format( + request.num_samples, request.num_features + ) + ) + response: DetectionResponse = self.stub.Detect(request) + LOGGER.debug( + "Detect result with {} cluster indices [{}]".format( + len(response.cluster_indices), Counter(response.cluster_indices) + ) + ) return response diff --git a/src/dbscanserving/client/__init__.py b/src/dbscanserving/client/__init__.py index 1549d9811aa5d1c193a44ad45d0d7773236c0612..9953c820575d42fa88351cc8de022d880ba96e6a 100644 --- a/src/dbscanserving/client/__init__.py +++ b/src/dbscanserving/client/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,4 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - diff --git a/src/dbscanserving/genproto.sh b/src/dbscanserving/genproto.sh old mode 100755 new mode 100644 index 449cffdebc5207dc17868b428504475bfab606d6..6c480c673d4081a4e7db1c2ff9741208794ddedd --- a/src/dbscanserving/genproto.sh +++ b/src/dbscanserving/genproto.sh @@ -1,6 +1,6 @@ #!/bin/bash -eu # -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ cd $(dirname $0) rm -rf proto/*.py rm -rf proto/__pycache__ tee proto/__init__.py << EOF > /dev/null -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/src/dbscanserving/proto/__init__.py b/src/dbscanserving/proto/__init__.py deleted file mode 100644 index 1549d9811aa5d1c193a44ad45d0d7773236c0612..0000000000000000000000000000000000000000 --- a/src/dbscanserving/proto/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - diff --git a/src/dbscanserving/proto/dbscanserving_pb2.py b/src/dbscanserving/proto/dbscanserving_pb2.py deleted file mode 100644 index f2d6c37c7c567394d3b12392a1fa4e0f748f6625..0000000000000000000000000000000000000000 --- a/src/dbscanserving/proto/dbscanserving_pb2.py +++ /dev/null @@ -1,244 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: dbscanserving.proto -"""Generated protocol buffer code.""" -from google.protobuf.internal import enum_type_wrapper -from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection -from google.protobuf import symbol_database as _symbol_database -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - - - -DESCRIPTOR = _descriptor.FileDescriptor( - name='dbscanserving.proto', - package='dbscanserving', - syntax='proto3', - serialized_options=None, - create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\x13\x64\x62scanserving.proto\x12\rdbscanserving\"\x1a\n\x06Sample\x12\x10\n\x08\x66\x65\x61tures\x18\x01 \x03(\x02\"\xc2\x01\n\x10\x44\x65tectionRequest\x12\x0b\n\x03\x65ps\x18\x01 \x01(\x02\x12\x13\n\x0bmin_samples\x18\x02 \x01(\x05\x12%\n\x06metric\x18\x03 \x01(\x0e\x32\x15.dbscanserving.Metric\x12\x13\n\x0bnum_samples\x18\x04 \x01(\x05\x12\x14\n\x0cnum_features\x18\x05 \x01(\x05\x12&\n\x07samples\x18\x06 \x03(\x0b\x32\x15.dbscanserving.Sample\x12\x12\n\nidentifier\x18\x07 \x01(\x05\",\n\x11\x44\x65tectionResponse\x12\x17\n\x0f\x63luster_indices\x18\x01 \x03(\x05*\x17\n\x06Metric\x12\r\n\tEUCLIDEAN\x10\x00\x32W\n\x08\x44\x65tector\x12K\n\x06\x44\x65tect\x12\x1f.dbscanserving.DetectionRequest\x1a .dbscanserving.DetectionResponseb\x06proto3' -) - -_METRIC = _descriptor.EnumDescriptor( - name='Metric', - full_name='dbscanserving.Metric', - filename=None, - file=DESCRIPTOR, - create_key=_descriptor._internal_create_key, - values=[ - _descriptor.EnumValueDescriptor( - name='EUCLIDEAN', index=0, number=0, - serialized_options=None, - type=None, - create_key=_descriptor._internal_create_key), - ], - containing_type=None, - serialized_options=None, - serialized_start=309, - serialized_end=332, -) -_sym_db.RegisterEnumDescriptor(_METRIC) - -Metric = enum_type_wrapper.EnumTypeWrapper(_METRIC) -EUCLIDEAN = 0 - - - -_SAMPLE = _descriptor.Descriptor( - name='Sample', - full_name='dbscanserving.Sample', - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[ - _descriptor.FieldDescriptor( - name='features', full_name='dbscanserving.Sample.features', index=0, - number=1, type=2, cpp_type=6, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=38, - serialized_end=64, -) - - -_DETECTIONREQUEST = _descriptor.Descriptor( - name='DetectionRequest', - full_name='dbscanserving.DetectionRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[ - _descriptor.FieldDescriptor( - name='eps', full_name='dbscanserving.DetectionRequest.eps', index=0, - number=1, type=2, cpp_type=6, label=1, - has_default_value=False, default_value=float(0), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - _descriptor.FieldDescriptor( - name='min_samples', full_name='dbscanserving.DetectionRequest.min_samples', index=1, - number=2, type=5, cpp_type=1, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - _descriptor.FieldDescriptor( - name='metric', full_name='dbscanserving.DetectionRequest.metric', index=2, - number=3, type=14, cpp_type=8, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - _descriptor.FieldDescriptor( - name='num_samples', full_name='dbscanserving.DetectionRequest.num_samples', index=3, - number=4, type=5, cpp_type=1, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - _descriptor.FieldDescriptor( - name='num_features', full_name='dbscanserving.DetectionRequest.num_features', index=4, - number=5, type=5, cpp_type=1, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - _descriptor.FieldDescriptor( - name='samples', full_name='dbscanserving.DetectionRequest.samples', index=5, - number=6, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - _descriptor.FieldDescriptor( - name='identifier', full_name='dbscanserving.DetectionRequest.identifier', index=6, - number=7, type=5, cpp_type=1, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=67, - serialized_end=261, -) - - -_DETECTIONRESPONSE = _descriptor.Descriptor( - name='DetectionResponse', - full_name='dbscanserving.DetectionResponse', - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[ - _descriptor.FieldDescriptor( - name='cluster_indices', full_name='dbscanserving.DetectionResponse.cluster_indices', index=0, - number=1, type=5, cpp_type=1, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - serialized_options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=263, - serialized_end=307, -) - -_DETECTIONREQUEST.fields_by_name['metric'].enum_type = _METRIC -_DETECTIONREQUEST.fields_by_name['samples'].message_type = _SAMPLE -DESCRIPTOR.message_types_by_name['Sample'] = _SAMPLE -DESCRIPTOR.message_types_by_name['DetectionRequest'] = _DETECTIONREQUEST -DESCRIPTOR.message_types_by_name['DetectionResponse'] = _DETECTIONRESPONSE -DESCRIPTOR.enum_types_by_name['Metric'] = _METRIC -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - -Sample = _reflection.GeneratedProtocolMessageType('Sample', (_message.Message,), { - 'DESCRIPTOR' : _SAMPLE, - '__module__' : 'dbscanserving_pb2' - # @@protoc_insertion_point(class_scope:dbscanserving.Sample) - }) -_sym_db.RegisterMessage(Sample) - -DetectionRequest = _reflection.GeneratedProtocolMessageType('DetectionRequest', (_message.Message,), { - 'DESCRIPTOR' : _DETECTIONREQUEST, - '__module__' : 'dbscanserving_pb2' - # @@protoc_insertion_point(class_scope:dbscanserving.DetectionRequest) - }) -_sym_db.RegisterMessage(DetectionRequest) - -DetectionResponse = _reflection.GeneratedProtocolMessageType('DetectionResponse', (_message.Message,), { - 'DESCRIPTOR' : _DETECTIONRESPONSE, - '__module__' : 'dbscanserving_pb2' - # @@protoc_insertion_point(class_scope:dbscanserving.DetectionResponse) - }) -_sym_db.RegisterMessage(DetectionResponse) - - - -_DETECTOR = _descriptor.ServiceDescriptor( - name='Detector', - full_name='dbscanserving.Detector', - file=DESCRIPTOR, - index=0, - serialized_options=None, - create_key=_descriptor._internal_create_key, - serialized_start=334, - serialized_end=421, - methods=[ - _descriptor.MethodDescriptor( - name='Detect', - full_name='dbscanserving.Detector.Detect', - index=0, - containing_service=None, - input_type=_DETECTIONREQUEST, - output_type=_DETECTIONRESPONSE, - serialized_options=None, - create_key=_descriptor._internal_create_key, - ), -]) -_sym_db.RegisterServiceDescriptor(_DETECTOR) - -DESCRIPTOR.services_by_name['Detector'] = _DETECTOR - -# @@protoc_insertion_point(module_scope) diff --git a/src/dbscanserving/proto/dbscanserving_pb2_grpc.py b/src/dbscanserving/proto/dbscanserving_pb2_grpc.py deleted file mode 100644 index 895ced1484df2101bb055f28b6a6d3631e7e68da..0000000000000000000000000000000000000000 --- a/src/dbscanserving/proto/dbscanserving_pb2_grpc.py +++ /dev/null @@ -1,66 +0,0 @@ -# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! -"""Client and server classes corresponding to protobuf-defined services.""" -import grpc - -from . import dbscanserving_pb2 as dbscanserving__pb2 - - -class DetectorStub(object): - """Missing associated documentation comment in .proto file.""" - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.Detect = channel.unary_unary( - '/dbscanserving.Detector/Detect', - request_serializer=dbscanserving__pb2.DetectionRequest.SerializeToString, - response_deserializer=dbscanserving__pb2.DetectionResponse.FromString, - ) - - -class DetectorServicer(object): - """Missing associated documentation comment in .proto file.""" - - def Detect(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_DetectorServicer_to_server(servicer, server): - rpc_method_handlers = { - 'Detect': grpc.unary_unary_rpc_method_handler( - servicer.Detect, - request_deserializer=dbscanserving__pb2.DetectionRequest.FromString, - response_serializer=dbscanserving__pb2.DetectionResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'dbscanserving.Detector', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) - - - # This class is part of an EXPERIMENTAL API. -class Detector(object): - """Missing associated documentation comment in .proto file.""" - - @staticmethod - def Detect(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/dbscanserving.Detector/Detect', - dbscanserving__pb2.DetectionRequest.SerializeToString, - dbscanserving__pb2.DetectionResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/src/dbscanserving/requirements.in b/src/dbscanserving/requirements.in index 4499fc694c408b17bba4278a509e23ff6e8cf101..d5e06028d853376200623c16ebcf4992f4ae60c2 100644 --- a/src/dbscanserving/requirements.in +++ b/src/dbscanserving/requirements.in @@ -1,22 +1 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -grpcio-health-checking -prometheus-client -pytest -pytest-benchmark -grpcio scikit-learn -coverage \ No newline at end of file diff --git a/src/dbscanserving/requirements.txt b/src/dbscanserving/requirements.txt deleted file mode 100644 index 067cdbc2620fabe5805f26055690212a0648d5d8..0000000000000000000000000000000000000000 --- a/src/dbscanserving/requirements.txt +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -# -# This file is autogenerated by pip-compile with python 3.9 -# To update, run: -# -# pip-compile --output-file=requirements.txt requirements.in -# -attrs==21.2.0 - # via pytest -grpcio==1.42.0 - # via - # -r requirements.in - # grpcio-health-checking -grpcio-health-checking==1.42.0 - # via -r requirements.in -iniconfig==1.1.1 - # via pytest -joblib==1.1.0 - # via scikit-learn -numpy==1.21.4 - # via - # scikit-learn - # scipy -packaging==21.3 - # via pytest -pluggy==1.0.0 - # via pytest -prometheus-client==0.12.0 - # via -r requirements.in -protobuf==3.19.1 - # via grpcio-health-checking -py==1.11.0 - # via pytest -py-cpuinfo==8.0.0 - # via pytest-benchmark -pyparsing==3.0.6 - # via packaging -pytest==6.2.5 - # via - # -r requirements.in - # pytest-benchmark -pytest-benchmark==3.4.1 - # via -r requirements.in -scikit-learn==1.0.1 - # via -r requirements.in -scipy==1.7.3 - # via scikit-learn -six==1.16.0 - # via grpcio -threadpoolctl==3.0.0 - # via scikit-learn -toml==0.10.2 - # via pytest diff --git a/src/dbscanserving/service/DbscanService.py b/src/dbscanserving/service/DbscanService.py index 632e4056050bfc44da069d7b706eb7b70a16bb34..3511e4e5fdcf85085dc783dc73688b4422c5736e 100644 --- a/src/dbscanserving/service/DbscanService.py +++ b/src/dbscanserving/service/DbscanService.py @@ -1,4 +1,4 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,23 +12,33 @@ # See the License for the specific language governing permissions and # limitations under the License. -import grpc import logging from concurrent import futures -from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH + +import grpc +from grpc_health.v1.health import OVERALL_HEALTH, HealthServicer from grpc_health.v1.health_pb2 import HealthCheckResponse from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server -from dbscanserving.proto.dbscanserving_pb2_grpc import add_DetectorServicer_to_server -from dbscanserving.service.DbscanServiceServicerImpl import DbscanServiceServicerImpl -from dbscanserving.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD -BIND_ADDRESS = '0.0.0.0' +from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, + DEFAULT_GRPC_MAX_WORKERS) +from common.proto.dbscanserving_pb2_grpc import add_DetectorServicer_to_server +from dbscanserving.Config import GRPC_SERVICE_PORT +from dbscanserving.service.DbscanServiceServicerImpl import \ + DbscanServiceServicerImpl + +BIND_ADDRESS = "0.0.0.0" LOGGER = logging.getLogger(__name__) + class DbscanService: def __init__( - self, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS, - grace_period=GRPC_GRACE_PERIOD): + self, + address=BIND_ADDRESS, + port=GRPC_SERVICE_PORT, + grace_period=DEFAULT_GRPC_GRACE_PERIOD, + max_workers=DEFAULT_GRPC_MAX_WORKERS, + ): self.address = address self.port = port @@ -41,30 +51,41 @@ class DbscanService: self.server = None def start(self): - self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port)) - LOGGER.debug('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( - str(self.endpoint), str(self.max_workers))) + self.endpoint = "{:s}:{:s}".format(str(self.address), str(self.port)) + LOGGER.debug( + "Starting Service (tentative endpoint: {:s}, max_workers: {:s})...".format( + str(self.endpoint), str(self.max_workers) + ) + ) self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers) - self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) + self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,)) self.dbscan_servicer = DbscanServiceServicerImpl() add_DetectorServicer_to_server(self.dbscan_servicer, self.server) self.health_servicer = HealthServicer( - experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) + experimental_non_blocking=True, + experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1), + ) add_HealthServicer_to_server(self.health_servicer, self.server) port = self.server.add_insecure_port(self.endpoint) - self.endpoint = '{:s}:{:s}'.format(str(self.address), str(port)) - LOGGER.info('Listening on {:s}...'.format(self.endpoint)) + self.endpoint = "{:s}:{:s}".format(str(self.address), str(port)) + LOGGER.info("Listening on {:s}...".format(self.endpoint)) self.server.start() - self.health_servicer.set(OVERALL_HEALTH, HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member + self.health_servicer.set( + OVERALL_HEALTH, HealthCheckResponse.SERVING + ) # pylint: disable=maybe-no-member - LOGGER.debug('Service started') + LOGGER.debug("Service started") def stop(self): - LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period))) + LOGGER.debug( + "Stopping service (grace period {:s} seconds)...".format( + str(self.grace_period) + ) + ) self.health_servicer.enter_graceful_shutdown() self.server.stop(self.grace_period) - LOGGER.debug('Service stopped') + LOGGER.debug("Service stopped") diff --git a/src/dbscanserving/service/DbscanServiceServicerImpl.py b/src/dbscanserving/service/DbscanServiceServicerImpl.py index e71aac7b4c5899f9f5a01c5427666e8e34944b99..42e3fc956a2602575ff61865293e1a469d8f449b 100644 --- a/src/dbscanserving/service/DbscanServiceServicerImpl.py +++ b/src/dbscanserving/service/DbscanServiceServicerImpl.py @@ -1,4 +1,4 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,32 +12,42 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os, grpc, logging +import logging + +import grpc from sklearn.cluster import DBSCAN + +from common.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse +from common.proto.dbscanserving_pb2_grpc import DetectorServicer from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method -from dbscanserving.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse -from dbscanserving.proto.dbscanserving_pb2_grpc import DetectorServicer LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('DbscanServing', 'RPC') +METRICS_POOL = MetricsPool('DBSCANServing', 'RPC') class DbscanServiceServicerImpl(DetectorServicer): - def __init__(self): - LOGGER.debug('Creating Servicer...') - LOGGER.debug('Servicer Created') + LOGGER.debug("Creating Servicer...") + LOGGER.debug("Servicer Created") @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def Detect(self, request : DetectionRequest, context : grpc.ServicerContext) -> DetectionResponse: + def Detect( + self, request: DetectionRequest, context: grpc.ServicerContext + ) -> DetectionResponse: if request.num_samples != len(request.samples): - context.set_details("The sample dimension declared does not match with the number of samples received.") - LOGGER.debug(f"The sample dimension declared does not match with the number of samples received. Declared: {request.num_samples} - Received: {len(request.samples)}") + context.set_details( + f"The sample dimension declared ({request.num_samples}) does not match with the number of samples received ({len(request.samples)})." + ) + LOGGER.debug( + f"The sample dimension declared does not match with the number of samples received. Declared: {request.num_samples} - Received: {len(request.samples)}" + ) context.set_code(grpc.StatusCode.INVALID_ARGUMENT) return DetectionResponse() # TODO: implement the validation of the features dimension - clusters = DBSCAN(eps=request.eps, min_samples=request.min_samples).fit_predict([[x for x in sample.features] for sample in request.samples]) + clusters = DBSCAN(eps=request.eps, min_samples=request.min_samples).fit_predict( + [[x for x in sample.features] for sample in request.samples] + ) response = DetectionResponse() for cluster in clusters: response.cluster_indices.append(cluster) diff --git a/src/dbscanserving/service/__init__.py b/src/dbscanserving/service/__init__.py index 1549d9811aa5d1c193a44ad45d0d7773236c0612..9953c820575d42fa88351cc8de022d880ba96e6a 100644 --- a/src/dbscanserving/service/__init__.py +++ b/src/dbscanserving/service/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,4 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - diff --git a/src/dbscanserving/service/__main__.py b/src/dbscanserving/service/__main__.py index 391229969bb719760897c706e280cfa4566ba7f9..154effde2e171022aedc11ced909f44c46335ed6 100644 --- a/src/dbscanserving/service/__main__.py +++ b/src/dbscanserving/service/__main__.py @@ -1,4 +1,4 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,53 +12,66 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os, logging, signal, sys, time, threading, multiprocessing +import logging +import signal +import sys +import threading + from prometheus_client import start_http_server -from common.Settings import get_setting -from dbscanserving.Config import ( - GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, LOG_LEVEL, METRICS_PORT) + +from common.Constants import (DEFAULT_GRPC_GRACE_PERIOD, + DEFAULT_GRPC_MAX_WORKERS) +from common.Settings import get_log_level, get_metrics_port, get_setting +from dbscanserving.Config import GRPC_SERVICE_PORT from dbscanserving.service.DbscanService import DbscanService terminate = threading.Event() LOGGER = None -def signal_handler(signal, frame): # pylint: disable=redefined-outer-name - LOGGER.warning('Terminate signal received') + +def signal_handler(signal, frame): # pylint: disable=redefined-outer-name + LOGGER.warning("Terminate signal received") terminate.set() -def main(): - global LOGGER # pylint: disable=global-statement - service_port = get_setting('DBSCANSERVICE_SERVICE_PORT_GRPC', default=GRPC_SERVICE_PORT) - max_workers = get_setting('MAX_WORKERS', default=GRPC_MAX_WORKERS ) - grace_period = get_setting('GRACE_PERIOD', default=GRPC_GRACE_PERIOD) - log_level = get_setting('LOG_LEVEL', default=LOG_LEVEL ) - metrics_port = get_setting('METRICS_PORT', default=METRICS_PORT ) +def main(): + global LOGGER # pylint: disable=global-statement + log_level = get_log_level() logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) - signal.signal(signal.SIGINT, signal_handler) + service_port = get_setting( + "DBSCANSERVICE_SERVICE_PORT_GRPC", default=GRPC_SERVICE_PORT + ) + grace_period = get_setting("GRACE_PERIOD", default=DEFAULT_GRPC_GRACE_PERIOD) + max_workers = get_setting("MAX_WORKERS", default=DEFAULT_GRPC_MAX_WORKERS) + + signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - LOGGER.info('Starting...') + LOGGER.info("Starting...") # Start metrics server + metrics_port = get_metrics_port() start_http_server(metrics_port) # Starting CentralizedCybersecurity service grpc_service = DbscanService( - port=service_port, max_workers=max_workers, grace_period=grace_period) + port=service_port, max_workers=max_workers, grace_period=grace_period + ) grpc_service.start() # Wait for Ctrl+C or termination signal - while not terminate.wait(timeout=0.1): pass + while not terminate.wait(timeout=0.1): + pass - LOGGER.info('Terminating...') + LOGGER.info("Terminating...") grpc_service.stop() - LOGGER.info('Bye') + LOGGER.info("Bye") return 0 -if __name__ == '__main__': + +if __name__ == "__main__": sys.exit(main()) diff --git a/src/dbscanserving/tests/__init__.py b/src/dbscanserving/tests/__init__.py index 1549d9811aa5d1c193a44ad45d0d7773236c0612..9953c820575d42fa88351cc8de022d880ba96e6a 100644 --- a/src/dbscanserving/tests/__init__.py +++ b/src/dbscanserving/tests/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,4 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - diff --git a/src/dbscanserving/tests/test_unitary.py b/src/dbscanserving/tests/test_unitary.py index b23a1a726634465a418b3b3eeb4bc9f0ac6d3f87..877727f0b81b4fe24b0d28f12f10e58f072ba223 100644 --- a/src/dbscanserving/tests/test_unitary.py +++ b/src/dbscanserving/tests/test_unitary.py @@ -1,4 +1,4 @@ -# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,32 +12,51 @@ # See the License for the specific language governing permissions and # limitations under the License. -import random, logging, pytest, numpy -from dbscanserving.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD +import logging +import os +import random +from unittest.mock import patch + +import pytest + +from common.proto.dbscanserving_pb2 import (DetectionRequest, + DetectionResponse, Sample) from dbscanserving.client.DbscanServingClient import DbscanServingClient +from dbscanserving.Config import GRPC_SERVICE_PORT from dbscanserving.service.DbscanService import DbscanService -from dbscanserving.proto.dbscanserving_pb2 import DetectionRequest, DetectionResponse, Sample -port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports +port = 10000 + GRPC_SERVICE_PORT # avoid privileged ports LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) -@pytest.fixture(scope='session') + +@pytest.fixture(scope="session") def dbscanserving_service(): - _service = DbscanService( - port=port, max_workers=GRPC_MAX_WORKERS, grace_period=GRPC_GRACE_PERIOD) + _service = DbscanService(port=port) _service.start() yield _service _service.stop() -@pytest.fixture(scope='session') + +@pytest.fixture(scope="session") def dbscanserving_client(): - _client = DbscanServingClient(address='127.0.0.1', port=port) - yield _client + with patch.dict( + os.environ, + { + "DBSCANSERVINGSERVICE_SERVICE_HOST": "127.0.0.1", + "DBSCANSERVINGSERVICE_SERVICE_PORT_GRPC": str(port), + }, + clear=True, + ): + _client = DbscanServingClient() + yield _client _client.close() -def test_detection_correct(dbscanserving_service, dbscanserving_client: DbscanServingClient): + +def test_detection_correct( + dbscanserving_service, dbscanserving_client: DbscanServingClient +): request: DetectionRequest = DetectionRequest() request.num_samples = 310 @@ -48,25 +67,28 @@ def test_detection_correct(dbscanserving_service, dbscanserving_client: DbscanSe for _ in range(200): grpc_sample = Sample() for __ in range(100): - grpc_sample.features.append(random.uniform(0., 10.)) + grpc_sample.features.append(random.uniform(0.0, 10.0)) request.samples.append(grpc_sample) - + for _ in range(100): grpc_sample = Sample() for __ in range(100): - grpc_sample.features.append(random.uniform(50., 60.)) + grpc_sample.features.append(random.uniform(50.0, 60.0)) request.samples.append(grpc_sample) - + for _ in range(10): grpc_sample = Sample() for __ in range(100): - grpc_sample.features.append(random.uniform(5000., 6000.)) + grpc_sample.features.append(random.uniform(5000.0, 6000.0)) request.samples.append(grpc_sample) response: DetectionResponse = dbscanserving_client.Detect(request) assert len(response.cluster_indices) == 310 -def test_detection_incorrect(dbscanserving_service, dbscanserving_client: DbscanServingClient): + +def test_detection_incorrect( + dbscanserving_service, dbscanserving_client: DbscanServingClient +): request: DetectionRequest = DetectionRequest() request.num_samples = 210 @@ -77,25 +99,28 @@ def test_detection_incorrect(dbscanserving_service, dbscanserving_client: Dbscan for _ in range(200): grpc_sample = Sample() for __ in range(100): - grpc_sample.features.append(random.uniform(0., 10.)) + grpc_sample.features.append(random.uniform(0.0, 10.0)) request.samples.append(grpc_sample) - + for _ in range(100): grpc_sample = Sample() for __ in range(100): - grpc_sample.features.append(random.uniform(50., 60.)) + grpc_sample.features.append(random.uniform(50.0, 60.0)) request.samples.append(grpc_sample) - + for _ in range(10): grpc_sample = Sample() for __ in range(100): - grpc_sample.features.append(random.uniform(5000., 6000.)) + grpc_sample.features.append(random.uniform(5000.0, 6000.0)) request.samples.append(grpc_sample) with pytest.raises(Exception): - response: DetectionResponse = dbscanserving_client.Detect(request) + _: DetectionResponse = dbscanserving_client.Detect(request) -def test_detection_clusters(dbscanserving_service, dbscanserving_client: DbscanServingClient): + +def test_detection_clusters( + dbscanserving_service, dbscanserving_client: DbscanServingClient +): request: DetectionRequest = DetectionRequest() request.num_samples = 310 @@ -106,19 +131,19 @@ def test_detection_clusters(dbscanserving_service, dbscanserving_client: DbscanS for _ in range(200): grpc_sample = Sample() for __ in range(100): - grpc_sample.features.append(random.uniform(0., 10.)) + grpc_sample.features.append(random.uniform(0.0, 10.0)) request.samples.append(grpc_sample) - + for _ in range(100): grpc_sample = Sample() for __ in range(100): - grpc_sample.features.append(random.uniform(50., 60.)) + grpc_sample.features.append(random.uniform(50.0, 60.0)) request.samples.append(grpc_sample) - + for _ in range(10): grpc_sample = Sample() for __ in range(100): - grpc_sample.features.append(random.uniform(5000., 6000.)) + grpc_sample.features.append(random.uniform(5000.0, 6000.0)) request.samples.append(grpc_sample) response: DetectionResponse = dbscanserving_client.Detect(request)