Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from cmd import Cmd
from capif_ops.provider_previous_register import PreviousRegister
from capif_ops.provider_register_capif import RegisterProvider
from capif_ops.provider_aef_publish_service import PublishService
from capif_ops.provider_delete import RemoveProvider
from capif_ops.provider_get_auth import PreviousAuth
from capif_ops.provider_aef_remove_service import RemoveService
from capif_ops.provider_remove_user import RemoveUser
from capif_ops.provider_get_publish_service import GetPublishService
from capif_ops.provider_register_events import AddEvents
from capif_ops.provider_admin_login import LoginAdmin
from capif_ops.provider_refresh_admin_token import refreshAdmin
import shlex
import subprocess
from art import *
from termcolor import colored
# Module-level operation objects, one per imported capif_ops class. They are
# instantiated once at import time and shared by the CAPIFProvider command
# methods, which delegate to their execute_* methods.
prev_register = PreviousRegister()
# NOTE(review): "regiter" looks like a typo for "register"; the name is used
# as-is by CAPIFProvider.do_register_provider, so rename both together.
regiter_capif = RegisterProvider()
publish_service = PublishService()
remove_provider = RemoveProvider()
remove_service = RemoveService()
provider_auth = PreviousAuth()
remove_user = RemoveUser()
get_services = GetPublishService()
register_events = AddEvents()
admin_login = LoginAdmin()
refresh_admin = refreshAdmin()
class CAPIFProvider(Cmd):
    """Interactive console exposing CAPIF provider operations as commands.

    Every ``do_<name>`` method is a console command that delegates to one of
    the module-level operation objects. ``Cmd`` prints a command's docstring
    verbatim for ``help <name>``, so those docstrings are kept to their
    one-line user-facing text and extra notes live in ``#`` comments.
    """

    def __init__(self):
        """Initialise the console, set the prompt and show the banner."""
        super().__init__()
        self.prompt = "> "
        # NOTE(review): art's tprint() prints the banner itself and, per its
        # documentation, returns None -- so the banner appears at construction
        # time and ``intro`` is presumably None (nothing extra is printed when
        # cmdloop() starts). Kept as-is to preserve behaviour.
        self.intro = tprint("Welcome to Provider Console")

    @staticmethod
    def _arg_or_none(args, index):
        """Return ``args[index]`` or ``None`` when that position is absent."""
        return args[index] if len(args) > index else None

    def emptyline(self):
        """Do nothing on empty input line"""
        # Overrides Cmd's default of repeating the last command.
        return None

    # def preloop(self):
    # #state = prev_register.execute_previous_register_provider("")
    # self.previous_register_state = state

    def precmd(self, line):
        """Inspect a raw input line before it is dispatched to a command.

        Args:
            line: The text typed at the prompt.

        Returns:
            ``""`` (handled as an empty line) when the first token is
            ``goodbye``; otherwise ``line`` unchanged.
        """
        try:
            tokens = shlex.split(line)
        except ValueError:
            # Unbalanced quotes make shlex raise, which previously aborted the
            # whole console. The tokens are only used to inspect the first
            # word, so plain whitespace splitting is a sufficient fallback and
            # the command still receives the untouched line.
            tokens = line.split()
        if tokens and tokens[0] == "goodbye":
            print("The first argument is username")
            return ""
        return line

    def do_login_admin(self, input):
        """Login with admin credentials to obtain the admin tokens"""
        # Needs at least <username> <password>; an optional third token is
        # forwarded unchanged (None when absent).
        args = input.split()
        if len(args) < 2:
            print(colored("\nLogin admin needs the username and password of the administrator.\n", "red"))
            return
        admin_login.execute_login_admin(args[0], args[1], self._arg_or_none(args, 2))

    def do_refresh_admin(self, input):
        """Refresh the admin access token"""
        refresh_admin.execute_refresh_admin(input)

    def do_register_user(self, input):
        """Register an user from the registry service"""
        prev_register.execute_previous_register_provider(input)

    def do_register_provider(self, input):
        """Register a provider to CAPIF instance"""
        # Needs the provider name; an optional second token is forwarded
        # unchanged (None when absent).
        args = input.split()
        if not args:
            print(colored("\nRegister provider needs the name of the provider.\n", "red"))
            return
        regiter_capif.execute_register_provider(args[0], self._arg_or_none(args, 1))

    def do_register_events(self, input):
        """Register events for provider"""
        register_events.execute_add_events(input)

    def do_publish_service(self, input):
        """Publish Service in CAPIF"""
        # Needs one argument (the error message calls it "a json"); an optional
        # second token is forwarded unchanged (None when absent).
        args = input.split()
        if not args:
            print(colored("\nPublish service needs a json.\n", "red"))
            return
        publish_service.execute_publish(args[0], self._arg_or_none(args, 1))

    def do_provider_get_auth(self, input):
        """Get jwt token to register provider in CAPIF"""
        provider_auth.execute_get_auth(input)

    def do_remove_service(self, input):
        """Remove published service in CAPIF"""
        # Needs the API name; an optional second token is forwarded unchanged
        # (None when absent).
        args = input.split()
        if not args:
            print(colored("\nRemove service needs an API name.\n", "red"))
            return
        remove_service.execute_remove(args[0], self._arg_or_none(args, 1))

    def do_remove_provider(self, input):
        """Remove provider registered from CAPIF"""
        remove_provider.execute_remove_provider(input)

    def do_get_services(self, input):
        """Returns the APIs published by the APF"""
        get_services.execute_get_publish(input)

    def do_remove_user(self, input):
        """Delete a user from the registry service"""
        remove_user.execute_remove_user(input)

    def do_exit(self, input):
        """Exit program"""
        print('\nExiting...')
        # A true return value tells Cmd.cmdloop() to stop.
        return True

    def do_EOF(self, input):
        """Exit program on end of input (Ctrl-D)"""
        # Cmd turns end-of-file on stdin into the command "EOF". Without this
        # handler it is reported as unknown syntax and, for piped input that
        # has run out, the loop never terminates.
        print('\nExiting...')
        return True
if __name__ == "__main__":
    # Script entry point: build the console and run its command loop. Ctrl-C
    # at any point ends the program with a short message, not a traceback.
    try:
        console = CAPIFProvider()
        console.cmdloop()
    except KeyboardInterrupt:
        print("\nExiting...")