Skip to content
iot-mgr.go 18.6 KiB
Newer Older
/*
 * Copyright (c) 2024  The AdvantEDGE Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package iotmgr

import (
	"errors"
Yann Garcia's avatar
Yann Garcia committed
	"sync"
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
Yann Garcia's avatar
Yann Garcia committed
	sssmgr "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-sss-mgr"
Yann Garcia's avatar
Yann Garcia committed
	name          string
	namespace     string
	mutex         sync.Mutex
	wg            sync.WaitGroup
	refreshTicker *time.Ticker
}

type IotPlatformInfo struct {
	IotPlatformId               string
	UserTransportInfo           []MbTransportInfo
	CustomServicesTransportInfo []TransportInfo
	Enabled                     bool
Yann Garcia's avatar
Yann Garcia committed
	oneM2M                      *sssmgr.SssMgr
}

type MbTransportInfo struct {
	Id               string
	Name             string
	Description      string
	Type_            *string
	Protocol         string
	Version          string
	Endpoint         *EndPointInfo
	Security         *SecurityInfo
	ImplSpecificInfo *ImplSpecificInfo
}

type TransportInfo struct {
	Id               string
	Name             string
	Description      string
	Type_            *string
	Protocol         string
	Version          string
	Endpoint         *EndPointInfo
	Security         *SecurityInfo
	ImplSpecificInfo string
}

type EndPointInfo struct {
	Uris        []string
	Fqdn        []string
	Addresses   []Addresses
	Alternative string
}

type Addresses struct {
	Host string
	Port int32
}

type SecurityInfo struct {
	OAuth2Info *OAuth2Info
	Extensions string
}

type OAuth2Info struct {
	GrantTypes    []string
	TokenEndpoint string
}

type ImplSpecificInfo struct {
	EventTopics    []string
	UplinkTopics   []string
	DownlinkTopics []string
}

Yann Garcia's avatar
Yann Garcia committed
type TrafficRuleDescriptor struct {
	TrafficRuleId string
	FilterType    string
	Priority      int32
	TrafficFilter []TrafficFilter
	Action        string
	DstInterface  *InterfaceDescriptor
}

type InterfaceDescriptor struct {
	InterfaceType string
	//TunnelInfo *TunnelInfo FSCOM Not supported
	SrcMACAddress string
	DstMACAddress string
	DstIPAddress  string
}

type TrafficFilter struct {
	SrcAddress       []string
	DstAddress       []string
	SrcPort          []string
	DstPort          []string
	Protocol         []string
	Tag              []string
	Uri              []string
	PacketLabel      []string
	SrcTunnelAddress []string
	TgtTunnelAddress []string
	SrcTunnelPort    []string
	DstTunnelPort    []string
	QCI              int32
	TC               int32
}

type KeyValuePair struct {
	Key   string
	Value string
}

type DeviceInfo struct {
	DeviceAuthenticationInfo string
Yann Garcia's avatar
Yann Garcia committed
	DeviceMetadata           []KeyValuePair
	Gpsi                     string
	Pei                      string
	Supi                     string
	Msisdn                   string
	Imei                     string
	Imsi                     string
	Iccid                    string
	DeviceId                 string
	RequestedMecTrafficRule  []TrafficRuleDescriptor
	RequestedIotPlatformId   string
	RequestedUserTransportId string
	//DeviceSpecificMessageFormats *DeviceSpecificMessageFormats
	//DownlinkInfo *DownlinkInfo
Yann Garcia's avatar
Yann Garcia committed
	ClientCertificate string
	Enabled           bool
}

var registeredIotPlatformsMap = map[string]IotPlatformInfo{} // List of discovered IOT Plateform
var devicesMap = map[string]DeviceInfo{}                     // Map device by deviceId
var devicesPerPlatformMap = map[string][]string{}            // Map deviceIds per platform
Yann Garcia's avatar
Yann Garcia committed
var platformPerUserTransportIdMap = map[string][]string{}    // Map userTransportId per platform
// Timer to refresh devices list for all IoT platform
Yann Garcia's avatar
Yann Garcia committed
const refreshTickerExpeary = 30 // In seconds
// Enable profiling
const profiling = false

var profilingTimers map[string]time.Time

const (
	headerAccept      = "application/json"
	headerContentType = "application/json"
)

// NewIotMgr - Creates and initializes a new IOT Traffic Manager
func NewIotMgr(name string, namespace string) (tm *IotMgr, err error) {
	if name == "" {
		err = errors.New("Missing connector name")
		return nil, err
	}

	// Create new Traffic Manager
	tm = new(IotMgr)
	tm.name = name
	if namespace != "" {
		tm.namespace = namespace
	} else {
		tm.namespace = "default"
	}

Yann Garcia's avatar
Yann Garcia committed
	tm.init()

Yann Garcia's avatar
Yann Garcia committed
// Profiling init
func (tm *IotMgr) init() {
	if profiling {
		profilingTimers = make(map[string]time.Time)
	}
Yann Garcia's avatar
Yann Garcia committed
	registeredIotPlatformsMap = make(map[string]IotPlatformInfo, 0)
	devicesMap = make(map[string]DeviceInfo, 0)
	devicesPerPlatformMap = make(map[string][]string, 0)
	platformPerUserTransportIdMap = make(map[string][]string, 0)
Yann Garcia's avatar
Yann Garcia committed
	tm.refreshTicker = nil
Yann Garcia's avatar
Yann Garcia committed
// DeleteIotMgr -
func (tm *IotMgr) DeleteIotMgr() (err error) {
	return nil
Yann Garcia's avatar
Yann Garcia committed
func (tm *IotMgr) startRefreshTicker() {
	log.Debug(">>> startRefreshTicker")

	tm.refreshTicker = time.NewTicker(time.Duration(refreshTickerExpeary) * time.Second)
	go func() {
		if tm.refreshTicker != nil {
			for range tm.refreshTicker.C {
				// Refresh the list of devices
				tm.wg.Add(1)
				log.Debug("startRefreshTicker: registeredIotPlatformsMap: ", registeredIotPlatformsMap)
				for _, v := range registeredIotPlatformsMap {
					if v.oneM2M != nil {
						err := tm.populateDevicesPerIotPlatforms(v)
						if err != nil {
							log.Error(err)
						}
					} else {
						log.Debug("startRefreshTicker: Nothing to do")
					}
				} // End of 'for' statement
				log.Debug("startRefreshTicker: Before Done()")
				tm.wg.Done()
				log.Debug("startRefreshTicker: After Done()")
			} // End of 'for' statement
			log.Debug("startRefreshTicker: Leaving time loop")
		}
	}()
}

func (tm *IotMgr) stopRefreshTicker() {
	if tm.refreshTicker != nil {
		// Refresh the list of devices
		tm.wg.Add(1)
		tm.refreshTicker.Stop()
		tm.refreshTicker = nil
		tm.wg.Done()
		log.Debug("Refresh loop stopped")
	}
}

func (tm *IotMgr) RegisterIotPlatformInfo(iotPlatformInfo IotPlatformInfo) (err error) {
	if profiling {
		profilingTimers["RegisterIotPlatformInfo"] = time.Now()
	}

	log.Info(">>> RegisterIotPlatformInfo: iotPlatformId: ", iotPlatformInfo)
	if iotPlatformInfo.Enabled {
Yann Garcia's avatar
Yann Garcia committed
		//{{\"iotPlatformId\": \"1a584db5-6a3e-4f56-b126-29180069ecf1\", \"userTransportInfo\": [{\"id\": \"ca22ca5e-e0ce-4da8-a2ce-2966f4759032\", \"name\": \"MQTT\", \"description\": \"MQTT\", \"type\": \"MB_TOPIC_BASED\", \"protocol\": \"MQTT\", \"version\": \"2\", \"endpoint\": {\"addresses\": [{\"host\": \"172.29.10.56\", \"port\": 1883}]}, \"security\": {}, \"implSpecificInfo\": {}}], \"customServicesTransportInfo\": [{\"id\": \"85fe5e7f-c371-4f71-b7f6-61a1f808fbb3\", \"name\": \"/laboai-acme-ic-cse\", \"description\": \"ACME oneM2M CSE\", \"type\": \"REST_HTTP\", \"protocol\": \"REST_HTTP\", \"version\": \"4\", \"endpoint\": {\"addresses\": [{\"host\": \"172.29.10.20\", \"port\": 31110}]}, \"security\": {}}], \"enabled\": true}}
		iotPlatformInfo.oneM2M = nil
		if len(iotPlatformInfo.CustomServicesTransportInfo) == 0 || iotPlatformInfo.CustomServicesTransportInfo[0].Endpoint == nil || len(iotPlatformInfo.CustomServicesTransportInfo[0].Endpoint.Addresses) == 0 {
			log.Warn("RegisterIotPlatformInfo: Cannot use provided CustomServicesTransportInfo")
		} else {
			// FIXME FSCOM How to get the CSE_ID
			// TODO FSCOM Add notification support?
			pltf, err := sssmgr.NewSssMgr(tm.name, tm.namespace, iotPlatformInfo.CustomServicesTransportInfo[0].Protocol /*"MQTT"*/, iotPlatformInfo.CustomServicesTransportInfo[0].Endpoint.Addresses[0].Host /*"172.29.10.56"*/, int(iotPlatformInfo.CustomServicesTransportInfo[0].Endpoint.Addresses[0].Port) /*1883*/, iotPlatformInfo.IotPlatformId /*"7feaadbb0400"*/, iotPlatformInfo.CustomServicesTransportInfo[0].Name /*"laboai-acme-ic-cse"*/, nil, nil, nil)
			if err != nil {
				log.Error("RegisterIotPlatformInfo: ", err)
				iotPlatformInfo.oneM2M = nil
			} else {
				log.Info("RegisterIotPlatformInfo: IoT pltf created")
				iotPlatformInfo.oneM2M = pltf
				if tm.refreshTicker == nil {
					log.Info("RegisterIotPlatformInfo: Start RefreshTicker")
					tm.startRefreshTicker()
				}
			}
		}
		registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId] = iotPlatformInfo
