Newer
Older
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import grpc, logging
from concurrent import futures
from queue import Queue
from monitoring.Config import GRPC_SERVICE_PORT, GRPC_MAX_WORKERS, GRPC_GRACE_PERIOD
from monitoring.proto.monitoring_pb2_grpc import add_MonitoringServiceServicer_to_server
from .MockMonitoringServiceServicerImpl import MockMonitoringServiceServicerImpl
BIND_ADDRESS = '0.0.0.0'
LOGGER = logging.getLogger(__name__)
class MockMonitoringService:
def __init__(
self, address=BIND_ADDRESS, port=GRPC_SERVICE_PORT, max_workers=GRPC_MAX_WORKERS,
grace_period=GRPC_GRACE_PERIOD):
self.queue_samples = Queue()
self.address = address
self.port = port
self.endpoint = None
self.max_workers = max_workers
self.grace_period = grace_period
self.monitoring_servicer = None
self.pool = None
self.server = None
def start(self):
self.endpoint = '{:s}:{:s}'.format(str(self.address), str(self.port))
LOGGER.info('Starting Service (tentative endpoint: {:s}, max_workers: {:s})...'.format(
str(self.endpoint), str(self.max_workers)))
self.pool = futures.ThreadPoolExecutor(max_workers=self.max_workers)
self.server = grpc.server(self.pool) # , interceptors=(tracer_interceptor,))
self.monitoring_servicer = MockMonitoringServiceServicerImpl(self.queue_samples)
add_MonitoringServiceServicer_to_server(self.monitoring_servicer, self.server)
port = self.server.add_insecure_port(self.endpoint)
self.endpoint = '{:s}:{:s}'.format(str(self.address), str(port))
LOGGER.info('Listening on {:s}...'.format(str(self.endpoint)))
self.server.start()
LOGGER.debug('Service started')
def stop(self):
LOGGER.debug('Stopping service (grace period {:s} seconds)...'.format(str(self.grace_period)))
self.server.stop(self.grace_period)
LOGGER.debug('Service stopped')