diff --git a/deploy/subscription_ws.sh b/deploy/subscription_ws.sh index 133abd39676668e7fb0d843edb36a4b262bf22d2..a3d2f939eb3c2e0587dbf9d6f29cc7a756e7c211 100755 --- a/deploy/subscription_ws.sh +++ b/deploy/subscription_ws.sh @@ -21,11 +21,11 @@ # If not already set, set the namespace where CockroackDB will be deployed. export SUBSCRIPTION_WS_NAMESPACE=${SUBSCRIPTION_WS_NAMESPACE:-"tfs"} -# If not already set, set the internal port interface will be exposed to. -export SUBSCRIPTION_WS_INT_PORT=${SUBSCRIPTION_WS_INT_PORT:-"8765"} - # If not already set, set the external port interface will be exposed to. export SUBSCRIPTION_WS_EXT_PORT=${SUBSCRIPTION_WS_EXT_PORT:-"8765"} + +# If not already set, set the external port interface will be exposed to. +export SUBSCRIPTION_WS_INT_PORT=${SUBSCRIPTION_WS_INT_PORT:-"8765"} ######################################################################################################################## # Automated steps start here ######################################################################################################################## @@ -43,3 +43,16 @@ kubectl patch daemonset nginx-ingress-microk8s-controller --namespace ingress -- echo + +echo "Subscription WebSocket Port Mapping" +echo ">>> ExposeSubscription WebSocket port (${SUBSCRIPTION_WS_INT_PORT}->${SUBSCRIPTION_WS_INT_PORT})" +PATCH='{"data": {"'${SUBSCRIPTION_WS_INT_PORT}'": "'${SUBSCRIPTION_WS_NAMESPACE}'/nbiservice:'${SUBSCRIPTION_WS_INT_PORT}'"}}' +kubectl patch configmap nginx-ingress-tcp-microk8s-conf --namespace ingress --patch "${PATCH}" + +PORT_MAP='{"containerPort": '${SUBSCRIPTION_WS_INT_PORT}', "hostPort": '${SUBSCRIPTION_WS_INT_PORT}'}' +CONTAINER='{"name": "nginx-ingress-microk8s", "ports": ['${PORT_MAP}']}' +PATCH='{"spec": {"template": {"spec": {"containers": ['${CONTAINER}']}}}}' +kubectl patch daemonset nginx-ingress-microk8s-controller --namespace ingress --patch "${PATCH}" +echo + + diff --git a/manifests/e2e_orchestratorservice.yaml b/manifests/e2e_orchestratorservice.yaml index 9763b7a56ed5b801b0c6f9494c263b8a95a66246..dfbfff816d274b3b6ba67bf0d7d4777eb0d329f1 100644 --- a/manifests/e2e_orchestratorservice.yaml +++ b/manifests/e2e_orchestratorservice.yaml @@ -40,7 +40,7 @@ spec: - containerPort: 8765 env: - name: LOG_LEVEL - value: "INFO" + value: "DEBUG" readinessProbe: exec: command: ["/bin/grpc_health_probe", "-addr=:10050"] diff --git a/my_deploy.sh b/my_deploy.sh index deb0d85625bfc6ce46f268b05d382ccf55ba34b5..1c12f5b96d192d7f62e59df26edc85e769d01ce8 100755 --- a/my_deploy.sh +++ b/my_deploy.sh @@ -57,10 +57,10 @@ export TFS_COMPONENTS="context device pathcomp service slice nbi webui load_gene #export TFS_COMPONENTS="${TFS_COMPONENTS} forecaster" # Uncomment to activate E2E Orchestrator -#export TFS_COMPONENTS="${TFS_COMPONENTS} e2e_orchestrator" +export TFS_COMPONENTS="${TFS_COMPONENTS} e2e_orchestrator" # Uncomment to activate VNT Manager -#export TFS_COMPONENTS="${TFS_COMPONENTS} vnt_manager" +export TFS_COMPONENTS="${TFS_COMPONENTS} vnt_manager" # Set the tag you want to use for your images. export TFS_IMAGE_TAG="dev" diff --git a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py index cf4475ad02f4b05b5d3f629eaa35109a75c4f2e4..1fe4d047839d5b400ae46898da748e8969e0c803 100644 --- a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py +++ b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py @@ -21,13 +21,14 @@ from websockets.sync.client import connect import time from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply -from common.proto.context_pb2 import Empty, Connection, EndPointId +from common.proto.context_pb2 import Empty, Connection, EndPointId, Link, LinkId from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer from context.client.ContextClient import ContextClient from context.service.database.uuids.EndPoint import endpoint_get_uuid from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply from common.tools.grpc.Tools import grpc_message_to_json_string from websockets.sync.server import serve +import json LOGGER = logging.getLogger(__name__) @@ -36,10 +37,6 @@ METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC") context_client: ContextClient = ContextClient() -def event_received(websocket): - for message in websocket: - LOGGER.info("Message received!!!: {}".format(message)) - websocket.send(message) class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): @@ -52,7 +49,7 @@ class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): LOGGER.info("Requesting subscription") self.RequestSubscription() except Exception as E: - LOGGER.info("Exception!: {}".format(E)) + LOGGER.info("Exception0!: {}".format(E)) @@ -108,13 +105,17 @@ class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): path.connections.append(conn) def RequestSubscription(self): - HOST = "10.1.1.83" - PORT = str(8765) + LOGGER.info("Trying to connect...!!!") + OWN_HOST = "10.1.1.83" + OWN_PORT = "8765" - url = "ws://" + str(HOST) + ":" + str(PORT) + EXT_HOST = "10.1.1.83" + EXT_PORT = "8765" + + url = "ws://" + EXT_HOST + ":" + EXT_PORT request = VNTSubscriptionRequest() - request.host = HOST - request.port = PORT + request.host = OWN_HOST + request.port = OWN_PORT LOGGER.info("Trying to connect... to {}".format(url)) with connect(url, logger=LOGGER) as websocket: send = grpc_message_to_json_string(request) @@ -132,13 +133,32 @@ class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): LOGGER.info('Exception2!: {}'.format(e)) - with serve(event_received, HOST, PORT, logger=LOGGER) as server: - LOGGER.info("Running subscription server...: {}:{}".format(HOST, str(PORT))) + with serve(self._event_received, "0.0.0.0", OWN_PORT, logger=LOGGER) as server: + LOGGER.info("Running subscription server...: {}:{}".format("0.0.0.0", OWN_PORT)) server.serve_forever() LOGGER.info("Exiting subscription server...") - + def _event_received(self, websocket): + for message in websocket: + LOGGER.info("Message received!!!: {}".format(message)) + message_json = json.loads(message) + if 'event_type' in message_json: + + pass + elif 'link_id' in message_json: + obj = Link(**message_json) + if self._check_policies(obj): + pass + elif 'link_uuid' in message_json: + obj = LinkId(**message_json) + + websocket.send(message) + + + def _check_policies(self, link): + return True + diff --git a/src/nbi/service/rest_server/nbi_plugins/tfs_api/Resources.py b/src/nbi/service/rest_server/nbi_plugins/tfs_api/Resources.py index 876877c25957f300a7d8a513cc5a93e7fa163b6b..abf240fb798f18fc3cce5ec82e3f0cc28f3eee64 100644 --- a/src/nbi/service/rest_server/nbi_plugins/tfs_api/Resources.py +++ b/src/nbi/service/rest_server/nbi_plugins/tfs_api/Resources.py @@ -22,7 +22,7 @@ from service.client.ServiceClient import ServiceClient from vnt_manager.client.VNTManagerClient import VNTManagerClient from .Tools import ( - format_grpc_to_json, grpc_connection_id, grpc_context_id, grpc_device_id, grpc_link_id, grpc_policy_rule_id, + format_grpc_to_json, grpc_connection_id, grpc_context_id, grpc_device_id, grpc_link, grpc_link_id, grpc_policy_rule_id, grpc_service_id, grpc_service, grpc_slice_id, grpc_topology_id) class _Resource(Resource): @@ -191,21 +191,21 @@ class Link(_Resource): class VirtualLinkIds(_Resource): def get(self): - return format_grpc_to_json(self.vntmanager_client.ListLinkIds(Empty())) + return format_grpc_to_json(self.vntmanager_client.ListVirtualLinkIds(Empty())) class VirtualLinks(_Resource): def get(self): - return format_grpc_to_json(self.vntmanager_client.ListLinks(Empty())) + return format_grpc_to_json(self.vntmanager_client.ListVirtualLinks(Empty())) class VirtualLink(_Resource): def get(self, link_uuid : str): - return format_grpc_to_json(self.vntmanager_client.GetLink(grpc_link_id(link_uuid))) - def post(self, link_uuid : str): + return format_grpc_to_json(self.vntmanager_client.GetVirtualLink(grpc_link_id(link_uuid))) + def post(self): link = request.get_json() - return format_grpc_to_json(self.vntmanager_client.SetLink(grpc_link_id(link))) - def put(self, link_uuid : str): + return format_grpc_to_json(self.vntmanager_client.SetVirtualLink(grpc_link(link))) + def put(self): link = request.get_json() - return format_grpc_to_json(self.vntmanager_client.SetLink(grpc_link_id(link))) + return format_grpc_to_json(self.vntmanager_client.SetVirtualLink(grpc_link(link))) def delete(self, link_uuid : str): return format_grpc_to_json(self.vntmanager_client.RemoveVirtualLink(grpc_link_id(link_uuid))) diff --git a/src/nbi/service/rest_server/nbi_plugins/tfs_api/Tools.py b/src/nbi/service/rest_server/nbi_plugins/tfs_api/Tools.py index fd5eb2316d44f4f13e6d8bedef7411beee80c46a..d101a6569c36c540897ad74e95c7375a7b24da7f 100644 --- a/src/nbi/service/rest_server/nbi_plugins/tfs_api/Tools.py +++ b/src/nbi/service/rest_server/nbi_plugins/tfs_api/Tools.py @@ -14,7 +14,7 @@ from flask.json import jsonify from common.proto.context_pb2 import ( - ConnectionId, ContextId, DeviceId, LinkId, ServiceId, SliceId, TopologyId, Service, ServiceStatusEnum + ConnectionId, ContextId, DeviceId, Link, LinkId, ServiceId, SliceId, TopologyId, Service, ServiceStatusEnum ) from common.proto.policy_pb2 import PolicyRuleId from common.tools.grpc.Tools import grpc_message_to_json @@ -24,7 +24,7 @@ from common.tools.object_factory.ConfigRule import json_config_rule from common.tools.object_factory.Constraint import json_constraint_custom from common.tools.object_factory.EndPoint import json_endpoint_id from common.tools.object_factory.Device import json_device_id -from common.tools.object_factory.Link import json_link_id +from common.tools.object_factory.Link import json_link_id, json_link from common.tools.object_factory.PolicyRule import json_policyrule_id from common.tools.object_factory.Service import json_service_id, json_service from common.tools.object_factory.Slice import json_slice_id @@ -46,6 +46,9 @@ def grpc_device_id(device_uuid): def grpc_link_id(link_uuid): return LinkId(**json_link_id(link_uuid)) +def grpc_link(link): + return Link(**json_link(link.link_id.uuid, link.link_endpoint_ids)) + def grpc_service_id(context_uuid, service_uuid): return ServiceId(**json_service_id(service_uuid, context_id=json_context_id(context_uuid))) diff --git a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py index e2b110de062cf7f6725171674f7b2d546c69d4d8..9d2e8364a795edf22dae80f39e7a0e8cf111817f 100644 --- a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py +++ b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py @@ -60,6 +60,16 @@ HOST = "10.1.1.83" PORT = str(8765) +def send_msg(url, msg): + LOGGER.info("Sending event to {}".format(url)) + with connect(url, logger=LOGGER) as websocket: + send = grpc_message_to_json_string(msg) + LOGGER.info("Sending {}".format(send)) + websocket.send(send) + message = websocket.recv() + LOGGER.info("Received ws: {}".format(message)) + + class VNTMEventDispatcher(threading.Thread): def __init__(self, host, port) -> None: @@ -90,27 +100,24 @@ class VNTMEventDispatcher(threading.Thread): activate_connection_collector = False,) events_collector.start() - while not self._terminate.is_set(): - event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) - if event is None: continue - url = "ws://" + str(self.host) + ":" + str(self.port) - request = VNTSubscriptionRequest() - request.host = self.host - request.port = self.port - LOGGER.info("Sending event to {}".format(url)) - with connect(url, logger=LOGGER) as websocket: - send = grpc_message_to_json_string(request) - LOGGER.info("Sending {}".format(send)) - websocket.send(send) - message = websocket.recv() - LOGGER.info("Received ws: {}".format(message)) + url = "ws://" + str(self.host) + ":" + str(self.port) - - events_collector.stop() + request = VNTSubscriptionRequest() + request.host = str(self.host) + request.port = str(self.port) + + send_msg(url, request) + while not self._terminate.is_set(): + event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) + if event is None: continue + + send_msg(url, event) + + events_collector.stop() class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): @@ -129,6 +136,8 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): reply.subscription = "OK" event_dispatcher = VNTMEventDispatcher(request.host, int(request.port)) + self.host = request.host + self.port = request.port event_dispatcher.start() @@ -144,21 +153,39 @@ class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def GetVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Link: - for link in self.links: - if link.link_uuid.uuid == request.uuid: - return link + try: + url = "ws://" + str(self.host) + ":" + str(self.port) + send_msg(url, request) + except Exception as e: + LOGGER.error('Exection getting virtual link={}\n\t{}'.format(request.link_uuid.uuid, e)) + else: + for link in self.links: + if link.link_uuid.uuid == request.uuid: + return link return Empty() @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId: - self.links.append(request) - return request.linkd_id + try: + url = "ws://" + str(self.host) + ":" + str(self.port) + send_msg(url, request) + except Exception as e: + LOGGER.error('Exection setting virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e)) + else: + self.links.append(request) + return request.linkd_id @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty: - for link in self.links: - if link.link_uuid.uuid == request.uuid: - self.links.remove(link) - return Empty() + try: + url = "ws://" + str(self.host) + ":" + str(self.port) + send_msg(url, request) + except Exception as e: + LOGGER.error('Exection removing virtual link={}\n\t{}'.format(request.link_id.link_uuid.uuid, e)) + else: + for link in self.links: + if link.link_uuid.uuid == request.uuid: + self.links.remove(link) + return Empty() return Empty()