Newer
Older
from google.protobuf.json_format import MessageToDict

Lluis Gifre Renom
committed
from common.type_checkers.Assertions import validate_service_id
from compute.client.ComputeClient import ComputeClient

Lluis Gifre Renom
committed
from compute.proto.context_pb2 import Service
from compute.service.ComputeService import ComputeService

Lluis Gifre Renom
committed
from compute.Config import (
GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD, RESTAPI_SERVICE_PORT, RESTAPI_BASE_URL)
from compute.service.rest_server.Server import Server
from compute.service.rest_server.resources.Compute import Compute
from service.service.ServiceService import ServiceService

Lluis Gifre Renom
committed
from service.Config import (
GRPC_SERVICE_PORT as SERVICE_GRPC_SERVICE_PORT, GRPC_MAX_WORKERS as SERVICE_GRPC_MAX_WORKERS,
GRPC_GRACE_PERIOD as SERVICE_GRPC_GRACE_PERIOD)
# Shift every configured port by 10000 so the tests never need a privileged port.
compute_grpc_port = GRPC_SERVICE_PORT + 10000
compute_restapi_port = RESTAPI_SERVICE_PORT + 10000
service_grpc_port = SERVICE_GRPC_SERVICE_PORT + 10000

# Advertise the locally started ServiceService endpoint through the environment.
# NOTE(review): presumably the compute component reads these variables to
# locate ServiceService - confirm against its client/config code.
os.environ.update({
    'SERVICESERVICE_SERVICE_HOST': '127.0.0.1',
    'SERVICESERVICE_SERVICE_PORT_GRPC': str(service_grpc_port),
})

# Module logger, forced to DEBUG so test runs emit full detail.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
@pytest.fixture(scope='session')
def service_service():
    """Run a ServiceService instance for the duration of the test session.

    Yields:
        The started ServiceService; it is stopped once the session ends.
    """
    # NOTE(review): the line opening this constructor call was missing from the
    # reviewed text. `ServiceService(` is reconstructed from the keyword
    # arguments below and from the parallel `compute_service` fixture - confirm
    # against the repository before merging.
    _service = ServiceService(
        port=service_grpc_port,
        max_workers=SERVICE_GRPC_MAX_WORKERS,
        grace_period=SERVICE_GRPC_GRACE_PERIOD,
    )
    _service.start()
    yield _service
    _service.stop()
@pytest.fixture(scope='session')
def compute_service(service_service : ServiceService):
    """Run a ComputeService on ``compute_grpc_port`` for the test session.

    Requesting ``service_service`` makes pytest set that fixture up first; the
    value itself is not used in this body.

    Yields:
        The started ComputeService; it is stopped on teardown.
    """
    server = ComputeService(
        port=compute_grpc_port,
        max_workers=GRPC_MAX_WORKERS,
        grace_period=GRPC_GRACE_PERIOD,
    )
    server.start()
    yield server
    server.stop()
@pytest.fixture(scope='session')
def compute_service_rest():
    """Run the compute REST server for the test session.

    Registers the ``Compute`` resource under ``/restconf/config/compute`` and
    starts the server on ``compute_restapi_port``.

    Yields:
        The running Server; on teardown it is shut down and joined.
    """
    rest_api = Server(port=compute_restapi_port, base_url=RESTAPI_BASE_URL)
    rest_api.add_resource(Compute, '/restconf/config/compute', endpoint='api.compute')
    rest_api.start()

    # Give the server a moment to come up before any test issues a request.
    time.sleep(1)

    yield rest_api

    rest_api.shutdown()
    rest_api.join()
@pytest.fixture(scope='session')
def compute_client(compute_service):
    """Provide a ComputeClient connected to 127.0.0.1 on ``compute_grpc_port``.

    Requesting ``compute_service`` makes pytest start that fixture before the
    client is created.

    Yields:
        The ComputeClient; it is closed on teardown.
    """
    client = ComputeClient(address='127.0.0.1', port=compute_grpc_port)
    yield client
    client.close()
def test_dummy_create_connectivity_service(compute_client : ComputeClient):
    """Dummy test: validating the reply to an empty Service must fail with AssertionError."""
    # Every step stays inside the ``raises`` context so that an AssertionError
    # from any of them is accepted.
    with pytest.raises(AssertionError):
        reply = compute_client.CreateConnectivityService(Service())
        # NOTE(review): recent protobuf releases are believed to have renamed
        # `including_default_value_fields` - confirm against the pinned version.
        reply_as_dict = MessageToDict(
            reply,
            including_default_value_fields=True,
            preserving_proto_field_name=True,
            use_integers_for_enums=False,
        )
        validate_service_id(reply_as_dict)
def test_dummy_create_connectivity_service_rest_api(compute_service_rest : Server):
    """POST an empty JSON body to the compute REST endpoint and expect success.

    The reply must be JSON containing a truthy ``succeeded`` entry.
    """
    request_url = 'http://127.0.0.1:{:s}{:s}/restconf/config/compute'.format(
        str(compute_restapi_port), RESTAPI_BASE_URL)

    # Bounded wait: an unresponsive server should fail this test rather than
    # hang the whole session (requests waits indefinitely without a timeout).
    reply = requests.post(
        request_url,
        json={
            # here add context of POST request body as JSON
        },
        timeout=10,
    )
    json_reply = reply.json()

    # Lazy %-style arguments: the message is only rendered if the record is emitted.
    LOGGER.info('json_reply = %s', json_reply)
    assert 'succeeded' in json_reply
    assert json_reply['succeeded']