Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • osl/code/addons/org.etsi.osl.controllers.camara
1 result
Show changes
Showing
with 2947 additions and 0 deletions
QoDProvisioning/Documentation/Pictures/QoDProvisioningAPI-Docs.png

745 KiB

{
"name": "Dummy Operator Service - CFS",
"description": "Dummy Operator Service - CFS",
"lifecycleStatus": "In design",
"version": "0.1.0",
"validFor": {
"endDateTime": "2043-12-31T23:59:59.999Z",
"startDateTime": "2024-01-01T00:00:00.001Z"
},
"isBundle": null,
"serviceSpecCharacteristic": [
{
"name": "qodProv.operation",
"configurable": false,
"description": null,
"extensible": null,
"maxCardinality": 1,
"minCardinality": 0,
"regex": null,
"valueType": "TEXT",
"serviceSpecCharRelationship": [],
"serviceSpecCharacteristicValue": [],
"validFor": {
"endDateTime": "2043-12-31T23:59:59.999Z",
"startDateTime": "2024-01-01T00:00:00.001Z"
}
},
{
"name": "qodProv.provisioningId",
"configurable": false,
"description": null,
"extensible": null,
"maxCardinality": 1,
"minCardinality": 0,
"regex": null,
"valueType": "TEXT",
"serviceSpecCharRelationship": [],
"serviceSpecCharacteristicValue": [],
"validFor": {
"endDateTime": "2043-12-31T23:59:59.999Z",
"startDateTime": "2024-01-01T00:00:00.001Z"
}
},
{
"name": "qodProv.qosProfile",
"configurable": false,
"description": null,
"extensible": null,
"maxCardinality": 1,
"minCardinality": 0,
"regex": null,
"valueType": "TEXT",
"serviceSpecCharRelationship": [],
"serviceSpecCharacteristicValue": [],
"validFor": {
"endDateTime": "2043-12-31T23:59:59.999Z",
"startDateTime": "2024-01-01T00:00:00.001Z"
}
},
{
"name": "qodProv.device.phoneNumber",
"configurable": false,
"description": null,
"extensible": null,
"maxCardinality": 1,
"minCardinality": 0,
"regex": null,
"valueType": "TEXT",
"serviceSpecCharRelationship": [],
"serviceSpecCharacteristicValue": [],
"validFor": {
"endDateTime": "2043-12-31T23:59:59.999Z",
"startDateTime": "2024-01-01T00:00:00.001Z"
}
},
{
"name": "qodProv.device.networkAccessIdentifier",
"configurable": false,
"description": null,
"extensible": null,
"maxCardinality": 1,
"minCardinality": 0,
"regex": null,
"valueType": "TEXT",
"serviceSpecCharRelationship": [],
"serviceSpecCharacteristicValue": [],
"validFor": {
"endDateTime": "2043-12-31T23:59:59.999Z",
"startDateTime": "2024-01-01T00:00:00.001Z"
}
},
{
"name": "qodProv.device.ipv4Address.publicAddress",
"configurable": false,
"description": null,
"extensible": null,
"maxCardinality": 1,
"minCardinality": 0,
"regex": null,
"valueType": null,
"serviceSpecCharRelationship": [],
"serviceSpecCharacteristicValue": [],
"validFor": {
"endDateTime": "2043-12-31T23:59:59.999Z",
"startDateTime": "2024-01-01T00:00:00.001Z"
}
},
{
"name": "qodProv.device.ipv4Address.privateAddress",
"configurable": false,
"description": null,
"extensible": null,
"maxCardinality": 1,
"minCardinality": 0,
"regex": null,
"valueType": "TEXT",
"serviceSpecCharRelationship": [],
"serviceSpecCharacteristicValue": [],
"validFor": {
"endDateTime": "2043-12-31T23:59:59.999Z",
"startDateTime": "2024-01-01T00:00:00.001Z"
}
},
{
"name": "qodProv.device.ipv4Address.publicPort",
"configurable": false,
"description": null,
"extensible": null,
"maxCardinality": 1,
"minCardinality": 0,
"regex": null,
"valueType": "INTEGER",
"serviceSpecCharRelationship": [],
"serviceSpecCharacteristicValue": [],
"validFor": {
"endDateTime": "2043-12-31T23:59:59.999Z",
"startDateTime": "2024-01-01T00:00:00.001Z"
}
},
{
"name": "qodProv.device.ipv6Address",
"configurable": false,
"description": null,
"extensible": null,
"maxCardinality": 1,
"minCardinality": 0,
"regex": null,
"valueType": "TEXT",
"serviceSpecCharRelationship": [],
"serviceSpecCharacteristicValue": [],
"validFor": {
"endDateTime": "2043-12-31T23:59:59.999Z",
"startDateTime": "2024-01-01T00:00:00.001Z"
}
},
{
"name": "qodProv.sink",
"configurable": false,
"description": null,
"extensible": null,
"maxCardinality": 1,
"minCardinality": 0,
"regex": null,
"valueType": "TEXT",
"serviceSpecCharRelationship": [],
"serviceSpecCharacteristicValue": [],
"validFor": {
"endDateTime": "2043-12-31T23:59:59.999Z",
"startDateTime": "2024-01-01T00:00:00.001Z"
}
},
{
"name": "qodProv.sinkCredential.credentialType",
"configurable": false,
"description": null,
"extensible": null,
"maxCardinality": 1,
"minCardinality": 0,
"regex": null,
"valueType": "TEXT",
"serviceSpecCharRelationship": [],
"serviceSpecCharacteristicValue": [],
"validFor": {
"endDateTime": "2043-12-31T23:59:59.999Z",
"startDateTime": "2024-01-01T00:00:00.001Z"
}
},
{
"name": "camaraResults",
"configurable": false,
"description": null,
"extensible": null,
"maxCardinality": 1,
"minCardinality": 0,
"regex": null,
"valueType": "TEXT",
"serviceSpecCharRelationship": [],
"serviceSpecCharacteristicValue": [],
"validFor": {
"endDateTime": "2043-12-31T23:59:59.999Z",
"startDateTime": "2024-01-01T00:00:00.001Z"
}
}
]
}
\ No newline at end of file
apiVersion: org.etsi.osl/v1
kind: DummyOperatorService
metadata:
name: _to_be_replaced_by_osl_
namespace: default
spec:
status: "%s"
\ No newline at end of file
apiVersion: org.etsi.osl/v1
kind: DummyOperatorService
metadata:
name: _to_be_replaced_by_osl_
namespace: default
spec:
qodProv:
operation: "%s"
provisioningId: "%s"
device:
phoneNumber: "%s"
networkAccessIdentifier: "%s"
ipv4Address:
publicAddress: "%s"
privateAddress: "%s"
publicPort: %d
ipv6Address: "%s"
qosProfile: "%s"
sink: "%s"
sinkCredential:
credentialType: "%s"
{
java.util.HashMap<String,String> charvals = new java.util.HashMap<>();
charvals.put("_CR_SPEC",String.format("""
apiVersion: org.etsi.osl/v1
kind: DummyOperatorService
metadata:
name: _to_be_replaced_by_osl
namespace: default
spec:
status: "%s"
"""
, "RUNNING"));
setServiceRefCharacteristicsValues("Dummy Operator Service - RFS", charvals);
}
{
java.util.HashMap<String,String> charvals = new java.util.HashMap<>();
charvals.put("_CR_SPEC",String.format("""
apiVersion: org.etsi.osl/v1
kind: DummyOperatorService
metadata:
name: _to_be_replaced_by_osl
namespace: default
spec:
qodProv:
operation: "%s"
provisioningId: "%s"
device:
phoneNumber: "%s"
networkAccessIdentifier: "%s"
ipv4Address:
publicAddress: "%s"
privateAddress: "%s"
publicPort: %d
ipv6Address: "%s"
qosProfile: "%s"
sink: "%s"
sinkCredential:
credentialType: "%s"
"""
, getCharValAsString("qodProv.operation"), getCharValAsString("qodProv.provisioningId"), getCharValAsString("qodProv.device.phoneNumber"), getCharValAsString("qodProv.device.networkAccessIdentifier"), getCharValAsString("qodProv.device.ipv4Address.publicAddress"), getCharValAsString("qodProv.device.ipv4Address.privateAddress"), getCharValNumber("qodProv.device.ipv4Address.publicPort"), getCharValAsString("qodProv.device.ipv6Address"), getCharValAsString("qodProv.qosProfile"), getCharValAsString("qodProv.sink"), getCharValAsString("qodProv.sinkCredential.credentialType")));
setServiceRefCharacteristicsValues("Dummy Operator Service - RFS", charvals);
}
\ No newline at end of file
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: dummy-operator-services.org.etsi.osl
spec:
group: org.etsi.osl
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
status:
type: string
default: NOT_RUNNING
enum:
- "RUNNING"
- "NOT_RUNNING"
qodProv:
type: object
properties:
operation:
type: string
provisioningId:
type: string
device:
type: object
properties:
phoneNumber:
type: string
pattern: '^\+[1-9][0-9]{4,14}$'
networkAccessIdentifier:
type: string
ipv4Address:
type: object
properties:
publicAddress:
type: string
privateAddress:
type: string
publicPort:
type: integer
ipv6Address:
type: string
qosProfile:
type: string
sink:
type: string
sinkCredential:
type: object
properties:
credentialType:
type: string
camaraResults:
type: string
default: "[]"
status:
type: object
scope: Namespaced
names:
plural: dummy-operator-services
singular: dummy-operator-service
kind: DummyOperatorService
shortNames:
- dos
apiVersion: org.etsi.osl/v1
kind: DummyOperatorService
metadata:
name: example-dummy-operator-service
namespace: default
spec:
status: RUNNING
qodProv:
operation: CREATE
provisioningId: 3fa85f64-5717-4562-b3fc-2c963f66afa6
device:
phoneNumber: "+123456789"
networkAccessIdentifier: "123456789@domain.com"
ipv4Address:
publicAddress: "203.0.113.0"
privateAddress: "192.168.1.100"
publicPort: 5000
ipv6Address: "2001:db8::ff00:42:8329"
qosProfile: "QOS_A"
sink: "https://endpoint.example.com/sink"
sinkCredential:
credentialType: "PLAIN"
# VARIABLES
# API Docker Image
API_DOCKER_IMAGE_LOCAL_NAME = osl-camaraaas-qod-provisioning-api
API_DOCKER_IMAGE_LOCAL_TAG = latest
API_DOCKER_IMAGE_NAME_ON_REPOSITORY = osl-camaraaas-qod-provisioning-api
API_DOCKER_IMAGE_TAG_ON_REPOSITORY = latest
# Operator Docker Image
OPERATOR_DOCKER_IMAGE_LOCAL_NAME = osl-camaraaas-qod-provisioning-api-op
OPERATOR_DOCKER_IMAGE_LOCAL_TAG = latest
OPERATOR_DOCKER_IMAGE_NAME_ON_REPOSITORY = osl-camaraaas-qod-provisioning-api-op
OPERATOR_DOCKER_IMAGE_TAG_ON_REPOSITORY = latest
# VARIABLES TO UPDATE
REPOSITORY_HOST = atnog-harbor.av.it.pt/camaraaas
OPERATOR_NAMESPACE = osl-camara-controllers
# Dummy Operator Service
create-dummy-operator-crd:
kubectl apply -f ./DummyOperatorService/crd.yaml
delete-dummy-operator-crd:
kubectl delete -f ./DummyOperatorService/crd.yaml || true
create-dummy-operator-cr:
kubectl apply -f ./DummyOperatorService/example-cr.yaml
delete-dummy-operator-cr:
kubectl delete -f ./DummyOperatorService/example-cr.yaml || true
describe-dummy-operator-cr:
kubectl describe dos example-dummy-operator-service || true
# CAMARAaaS QoD Provisioning API
build-api-docker-image:
docker build -t $(API_DOCKER_IMAGE_LOCAL_NAME):$(API_DOCKER_IMAGE_LOCAL_TAG) ./QoDProvisioningAPI/API
tag-api-docker-image:
docker tag $(API_DOCKER_IMAGE_LOCAL_NAME):$(API_DOCKER_IMAGE_LOCAL_TAG) $(REPOSITORY_HOST)/$(API_DOCKER_IMAGE_NAME_ON_REPOSITORY):$(API_DOCKER_IMAGE_TAG_ON_REPOSITORY)
push-api-docker-image:
docker push $(REPOSITORY_HOST)/$(API_DOCKER_IMAGE_NAME_ON_REPOSITORY):$(API_DOCKER_IMAGE_TAG_ON_REPOSITORY)
api-docker-image: build-api-docker-image tag-api-docker-image push-api-docker-image
# CAMARAaaS QoD Provisioning API Operator
build-operator-docker-image:
docker build -t $(OPERATOR_DOCKER_IMAGE_LOCAL_NAME):$(OPERATOR_DOCKER_IMAGE_LOCAL_TAG) ./QoDProvisioningAPI/Operator
tag-operator-docker-image:
docker tag $(OPERATOR_DOCKER_IMAGE_LOCAL_NAME):$(OPERATOR_DOCKER_IMAGE_LOCAL_TAG) $(REPOSITORY_HOST)/$(OPERATOR_DOCKER_IMAGE_NAME_ON_REPOSITORY):$(OPERATOR_DOCKER_IMAGE_TAG_ON_REPOSITORY)
push-operator-docker-image:
docker push $(REPOSITORY_HOST)/$(OPERATOR_DOCKER_IMAGE_NAME_ON_REPOSITORY):$(OPERATOR_DOCKER_IMAGE_TAG_ON_REPOSITORY)
operator-docker-image: build-operator-docker-image tag-operator-docker-image push-operator-docker-image
install-operator:
helm install camaraaas-qod-prov-operator ./QoDProvisioningAPI/Operator/chart --set operator.image="$(REPOSITORY_HOST)/$(OPERATOR_DOCKER_IMAGE_NAME_ON_REPOSITORY):$(OPERATOR_DOCKER_IMAGE_TAG_ON_REPOSITORY)" --set camaraQoDAPI.image="$(REPOSITORY_HOST)/$(API_DOCKER_IMAGE_NAME_ON_REPOSITORY):$(API_DOCKER_IMAGE_TAG_ON_REPOSITORY)" --namespace $(OPERATOR_NAMESPACE) --create-namespace
get-operator-logs:
kubectl logs -f $$(kubectl get pods -n $(OPERATOR_NAMESPACE) -l app=camaraaas-qod-provisioning-api-op -o jsonpath="{.items[0].metadata.name}") -n $(OPERATOR_NAMESPACE)
uninstall-operator:
helm uninstall camaraaas-qod-prov-operator -n $(OPERATOR_NAMESPACE)
create-operator-test-cr:
kubectl apply -f ./QoDProvisioningAPI/Operator/test-cr.yaml
describe-operator-test-cr:
kubectl describe camaraaas-qod-provisioning-apis.org.etsi.osl test-qod-provisioning
delete-operator-test-cr:
kubectl delete -f ./QoDProvisioningAPI/Operator/test-cr.yaml
# Use an official Python runtime as a parent image
FROM python:3.9-slim
# Set the working directory to /app
WORKDIR /app
# Install system dependencies and SQLite CLI
RUN apt-get update && \
apt-get install -y --no-install-recommends \
sqlite3 \
gcc \
libsqlite3-dev && \
rm -rf /var/lib/apt/lists/*
# Copy the requirements file into the container
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application code into the container
COPY src/ /app/
# Create the data directory for the SQLite database
RUN mkdir -p /data
# Set environment variables
ENV SQLITE_DB_PATH=/data/sqlite.db
# Set the command to run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# Variables
DOCKER_REPOSITORY_IMAGE_NAME = camaraaas-qod-provisioning-api
DOCKER_REPOSITORY_TAG = latest
REPOSITORY_HOST = atnog-harbor.av.it.pt/camaraaas
SERVICE_UUID=6b6b2f37-b232-4a6a-b9bf-55f96dbdb773
# API
init:
python3 -m venv venv && . venv/bin/activate && pip install -r requirements.txt && deactivate
run:
. venv/bin/activate && SQLITE_DB_PATH=~/sqlite.db BROKER_ADDRESS=10.255.28.137 BROKER_PORT=61613 BROKER_USERNAME=artemis BROKER_PASSWORD=artemis SERVICE_UUID=$(SERVICE_UUID) PYTHONPATH=src uvicorn src.main:app --host 0.0.0.0 --port 8000 --reload --log-level info && deactivate
delete-db:
rm -f ~/sqlite.db
# DOCKER
docker-build:
docker build -t $(DOCKER_REPOSITORY_IMAGE_NAME):$(DOCKER_REPOSITORY_TAG) .
docker-tag:
docker tag $(DOCKER_REPOSITORY_IMAGE_NAME):$(DOCKER_REPOSITORY_TAG) $(REPOSITORY_HOST)/$(DOCKER_REPOSITORY_IMAGE_NAME):$(DOCKER_REPOSITORY_TAG)
docker-push:
docker push $(REPOSITORY_HOST)/$(DOCKER_REPOSITORY_IMAGE_NAME):$(DOCKER_REPOSITORY_TAG)
docker-clean:
docker image prune -f
docker-remove:
docker rmi $(LOCAL_IMAGE_NAME):$(LOCAL_TAG)
docker: docker-build docker-tag docker-push
version: '3.6'
services:
service:
build:
context: .
ports:
- "8000:8000"
command: uvicorn main:app --host 0.0.0.0 --port 8000 --log-level info
environment:
- SQLITE_DB_PATH=/data/sqlite.db
- BROKER_ADDRESS=10.255.28.137
- BROKER_PORT=61613
- BROKER_USERNAME=artemis
- BROKER_PASSWORD=artemis
- SERVICE_UUID=0726c0c2-9fc8-4593-b2e0-8740764ae365
volumes:
- sqlite_data:/data
volumes:
sqlite_data:
driver: local
This diff is collapsed.
aiofiles==23.1.0
aniso8601==7.0.0
async-exit-stack==1.0.1
async-generator==1.10
certifi==2024.7.4
chardet==4.0.0
click==7.1.2
dnspython==2.6.1
email-validator==2.0.0
fastapi==0.115.5
graphene==2.1.8
graphql-core==2.3.2
graphql-relay==2.0.1
h11==0.12.0
httptools>=0.3.0,<0.7.0
httpx==0.24.1
idna==3.7
itsdangerous==1.1.0
Jinja2==3.1.4
MarkupSafe==2.0.1
orjson==3.9.15
promise==2.3
pydantic>=2
python-dotenv==0.17.1
python-multipart==0.0.7
PyYAML>=5.4.1,<6.1.0
requests==2.32.0
Rx==1.6.1
starlette==0.40.0
typing-extensions==4.8.0
ujson==4.0.2
urllib3==1.26.19
uvicorn==0.13.4
uvloop==0.19.0
watchgod==0.7
websockets==10.0
pysqlite3==0.5.4
sqlalchemy==1.4.46
stomp.py==8.2.0
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Authors:
# Eduardo Santos (eduardosantoshf@av.it.pt)
# Rafael Direito (rdireito@av.it.pt)
# @Organization:
# Instituto de Telecomunicações, Aveiro (ITAv)
# Aveiro, Portugal
# @Date:
# December 2024
class Constants():
processed_camara_results = []
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Authors:
# Eduardo Santos (eduardosantoshf@av.it.pt)
# Rafael Direito (rdireito@av.it.pt)
# @Organization:
# Instituto de Telecomunicações, Aveiro (ITAv)
# Aveiro, Portugal
# @Date:
# December 2024
from database.base_models import Provisioning, Device
def map_device_to_dict(device: Device) -> dict:
return {
"phone_number": device.phone_number,
"network_access_identifier": device.network_access_identifier,
"ipv4_address": {
"public_address": device.ipv4_public_address,
"private_address": device.ipv4_private_address,
"public_port": device.ipv4_public_port
},
"ipv6_address": device.ipv6_address
}
def map_service_characteristics(provisioning, operation):
characteristics = [
{
"name": "qodProv.device.phoneNumber",
"value": {"value": provisioning.device.phone_number or ""}
},
{
"name": "qodProv.device.networkAccessIdentifier",
"value": {"value": provisioning.device.network_access_identifier \
or ""}
},
{
"name": "qodProv.device.ipv4Address.publicAddress",
"value": {"value": provisioning.device.ipv4_public_address or ""}
},
{
"name": "qodProv.device.ipv4Address.privateAddress",
"value": {"value": provisioning.device.ipv4_private_address or ""}
},
{
"name": "qodProv.device.ipv4Address.publicPort",
"value": {"value": provisioning.device.ipv4_public_port or ""}
},
{
"name": "qodProv.device.ipv6Address",
"value": {"value": provisioning.device.ipv6_address or ""}
},
{
"name": "qodProv.qosProfile",
"value": {"value": provisioning.qos_profile}
},
{
"name": "qodProv.operation",
"value": {"value": operation}
},
{
"name": "qodProv.provisioningId",
"value": {"value": provisioning.id}
},
{
"name": "qodProv.sink",
"value": {"value": provisioning.sink or ""}
},
{
"name": "qodProv.sinkCredential.credentialType",
"value": {"value": provisioning.sink_credential or ""}
}
]
return characteristics
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Authors:
# Eduardo Santos (eduardosantoshf@av.it.pt)
# Rafael Direito (rdireito@av.it.pt)
# @Organization:
# Instituto de Telecomunicações, Aveiro (ITAv)
# Aveiro, Portugal
# @Date:
# December 2024
import asyncio
from aux.service_event_manager.service_event_manager import ServiceEventManager
from config import Config
import json
from database import crud
from database.db import get_db
from aux.constants import Constants
# Set up logging
logger = Config.setup_logging()
class CamaraResultsProcessor:
"""Handles processing of camara results from the queue."""
def __init__(self, queue):
self.queue = queue
self.db_session = next(get_db())
async def process_results(self):
"""Continuously processes results from the queue."""
try:
results = None
# Enter the infinite loop to process subsequent results
while True:
results_str = await self.queue.get()
try:
results = json.loads(results_str)
except Exception as e:
logger.error(
f"Could not parse Camara results. Reason: {e}"
)
logger.info(
f"Amounf of processed CAMARA Results: {len(results)}."
)
logger.debug(f"Processed camaraResults: {results}")
self.update_provisionings(results)
except asyncio.CancelledError:
logger.info("CamaraResultsProcessor stopped gracefully.")
except Exception as e:
logger.error(f"Error processing camara results: {e}", exc_info=True)
def update_provisionings(self, current_results):
Constants.processed_camara_results = []
for result in current_results:
try:
prov_id = result["provisioningId"]
prov_status = result["status"]
prov_timestamp = result["startedAt"]
crud.update_provisioning_by_id(
self.db_session,
prov_id,
prov_status,
prov_timestamp
)
# Deal with camara-current-results endpoint
if "sinkCredential" in result:
if "credentialType" in result["sinkCredential"] \
and result["sinkCredential"]["credentialType"] not in \
['PLAIN', 'ACCESSTOKEN', 'REFRESHTOKEN']:
result["sinkCredential"]["credentialType"] = None
else:
result["sinkCredential"] = {
"credentialType": None
}
Constants.processed_camara_results.append(result)
except Exception as e:
logger.error(
f"Could not process CAMARA Result: {result}. Reason: {e}"
)
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Authors:
# Eduardo Santos (eduardosantoshf@av.it.pt)
# Rafael Direito (rdireito@av.it.pt)
# @Organization:
# Instituto de Telecomunicações, Aveiro (ITAv)
# Aveiro, Portugal
# @Date:
# December 2024
import stomp
import json
import time
import os
import asyncio
from functools import wraps
from config import Config
# Set up logging
logger = Config.setup_logging()
def check_subscribe_connection(method):
@wraps(method)
def wrapper(cls, *args, **kwargs):
if not cls.connection or not cls.connection.is_connected():
logger.warning("Connection not active. Reconnecting...")
cls.connection = stomp.Connection(
[(cls.broker_address, cls.broker_port)],
heartbeats=(15000, 15000)
)
cls.connection.connect(
cls.broker_username,
cls.broker_password,
wait=True
)
if cls.connection and cls.connection.is_connected():
logger.info("Connection is active.")
return method(cls, *args, **kwargs)
return wrapper
class ServiceEventManager:
"""Manages event subscriptions and service updates using STOMP."""
# Validate the environment variables before using them
Config.validate()
camara_results_queue = None
camara_results_lock = None
connection = None
@classmethod
def initialize(cls):
cls.broker_address = Config.broker_address
cls.broker_port = Config.broker_port
cls.broker_username = Config.broker_username
cls.broker_password = Config.broker_password
cls.service_uuid = Config.service_uuid
cls.catalog_upd_service = Config.catalog_upd_service
cls.event_service_attrchanged = Config.event_service_attrchanged
# Initialize shared resources
cls.camara_results_queue = asyncio.Queue()
cls.camara_results_lock = asyncio.Lock()
@classmethod
@check_subscribe_connection
def subscribe_to_events(cls):
"""Subscribe to the events topic."""
loop = asyncio.get_event_loop()
def run_listener():
cls.connection.set_listener('', cls.MyListener(loop))
cls.connection.subscribe(
destination=cls.event_service_attrchanged,
id=1
)
logger.info(
f"Subscribed to {cls.event_service_attrchanged}. "
f"Waiting for messages..."
)
# Run the listener in a separate thread
import threading
listener_thread = threading.Thread(target=run_listener, daemon=True)
listener_thread.start()
@classmethod
def update_service(cls, update_payload):
"""Send a service update to the specified destination."""
try:
headers = {
"serviceid": cls.service_uuid,
"triggerServiceActionQueue": True
}
# Connect to STOMP broker and send the message
conn = stomp.Connection([(cls.broker_address, cls.broker_port)])
conn.connect(
cls.broker_username,
cls.broker_password,
wait=True
)
logger.info(f"Sending update to {cls.catalog_upd_service}...")
conn.send(
destination=cls.catalog_upd_service,
body=json.dumps(update_payload),
headers=headers
)
logger.info("Update sent successfully.")
conn.disconnect()
except Exception as e:
logger.error(f"Cannot update Service: {cls.service_uuid}: {str(e)}")
class MyListener(stomp.ConnectionListener):
"""Custom listener to handle incoming messages from the STOMP broker."""
def __init__(self, loop):
super().__init__()
self.loop = loop
def get_camara_results(self, service_info):
for charact in service_info.get("serviceCharacteristic"):
if charact.get("name") == "camaraResults":
return charact.get("value").get("value")
def on_message(self, frame):
"""Handle received message frames."""
# Attempt to parse the body as JSON
try:
message = json.loads(frame.body)
service_info = message.get("event").get("service")
camara_results = None
if service_info.get("uuid") == ServiceEventManager.service_uuid:
camara_results = self.get_camara_results(service_info)
# Add the result to the async queue
if camara_results:
asyncio.run_coroutine_threadsafe(
ServiceEventManager.camara_results_queue.put(
camara_results
),
self.loop
)
except json.JSONDecodeError:
logger.info('Received message is not valid JSON.')
\ No newline at end of file