Newer
Older

Lluis Gifre Renom
committed
import grpc
import logging
from concurrent import futures
from grpc_health.v1.health import HealthServicer, OVERALL_HEALTH
from grpc_health.v1.health_pb2 import HealthCheckResponse
from grpc_health.v1.health_pb2_grpc import add_HealthServicer_to_server
from device.proto.device_pb2_grpc import add_DeviceServiceServicer_to_server
from device.service.DeviceServiceServicerImpl import DeviceServiceServicerImpl
from device.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD

Lluis Gifre Renom
committed
BIND_ADDRESS = '0.0.0.0'
LOGGER = logging.getLogger(__name__)
class DeviceService:
def __init__(self, database, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS,
grace_period=GRPC_GRACE_PERIOD):

Lluis Gifre Renom
committed
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
self.database = database
self.address = address
self.port = port
self.endpoint = None
self.max_workers = max_workers
self.grace_period = grace_period
self.device_servicer = None
self.health_servicer = None
self.pool = None
self.server = None
def start(self):
self.endpoint = '{}:{}'.format(self.address, self.port)
LOGGER.debug('Starting Service (tentative endpoint: {}, max_workers: {})...'.format(
self.endpoint, self.max_workers))
self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers)
self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,))
self.device_servicer = DeviceServiceServicerImpl(self.database)
add_DeviceServiceServicer_to_server(self.device_servicer, self.server)
self.health_servicer = HealthServicer(
experimental_non_blocking=True, experimental_thread_pool=futures.ThreadPoolExecutor(max_workers=1))
add_HealthServicer_to_server(self.health_servicer, self.server)
port = self.server.add_insecure_port(self.endpoint)
self.endpoint = '{}:{}'.format(self.address, port)
LOGGER.info('Listening on {}...'.format(self.endpoint))
self.server.start()
self.health_servicer.set(OVERALL_HEALTH, HealthCheckResponse.SERVING) # pylint: disable=maybe-no-member
LOGGER.debug('Service started')
def stop(self):
LOGGER.debug('Stopping service (grace period {} seconds)...'.format(self.grace_period))
self.health_servicer.enter_graceful_shutdown()
self.server.stop(self.grace_period)
LOGGER.debug('Service stopped')