// Source file: iot-mgr.go
/*
 * Copyright (c) 2024  The AdvantEDGE Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package iotmgr

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"strconv"
	"sync"
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"

	uuid "github.com/google/uuid"
	"github.com/gorilla/mux"
)

// IotMgr is the IOT manager handle created by NewIotMgr.
//
// name and namespace are set once by NewIotMgr (namespace falls back to
// "default" when none is given). The platform and device registries used by
// the methods are package-level maps, not fields of this struct.
//
// NOTE(review): mutex is not locked by any method visible in this file -
// confirm whether callers are expected to serialize access themselves.
type IotMgr struct {
	name      string
	namespace string
	mutex     sync.Mutex
}

// IotPlatformInfo describes one IoT platform handled by the manager.
//
// Instances are stored by value in registeredIotPlatformsMap under their
// IotPlatformId. RegisterIotPlatformInfo only stores platforms whose Enabled
// flag is true. populateDevices builds its request from the first entry of
// CustomServicesTransportInfo; UserTransportInfo is not read in this file.
type IotPlatformInfo struct {
	IotPlatformId               string
	UserTransportInfo           []MbTransportInfo
	CustomServicesTransportInfo []TransportInfo
	Enabled                     bool
}

// MbTransportInfo is the element type of IotPlatformInfo.UserTransportInfo.
//
// It carries the same fields as TransportInfo, except that ImplSpecificInfo is
// a pointer to the structured ImplSpecificInfo type (topic lists) instead of a
// plain string. Type_, Endpoint, Security and ImplSpecificInfo are pointers
// and may be nil.
//
// NOTE(review): "Mb" presumably stands for "message bus" - confirm against
// the API specification this type mirrors.
type MbTransportInfo struct {
	Id               string
	Name             string
	Description      string
	Type_            *string
	Protocol         string
	Version          string
	Endpoint         *EndPointInfo
	Security         *SecurityInfo
	ImplSpecificInfo *ImplSpecificInfo
}

// TransportInfo is the element type of IotPlatformInfo.CustomServicesTransportInfo.
//
// populateDevices reads three things from it: Name (appended to the request
// URL as its path), Version (sent as the X-M2M-RVI header) and the first entry
// of Endpoint.Addresses (host and port of the request URL). The other fields
// are not read in this file. Type_, Endpoint and Security are pointers and may
// be nil.
type TransportInfo struct {
	Id               string
	Name             string
	Description      string
	Type_            *string
	Protocol         string
	Version          string
	Endpoint         *EndPointInfo
	Security         *SecurityInfo
	ImplSpecificInfo string
}

// EndPointInfo describes where a transport can be reached.
//
// Only Addresses is read in this file: populateDevices uses its first entry
// to build the request URL.
//
// NOTE(review): Uris, Fqdn, Addresses and Alternative look like alternative
// representations of the same endpoint - confirm which ones producers fill in.
type EndPointInfo struct {
	Uris        []string
	Fqdn        []string
	Addresses   []Addresses
	Alternative string
}

// Addresses is a single host/port pair (one element of EndPointInfo.Addresses,
// despite the plural name). populateDevices concatenates Host and Port into
// the "http://<Host>:<Port>/" prefix of its request URL.
type Addresses struct {
	Host string
	Port int32
}

// SecurityInfo groups the security attributes attached to a transport:
// OAuth2 parameters (pointer, may be nil) and an Extensions string.
// It is not read by any code in this file.
type SecurityInfo struct {
	OAuth2Info *OAuth2Info
	Extensions string
}

// OAuth2Info holds the grant types and the token endpoint referenced by
// SecurityInfo.OAuth2Info. It is not read by any code in this file.
type OAuth2Info struct {
	GrantTypes    []string
	TokenEndpoint string
}

// ImplSpecificInfo holds the event, uplink and downlink topic lists referenced
// by MbTransportInfo.ImplSpecificInfo. It is not read by any code in this file.
type ImplSpecificInfo struct {
	EventTopics    []string
	UplinkTopics   []string
	DownlinkTopics []string
}

// DeviceInfo describes one IoT device.
//
// Instances are stored by value in devicesMap under their DeviceId, and
// devicesPerPlatformMap lists device identifiers per platform identifier
// (RequestedIotPlatformId). The remaining fields are carried as-is; nothing in
// this file reads them.
//
// NOTE(review): the commented-out fields presumably stand for attributes that
// are not modelled yet - confirm before removing them.
type DeviceInfo struct {
	DeviceAuthenticationInfo string
	//DeviceMetadata []KeyValuePair
	Gpsi     string
	Pei      string
	Supi     string
	Msisdn   string
	Imei     string
	Imsi     string
	Iccid    string
	DeviceId string
	//RequestedMecTrafficRule []TrafficRuleDescriptor
	RequestedIotPlatformId   string
	RequestedUserTransportId string
	//DeviceSpecificMessageFormats *DeviceSpecificMessageFormats
	//DownlinkInfo *DownlinkInfo
	ClientCertificate string
	Enabled           bool
}

// Package-level registries shared by every IotMgr instance.
// NOTE(review): none of the accesses visible in this file is guarded by a
// lock - confirm that callers serialize their calls.
var registeredIotPlatformsMap = map[string]IotPlatformInfo{} // Registered IoT platforms, keyed by IotPlatformId
var devicesMap = map[string]DeviceInfo{}                     // Known devices, keyed by DeviceId
var devicesPerPlatformMap = map[string][]string{}            // Device identifiers, keyed by platform identifier

// profiling switches the timing instrumentation on or off. When true, the
// functions of this file record their start time in profilingTimers and log
// the elapsed time at debug level before returning.
const profiling = false

// profilingTimers maps a function name to the time of its latest start.
// It is allocated by init, and only when profiling is true.
var profilingTimers map[string]time.Time

// Media types sent by populateDevices in its request headers.
const (
	headerAccept      = "application/json"
	headerContentType = "application/json"
)

// init prepares the profiling timer table. Nothing is allocated while
// profiling is switched off, so profilingTimers stays nil in that case.
func init() {
	if !profiling {
		return
	}
	profilingTimers = map[string]time.Time{}
}

// NewIotMgr - Builds a new IOT manager bound to the given name and namespace.
// An empty name is rejected with an error (and a nil manager); an empty
// namespace is replaced by "default".
func NewIotMgr(name string, namespace string) (tm *IotMgr, err error) {
	// The name is mandatory
	if name == "" {
		return nil, errors.New("Missing connector name")
	}

	// Fall back to the default namespace when none is provided
	ns := namespace
	if ns == "" {
		ns = "default"
	}

	return &IotMgr{name: name, namespace: ns}, nil
}

// DeleteIotMgr - Tears down the IOT manager.
// It is currently a no-op: nothing is released and the result is always nil.
func (tm *IotMgr) DeleteIotMgr() (err error) {
	return err
}

// RegisterIotPlatformInfo stores the given IoT platform in the platform
// registry under its IotPlatformId, replacing any entry with the same
// identifier. A platform whose Enabled flag is false is skipped.
// The returned error is always nil.
func (tm *IotMgr) RegisterIotPlatformInfo(iotPlatformInfo IotPlatformInfo) (err error) {
	if profiling {
		profilingTimers["RegisterIotPlatformInfo"] = time.Now()
	}

	log.Info(">>> RegisterIotPlatformInfo: iotPlatformId: ", iotPlatformInfo)

	// Only enabled platforms are kept; a disabled one is ignored without error
	if platformId := iotPlatformInfo.IotPlatformId; iotPlatformInfo.Enabled {
		registeredIotPlatformsMap[platformId] = iotPlatformInfo
	}

	if profiling {
		elapsed := time.Since(profilingTimers["RegisterIotPlatformInfo"])
		log.Debug("RegisterIotPlatformInfo: ", elapsed)
	}
	return err
}

// DeregisterIotPlatformInfo removes the IoT platform identified by
// iotPlatformId from the platform registry, together with the devices listed
// for it. An unknown identifier is not an error; the result is always nil.
func (tm *IotMgr) DeregisterIotPlatformInfo(iotPlatformId string) (err error) {
	if profiling {
		profilingTimers["DeregisterIotPlatformInfo"] = time.Now()
	}

	log.Info(">>> DeregisterIotPlatformInfo: iotPlatformId: ", iotPlatformId)

	// No existence checks are needed: ranging over a missing (nil) entry does
	// nothing and delete is a no-op for an absent key.
	for _, deviceId := range devicesPerPlatformMap[iotPlatformId] {
		delete(devicesMap, deviceId)
	}
	delete(devicesPerPlatformMap, iotPlatformId)
	delete(registeredIotPlatformsMap, iotPlatformId)

	if profiling {
		elapsed := time.Since(profilingTimers["DeregisterIotPlatformInfo"])
		log.Debug("DeregisterIotPlatformInfo: ", elapsed)
	}
	return nil
}

// GetDevices refreshes the device registry from every registered IoT platform
// and returns all the devices currently known.
//
// The refresh is best effort: a platform that cannot be queried is skipped and
// the devices already known are still returned. The order of the returned
// devices is unspecified (map iteration order). When no platform is
// registered the result is a nil list. The returned error is always nil.
func (tm *IotMgr) GetDevices() (devices []DeviceInfo, err error) {
	if profiling {
		profilingTimers["GetDevices"] = time.Now()
	}

	log.Info(">>> GetDevices: iotPlatformId")

	if len(registeredIotPlatformsMap) == 0 {
		return devices, nil
	}

	// Refresh the list of devices
	for iotPlatformId := range registeredIotPlatformsMap {
		// Deliberately ignored: populateDevices logs its own failures and one
		// failing platform must not hide the devices of the others.
		_ = populateDevices(iotPlatformId)
	}

	// Build the result from the registry (it was never filled before, so the
	// caller always received an empty list).
	devices = make([]DeviceInfo, 0, len(devicesMap))
	for _, device := range devicesMap {
		devices = append(devices, device)
	}
	log.Info("GetDevices: devices: ", devices)

	if profiling {
		now := time.Now()
		log.Debug("GetDevices: ", now.Sub(profilingTimers["GetDevices"]))
	}
	return devices, nil
}

// GetDevice returns the device stored in the device registry under deviceId.
// When no such device exists, it returns a zero DeviceInfo and the error
// "Wrong Device identifier".
func (tm *IotMgr) GetDevice(deviceId string) (device DeviceInfo, err error) {
	if profiling {
		profilingTimers["GetDevice"] = time.Now()
	}

	log.Info(">>> GetDevice: deviceId: ", deviceId)

	// Guard clause: unknown identifier
	found, known := devicesMap[deviceId]
	if !known {
		return device, errors.New("Wrong Device identifier")
	}
	log.Info(" GetDevice: device: ", found)

	if profiling {
		elapsed := time.Since(profilingTimers["GetDevice"])
		log.Debug("GetDevice: ", elapsed)
	}
	return found, nil
}

/*
 * func populateDevices refreshes the devices known for the specified IoT platform.
 * It sends a GET request to the first address of the first custom services
 * transport of the platform, decodes the JSON object received and records one
 * device per top-level key, replacing the devices recorded by the previous
 * refresh of the same platform.
 * @param {string} iotPlatformId contains the IoT platform identifier
 * @return {struct} nil on success, error otherwise (unknown platform, missing
 *                  transport or endpoint address, request failure, invalid JSON)
 */