Yann Garcia's avatar
Yann Garcia committed
		log.Info("RegisterIotPlatformInfo: iotPlatformId: ", registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId])

	} // else, Skip disabled platform

	if profiling {
		now := time.Now()
		log.Debug("RegisterIotPlatformInfo: ", now.Sub(profilingTimers["RegisterIotPlatformInfo"]))
	}
	return nil
}

func (tm *IotMgr) DeregisterIotPlatformInfo(iotPlatformId string) (err error) {
	if profiling {
		profilingTimers["DeregisterIotPlatformInfo"] = time.Now()
	}

	log.Info(">>> DeregisterIotPlatformInfo: iotPlatformId: ", iotPlatformId)
	// Remove the list of the devices for this IoT platform
	if val, ok := devicesPerPlatformMap[iotPlatformId]; ok {
		// Free resources from devicesMap map
		for _, dev := range val {
			delete(devicesMap, dev)
		} // End of 'for' statement
		delete(devicesPerPlatformMap, iotPlatformId)
Yann Garcia's avatar
Yann Garcia committed
		log.Info("DeregisterIotPlatformInfo: platformPerUserTransportIdMap (before): ", platformPerUserTransportIdMap)
Yann Garcia's avatar
Yann Garcia committed
		for _, rule := range platformPerUserTransportIdMap {
			for idx, pltf := range rule {
				if pltf == iotPlatformId {
					rule = append(rule[:idx], rule[idx+1:]...)
				}
			} // End of 'for' statement
		} // End of 'for' statement
Yann Garcia's avatar
Yann Garcia committed
		log.Info("DeregisterIotPlatformInfo: platformPerUserTransportIdMap (after): ", platformPerUserTransportIdMap)
Yann Garcia's avatar
Yann Garcia committed
	if pltf, ok := registeredIotPlatformsMap[iotPlatformId]; ok {
		if pltf.oneM2M != nil {
			_ = pltf.oneM2M.DeleteSssMgr()
			pltf.oneM2M = nil
			log.Info("RegisterIotPlatformInfo: IoT pltf removed")
		}
		delete(registeredIotPlatformsMap, iotPlatformId)
Yann Garcia's avatar
Yann Garcia committed
		if len(registeredIotPlatformsMap) == 0 {
			if tm.refreshTicker != nil {
				log.Info("RegisterIotPlatformInfo: Stop RefreshTicker")
				tm.stopRefreshTicker()
				tm.refreshTicker = nil
			}

		}
	}

	if profiling {
		now := time.Now()
		log.Debug("DeregisterIotPlatformInfo: ", now.Sub(profilingTimers["DeregisterIotPlatformInfo"]))
	}
	return nil
}

func (tm *IotMgr) GetDevices() (devices []DeviceInfo, err error) {
	if profiling {
		profilingTimers["GetDevices"] = time.Now()
	}

	log.Info(">>> GetDevices")
Yann Garcia's avatar
Yann Garcia committed
	tm.wg.Wait()
	log.Info("GetDevices: After Wait()")

	devices = make([]DeviceInfo, 0)
	if len(registeredIotPlatformsMap) == 0 {
		return devices, nil
	}

	for _, v := range devicesMap {
		log.Info("GetDevices: adding device: ", v)
		devices = append(devices, v)
	} // End of 'for' statement
	log.Info("GetDevices: devices: ", devices)

	if profiling {
		now := time.Now()
		log.Debug("GetDevices: ", now.Sub(profilingTimers["GetDevices"]))
	}
	return devices, nil
}

func (tm *IotMgr) GetDevice(deviceId string) (device DeviceInfo, err error) {
	if profiling {
		profilingTimers["GetDevice"] = time.Now()
	}

	log.Info(">>> GetDevice: deviceId: ", deviceId)

Yann Garcia's avatar
Yann Garcia committed
	tm.wg.Wait()
	log.Info("GetDevices: After Wait()")

	if val, ok := devicesMap[deviceId]; !ok {
		err = errors.New("Wrong Device identifier")
		return device, err
	} else {
		device = val
	}

	if profiling {
		now := time.Now()
		log.Debug("GetDevice: ", now.Sub(profilingTimers["GetDevice"]))
	}
	log.Info("GetDevice: device: ", device)

Yann Garcia's avatar
Yann Garcia committed
func (tm *IotMgr) CreateDevice(device DeviceInfo) (deviceResp DeviceInfo, err error) {
	log.Info(">>> CreateDevice: ", device)
Yann Garcia's avatar
Yann Garcia committed
	tm.wg.Wait()
	log.Info("GetDevices: After Wait()")

Yann Garcia's avatar
Yann Garcia committed
	// RequestedMecTrafficRule is not supported yet
	if len(device.RequestedMecTrafficRule) != 0 {
		err = errors.New("Unsupported traffic rule provided")
		log.Error(err.Error())
		return deviceResp, err
Yann Garcia's avatar
Yann Garcia committed
	if len(device.RequestedIotPlatformId) != 0 {
		deviceResp, err = tm.createDeviceWithIotPlatformId(device, device.RequestedIotPlatformId)
	} else {
		deviceResp, err = tm.createDeviceWithRequestedUserTransportId(device, device.RequestedUserTransportId)
Yann Garcia's avatar
Yann Garcia committed
	if err != nil {
		log.Error(err.Error())
		return deviceResp, err
Yann Garcia's avatar
Yann Garcia committed
	log.Info("CreateDevice: deviceResp: ", deviceResp)
Yann Garcia's avatar
Yann Garcia committed
	return deviceResp, nil
Yann Garcia's avatar
Yann Garcia committed
func (tm *IotMgr) DeleteDevice(deviceId string) (err error) {
Yann Garcia's avatar
Yann Garcia committed
		profilingTimers["DeleteDevice"] = time.Now()
Yann Garcia's avatar
Yann Garcia committed
	log.Info(">>> DeleteDevice: device: ", deviceId)
Yann Garcia's avatar
Yann Garcia committed
	tm.wg.Wait()
	log.Info("GetDevices: After Wait()")

Yann Garcia's avatar
Yann Garcia committed
	if _, ok := devicesMap[deviceId]; !ok {
		err = errors.New("Invalid device identifier")
		log.Error(err.Error())
		return err
	}
Yann Garcia's avatar
Yann Garcia committed
	device := devicesMap[deviceId]
	// Remove the list of the devices for this IoT platform
	if val, ok := devicesPerPlatformMap[device.RequestedIotPlatformId]; ok {
		// Free resource from devicesMap map
		log.Info("DeleteDevice: devicesPerPlatformMap (before): ", devicesPerPlatformMap)
		for idx, devId := range val {
			if devId == device.DeviceId {
				val = append(val[:idx], val[idx+1:]...)
				break
			}
		} // End of 'for' statement
	}
	log.Info("DeleteDevice: devicesPerPlatformMap (after): ", devicesPerPlatformMap)
	// Free resource from devicesMap map
	log.Info("DeleteDevice: devicesMap (before): ", devicesMap)
	delete(devicesMap, device.DeviceId)
	log.Info("DeleteDevice: devicesMap (after): ", devicesMap)
Yann Garcia's avatar
Yann Garcia committed
		log.Debug("DeleteDevice: ", now.Sub(profilingTimers["DeleteDevice"]))
Yann Garcia's avatar
Yann Garcia committed
func (tm *IotMgr) createDeviceWithIotPlatformId(device DeviceInfo, requestedIotPlatformId string) (deviceResp DeviceInfo, err error) {
	log.Info(">>> createDeviceWithIotPlatformId: ", device)
Yann Garcia's avatar
Yann Garcia committed
	// Sanity checks
	if _, ok := registeredIotPlatformsMap[requestedIotPlatformId]; !ok {
		err = errors.New("Invalid IotPlatform identifier")
		return deviceResp, err
Yann Garcia's avatar
Yann Garcia committed
	if _, ok := devicesMap[device.DeviceId]; ok {
		err = errors.New("Device already exist")
		return deviceResp, err
Yann Garcia's avatar
Yann Garcia committed
	if registeredIotPlatformsMap[requestedIotPlatformId].oneM2M != nil && device.Enabled == true {
		log.Info("createDeviceWithIotPlatformId: Create device on IoT platform", device)
		var sensor = sssmgr.SensorDiscoveryInfo{
			SensorIdentifier: device.DeviceId,
			SensorType:       "CNT", // FIXME FSCOM How to retrieve this info
			SensorPosition:   nil,
			IotPlatformId:    requestedIotPlatformId,
		}
		if len(device.DeviceMetadata) != 0 {
			sensor.SensorCharacteristicList = make([]sssmgr.SensorCharacteristic, len(device.DeviceMetadata))
			for i, c := range device.DeviceMetadata {
				sensor.SensorCharacteristicList[i] = sssmgr.SensorCharacteristic{CharacteristicName: c.Key, CharacteristicValue: c.Value}
			} // End of 'for' statement
		}
		// FIXME FSCOM How to manage these fields from DeviceInfo
		// 	DeviceAuthenticationInfo string
		// 	Gpsi                     string
		// 	Pei                      string
		// 	Supi                     string
		// 	Msisdn                   string
		// 	Imei                     string
		// 	Imsi                     string
		// 	Iccid                    string
		// 	RequestedMecTrafficRule  []TrafficRuleDescriptor
		// 	//DeviceSpecificMessageFormats *DeviceSpecificMessageFormats
		// 	//DownlinkInfo *DownlinkInfo
		// 	ClientCertificate string
		// }
Yann Garcia's avatar
Yann Garcia committed
		sensor, err := registeredIotPlatformsMap[requestedIotPlatformId].oneM2M.OneM2M_create(sensor, "")
Yann Garcia's avatar
Yann Garcia committed
		if err != nil {
			return deviceResp, err
		}
	}

Yann Garcia's avatar
Yann Garcia committed
	devicesMap[device.DeviceId] = device
	devicesPerPlatformMap[device.DeviceId] = append(devicesPerPlatformMap[device.DeviceId], requestedIotPlatformId)
	platformPerUserTransportIdMap[requestedIotPlatformId] = append(platformPerUserTransportIdMap[requestedIotPlatformId], device.RequestedUserTransportId)
Yann Garcia's avatar
Yann Garcia committed
	deviceResp = device
	log.Debug("createDeviceWithIotPlatformId: deviceResp: ", deviceResp)

	return deviceResp, nil
}

func (tm *IotMgr) createDeviceWithRequestedUserTransportId(device DeviceInfo, requestedUserTransportId string) (deviceResp DeviceInfo, err error) {
	log.Info(">>> createDeviceWithRequestedUserTransportId: ", device)

	if val, ok := platformPerUserTransportIdMap[requestedUserTransportId]; ok {
		deviceResp, err = tm.createDeviceWithIotPlatformId(device, val[0])
	} else {
		err = errors.New("Invalid UserTransportId")
Yann Garcia's avatar
Yann Garcia committed
		log.Error("createDeviceWithIotPlatformId: ", err.Error())
		return deviceResp, err
Yann Garcia's avatar
Yann Garcia committed
	log.Info("createDeviceWithIotPlatformId: deviceResp: ", deviceResp)
Yann Garcia's avatar
Yann Garcia committed
	return deviceResp, nil
Yann Garcia's avatar
Yann Garcia committed

func (tm *IotMgr) resetMaps(iotPlatformId string) {
	log.Info(">>> resetMaps: ", iotPlatformId)

	// Free resources from devicesMap map
	// Remove all devices for this IoT platform
	log.Info("resetMaps: devicesMap (before): ", devicesMap)
	for _, deviceId := range devicesPerPlatformMap[iotPlatformId] {
		delete(devicesMap, deviceId)
	} // End of 'for' statement
	log.Info("resetMaps: devicesMap (after): ", devicesMap)

	// Remove all devices for this IoT platform
	log.Info("resetMaps: devicesPerPlatformMap (before): ", devicesPerPlatformMap)
	delete(devicesPerPlatformMap, iotPlatformId)
	log.Info("resetMaps: devicesPerPlatformMap (after): ", devicesPerPlatformMap)

	log.Info("resetMaps: platformPerUserTransportIdMap (before): ", platformPerUserTransportIdMap)
	for _, rule := range platformPerUserTransportIdMap {
		for idx, pltf := range rule {
			if pltf == iotPlatformId {
				rule = append(rule[:idx], rule[idx+1:]...)
			}
		} // End of 'for' statement
	} // End of 'for' statement
	log.Info("resetMaps: platformPerUserTransportIdMap (after): ", platformPerUserTransportIdMap)
}

func (tm *IotMgr) populateDevicesPerIotPlatforms(iotPlatformInfo IotPlatformInfo) (err error) {
	log.Info(">>> populateDevicesPerIotPlatforms: ", iotPlatformInfo)

	if iotPlatformInfo.oneM2M == nil {
		log.Info("populateDevicesPerIotPlatforms: Nothing to do")
		return nil
	}

	// Reset maps
	tm.resetMaps(iotPlatformInfo.IotPlatformId)

	sensors, err := iotPlatformInfo.oneM2M.SensorDiscoveryInfoAll()
	if err != nil {
		log.Error("populateDevicesPerIotPlatforms: ", err)
		return err
	}
	log.Info("populateDevicesPerIotPlatforms: sensors: ", sensors)

	for _, sensor := range sensors {
		var deviceInfo = DeviceInfo{
			DeviceId: sensor.SensorIdentifier,
			Enabled:  true,
		}
		deviceInfo.DeviceMetadata = make([]KeyValuePair, len(sensor.SensorCharacteristicList))
		for i, c := range sensor.SensorCharacteristicList {
			deviceInfo.DeviceMetadata[i] = KeyValuePair{Key: c.CharacteristicName, Value: c.CharacteristicValue}
		} // End of 'for' statement

		devicesMap[deviceInfo.DeviceId] = deviceInfo
		devicesPerPlatformMap[iotPlatformInfo.IotPlatformId] = append(devicesPerPlatformMap[iotPlatformInfo.IotPlatformId], deviceInfo.DeviceId)
	} // End of 'for' statement
	log.Info("populateDevicesPerIotPlatforms: devicesMap: ", devicesMap)
	log.Info("populateDevicesPerIotPlatforms: devicesPerPlatformMap: ", devicesPerPlatformMap)

	return nil
}