Commit 2a7eb545 authored by Sebastien Merle's avatar Sebastien Merle
Browse files

Fix for topology API changes, add test script and update demo documentation.

parent 4290e982
Loading
Loading
Loading
Loading
+10 −1
Original line number Diff line number Diff line
@@ -12,6 +12,7 @@

% API functions
-export([start_link/0]).
-export([is_ready/0]).

% Behaviour gen_statem functions
-export([init/1]).
@@ -39,7 +40,13 @@
%%% API FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

start_link() ->
    gen_statem:start_link(?MODULE, [], []).
    gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).

is_ready() ->
    case whereis(?MODULE) of
        undefined -> false;
        _ -> gen_statem:call(?MODULE, is_ready)
    end.


%%% BEHAVIOUR gen_statem FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -129,6 +136,8 @@ handle_event(info, {'DOWN', Ref, process, Pid, Reason}, ready,
    ?LOG_ERROR("Context subscription error: ~p", [Info]),
    {next_state, subscribe, Data2};
%-- ANY STATE ------------------------------------------------------------------
handle_event({call, _From}, is_ready, State, _Data) ->
    {keep_state_and_data, [{reply, State =:= ready}]};
handle_event(info, Msg, StateName, _Data) ->
    ?LOG_WARNING("Unexpected context message in state ~w: ~p", [StateName, Msg]),
    keep_state_and_data.
+24 −13
Original line number Diff line number Diff line
@@ -40,7 +40,7 @@
%%% API FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

start_link() ->
    gen_statem:start_link(?MODULE, [], []).
    gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).

context_ready(Context) ->
    gen_statem:cast(?MODULE, {context_ready, Context}).
@@ -55,24 +55,27 @@ topology_event(Event) ->
    gen_statem:cast(?MODULE, {topology_event, Event}).

request_lsp(ServiceMap) ->
    gen_statem:cast(?MODULE, {request_lsp, ServiceMap}).
    gen_statem:call(?MODULE, {request_lsp, ServiceMap}).

delete_lsp(ServiceId) ->
    gen_statem:cast(?MODULE, {delete_lsp, ServiceId}).
    gen_statem:call(?MODULE, {delete_lsp, ServiceId}).


%%% BEHAVIOUR gen_statem FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