func populateDevices(iotPlatformId string) error {
	if profiling {
		profilingTimers["populateDevices"] = time.Now()
	}

	log.Info(">>> populateDevices: iotPlatformId=", iotPlatformId)

	// Validate the transport description before indexing into it. An unknown
	// identifier yields the zero value, whose transport list is empty.
	platform := registeredIotPlatformsMap[iotPlatformId]
	if len(platform.CustomServicesTransportInfo) == 0 {
		err := fmt.Errorf("no custom services transport registered for IoT platform %q", iotPlatformId)
		log.Error("populateDevices: ", err.Error())
		return err
	}
	t := platform.CustomServicesTransportInfo[0]
	if t.Endpoint == nil || len(t.Endpoint.Addresses) == 0 {
		err := fmt.Errorf("no endpoint address for IoT platform %q", iotPlatformId)
		log.Error("populateDevices: ", err.Error())
		return err
	}
	address := t.Endpoint.Addresses[0]
	log.Info("populateDevices: t.Endpoint.Addresses[0]=", address)

	// Build the URL
	url := "http://" + address.Host + ":" + strconv.Itoa(int(address.Port)) + "/" + t.Name

	// Build the headers. The keys are written directly into the map (not with
	// Header.Set) so that the X-M2M-* names keep their exact casing instead of
	// being canonicalized to X-M2m-*.
	headers := http.Header{
		"Accept":       {headerAccept},
		"Content-Type": {headerContentType}, // was "ContentType", which is not an HTTP header name
		"X-M2M-Origin": {"CAdmin"},
		"X-M2M-RI":     {uuid.New().String()},
		"X-M2M-RVI":    {t.Version},
	}

	// Send the request
	response, err := sendRequest("GET", url, headers, nil, nil, nil, 200)
	if err != nil {
		log.Error("populateDevices: ", err.Error())
		return err
	}
	log.Debug("populateDevices: response: ", string(response))

	var oneM2MData map[string]interface{}
	err = json.Unmarshal(response, &oneM2MData)
	if err != nil {
		log.Error("populateDevices: ", err.Error())
		return err
	}
	log.Debug("populateDevices: oneM2M_data: ", len(oneM2MData))
	log.Debug(oneM2MData)

	// Forget the devices recorded by the previous refresh of this platform so
	// that the registries reflect the latest answer and do not grow per call.
	for _, deviceId := range devicesPerPlatformMap[iotPlatformId] {
		delete(devicesMap, deviceId)
	}

	deviceIds := make([]string, 0, len(oneM2MData))
	for k := range oneM2MData {
		log.Info("populateDevices: Processing key: ", k)
		// NOTE(review): the payload is not decoded any further yet; the
		// top-level key is used as device identifier so that entries no longer
		// overwrite each other under the empty identifier. Confirm the mapping
		// once the oneM2M resource is parsed in detail.
		device := DeviceInfo{
			DeviceId:               k,
			RequestedIotPlatformId: iotPlatformId,
		}
		devicesMap[device.DeviceId] = device
		deviceIds = append(deviceIds, device.DeviceId)
	} // End of 'for' statement
	// Keyed by the platform identifier, which is what DeregisterIotPlatformInfo looks up
	devicesPerPlatformMap[iotPlatformId] = deviceIds

	if profiling {
		now := time.Now()
		log.Debug("populateDevices: ", now.Sub(profilingTimers["populateDevices"]))
	}
	return nil
}

