Scheduled maintenance on Saturday, 27 September 2025, from 07:00 AM to 4:00 PM GMT (09:00 AM to 6:00 PM CEST) - some services may be unavailable -

Skip to content
Snippets Groups Projects
Commit 5d23881a authored by Manuel Angel Jimenez Quesada's avatar Manuel Angel Jimenez Quesada
Browse files

add unitary test inside ci/cd pipeline

parent a708871d
No related branches found
No related tags found
1 merge request!369Resolve "Add test to feature develop during #232"
...@@ -54,6 +54,7 @@ include: ...@@ -54,6 +54,7 @@ include:
- local: '/src/qos_profile/.gitlab-ci.yml' - local: '/src/qos_profile/.gitlab-ci.yml'
- local: '/src/vnt_manager/.gitlab-ci.yml' - local: '/src/vnt_manager/.gitlab-ci.yml'
- local: '/src/e2e_orchestrator/.gitlab-ci.yml' - local: '/src/e2e_orchestrator/.gitlab-ci.yml'
- local: '/src/ztp_server/.gitlab-ci.yml'
# This should be last one: end-to-end integration tests # This should be last one: end-to-end integration tests
- local: '/src/tests/.gitlab-ci.yml' - local: '/src/tests/.gitlab-ci.yml'
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Build, tag, and push the Docker image to the GitLab Docker registry
build ztp_server:
variables:
IMAGE_NAME: 'ztp_server' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker buildx build -t "$IMAGE_NAME:$IMAGE_TAG" -f ./src/$IMAGE_NAME/Dockerfile .
- docker tag "$IMAGE_NAME:$IMAGE_TAG" "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker push "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
after_script:
- docker images --filter="dangling=true" --quiet | xargs -r docker rmi
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
# Apply unit test to the component
unit_test ztp_server:
variables:
IMAGE_NAME: 'ztp_server' # name of the microservice
IMAGE_TAG: 'latest' # tag of the container image (production, development, etc)
stage: unit_test
needs:
- build ztp_server
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
- docker ps -aq | xargs -r docker rm -f
- >
if docker network list | grep teraflowbridge; then
echo "teraflowbridge is already created";
else
docker network create -d bridge teraflowbridge;
fi
- >
if docker container ls | grep $IMAGE_NAME; then
docker rm -f $IMAGE_NAME;
else
echo "$IMAGE_NAME image is not in the system";
fi
- docker container prune -f
script:
- docker pull "$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG"
- docker images --filter="dangling=true" --quiet | xargs -r docker rmi
- >
docker run --name $IMAGE_NAME -d -v "$PWD/src/$IMAGE_NAME/tests:/opt/results"
--network=teraflowbridge
--env LOG_LEVEL=DEBUG
--env FLASK_ENV=development"
$CI_REGISTRY_IMAGE/$IMAGE_NAME:$IMAGE_TAG
- while ! docker logs $IMAGE_NAME 2>&1 | grep -q 'Configured Resources:'; do sleep 1; done
- sleep 5 # Give extra time to container to get ready
- docker ps -a
- docker logs $IMAGE_NAME
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_core.py --junitxml=/opt/results/${IMAGE_NAME}_report_core.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage run --append -m pytest --log-level=INFO --verbose $IMAGE_NAME/tests/test_unitary.py --junitxml=/opt/results/${IMAGE_NAME}_report_unitary.xml"
- docker exec -i $IMAGE_NAME bash -c "coverage report --include='${IMAGE_NAME}/*' --show-missing"
coverage: '/TOTAL\s+\d+\s+\d+\s+(\d+%)/'
after_script:
- docker logs $IMAGE_NAME
- docker rm -f $IMAGE_NAME
- docker network rm teraflowbridge
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && ($CI_MERGE_REQUEST_TARGET_BRANCH_NAME == "develop" || $CI_MERGE_REQUEST_TARGET_BRANCH_NAME == $CI_DEFAULT_BRANCH)'
- if: '$CI_PIPELINE_SOURCE == "push" && $CI_COMMIT_BRANCH == "develop"'
- changes:
- src/common/**/*.py
- proto/*.proto
- src/$IMAGE_NAME/**/*.{py,in,yml}
- src/$IMAGE_NAME/Dockerfile
- src/$IMAGE_NAME/tests/*.py
- manifests/${IMAGE_NAME}service.yaml
- .gitlab-ci.yml
artifacts:
when: always
reports:
junit: src/$IMAGE_NAME/tests/${IMAGE_NAME}_report_*.xml
...@@ -28,7 +28,7 @@ class ZtpServerServiceServicerImpl(ZtpServerServiceServicer): ...@@ -28,7 +28,7 @@ class ZtpServerServiceServicerImpl(ZtpServerServiceServicer):
LOGGER.info('Servicer Created') LOGGER.info('Servicer Created')
@safe_and_metered_rpc_method(METRICS_POOL, LOGGER) @safe_and_metered_rpc_method(METRICS_POOL, LOGGER)
def GetZtpProvisioning(self, request : ProvisioningScriptName, context : grpc.ServicerContext) -> ProvisioningScript: def GetProvisioningScript(self, request : ProvisioningScriptName, context : grpc.ServicerContext) -> ProvisioningScript:
try: try:
provisioningPath = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..', 'data', request.scriptname) provisioningPath = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..', 'data', request.scriptname)
with open(provisioningPath, 'r') as provisioning_file: with open(provisioningPath, 'r') as provisioning_file:
......
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from common.Constants import ServiceNameEnum
from common.Settings import get_service_baseurl_http, get_service_port_http
USERNAME = 'admin'
PASSWORD = 'admin'
LOCAL_HOST = '127.0.0.1'
MOCKSERVICE_PORT = 10000
ZTP_SERVICE_PORT = get_service_port_http(ServiceNameEnum.ZTP_SERVER) + MOCKSERVICE_PORT # avoid privileged ports
ZTP_SERVICE_PREFIX_URL = get_service_baseurl_http(ServiceNameEnum.ZTP_SERVER) or ''
ZTP_SERVICE_BASE_URL = 'http://{:s}:{:s}@{:s}:{:d}{:s}'.format(
USERNAME, PASSWORD, LOCAL_HOST, ZTP_SERVICE_PORT, ZTP_SERVICE_PREFIX_URL
)
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum, logging, os, pytest, requests, time # subprocess, threading
from typing import Any, Dict, List, Optional, Set, Union
from common.Constants import ServiceNameEnum
from common.Settings import (
ENVVAR_SUFIX_SERVICE_HOST, ENVVAR_SUFIX_SERVICE_PORT_GRPC,
ENVVAR_SUFIX_SERVICE_PORT_HTTP, get_env_var_name
)
from ztp_server.service.rest_server import RestServer
from ztp_server.service.rest_server.ztpServer_plugins.ztp_provisioning_api import register_ztp_provisioning
from .Constants import (
LOCAL_HOST, MOCKSERVICE_PORT, ZTP_SERVICE_BASE_URL, ZTP_SERVICE_PORT,
USERNAME, PASSWORD
)
os.environ[get_env_var_name(ServiceNameEnum.ZTP_SERVER, ENVVAR_SUFIX_SERVICE_HOST )] = str(LOCAL_HOST)
os.environ[get_env_var_name(ServiceNameEnum.ZTP_SERVER, ENVVAR_SUFIX_SERVICE_PORT_HTTP)] = str(ZTP_SERVICE_PORT)
@pytest.fixture(scope='session')
def ztp_server_application():
_rest_server = RestServer()
register_ztp_provisioning(_rest_server)
_rest_server.start()
time.sleep(1) # bring time for the server to start
yield _rest_server
_rest_server.shutdown()
_rest_server.join()
class RestRequestMethod(enum.Enum):
GET = 'get'
POST = 'post'
PUT = 'put'
PATCH = 'patch'
DELETE = 'delete'
EXPECTED_STATUS_CODES : Set[int] = {
requests.codes['OK' ],
requests.codes['CREATED' ],
requests.codes['ACCEPTED' ],
requests.codes['NO_CONTENT'],
}
def do_rest_request(
method : RestRequestMethod, url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Optional[Union[Dict, List]]:
request_url = ZTP_SERVICE_BASE_URL + url
if logger is not None:
msg = 'Request: {:s} {:s}'.format(str(method.value).upper(), str(request_url))
if body is not None: msg += ' body={:s}'.format(str(body))
logger.warning(msg)
reply = requests.request(method.value, request_url, timeout=timeout, json=body, allow_redirects=allow_redirects)
if logger is not None:
logger.warning('Reply: {:s}'.format(str(reply.text)))
assert reply.status_code in expected_status_codes, 'Reply failed with status code {:d}'.format(reply.status_code)
if reply.content and len(reply.content) > 0: return reply.json()
return None
def do_rest_get_request(
url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Optional[Union[Dict, List]]:
return do_rest_request(
RestRequestMethod.GET, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
def do_rest_post_request(
url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Optional[Union[Dict, List]]:
return do_rest_request(
RestRequestMethod.POST, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
def do_rest_put_request(
url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Optional[Union[Dict, List]]:
return do_rest_request(
RestRequestMethod.PUT, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
def do_rest_patch_request(
url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Optional[Union[Dict, List]]:
return do_rest_request(
RestRequestMethod.PATCH, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
def do_rest_delete_request(
url : str, body : Optional[Any] = None, timeout : int = 10,
allow_redirects : bool = True, expected_status_codes : Set[int] = EXPECTED_STATUS_CODES,
logger : Optional[logging.Logger] = None
) -> Optional[Union[Dict, List]]:
return do_rest_request(
RestRequestMethod.DELETE, url, body=body, timeout=timeout, allow_redirects=allow_redirects,
expected_status_codes=expected_status_codes, logger=logger
)
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import json
from .PrepareTestScenario import ( # pylint: disable=unused-import
# be careful, order of symbols is important here!
ztp_server_application, do_rest_get_request
)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
BASE_URL = '/provisioning'
def test_get_config_file(ztp_server_application, # pylint: disable=redefined-outer-name, unused-argument
) -> None:
URL = BASE_URL + '/config/ztp.json'
data = {
"ztp": {
"01-provisioning-script": {
"plugin": {
"url": "http://localhost:9001/provisioning/provisioning_script_sonic.sh"
},
"reboot-on-success": True
}}
}
retrieved_data = do_rest_get_request(URL, body=data, logger=LOGGER, expected_status_codes={200})
LOGGER.debug('retrieved_data={:s}'.format(json.dumps(retrieved_data, sort_keys=True)))
def test_get_wrong_config(
ztp_server_application # pylint: disable=redefined-outer-name, unused-argument
) -> None:
URL = BASE_URL + '/config/wrong.json'
retrieved_data = do_rest_get_request(URL, logger=LOGGER, expected_status_codes={503})
LOGGER.debug('retrieved_data={:s}'.format(json.dumps(retrieved_data, sort_keys=True)))
assert len(retrieved_data) == 0
# Copyright 2022-2025 ETSI SDG TeraFlowSDN (TFS) (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import grpc
from ztp_server.client.ZtpClient import ZtpClient
from common.proto.ztp_server_pb2 import ProvisioningScriptName, ProvisioningScript, ZtpFileName, ZtpFile
def test_GetProvisioningScript(
ztp_client : ZtpClient,
): # pylint: disable=redefined-outer-name
ztp_provisioning_file_request = ZtpFileName()
ztp_provisioning_file_request.ProvisioningScriptName = "provisioning_script_sonic.sh" # pylint: disable=no-member
ztp_reply = ztp_client.GetProvisioningScript(ztp_provisioning_file_request)
assert ztp_reply.StatusCode == grpc.StatusCode.OK
assert ztp_reply.ProvisioningScript.script.startswith("#!/bin/bash")
def test_GetProvisioningScript_wrong(
ztp_client : ZtpClient,
): # pylint: disable=redefined-outer-name
ztp_provisioning_file_request = ZtpFileName()
ztp_provisioning_file_request.filename = "wrong_file.sh" # pylint: disable=no-member
ztp_reply = ztp_client.GetProvisioningScript(ztp_provisioning_file_request)
assert ztp_reply.StatusCode == grpc.StatusCode.NOT_FOUND
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment