Commit c6baf2ff authored by Georgios P. Katsikas's avatar Georgios P. Katsikas
Browse files

feat: functional P4 driver



This commit adds support for P4 device and
resource management by implementing:
 * key P4 entities as Python objects
 * a context class for turning P4 info files into context
 * common functions around P4
 * a manager class which exploits P4 entities and context to manage P4 devices
 * GetConfig() RPC
 * SetConfig() RPC, and
 * DeleteConfig() RPC
 * basic internal and unit tests

Signed-off-by: default avatarGeorgios Katsikas <katsikas.gp@gmail.com>
parent 1c312bc1
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -10,6 +10,9 @@ pytz==2021.3
redis==4.1.2
requests==2.27.1
xmltodict==0.12.0
tabulate
ipaddress
macaddress

# pip's dependency resolver does not take into account installed packages.
# p4runtime does not specify the version of grpcio/protobuf it needs, so it tries to install latest one
+0 −1
Original line number Diff line number Diff line
@@ -11,4 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+607 −0
Original line number Diff line number Diff line
@@ -13,39 +13,117 @@
# limitations under the License.

"""
P4 driver utilities.
P4Runtime client.
"""

import logging
import queue
import sys
import enum
import threading
from functools import wraps
from typing import NamedTuple
import grpc
import google.protobuf.text_format
from google.rpc import code_pb2
from google.rpc import status_pb2, code_pb2

from p4.v1 import p4runtime_pb2
from p4.v1 import p4runtime_pb2_grpc

P4_ATTR_DEV_ID = 'id'
P4_ATTR_DEV_NAME = 'name'
P4_ATTR_DEV_VENDOR = 'vendor'
P4_ATTR_DEV_HW_VER = 'hw_ver'
P4_ATTR_DEV_SW_VER = 'sw_ver'
P4_ATTR_DEV_PIPECONF = 'pipeconf'
STREAM_ATTR_ARBITRATION = "arbitration"
STREAM_ATTR_PACKET = "packet"
STREAM_ATTR_DIGEST = "digest"
STREAM_ATTR_IDLE_NOT = "idle_timeout_notification"
STREAM_ATTR_UNKNOWN = "unknown"

P4_VAL_DEF_VENDOR = 'Unknown'
P4_VAL_DEF_HW_VER = 'BMv2 simple_switch'
P4_VAL_DEF_SW_VER = 'Stratum'
P4_VAL_DEF_PIPECONF = 'org.onosproject.pipelines.fabric'
LOGGER = logging.getLogger(__name__)

STREAM_ATTR_ARBITRATION = 'arbitration'
STREAM_ATTR_PACKET = 'packet'
STREAM_ATTR_DIGEST = 'digest'
STREAM_ATTR_UNKNOWN = 'unknown'

LOGGER = logging.getLogger(__name__)
class P4RuntimeErrorFormatException(Exception):
    """
    P4Runtime error format exception.
    """


# Used to iterate over the p4.Error messages in a gRPC error Status object
class P4RuntimeErrorIterator:
    """
    P4Runtime error iterator.

    Attributes
    ----------
    grpc_error : object
        gRPC error
    """

    def __init__(self, grpc_error):
        assert grpc_error.code() == grpc.StatusCode.UNKNOWN
        self.grpc_error = grpc_error

        error = None
        # The gRPC Python package does not have a convenient way to access the
        # binary details for the error: they are treated as trailing metadata.
        for meta in self.grpc_error.trailing_metadata():
            if meta[0] == "grpc-status-details-bin":
                error = status_pb2.Status()
                error.ParseFromString(meta[1])
                break
        if error is None:
            raise P4RuntimeErrorFormatException("No binary details field")

        if len(error.details) == 0:
            raise P4RuntimeErrorFormatException(
                "Binary details field has empty Any details repeated field")
        self.errors = error.details
        self.idx = 0

    def __iter__(self):
        return self

    def __next__(self):
        while self.idx < len(self.errors):
            p4_error = p4runtime_pb2.Error()
            one_error_any = self.errors[self.idx]
            if not one_error_any.Unpack(p4_error):
                raise P4RuntimeErrorFormatException(
                    "Cannot convert Any message to p4.Error")
            if p4_error.canonical_code == code_pb2.OK:
                continue
            val = self.idx, p4_error
            self.idx += 1
            return val
        raise StopIteration


