Factory.py

    import logging, os
    from enum import Enum
    from .api.Database import Database
    from .engines.inmemory.InMemoryDatabaseEngine import InMemoryDatabaseEngine
    from .engines.redis.RedisDatabaseEngine import RedisDatabaseEngine
    
    LOGGER = logging.getLogger(__name__)
    
    class DatabaseEngineEnum(Enum):
        INMEMORY = 'inmemory'
        REDIS = 'redis'
        #MONGO = 'mongo'
        #RETHINK = 'rethink'
        #ETCD = 'etcd'
    
    ENGINES = {
        DatabaseEngineEnum.INMEMORY.value: InMemoryDatabaseEngine,
        DatabaseEngineEnum.REDIS.value: RedisDatabaseEngine,
        #DatabaseEngineEnum.MONGO.value: MongoDatabase,
        #DatabaseEngineEnum.RETHINK.value: RethinkDatabase,
        #DatabaseEngineEnum.ETCD.value: EtcdDatabase,
    }
    
    DEFAULT_DB_ENGINE = DatabaseEngineEnum.INMEMORY
    
    def get_database(engine=None, **settings) -> Database:
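        """Return an instance of Database initialized with the selected engine.

        The engine is selected using the following criteria (the first one that is not None wins):
          1. the engine passed by parameter (engine=...), as a DatabaseEngineEnum member or its string value
          2. the environment variable DB_ENGINE
          3. the default engine: INMEMORY
        Any remaining keyword arguments (**settings) are forwarded to the engine constructor.
        """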
        if engine is None: engine = os.environ.get('DB_ENGINE', DEFAULT_DB_ENGINE)
        if engine is None: raise Exception('Database Engine not specified')
        if isinstance(engine, DatabaseEngineEnum): engine = engine.value
        engine_class = ENGINES.get(engine)
        if engine_class is None: raise Exception('Unsupported DatabaseEngine({})'.format(engine))
        LOGGER.info('Selected Database Engine: {}'.format(engine))
        return Database(engine_class(**settings))
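
The selection order used by get_database can be tried in isolation with the sketch below. It is not part of Factory.py: FakeInMemoryEngine, FakeRedisEngine and select_engine_class are hypothetical stand-ins, introduced only so the example runs without the real Database, InMemoryDatabaseEngine or RedisDatabaseEngine classes. The lookup logic mirrors the function above under that assumption.

```python
import os
from enum import Enum

class DatabaseEngineEnum(Enum):
    INMEMORY = 'inmemory'
    REDIS = 'redis'

# Hypothetical stand-ins for the real engine classes (assumption, not the project's code).
class FakeInMemoryEngine:
    def __init__(self, **settings):
        self.settings = settings

class FakeRedisEngine:
    def __init__(self, **settings):
        self.settings = settings

ENGINES = {
    DatabaseEngineEnum.INMEMORY.value: FakeInMemoryEngine,
    DatabaseEngineEnum.REDIS.value: FakeRedisEngine,
}

DEFAULT_DB_ENGINE = DatabaseEngineEnum.INMEMORY

def select_engine_class(engine=None):
    # Same precedence as get_database: parameter, then DB_ENGINE, then default.
    if engine is None:
        engine = os.environ.get('DB_ENGINE', DEFAULT_DB_ENGINE)
    # Accept either an enum member or its plain string value.
    if isinstance(engine, DatabaseEngineEnum):
        engine = engine.value
    engine_class = ENGINES.get(engine)
    if engine_class is None:
        raise Exception('Unsupported DatabaseEngine({})'.format(engine))
    return engine_class

# No parameter and no environment variable: the default engine is used.
os.environ.pop('DB_ENGINE', None)
default_cls = select_engine_class()

# The environment variable takes effect when no parameter is given.
os.environ['DB_ENGINE'] = 'redis'
env_cls = select_engine_class()

# An explicit parameter overrides the environment variable.
explicit_cls = select_engine_class(DatabaseEngineEnum.INMEMORY)

# Clean up so the example leaves the environment as it found it.
os.environ.pop('DB_ENGINE', None)
```

Because the environment value is a plain string while the default is an enum member, the isinstance check is what lets both paths end up as the same kind of key for the ENGINES lookup.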