// sendRequest issues one HTTP request with the default client and returns the
// raw response body.
//
// Parameters:
//   - method, url, body: passed to http.NewRequest as-is
//   - headers: used as the request headers when non-nil
//   - vars: when non-nil, attached to the request as gorilla/mux URL variables
//   - query: when non-nil, each entry is added to the URL query string
//   - code: the only HTTP status code accepted as a success
//
// It returns an error when the request cannot be built or sent, when the
// status code differs from code, or when the body cannot be read. The
// connection is not reused (req.Close is set).
func sendRequest(method string, url string, headers http.Header, body io.Reader, vars map[string]string, query map[string]string, code int) ([]byte, error) {
	log.Debug(">>> sendRequest: url: ", url)
	log.Debug(">>> sendRequest: headers: ", headers)

	req, err := http.NewRequest(method, url, body)
	if err != nil {
		return nil, err
	}
	if vars != nil {
		req = mux.SetURLVars(req, vars)
	}
	if query != nil {
		q := req.URL.Query()
		for k, v := range query {
			q.Add(k, v)
		}
		req.URL.RawQuery = q.Encode()
	}
	if headers != nil {
		req.Header = headers
	}
	req.Close = true

	rr, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	// The body must be closed on every path, including the wrong-status one,
	// otherwise the underlying connection is leaked.
	defer rr.Body.Close()

	// Check the status code is what we expect.
	if status := rr.StatusCode; status != code {
		return nil, fmt.Errorf("Wrong status code - got %v want %v", status, code)
	}

	// ioutil.ReadAll is kept on purpose: it is the only use of the file's
	// "io/ioutil" import.
	responseData, err := ioutil.ReadAll(rr.Body)
	if err != nil {
		return nil, err
	}

	return responseData, nil
}