Newer
Older
# Copyright 2022-2024 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging,threading, queue
import logging,threading
import grpc
import logging
from .JavaRunner import JavaRunner
LOGGER = logging.getLogger(__name__)
#_ONE_DAY_IN_SECONDS = 60 * 60 * 24
#SERVER_ADDRESS = 'localhost:2021'
class GrpcServer():
def __init__(self) -> None: # pylint: disable=super-init-not-called
self.__lock = threading.Lock()
self.__started = threading.Event()
self.__terminate = threading.Event()
self.__out_samples = queue.Queue()
self.__server=grpc.aio.server()
self.__runnerList=[]
def connectToJavaPcep(self, address):
runner = JavaRunner(address)
# Sets IpAddress in XML config file for java program
runner.setPeer(address)
process_thread = threading.Thread(target=runner.execPcep)
process_thread.start()
self.__runnerList.append(runner)
return process_thread
'''
def ConnectThread(self) -> bool:
# TODO: Metodos necesarios para conectarte al speaker
# If started, assume it is already connected
if self.__started.is_set(): return True
self.__started.set()
self.__server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
grpcService_pb2_grpc.add_pceServiceServicer_to_server(self, self.__server)
self.__server.add_insecure_port(SERVER_ADDRESS)
LOGGER.info("Starting server on %s", SERVER_ADDRESS)
self.__server.start()
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
LOGGER.info("DISCONNECT")
self.Disconnect()
return True
def Connect(self):
grpcThread = threading.Thread(target=self.ConnectThread)
grpcThread.start()
LOGGER.info("Calling the JavaRunner")
return True
def Disconnect(self) -> bool:
self.__terminate.set()
# If not started, assume it is already disconnected
if not self.__started.is_set(): return True
LOGGER.info("Keyboard interrupt, stop server")
self.__server.stop(0)
# Disconnect triggers deactivation of sampling events
# self.__scheduler.shutdown()
# exit(0)
return True
def update():
with grpc.insecure_channel('localhost:10060') as channel:
#n = "initiate lsp directo 10.95.90.56 1.1.1.1 1.1.1.3 m1228800 na192.168.3.11-192.168.3.13"
n = "terminate lsp 10.95.90.56 0 nombre"
#n="create candidatepath 10.95.90.56 1.1.1.1 4 97 m69644288 nn1.1.1.3 m69640192 nn1.1.1.2"
stub = grpcService_pb2_grpc.pceServiceStub(channel)
request = grpcService_pb2.commandRequest(command=n)
print("updateService req: " ,request)
response = stub.update(request)
print("updateService client received: " ,response.commandResp)
def requestToJavaPcep(self,message):
with grpc.insecure_channel('localhost:10060') as channel:
#n = "initiate lsp largo2 10.95.90.56 1.1.1.1 1.1.1.2 m69644288 nn1.1.1.3 m69640192 nn1.1.1.2"
#n = "initiate lsp directo 10.95.90.56 1.1.1.1 1.1.1.3 m1228800 na192.168.3.11-192.168.3.13"
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
LOGGER.debug("LLego al request")
stub = grpcService_pb2_grpc.pceServiceStub(channel)
LOGGER.debug("updateService req 2: %s" ,message)
request = grpcService_pb2.commandRequest(command=message)
LOGGER.debug("updateService req 2: %s" ,request)
response = stub.update(request)
LOGGER.debug("updateServide client received: %s" ,response.commandResp)
LOGGER.debug("updateServide client received IP: %s" ,response.ipAddress)
return response.ipAddress
def terminateRunners(self):
for runner in self.__runnerList:
runner.endBGPSpeaker()
return True
def terminateGrpcServer(self):
LOGGER.debug("Terminating java programs...")
self.terminateRunners()
LOGGER.debug("Disconnecting grpc server...")
self.Disconnect()
return True
def terminateRunnerById(self,speaker_id):
"""
Disconnect from BGP-LS speaker given an speaker Id. Its the same
as the java running proccess PID.
"""
for runner in self.__runnerList:
if(runner.getPid()==speaker_id):
runner.endBGPSpeaker()
self.__runnerList.remove(runner)
return True
'''