init([]) ->
    ?LOG_INFO("Starting server...", []),
    {ok, wait_context, #data{}}.
    case tfte_context:is_ready() of
        false -> {ok, wait_context, #data{}};
        true -> {ok, ready, #data{}}
    end.

callback_mode() -> [handle_event_function, state_enter].

%-- WAIT_CONTEXT STATE ---------------------------------------------------------
handle_event(enter, _, wait_context, _Data) ->
    keep_state_and_data;
handle_event(cast, {context_ready, _Context}, wait_contex, Data) ->
handle_event(cast, {context_ready, _Context}, wait_context, Data) ->
    ?LOG_DEBUG("Teraflow context initialized: ~p", [_Context]),
    tfte_topology:context_updated(),
    {next_state, ready, Data};
@@ -92,20 +95,20 @@ handle_event(cast, {topology_ready, _Topology}, ready, _Data) ->
handle_event(cast, {topology_event, _Event}, ready, _Data) ->
    ?LOG_DEBUG("Teraflow topology event: ~p", [_Event]),
    keep_state_and_data;
handle_event({call, _From}, {request_lsp, ServiceMap}, ready, Data) ->
handle_event({call, From}, {request_lsp, ServiceMap}, ready, Data) ->
    #{service_id := ServiceId} = ServiceMap,
    ?LOG_DEBUG("Teraflow service ~s requested its LSPs",
               [format_service_id(ServiceId)]),
    {Result, Data2} = do_request_lsp(Data, ServiceMap),
    {keep_state, Data2, [{reply, Result}]};
handle_event(cast, {delete_lsp, ServiceId}, ready, Data) ->
    {keep_state, Data2, [{reply, From, Result}]};
handle_event({call, From}, {delete_lsp, ServiceId}, ready, Data) ->
    ?LOG_DEBUG("Teraflow service ~s delete its LSPs",
              [format_service_id(ServiceId)]),
    {Result, Data2} = do_delete_lsp(Data, ServiceId),
    {keep_state, Data2, [{reply, Result}]};
    {keep_state, Data2, [{reply, From, Result}]};
%-- ANY STATE ------------------------------------------------------------------
handle_event(EventType, EventContent, State, Data) ->
    ?LOG_WARNING(Data, "Unexpected ~w event in state ~w: ~w",
handle_event(EventType, EventContent, State, _Data) ->
    ?LOG_WARNING("Unexpected tfte_server ~w event in state ~w: ~w",
                 [EventType, State, EventContent]),
    keep_state_and_data.

@@ -125,12 +128,14 @@ format_service_id(#{context_id := #{context_uuid := #{uuid := ContextName}},

do_request_lsp(#data{services = Services} = Data,
               #{service_type := 'SERVICETYPE_TE'} = ServiceMap) ->
    try

    #{service_config := Config,
      service_endpoint_ids := Endpoints,
      service_id := ServiceId} = ServiceMap,
    #{binding_label := BindingLabel1, symbolic_name := SymbolicName1}
    #{<<"binding_label">> := BindingLabel1, <<"symbolic_name">> := SymbolicName1}
        = tfte_util:custom_config(Config, <<"/lsp-fw">>),
    #{binding_label := BindingLabel2, symbolic_name := SymbolicName2}
    #{<<"binding_label">> := BindingLabel2, <<"symbolic_name">> := SymbolicName2}
        = tfte_util:custom_config(Config, <<"/lsp-bw">>),
    [#{device_id := #{device_uuid := #{uuid := Id1}}},
     #{device_id := #{device_uuid := #{uuid := Id2}}}] = Endpoints,
@@ -152,6 +157,12 @@ do_request_lsp(#data{services = Services} = Data,
                    Data2 = Data#data{services = Services2},
                    {{ok, 'SERVICESTATUS_ACTIVE'}, Data2}
            end
    end

    catch T:E:S ->
        ?LOG_ERROR("Error while requesintg LSP: ~p:~p", [T, E]),
        ?LOG_ERROR("Stacktrace: ~p", [S]),
        erlang:raise(T, E, S)
    end.

do_delete_lsp(Data, ServiceId) ->
+5 −5
Original line number Diff line number Diff line
@@ -23,13 +23,13 @@ request_lsp(Ctx, Service) ->
    ?LOG_INFO("Requesting LSP: ~p", [Service]),
    try tfte_server:request_lsp(Service) of
        {ok, Status} ->
            {ok, Status, Ctx};
            {ok, #{service_status => Status}, Ctx};
        {error, Reason} ->
            ?LOG_INFO("Error while requesting LSP: ~p", [Reason]),
            {ok, 'SERVICESTATUS_UNDEFINED', Ctx}
            {ok, #{service_status => 'SERVICESTATUS_UNDEFINED'}, Ctx}
    catch E:R:S ->
        ?LOG_ERROR("Error while requesting LSP: ~p:~p ~p", [E, R, S]),
        {ok, 'SERVICESTATUS_UNDEFINED', Ctx}
        {ok, #{service_status => 'SERVICESTATUS_UNDEFINED'}, Ctx}
    end.

update_lsp(_Ctx, _ServiceId) ->
@@ -43,8 +43,8 @@ delete_lsp(Ctx, ServiceId) ->
            {ok, Status, Ctx};
        {error, Reason} ->
            ?LOG_INFO("Error while deleting LSP: ~p", [Reason]),
            {ok, 'SERVICESTATUS_UNDEFINED', Ctx}
            {ok, #{service_status => 'SERVICESTATUS_UNDEFINED'}, Ctx}
    catch E:R:S ->
        ?LOG_ERROR("Error while deleting LSP: ~p:~p ~p", [E, R, S]),
        {ok, 'SERVICESTATUS_UNDEFINED', Ctx}
        {ok, #{service_status => 'SERVICESTATUS_UNDEFINED'}, Ctx}
    end.
+52 −49
Original line number Diff line number Diff line
@@ -29,7 +29,8 @@
    sub :: term() | undefined,
    obj :: map() | undefined,
    devices = #{} :: map(),
    links = #{} :: map()
    links = #{} :: map(),
    names = #{} :: map()
}).


@@ -42,7 +43,7 @@
%%% API FUNCTIONS %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

start_link() ->
    gen_statem:start_link(?MODULE, [], []).
    gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).

context_updated() ->
    gen_statem:cast(?MODULE, context_updated).
@@ -69,9 +70,15 @@ handle_event(state_timeout, do_retrieve, retrieve, #data{uuid = UUID} = Data) ->
    case get_object(UUID) of
        error ->
            {keep_state_and_data, [{state_timeout, ?RETRIEVE_RETRY_TIMEOUT, do_retrieve}]};
        {ok, Topology} ->
        {ok, #{device_ids := Devices, link_ids := Links } = Topology} ->
            case {length(Devices), length(Links)} of
                {D, L} when D =:= 0; L =:= 0 ->
                    ?LOG_WARNING("Got topology, but there is missing devices or links", []),
                    {keep_state_and_data, [{state_timeout, 1000, do_retrieve}]};
                _ ->
                    ?LOG_DEBUG("Got topology: ~p", [Topology]),
                    {next_state, subscribe, Data#data{obj = Topology}}
            end
    end;
handle_event(cast, context_updated, retrieve, _Data) ->
    {keep_state_and_data, [{state_timeout, 0, do_retrieve}]};
@@ -197,79 +204,74 @@ post_topology_event({link_deleted, Id}) ->
    epce_ted:link_deleted(Id).

update_devices(#data{devices = OldDevices} = Data, DeviceIds, Events) ->
    {NewDevices, Events2} = update_devices(OldDevices, #{}, DeviceIds, Events),
    {Data#data{devices = NewDevices}, Events2}.
    update_devices(Data, OldDevices, #{}, DeviceIds, Events).

update_devices(OldDevices, NewDevices, [], Events) ->
    Events2 = [{device_deleted, post_process_device_id(I)}
update_devices(Data, OldDevices, NewDevices, [], Events) ->
    #data{names = Names} = Data,
    Events2 = [{device_deleted, maps:get(I, Names, undefined)}
               || I <- maps:keys(OldDevices)] ++ Events,
    {NewDevices, Events2};
update_devices(OldDevices, NewDevices, [Id | Rest], Events) ->
    case get_device(Id) of
        error -> throw({device_retrieval_error, Id});
    {Data#data{devices = NewDevices}, Events2};
update_devices(Data, OldDevices, NewDevices, [GivenId | Rest], Events) ->
    case get_device(GivenId) of
        error -> throw({device_retrieval_error, GivenId});
        {ok, Device} ->
            Id2 = post_process_device_id(Id),
            Device2 = post_process_device(Device),
            Device2 = #{id := Id, real_id := RealId} = post_process_device(Device),
            #data{names = Names} = Data,
            Data2 = Data#data{names = Names#{RealId => Id}},
            NewDevices2 = NewDevices#{Id => Device},
            case maps:take(Id, OldDevices) of
                error ->
                    % New device
                    Events2 = [{device_added, Id2, Device2} | Events],
                    update_devices(OldDevices, NewDevices2, Rest, Events2);
                    Events2 = [{device_added, Id, Device2} | Events],
                    update_devices(Data2, OldDevices, NewDevices2, Rest, Events2);
                {Device, OldDevices2} ->
                    % Device did not change
                    update_devices(OldDevices2, NewDevices2, Rest, Events);
                    update_devices(Data2, OldDevices2, NewDevices2, Rest, Events);
                {_OldDevice, OldDevices2} ->
                    % Device changed
                    Events2 = [{device_updated, Id2, Device2} | Events],
                    update_devices(OldDevices2, NewDevices2, Rest, Events2)
                    Events2 = [{device_updated, Id, Device2} | Events],
                    update_devices(Data2, OldDevices2, NewDevices2, Rest, Events2)
            end
    end.

update_links(#data{links = OldLinks} = Data, LinksIds, Events) ->
    {NewLinks, Events2} = update_links(OldLinks, #{}, LinksIds, Events),
    {Data#data{links = NewLinks}, Events2}.
    update_links(Data, OldLinks, #{}, LinksIds, Events).

update_links(OldLinks, NewLinks, [], Events) ->
update_links(Data, OldLinks, NewLinks, [], Events) ->
    Events2 = [{link_deleted, post_process_link_id(I)}
               || I <- maps:keys(OldLinks)] ++ Events,
    {NewLinks, Events2};
update_links(OldLinks, NewLinks, [Id | Rest], Events) ->
    {Data#data{links = NewLinks}, Events2};
update_links(Data, OldLinks, NewLinks, [Id | Rest], Events) ->
    case get_link(Id) of
        error -> throw({link_retrieval_error, Id});
        {ok, Link} ->
            Id2 = post_process_link_id(Id),
            Link2 = post_process_link(Link),
            Link2 = post_process_link(Data, Link),
            NewLinks2 = NewLinks#{Id => Link},
            case maps:take(Id, OldLinks) of
                error ->
                    % New Link
                    Events2 = [{link_added, Id2, Link2} | Events],
                    update_links(OldLinks, NewLinks2, Rest, Events2);
                    update_links(Data, OldLinks, NewLinks2, Rest, Events2);
                {Link, OldLinks2} ->
                    % Link did not change
                    update_links(OldLinks2, NewLinks2, Rest, Events);
                    update_links(Data, OldLinks2, NewLinks2, Rest, Events);
                {_OldLink, OldLinks2} ->
                    % Link changed
                    Events2 = [{link_updated, Id2, Link2} | Events],
                    update_links(OldLinks2, NewLinks2, Rest, Events2)
                    update_links(Data, OldLinks2, NewLinks2, Rest, Events2)
            end
    end.

post_process_device_id(#{device_uuid := #{uuid := Name}}) ->
    Name.

post_process_device(Device) ->
    #{id => device_id(Device),
post_process_device(#{device_id := Id, name :=  Name} = Device) ->
    #{id => Name,
      real_id => Id,
      type => device_type(Device),
      pcc_address => device_pcc_address(Device),
      mpls_label => device_mpls_label(Device),
      status => device_status(Device),
      endpoints => device_endpoints(Device)}.

device_id(#{device_id := Id}) ->
    post_process_device_id(Id).

device_type(#{device_type := Type}) ->
    Type.

@@ -312,33 +314,34 @@ device_endpoints(#{device_endpoints := Endpoints}, Acc) ->
    device_endpoints(Endpoints, Acc);
device_endpoints([], Acc) ->
    lists:reverse(Acc);
device_endpoints([#{endpoint_id := #{endpoint_uuid := #{uuid := Name}}} | Rest], Acc) ->
device_endpoints([#{name := Name} | Rest], Acc) ->
    device_endpoints(Rest, [Name | Acc]).

post_process_link_id(#{link_uuid := #{uuid := Name}}) ->
    Name.

post_process_link(Link) ->
post_process_link(Data, Link) ->
    #{id => link_id(Link),
      endpoints => link_endpoints(Link)}.
      endpoints => link_endpoints(Data, Link)}.

link_id(#{link_id := Id}) ->
    post_process_link_id(Id).

link_endpoints(Link) ->
    link_endpoints(Link, []).
link_endpoints(Data, Link) ->
    link_endpoints(Data, Link, []).

link_endpoints(#{link_endpoint_ids := Endpoints}, Acc) ->
    link_endpoints(Endpoints, Acc);
link_endpoints([], Acc) ->
link_endpoints(Data, #{link_endpoint_ids := Endpoints}, Acc) ->
    link_endpoints(Data, Endpoints, Acc);
link_endpoints(_Data, [], Acc) ->
    lists:reverse(Acc);
link_endpoints([#{device_id := #{device_uuid := #{uuid := DevName}},
link_endpoints(Data, [#{device_id := RealId,
                        endpoint_uuid := #{uuid := EndpointName}} | Rest], Acc) ->
    #data{names = Names} = Data,
    Endpoint = #{
        device => DevName,
        device => maps:get(RealId, Names, undefined),
        endpoint => EndpointName
    },
    link_endpoints(Rest, [Endpoint | Acc]).
    link_endpoints(Data, Rest, [Endpoint | Acc]).


%-- GRPC UNTILITY FUNCTION -----------------------------------------------------
+94 −0
Original line number Diff line number Diff line
# Simple script to test GRPC calls to the TE service.
# First get the TE service IP using:
# > kubectl -n tfs get services
# Change it in this script and run with:
# > PYTHONPATH=./src python test_te_service.py

import json, sys
from common.proto.context_pb2 import ConfigActionEnum, Service, ServiceStatusEnum, ServiceTypeEnum
from common.tools.grpc.Tools import grpc_message_to_json_string
from service.client.TEServiceClient import TEServiceClient

#  {"name": "", 
#     "service_config": {
#        "config_rules": [
#           {
#             "action": "CONFIGACTION_SET",
#              "custom": {
#                 "resource_key": "/lsp-fw",
#                 "resource_value": "{\n\"binding_label\": 1111,\n\"symbolic_name\": \"foo\"\n}"}},
#           {
#              "action": "CONFIGACTION_SET",
#              "custom": {
#                 "resource_key": "/lsp-bw",
#                 "resource_value": "{\n\"binding_label\": 6666,\n\"symbolic_name\": \"bar\"\n}"}}]},
#        "service_constraints": [],
#        "service_endpoint_ids": [
#           {"device_id": {"device_uuid": {"uuid": "RT1"}}, "endpoint_uuid": {"uuid": "eth-src"}},
#           {"device_id": {"device_uuid": {"uuid": "RT6"}}, "endpoint_uuid": {"uuid": "eth-dst"}}],
#        "service_id": {"context_id": {"context_uuid": {"uuid": "admin"}},
#        "service_uuid": {"uuid": "2c025055-bf6c-4250-8560-cf62f2d29e72"}},
#        "service_status": {"service_status": "SERVICESTATUS_PLANNED"},
#        "service_type": "SERVICETYPE_TE"}

service = Service()
service.service_id.context_id.context_uuid.uuid = 'admin'
service.service_id.service_uuid.uuid = 'test-te-service'

service.service_type = ServiceTypeEnum.SERVICETYPE_TE
service.service_status.service_status = ServiceStatusEnum.SERVICESTATUS_PLANNED

# SRC Endpoint:
src_endpoint_id = service.service_endpoint_ids.add()
src_endpoint_id.device_id.device_uuid.uuid = 'RT1'
src_endpoint_id.endpoint_uuid.uuid = 'eth-src'

# DST Endpoint:
dst_endpoint_id = service.service_endpoint_ids.add()
dst_endpoint_id.device_id.device_uuid.uuid = 'RT6'
dst_endpoint_id.endpoint_uuid.uuid = 'eth-dst'

# # Capacity SLA
# sla_capacity = service.service_constraints.add()
# sla_capacity.sla_capacity.capacity_gbps = 10.0

# # Latency SLA
# sla_latency = service.service_constraints.add()
# sla_latency.sla_latency.e2e_latency_ms = 20.0

# Example config rules:
config_rule_1 = service.service_config.config_rules.add()
config_rule_1.action = ConfigActionEnum.CONFIGACTION_SET
config_rule_1.custom.resource_key = '/lsp-fw'
config_rule_1.custom.resource_value = json.dumps({
    'binding_label': 1111, 'symbolic_name': "foo"
})

config_rule_2 = service.service_config.config_rules.add()
config_rule_2.action = ConfigActionEnum.CONFIGACTION_SET
config_rule_2.custom.resource_key = '/lsp-bw'
config_rule_2.custom.resource_value = json.dumps({
    'binding_label': 6666, 'symbolic_name': "bar"
})

def main():
    # Connect:
    te_service_client = TEServiceClient(host='XXX.XXX.XXX.XXX', port=10030)

    # RequestLSP
    print('request:', grpc_message_to_json_string(service))
    service_status = te_service_client.RequestLSP(service)
    print('response:', grpc_message_to_json_string(service_status))

    # DeleteLSP
    #print('request:', grpc_message_to_json_string(service))
    #service_status = te_service_client.DeleteLSP(service)
    #print('response:', grpc_message_to_json_string(service_status))

    # Close:
    te_service_client.close()

    return 0

if __name__ == '__main__':
    sys.exit(main())
Loading