Skip to content
Snippets Groups Projects
Commit 6f9c0bd2 authored by Lluis Gifre Renom's avatar Lluis Gifre Renom
Browse files

Common framework:

- Extend generic gRPC services to enable gRPC reflection
- Add grpc-reflection dependency
parent ba137f9e
No related branches found
No related tags found
2 merge requests!359Release TeraFlowSDN 5.0,!312Resolve "Enable gRPC reflection in Python-based services"
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
coverage==6.3 coverage==6.3
grpcio==1.47.* grpcio==1.47.*
grpcio-health-checking==1.47.* grpcio-health-checking==1.47.*
grpcio-reflection==1.47.*
grpcio-tools==1.47.* grpcio-tools==1.47.*
grpclib==0.4.4 grpclib==0.4.4
prettytable==3.5.0 prettytable==3.5.0
......
...@@ -12,18 +12,21 @@ ...@@ -12,18 +12,21 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from typing import Optional, Union
import grpc, logging import grpc, logging
from concurrent import futures from concurrent import futures
from typing import Any, List, Optional, Union
from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH
from grpc_health.v1.health_pb2 import HealthCheckResponse from grpc_health.v1.health_pb2 import HealthCheckResponse
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from grpc_reflection.v1alpha import reflection
from common.Settings import get_grpc_bind_address, get_grpc_grace_period, get_grpc_max_workers from common.Settings import get_grpc_bind_address, get_grpc_grace_period, get_grpc_max_workers
class GenericGrpcService: class GenericGrpcService:
def __init__( def __init__(
self, bind_port : Union[str, int], bind_address : Optional[str] = None, max_workers : Optional[int] = None, self, bind_port : Union[str, int], bind_address : Optional[str] = None,
grace_period : Optional[int] = None, enable_health_servicer : bool = True, cls_name : str = __name__ max_workers : Optional[int] = None, grace_period : Optional[int] = None,
enable_health_servicer : bool = True, enable_reflection : bool = True,
cls_name : str = __name__
) -> None: ) -> None:
self.logger = logging.getLogger(cls_name) self.logger = logging.getLogger(cls_name)
self.bind_port = bind_port self.bind_port = bind_port
...@@ -31,6 +34,8 @@ class GenericGrpcService: ...@@ -31,6 +34,8 @@ class GenericGrpcService:
self.max_workers = get_grpc_max_workers() if max_workers is None else max_workers self.max_workers = get_grpc_max_workers() if max_workers is None else max_workers
self.grace_period = get_grpc_grace_period() if grace_period is None else grace_period self.grace_period = get_grpc_grace_period() if grace_period is None else grace_period
self.enable_health_servicer = enable_health_servicer self.enable_health_servicer = enable_health_servicer
self.enable_reflection = enable_reflection
self.reflection_service_names : List[str] = [reflection.SERVICE_NAME]
self.endpoint = None self.endpoint = None
self.health_servicer = None self.health_servicer = None
self.pool = None self.pool = None
...@@ -39,6 +44,11 @@ class GenericGrpcService: ...@@ -39,6 +44,11 @@ class GenericGrpcService:
def install_servicers(self): def install_servicers(self):
pass pass
def add_reflection_service_name(self, service_descriptor : Any, service_name : str):
self.reflection_service_names.append(
service_descriptor.services_by_name[service_name].full_name
)
def start(self): def start(self):
self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port)) self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
self.logger.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( self.logger.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
...@@ -54,6 +64,9 @@ class GenericGrpcService: ...@@ -54,6 +64,9 @@ class GenericGrpcService:
experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
add_HealthServicer_to_server(self.health_servicer, self.server) add_HealthServicer_to_server(self.health_servicer, self.server)
if self.enable_reflection:
reflection.enable_server_reflection(self.reflection_service_names, self.server)
self.bind_port = self.server.add_insecure_port(self.endpoint) self.bind_port = self.server.add_insecure_port(self.endpoint)
self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port)) self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
self.logger.info('Listening on {:s}...'.format(str(self.endpoint))) self.logger.info('Listening on {:s}...'.format(str(self.endpoint)))
......
...@@ -12,19 +12,21 @@ ...@@ -12,19 +12,21 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from typing import Optional, Union import grpc, logging
import grpc
import logging
from concurrent import futures from concurrent import futures
from typing import Any, List, Optional, Union
from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH
from grpc_health.v1.health_pb2 import HealthCheckResponse from grpc_health.v1.health_pb2 import HealthCheckResponse
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from grpc_reflection.v1alpha import reflection
from common.Settings import get_grpc_bind_address, get_grpc_grace_period, get_grpc_max_workers from common.Settings import get_grpc_bind_address, get_grpc_grace_period, get_grpc_max_workers
class GenericGrpcServiceAsync: class GenericGrpcServiceAsync:
def __init__( def __init__(
self, bind_port: Union[str, int], bind_address: Optional[str] = None, max_workers: Optional[int] = None, self, bind_port : Union[str, int], bind_address : Optional[str] = None,
grace_period: Optional[int] = None, enable_health_servicer: bool = True, cls_name: str = __name__ max_workers : Optional[int] = None, grace_period : Optional[int] = None,
enable_health_servicer : bool = True, enable_reflection : bool = True,
cls_name : str = __name__
) -> None: ) -> None:
self.logger = logging.getLogger(cls_name) self.logger = logging.getLogger(cls_name)
self.bind_port = bind_port self.bind_port = bind_port
...@@ -32,6 +34,8 @@ class GenericGrpcServiceAsync: ...@@ -32,6 +34,8 @@ class GenericGrpcServiceAsync:
self.max_workers = get_grpc_max_workers() if max_workers is None else max_workers self.max_workers = get_grpc_max_workers() if max_workers is None else max_workers
self.grace_period = get_grpc_grace_period() if grace_period is None else grace_period self.grace_period = get_grpc_grace_period() if grace_period is None else grace_period
self.enable_health_servicer = enable_health_servicer self.enable_health_servicer = enable_health_servicer
self.enable_reflection = enable_reflection
self.reflection_service_names : List[str] = [reflection.SERVICE_NAME]
self.endpoint = None self.endpoint = None
self.health_servicer = None self.health_servicer = None
self.pool = None self.pool = None
...@@ -40,7 +44,12 @@ class GenericGrpcServiceAsync: ...@@ -40,7 +44,12 @@ class GenericGrpcServiceAsync:
async def install_servicers(self): async def install_servicers(self):
pass pass
async def start(self): def add_reflection_service_name(self, service_descriptor : Any, service_name : str):
self.reflection_service_names.append(
service_descriptor.services_by_name[service_name].full_name
)
def start(self):
self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port)) self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
self.logger.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( self.logger.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
str(self.endpoint), str(self.max_workers))) str(self.endpoint), str(self.max_workers)))
...@@ -55,6 +64,9 @@ class GenericGrpcServiceAsync: ...@@ -55,6 +64,9 @@ class GenericGrpcServiceAsync:
experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
add_HealthServicer_to_server(self.health_servicer, self.server) add_HealthServicer_to_server(self.health_servicer, self.server)
if self.enable_reflection:
reflection.enable_server_reflection(self.reflection_service_names, self.server)
self.bind_port = self.server.add_insecure_port(self.endpoint) self.bind_port = self.server.add_insecure_port(self.endpoint)
self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port)) self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port))
self.logger.info('Listening on {:s}...'.format(str(self.endpoint))) self.logger.info('Listening on {:s}...'.format(str(self.endpoint)))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment