Loading integration_tests/Dockerfiledeleted 100644 → 0 +0 −14 Original line number Diff line number Diff line FROM labs.etsi.org:5050/ocf/capif/python:3-slim-bullseye RUN apt update RUN apt install -y git RUN git clone https://labs.etsi.org/rep/ocf/sdk.git WORKDIR /sdk RUN python -m pip install --upgrade pip RUN pip install -r installation/requirements.txt RUN pip install . ENTRYPOINT ["tail", "-f", "/dev/null"] integration_tests/Dockerfile2deleted 100644 → 0 +0 −8 Original line number Diff line number Diff line FROM labs.etsi.org:5050/ocf/capif/python:3-slim-bullseye RUN apt update RUN apt install -y git COPY setup.sh . ENTRYPOINT ["bash", "setup.sh"] integration_tests/setup.shdeleted 100644 → 0 +0 −29 Original line number Diff line number Diff line #!/bin/bash set -euo pipefail SDK_DIR="/sdk" SDK_REPO="https://labs.etsi.org/rep/ocf/sdk.git" SDK_JUST_INSTALLED="true" if [[ -d "$SDK_DIR/.git" ]]; then echo "OpenCAPIF SDK repository exists. Updating..." git -C "$SDK_DIR" pull SDK_JUST_INSTALLED="false" else echo "OpenCAPIF SDK repository not found. Cloning..." git clone "$SDK_REPO" "$SDK_DIR" fi ### !!! # python -m build echo "Installing OpenCAPIF SDK dependencies..." python -m pip install -r "$SDK_DIR/installation/requirements.txt" python -m pip install "$SDK_DIR" [[ "$SDK_JUST_INSTALLED" == "true" ]] && action_text="installation completed" || action_text="updated" echo "OpenCAPIF SDK $action_text successfully." tail -f /dev/null # TODO: Replace with actual entrypoint command (tests) opencapif_sdk/__init__.py +6 −4 Original line number Diff line number Diff line from opencapif_sdk.api_schema_translator import api_schema_translator from opencapif_sdk.capif_event_feature import capif_invoker_event_feature, capif_provider_event_feature from opencapif_sdk.capif_invoker_connector import capif_invoker_connector from opencapif_sdk.capif_logging_feature import capif_logging_feature from opencapif_sdk.capif_provider_connector import capif_provider_connector from opencapif_sdk.service_discoverer import service_discoverer from opencapif_sdk.api_schema_translator import api_schema_translator from opencapif_sdk.capif_logging_feature import capif_logging_feature from opencapif_sdk.capif_event_feature import capif_invoker_event_feature, capif_provider_event_feature __all__ = ["capif_invoker_connector", "service_discoverer", "capif_provider_connector", "api_schema_translator", "capif_logging_feature", "capif_invoker_event_feature", "capif_provider_event_feature"] No newline at end of file __all__ = ['capif_invoker_connector', 'service_discoverer', 'capif_provider_connector', 'api_schema_translator', 'capif_logging_feature', 'capif_invoker_event_feature', 'capif_provider_event_feature'] No newline at end of file opencapif_sdk/api_schema_translator.py +73 −72 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ import logging import os import re import socket import yaml log_path = 'logs/builder_logs.log' Loading @@ -25,7 +26,7 @@ logging.basicConfig( class api_schema_translator: REQUIRED_COMPONENTS = ["openapi", "info", "servers", "paths", "components"] REQUIRED_COMPONENTS = ['openapi', 'info', 'servers', 'paths', 'components'] def __init__(self, api_path): self.api_path = os.path.abspath(api_path) Loading @@ -51,32 +52,32 @@ class api_schema_translator: - Assigns default ports for HTTP (80) and HTTPS (443) if missing. """ pattern = r"^(https?:\/\/)?([\w\.-]+|\[.*\])(?::(\d+))?(\/[\w\/-]+)?\/([\w-]+)\/(v\d+)$" pattern = r'^(https?:\/\/)?([\w\.-]+|\[.*\])(?::(\d+))?(\/[\w\/-]+)?\/([\w-]+)\/(v\d+)$' match = re.match(pattern, url) if not match: self.logger.error(f"Invalid URL format: {url}") self.logger.error(f'Invalid URL format: {url}') return None, None, None, None, None, None, None scheme, host, port, api_root, api_name, api_version = match.groups() # Asignar puerto por defecto si no está presente if not port: port = 443 if scheme == "https" else 80 # Si no hay esquema, asumimos HTTPS port = 443 if scheme == 'https' else 80 # Si no hay esquema, asumimos HTTPS else: port = int(port) if not api_name or not api_version: self.logger.error(f"URL must contain API name and version in the format: <apiRoot>/<apiName>/v<version>") self.logger.error('URL must contain API name and version in the format: <apiRoot>/<apiName>/v<version>') return None, None, None, None, None, None, None # Si apiRoot está presente, eliminar "/" innecesarios api_root = api_root.strip('/') if api_root else None if re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host): # IPv4 if re.match(r'^\d{1,3}(\.\d{1,3}){3}$', host): # IPv4 return host, port, None, None, api_root, api_name, api_version elif re.match(r"^\[.*\]$", host): # IPv6 (brackets format) return None, port, None, host.strip("[]"), api_root, api_name, api_version elif re.match(r'^\[.*\]$', host): # IPv6 (brackets format) return None, port, None, host.strip('[]'), api_root, api_name, api_version else: # FQDN return None, port, host, None, api_root, api_name, api_version Loading @@ -96,40 +97,40 @@ class api_schema_translator: # Validamos que al menos una dirección sea válida if not (ip or fqdn or ipv6Addr or api_root): self.logger.error("Invalid URL: No valid IP, IPv6, FQDN, or API root found. Aborting build.") self.logger.error('Invalid URL: No valid IP, IPv6, FQDN, or API root found. Aborting build.') return # Validamos IP y puerto si es IPv4 if ip and not self.__validate_ip_port(ip, port): self.logger.error("Invalid IP or port. Aborting build.") self.logger.error('Invalid IP or port. Aborting build.') return # Construcción de la API try: api_data = { "apiName": api_name, "aefProfiles": self.__build_aef_profiles(ip, port, api_version, fqdn, ipv6Addr), "description": self.api_info["info"].get("description", "No description provided"), "supportedFeatures": supported_features, "shareableInfo": { "isShareable": True, "capifProvDoms": ["string"] 'apiName': api_name, 'aefProfiles': self.__build_aef_profiles(ip, port, api_version, fqdn, ipv6Addr), 'description': self.api_info['info'].get('description', 'No description provided'), 'supportedFeatures': supported_features, 'shareableInfo': { 'isShareable': True, 'capifProvDoms': ['string'] }, "serviceAPICategory": "string", "apiSuppFeats": api_supp_features, "pubApiPath": { "ccfIds": ["string"] 'serviceAPICategory': 'string', 'apiSuppFeats': api_supp_features, 'pubApiPath': { 'ccfIds': ['string'] }, "ccfId": "string" 'ccfId': 'string' } # Guardamos los datos en un archivo JSON with open(f"{api_name}.json", "w") as outfile: with open(f'{api_name}.json', 'w') as outfile: json.dump(api_data, outfile, indent=4) self.logger.info(f"API description saved to {api_name}.json") self.logger.info(f'API description saved to {api_name}.json') except Exception as e: self.logger.error(f"An error occurred during the build process: {e}") self.logger.error(f'An error occurred during the build process: {e}') def __load_api_file(self, api_file: str): """Loads the Swagger API configuration file and converts YAML to JSON format if necessary.""" Loading @@ -142,15 +143,15 @@ class api_schema_translator: return json.load(file) else: self.logger.warning( f"Unsupported file extension for {api_file}. Only .yaml, .yml, and .json are supported.") f'Unsupported file extension for {api_file}. Only .yaml, .yml, and .json are supported.') return {} except FileNotFoundError: self.logger.warning( f"Configuration file {api_file} not found. Using defaults or environment variables.") f'Configuration file {api_file} not found. Using defaults or environment variables.') return {} except (json.JSONDecodeError, yaml.YAMLError) as e: self.logger.error( f"Error parsing the configuration file {api_file}: {e}") f'Error parsing the configuration file {api_file}: {e}') return {} def __validate_api_info(self): Loading @@ -159,100 +160,100 @@ class api_schema_translator: if missing_components: self.logger.warning(f"Missing components in API specification: {', '.join(missing_components)}") else: self.logger.info("All required components are present in the API specification.") self.logger.info('All required components are present in the API specification.') def __build_aef_profiles(self, ip, port, api_version, fqdn=None, ipv6Addr=None, ): """Builds the aefProfiles section based on the paths and components in the API info.""" aef_profiles = [] resources = [] for path, methods in self.api_info.get("paths", {}).items(): for path, methods in self.api_info.get('paths', {}).items(): for method, details in methods.items(): resource = { "resourceName": details.get("summary", "Unnamed Resource"), "commType": "REQUEST_RESPONSE", "uri": path, "custOpName": f"http_{method}", "operations": [method.upper()], "description": details.get("description", "") 'resourceName': details.get('summary', 'Unnamed Resource'), 'commType': 'REQUEST_RESPONSE', 'uri': path, 'custOpName': f'http_{method}', 'operations': [method.upper()], 'description': details.get('description', '') } resources.append(resource) # Create interface description based on the standard interface_description = { "port": port, "securityMethods": ["OAUTH"] 'port': port, 'securityMethods': ['OAUTH'] } # Include ipv4Addr, ipv6Addr, or fqdn as per the standard if ip: interface_description["ipv4Addr"] = ip interface_description['ipv4Addr'] = ip elif ipv6Addr: interface_description["ipv6Addr"] = ipv6Addr interface_description['ipv6Addr'] = ipv6Addr elif fqdn: interface_description["fqdn"] = fqdn interface_description['fqdn'] = fqdn else: raise ValueError("At least one of ipv4Addr, ipv6Addr, or fqdn must be provided.") raise ValueError('At least one of ipv4Addr, ipv6Addr, or fqdn must be provided.') # Example profile creation based on paths, customize as needed aef_profile = { "aefId": "", # Placeholder AEF ID "versions": [ 'aefId': '', # Placeholder AEF ID 'versions': [ { "apiVersion": f"{api_version}", "expiry": "2100-11-30T10:32:02.004Z", "resources": resources, "custOperations": [ 'apiVersion': f'{api_version}', 'expiry': '2100-11-30T10:32:02.004Z', 'resources': resources, 'custOperations': [ { "commType": "REQUEST_RESPONSE", "custOpName": "string", "operations": ["POST"], "description": "string" 'commType': 'REQUEST_RESPONSE', 'custOpName': 'string', 'operations': ['POST'], 'description': 'string' }, { "commType": "REQUEST_RESPONSE", "custOpName": "check-authentication", "operations": [ "POST" 'commType': 'REQUEST_RESPONSE', 'custOpName': 'check-authentication', 'operations': [ 'POST' ], "description": "Check authentication request." 'description': 'Check authentication request.' }, { "commType": "REQUEST_RESPONSE", "custOpName": "revoke-authentication", "operations": [ "POST" 'commType': 'REQUEST_RESPONSE', 'custOpName': 'revoke-authentication', 'operations': [ 'POST' ], "description": "Revoke authorization for service APIs." 'description': 'Revoke authorization for service APIs.' } ] } ], "protocol": "HTTP_1_1", "dataFormat": "JSON", "securityMethods": ["OAUTH"], "interfaceDescriptions": [interface_description] 'protocol': 'HTTP_1_1', 'dataFormat': 'JSON', 'securityMethods': ['OAUTH'], 'interfaceDescriptions': [interface_description] } aef_profiles.append(aef_profile) return aef_profiles def __validate_ip_port(self, ip, port): """Validates that the IP and port have the correct format.""" ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$") ip_pattern = re.compile(r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$') # Validate IP if not ip_pattern.match(ip): self.logger.warning(f"Invalid IP format: {ip}. Expected IPv4 format.") self.logger.warning(f'Invalid IP format: {ip}. Expected IPv4 format.') return False # Validate each octet in the IP address if any(int(octet) > 255 or int(octet) < 0 for octet in ip.split(".")): self.logger.warning(f"IP address out of range: {ip}. Each octet should be between 0 and 255.") if any(int(octet) > 255 or int(octet) < 0 for octet in ip.split('.')): self.logger.warning(f'IP address out of range: {ip}. Each octet should be between 0 and 255.') return False # Validate Port if not (1 <= port <= 65535): self.logger.warning(f"Invalid port number: {port}. Port should be between 1 and 65535.") self.logger.warning(f'Invalid port number: {port}. Port should be between 1 and 65535.') return False self.logger.info("IP and port have correct format.") self.logger.info('IP and port have correct format.') return True Loading
integration_tests/Dockerfiledeleted 100644 → 0 +0 −14 Original line number Diff line number Diff line FROM labs.etsi.org:5050/ocf/capif/python:3-slim-bullseye RUN apt update RUN apt install -y git RUN git clone https://labs.etsi.org/rep/ocf/sdk.git WORKDIR /sdk RUN python -m pip install --upgrade pip RUN pip install -r installation/requirements.txt RUN pip install . ENTRYPOINT ["tail", "-f", "/dev/null"]
integration_tests/Dockerfile2deleted 100644 → 0 +0 −8 Original line number Diff line number Diff line FROM labs.etsi.org:5050/ocf/capif/python:3-slim-bullseye RUN apt update RUN apt install -y git COPY setup.sh . ENTRYPOINT ["bash", "setup.sh"]
integration_tests/setup.shdeleted 100644 → 0 +0 −29 Original line number Diff line number Diff line #!/bin/bash set -euo pipefail SDK_DIR="/sdk" SDK_REPO="https://labs.etsi.org/rep/ocf/sdk.git" SDK_JUST_INSTALLED="true" if [[ -d "$SDK_DIR/.git" ]]; then echo "OpenCAPIF SDK repository exists. Updating..." git -C "$SDK_DIR" pull SDK_JUST_INSTALLED="false" else echo "OpenCAPIF SDK repository not found. Cloning..." git clone "$SDK_REPO" "$SDK_DIR" fi ### !!! # python -m build echo "Installing OpenCAPIF SDK dependencies..." python -m pip install -r "$SDK_DIR/installation/requirements.txt" python -m pip install "$SDK_DIR" [[ "$SDK_JUST_INSTALLED" == "true" ]] && action_text="installation completed" || action_text="updated" echo "OpenCAPIF SDK $action_text successfully." tail -f /dev/null # TODO: Replace with actual entrypoint command (tests)
opencapif_sdk/__init__.py +6 −4 Original line number Diff line number Diff line from opencapif_sdk.api_schema_translator import api_schema_translator from opencapif_sdk.capif_event_feature import capif_invoker_event_feature, capif_provider_event_feature from opencapif_sdk.capif_invoker_connector import capif_invoker_connector from opencapif_sdk.capif_logging_feature import capif_logging_feature from opencapif_sdk.capif_provider_connector import capif_provider_connector from opencapif_sdk.service_discoverer import service_discoverer from opencapif_sdk.api_schema_translator import api_schema_translator from opencapif_sdk.capif_logging_feature import capif_logging_feature from opencapif_sdk.capif_event_feature import capif_invoker_event_feature, capif_provider_event_feature __all__ = ["capif_invoker_connector", "service_discoverer", "capif_provider_connector", "api_schema_translator", "capif_logging_feature", "capif_invoker_event_feature", "capif_provider_event_feature"] No newline at end of file __all__ = ['capif_invoker_connector', 'service_discoverer', 'capif_provider_connector', 'api_schema_translator', 'capif_logging_feature', 'capif_invoker_event_feature', 'capif_provider_event_feature'] No newline at end of file
opencapif_sdk/api_schema_translator.py +73 −72 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ import logging import os import re import socket import yaml log_path = 'logs/builder_logs.log' Loading @@ -25,7 +26,7 @@ logging.basicConfig( class api_schema_translator: REQUIRED_COMPONENTS = ["openapi", "info", "servers", "paths", "components"] REQUIRED_COMPONENTS = ['openapi', 'info', 'servers', 'paths', 'components'] def __init__(self, api_path): self.api_path = os.path.abspath(api_path) Loading @@ -51,32 +52,32 @@ class api_schema_translator: - Assigns default ports for HTTP (80) and HTTPS (443) if missing. """ pattern = r"^(https?:\/\/)?([\w\.-]+|\[.*\])(?::(\d+))?(\/[\w\/-]+)?\/([\w-]+)\/(v\d+)$" pattern = r'^(https?:\/\/)?([\w\.-]+|\[.*\])(?::(\d+))?(\/[\w\/-]+)?\/([\w-]+)\/(v\d+)$' match = re.match(pattern, url) if not match: self.logger.error(f"Invalid URL format: {url}") self.logger.error(f'Invalid URL format: {url}') return None, None, None, None, None, None, None scheme, host, port, api_root, api_name, api_version = match.groups() # Asignar puerto por defecto si no está presente if not port: port = 443 if scheme == "https" else 80 # Si no hay esquema, asumimos HTTPS port = 443 if scheme == 'https' else 80 # Si no hay esquema, asumimos HTTPS else: port = int(port) if not api_name or not api_version: self.logger.error(f"URL must contain API name and version in the format: <apiRoot>/<apiName>/v<version>") self.logger.error('URL must contain API name and version in the format: <apiRoot>/<apiName>/v<version>') return None, None, None, None, None, None, None # Si apiRoot está presente, eliminar "/" innecesarios api_root = api_root.strip('/') if api_root else None if re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host): # IPv4 if re.match(r'^\d{1,3}(\.\d{1,3}){3}$', host): # IPv4 return host, port, None, None, api_root, api_name, api_version elif re.match(r"^\[.*\]$", host): # IPv6 (brackets format) return None, port, None, host.strip("[]"), api_root, api_name, api_version elif re.match(r'^\[.*\]$', host): # IPv6 (brackets format) return None, port, None, host.strip('[]'), api_root, api_name, api_version else: # FQDN return None, port, host, None, api_root, api_name, api_version Loading @@ -96,40 +97,40 @@ class api_schema_translator: # Validamos que al menos una dirección sea válida if not (ip or fqdn or ipv6Addr or api_root): self.logger.error("Invalid URL: No valid IP, IPv6, FQDN, or API root found. Aborting build.") self.logger.error('Invalid URL: No valid IP, IPv6, FQDN, or API root found. Aborting build.') return # Validamos IP y puerto si es IPv4 if ip and not self.__validate_ip_port(ip, port): self.logger.error("Invalid IP or port. Aborting build.") self.logger.error('Invalid IP or port. Aborting build.') return # Construcción de la API try: api_data = { "apiName": api_name, "aefProfiles": self.__build_aef_profiles(ip, port, api_version, fqdn, ipv6Addr), "description": self.api_info["info"].get("description", "No description provided"), "supportedFeatures": supported_features, "shareableInfo": { "isShareable": True, "capifProvDoms": ["string"] 'apiName': api_name, 'aefProfiles': self.__build_aef_profiles(ip, port, api_version, fqdn, ipv6Addr), 'description': self.api_info['info'].get('description', 'No description provided'), 'supportedFeatures': supported_features, 'shareableInfo': { 'isShareable': True, 'capifProvDoms': ['string'] }, "serviceAPICategory": "string", "apiSuppFeats": api_supp_features, "pubApiPath": { "ccfIds": ["string"] 'serviceAPICategory': 'string', 'apiSuppFeats': api_supp_features, 'pubApiPath': { 'ccfIds': ['string'] }, "ccfId": "string" 'ccfId': 'string' } # Guardamos los datos en un archivo JSON with open(f"{api_name}.json", "w") as outfile: with open(f'{api_name}.json', 'w') as outfile: json.dump(api_data, outfile, indent=4) self.logger.info(f"API description saved to {api_name}.json") self.logger.info(f'API description saved to {api_name}.json') except Exception as e: self.logger.error(f"An error occurred during the build process: {e}") self.logger.error(f'An error occurred during the build process: {e}') def __load_api_file(self, api_file: str): """Loads the Swagger API configuration file and converts YAML to JSON format if necessary.""" Loading @@ -142,15 +143,15 @@ class api_schema_translator: return json.load(file) else: self.logger.warning( f"Unsupported file extension for {api_file}. Only .yaml, .yml, and .json are supported.") f'Unsupported file extension for {api_file}. Only .yaml, .yml, and .json are supported.') return {} except FileNotFoundError: self.logger.warning( f"Configuration file {api_file} not found. Using defaults or environment variables.") f'Configuration file {api_file} not found. Using defaults or environment variables.') return {} except (json.JSONDecodeError, yaml.YAMLError) as e: self.logger.error( f"Error parsing the configuration file {api_file}: {e}") f'Error parsing the configuration file {api_file}: {e}') return {} def __validate_api_info(self): Loading @@ -159,100 +160,100 @@ class api_schema_translator: if missing_components: self.logger.warning(f"Missing components in API specification: {', '.join(missing_components)}") else: self.logger.info("All required components are present in the API specification.") self.logger.info('All required components are present in the API specification.') def __build_aef_profiles(self, ip, port, api_version, fqdn=None, ipv6Addr=None, ): """Builds the aefProfiles section based on the paths and components in the API info.""" aef_profiles = [] resources = [] for path, methods in self.api_info.get("paths", {}).items(): for path, methods in self.api_info.get('paths', {}).items(): for method, details in methods.items(): resource = { "resourceName": details.get("summary", "Unnamed Resource"), "commType": "REQUEST_RESPONSE", "uri": path, "custOpName": f"http_{method}", "operations": [method.upper()], "description": details.get("description", "") 'resourceName': details.get('summary', 'Unnamed Resource'), 'commType': 'REQUEST_RESPONSE', 'uri': path, 'custOpName': f'http_{method}', 'operations': [method.upper()], 'description': details.get('description', '') } resources.append(resource) # Create interface description based on the standard interface_description = { "port": port, "securityMethods": ["OAUTH"] 'port': port, 'securityMethods': ['OAUTH'] } # Include ipv4Addr, ipv6Addr, or fqdn as per the standard if ip: interface_description["ipv4Addr"] = ip interface_description['ipv4Addr'] = ip elif ipv6Addr: interface_description["ipv6Addr"] = ipv6Addr interface_description['ipv6Addr'] = ipv6Addr elif fqdn: interface_description["fqdn"] = fqdn interface_description['fqdn'] = fqdn else: raise ValueError("At least one of ipv4Addr, ipv6Addr, or fqdn must be provided.") raise ValueError('At least one of ipv4Addr, ipv6Addr, or fqdn must be provided.') # Example profile creation based on paths, customize as needed aef_profile = { "aefId": "", # Placeholder AEF ID "versions": [ 'aefId': '', # Placeholder AEF ID 'versions': [ { "apiVersion": f"{api_version}", "expiry": "2100-11-30T10:32:02.004Z", "resources": resources, "custOperations": [ 'apiVersion': f'{api_version}', 'expiry': '2100-11-30T10:32:02.004Z', 'resources': resources, 'custOperations': [ { "commType": "REQUEST_RESPONSE", "custOpName": "string", "operations": ["POST"], "description": "string" 'commType': 'REQUEST_RESPONSE', 'custOpName': 'string', 'operations': ['POST'], 'description': 'string' }, { "commType": "REQUEST_RESPONSE", "custOpName": "check-authentication", "operations": [ "POST" 'commType': 'REQUEST_RESPONSE', 'custOpName': 'check-authentication', 'operations': [ 'POST' ], "description": "Check authentication request." 'description': 'Check authentication request.' }, { "commType": "REQUEST_RESPONSE", "custOpName": "revoke-authentication", "operations": [ "POST" 'commType': 'REQUEST_RESPONSE', 'custOpName': 'revoke-authentication', 'operations': [ 'POST' ], "description": "Revoke authorization for service APIs." 'description': 'Revoke authorization for service APIs.' } ] } ], "protocol": "HTTP_1_1", "dataFormat": "JSON", "securityMethods": ["OAUTH"], "interfaceDescriptions": [interface_description] 'protocol': 'HTTP_1_1', 'dataFormat': 'JSON', 'securityMethods': ['OAUTH'], 'interfaceDescriptions': [interface_description] } aef_profiles.append(aef_profile) return aef_profiles def __validate_ip_port(self, ip, port): """Validates that the IP and port have the correct format.""" ip_pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$") ip_pattern = re.compile(r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$') # Validate IP if not ip_pattern.match(ip): self.logger.warning(f"Invalid IP format: {ip}. Expected IPv4 format.") self.logger.warning(f'Invalid IP format: {ip}. Expected IPv4 format.') return False # Validate each octet in the IP address if any(int(octet) > 255 or int(octet) < 0 for octet in ip.split(".")): self.logger.warning(f"IP address out of range: {ip}. Each octet should be between 0 and 255.") if any(int(octet) > 255 or int(octet) < 0 for octet in ip.split('.')): self.logger.warning(f'IP address out of range: {ip}. Each octet should be between 0 and 255.') return False # Validate Port if not (1 <= port <= 65535): self.logger.warning(f"Invalid port number: {port}. Port should be between 1 and 65535.") self.logger.warning(f'Invalid port number: {port}. Port should be between 1 and 65535.') return False self.logger.info("IP and port have correct format.") self.logger.info('IP and port have correct format.') return True