class P4RuntimeWriteException(Exception):
    """
    P4Runtime write exception handler.

    Attributes
    ----------
    grpc_error : object
        gRPC error
    """

    def __init__(self, grpc_error):
        assert grpc_error.code() == grpc.StatusCode.UNKNOWN
        super().__init__()
        self.errors = []
        try:
            error_iterator = P4RuntimeErrorIterator(grpc_error)
            for error_tuple in error_iterator:
                self.errors.append(error_tuple)
        except P4RuntimeErrorFormatException as ex:
            raise P4RuntimeException(grpc_error) from ex

    def __str__(self):
        message = "Error(s) during Write:\n"
        for idx, p4_error in self.errors:
            code_name = code_pb2._CODE.values_by_number[
                p4_error.canonical_code].name
            message += f"\t* At index {idx}: {code_name}, " \
                       f"'{p4_error.message}'\n"
        return message


class P4RuntimeException(Exception):
@@ -63,28 +141,117 @@ class P4RuntimeException(Exception):
        self.grpc_error = grpc_error

    def __str__(self):
        return str('P4Runtime RPC error (%s): %s',
                   self.grpc_error.code().name(), self.grpc_error.details())
        message = f"P4Runtime RPC error ({self.grpc_error.code().name}): " \
                  f"{self.grpc_error.details()}"
        return message


