Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# Copyright 2022-2023 ETSI TeraFlowSDN - TFS OSG (https://tfs.etsi.org/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json, logging,threading, queue,time,signal
from datetime import datetime, timedelta
from typing import Any, Iterator, List, Optional, Tuple, Union
# from apscheduler.executors.pool import ThreadPoolExecutor
# from apscheduler.job import Job
# from apscheduler.jobstores.memory import MemoryJobStore
# from apscheduler.schedulers.background import BackgroundScheduler
# from common.method_wrappers.Decorator import MetricTypeEnum, MetricsPool, metered_subclass_method, INF
# from common.type_checkers.Checkers import chk_float, chk_length, chk_string, chk_type
import logging,threading
import grpc
from bgpls_speaker.service.tools.DiscoveredDBManager import DiscoveredDBManager
from .protos import grpcService_pb2_grpc
from .protos import grpcService_pb2
from .Tools import UpdateRequest
from concurrent import futures
import os
import subprocess
from multiprocessing import Pool
import logging
from .JavaRunner import JavaRunner
LOGGER = logging.getLogger(__name__)
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
SERVER_ADDRESS = 'localhost:2021'
class GrpcServer():
"""
This class gets the current topology from a bgps speaker module in java
and updates the posible new devices to add in the context topology.
Needs the address, port and as_number from the device that will provide the information via bgpls
to the java module.
"""
def __init__(self,DiscoveredDB : DiscoveredDBManager) -> None: # pylint: disable=super-init-not-called
self.__lock = threading.Lock()
self.__started = threading.Event()
self.__terminate = threading.Event()
self.__out_samples = queue.Queue()
self.__server=grpc.aio.server()
# self.__address="10.95.86.214"
# self.__port=179
# self.__asNumber=65006
# self.__configFile="TMConfiguration_guillermo.xml"
# self.__process=0
self.__javaLocalPort=0 # --> BGP4Port in XML file
self.__mngPort=0 # Port used in XML config file for management (NOT used in TFS)
self.__runnerList=[]
# Data base for saving all new devices discovered
self.__discoveredDB=DiscoveredDB
# self.__comms=grpcComms
# Este tendría que tener la info del runner al que se connecta¿
def ConnectThread(self) -> bool:
# TODO: Metodos necesarios para conectarte al speaker
# If started, assume it is already connected
if self.__started.is_set(): return True
self.__started.set() #notifyAll -->event.is_set()
# 10 workers ?
self.__server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
grpcService_pb2_grpc.add_updateServiceServicer_to_server(self, self.__server)
self.__server.add_insecure_port(SERVER_ADDRESS)
# server.add_secure_port(SERVER_ADDRESS)
LOGGER.info("Starting server on %s", SERVER_ADDRESS)
self.__server.start()
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
LOGGER.info("DISCONNECT")
self.Disconnect()
return True
def Connect(self):
grpcThread = threading.Thread(target=self.ConnectThread)
grpcThread.start()
return True
def Disconnect(self) -> bool:
self.__terminate.set()
# If not started, assume it is already disconnected
if not self.__started.is_set(): return True
LOGGER.info("Keyboard interrupt, stop server")
self.__server.stop(0)
# Disconnect triggers deactivation of sampling events
# self.__scheduler.shutdown()
# exit(0)
return True
def update(self,request, context) -> bool:
"""
Processes the messages recived by de grpc server
"""
with self.__lock:
#TODO: Get update
LOGGER.info("(server) Update message from bgpls speaker: \n %s" % (request))
response = grpcService_pb2.updateResponse(ack="OK")
update_request = UpdateRequest.from_proto(request)
self.__discoveredDB.AddToDB(update_request)
# LOGGER.debug("Update class string %s",update_request.toString())
return response
def connectToJavaBgpls(self, address : str = "10.95.86.214", port : str = "179", asNumber : str = "65006"):
# Get unused* port
self.setLocalPort()
runner = JavaRunner(self.__javaLocalPort,address,self.__mngPort)
# Sets port in XML config file for java program
runner.setAsNumber(asNumber)
runner.setPort(port)
runner.setPeer()
process=runner.execBGPLSpeaker()
self.__runnerList.append(runner)
return process.pid
def terminateRunners(self):
for runner in self.__runnerList:
runner.endBGPSpeaker()
return True
def terminateGrpcServer(self):
LOGGER.debug("Terminating java programs...")
self.terminateRunners()
LOGGER.debug("Disconnecting grpc server...")
self.Disconnect()
return True
def terminateRunnerById(self,speaker_id):
"""
Disconnect from BGP-LS speaker given an speaker Id. Its the same
as the java running proccess PID.
"""
for runner in self.__runnerList:
if(runner.getPid()==speaker_id):
runner.endBGPSpeaker()
self.__runnerList.remove(runner)
return True
def setLocalPort(self,initPort=12179):
"""
If java already running add 1 to current used port,
else initialize port .
initPort --> BGP4Port, usually 179 corresponding to BGP
"""
with self.__lock:
if(self.__runnerList):
LOGGER.debug("Port exists %s",self.__javaLocalPort)
lastRunner=self.__runnerList[-1]
self.__javaLocalPort=lastRunner.getCurrentLocalPort()+1
self.__mngPort=lastRunner.getCurrentMngPort()+1
else:
LOGGER.debug("Port DONT exists %s",self.__javaLocalPort)
self.__javaLocalPort=initPort
self.__mngPort=1112 # default management port
return self.__javaLocalPort
def getSpeakerListIds(self):
return [runner.getPid() for runner in self.__runnerList]
def getSpeakerFromId(self,speaker_id):
"""
Returns address,as_number,peer_port
"""
for runner in self.__runnerList:
if(runner.getPid()==speaker_id):
return runner.getRunnerInfo()
return None
def getSpeakerIdFromIpAddr(self,addr):
"""
Returns Id from the speaker IP Address
"""
for runner in self.__runnerList:
ip_addr,asN,port=runner.getRunnerInfo()
if(ip_addr==addr):
return runner.getPid()
return