Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ocf/sdk
1 result
Show changes
......@@ -151,13 +151,13 @@ class capif_provider_connector:
self.provider_capif_ids = {}
path_prov_funcs=os.path.join(self.provider_folder,"provider_capif_ids.json")
path_prov_funcs = os.path.join(self.provider_folder, "provider_capif_ids.json")
if os.path.exists(path_prov_funcs):
self.provider_capif_ids=self.__load_provider_api_details()
self.provider_capif_ids = self._load_provider_api_details()
path_published=os.path.join(self.provider_folder,"provider_service_ids.json")
path_published = os.path.join(self.provider_folder, "provider_service_ids.json")
if os.path.exists(path_published):
self.provider_service_ids=self.__load_config_file(path_published)
self.provider_service_ids = self.__load_config_file(path_published)
# Construct the CAPIF HTTPS URL
if len(self.capif_https_port) == 0 or int(self.capif_https_port) == 443:
......@@ -170,7 +170,12 @@ class capif_provider_connector:
self.capif_register_url = f"https://{capif_register_host.strip()}:8084/"
else:
self.capif_register_url = f"https://{capif_register_host.strip()}:{capif_register_port.strip()}/"
events_config = provider_config.get('events', {})
self.events_description = os.getenv('PROVIDER_EVENTS_DESCRIPTION', events_config.get('description', ''))
self.events_filter = os.getenv('PROVIDER_EVENTS_FILTERS', events_config.get('eventFilters', ''))
self.notification_destination = os.getenv('PROVIDER_EVENTS_FILTERS', events_config.get('notificationDestination', ''))
self.websock_notif_config = os.getenv('PROVIDER_EVENTS_FILTERS', events_config.get('websockNotifConfig', ''))
# Log initialization success message
self.logger.info("capif_provider_connector initialized with the capif_sdk_config.json parameters")
......@@ -427,7 +432,7 @@ class capif_provider_connector:
self.logger.info(
f"Loading provider details from {provider_details_path}")
provider_details = self.__load_provider_api_details()
provider_details = self._load_provider_api_details()
publish_url = provider_details["publish_url"]
......@@ -604,7 +609,7 @@ class capif_provider_connector:
self.logger.info(
f"Loading provider details from {provider_details_path}")
provider_details = self.__load_provider_api_details()
provider_details = self._load_provider_api_details()
publish_url = provider_details["publish_url"]
# Load provider details
......@@ -633,7 +638,7 @@ class capif_provider_connector:
cert = (
os.path.join(self.provider_folder, f"apf-{apf_number}.crt"),
os.path.join(self.provider_folder,
f"apf-{apf_number}_private_key.key"),
f"APF-{apf_number}_private_key.key"),
)
self.logger.info(f"Unpublishing service to URL: {url}")
......@@ -722,7 +727,7 @@ class capif_provider_connector:
self.logger.info(
f"Loading provider details from {provider_details_path}")
provider_details = self.__load_provider_api_details()
provider_details = self._load_provider_api_details()
publish_url = provider_details["publish_url"]
chosenAPFsandAEFs = self.publish_req
......@@ -749,7 +754,7 @@ class capif_provider_connector:
cert = (
os.path.join(self.provider_folder, f"apf-{apf_number}.crt"),
os.path.join(self.provider_folder,
f"apf-{apf_number}_private_key.key"),
f"APF-{apf_number}_private_key.key"),
)
self.logger.info(f"Getting service to URL: {url}")
......@@ -796,7 +801,7 @@ class capif_provider_connector:
self.logger.info(
f"Loading provider details from {provider_details_path}")
provider_details = self.__load_provider_api_details()
provider_details = self._load_provider_api_details()
publish_url = provider_details["publish_url"]
chosenAPFsandAEFs = self.publish_req
......@@ -823,7 +828,7 @@ class capif_provider_connector:
cert = (
os.path.join(self.provider_folder, f"apf-{apf_number}.crt"),
os.path.join(self.provider_folder,
f"apf-{apf_number}_private_key.key"),
f"APF-{apf_number}_private_key.key"),
)
self.logger.info(f"Getting services to URL: {url}")
......@@ -872,7 +877,7 @@ class capif_provider_connector:
self.logger.info(
f"Loading provider details from {provider_details_path}")
provider_details = self.__load_provider_api_details()
provider_details = self._load_provider_api_details()
publish_url = provider_details["publish_url"]
chosenAPFsandAEFs = self.publish_req
......@@ -971,7 +976,7 @@ class capif_provider_connector:
cert = (
os.path.join(self.provider_folder, f"apf-{apf_number}.crt"),
os.path.join(self.provider_folder,
f"apf-{apf_number}_private_key.key"),
f"APF-{apf_number}_private_key.key"),
)
self.logger.info(f"Publishing services to URL: {url}")
......@@ -1071,7 +1076,7 @@ class capif_provider_connector:
self.logger.info("Offboarding the provider")
# Load CAPIF API details
capif_api_details = self.__load_provider_api_details()
capif_api_details = self._load_provider_api_details()
url = f"{self.capif_https_url}api-provider-management/v1/registrations/{capif_api_details['capif_registration_id']}"
# Define certificate paths
......@@ -1120,7 +1125,7 @@ class capif_provider_connector:
self.logger.error(f"Error during removing folder contents: {e}")
raise
def __load_provider_api_details(self) -> dict:
def _load_provider_api_details(self) -> dict:
"""
Loads NEF API details from the CAPIF provider details JSON file.
......@@ -1163,7 +1168,7 @@ class capif_provider_connector:
)
def certs_modifications(self):
api_details = self.__load_provider_api_details()
api_details = self._load_provider_api_details()
apf_count = 0
aef_count = 0
......@@ -1253,7 +1258,7 @@ class capif_provider_connector:
def update_onboard(self, capif_onboarding_url, access_token):
self.logger.info(
"Onboarding Provider to CAPIF and waiting signed certificate by giving our public keys to CAPIF")
api_details = self.__load_provider_api_details()
api_details = self._load_provider_api_details()
capif_id = "/" + api_details["capif_registration_id"]
url = f"{self.capif_https_url}{capif_onboarding_url}{capif_id}"
......@@ -1346,3 +1351,70 @@ class capif_provider_connector:
self.logger.error(
f"Onboarding failed: {e} - Response: {response.text}")
raise
def _create_or_update_file(self, file_name, file_type, content, mode="w"):
"""
Create or update a file with the specified content.
:param file_name: Name of the file (without extension).
:param file_type: File type or extension (e.g., "txt", "json", "html").
:param content: Content to write into the file. Can be a string, dictionary, or list.
:param mode: Write mode ('w' to overwrite, 'a' to append). Default is 'w'.
"""
# Validate the mode
if mode not in ["w", "a"]:
raise ValueError("Mode must be 'w' (overwrite) or 'a' (append).")
# Construct the full file name
full_file_name = f"{file_name}.{file_type}"
full_path = os.path.join(self.provider_folder, full_file_name)
# Ensure the content is properly formatted
if isinstance(content, (dict, list)):
if file_type == "json":
try:
# Serialize content to JSON
content = json.dumps(content, indent=4)
except TypeError as e:
raise ValueError(f"Failed to serialize content to JSON: {e}")
else:
raise TypeError("Content must be a string when the file type is not JSON.")
elif not isinstance(content, str):
raise TypeError("Content must be a string, dictionary, or list.")
try:
# Open the file in the specified mode
with open(full_path, mode, encoding="utf-8") as file:
file.write(content)
# Log success based on the mode
if mode == "w":
self.logger.info(f"File '{full_file_name}' created or overwritten successfully.")
elif mode == "a":
self.logger.info(f"Content appended to file '{full_file_name}' successfully.")
except Exception as e:
self.logger.error(f"Error handling the file '{full_file_name}': {e}")
raise
def _find_key_by_value(self, data, target_value):
"""
Given a dictionary and a value, return the key corresponding to that value.
:param data: Dictionary to search.
:param target_value: Value to find the corresponding key for.
:return: Key corresponding to the target value, or None if not found.
"""
for key, value in data.items():
if value == target_value:
return key
return None
def _load_config_file(self, config_file: str):
"""Loads the configuration file."""
try:
with open(config_file, 'r') as file:
return json.load(file)
except FileNotFoundError:
self.logger.warning(
f"Configuration file {config_file} not found. Using defaults or environment variables.")
return {}
\ No newline at end of file
......@@ -61,6 +61,15 @@
"csr_country_name": "",
"csr_email_address": ""
},
"api_description_path": ""
"api_description_path": "",
"log":{
"apiName": "",
"apiVersion": "",
"resourceName": "",
"uri": "",
"protocol": "",
"operation": "",
"result": ""
}
}
}
import utilities
from opencapif_sdk import capif_invoker_connector, capif_invoker_event_feature
def showcase_capif_connector():
"""
This method showcases how one can use the CAPIFConnector class.
"""
# invoker = capif_invoker_connector(config_file=utilities.get_config_file())
# invoker.onboard_invoker()
events = capif_invoker_event_feature(config_file=utilities.get_config_file())
events.create_subscription(name="Servicio_2")
events.create_subscription(name="Servicio_3")
# events.update_subcription(name="Servicio_2")
events.delete_subscription(name="Servicio_2")
events.delete_subscription(name="Servicio_3")
print("COMPLETED")
if __name__ == "__main__":
# Register invoker to CAPIF. This should happen exactly once
showcase_capif_connector()
import utilities
from opencapif_sdk import capif_provider_connector, capif_provider_event_feature
def showcase_capif_connector():
"""
This method showcases how one can use the CAPIFConnector class.
"""
provider = capif_provider_connector(config_file=utilities.get_config_file())
# provider.onboard_provider()
id = provider.provider_capif_ids['AEF-1']
events = capif_provider_event_feature(config_file=utilities.get_config_file())
events.create_subscription(name="Servicio_2", id=id)
events.create_subscription(name="Servicio_3", id=id)
# events.update_subcription(name="Servicio_2", id=id)
events.delete_subscription(name="Servicio_2", id=id)
events.delete_subscription(name="Servicio_3", id=id)
print("COMPLETED")
if __name__ == "__main__":
# Register invoker to CAPIF. This should happen exactly once
showcase_capif_connector()
def get_config_file() -> str:
return "../config/capif_sdk_config.json"
return "../test/capif_sdk_config_sample_test.json"
def get_register_file() -> str:
......
......@@ -7,7 +7,7 @@ with open(os.path.join(this_directory, "README_pipy.md"), encoding="utf-8") as f
setup(
name="opencapif_sdk",
version="0.1.17.3",
version="0.1.18",
author="JorgeEcheva, dgs-cgm",
author_email="jorge.echevarriauribarri.practicas@telefonica.com, daniel.garciasanchez@telefonica.com",
description=(
......
{
"capif_host": "",
"register_host": "",
"capif_https_port": "",
"capif_register_port": "",
"capif_username": "",
"capif_password": "",
"debug_mode": "",
"capif_host": "capif-prev.mobilesandbox.cloud",
"register_host": "registercapif-prev.mobilesandbox.cloud",
"capif_https_port": "36212",
"capif_register_port": "36211",
"capif_username": "echeva_0",
"capif_password": "echevapass",
"debug_mode": "True",
"invoker": {
"invoker_folder": "",
"capif_callback_url": "",
"supported_features":"",
"check_authentication_data":{
"ip":"",
"port":""
"invoker_folder": "/Users/IDB0128/Documents/OpenCapif/test_invoker_certificate_folder",
"capif_callback_url": "http://localhost:5000",
"supported_features": "fffffff",
"check_authentication_data": {
"ip": "",
"port": ""
},
"cert_generation": {
"csr_common_name": "",
"csr_organizational_unit": "",
"csr_organization": "",
"csr_locality": "",
"csr_state_or_province_name": "",
"csr_country_name": "",
"csr_email_address": ""
"csr_common_name": "Echeva",
"csr_organizational_unit": "discovery",
"csr_organization": "telefonica",
"csr_locality": "madrid",
"csr_state_or_province_name": "madrid",
"csr_country_name": "ES",
"csr_email_address": "adios@gmail.com"
},
"discover_filter": {
"api-name": "",
......@@ -37,30 +37,61 @@
"api-supported-features": "",
"ue-ip-addr": "",
"service-kpis": ""
},
"events": {
"description": ["SERVICE_API_AVAILABLE"],
"eventFilters": [
{
"apiIds": [""],
"apiInvokerIds": [""],
"aefIds": [""]
}
]
}
},
"provider": {
"provider_folder": "",
"supported_features": "",
"apfs": "",
"aefs": "",
"provider_folder": "/Users/IDB0128/Documents/OpenCapif/test_provider_certificate_folder",
"supported_features": "fffffff",
"cert_generation": {
"csr_common_name": "provider",
"csr_organizational_unit": "discovery",
"csr_organization": "telefonica",
"csr_locality": "madrid",
"csr_state_or_province_name": "madrid",
"csr_country_name": "ES",
"csr_email_address": "hola@gmail.com"
},
"apfs": "2",
"aefs": "3",
"publish_req": {
"service_api_id": "",
"publisher_apf_id": "",
"publisher_aefs_ids": [
"",
""
]
"publisher_aefs_ids": ["", ""]
},
"cert_generation": {
"csr_common_name": "",
"csr_organizational_unit": "",
"csr_organization": "",
"csr_locality": "",
"csr_state_or_province_name": "",
"csr_country_name": "",
"csr_email_address": ""
"api_description_path": "",
"events": {
"description": ["SERVICE_API_AVAILABLE"],
"eventFilters": [
{
"apiIds": [""],
"apiInvokerIds": [""],
"aefIds": [""]
}
],
"notificationDestination" : "http://localhost:5000",
"websockNotifConfig": {
"websocketUri" : "http://localhost:5000",
"requestWebsocketUri": true
}
},
"api_description_path": ""
}
"log": {
"apiName": "Testtrece",
"apiVersion": "v1",
"resourceName": "MONITORING_SUBSCRIPTIONS",
"uri": "/{scsAsId}/subscriptions",
"protocol": "HTTP_2",
"operation": "GET",
"result": "200"
}
}
}
......@@ -2,7 +2,7 @@
import json
# flake8: noqa
from opencapif_sdk import capif_invoker_connector, capif_provider_connector, service_discoverer
from opencapif_sdk import capif_invoker_connector, capif_provider_connector, service_discoverer, capif_logging_feature, capif_invoker_event_feature, capif_provider_event_feature
capif_sdk_config_path = "./capif_sdk_config_sample_test.json"
......@@ -92,6 +92,16 @@ if __name__ == "__main__":
# Update configuration file
capif_provider_connector.publish_req['publisher_apf_id'] = APF1
capif_provider_connector.publish_req['publisher_aefs_ids'] = [AEF1, AEF2]
event_provider = capif_provider_event_feature(config_file=capif_sdk_config_path)
event_provider.create_subscription(name="Ejemplo1",id=AEF2)
event_provider.create_subscription(name="Ejemplo2",id=APF1)
event_provider.delete_subscription(name="Ejemplo1",id=AEF2)
event_provider.delete_subscription(name="Ejemplo2",id=APF1)
capif_provider_connector.publish_services()
......@@ -130,6 +140,22 @@ if __name__ == "__main__":
print("SERVICE GET TOKENS COMPLETED")
logger=capif_logging_feature(config_file=capif_sdk_config_path)
invoker_id=discoverer.invoker_capif_details["api_invoker_id"]
logger.create_logs(aefId=AEF1,api_invoker_id=invoker_id)
event_invoker = capif_invoker_event_feature(config_file=capif_sdk_config_path)
event_invoker.create_subscription(name="Ejemplo3")
event_invoker.create_subscription(name="Ejemplo4")
event_invoker.delete_subscription(name="Ejemplo3")
event_invoker.delete_subscription(name="Ejemplo4")
capif_invoker_connector.update_invoker()
print("INVOKER UPDATE SERVICE COMPLETED")
......