def parse_p4runtime_error(fun):
def parse_p4runtime_write_error(func):
    """
    Parse P4Runtime write error.

    :param func: function
    :return: parsed error
    """

    @wraps(func)
    def handle(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except grpc.RpcError as ex:
            if ex.code() != grpc.StatusCode.UNKNOWN:
                raise ex
            raise P4RuntimeWriteException(ex) from None

    return handle


def parse_p4runtime_error(func):
    """
    Parse P4Runtime error.

    :param fun: function
    :param func: function
    :return: parsed error
    """
    @wraps(fun)

    @wraps(func)
    def handle(*args, **kwargs):
        try:
            return fun(*args, **kwargs)
        except grpc.RpcError as rpc_ex:
            raise P4RuntimeException(rpc_ex) from None
        except Exception as ex:
            raise Exception(ex) from None
            return func(*args, **kwargs)
        except grpc.RpcError as ex:
            raise P4RuntimeException(ex) from None

    return handle


class SSLOptions(NamedTuple):
    """
    Tuple of SSL options.
    """
    insecure: bool
    cacert: str = None
    cert: str = None
    key: str = None


def read_pem_file(path):
    """
    Load and read PEM file.

    :param path: path to PEM file
    :return: file descriptor
    """
    try:
        with open(path, "rb") as f_d:
            return f_d.read()
    except (FileNotFoundError, IOError, OSError):
        logging.critical("Cannot read from PEM file '%s'", path)
        sys.exit(1)


@enum.unique
class WriteOperation(enum.Enum):
    """
    Write Operations.
    """
    insert = 1
    update = 2
    delete = 3


def select_operation(mode):
    """
    Select P4 operation based upon the operation mode.

    :param mode: operation mode
    :return: P4 operation protobuf object
    """
    if mode == WriteOperation.insert:
        return p4runtime_pb2.Update.INSERT
    if mode == WriteOperation.update:
        return p4runtime_pb2.Update.UPDATE
    if mode == WriteOperation.delete:
        return p4runtime_pb2.Update.DELETE
    return None


def select_entity_type(entity, update):
    """
    Select P4 entity type for an update.

    :param entity: P4 entity object
    :param update: update operation
    :return: the correct update entity or None
    """
    if isinstance(entity, p4runtime_pb2.TableEntry):
        return update.entity.table_entry
    if isinstance(entity, p4runtime_pb2.ActionProfileGroup):
        return update.entity.action_profile_group
    if isinstance(entity, p4runtime_pb2.ActionProfileMember):
        return update.entity.action_profile_member
    return None


class P4RuntimeClient:
    """
    P4Runtime client.
@@ -99,24 +266,53 @@ class P4RuntimeClient:
        Mastership election ID
    role_name : str
        Role name (optional)
    ssl_options: tuple
        SSL options" named tuple (optional)
    """
    def __init__(self, device_id, grpc_address, election_id, role_name=None):

    def __init__(self, device_id, grpc_address,
                 election_id, role_name=None, ssl_options=None):
        self.device_id = device_id
        self.election_id = election_id
        self.role_name = role_name
        if ssl_options is None:
            self.ssl_options = SSLOptions(True)
        else:
            self.ssl_options = ssl_options
        LOGGER.debug(
            "Connecting to device %d at %s", device_id, grpc_address)

        if self.ssl_options.insecure:
            logging.debug("Using insecure channel")
            self.channel = grpc.insecure_channel(grpc_address)
        else:
            # root certificates are retrieved from a default location
            # chosen by gRPC runtime unless the user provides
            # custom certificates.
            root_certificates = None
            if self.ssl_options.cacert is not None:
                root_certificates = read_pem_file(self.ssl_options.cacert)
            certificate_chain = None
            if self.ssl_options.cert is not None:
                certificate_chain = read_pem_file(self.ssl_options.cert)
            private_key = None
            if self.ssl_options.key is not None:
                private_key = read_pem_file(self.ssl_options.key)
            creds = grpc.ssl_channel_credentials(root_certificates, private_key,
                                                 certificate_chain)
            self.channel = grpc.secure_channel(grpc_address, creds)
        self.stream_in_q = None
        self.stream_out_q = None
        self.stream = None
        self.stream_recv_thread = None
        LOGGER.debug(
            'Connecting to device %d at %s', device_id, grpc_address)
        self.channel = grpc.insecure_channel(grpc_address)
        self.stub = p4runtime_pb2_grpc.P4RuntimeStub(self.channel)

        try:
            self.set_up_stream()
        except P4RuntimeException:
            LOGGER.critical('Failed to connect to P4Runtime server')
            LOGGER.critical("Failed to connect to P4Runtime server")
            sys.exit(1)
        LOGGER.info("P4Runtime client is successfully invoked")

    def set_up_stream(self):
        """
@@ -128,35 +324,38 @@ class P4RuntimeClient:
            STREAM_ATTR_ARBITRATION: queue.Queue(),
            STREAM_ATTR_PACKET: queue.Queue(),
            STREAM_ATTR_DIGEST: queue.Queue(),
            STREAM_ATTR_IDLE_NOT: queue.Queue(),
            STREAM_ATTR_UNKNOWN: queue.Queue(),
        }

        def stream_req_iterator():
            while True:
                st_p = self.stream_out_q.get()
                if st_p is None:
                stream_p = self.stream_out_q.get()
                if stream_p is None:
                    break
                yield st_p
                yield stream_p

        def stream_recv_wrapper(stream):
            @parse_p4runtime_error
            def stream_recv():
                for st_p in stream:
                    if st_p.HasField(STREAM_ATTR_ARBITRATION):
                        self.stream_in_q[STREAM_ATTR_ARBITRATION].put(st_p)
                    elif st_p.HasField(STREAM_ATTR_PACKET):
                        self.stream_in_q[STREAM_ATTR_PACKET].put(st_p)
                    elif st_p.HasField(STREAM_ATTR_DIGEST):
                        self.stream_in_q[STREAM_ATTR_DIGEST].put(st_p)
                for stream_p in stream:
                    if stream_p.HasField("arbitration"):
                        self.stream_in_q["arbitration"].put(stream_p)
                    elif stream_p.HasField("packet"):
                        self.stream_in_q["packet"].put(stream_p)
                    elif stream_p.HasField("digest"):
                        self.stream_in_q["digest"].put(stream_p)
                    else:
                        self.stream_in_q[STREAM_ATTR_UNKNOWN].put(st_p)
                        self.stream_in_q["unknown"].put(stream_p)

            try:
                stream_recv()
            except P4RuntimeException as ex:
                LOGGER.critical('StreamChannel error, closing stream')
                LOGGER.critical(ex)
                logging.critical("StreamChannel error, closing stream")
                logging.critical(ex)
                for k in self.stream_in_q:
                    self.stream_in_q[k].put(None)

        self.stream = self.stub.StreamChannel(stream_req_iterator())
        self.stream_recv_thread = threading.Thread(
            target=stream_recv_wrapper, args=(self.stream,))
@@ -180,15 +379,14 @@ class P4RuntimeClient:

        rep = self.get_stream_packet(STREAM_ATTR_ARBITRATION, timeout=2)
        if rep is None:
            LOGGER.critical('Failed to establish session with server')
            logging.critical("Failed to establish session with server")
            sys.exit(1)
        is_primary = (rep.arbitration.status.code == code_pb2.OK)
        LOGGER.debug('Session established, client is %s',
                        'primary' if is_primary else 'backup')
        logging.debug("Session established, client is '%s'",
                      "primary" if is_primary else "backup")
        if not is_primary:
            LOGGER.warning(
                'You are not the primary client, '
                'you only have read access to the server')
            print("You are not the primary client,"
                  "you only have read access to the server")

    def get_stream_packet(self, type_, timeout=1):
        """
@@ -199,7 +397,7 @@ class P4RuntimeClient:
        :return: message or None
        """
        if type_ not in self.stream_in_q:
            LOGGER.critical('Unknown stream type %s', type_)
            print("Unknown stream type 's"'', type_)
            return None
        try:
            msg = self.stream_in_q[type_].get(timeout=timeout)
@@ -214,8 +412,7 @@ class P4RuntimeClient:

        :return: P4Info object.
        """

        LOGGER.debug('Retrieving P4Info file')
        logging.debug("Retrieving P4Info file")
        req = p4runtime_pb2.GetForwardingPipelineConfigRequest()
        req.device_id = self.device_id
        req.response_type = \
@@ -232,8 +429,7 @@ class P4RuntimeClient:
        :param bin_path: path to the binary file
        :return:
        """

        LOGGER.debug('Setting forwarding pipeline config')
        logging.debug("Setting forwarding pipeline config")
        req = p4runtime_pb2.SetForwardingPipelineConfigRequest()
        req.device_id = self.device_id
        if self.role_name is not None:
@@ -243,24 +439,23 @@ class P4RuntimeClient:
        election_id.low = self.election_id[1]
        req.action = \
            p4runtime_pb2.SetForwardingPipelineConfigRequest.VERIFY_AND_COMMIT
        with open(p4info_path, 'r', encoding='utf8') as f_1:
            with open(bin_path, 'rb', encoding='utf8') as f_2:
        with open(p4info_path, "r", encoding="utf-8") as f_info:
            with open(bin_path, "rb") as f_bin:
                try:
                    google.protobuf.text_format.Merge(
                        f_1.read(), req.config.p4info)
                        f_info.read(), req.config.p4info)
                except google.protobuf.text_format.ParseError:
                    LOGGER.error('Error when parsing P4Info')
                    logging.error("Error when parsing P4Info")
                    raise
                req.config.p4_device_config = f_2.read()
                req.config.p4_device_config = f_bin.read()
        return self.stub.SetForwardingPipelineConfig(req)

    def tear_down(self):
        """
        Tear connection with the gRPC server down.
        """

        if self.stream_out_q:
            LOGGER.debug('Cleaning up stream')
            logging.debug("Cleaning up stream")
            self.stream_out_q.put(None)
        if self.stream_in_q:
            for k in self.stream_in_q:
@@ -268,4 +463,145 @@ class P4RuntimeClient:
        if self.stream_recv_thread:
            self.stream_recv_thread.join()
        self.channel.close()
        # avoid a race condition if channel deleted when process terminates
        del self.channel

    @parse_p4runtime_write_error
    def __write(self, entity, mode=WriteOperation.insert):
        """
        Perform a write operation.

        :param entity: P4 entity to write
        :param mode: operation mode (defaults to insert)
        :return: void
        """
        if isinstance(entity, (list, tuple)):
            for ent in entity:
                self.__write(ent)
            return
        req = self.__get_new_write_request()
        update = req.updates.add()
        update.type = select_operation(mode)
        msg_entity = select_entity_type(entity, update)
        if not msg_entity:
            msg = f"{mode.name} operation for entity {entity.__name__}" \
                  f"not supported"
            raise P4RuntimeWriteException(msg)
        msg_entity.CopyFrom(entity)
        self.__simple_write(req)

    def __get_new_write_request(self):
        """
        Create a new write request message.

        :return: write request message
        """
        req = p4runtime_pb2.WriteRequest()
        req.device_id = self.device_id
        if self.role_name is not None:
            req.role = self.role_name
        election_id = req.election_id
        election_id.high = self.election_id[0]
        election_id.low = self.election_id[1]
        return req

    @parse_p4runtime_write_error
    def __simple_write(self, req):
        """
        Send a write operation into the wire.

        :param req: write operation request
        :return: void
        """
        try:
            return self.stub.Write(req)
        except grpc.RpcError as ex:
            if ex.code() != grpc.StatusCode.UNKNOWN:
                raise ex
            raise P4RuntimeWriteException(ex) from ex

    @parse_p4runtime_write_error
    def insert(self, entity):
        """
        Perform an insert write operation.

        :param entity: P4 entity to insert
        :return: void
        """
        return self.__write(entity, WriteOperation.insert)

    @parse_p4runtime_write_error
    def update(self, entity):
        """
        Perform an update write operation.

        :param entity: P4 entity to update
        :return: void
        """
        return self.__write(entity, WriteOperation.update)

    @parse_p4runtime_write_error
    def delete(self, entity):
        """
        Perform a delete write operation.

        :param entity: P4 entity to delete
        :return: void
        """
        return self.__write(entity, WriteOperation.delete)

    @parse_p4runtime_write_error
    def write(self, req):
        """
        Write device operation.

        :param req: write request message
        :return: status
        """
        req.device_id = self.device_id
        if self.role_name is not None:
            req.role = self.role_name
        election_id = req.election_id
        election_id.high = self.election_id[0]
        election_id.low = self.election_id[1]
        return self.__simple_write(req)

    @parse_p4runtime_write_error
    def write_update(self, update):
        """
        Update device operation.

        :param update: update request message
        :return: status
        """
        req = self.__get_new_write_request()
        req.updates.extend([update])
        return self.__simple_write(req)

    # Decorator is useless here: in case of server error,
    # the exception is raised during the iteration (when next() is called).
    @parse_p4runtime_error
    def read_one(self, entity):
        """
        Read device operation.

        :param entity: P4 entity for which the read is issued
        :return: status
        """
        req = p4runtime_pb2.ReadRequest()
        if self.role_name is not None:
            req.role = self.role_name
        req.device_id = self.device_id
        req.entities.extend([entity])
        return self.stub.Read(req)

    @parse_p4runtime_error
    def api_version(self):
        """
        P4Runtime API version.

        :return: API version hex
        """
        req = p4runtime_pb2.CapabilitiesRequest()
        rep = self.stub.Capabilities(req)
        return rep.p4runtime_api_version
+445 −0

File added.

Preview size limit exceeded, changes collapsed.

+284 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading