from cmd import Cmd
from capif_ops.provider_previous_register import PreviousRegister
from capif_ops.provider_register_capif import RegisterProvider
from capif_ops.provider_aef_publish_service import PublishService
from capif_ops.provider_delete import RemoveProvider
from capif_ops.provider_get_auth import PreviousAuth
from capif_ops.provider_aef_remove_service import RemoveService
from capif_ops.provider_remove_user import RemoveUser
from capif_ops.provider_get_publish_service import GetPublishService
from capif_ops.provider_register_events import AddEvents
from capif_ops.provider_admin_login import LoginAdmin
from capif_ops.provider_refresh_admin_token import refreshAdmin
import shlex
import subprocess
from art import *
from termcolor import colored

# One shared instance of every operation; the console commands below delegate
# to these objects.
prev_register = PreviousRegister()
regiter_capif = RegisterProvider()
publish_service = PublishService()
remove_provider = RemoveProvider()
remove_service = RemoveService()
provider_auth = PreviousAuth()
remove_user = RemoveUser()
get_services = GetPublishService()
register_events = AddEvents()
admin_login = LoginAdmin()
refresh_admin = refreshAdmin()


class CAPIFProvider(Cmd):
    """Interactive console exposing the CAPIF provider operations as commands.

    Each ``do_<name>`` method is a console command. Its docstring is the text
    printed by ``help <name>``, so those docstrings are kept to one line.
    """

    def __init__(self):
        Cmd.__init__(self)
        self.prompt = "> "
        # tprint() writes the banner to stdout right here, at construction
        # time. NOTE(review): it presumably returns None, which would leave
        # `intro` unset so cmdloop() prints no second banner - confirm.
        self.intro = tprint("Welcome to Provider Console")

    def emptyline(self):
        """Do nothing on empty input line"""
        pass

    # def preloop(self):
    #     #state = prev_register.execute_previous_register_provider("")
    #     self.previous_register_state = state

    def precmd(self, line):
        """Inspect a raw input line before it is dispatched.

        Args:
            line: The text typed at the prompt.

        Returns:
            An empty string when the first word is ``goodbye`` (after printing
            a notice), otherwise ``line`` unchanged.
        """
        try:
            args = shlex.split(line)
        except ValueError:
            # shlex rejects unbalanced quotes. Fall back to whitespace
            # splitting (what the command handlers use) instead of letting
            # the exception escape cmdloop() and terminate the console.
            args = line.split()

        if args and args[0] == "goodbye":
            print("The first argument is username")
            return ""
        return line

    def do_login_admin(self, input):
        """Login with admin credentials to obtain the admin tokens"""
        args = input.split()
        if len(args) < 2:
            print(colored("\nLogin admin needs the username and password of the administrator.\n", "red"))
            return
        # The third argument is optional and forwarded as None when missing.
        optional = args[2] if len(args) > 2 else None
        admin_login.execute_login_admin(args[0], args[1], optional)

    def do_refresh_admin(self, input):
        """Refresh the admin access token"""
        refresh_admin.execute_refresh_admin(input)

    def do_register_user(self, input):
        """Register an user from the registry service"""
        prev_register.execute_previous_register_provider(input)

    def do_register_provider(self, input):
        """Register a provider to CAPIF instance"""
        args = input.split()
        if not args:
            print(colored("\nRegister provider needs the name of the provider.\n", "red"))
            return
        # The second argument is optional and forwarded as None when missing.
        optional = args[1] if len(args) > 1 else None
        regiter_capif.execute_register_provider(args[0], optional)

    def do_register_events(self, input):
        """Register events for provider"""
        register_events.execute_add_events(input)

    def do_publish_service(self, input):
        """Publish Service in CAPIF"""
        args = input.split()
        if not args:
            print(colored("\nPublish service needs a json.\n", "red"))
            return
        # The second argument is optional and forwarded as None when missing.
        optional = args[1] if len(args) > 1 else None
        publish_service.execute_publish(args[0], optional)

    def do_provider_get_auth(self, input):
        """Get jwt token to register provider in CAPIF"""
        provider_auth.execute_get_auth(input)

    def do_remove_service(self, input):
        """Remove published service in CAPIF"""
        args = input.split()
        if not args:
            print(colored("\nRemove service needs an API name.\n", "red"))
            return
        # The second argument is optional and forwarded as None when missing.
        optional = args[1] if len(args) > 1 else None
        remove_service.execute_remove(args[0], optional)

    def do_remove_provider(self, input):
        """Remove provider registered from CAPIF"""
        remove_provider.execute_remove_provider(input)

    def do_get_services(self, input):
        """Returns the APIs published by the APF"""
        get_services.execute_get_publish(input)

    def do_remove_user(self, input):
        """Delete a user from the registry service"""
        remove_user.execute_remove_user(input)

    def do_exit(self, input):
        """Exit program"""
        print('\nExiting...')
        # A truthy return value tells Cmd.cmdloop() to stop.
        return True

    def do_EOF(self, input):
        """Exit program on end-of-input (Ctrl-D)"""
        # Cmd turns end-of-input into the command "EOF"; without a handler it
        # is reported as unknown syntax and, on a closed stdin, repeats
        # forever.
        return self.do_exit(input)


if __name__ == '__main__':
    try:
        CAPIFProvider().cmdloop()
    except KeyboardInterrupt:
        print('\nExiting...')