Commit 53b386e6 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

postgres transaction concurrency fixes

parent 02fc12a3
Loading
Loading
Loading
Loading
+8 −5
Original line number Diff line number Diff line
@@ -124,6 +124,14 @@ func runAutomation() {
	var poaMap map[string]*am.Poa
	var err error

	ge.mutex.Lock()
	defer ge.mutex.Unlock()

	// Movement - Update UE positions & recalculate UE measurements
	if ge.automation[AutoTypeMovement] {
		runAutoMovement()
	}

	// Get UE & POA geodata
	ueMap, err = ge.assetMgr.GetAllUe()
	if err != nil {
@@ -136,11 +144,6 @@ func runAutomation() {
		return
	}

	// Movement
	if ge.automation[AutoTypeMovement] {
		runAutoMovement()
	}

	// Mobility
	if ge.automation[AutoTypeMobility] {
		runAutoMobility(ueMap)
+5 −0
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ import (
	"net/http"
	"os"
	"strings"
	"sync"
	"time"

	dataModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-model"
@@ -93,6 +94,7 @@ type GisEngine struct {
	automation       map[string]bool
	automationTicker *time.Ticker
	updateTime       time.Time
	mutex            sync.Mutex
}

var ge *GisEngine
@@ -203,6 +205,9 @@ func Run() (err error) {

// Message Queue handler
func msgHandler(msg *mq.Msg, userData interface{}) {
	ge.mutex.Lock()
	defer ge.mutex.Unlock()

	switch msg.Message {
	case mq.MsgScenarioActivate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
+37 −3
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ package gisassetmgr
import (
	"database/sql"
	"errors"
	"sort"
	"strings"
	"time"

@@ -1312,7 +1313,15 @@ func (am *AssetMgr) DeleteAllUe() (err error) {
		proStart = time.Now()
	}

	_, err = am.db.Exec(`DELETE FROM ` + UeTable)
	// !!! IMPORTANT NOTE !!!
	// In order to prevent transaction deadlock, make sure delete order is consistent;
	// in this case alphabetically using UE name.
	_, err = am.db.Exec(`DELETE FROM ` + UeTable + `
	WHERE name IN (
		SELECT name
		FROM ` + UeTable + `
		ORDER BY name COLLATE "C"
	)`)
	if err != nil {
		log.Error(err.Error())
		return err
@@ -1432,6 +1441,9 @@ func (am *AssetMgr) AdvanceAllUePosition(increment float32) (err error) {
	}

	// Set new position
	// !!! IMPORTANT NOTE !!!
	// In order to prevent transaction deadlock, make sure update order is consistent;
	// in this case alphabetically using UE name.
	query := `UPDATE ` + UeTable + `
	SET position =
		CASE
@@ -1446,7 +1458,14 @@ func (am *AssetMgr) AdvanceAllUePosition(increment float32) (err error) {
				END
		END,
		path_fraction = (path_fraction + ($1 * path_increment)) %2
	WHERE path_velocity > 0`
	FROM (
		SELECT name
		FROM ` + UeTable + `
		WHERE path_velocity > 0
		ORDER BY name COLLATE "C"
		FOR UPDATE
	) as moving_ue
	WHERE ` + UeTable + `.name = moving_ue.name`
	_, err = am.db.Exec(query, increment)
	if err != nil {
		log.Error(err.Error())
@@ -1682,8 +1701,23 @@ func (am *AssetMgr) updateUeInfo(ueMap map[string]*Ue) (err error) {
		_ = tx.Commit()
	}()

	// !!! IMPORTANT NOTE !!!
	// In order to prevent transaction deadlock, make sure update order is consistent;
	// in this case alphabetically using UE name.

	// Sort UE names alphabetically
	ueNames := make([]string, len(ueMap))
	i := 0
	for ueName := range ueMap {
		ueNames[i] = ueName
		i++
	}
	sort.Strings(ueNames)

	// For each UE, run POA Selection & Measurement calculations
	for ueName, ue := range ueMap {
	for _, ueName := range ueNames {
		// Get UE info
		ue := ueMap[ueName]

		// Update POA Selection
		selectedPoa := selectPoa(ue)