diff --git a/go-apps/meep-iot/sbi/iot-sbi.go b/go-apps/meep-iot/sbi/iot-sbi.go index abdc11e9c6eb463f0ecc511f6a977c8139ca84e5..9157b58853a70f0aff5c44a7fb8461a3c9094125 100644 --- a/go-apps/meep-iot/sbi/iot-sbi.go +++ b/go-apps/meep-iot/sbi/iot-sbi.go @@ -371,10 +371,11 @@ func GetDevices() (devices []DeviceInfo, err error) { func GetDevice(deviceId string) (device DeviceInfo, err error) { log.Info(">>> sbi.GetDevice: ", deviceId) - _, err = sbi.iotMgr.GetDevice(deviceId) + d, err := sbi.iotMgr.GetDevice(deviceId) if err != nil { return device, err } + device = convertDeviceInfoFromIotMgr(d) return device, nil } @@ -460,6 +461,29 @@ func convertIotPlatformInfoToIotMgr(val IotPlatformInfo) (item tm.IotPlatformInf return item } +func convertDeviceInfoFromIotMgr(dev tm.DeviceInfo) (device DeviceInfo) { + //log.Debug(">>> convertDeviceInfoFromIotMgr") + + device = DeviceInfo{ + DeviceAuthenticationInfo: dev.DeviceAuthenticationInfo, + Gpsi: dev.Gpsi, + Pei: dev.Pei, + Supi: dev.Supi, + Msisdn: dev.Msisdn, + Imei: dev.Imei, + Imsi: dev.Imsi, + Iccid: dev.Iccid, + DeviceId: dev.DeviceId, + RequestedIotPlatformId: dev.RequestedIotPlatformId, + RequestedUserTransportId: dev.RequestedUserTransportId, + ClientCertificate: dev.ClientCertificate, + Enabled: dev.Enabled, + } + //log.Debug("convertDeviceInfoFromIotMgr: device: ", device) + + return device +} + func convertDeviceInfosFromIotMgr(devicesList []tm.DeviceInfo) (devices []DeviceInfo, err error) { //log.Debug(">>> convertDeviceInfosFromIotMgr") diff --git a/go-apps/meep-iot/server/meep-iot.go b/go-apps/meep-iot/server/meep-iot.go index 59bf6531bbdff4a1b262e85b95d31a33d1c0eee3..2b403d0271bfc3aee7cc8eff072b865f2d438370 100644 --- a/go-apps/meep-iot/server/meep-iot.go +++ b/go-apps/meep-iot/server/meep-iot.go @@ -824,13 +824,22 @@ func registereddevicesByIdGET(w http.ResponseWriter, r *http.Request) { registeredDeviceIdParamStr := vars["registeredDeviceId"] log.Debug("registereddevicesByIdGET: registeredDeviceIdParamStr: ", registeredDeviceIdParamStr) - _, err := sbi.GetDevice(registeredDeviceIdParamStr) + device, err := sbi.GetDevice(registeredDeviceIdParamStr) if err != nil { errHandlerProblemDetails(w, err.Error(), http.StatusNotFound) return } + d := convertDeviceInfoFromSbi(device) + log.Debug("registereddevicesByIdGET: d=", d) - // FIXME FSCOM To do + // Prepare & send the response + jsonResponse, err := json.Marshal(d) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + log.Debug("registereddevicesByIdGET: jsonResponse=", string(jsonResponse)) + fmt.Fprint(w, string(jsonResponse)) w.WriteHeader(http.StatusOK) } @@ -897,7 +906,6 @@ func registereddevicesGET(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, string(jsonResponse)) w.WriteHeader(http.StatusOK) } - func applyFiltering(devicesList []sbi.DeviceInfo, filter []string) (devices []DeviceInfo, err error) { log.Debug(">>> applyFiltering") devices, err = convertDeviceInfosFromSbi_with_filter(devicesList, filter) @@ -1113,6 +1121,27 @@ func convertIotPlatformInfoToSbi(val IotPlatformInfo) (item sbi.IotPlatformInfo) return item } +func convertDeviceInfoFromSbi(dev sbi.DeviceInfo) (device DeviceInfo) { + device = DeviceInfo{ + DeviceAuthenticationInfo: dev.DeviceAuthenticationInfo, + Gpsi: dev.Gpsi, + Pei: dev.Pei, + Supi: dev.Supi, + Msisdn: dev.Msisdn, + Imei: dev.Imei, + Imsi: dev.Imsi, + Iccid: dev.Iccid, + DeviceId: dev.DeviceId, + RequestedIotPlatformId: dev.RequestedIotPlatformId, + RequestedUserTransportId: dev.RequestedUserTransportId, + ClientCertificate: dev.ClientCertificate, + Enabled: dev.Enabled, + } + //log.Debug("convertDeviceInfosFromSbi: devices: ", devices) + + return device +} + func convertDeviceInfosFromSbi(devicesList []sbi.DeviceInfo) (devices []DeviceInfo, err error) { devices = make([]DeviceInfo, len(devicesList)) for idx, item := range devicesList { // FIXME FSCOM Add Filter diff --git a/go-packages/meep-iot-mgr/iot-mgr.go b/go-packages/meep-iot-mgr/iot-mgr.go index 6c9313d54963609e08193939baebbf2d3d20c154..913af8812d0320c372efcb15b5bc6a6cc304c7ec 100644 --- a/go-packages/meep-iot-mgr/iot-mgr.go +++ b/go-packages/meep-iot-mgr/iot-mgr.go @@ -23,7 +23,6 @@ import ( "io" "io/ioutil" "net/http" - "reflect" "strconv" "strings" "sync" @@ -136,6 +135,13 @@ var registeredIotPlatformsMap = map[string]IotPlatformInfo{} // List of discover var devicesMap = map[string]DeviceInfo{} // Map device by deviceId var devicesPerPlatformMap = map[string][]string{} // Map deviceIds per platform +// Timer to refresh devices list for all IoT platform +const refreshTickerExpeary = 10 // 10 seconds + +var mutex sync.Mutex +var wg sync.WaitGroup +var refreshTicker *time.Ticker = nil + // Enable profiling const profiling = false @@ -174,10 +180,41 @@ func NewIotMgr(name string, namespace string) (tm *IotMgr, err error) { // DeleteIotMgr - func (tm *IotMgr) DeleteIotMgr() (err error) { + stopRefreshTicker() return nil } +func startRefreshTicker() { + log.Debug("Starting refresh loop") + refreshTicker = time.NewTicker(refreshTickerExpeary * time.Second) + go func() { + for range refreshTicker.C { + // Refresh the list of devices + wg.Add(1) + err := populateDevicesPerIotPlatforms() + if err != nil { + log.Error(err) + } + wg.Done() + } + }() +} + +func stopRefreshTicker() { + if refreshTicker != nil { + // Refresh the list of devices + wg.Add(1) + refreshTicker.Stop() + refreshTicker = nil + registeredIotPlatformsMap = nil + devicesMap = nil + devicesPerPlatformMap = nil + wg.Done() + log.Debug("Refresh loop stopped") + } +} + func (tm *IotMgr) RegisterIotPlatformInfo(iotPlatformInfo IotPlatformInfo) (err error) { if profiling { profilingTimers["RegisterIotPlatformInfo"] = time.Now() @@ -188,6 +225,10 @@ func (tm *IotMgr) RegisterIotPlatformInfo(iotPlatformInfo IotPlatformInfo) (err registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId] = iotPlatformInfo } // else, Skip disabled platform + if len(registeredIotPlatformsMap) == 1 { + startRefreshTicker() + } + if profiling { now := time.Now() log.Debug("RegisterIotPlatformInfo: ", now.Sub(profilingTimers["RegisterIotPlatformInfo"])) @@ -211,6 +252,9 @@ func (tm *IotMgr) DeregisterIotPlatformInfo(iotPlatformId string) (err error) { } if _, ok := registeredIotPlatformsMap[iotPlatformId]; ok { delete(registeredIotPlatformsMap, iotPlatformId) + if len(registeredIotPlatformsMap) == 0 { + stopRefreshTicker() + } } if profiling { @@ -227,23 +271,17 @@ func (tm *IotMgr) GetDevices() (devices []DeviceInfo, err error) { log.Info(">>> GetDevices") + wg.Wait() + log.Info("GetDevices: After Synchro") + devices = make([]DeviceInfo, 0) if len(registeredIotPlatformsMap) == 0 { return devices, nil } - // Refresh the list of devices - for _, iotPlatform := range registeredIotPlatformsMap { - log.Info("GetDevices: processing: ", iotPlatform.IotPlatformId) - err := populateDevices(iotPlatform) - if err != nil { - log.Error("GetDevices: ", err) - continue - } - for _, v := range devicesPerPlatformMap[iotPlatform.IotPlatformId] { - log.Info("GetDevices: adding device: ", v) - devices = append(devices, devicesMap[v]) - } + for _, v := range devicesMap { + log.Info("GetDevices: adding device: ", v) + devices = append(devices, v) } // End of 'for' statement log.Info("GetDevices: devices: ", devices) @@ -251,6 +289,7 @@ func (tm *IotMgr) GetDevices() (devices []DeviceInfo, err error) { now := time.Now() log.Debug("GetDevices: ", now.Sub(profilingTimers["GetDevices"])) } + return devices, nil } @@ -261,11 +300,13 @@ func (tm *IotMgr) GetDevice(deviceId string) (device DeviceInfo, err error) { log.Info(">>> GetDevice: deviceId: ", deviceId) + wg.Wait() + log.Info("GetDevice: After Synchro") + if val, ok := devicesMap[deviceId]; !ok { err = errors.New("Wrong Device identifier") return device, err } else { - log.Info(" GetDevice: device: ", val) device = val } @@ -273,9 +314,46 @@ func (tm *IotMgr) GetDevice(deviceId string) (device DeviceInfo, err error) { now := time.Now() log.Debug("GetDevice: ", now.Sub(profilingTimers["GetDevice"])) } + log.Info("GetDevice: device: ", device) + return device, nil } +/* + * func populateDevicesPerIotPlatforms IoT devices for all registered Iot platform + * @return {struct} nil on success, error otherwise + */ +func populateDevicesPerIotPlatforms() error { + + mutex.Lock() + defer mutex.Unlock() + + if profiling { + profilingTimers["populateDevicesPerIotPlatforms"] = time.Now() + } + + if len(registeredIotPlatformsMap) == 0 { + return nil + } + + // Refresh the list of devices for all registered Iot platform + for _, iotPlatform := range registeredIotPlatformsMap { + //log.Debug("populateDevicesPerIotPlatforms: processing: ", iotPlatform.IotPlatformId) + err := populateDevices(iotPlatform) + if err != nil { + log.Error("populateDevicesPerIotPlatforms: ", err) + continue + } + } // End of 'for' statement + + if profiling { + now := time.Now() + log.Debug("populateDevicesPerIotPlatforms: ", now.Sub(profilingTimers["populateDevicesPerIotPlatforms"])) + } + + return nil +} + /* * func PopulateDevices IoT devices for the specified Iot platform * @param {string} iotPlatformId contains the IoT platform identifier @@ -291,9 +369,9 @@ func populateDevices(iotPlatformInfo IotPlatformInfo) error { // 1. Get the list of the AE // Build the URL t := registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId].CustomServicesTransportInfo[0] - log.Info("populateDevices: t.Endpoint.Addresses[0]=", t.Endpoint.Addresses[0]) + //log.Debug("populateDevices: t.Endpoint.Addresses[0]=", t.Endpoint.Addresses[0]) url := "http://" + t.Endpoint.Addresses[0].Host + ":" + strconv.Itoa(int(t.Endpoint.Addresses[0].Port)) + "/" + t.Name - log.Info("populateDevices: url=", url) + //log.Debug("populateDevices: url=", url) // Build the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} @@ -311,7 +389,7 @@ func populateDevices(iotPlatformInfo IotPlatformInfo) error { log.Error("populateDevices: ", err.Error()) return err } - log.Debug("populateDevices: response: ", string(response)) + //log.Debug("populateDevices: response: ", string(response)) var oneM2M_uril map[string][]string err = json.Unmarshal(response, &oneM2M_uril) @@ -319,8 +397,8 @@ func populateDevices(iotPlatformInfo IotPlatformInfo) error { log.Error("populateDevices: ", err.Error()) return err } - log.Debug("populateDevices: oneM2M_uril: ", len(oneM2M_uril)) - log.Debug(oneM2M_uril) + //log.Debug("populateDevices: oneM2M_uril: ", len(oneM2M_uril)) + //log.Debug(oneM2M_uril) if _, ok := oneM2M_uril["m2m:uril"]; !ok { err := errors.New("populateDevices: Key not found: m2m:uril") log.Error(err.Error()) @@ -328,10 +406,10 @@ func populateDevices(iotPlatformInfo IotPlatformInfo) error { } // Loop for each CIN and build the device list for _, v := range oneM2M_uril["m2m:uril"] { - log.Info("populateDevices: Processing key: ", v) + //log.Debug("populateDevices: Processing key: ", v) url := "http://" + t.Endpoint.Addresses[0].Host + ":" + strconv.Itoa(int(t.Endpoint.Addresses[0].Port)) + "/" + v - log.Info("populateDevices: url=", url) + //log.Debug("populateDevices: url=", url) // Build the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} @@ -348,7 +426,7 @@ func populateDevices(iotPlatformInfo IotPlatformInfo) error { log.Error("populateDevices: ", err.Error()) return err } - log.Debug("populateDevices: response: ", string(response)) + //log.Debug("populateDevices: response: ", string(response)) var oneM2M_cin map[string]map[string]interface{} err = json.Unmarshal(response, &oneM2M_cin) if err != nil { @@ -359,7 +437,7 @@ func populateDevices(iotPlatformInfo IotPlatformInfo) error { //log.Debug("populateDevices: len(oneM2M_cin): ", len(oneM2M_cin)) //log.Debug("populateDevices: oneM2M_cin: ", oneM2M_cin) for _, m := range oneM2M_cin { - //log.Debug("==> ", i, " value is ", m) + ////log.Debug("==> ", i, " value is ", m) var device = DeviceInfo{} device.RequestedIotPlatformId = iotPlatformInfo.IotPlatformId device.RequestedUserTransportId = registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId].UserTransportInfo[0].Id @@ -368,8 +446,8 @@ func populateDevices(iotPlatformInfo IotPlatformInfo) error { // m is a map[string]interface. // loop over keys and values in the map. for k, v := range m { - log.Debug(k, " value is ", v) - log.Debug("populateDevices: type(v): ", reflect.TypeOf(v)) + //log.Debug(k, " value is ", v) + //log.Debug("populateDevices: type(v): ", reflect.TypeOf(v)) if k == "rn" { if item, ok := v.(string); ok { device.DeviceId = item @@ -431,6 +509,7 @@ func populateDevices(iotPlatformInfo IotPlatformInfo) error { now := time.Now() log.Debug("populateDevices: ", now.Sub(profilingTimers["populateDevices"])) } + return nil }