diff --git a/common_requirements.in b/common_requirements.in index e1bcad78bfc23217633fb28ef28a2d70d070644a..b277265768c9726f17ab046d8aa932167615f523 100644 --- a/common_requirements.in +++ b/common_requirements.in @@ -15,6 +15,7 @@ coverage==6.3 grpcio==1.47.* grpcio-health-checking==1.47.* +grpcio-reflection==1.47.* grpcio-tools==1.47.* grpclib==0.4.4 prettytable==3.5.0 diff --git a/src/common/tools/service/GenericGrpcService.py b/src/common/tools/service/GenericGrpcService.py index f29582fff87f4ca89ee44c78adbec33f321a9a39..453309127ccf49272d004740c1e3be52cba26779 100644 --- a/src/common/tools/service/GenericGrpcService.py +++ b/src/common/tools/service/GenericGrpcService.py @@ -12,18 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional, Union import grpc, logging from concurrent import futures +from typing import Any, List, Optional, Union from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH from grpc_health.v1.health_pb2 import HealthCheckResponse from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server +from grpc_reflection.v1alpha import reflection from common.Settings import get_grpc_bind_address, get_grpc_grace_period, get_grpc_max_workers class GenericGrpcService: def __init__( - self, bind_port : Union[str, int], bind_address : Optional[str] = None, max_workers : Optional[int] = None, - grace_period : Optional[int] = None, enable_health_servicer : bool = True, cls_name : str = __name__ + self, bind_port : Union[str, int], bind_address : Optional[str] = None, + max_workers : Optional[int] = None, grace_period : Optional[int] = None, + enable_health_servicer : bool = True, enable_reflection : bool = True, + cls_name : str = __name__ ) -> None: self.logger = logging.getLogger(cls_name) self.bind_port = bind_port @@ -31,6 +34,8 @@ class GenericGrpcService: self.max_workers = get_grpc_max_workers() if max_workers is None else max_workers self.grace_period = get_grpc_grace_period() if grace_period is None else grace_period self.enable_health_servicer = enable_health_servicer + self.enable_reflection = enable_reflection + self.reflection_service_names : List[str] = [reflection.SERVICE_NAME] self.endpoint = None self.health_servicer = None self.pool = None @@ -39,6 +44,11 @@ class GenericGrpcService: def install_servicers(self): pass + def add_reflection_service_name(self, service_descriptor : Any, service_name : str): + self.reflection_service_names.append( + service_descriptor.services_by_name[service_name].full_name + ) + def start(self): self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port)) self.logger.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( @@ -54,6 +64,9 @@ class GenericGrpcService: experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) add_HealthServicer_to_server(self.health_servicer, self.server) + if self.enable_reflection: + reflection.enable_server_reflection(self.reflection_service_names, self.server) + self.bind_port = self.server.add_insecure_port(self.endpoint) self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port)) self.logger.info('Listening on {:s}...'.format(str(self.endpoint))) diff --git a/src/common/tools/service/GenericGrpcServiceAsync.py b/src/common/tools/service/GenericGrpcServiceAsync.py index 1d652deb79e1389e2403d1d13debcba7dbc43ecc..551a3d568612f59c2bc26f692ab8d1d27dc4f4b3 100644 --- a/src/common/tools/service/GenericGrpcServiceAsync.py +++ b/src/common/tools/service/GenericGrpcServiceAsync.py @@ -12,19 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional, Union -import grpc -import logging +import grpc, logging from concurrent import futures +from typing import Any, List, Optional, Union from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH from grpc_health.v1.health_pb2 import HealthCheckResponse from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server +from grpc_reflection.v1alpha import reflection from common.Settings import get_grpc_bind_address, get_grpc_grace_period, get_grpc_max_workers class GenericGrpcServiceAsync: def __init__( - self, bind_port: Union[str, int], bind_address: Optional[str] = None, max_workers: Optional[int] = None, - grace_period: Optional[int] = None, enable_health_servicer: bool = True, cls_name: str = __name__ + self, bind_port : Union[str, int], bind_address : Optional[str] = None, + max_workers : Optional[int] = None, grace_period : Optional[int] = None, + enable_health_servicer : bool = True, enable_reflection : bool = True, + cls_name : str = __name__ ) -> None: self.logger = logging.getLogger(cls_name) self.bind_port = bind_port @@ -32,6 +34,8 @@ class GenericGrpcServiceAsync: self.max_workers = get_grpc_max_workers() if max_workers is None else max_workers self.grace_period = get_grpc_grace_period() if grace_period is None else grace_period self.enable_health_servicer = enable_health_servicer + self.enable_reflection = enable_reflection + self.reflection_service_names : List[str] = [reflection.SERVICE_NAME] self.endpoint = None self.health_servicer = None self.pool = None @@ -40,7 +44,12 @@ class GenericGrpcServiceAsync: async def install_servicers(self): pass - async def start(self): + def add_reflection_service_name(self, service_descriptor : Any, service_name : str): + self.reflection_service_names.append( + service_descriptor.services_by_name[service_name].full_name + ) + + def start(self): self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port)) self.logger.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format( str(self.endpoint), str(self.max_workers))) @@ -55,6 +64,9 @@ class GenericGrpcServiceAsync: experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1)) add_HealthServicer_to_server(self.health_servicer, self.server) + if self.enable_reflection: + reflection.enable_server_reflection(self.reflection_service_names, self.server) + self.bind_port = self.server.add_insecure_port(self.endpoint) self.endpoint = '{:s}:{:s}'.format(str(self.bind_address), str(self.bind_port)) self.logger.info('Listening on {:s}...'.format(str(self.endpoint)))