diff --git a/.gitignore b/.gitignore index a9144d6699af12319a67e8bad5cec982f3ae6a8c..20b98c30c5b3edb0983578b0a5f74fb1c1f3025e 100644 --- a/.gitignore +++ b/.gitignore @@ -171,5 +171,8 @@ local_k8s_deployment.sh # asdf configuration .tool-versions +# libyang build files +libyang/ + # Other logs **/logs/*.log.* diff --git a/deploy/all.sh b/deploy/all.sh index 25d69b485daff4dd0307e6fa85d3a4d47d54b72a..d99ffa88cfd483ed05d29cc9864198ba7a40bf06 100755 --- a/deploy/all.sh +++ b/deploy/all.sh @@ -175,6 +175,9 @@ export GRAF_EXT_PORT_HTTP=${GRAF_EXT_PORT_HTTP:-"3000"} # Deploy TeraFlowSDN ./deploy/tfs.sh +#Configure Subscription WS +#./deploy/subscription_ws.sh + # Show deploy summary ./deploy/show.sh diff --git a/deploy/subscription_ws.sh b/deploy/subscription_ws.sh new file mode 100755 index 0000000000000000000000000000000000000000..133abd39676668e7fb0d843edb36a4b262bf22d2 --- /dev/null +++ b/deploy/subscription_ws.sh @@ -0,0 +1,45 @@ +#!/bin/bash +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +######################################################################################################################## +# Read deployment settings +######################################################################################################################## + +# If not already set, set the namespace where CockroackDB will be deployed. +export SUBSCRIPTION_WS_NAMESPACE=${SUBSCRIPTION_WS_NAMESPACE:-"tfs"} + +# If not already set, set the internal port interface will be exposed to. +export SUBSCRIPTION_WS_INT_PORT=${SUBSCRIPTION_WS_INT_PORT:-"8765"} + +# If not already set, set the external port interface will be exposed to. +export SUBSCRIPTION_WS_EXT_PORT=${SUBSCRIPTION_WS_EXT_PORT:-"8765"} +######################################################################################################################## +# Automated steps start here +######################################################################################################################## + + +echo "Subscription WebSocket Port Mapping" +echo ">>> ExposeSubscription WebSocket port (${SUBSCRIPTION_WS_EXT_PORT}->${SUBSCRIPTION_WS_EXT_PORT})" +PATCH='{"data": {"'${SUBSCRIPTION_WS_EXT_PORT}'": "'${SUBSCRIPTION_WS_NAMESPACE}'/nbiservice:'${SUBSCRIPTION_WS_EXT_PORT}'"}}' +kubectl patch configmap nginx-ingress-tcp-microk8s-conf --namespace ingress --patch "${PATCH}" + +PORT_MAP='{"containerPort": '${SUBSCRIPTION_WS_EXT_PORT}', "hostPort": '${SUBSCRIPTION_WS_EXT_PORT}'}' +CONTAINER='{"name": "nginx-ingress-microk8s", "ports": ['${PORT_MAP}']}' +PATCH='{"spec": {"template": {"spec": {"containers": ['${CONTAINER}']}}}}' +kubectl patch daemonset nginx-ingress-microk8s-controller --namespace ingress --patch "${PATCH}" +echo + + diff --git a/install_requirements.sh b/install_requirements.sh index c59ea7f13909c789fef587192a56876187eb4195..65f60c1217c0c9b52efdda706b91f92434cc8a6c 100755 --- a/install_requirements.sh +++ b/install_requirements.sh @@ -32,8 +32,11 @@ sudo apt-get --yes --quiet --quiet update sudo apt-get --yes --quiet --quiet install build-essential cmake libpcre2-dev python3-dev python3-cffi mkdir libyang git clone https://github.com/CESNET/libyang.git libyang +git fetch +git checkout v2.1.148 mkdir libyang/build cd libyang/build +echo "*" > .gitignore cmake -D CMAKE_BUILD_TYPE:String="Release" .. make sudo make install diff --git a/manifests/e2e_orchestratorservice.yaml b/manifests/e2e_orchestratorservice.yaml index 90d37771171d1f062a17d071bebe1fd1fee859ad..11d7dc398a042914ce8a1ed20c55557020d2c7fe 100644 --- a/manifests/e2e_orchestratorservice.yaml +++ b/manifests/e2e_orchestratorservice.yaml @@ -22,6 +22,9 @@ spec: app: e2e-orchestratorservice template: metadata: + annotations: + config.linkerd.io/skip-outbound-ports: "8765" + config.linkerd.io/skip-inbound-ports: "8765" labels: app: e2e-orchestratorservice spec: @@ -33,6 +36,7 @@ spec: ports: - containerPort: 10050 - containerPort: 9192 + - containerPort: 8765 env: - name: LOG_LEVEL value: "INFO" @@ -67,6 +71,9 @@ spec: - name: metrics port: 9192 targetPort: 9192 + - name: ws + port: 8765 + targetPort: 8765 --- apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler diff --git a/manifests/nbiservice.yaml b/manifests/nbiservice.yaml index 0a3bd1ea69918429de73d58812248269ccf70a56..1d0ffc1a1162d798324cd548d3d885808226a211 100644 --- a/manifests/nbiservice.yaml +++ b/manifests/nbiservice.yaml @@ -23,6 +23,9 @@ spec: replicas: 1 template: metadata: + annotations: + config.linkerd.io/skip-outbound-ports: "8765" + config.linkerd.io/skip-inbound-ports: "8765" labels: app: nbiservice spec: @@ -35,6 +38,7 @@ spec: - containerPort: 8080 - containerPort: 9090 - containerPort: 9192 + - containerPort: 8765 env: - name: LOG_LEVEL value: "INFO" @@ -75,3 +79,7 @@ spec: protocol: TCP port: 9192 targetPort: 9192 + - name: websocket + protocol: TCP + port: 8765 + targetPort: 8765 diff --git a/proto/e2eorchestrator.proto b/proto/e2eorchestrator.proto index 9eed8523e52faa32d4397bb4635a1cf0c56aa4d5..d4cd868ee1d3ad58a3a0a6e55882819d15d0ca0c 100644 --- a/proto/e2eorchestrator.proto +++ b/proto/e2eorchestrator.proto @@ -20,7 +20,9 @@ import "context.proto"; service E2EOrchestratorService { - rpc Compute(E2EOrchestratorRequest) returns (E2EOrchestratorReply) {} + rpc Compute(E2EOrchestratorRequest) returns (E2EOrchestratorReply) {} + rpc PushTopology(context.Topology) returns (context.Empty) {} + } message E2EOrchestratorRequest { diff --git a/proto/vnt_manager.proto b/proto/vnt_manager.proto new file mode 100644 index 0000000000000000000000000000000000000000..d5647352279bed3ac6b1aa196adbb0968d9c508e --- /dev/null +++ b/proto/vnt_manager.proto @@ -0,0 +1,37 @@ +// Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// protocol buffers documentation: https://developers.google.com/protocol-buffers/docs/proto3 +syntax = "proto3"; +package vnt_manager; +import "context.proto"; + + +service VNTManagerService { + rpc VNTSubscript (VNTSubscriptionRequest) returns (VNTSubscriptionReply) {} + rpc ListVirtualLinkIds (context.Empty) returns (context.LinkIdList) {} + rpc ListVirtualLinks (context.Empty) returns (context.LinkList) {} + rpc GetVirtualLink (context.LinkId) returns (context.Link) {} + rpc SetVirtualLink (context.Link) returns (context.LinkId) {} + rpc RemoveVirtualLink (context.LinkId) returns (context.Empty) {} +} + +message VNTSubscriptionRequest { + string host = 1; + string port = 2; +} + +message VNTSubscriptionReply { + string subscription = 1; +} diff --git a/scripts/show_logs_e2eorchestrator.sh b/scripts/show_logs_e2eorchestrator.sh old mode 100644 new mode 100755 index 84951ed8dc24c145715c971b76cd22b79a920f98..a69abdc9149fb610ae147d3a18add4157a240ee9 --- a/scripts/show_logs_e2eorchestrator.sh +++ b/scripts/show_logs_e2eorchestrator.sh @@ -24,4 +24,4 @@ export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} # Automated steps start here ######################################################################################################################## -kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/e2eorchestratorservice -c server +kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/e2e-orchestratorservice -c server diff --git a/scripts/show_logs_vntmanager.sh b/scripts/show_logs_vntmanager.sh old mode 100644 new mode 100755 index 15469e6473150f72c767f39970f435b3cc143e99..aadc2c578c958fe6b87ac0aaebb4233c27adcca0 --- a/scripts/show_logs_vntmanager.sh +++ b/scripts/show_logs_vntmanager.sh @@ -24,4 +24,4 @@ export TFS_K8S_NAMESPACE=${TFS_K8S_NAMESPACE:-"tfs"} # Automated steps start here ######################################################################################################################## -kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/vntmanagerservice -c server +kubectl --namespace $TFS_K8S_NAMESPACE logs deployment/vnt_managerservice -c server diff --git a/src/common/Constants.py b/src/common/Constants.py index 276603463afde5bcbc1994de09d89cb7c4e2f8d2..babde64e4b2fed5ddf118addcf46e3dcc891f7d5 100644 --- a/src/common/Constants.py +++ b/src/common/Constants.py @@ -58,9 +58,9 @@ class ServiceNameEnum(Enum): CACHING = 'caching' TE = 'te' FORECASTER = 'forecaster' - E2EORCHESTRATOR = 'e2e_orchestrator' + E2EORCHESTRATOR = 'e2e-orchestrator' OPTICALCONTROLLER = 'opticalcontroller' - VNTMANAGER = 'vnt_manager' + VNTMANAGER = 'vnt-manager' BGPLS = 'bgpls-speaker' # Used for test and debugging only diff --git a/src/e2e_orchestrator/requirements.in b/src/e2e_orchestrator/requirements.in index 4c4720a2df4482faeda1ad99f9d383ebb5c0f848..3f780913b761c42a253aa7210bf27f2fa0ffeeed 100644 --- a/src/e2e_orchestrator/requirements.in +++ b/src/e2e_orchestrator/requirements.in @@ -12,4 +12,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -networkx \ No newline at end of file +networkx +websockets==12.0 \ No newline at end of file diff --git a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py index d233f2e17a6485d425013ab4064501bc0bad84bf..cf4475ad02f4b05b5d3f629eaa35109a75c4f2e4 100644 --- a/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py +++ b/src/e2e_orchestrator/service/E2EOrchestratorServiceServicerImpl.py @@ -17,15 +17,17 @@ import logging import networkx as nx import grpc import copy - -from common.Constants import ServiceNameEnum -from common.method_wrappers.Decorator import (MetricsPool, MetricTypeEnum, safe_and_metered_rpc_method) +from websockets.sync.client import connect +import time +from common.method_wrappers.Decorator import MetricsPool, safe_and_metered_rpc_method from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply from common.proto.context_pb2 import Empty, Connection, EndPointId from common.proto.e2eorchestrator_pb2_grpc import E2EOrchestratorServiceServicer from context.client.ContextClient import ContextClient from context.service.database.uuids.EndPoint import endpoint_get_uuid - +from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply +from common.tools.grpc.Tools import grpc_message_to_json_string +from websockets.sync.server import serve LOGGER = logging.getLogger(__name__) @@ -34,11 +36,26 @@ METRICS_POOL = MetricsPool("E2EOrchestrator", "RPC") context_client: ContextClient = ContextClient() +def event_received(websocket): + for message in websocket: + LOGGER.info("Message received!!!: {}".format(message)) + websocket.send(message) + + class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): def __init__(self): LOGGER.debug("Creating Servicer...") LOGGER.debug("Servicer Created") + time.sleep(15) + try: + LOGGER.info("Requesting subscription") + self.RequestSubscription() + except Exception as E: + LOGGER.info("Exception!: {}".format(E)) + + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) def Compute(self, request: E2EOrchestratorRequest, context: grpc.ServicerContext) -> E2EOrchestratorReply: endpoints_ids = [] @@ -90,4 +107,38 @@ class E2EOrchestratorServiceServicerImpl(E2EOrchestratorServiceServicer): path.connections.append(conn) - return path + def RequestSubscription(self): + HOST = "10.1.1.83" + PORT = str(8765) + + url = "ws://" + str(HOST) + ":" + str(PORT) + request = VNTSubscriptionRequest() + request.host = HOST + request.port = PORT + LOGGER.info("Trying to connect... to {}".format(url)) + with connect(url, logger=LOGGER) as websocket: + send = grpc_message_to_json_string(request) + LOGGER.info("Sending {}".format(send)) + websocket.send(send) + + try: + message = websocket.recv() + except Exception as e: + LOGGER.info('Exception1!: {}'.format(e)) + + try: + LOGGER.info("Received ws: {}".format(message)) + except Exception as e: + LOGGER.info('Exception2!: {}'.format(e)) + + + with serve(event_received, HOST, PORT, logger=LOGGER) as server: + LOGGER.info("Running subscription server...: {}:{}".format(HOST, str(PORT))) + server.serve_forever() + LOGGER.info("Exiting subscription server...") + + + + + + diff --git a/src/e2e_orchestrator/service/__main__.py b/src/e2e_orchestrator/service/__main__.py index a586543a7078d9b7f868967ad7eea7d228985086..ef01baeaf980d77ceb9aeda0889bd85e625b67f1 100644 --- a/src/e2e_orchestrator/service/__main__.py +++ b/src/e2e_orchestrator/service/__main__.py @@ -43,13 +43,6 @@ def main(): logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) - wait_for_environment_variables( - [ - get_env_var_name(ServiceNameEnum.E2EORCHESTRATOR, ENVVAR_SUFIX_SERVICE_HOST), - get_env_var_name(ServiceNameEnum.E2EORCHESTRATOR, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - ] - ) - signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) diff --git a/src/nbi/Dockerfile b/src/nbi/Dockerfile index eda4d295697e94b659f67d75025d1944db703afc..33f22953464d56725344df96fff1d41c4c7688b4 100644 --- a/src/nbi/Dockerfile +++ b/src/nbi/Dockerfile @@ -61,6 +61,9 @@ RUN apt-get --yes --quiet --quiet update && \ rm -rf /var/lib/apt/lists/* RUN mkdir -p /var/libyang RUN git clone https://github.com/CESNET/libyang.git /var/libyang +WORKDIR /var/libyang +RUN git fetch +RUN git checkout v2.1.148 RUN mkdir -p /var/libyang/build WORKDIR /var/libyang/build RUN cmake -D CMAKE_BUILD_TYPE:String="Release" .. @@ -86,6 +89,9 @@ COPY src/service/__init__.py service/__init__.py COPY src/service/client/. service/client/ COPY src/slice/__init__.py slice/__init__.py COPY src/slice/client/. slice/client/ +# COPY src/vnt_manager/__init__.py vnt_manager/__init__.py +# COPY src/vnt_manager/client/. vnt_manager/client/ +COPY --chown=teraflow:teraflow ./src/vnt_manager/. vnt_manager RUN mkdir -p /var/teraflow/tests/tools COPY src/tests/tools/mock_osm/. tests/tools/mock_osm/ diff --git a/src/nbi/README.md b/src/nbi/README.md index c5ed72704b7d6048d3829eb8f77710e51753b000..32902a0b33dba2f9ce3df4a60833608bac6e129d 100644 --- a/src/nbi/README.md +++ b/src/nbi/README.md @@ -18,6 +18,9 @@ sudo apt-get install python3-dev gcc python3-cffi ```bash mkdir ~/tfs-ctrl/libyang git clone https://github.com/CESNET/libyang.git ~/tfs-ctrl/libyang +cd ~/tfs-ctrl/libyang +git fetch +git checkout v2.1.148 mkdir ~/tfs-ctrl/libyang/build cd ~/tfs-ctrl/libyang/build cmake -D CMAKE_BUILD_TYPE:String="Release" .. diff --git a/src/nbi/requirements.in b/src/nbi/requirements.in index 6e3eb94404f9d12431c715080cf210a02c7c82f4..37c41550f280ac2242f0f4fb2ed7f7b884f289c8 100644 --- a/src/nbi/requirements.in +++ b/src/nbi/requirements.in @@ -24,3 +24,4 @@ pyang==2.6.0 git+https://github.com/robshakir/pyangbind.git requests==2.27.1 werkzeug==2.3.7 +websockets==12.0 diff --git a/src/nbi/service/NbiServiceServicerImpl.py b/src/nbi/service/NbiServiceServicerImpl.py index d454a4df92dce719acac0cb6d2a39052c3c66e06..2f641d32447d0ad1ffea78baf88636991a97c990 100644 --- a/src/nbi/service/NbiServiceServicerImpl.py +++ b/src/nbi/service/NbiServiceServicerImpl.py @@ -20,7 +20,7 @@ from common.proto.nbi_pb2_grpc import NbiServiceServicer LOGGER = logging.getLogger(__name__) -METRICS_POOL = MetricsPool('Compute', 'RPC') +METRICS_POOL = MetricsPool('NBI', 'RPC') class NbiServiceServicerImpl(NbiServiceServicer): def __init__(self): diff --git a/src/nbi/service/__main__.py b/src/nbi/service/__main__.py index 8834e45a2779c8d422ba1f9878c435f14a2f43db..aa16ee89708eb330775583cb84063cdcd83c0c8b 100644 --- a/src/nbi/service/__main__.py +++ b/src/nbi/service/__main__.py @@ -26,6 +26,7 @@ from .rest_server.nbi_plugins.ietf_l2vpn import register_ietf_l2vpn from .rest_server.nbi_plugins.ietf_l3vpn import register_ietf_l3vpn from .rest_server.nbi_plugins.ietf_network import register_ietf_network from .rest_server.nbi_plugins.ietf_network_slice import register_ietf_nss +from .rest_server.nbi_plugins.context_subscription import register_context_subscription terminate = threading.Event() LOGGER = None @@ -70,6 +71,8 @@ def main(): register_ietf_nss(rest_server) # Registering NSS entrypoint rest_server.start() + register_context_subscription() + # Wait for Ctrl+C or termination signal while not terminate.wait(timeout=1.0): pass diff --git a/src/nbi/service/rest_server/nbi_plugins/context_subscription/__init__.py b/src/nbi/service/rest_server/nbi_plugins/context_subscription/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..d5deb9352d6018c2ac1165b7326e48e3e0c24c5a --- /dev/null +++ b/src/nbi/service/rest_server/nbi_plugins/context_subscription/__init__.py @@ -0,0 +1,74 @@ +# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from websockets.sync.server import serve +from common.proto.vnt_manager_pb2 import VNTSubscriptionReply, VNTSubscriptionRequest +from common.proto.context_pb2 import Empty + +# from vnt_manager.client.VNTManagerClient import VNTManagerClient +from context.client.ContextClient import ContextClient +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME +from common.tools.object_factory.Topology import json_topology_id +from common.tools.object_factory.Context import json_context_id +from common.proto.context_pb2 import ContextId, TopologyId +import json +import os +from vnt_manager.client.VNTManagerClient import VNTManagerClient + +JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME) +ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID) +ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID)) + +vnt_manager_client: VNTManagerClient = VNTManagerClient() +context_client: ContextClient = ContextClient() + +HOST = "0.0.0.0" +PORT = 8765 + +LOGGER = logging.getLogger(__name__) + + +def register_context_subscription(): + with serve(subcript_to_vnt_manager, HOST, PORT, logger=LOGGER) as server: + LOGGER.info("Running subscription server...: {}:{}".format(HOST, str(PORT))) + server.serve_forever() + LOGGER.info("Exiting subscription server...") + + +def subcript_to_vnt_manager(websocket): + for message in websocket: + LOGGER.info("Message received: {}".format(message)) + message_json = json.loads(message) + request = VNTSubscriptionRequest() + request.host = message_json['host'] + request.port = message_json['port'] + LOGGER.info("Received gRPC from ws: {}".format(request)) + + reply = VNTSubscriptionReply() + try: + vntm_reply = vnt_manager_client.VNTSubscript(request) + LOGGER.info("Received gRPC from vntm: {}".format(vntm_reply)) + except Exception as e: + LOGGER.error('Could not subscript to VTNManager: {}'.format(e)) + reply.subscription = "NOT OK" + else: + reply.subscription = "OK" + + + websocket.send(reply.subscription) + + + diff --git a/src/nbi/service/rest_server/nbi_plugins/debug_api/Resources.py b/src/nbi/service/rest_server/nbi_plugins/debug_api/Resources.py index 5fb46a30294eeec4e07de2ec1ce8387775726c23..876877c25957f300a7d8a513cc5a93e7fa163b6b 100644 --- a/src/nbi/service/rest_server/nbi_plugins/debug_api/Resources.py +++ b/src/nbi/service/rest_server/nbi_plugins/debug_api/Resources.py @@ -19,6 +19,8 @@ from common.proto.context_pb2 import Empty from common.tools.grpc.Tools import grpc_message_to_json from context.client.ContextClient import ContextClient from service.client.ServiceClient import ServiceClient +from vnt_manager.client.VNTManagerClient import VNTManagerClient + from .Tools import ( format_grpc_to_json, grpc_connection_id, grpc_context_id, grpc_device_id, grpc_link_id, grpc_policy_rule_id, grpc_service_id, grpc_service, grpc_slice_id, grpc_topology_id) @@ -28,6 +30,7 @@ class _Resource(Resource): super().__init__() self.client = ContextClient() self.service_client = ServiceClient() + self.vntmanager_client = VNTManagerClient() class ContextIds(_Resource): def get(self): @@ -186,6 +189,26 @@ class Link(_Resource): def get(self, link_uuid : str): return format_grpc_to_json(self.client.GetLink(grpc_link_id(link_uuid))) +class VirtualLinkIds(_Resource): + def get(self): + return format_grpc_to_json(self.vntmanager_client.ListLinkIds(Empty())) + +class VirtualLinks(_Resource): + def get(self): + return format_grpc_to_json(self.vntmanager_client.ListLinks(Empty())) + +class VirtualLink(_Resource): + def get(self, link_uuid : str): + return format_grpc_to_json(self.vntmanager_client.GetLink(grpc_link_id(link_uuid))) + def post(self, link_uuid : str): + link = request.get_json() + return format_grpc_to_json(self.vntmanager_client.SetLink(grpc_link_id(link))) + def put(self, link_uuid : str): + link = request.get_json() + return format_grpc_to_json(self.vntmanager_client.SetLink(grpc_link_id(link))) + def delete(self, link_uuid : str): + return format_grpc_to_json(self.vntmanager_client.RemoveVirtualLink(grpc_link_id(link_uuid))) + class ConnectionIds(_Resource): def get(self, context_uuid : str, service_uuid : str): return format_grpc_to_json(self.client.ListConnectionIds(grpc_service_id(context_uuid, service_uuid))) diff --git a/src/nbi/service/rest_server/nbi_plugins/debug_api/__init__.py b/src/nbi/service/rest_server/nbi_plugins/debug_api/__init__.py index 1ccf93144eca4017bc41d6e62aa76260a4ce69f5..e420fa949d76b01bf438da2053229481483e7eb7 100644 --- a/src/nbi/service/rest_server/nbi_plugins/debug_api/__init__.py +++ b/src/nbi/service/rest_server/nbi_plugins/debug_api/__init__.py @@ -19,6 +19,7 @@ from .Resources import ( Device, DeviceIds, Devices, DummyContexts, Link, LinkIds, Links, + VirtualLink, VirtualLinkIds, VirtualLinks, PolicyRule, PolicyRuleIds, PolicyRules, Service, ServiceIds, Services, Slice, SliceIds, Slices, @@ -30,38 +31,42 @@ URL_PREFIX = '/debug-api' # Use 'path' type since some identifiers might contain char '/' and Flask is unable to recognize them in 'string' type. RESOURCES = [ # (endpoint_name, resource_class, resource_url) - ('api.context_ids', ContextIds, '/context_ids'), - ('api.contexts', Contexts, '/contexts'), - ('api.dummy_contexts', DummyContexts, '/dummy_contexts'), - ('api.context', Context, '/context/<path:context_uuid>'), + ('api.context_ids', ContextIds, '/context_ids'), + ('api.contexts', Contexts, '/contexts'), + ('api.dummy_contexts', DummyContexts, '/dummy_contexts'), + ('api.context', Context, '/context/<path:context_uuid>'), - ('api.topology_ids', TopologyIds, '/context/<path:context_uuid>/topology_ids'), - ('api.topologies', Topologies, '/context/<path:context_uuid>/topologies'), - ('api.topology', Topology, '/context/<path:context_uuid>/topology/<path:topology_uuid>'), + ('api.topology_ids', TopologyIds, '/context/<path:context_uuid>/topology_ids'), + ('api.topologies', Topologies, '/context/<path:context_uuid>/topologies'), + ('api.topology', Topology, '/context/<path:context_uuid>/topology/<path:topology_uuid>'), - ('api.service_ids', ServiceIds, '/context/<path:context_uuid>/service_ids'), - ('api.services', Services, '/context/<path:context_uuid>/services'), - ('api.service', Service, '/context/<path:context_uuid>/service/<path:service_uuid>'), + ('api.service_ids', ServiceIds, '/context/<path:context_uuid>/service_ids'), + ('api.services', Services, '/context/<path:context_uuid>/services'), + ('api.service', Service, '/context/<path:context_uuid>/service/<path:service_uuid>'), - ('api.slice_ids', SliceIds, '/context/<path:context_uuid>/slice_ids'), - ('api.slices', Slices, '/context/<path:context_uuid>/slices'), - ('api.slice', Slice, '/context/<path:context_uuid>/slice/<path:slice_uuid>'), + ('api.slice_ids', SliceIds, '/context/<path:context_uuid>/slice_ids'), + ('api.slices', Slices, '/context/<path:context_uuid>/slices'), + ('api.slice', Slice, '/context/<path:context_uuid>/slice/<path:slice_uuid>'), - ('api.device_ids', DeviceIds, '/device_ids'), - ('api.devices', Devices, '/devices'), - ('api.device', Device, '/device/<path:device_uuid>'), + ('api.device_ids', DeviceIds, '/device_ids'), + ('api.devices', Devices, '/devices'), + ('api.device', Device, '/device/<path:device_uuid>'), - ('api.link_ids', LinkIds, '/link_ids'), - ('api.links', Links, '/links'), - ('api.link', Link, '/link/<path:link_uuid>'), + ('api.link_ids', LinkIds, '/link_ids'), + ('api.links', Links, '/links'), + ('api.link', Link, '/link/<path:link_uuid>'), - ('api.connection_ids', ConnectionIds, '/context/<path:context_uuid>/service/<path:service_uuid>/connection_ids'), - ('api.connections', Connections, '/context/<path:context_uuid>/service/<path:service_uuid>/connections'), - ('api.connection', Connection, '/connection/<path:connection_uuid>'), + ('api.virtual_link_ids', VirtualLinkIds, '/virtual_link_ids'), + ('api.virtual_links', VirtualLinks, '/virtual_links'), + ('api.virtual_link', VirtualLink, '/virtual_link/<path:virtual_link_uuid>'), - ('api.policyrule_ids', PolicyRuleIds, '/policyrule_ids'), - ('api.policyrules', PolicyRules, '/policyrules'), - ('api.policyrule', PolicyRule, '/policyrule/<path:policyrule_uuid>'), + ('api.connection_ids', ConnectionIds, '/context/<path:context_uuid>/service/<path:service_uuid>/connection_ids'), + ('api.connections', Connections, '/context/<path:context_uuid>/service/<path:service_uuid>/connections'), + ('api.connection', Connection, '/connection/<path:connection_uuid>'), + + ('api.policyrule_ids', PolicyRuleIds, '/policyrule_ids'), + ('api.policyrules', PolicyRules, '/policyrules'), + ('api.policyrule', PolicyRule, '/policyrule/<path:policyrule_uuid>'), ] def register_debug_api(rest_server : RestServer): diff --git a/src/vnt_manager/client/VNTManagerClient.py b/src/vnt_manager/client/VNTManagerClient.py index 95db3b6da667ae73dd0027215bffa20772c735a5..67850a60294407e708d27728e9a11de4277920eb 100644 --- a/src/vnt_manager/client/VNTManagerClient.py +++ b/src/vnt_manager/client/VNTManagerClient.py @@ -18,11 +18,16 @@ import grpc from common.Constants import ServiceNameEnum from common.proto.context_pb2 import Empty -from common.proto.vntmanager_pb2_grpc import VNTManagerServiceStub +from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply +from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceStub from common.Settings import get_service_host, get_service_port_grpc from common.tools.client.RetryDecorator import delay_exponential, retry from common.tools.grpc.Tools import grpc_message_to_json # from common.proto.e2eorchestrator_pb2 import E2EOrchestratorRequest, E2EOrchestratorReply +from common.proto.context_pb2 import ( + Link, LinkId, LinkIdList, LinkList, +) +from common.tools.grpc.Tools import grpc_message_to_json_string LOGGER = logging.getLogger(__name__) MAX_RETRIES = 15 @@ -57,15 +62,44 @@ class VNTManagerClient: self.channel = None self.stub = None - """ @RETRY_DECORATOR - def Compute(self, request: E2EOrchestratorRequest) -> E2EOrchestratorReply: - LOGGER.info( - "Compute request: {:s}".format(str(grpc_message_to_json(request))) - ) - response = self.stub.Compute(request) - LOGGER.info( - "Compute result: {:s}".format(str(grpc_message_to_json(response))) - ) + def VNTSubscript(self, request: VNTSubscriptionRequest) -> VNTSubscriptionReply: + LOGGER.info("Subscript request: {:s}".format(str(grpc_message_to_json(request)))) + response = self.stub.VNTSubscript(request) + LOGGER.info("Subscript result: {:s}".format(str(grpc_message_to_json(response)))) + return response + + @RETRY_DECORATOR + def ListVirtualLinkIds(self, request: Empty) -> LinkIdList: + LOGGER.debug('ListVirtualLinkIds request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.ListVirtualLinkIds(request) + LOGGER.debug('ListVirtualLinkIds result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def ListVirtualLinks(self, request: Empty) -> LinkList: + LOGGER.debug('ListVirtualLinks request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.ListVirtualLinks(request) + LOGGER.debug('ListVirtualLinks result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def GetVirtualLink(self, request: LinkId) -> Link: + LOGGER.debug('GetVirtualLink request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.GetVirtualLink(request) + LOGGER.debug('GetVirtualLink result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def SetVirtualLink(self, request: Link) -> LinkId: + LOGGER.debug('SetVirtualLink request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.SetVirtualLink(request) + LOGGER.debug('SetVirtualLink result: {:s}'.format(grpc_message_to_json_string(response))) + return response + + @RETRY_DECORATOR + def RemoveVirtualLink(self, request: LinkId) -> Empty: + LOGGER.debug('RemoveVirtualLink request: {:s}'.format(grpc_message_to_json_string(request))) + response = self.stub.RemoveVirtualLink(request) + LOGGER.debug('RemoveVirtualLink result: {:s}'.format(grpc_message_to_json_string(response))) return response - """ \ No newline at end of file diff --git a/src/vnt_manager/requirements.in b/src/vnt_manager/requirements.in index 4c4720a2df4482faeda1ad99f9d383ebb5c0f848..3f780913b761c42a253aa7210bf27f2fa0ffeeed 100644 --- a/src/vnt_manager/requirements.in +++ b/src/vnt_manager/requirements.in @@ -12,4 +12,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -networkx \ No newline at end of file +networkx +websockets==12.0 \ No newline at end of file diff --git a/src/vnt_manager/service/VNTManagerService.py b/src/vnt_manager/service/VNTManagerService.py index b61b213a6ff2e64219b2202dbec1bf0d14010d59..0580d1c7faf38e9d63ef9e509950708acdb5fb2b 100644 --- a/src/vnt_manager/service/VNTManagerService.py +++ b/src/vnt_manager/service/VNTManagerService.py @@ -15,7 +15,7 @@ import logging from common.Constants import ServiceNameEnum -from common.proto.vntmanager_pb2_grpc import add_VNTManagerServiceServicer_to_server +from common.proto.vnt_manager_pb2_grpc import add_VNTManagerServiceServicer_to_server from common.Settings import get_service_port_grpc from common.tools.service.GenericGrpcService import GenericGrpcService from .VNTManagerServiceServicerImpl import VNTManagerServiceServicerImpl diff --git a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py index 4869218a7a3bc05527d383f44b93fa7aea1c7f7b..e2b110de062cf7f6725171674f7b2d546c69d4d8 100644 --- a/src/vnt_manager/service/VNTManagerServiceServicerImpl.py +++ b/src/vnt_manager/service/VNTManagerServiceServicerImpl.py @@ -16,15 +16,30 @@ import logging import networkx as nx import grpc -import copy - -from common.Constants import ServiceNameEnum +import time +from websockets.sync.client import connect from common.method_wrappers.Decorator import (MetricsPool, MetricTypeEnum, safe_and_metered_rpc_method) -from common.proto.vntmanager_pb2 import VNTManagerRequest, VNTManagerReply -from common.proto.context_pb2 import Empty, Connection, EndPointId -from common.proto.vntmanager_pb2_grpc import VNTManagerServiceServicer +from common.proto.vnt_manager_pb2 import VNTSubscriptionRequest, VNTSubscriptionReply +from common.proto.vnt_manager_pb2_grpc import VNTManagerServiceServicer +from context.client.ContextClient import ContextClient +from common.proto.context_pb2 import ( + Empty, + Event, EventTypeEnum, + Link, LinkEvent, LinkId, LinkIdList, LinkList, +) +from common.tools.object_factory.Context import json_context_id +from common.tools.object_factory.Topology import json_topology_id +from common.proto.context_pb2 import ContextId, TopologyId +import threading +from common.proto.context_pb2 import ( + ConnectionEvent, ContextEvent, DeviceEvent, EventTypeEnum, ServiceEvent, TopologyEvent) from context.client.ContextClient import ContextClient -from context.service.database.uuids.EndPoint import endpoint_get_uuid +from context.client.EventsCollector import EventsCollector +from common.tests.EventTools import EVENT_CREATE, EVENT_UPDATE, check_events +from common.Constants import DEFAULT_CONTEXT_NAME, DEFAULT_TOPOLOGY_NAME, INTERDOMAIN_TOPOLOGY_NAME +from typing import Any, Dict, Set +from common.proto.dlt_gateway_pb2 import DltRecordEvent, DltRecordOperationEnum, DltRecordTypeEnum +from common.tools.grpc.Tools import grpc_message_to_json_string LOGGER = logging.getLogger(__name__) @@ -33,63 +48,117 @@ METRICS_POOL = MetricsPool("VNTManager", "RPC") context_client: ContextClient = ContextClient() +JSON_ADMIN_CONTEXT_ID = json_context_id(DEFAULT_CONTEXT_NAME) +ADMIN_CONTEXT_ID = ContextId(**JSON_ADMIN_CONTEXT_ID) +ADMIN_TOPOLOGY_ID = TopologyId(**json_topology_id(DEFAULT_TOPOLOGY_NAME, context_id=JSON_ADMIN_CONTEXT_ID)) -class E2EOrchestratorServiceServicerImpl(VNTManagerServiceServicer): - def __init__(self): - LOGGER.debug("Creating Servicer...") - LOGGER.debug("Servicer Created") +GET_EVENT_TIMEOUT = 0.5 - """ - @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) - def Compute(self, request: E2EOrchestratorRequest, context: grpc.ServicerContext) -> E2EOrchestratorReply: - endpoints_ids = [] - for endpoint_id in request.service.service_endpoint_ids: - endpoints_ids.append(endpoint_get_uuid(endpoint_id)[2]) - graph = nx.Graph() - devices = context_client.ListDevices(Empty()).devices +HOST = "10.1.1.83" +PORT = str(8765) + + + +class VNTMEventDispatcher(threading.Thread): + def __init__(self, host, port) -> None: + LOGGER.debug('Creating VTNM connector...') + self.host = host + self.port = port + super().__init__(name='VNTMEventDispatcher', daemon=True) + self._terminate = threading.Event() + LOGGER.debug('VNTM connector created') + + def start(self) -> None: + self._terminate.clear() + return super().start() + + def stop(self): + self._terminate.set() + + def run(self) -> None: + LOGGER.info('Thread running!') + events_collector = EventsCollector( + context_client, log_events_received=True, + activate_context_collector = True, + activate_topology_collector = True, + activate_device_collector = False, + activate_link_collector = False, + activate_service_collector = False, + activate_slice_collector = False, + activate_connection_collector = False,) + events_collector.start() + + while not self._terminate.is_set(): + event = events_collector.get_event(block=True, timeout=GET_EVENT_TIMEOUT) + if event is None: continue + + url = "ws://" + str(self.host) + ":" + str(self.port) + request = VNTSubscriptionRequest() + request.host = self.host + request.port = self.port + LOGGER.info("Sending event to {}".format(url)) + with connect(url, logger=LOGGER) as websocket: + send = grpc_message_to_json_string(request) + LOGGER.info("Sending {}".format(send)) + websocket.send(send) + message = websocket.recv() + LOGGER.info("Received ws: {}".format(message)) - for device in devices: - endpoints_uuids = [endpoint.endpoint_id.endpoint_uuid.uuid - for endpoint in device.device_endpoints] - for ep in endpoints_uuids: - graph.add_node(ep) + - for ep in endpoints_uuids: - for ep_i in endpoints_uuids: - if ep == ep_i: - continue - graph.add_edge(ep, ep_i) + events_collector.stop() - links = context_client.ListLinks(Empty()).links - for link in links: - eps = [] - for endpoint_id in link.link_endpoint_ids: - eps.append(endpoint_id.endpoint_uuid.uuid) - graph.add_edge(eps[0], eps[1]) - shortest = nx.shortest_path(graph, endpoints_ids[0], endpoints_ids[1]) - path = E2EOrchestratorReply() - path.services.append(copy.deepcopy(request.service)) - for i in range(0, int(len(shortest)/2)): - conn = Connection() - ep_a_uuid = str(shortest[i*2]) - ep_z_uuid = str(shortest[i*2+1]) +class VNTManagerServiceServicerImpl(VNTManagerServiceServicer): + def __init__(self): + LOGGER.debug("Creating Servicer...") + LOGGER.debug("Servicer Created") + self.links = [] + + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def VNTSubscript(self, request: VNTSubscriptionRequest, context: grpc.ServicerContext) -> VNTSubscriptionReply: + LOGGER.info('----------------') + LOGGER.info(request) + LOGGER.info('----------------') + reply = VNTSubscriptionReply() + reply.subscription = "OK" + + event_dispatcher = VNTMEventDispatcher(request.host, int(request.port)) + event_dispatcher.start() - conn.connection_id.connection_uuid.uuid = str(ep_a_uuid) + '_->_' + str(ep_z_uuid) - ep_a_id = EndPointId() - ep_a_id.endpoint_uuid.uuid = ep_a_uuid - conn.path_hops_endpoint_ids.append(ep_a_id) + return reply - ep_z_id = EndPointId() - ep_z_id.endpoint_uuid.uuid = ep_z_uuid - conn.path_hops_endpoint_ids.append(ep_z_id) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def ListVirtualLinkIds(self, request : Empty, context : grpc.ServicerContext) -> LinkIdList: + return LinkIdList(link_ids=[link.link_id for link in self.links]) + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def ListVirtualLinks(self, request : Empty, context : grpc.ServicerContext) -> LinkList: + return LinkList(link=self.links) - path.connections.append(conn) + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def GetVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Link: + for link in self.links: + if link.link_uuid.uuid == request.uuid: + return link + return Empty() - return path - """ \ No newline at end of file + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def SetVirtualLink(self, request : Link, context : grpc.ServicerContext) -> LinkId: + self.links.append(request) + return request.linkd_id + + @safe_and_metered_rpc_method(METRICS_POOL, LOGGER) + def RemoveVirtualLink(self, request : LinkId, context : grpc.ServicerContext) -> Empty: + for link in self.links: + if link.link_uuid.uuid == request.uuid: + self.links.remove(link) + return Empty() + return Empty() + diff --git a/src/vnt_manager/service/__main__.py b/src/vnt_manager/service/__main__.py index 03fb4dd5da97866a28ce9d2edbefd702bbd75a4b..66d3e435ce254e592cc66863873816dc165e50a6 100644 --- a/src/vnt_manager/service/__main__.py +++ b/src/vnt_manager/service/__main__.py @@ -43,12 +43,6 @@ def main(): logging.basicConfig(level=log_level) LOGGER = logging.getLogger(__name__) - wait_for_environment_variables( - [ - get_env_var_name(ServiceNameEnum.VNTMANAGER, ENVVAR_SUFIX_SERVICE_HOST), - get_env_var_name(ServiceNameEnum.VNTMANAGER, ENVVAR_SUFIX_SERVICE_PORT_GRPC), - ] - ) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) @@ -59,7 +53,7 @@ def main(): metrics_port = get_metrics_port() start_http_server(metrics_port) - # Starting CentralizedCybersecurity service + # Starting VNTManager service grpc_service = VNTManagerService() grpc_service.start() LOGGER.info("Started...")