/*
 * Copyright (c) 2024-2025  The AdvantEDGE Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package sssmgr
import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"

	uuid "github.com/google/uuid"
)

// SssMgr is the Sensors-Sharing Service manager.
//
// Fields:
//   - name: connector name (mandatory, see NewSssMgr).
//   - namespace: namespace of the manager; NewSssMgr falls back to "default".
//   - bindingProtocol: binding protocol name, "MQTT" or "REST_HTTP".
//   - host, port: endpoint of the IoT platform; NewSssMgr defaults port to
//     1883 (MQTT) or 80 (REST_HTTP) when it is 0.
//   - cse_name, hostId: name and identifier under which the local platform is
//     registered in registeredIotPlatformsMap.
//   - mutex: lock owned by the manager (NOTE(review): usage is not visible in
//     this part of the file; confirm what it protects).
//   - wg: wait group incremented by the refresh loop.
//   - refreshTicker: ticker driving the periodic refresh, nil when not started.
//   - sss_discovery_notify, sss_status_notify, sss_data_notify: callbacks
//     supplied by the caller of NewSssMgr.
type SssMgr struct {
	name                 string
	namespace            string
	bindingProtocol      string
	host                 string
	port                 int
	cse_name             string
	hostId               string
	mutex                sync.Mutex
	wg                   sync.WaitGroup
	refreshTicker        *time.Ticker
	sss_discovery_notify func()
	sss_status_notify    func()
	sss_data_notify      func()
}

// IotPlatformInfo describes a registered IoT platform.
type IotPlatformInfo struct {
	Address       string
	Port          int
	Name          string
	IotPlatformId string
}

// Point is a position expressed as a latitude/longitude pair.
type Point struct {
	Latitude  float64
	Longitude float64
}

// SensorCharacteristic is a single named characteristic of a sensor, held as
// a name/value pair of strings with an optional unit of measure.
type SensorCharacteristic struct {
	CharacteristicName          string
	CharacteristicValue         string
	CharacteristicUnitOfMeasure *string // Optional: may be nil
}

type SensorDiscoveryInfo struct {
	SensorIdentifier         string
	SensorType               string
	SensorPropertyList       []string
	SensorCharacteristicList []SensorCharacteristic
	SensorPosition           *Point
	IotPlatformId            string
Yann Garcia's avatar
Yann Garcia committed
}

var registeredIotPlatformsMap = map[string]IotPlatformInfo{} // Map of registered IoT platforms
var sensorsMap = map[string]SensorDiscoveryInfo{}            // Map sensors by sensorIdentifier
var sensorsPerPlatformMap = map[string][]string{}            // Map sensorIdentifiers per platform
// Period of the ticker that refreshes the devices list for all IoT platforms
const refreshTickerExpeary = 10 // In seconds

// Enable profiling (compile-time switch)
const profiling = false

// Profiling timestamps by name; only allocated by SssMgr.init when profiling is enabled
var profilingTimers map[string]time.Time

// Active binding protocol implementation: selected by NewSssMgr according to
// bindingProtocol, released and reset to nil by DeleteSssMgr.
var protocol SssMgrBindingProtocol

// NewSssMgr - Creates and initializes a new SSS Traffic Manager
Yann Garcia's avatar
Yann Garcia committed
func NewSssMgr(name string, namespace string, bindingProtocol string, host string, port int, hostId string, cse_name string, sss_discovery_notify func(), sss_status_notify func(), sss_data_notify func()) (tm *SssMgr, err error) {
Yann Garcia's avatar
Yann Garcia committed
	if name == "" {
		err = errors.New("Missing connector name")
		return nil, err
	}

	// Create new Traffic Manager
	tm = new(SssMgr)
Yann Garcia's avatar
Yann Garcia committed
	tm.name = name
	if namespace != "" {
		tm.namespace = namespace
	} else {
		tm.namespace = "default"
	}

Yann Garcia's avatar
Yann Garcia committed
	tm.bindingProtocol = bindingProtocol
	tm.host = host
	tm.port = port
Yann Garcia's avatar
Yann Garcia committed
	tm.cse_name = cse_name
	tm.hostId = hostId
Yann Garcia's avatar
Yann Garcia committed
	if tm.bindingProtocol == "MQTT" {
		if tm.host == "" {
			err := errors.New("Host not set for MQTTP protocol")
			log.Error(err.Error())
			return nil, err
		}
		if tm.port == 0 {
			tm.port = 1883
		}
		protocol = NewSssMgrMqtt()
Yann Garcia's avatar
Yann Garcia committed
	} else if tm.bindingProtocol == "REST_HTTP" {
		if tm.port == 0 {
			tm.port = 80
		}
Yann Garcia's avatar
Yann Garcia committed
		protocol = NewSssMgrHttp()
	} else {
		err := errors.New("Binding protocol not set")
		log.Error(err.Error())
		return nil, err
	}
Yann Garcia's avatar
Yann Garcia committed
	if hostId == "" {
		err := errors.New("hostId not set")
		log.Error(err.Error())
		return nil, err
	}
	if cse_name == "" {
		err := errors.New("cse_name not set")
		log.Error(err.Error())
		return nil, err
	}

Yann Garcia's avatar
Yann Garcia committed
	err = protocol.init(tm)
	if err != nil {
		log.Error(err.Error())
		return nil, err
	}

	tm.sss_discovery_notify = sss_discovery_notify
	tm.sss_status_notify = sss_status_notify
	tm.sss_data_notify = sss_data_notify

Yann Garcia's avatar
Yann Garcia committed
	tm.init()

	return tm, nil
}

// Profiling init
func (tm *SssMgr) init() {
Yann Garcia's avatar
Yann Garcia committed
	if profiling {
		profilingTimers = make(map[string]time.Time)
	}

	registeredIotPlatformsMap = make(map[string]IotPlatformInfo, 1)
Yann Garcia's avatar
Yann Garcia committed
	registeredIotPlatformsMap[tm.hostId] = IotPlatformInfo{
		Address:       tm.host,
		Port:          tm.port,
		Name:          tm.cse_name,
		IotPlatformId: tm.hostId,
	}
	sensorsMap = make(map[string]SensorDiscoveryInfo, 0)
	sensorsPerPlatformMap = make(map[string][]string, 0)
Yann Garcia's avatar
Yann Garcia committed
	tm.refreshTicker = nil
// DeleteSssMgr -
func (tm *SssMgr) DeleteSssMgr() (err error) {
Yann Garcia's avatar
Yann Garcia committed
	tm.stopRefreshTicker()

Yann Garcia's avatar
Yann Garcia committed
	if protocol != nil {
		protocol.uninit()
		protocol = nil
	}
Yann Garcia's avatar
Yann Garcia committed
	return nil
}

func (tm *SssMgr) startRefreshTicker() {
Yann Garcia's avatar
Yann Garcia committed
	log.Debug("Starting refresh loop")
	tm.refreshTicker = time.NewTicker(refreshTickerExpeary * time.Second)
	go func() {
		if tm.refreshTicker != nil {
			for range tm.refreshTicker.C {
				// Refresh the list of devices
				tm.wg.Add(1)
				err := tm.populateDevicesPerIotPlatforms()
				if err != nil {
Loading
Loading full blame…