Loading .env.sample +1 −0 Original line number Diff line number Diff line MONGO_URI=mongodb://username:password@localhost:27017/sample_db PI_EDGE_BASE_URL=http://example.com/api PI_EDGE_USERNAME=username PI_EDGE_PASSWORD=password Loading docker-compose.yaml 0 → 100644 +20 −0 Original line number Diff line number Diff line version: '3.1' services: mongo: image: mongo restart: always environment: MONGO_INITDB_ROOT_USERNAME: root MONGO_INITDB_ROOT_PASSWORD: example # cloud-management-api: # image: cloud-management-api # restart: always # ports: # - 8080:8080 # environment: # ME_CONFIG_MONGODB_ADMINUSERNAME: root # ME_CONFIG_MONGODB_ADMINPASSWORD: example # ME_CONFIG_MONGODB_URL: mongodb://root:example@mongo:27017/ # ME_CONFIG_BASICAUTH: false edge_cloud_management_api/configs/env_config.py +12 −5 Original line number Diff line number Diff line from dotenv import load_dotenv import os from pydantic import BaseSettings from dotenv import load_dotenv load_dotenv() PI_EDGE_BASE_URL = os.getenv("PI_EDGE_BASE_URL") PI_EDGE_USERNAME = os.getenv("PI_EDGE_USERNAME") PI_EDGE_PASSWORD = os.getenv("PI_EDGE_PASSWORD") HTTP_PROXY = os.getenv("HTTP_PROXY") class Config(BaseSettings): MONGO_URI: str = os.getenv("MONGO_URI") PI_EDGE_BASE_URL: str = os.getenv("PI_EDGE_BASE_URL") PI_EDGE_USERNAME: str = os.getenv("PI_EDGE_USERNAME") PI_EDGE_PASSWORD: str = os.getenv("PI_EDGE_PASSWORD") HTTP_PROXY: str = os.getenv("HTTP_PROXY") # config = Config() edge_cloud_management_api/managers/__init__.py 0 → 100644 +0 −0 Empty file added. edge_cloud_management_api/managers/db_manager.py 0 → 100644 +56 −0 Original line number Diff line number Diff line from pymongo import MongoClient from edge_cloud_management_api.configs.env_config import Config class MongoManager: def __init__(self): """ Initializes the MongoDB connection using the URI from Config. """ self.client = MongoClient(Config.MONGO_URI) mongo_db_name: str = Config.MONGO_URI.split("/")[-1] self.db = self.client[mongo_db_name] def insert_document(self, collection_name, document): """ Inserts a document into the specified collection. """ collection = self.db[collection_name] result = collection.insert_one(document) return result.inserted_id def find_document(self, collection_name, query): """ Finds a single document based on the query. """ collection = self.db[collection_name] return collection.find_one(query) def find_documents(self, collection_name, query): """ Finds multiple documents based on the query. """ collection = self.db[collection_name] return collection.find(query) def update_document(self, collection_name, query, update_data): """ Updates a single document based on the query. """ collection = self.db[collection_name] result = collection.update_one(query, {"$set": update_data}) return result.modified_count def delete_document(self, collection_name, query): """ Deletes a single document based on the query. """ collection = self.db[collection_name] result = collection.delete_one(query) return result.deleted_count def close_connection(self): """ Closes the MongoDB connection. """ self.client.close() Loading
.env.sample +1 −0 Original line number Diff line number Diff line MONGO_URI=mongodb://username:password@localhost:27017/sample_db PI_EDGE_BASE_URL=http://example.com/api PI_EDGE_USERNAME=username PI_EDGE_PASSWORD=password Loading
docker-compose.yaml 0 → 100644 +20 −0 Original line number Diff line number Diff line version: '3.1' services: mongo: image: mongo restart: always environment: MONGO_INITDB_ROOT_USERNAME: root MONGO_INITDB_ROOT_PASSWORD: example # cloud-management-api: # image: cloud-management-api # restart: always # ports: # - 8080:8080 # environment: # ME_CONFIG_MONGODB_ADMINUSERNAME: root # ME_CONFIG_MONGODB_ADMINPASSWORD: example # ME_CONFIG_MONGODB_URL: mongodb://root:example@mongo:27017/ # ME_CONFIG_BASICAUTH: false
edge_cloud_management_api/configs/env_config.py +12 −5 Original line number Diff line number Diff line from dotenv import load_dotenv import os from pydantic import BaseSettings from dotenv import load_dotenv load_dotenv() PI_EDGE_BASE_URL = os.getenv("PI_EDGE_BASE_URL") PI_EDGE_USERNAME = os.getenv("PI_EDGE_USERNAME") PI_EDGE_PASSWORD = os.getenv("PI_EDGE_PASSWORD") HTTP_PROXY = os.getenv("HTTP_PROXY") class Config(BaseSettings): MONGO_URI: str = os.getenv("MONGO_URI") PI_EDGE_BASE_URL: str = os.getenv("PI_EDGE_BASE_URL") PI_EDGE_USERNAME: str = os.getenv("PI_EDGE_USERNAME") PI_EDGE_PASSWORD: str = os.getenv("PI_EDGE_PASSWORD") HTTP_PROXY: str = os.getenv("HTTP_PROXY") # config = Config()
edge_cloud_management_api/managers/db_manager.py 0 → 100644 +56 −0 Original line number Diff line number Diff line from pymongo import MongoClient from edge_cloud_management_api.configs.env_config import Config class MongoManager: def __init__(self): """ Initializes the MongoDB connection using the URI from Config. """ self.client = MongoClient(Config.MONGO_URI) mongo_db_name: str = Config.MONGO_URI.split("/")[-1] self.db = self.client[mongo_db_name] def insert_document(self, collection_name, document): """ Inserts a document into the specified collection. """ collection = self.db[collection_name] result = collection.insert_one(document) return result.inserted_id def find_document(self, collection_name, query): """ Finds a single document based on the query. """ collection = self.db[collection_name] return collection.find_one(query) def find_documents(self, collection_name, query): """ Finds multiple documents based on the query. """ collection = self.db[collection_name] return collection.find(query) def update_document(self, collection_name, query, update_data): """ Updates a single document based on the query. """ collection = self.db[collection_name] result = collection.update_one(query, {"$set": update_data}) return result.modified_count def delete_document(self, collection_name, query): """ Deletes a single document based on the query. """ collection = self.db[collection_name] result = collection.delete_one(query) return result.deleted_count def close_connection(self): """ Closes the MongoDB connection. """ self.client.close()