Loading go-apps/meep-app-enablement/server/app-enablement.go +1 −1 Original line number Diff line number Diff line Loading @@ -177,7 +177,7 @@ func Init() (err error) { } log.Info("Service Management created") // Initialize App Support err = as.Init(sandboxName, mepName, hostUrl, mqLocal, redisAddr, &mutex, capifClient) err = as.Init(sandboxName, mepName, hostUrl, mqLocal, redisAddr, &mutex) if err != nil { return err } Loading go-apps/meep-app-enablement/server/app-support/app-support.go +1 −71 Original line number Diff line number Diff line Loading @@ -29,7 +29,6 @@ import ( "time" sm "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-app-enablement/server/service-mgmt" cc "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-app-enablement/server/capif-client" apps "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-applications" dkm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" Loading Loading @@ -84,21 +83,19 @@ var subMgr *subs.SubscriptionMgr var appStore *apps.ApplicationStore var appInfoMap map[string]map[string]string var gracefulTerminateMap = map[string]chan bool{} var capifClient *cc.CapifClient func notImplemented(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusNotImplemented) } func Init(sandbox string, mep string, host *url.URL, msgQueue *mq.MsgQueue, redisAddr_ string, globalMutex *sync.Mutex, capifCl *cc.CapifClient) (err error) { func Init(sandbox string, mep string, host *url.URL, msgQueue *mq.MsgQueue, redisAddr_ string, globalMutex *sync.Mutex) (err error) { redisAddr = redisAddr_ sandboxName = sandbox hostUrl = host mqLocal = msgQueue mutex = globalMutex mepName = mep capifClient = capifCl // Initialize app info cache appInfoMap = make(map[string]map[string]string) Loading Loading @@ -1517,34 +1514,6 @@ func appRegistrationPOST(w http.ResponseWriter, r *http.Request) { log.Info("appRegistrationPOST: Generated resource URI:", resourceURI) w.Header().Set("Location", resourceURI) // CAPIF integration — publish registration asynchronously if capifClient != nil && capifClient.IsReady() { go func(appInfo AppInfo) { host := hostUrl.Hostname() port := 443 if hostUrl.Port() != "" { if p, err := strconv.Atoi(hostUrl.Port()); err == nil { port = p } } uri := basePath + "registrations/" + appInfo.AppInstanceId apiId, err := capifClient.PublishServiceAPI( appInfo.AppName+"-"+sandboxName, "v2", uri, appInfo.AppName+" Registration", "MEC app registration: "+appInfo.AppName, host, port, ) if err != nil { log.Error("CAPIF publish failed for registration ", appInfo.AppInstanceId, ": ", err.Error()) return } storeCapifMapping("reg:"+appInfo.AppInstanceId, apiId) }(appInfo) } // Send JSON response with status 201 Created jsonResponse := convertAppInfoToJson(&appInfo) w.WriteHeader(http.StatusCreated) Loading Loading @@ -1744,19 +1713,6 @@ func appRegistrationDELETE(w http.ResponseWriter, r *http.Request) { return } // CAPIF integration — unpublish registration asynchronously if capifClient != nil && capifClient.IsReady() { go func(appInstanceId string) { apiId := getCapifMapping("reg:" + appInstanceId) if apiId != "" { if err := capifClient.UnpublishServiceAPI(apiId); err != nil { log.Error("CAPIF unpublish failed for registration ", appInstanceId, ": ", err.Error()) } deleteCapifMapping("reg:" + appInstanceId) } }(appInstanceId) } // Send response on successful deletion of registration w.WriteHeader(http.StatusNoContent) } Loading Loading @@ -2192,29 +2148,3 @@ func errHandlerProblemDetails(w http.ResponseWriter, error string, code int) { w.WriteHeader(code) fmt.Fprint(w, jsonResponse) } // CAPIF mapping helpers — store appInstanceId -> CAPIF apiId in Redis func storeCapifMapping(localId string, apiId string) { key := baseKey + "capif:reg:" + localId err := rc.JSONSetEntry(key, ".", "\""+apiId+"\"") if err != nil { log.Error("Failed to store CAPIF mapping for ", localId, ": ", err.Error()) } } func getCapifMapping(localId string) string { key := baseKey + "capif:reg:" + localId val, err := rc.JSONGetEntry(key, ".") if err != nil || val == "" { return "" } return strings.Trim(val, "\"") } func deleteCapifMapping(localId string) { key := baseKey + "capif:reg:" + localId err := rc.JSONDelEntry(key, ".") if err != nil { log.Error("Failed to delete CAPIF mapping for ", localId, ": ", err.Error()) } } go-apps/meep-app-enablement/server/capif-client/capif-client.go +148 −11 Original line number Diff line number Diff line Loading @@ -44,10 +44,15 @@ const ( // CapifClient manages the lifecycle of a CAPIF provider session type CapifClient struct { mu sync.RWMutex reInitMu sync.Mutex // serializes concurrent re-init attempts config CapifConfig enabled bool ready bool // readyCh is closed exactly once when the client first becomes ready readyCh chan struct{} closeReadyOnce sync.Once // Auth state from Register server adminToken string refreshToken string Loading Loading @@ -88,6 +93,10 @@ type CapifClient struct { publishedAPIs map[string]string } // errSessionExpired is returned by doPublish when the server rejects the request with 401 Unauthorized, // indicating the CAPIF session (access token / certs) has expired and must be re-established. var errSessionExpired = errors.New("CAPIF session expired (401 Unauthorized)") // capifDialContext returns a DialContext function that resolves CAPIF_HOSTNAME // to CAPIF_REGISTER_HOSTNAME. This is needed because the CAPIF core hostname // (e.g. "capifcore") is not DNS-resolvable from inside the Kubernetes pod, Loading @@ -111,6 +120,7 @@ func NewCapifClient(cfg CapifConfig) *CapifClient { config: cfg, enabled: cfg.Enabled, publishedAPIs: make(map[string]string), readyCh: make(chan struct{}), } c.registerHTTPClient = &http.Client{ Timeout: 30 * time.Second, Loading Loading @@ -172,6 +182,7 @@ func (c *CapifClient) Init() error { c.mu.Lock() c.ready = true c.mu.Unlock() c.closeReadyOnce.Do(func() { close(c.readyCh) }) log.Info("CAPIF client initialized successfully") return nil } Loading @@ -186,6 +197,18 @@ func (c *CapifClient) IsReady() bool { return c.enabled && c.ready } // WaitUntilReady blocks until the CAPIF client finishes initialization or timeout elapses. // Returns true if ready, false if timeout elapsed first. // Goroutines spawned before Init() completes will block here rather than being silently dropped. func (c *CapifClient) WaitUntilReady(timeout time.Duration) bool { select { case <-c.readyCh: return true case <-time.After(timeout): return false } } // login performs POST /login with Basic Auth (admin credentials) against the Register server func (c *CapifClient) login() error { url := fmt.Sprintf("https://%s:%s/login", c.config.RegisterHostname, c.config.RegisterPort) Loading Loading @@ -422,11 +445,11 @@ func (c *CapifClient) onboardProvider() error { } // Build TLS mutual auth clients with custom dialer for CAPIF hostname resolution c.apfTLSClient, err = buildMutualTLSClient(c.apfCert, c.apfKey, c.caRoot, c.capifDialContext) c.apfTLSClient, err = buildMutualTLSClient(c.apfCert, c.apfKey, c.capifDialContext) if err != nil { return fmt.Errorf("failed to build APF TLS client: %w", err) } c.amfTLSClient, err = buildMutualTLSClient(c.amfCert, c.amfKey, c.caRoot, c.capifDialContext) c.amfTLSClient, err = buildMutualTLSClient(c.amfCert, c.amfKey, c.capifDialContext) if err != nil { return fmt.Errorf("failed to build AMF TLS client: %w", err) } Loading @@ -435,8 +458,8 @@ func (c *CapifClient) onboardProvider() error { return nil } // PublishServiceAPI publishes a service API to CAPIF. Returns the CAPIF apiId. func (c *CapifClient) PublishServiceAPI(apiName string, version string, resourceURI string, resourceName string, description string, interfaceAddr string, interfacePort int) (string, error) { // doPublish performs a single publish attempt using a snapshot of the current session state. func (c *CapifClient) doPublish(apiName string, version string, resourceURI string, resourceName string, description string, interfaceAddr string, interfacePort int) (string, error) { c.mu.RLock() if !c.ready { c.mu.RUnlock() Loading @@ -445,6 +468,7 @@ func (c *CapifClient) PublishServiceAPI(apiName string, version string, resource apfId := c.apfId aefId := c.aefId publishUrl := c.ccfPublishUrl apfTLSClient := c.apfTLSClient c.mu.RUnlock() // Build the publish URL, replacing <apfId> placeholder if present Loading Loading @@ -511,7 +535,7 @@ func (c *CapifClient) PublishServiceAPI(apiName string, version string, resource } req.Header.Set("Content-Type", "application/json") resp, err := c.apfTLSClient.Do(req) resp, err := apfTLSClient.Do(req) if err != nil { return "", fmt.Errorf("publish request failed: %w", err) } Loading @@ -522,6 +546,9 @@ func (c *CapifClient) PublishServiceAPI(apiName string, version string, resource return "", err } if resp.StatusCode == http.StatusUnauthorized { return "", errSessionExpired } if resp.StatusCode != http.StatusCreated { return "", fmt.Errorf("publish returned %d: %s", resp.StatusCode, string(respBody)) } Loading @@ -544,6 +571,32 @@ func (c *CapifClient) PublishServiceAPI(apiName string, version string, resource return apiId, nil } // PublishServiceAPI publishes a service API to CAPIF. Returns the CAPIF apiId. // Only a 401 Unauthorized response triggers re-init (session expiry). // 403 duplicate-name conflicts and transient network errors are returned as-is without re-init. func (c *CapifClient) PublishServiceAPI(apiName string, version string, resourceURI string, resourceName string, description string, interfaceAddr string, interfacePort int) (string, error) { apiId, err := c.doPublish(apiName, version, resourceURI, resourceName, description, interfaceAddr, interfacePort) if err == nil { return apiId, nil } // Only re-init on confirmed session expiry (401). For anything else (network errors, // 403 duplicate name, 404, etc.) just surface the error — no re-init. if !errors.Is(err, errSessionExpired) { return "", err } // Session expired — re-establish and retry once log.Warn("CAPIF session expired, attempting re-init") c.mu.Lock() c.ready = false c.mu.Unlock() if reInitErr := c.reInit(); reInitErr != nil { return "", fmt.Errorf("CAPIF re-init failed: %v (original: %v)", reInitErr, err) } return c.doPublish(apiName, version, resourceURI, resourceName, description, interfaceAddr, interfacePort) } // UnpublishServiceAPI removes a published service API from CAPIF func (c *CapifClient) UnpublishServiceAPI(apiId string) error { c.mu.RLock() Loading Loading @@ -571,6 +624,11 @@ func (c *CapifClient) UnpublishServiceAPI(apiId string) error { } defer resp.Body.Close() if resp.StatusCode == http.StatusNotFound { // Already removed from CAPIF (e.g. pod restarted, provider re-onboarded) — not an error log.Warn("CAPIF unpublish: apiId=", apiId, " not found on server, already removed") return nil } if resp.StatusCode != http.StatusNoContent { body, _ := ioutil.ReadAll(resp.Body) return fmt.Errorf("unpublish returned %d: %s", resp.StatusCode, string(body)) Loading @@ -580,6 +638,69 @@ func (c *CapifClient) UnpublishServiceAPI(apiId string) error { return nil } // reInit re-establishes the CAPIF session after cert or token expiry. // It is safe to call concurrently: only one goroutine performs the work at a time; // any goroutine that arrives after the session is already recovered returns immediately. func (c *CapifClient) reInit() error { c.reInitMu.Lock() defer c.reInitMu.Unlock() // If another goroutine already recovered the session while we waited, skip re-init c.mu.RLock() alreadyReady := c.ready c.mu.RUnlock() if alreadyReady { return nil } log.Info("CAPIF re-init starting...") // Best-effort offboard the stale provider registration if c.amfTLSClient != nil && c.apiProvDomId != "" { offboardURL := fmt.Sprintf("https://%s:%s/%s/%s", c.config.CapifHostname, c.config.CapifPort, c.ccfApiOnboardingUrl, c.apiProvDomId) if req, err := http.NewRequest("DELETE", offboardURL, nil); err == nil { req.Header.Set("Content-Type", "application/json") if resp, err := c.amfTLSClient.Do(req); err != nil { log.Warn("CAPIF re-init offboard failed (ignored): ", err.Error()) } else { resp.Body.Close() } } } // Smart re-auth: try getAuth with the existing user to avoid creating a new account authOk := false if c.userName != "" { if err := c.getAuth(); err == nil { authOk = true } } if !authOk { if err := c.login(); err != nil { return fmt.Errorf("re-init login failed: %w", err) } if err := c.createUser(); err != nil { return fmt.Errorf("re-init createUser failed: %w", err) } if err := c.getAuth(); err != nil { return fmt.Errorf("re-init getAuth failed: %w", err) } } if err := c.onboardProvider(); err != nil { return fmt.Errorf("re-init onboardProvider failed: %w", err) } c.mu.Lock() c.ready = true c.mu.Unlock() // No-op if readyCh was already closed during initial Init() c.closeReadyOnce.Do(func() { close(c.readyCh) }) log.Info("CAPIF re-init successful") return nil } // Cleanup offboards the provider, deletes the user, and cleans up resources func (c *CapifClient) Cleanup() { if !c.enabled { Loading @@ -596,6 +717,22 @@ func (c *CapifClient) Cleanup() { log.Info("CAPIF client cleanup starting...") // Unpublish all published services before offboarding the provider. // This prevents 403 "Already registered" errors on the next startup. c.mu.RLock() publishedCopy := make(map[string]string, len(c.publishedAPIs)) for k, v := range c.publishedAPIs { publishedCopy[k] = v } c.mu.RUnlock() for apiName, apiId := range publishedCopy { if err := c.UnpublishServiceAPI(apiId); err != nil { log.Warn("CAPIF cleanup: unpublish failed for ", apiName, ": ", err.Error()) } else { log.Info("CAPIF cleanup: unpublished ", apiName) } } // Offboard provider using AMF cert (matching notebook convention) if c.amfTLSClient != nil && c.apiProvDomId != "" { offboardURL := fmt.Sprintf("https://%s:%s/%s/%s", Loading go-apps/meep-app-enablement/server/capif-client/certs.go +6 −9 Original line number Diff line number Diff line Loading @@ -64,7 +64,7 @@ func generateCSR(cn string, org string, country string) ([]byte, *rsa.PrivateKey // certPEM is the signed certificate from CAPIF, privateKey is the corresponding RSA key, // caRootPEM is the CA root certificate from getAuth, and dialContext is a custom dialer // for CAPIF hostname resolution. func buildMutualTLSClient(certPEM string, privateKey *rsa.PrivateKey, caRootPEM string, dialContext func(ctx context.Context, network, addr string) (net.Conn, error)) (*http.Client, error) { func buildMutualTLSClient(certPEM string, privateKey *rsa.PrivateKey, dialContext func(ctx context.Context, network, addr string) (net.Conn, error)) (*http.Client, error) { // Encode private key to PEM keyPEM := pem.EncodeToMemory(&pem.Block{ Type: "RSA PRIVATE KEY", Loading @@ -77,15 +77,12 @@ func buildMutualTLSClient(certPEM string, privateKey *rsa.PrivateKey, caRootPEM return nil, errors.New("failed to parse client certificate: " + err.Error()) } // Parse CA root caCertPool := x509.NewCertPool() if !caCertPool.AppendCertsFromPEM([]byte(caRootPEM)) { return nil, errors.New("failed to parse CA root certificate") } // The CAPIF server may use a self-signed cert or a cert from any CA. // Matching the Python notebook (verify=False), we skip server cert verification // while still presenting our client certificate for mutual TLS authentication. tlsConfig := &tls.Config{ Certificates: []tls.Certificate{cert}, RootCAs: caCertPool, InsecureSkipVerify: true, //nolint:gosec // intentional: matches notebook verify=False } transport := &http.Transport{ Loading go-apps/meep-app-enablement/server/service-mgmt/service-mgmt.go +14 −1 Original line number Diff line number Diff line Loading @@ -368,8 +368,21 @@ func appServicesPOST(w http.ResponseWriter, r *http.Request) { } // CAPIF integration — publish service asynchronously if capifClient != nil && capifClient.IsReady() { if capifClient != nil { go func(sInfo *ServiceInfo, appId string) { if !capifClient.WaitUntilReady(15 * time.Minute) { log.Warn("CAPIF publish skipped: client did not become ready in time for service ", sInfo.SerName) return } // If a stale CAPIF mapping exists in Redis (e.g. from a previous pod lifecycle), // unpublish it first to avoid 403 "Already registered service with same api name". if oldApiId := getCapifMapping(sInfo.SerInstanceId); oldApiId != "" { log.Info("CAPIF: removing stale mapping for ", sInfo.SerName, " (apiId=", oldApiId, ")") if err := capifClient.UnpublishServiceAPI(oldApiId); err != nil { log.Warn("CAPIF stale unpublish failed (ignored): ", err.Error()) } deleteCapifMapping(sInfo.SerInstanceId) } host := hostUrl.Hostname() port := 443 if hostUrl.Port() != "" { Loading Loading
go-apps/meep-app-enablement/server/app-enablement.go +1 −1 Original line number Diff line number Diff line Loading @@ -177,7 +177,7 @@ func Init() (err error) { } log.Info("Service Management created") // Initialize App Support err = as.Init(sandboxName, mepName, hostUrl, mqLocal, redisAddr, &mutex, capifClient) err = as.Init(sandboxName, mepName, hostUrl, mqLocal, redisAddr, &mutex) if err != nil { return err } Loading
go-apps/meep-app-enablement/server/app-support/app-support.go +1 −71 Original line number Diff line number Diff line Loading @@ -29,7 +29,6 @@ import ( "time" sm "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-app-enablement/server/service-mgmt" cc "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-app-enablement/server/capif-client" apps "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-applications" dkm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" Loading Loading @@ -84,21 +83,19 @@ var subMgr *subs.SubscriptionMgr var appStore *apps.ApplicationStore var appInfoMap map[string]map[string]string var gracefulTerminateMap = map[string]chan bool{} var capifClient *cc.CapifClient func notImplemented(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusNotImplemented) } func Init(sandbox string, mep string, host *url.URL, msgQueue *mq.MsgQueue, redisAddr_ string, globalMutex *sync.Mutex, capifCl *cc.CapifClient) (err error) { func Init(sandbox string, mep string, host *url.URL, msgQueue *mq.MsgQueue, redisAddr_ string, globalMutex *sync.Mutex) (err error) { redisAddr = redisAddr_ sandboxName = sandbox hostUrl = host mqLocal = msgQueue mutex = globalMutex mepName = mep capifClient = capifCl // Initialize app info cache appInfoMap = make(map[string]map[string]string) Loading Loading @@ -1517,34 +1514,6 @@ func appRegistrationPOST(w http.ResponseWriter, r *http.Request) { log.Info("appRegistrationPOST: Generated resource URI:", resourceURI) w.Header().Set("Location", resourceURI) // CAPIF integration — publish registration asynchronously if capifClient != nil && capifClient.IsReady() { go func(appInfo AppInfo) { host := hostUrl.Hostname() port := 443 if hostUrl.Port() != "" { if p, err := strconv.Atoi(hostUrl.Port()); err == nil { port = p } } uri := basePath + "registrations/" + appInfo.AppInstanceId apiId, err := capifClient.PublishServiceAPI( appInfo.AppName+"-"+sandboxName, "v2", uri, appInfo.AppName+" Registration", "MEC app registration: "+appInfo.AppName, host, port, ) if err != nil { log.Error("CAPIF publish failed for registration ", appInfo.AppInstanceId, ": ", err.Error()) return } storeCapifMapping("reg:"+appInfo.AppInstanceId, apiId) }(appInfo) } // Send JSON response with status 201 Created jsonResponse := convertAppInfoToJson(&appInfo) w.WriteHeader(http.StatusCreated) Loading Loading @@ -1744,19 +1713,6 @@ func appRegistrationDELETE(w http.ResponseWriter, r *http.Request) { return } // CAPIF integration — unpublish registration asynchronously if capifClient != nil && capifClient.IsReady() { go func(appInstanceId string) { apiId := getCapifMapping("reg:" + appInstanceId) if apiId != "" { if err := capifClient.UnpublishServiceAPI(apiId); err != nil { log.Error("CAPIF unpublish failed for registration ", appInstanceId, ": ", err.Error()) } deleteCapifMapping("reg:" + appInstanceId) } }(appInstanceId) } // Send response on successful deletion of registration w.WriteHeader(http.StatusNoContent) } Loading Loading @@ -2192,29 +2148,3 @@ func errHandlerProblemDetails(w http.ResponseWriter, error string, code int) { w.WriteHeader(code) fmt.Fprint(w, jsonResponse) } // CAPIF mapping helpers — store appInstanceId -> CAPIF apiId in Redis func storeCapifMapping(localId string, apiId string) { key := baseKey + "capif:reg:" + localId err := rc.JSONSetEntry(key, ".", "\""+apiId+"\"") if err != nil { log.Error("Failed to store CAPIF mapping for ", localId, ": ", err.Error()) } } func getCapifMapping(localId string) string { key := baseKey + "capif:reg:" + localId val, err := rc.JSONGetEntry(key, ".") if err != nil || val == "" { return "" } return strings.Trim(val, "\"") } func deleteCapifMapping(localId string) { key := baseKey + "capif:reg:" + localId err := rc.JSONDelEntry(key, ".") if err != nil { log.Error("Failed to delete CAPIF mapping for ", localId, ": ", err.Error()) } }
go-apps/meep-app-enablement/server/capif-client/capif-client.go +148 −11 Original line number Diff line number Diff line Loading @@ -44,10 +44,15 @@ const ( // CapifClient manages the lifecycle of a CAPIF provider session type CapifClient struct { mu sync.RWMutex reInitMu sync.Mutex // serializes concurrent re-init attempts config CapifConfig enabled bool ready bool // readyCh is closed exactly once when the client first becomes ready readyCh chan struct{} closeReadyOnce sync.Once // Auth state from Register server adminToken string refreshToken string Loading Loading @@ -88,6 +93,10 @@ type CapifClient struct { publishedAPIs map[string]string } // errSessionExpired is returned by doPublish when the server rejects the request with 401 Unauthorized, // indicating the CAPIF session (access token / certs) has expired and must be re-established. var errSessionExpired = errors.New("CAPIF session expired (401 Unauthorized)") // capifDialContext returns a DialContext function that resolves CAPIF_HOSTNAME // to CAPIF_REGISTER_HOSTNAME. This is needed because the CAPIF core hostname // (e.g. "capifcore") is not DNS-resolvable from inside the Kubernetes pod, Loading @@ -111,6 +120,7 @@ func NewCapifClient(cfg CapifConfig) *CapifClient { config: cfg, enabled: cfg.Enabled, publishedAPIs: make(map[string]string), readyCh: make(chan struct{}), } c.registerHTTPClient = &http.Client{ Timeout: 30 * time.Second, Loading Loading @@ -172,6 +182,7 @@ func (c *CapifClient) Init() error { c.mu.Lock() c.ready = true c.mu.Unlock() c.closeReadyOnce.Do(func() { close(c.readyCh) }) log.Info("CAPIF client initialized successfully") return nil } Loading @@ -186,6 +197,18 @@ func (c *CapifClient) IsReady() bool { return c.enabled && c.ready } // WaitUntilReady blocks until the CAPIF client finishes initialization or timeout elapses. // Returns true if ready, false if timeout elapsed first. // Goroutines spawned before Init() completes will block here rather than being silently dropped. func (c *CapifClient) WaitUntilReady(timeout time.Duration) bool { select { case <-c.readyCh: return true case <-time.After(timeout): return false } } // login performs POST /login with Basic Auth (admin credentials) against the Register server func (c *CapifClient) login() error { url := fmt.Sprintf("https://%s:%s/login", c.config.RegisterHostname, c.config.RegisterPort) Loading Loading @@ -422,11 +445,11 @@ func (c *CapifClient) onboardProvider() error { } // Build TLS mutual auth clients with custom dialer for CAPIF hostname resolution c.apfTLSClient, err = buildMutualTLSClient(c.apfCert, c.apfKey, c.caRoot, c.capifDialContext) c.apfTLSClient, err = buildMutualTLSClient(c.apfCert, c.apfKey, c.capifDialContext) if err != nil { return fmt.Errorf("failed to build APF TLS client: %w", err) } c.amfTLSClient, err = buildMutualTLSClient(c.amfCert, c.amfKey, c.caRoot, c.capifDialContext) c.amfTLSClient, err = buildMutualTLSClient(c.amfCert, c.amfKey, c.capifDialContext) if err != nil { return fmt.Errorf("failed to build AMF TLS client: %w", err) } Loading @@ -435,8 +458,8 @@ func (c *CapifClient) onboardProvider() error { return nil } // PublishServiceAPI publishes a service API to CAPIF. Returns the CAPIF apiId. func (c *CapifClient) PublishServiceAPI(apiName string, version string, resourceURI string, resourceName string, description string, interfaceAddr string, interfacePort int) (string, error) { // doPublish performs a single publish attempt using a snapshot of the current session state. func (c *CapifClient) doPublish(apiName string, version string, resourceURI string, resourceName string, description string, interfaceAddr string, interfacePort int) (string, error) { c.mu.RLock() if !c.ready { c.mu.RUnlock() Loading @@ -445,6 +468,7 @@ func (c *CapifClient) PublishServiceAPI(apiName string, version string, resource apfId := c.apfId aefId := c.aefId publishUrl := c.ccfPublishUrl apfTLSClient := c.apfTLSClient c.mu.RUnlock() // Build the publish URL, replacing <apfId> placeholder if present Loading Loading @@ -511,7 +535,7 @@ func (c *CapifClient) PublishServiceAPI(apiName string, version string, resource } req.Header.Set("Content-Type", "application/json") resp, err := c.apfTLSClient.Do(req) resp, err := apfTLSClient.Do(req) if err != nil { return "", fmt.Errorf("publish request failed: %w", err) } Loading @@ -522,6 +546,9 @@ func (c *CapifClient) PublishServiceAPI(apiName string, version string, resource return "", err } if resp.StatusCode == http.StatusUnauthorized { return "", errSessionExpired } if resp.StatusCode != http.StatusCreated { return "", fmt.Errorf("publish returned %d: %s", resp.StatusCode, string(respBody)) } Loading @@ -544,6 +571,32 @@ func (c *CapifClient) PublishServiceAPI(apiName string, version string, resource return apiId, nil } // PublishServiceAPI publishes a service API to CAPIF. Returns the CAPIF apiId. // Only a 401 Unauthorized response triggers re-init (session expiry). // 403 duplicate-name conflicts and transient network errors are returned as-is without re-init. func (c *CapifClient) PublishServiceAPI(apiName string, version string, resourceURI string, resourceName string, description string, interfaceAddr string, interfacePort int) (string, error) { apiId, err := c.doPublish(apiName, version, resourceURI, resourceName, description, interfaceAddr, interfacePort) if err == nil { return apiId, nil } // Only re-init on confirmed session expiry (401). For anything else (network errors, // 403 duplicate name, 404, etc.) just surface the error — no re-init. if !errors.Is(err, errSessionExpired) { return "", err } // Session expired — re-establish and retry once log.Warn("CAPIF session expired, attempting re-init") c.mu.Lock() c.ready = false c.mu.Unlock() if reInitErr := c.reInit(); reInitErr != nil { return "", fmt.Errorf("CAPIF re-init failed: %v (original: %v)", reInitErr, err) } return c.doPublish(apiName, version, resourceURI, resourceName, description, interfaceAddr, interfacePort) } // UnpublishServiceAPI removes a published service API from CAPIF func (c *CapifClient) UnpublishServiceAPI(apiId string) error { c.mu.RLock() Loading Loading @@ -571,6 +624,11 @@ func (c *CapifClient) UnpublishServiceAPI(apiId string) error { } defer resp.Body.Close() if resp.StatusCode == http.StatusNotFound { // Already removed from CAPIF (e.g. pod restarted, provider re-onboarded) — not an error log.Warn("CAPIF unpublish: apiId=", apiId, " not found on server, already removed") return nil } if resp.StatusCode != http.StatusNoContent { body, _ := ioutil.ReadAll(resp.Body) return fmt.Errorf("unpublish returned %d: %s", resp.StatusCode, string(body)) Loading @@ -580,6 +638,69 @@ func (c *CapifClient) UnpublishServiceAPI(apiId string) error { return nil } // reInit re-establishes the CAPIF session after cert or token expiry. // It is safe to call concurrently: only one goroutine performs the work at a time; // any goroutine that arrives after the session is already recovered returns immediately. func (c *CapifClient) reInit() error { c.reInitMu.Lock() defer c.reInitMu.Unlock() // If another goroutine already recovered the session while we waited, skip re-init c.mu.RLock() alreadyReady := c.ready c.mu.RUnlock() if alreadyReady { return nil } log.Info("CAPIF re-init starting...") // Best-effort offboard the stale provider registration if c.amfTLSClient != nil && c.apiProvDomId != "" { offboardURL := fmt.Sprintf("https://%s:%s/%s/%s", c.config.CapifHostname, c.config.CapifPort, c.ccfApiOnboardingUrl, c.apiProvDomId) if req, err := http.NewRequest("DELETE", offboardURL, nil); err == nil { req.Header.Set("Content-Type", "application/json") if resp, err := c.amfTLSClient.Do(req); err != nil { log.Warn("CAPIF re-init offboard failed (ignored): ", err.Error()) } else { resp.Body.Close() } } } // Smart re-auth: try getAuth with the existing user to avoid creating a new account authOk := false if c.userName != "" { if err := c.getAuth(); err == nil { authOk = true } } if !authOk { if err := c.login(); err != nil { return fmt.Errorf("re-init login failed: %w", err) } if err := c.createUser(); err != nil { return fmt.Errorf("re-init createUser failed: %w", err) } if err := c.getAuth(); err != nil { return fmt.Errorf("re-init getAuth failed: %w", err) } } if err := c.onboardProvider(); err != nil { return fmt.Errorf("re-init onboardProvider failed: %w", err) } c.mu.Lock() c.ready = true c.mu.Unlock() // No-op if readyCh was already closed during initial Init() c.closeReadyOnce.Do(func() { close(c.readyCh) }) log.Info("CAPIF re-init successful") return nil } // Cleanup offboards the provider, deletes the user, and cleans up resources func (c *CapifClient) Cleanup() { if !c.enabled { Loading @@ -596,6 +717,22 @@ func (c *CapifClient) Cleanup() { log.Info("CAPIF client cleanup starting...") // Unpublish all published services before offboarding the provider. // This prevents 403 "Already registered" errors on the next startup. c.mu.RLock() publishedCopy := make(map[string]string, len(c.publishedAPIs)) for k, v := range c.publishedAPIs { publishedCopy[k] = v } c.mu.RUnlock() for apiName, apiId := range publishedCopy { if err := c.UnpublishServiceAPI(apiId); err != nil { log.Warn("CAPIF cleanup: unpublish failed for ", apiName, ": ", err.Error()) } else { log.Info("CAPIF cleanup: unpublished ", apiName) } } // Offboard provider using AMF cert (matching notebook convention) if c.amfTLSClient != nil && c.apiProvDomId != "" { offboardURL := fmt.Sprintf("https://%s:%s/%s/%s", Loading
go-apps/meep-app-enablement/server/capif-client/certs.go +6 −9 Original line number Diff line number Diff line Loading @@ -64,7 +64,7 @@ func generateCSR(cn string, org string, country string) ([]byte, *rsa.PrivateKey // certPEM is the signed certificate from CAPIF, privateKey is the corresponding RSA key, // caRootPEM is the CA root certificate from getAuth, and dialContext is a custom dialer // for CAPIF hostname resolution. func buildMutualTLSClient(certPEM string, privateKey *rsa.PrivateKey, caRootPEM string, dialContext func(ctx context.Context, network, addr string) (net.Conn, error)) (*http.Client, error) { func buildMutualTLSClient(certPEM string, privateKey *rsa.PrivateKey, dialContext func(ctx context.Context, network, addr string) (net.Conn, error)) (*http.Client, error) { // Encode private key to PEM keyPEM := pem.EncodeToMemory(&pem.Block{ Type: "RSA PRIVATE KEY", Loading @@ -77,15 +77,12 @@ func buildMutualTLSClient(certPEM string, privateKey *rsa.PrivateKey, caRootPEM return nil, errors.New("failed to parse client certificate: " + err.Error()) } // Parse CA root caCertPool := x509.NewCertPool() if !caCertPool.AppendCertsFromPEM([]byte(caRootPEM)) { return nil, errors.New("failed to parse CA root certificate") } // The CAPIF server may use a self-signed cert or a cert from any CA. // Matching the Python notebook (verify=False), we skip server cert verification // while still presenting our client certificate for mutual TLS authentication. tlsConfig := &tls.Config{ Certificates: []tls.Certificate{cert}, RootCAs: caCertPool, InsecureSkipVerify: true, //nolint:gosec // intentional: matches notebook verify=False } transport := &http.Transport{ Loading
go-apps/meep-app-enablement/server/service-mgmt/service-mgmt.go +14 −1 Original line number Diff line number Diff line Loading @@ -368,8 +368,21 @@ func appServicesPOST(w http.ResponseWriter, r *http.Request) { } // CAPIF integration — publish service asynchronously if capifClient != nil && capifClient.IsReady() { if capifClient != nil { go func(sInfo *ServiceInfo, appId string) { if !capifClient.WaitUntilReady(15 * time.Minute) { log.Warn("CAPIF publish skipped: client did not become ready in time for service ", sInfo.SerName) return } // If a stale CAPIF mapping exists in Redis (e.g. from a previous pod lifecycle), // unpublish it first to avoid 403 "Already registered service with same api name". if oldApiId := getCapifMapping(sInfo.SerInstanceId); oldApiId != "" { log.Info("CAPIF: removing stale mapping for ", sInfo.SerName, " (apiId=", oldApiId, ")") if err := capifClient.UnpublishServiceAPI(oldApiId); err != nil { log.Warn("CAPIF stale unpublish failed (ignored): ", err.Error()) } deleteCapifMapping(sInfo.SerInstanceId) } host := hostUrl.Hostname() port := 443 if hostUrl.Port() != "" { Loading