Newer
Older
import json
import logging
import os
import re
JorgeEcheva26
committed
import socket
log_path = 'logs/builder_logs.log'
log_dir = os.path.dirname(log_path)
if not os.path.exists(log_dir):
os.makedirs(log_dir)
logging.basicConfig(
level=logging.NOTSET, # Minimum severity level to log
# Log message format
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_path), # Log to a file
logging.StreamHandler() # Also display in the console
]
)
REQUIRED_COMPONENTS = ["openapi", "info", "servers", "paths", "components"]
def __init__(self, api_path):
self.api_path = os.path.abspath(api_path)
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.setLevel(logging.DEBUG)
self.api_info = self.__load_api_file(self.api_path)
self.__validate_api_info()
JorgeEcheva26
committed
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def __validate_ip_port(self, ip, port):
"""Validates if an IP address and port are correctly formatted."""
try:
socket.inet_aton(ip) # Validate IPv4
return isinstance(port, int) and 0 < port < 65536
except socket.error:
return False
def __parse_url(self, url):
"""
Parses the URL to extract apiRoot, apiName, and apiVersion.
Handles:
- URLs with or without ports.
- API root as an IP, FQDN, or preconfigured prefix.
- Assigns default ports for HTTP (80) and HTTPS (443) if missing.
"""
pattern = r"^(https?:\/\/)?([\w\.-]+|\[.*\])(?::(\d+))?(\/[\w\/-]+)?\/([\w-]+)\/(v\d+)$"
match = re.match(pattern, url)
if not match:
self.logger.error(f"Invalid URL format: {url}")
return None, None, None, None, None, None, None
scheme, host, port, api_root, api_name, api_version = match.groups()
# Asignar puerto por defecto si no está presente
if not port:
port = 443 if scheme == "https" else 80 # Si no hay esquema, asumimos HTTPS
else:
port = int(port)
if not api_name or not api_version:
self.logger.error(f"URL must contain API name and version in the format: <apiRoot>/<apiName>/v<version>")
return None, None, None, None, None, None, None
# Si apiRoot está presente, eliminar "/" innecesarios
api_root = api_root.strip('/') if api_root else None
if re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host): # IPv4
return host, port, None, None, api_root, api_name, api_version
elif re.match(r"^\[.*\]$", host): # IPv6 (brackets format)
return None, port, None, host.strip("[]"), api_root, api_name, api_version
else: # FQDN
return None, port, host, None, api_root, api_name, api_version
def build(self, url, supported_features, api_supp_features):
"""
Builds the API description and saves it to a JSON file.
JorgeEcheva26
committed
Extracts apiRoot, apiName, and apiVersion.
Supports IP, FQDN, and URLs with or without explicit ports.
JorgeEcheva26
committed
# Validación de campos requeridos
if not supported_features or not api_supp_features:
self.logger.error("Both 'supported_features' and 'api_supp_features' are required. Aborting build.")
return
JorgeEcheva26
committed
# Parseamos la URL
ip, port, fqdn, ipv6Addr, api_root, api_name, api_version = self.__parse_url(url)
# Validamos que al menos una dirección sea válida
if not (ip or fqdn or ipv6Addr or api_root):
self.logger.error("Invalid URL: No valid IP, IPv6, FQDN, or API root found. Aborting build.")
JorgeEcheva26
committed
# Validamos IP y puerto si es IPv4
self.logger.error("Invalid IP or port. Aborting build.")
return
JorgeEcheva26
committed
# Construcción de la API
JorgeEcheva26
committed
"apiName": api_name,
"aefProfiles": self.__build_aef_profiles(ip, port, api_version, fqdn, ipv6Addr),
"description": self.api_info["info"].get("description", "No description provided"),
"shareableInfo": {
"isShareable": True,
"capifProvDoms": ["string"]
},
"serviceAPICategory": "string",
"pubApiPath": {
"ccfIds": ["string"]
},
"ccfId": "string"
}
JorgeEcheva26
committed
# Guardamos los datos en un archivo JSON
with open(f"{api_name}.json", "w") as outfile:
json.dump(api_data, outfile, indent=4)
self.logger.info(f"API description saved to {api_name}.json")
except Exception as e:
self.logger.error(f"An error occurred during the build process: {e}")
def __load_api_file(self, api_file: str):
"""Loads the Swagger API configuration file and converts YAML to JSON format if necessary."""
if api_file.endswith('.yaml') or api_file.endswith('.yml'):
yaml_content = yaml.safe_load(file)
return json.loads(json.dumps(yaml_content)) # Convert YAML to JSON format
elif api_file.endswith('.json'):
return json.load(file)
else:
self.logger.warning(
f"Unsupported file extension for {api_file}. Only .yaml, .yml, and .json are supported.")
return {}
except FileNotFoundError:
self.logger.warning(
f"Configuration file {api_file} not found. Using defaults or environment variables.")
return {}
except (json.JSONDecodeError, yaml.YAMLError) as e:
self.logger.error(
f"Error parsing the configuration file {api_file}: {e}")
return {}
def __validate_api_info(self):
"""Validates that all required components are present in the API specification."""
missing_components = [comp for comp in self.REQUIRED_COMPONENTS if comp not in self.api_info]
if missing_components:
self.logger.warning(f"Missing components in API specification: {', '.join(missing_components)}")
else:
self.logger.info("All required components are present in the API specification.")
JorgeEcheva26
committed
def __build_aef_profiles(self, ip, port, api_version, fqdn=None, ipv6Addr=None, ):
"""Builds the aefProfiles section based on the paths and components in the API info."""
aef_profiles = []
for path, methods in self.api_info.get("paths", {}).items():
for method, details in methods.items():
resource = {
"resourceName": details.get("summary", "Unnamed Resource"),
"commType": "REQUEST_RESPONSE",
"uri": path,
"custOpName": f"http_{method}",
"operations": [method.upper()],
"description": details.get("description", "")
}
resources.append(resource)
# Create interface description based on the standard
interface_description = {
"port": port,
"securityMethods": ["OAUTH"]
}
# Include ipv4Addr, ipv6Addr, or fqdn as per the standard
if ip:
interface_description["ipv4Addr"] = ip
elif ipv6Addr:
interface_description["ipv6Addr"] = ipv6Addr
elif fqdn:
interface_description["fqdn"] = fqdn
else:
raise ValueError("At least one of ipv4Addr, ipv6Addr, or fqdn must be provided.")
# Example profile creation based on paths, customize as needed
aef_profile = {
"aefId": "", # Placeholder AEF ID
"versions": [
{
JorgeEcheva26
committed
"apiVersion": f"{api_version}",
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"expiry": "2100-11-30T10:32:02.004Z",
"resources": resources,
"custOperations": [
{
"commType": "REQUEST_RESPONSE",
"custOpName": "string",
"operations": ["POST"],
"description": "string"
},
{
"commType": "REQUEST_RESPONSE",
"custOpName": "check-authentication",
"operations": [
"POST"
],
"description": "Check authentication request."
},
{
"commType": "REQUEST_RESPONSE",
"custOpName": "revoke-authentication",
"operations": [
"POST"
],
"description": "Revoke authorization for service APIs."
}
]
}
],
"protocol": "HTTP_1_1",
"dataFormat": "JSON",
return aef_profiles
def __validate_ip_port(self, ip, port):
"""Validates that the IP and port have the correct format."""
ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$")
# Validate IP
if not ip_pattern.match(ip):
self.logger.warning(f"Invalid IP format: {ip}. Expected IPv4 format.")
return False
# Validate each octet in the IP address
if any(int(octet) > 255 or int(octet) < 0 for octet in ip.split(".")):
self.logger.warning(f"IP address out of range: {ip}. Each octet should be between 0 and 255.")
return False
# Validate Port
if not (1 <= port <= 65535):
self.logger.warning(f"Invalid port number: {port}. Port should be between 1 and 65535.")
return False
self.logger.info("IP and port have correct format.")
return True