Packet queuing code

#ifndef _WFQ_H
#define _WFQ_H

#include <sys/types.h>
#include <sys/stream.h> 

/* The higher the priority the higher the number */
#define PREDICTED_CLASSES 2 /* that is classes 0, 1 and 2 */

/* Standard queue size ;) */
#define QUEUE_SIZE 65535

/* Termination flag */
#define TERMINATE 1

/* Available bandwidth */
#define BANDWIDTH 10000

#define wfq_create_table() wfq_init()
#define wfq_put_msg(m) wfq_enqueue(m)
#define wfq_get_msg() wfq_dequeue()
#define wfq_rm_msg() wfq_remove()

int wfq_p_size(mblk_t *p);
u_long wfq_p_saddr(mblk_t *p);
u_long wfq_p_daddr(mblk_t *p);
u_short wfq_p_sport(mblk_t *p);
u_short wfq_p_dport(mblk_t *p);

void wfq_init();
/*
 * Initializes the wfq module
 */

void wfq_enqueue(mblk_t *p);
/*
 * Enqueues the packet 'p' and schedules it for transmission
 * (if it is a guaranteed packet).
 */

mblk_t *wfq_dequeue();
/*
 * Dequeues a packet, the one that is schedulet for transmission
 * returns: a pointer to the packet or NULL if the queues are empty
 */

int wfq_remove();

#endif
#ifndef _WFQ_C
#define _WFQ_C

#include <stdio.h>
#include <sys/types.h>

#include <netinet/in.h>
#include <netinet/in_systm.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <sys/stream.h>

#include <sys/socket.h>
#include <net/if.h>
#include "ppp_str.h"

#include "fifo.h"
#include "table.h"
#include "wfq.h"

TABLE *tbl;      /* the flow stat[e/us] information */
int our_clock;       /* instead of a real time our_clock */
int old_pred_ts; /* the last timestamp of a predicted packet */
int bw;          /* available bandwidth */

/* the queues that serves the predicted flows */
FIFO *pred_queues[PREDICTED_CLASSES];

/* Kludge */
FIFO *to_be_removed;

int wfq_p_size(mblk_t *p) {

  struct ip *ip_hd;

  ip_hd=(struct ip *)((p->b_datap->db_base)+PPP_HDRLEN);
  return ip_hd->ip_hl;
}

u_long wfq_p_saddr(mblk_t *p) {

  struct ip *ip_hd;

  ip_hd=(struct ip *)((p->b_datap->db_base)+PPP_HDRLEN);
  return (u_long)(ip_hd->ip_src.s_addr);
}

u_long wfq_p_daddr(mblk_t *p) {

  struct ip *ip_hd;

  ip_hd=(struct ip *)((p->b_datap->db_base)+PPP_HDRLEN);
  return (u_long)(ip_hd->ip_dst.s_addr);
}

u_short wfq_p_sport(mblk_t *p) {

  struct ip *ip_hd;
  struct tcphdr *tcp_hd;

  ip_hd=(struct ip *)((p->b_datap->db_base)+PPP_HDRLEN);
  tcp_hd=(struct tcphdr *)((int)(ip_hd)+(ip_hd->ip_hl)*sizeof(int));
  return tcp_hd->th_sport;
}

u_short wfq_p_dport(mblk_t *p) {

  struct ip *ip_hd;
  struct tcphdr *tcp_hd;

  ip_hd=(struct ip *)((p->b_datap->db_base)+PPP_HDRLEN);
  tcp_hd=(struct tcphdr *)((int)(ip_hd)+(ip_hd->ip_hl)*sizeof(int));
  return tcp_hd->th_dport;
}



void wfq_init() {

  int i;
  int no_err=1;

  /* a mEngan kludge special fix cleanup thingy */
  to_be_removed=(FIFO *)NULL;
  
  /* create the table */
  if(!(tbl=table_create(PREDICTED_CLASSES))) {
    fprintf(stderr,"wfq_init: could not create table\n");
  }

  /* create and remember the fifo queues for the predicted classes */
  for(i=0;i<=PREDICTED_CLASSES;
      no_err=no_err&&((int)(pred_queues[i++]=fifo_create(QUEUE_SIZE))));

  if(!no_err) {
    fprintf(stderr,"wfq_init: could not create queues\n");
  }
}

int wfq_open_flow(int addr, int port, int type, int class_or_bw) {
  
  TABLE_Q_ID *flow_id;
  TABLE_GUAR *info_bw;
  TABLE_PRED *info_class;
  TABLE_ENTRY *entry;
  FIFO *queue;
  int errflag=0;

  /* addr/port to flow_id translation */
  flow_id=table_q_id(addr, port);
  
  /* we do different things for different types of flows */
  switch(type) {
    
  case(GUAR):
    /* fix bw info */
    info_bw=table_guar(class_or_bw);

    /* decrease available bw */
    if(class_or_bw<=bw) {

      bw-=class_or_bw;

    } else {

      fprintf(stderr,"wfq_open: no bandwidth available\n");
      errflag++;
    }
    
    if(!errflag) {

      /* create a fifo queue for this flow */
      if(!(queue=fifo_create(QUEUE_SIZE))) {

	fprintf(stderr,"wfq_open: could not create fifo queue\n");
	errflag++;
      }
    }
    
    if(!errflag) {

      /* insert the flow information into the table */
      if(table_insert_entry(tbl,queue,flow_id,type,(int *)info_bw)<0) {

	fprintf(stderr,"wfq_open: could not insert table entry\n");
	errflag++;
      }
    }
    
    if(!errflag) {

      /* initialize the timestamp */
      entry=table_lookup(tbl,flow_id);
      TABLE_TS(entry)=our_clock;
    }
    break;
    
  case(PRED):
    /* fix class info */
    info_class=table_pred(class_or_bw);
    
    /* get correct class (queue) */
    if(class_or_bw<0||class_or_bw>PREDICTED_CLASSES) {

      fprintf(stderr,"wfq_open: incorrect predicted class\n");
      errflag++;

    } else {

      queue=pred_queues[class_or_bw-1];
    }
    
    if(!errflag) {

      /* insert the flow information into the table */
      if(table_insert_entry(tbl,queue,flow_id,type,(int *)info_class)<0) {

	fprintf(stderr,"wfq_open: could not insert table entry\n");
	errflag++;
      }
    }
    break;

  default:
    fprintf(stderr,"wfq_open: unknown type\n");
  }
  
  if(errflag) {

    fprintf(stderr,"wfq_open: failed!\n");
    return(!errflag); /* maybe a panic instead! */
  }
}
    
int wfq_close_flow(int addr, int port) {

  TABLE_Q_ID *flow_id;
  TABLE_ENTRY *entry;
  int errflag=0;

  /* addr/port to flow_id translation */
  flow_id=table_q_id(addr,port);

  /* find table entry */
  if(!(entry=table_lookup(tbl,flow_id))) {

    fprintf(stderr,"wfq_close: attempt to close a nonexisting flow\n");
    errflag++;
  }

  if(!errflag) {

    /* is the queue empty or not? */
    if(!fifo_empty(TABLE_QUEUE(entry))) {

      /* not empty : mark it for termination */
      TABLE_TERMFLAG(entry)=TERMINATE;

    } else {
      
      /* empty : terminate it */
      if(!fifo_destroy(TABLE_QUEUE(entry))) {
	
	fprintf(stderr,"wfq_close_flow: could not remove fifo queue\n");
	errflag++;
      }
      
      /* increase available bw */
      bw+=TABLE_BW(entry);
      
      if(table_remove_entry(tbl,flow_id)<0) {

	fprintf(stderr,"wfq_close_flow: could not remove table entry\n");
	errflag++;
      }
    }
  }
  
  if(errflag) {
    fprintf(stderr,"wfq_close_flow: termination failed\n");
    return(!errflag);
  }
}

void wfq_enqueue(mblk_t *p) {
  
  int p_size, p_addr, p_port;
  TABLE_Q_ID *flow_id;
  TABLE_ENTRY *entry;
  int timestamp;
  int all_empty=0;
  int i;

  /* these must be fixed */
  p_size=wfq_p_size(p);
  p_addr=wfq_p_saddr(p);
  p_port=wfq_p_sport(p);
  
  /* addr/port to flow_id translation */
  flow_id=table_q_id(p_addr,p_port);
  
  /* There must be an entry ;) */
  if(entry=table_lookup(tbl,flow_id)) {

    if(TABLE_TERMFLAG(entry)) {
      fprintf(stderr,"wfq_enqueue: attempt to enqueue in terminated flow\n");

      /* throw into ASAP? */
      if(fifo_insert((int *)p,p_size,0,pred_queues[0])) {

	fprintf(stderr,"wfq_enqueue: failed to insert into ASAP queue\n");
      }
      return;
    }
    
    switch(TABLE_TYPE(entry)) {
      
    case GUAR:
      if(fifo_empty(TABLE_QUEUE(entry))) {

	/* Empty queue => use our "our_clock" */
	timestamp=(our_clock+p_size)/TABLE_BW(entry);

      } else {

	/* Queue not empty => use old timestamp */
	timestamp=(TABLE_TS(entry)+p_size)/TABLE_BW(entry);
      }
      
      /* Remember old timestamp */
      TABLE_TS(entry)=timestamp;

      /* Bump our_clock */
      our_clock=timestamp;
      break;

    case PRED: 
      /* Do not calculate timestamp here */
      timestamp=0;

      /* is this really correct (or necessary)??? */

      /*
       * if all predicted queues was empty then set the 
       * timestamp of the predicted flows to 'our_clock'
       */

      /* check the predicted queues */
      for(i=0;i<=PREDICTED_CLASSES;
	  all_empty=all_empty&&fifo_empty(pred_queues[i++]));

      /* are they empty? */
      if(all_empty) old_pred_ts=our_clock;
      break;

    default:
      /* ERROR */
      fprintf(stderr,"wfq_enqueue: unknown packet type\n");
    }
    
    if(!fifo_insert((int *)p, p_size, timestamp, TABLE_QUEUE(entry))) {

      fprintf(stderr,"wfq_enqueue: unable to insert packet\n");
    }
    
  } else {

    fprintf(stderr,"wfq_enqueue: no such flow_id\n");
    /* should go into the ASAP queue */

    if(!fifo_insert((int *)p, p_size, timestamp, pred_queues[0])) {

      fprintf(stderr,"wfq_enqueue: unable to insert packet\n");
    }
  }
}

mblk_t *wfq_dequeue() {

  FIFO *p_queue=(FIFO *)NULL;
  FIFO *g_queue=(FIFO *)NULL;
  FIFO_ITEM *p,*g;
  TABLE_ENTRY *entry, *prev;
  int p_ts,g_ts,ts;
  int i=PREDICTED_CLASSES;
  
  /* find the highest priority predicted queue that is not empty */
  while(i>=0) {

    if(p_queue||fifo_empty(pred_queues[i])) 

      i--;

    else

      p_queue=pred_queues[i];
  }
  
  /* did we find one? */
  if(p_queue) {

    /* calculate timestamp */
    p=fifo_first(p_queue);
    p_ts=(old_pred_ts+FIFO_SIZE(p))/bw;

  } else {

    /* bump the old timestamp */
    old_pred_ts=our_clock;
    p_ts=-1;
  }
    
  /* decide upon a guaranteed queue */
  if(entry=table_first(tbl,GUAR)) {

    /* is there anything in the queue */
    if(!fifo_empty(TABLE_QUEUE(entry))) 

      /* get hold of the timestamp */
      g_ts=FIFO_TS(fifo_first(TABLE_QUEUE(entry)));

    else

      /* queue empty */
      g_ts=-1;

    prev=entry;

    /* search through all the guaranteed flows */
    while(entry=table_next(tbl,prev)) {

      /* is there anything in the queue */
      if(!fifo_empty(TABLE_QUEUE(entry))) {
	
	/* get the timestamp */
	ts=FIFO_TS(fifo_first(TABLE_QUEUE(entry)));

	/* select the smallest */
	g_ts<ts?g_ts:ts;
      }

      /* next iteration */
      prev=entry;
    }

    /* get hold of the selected packet, if any */
    if(g_ts>=0) 

      g=fifo_first(TABLE_QUEUE(entry));

    else

      g=(FIFO_ITEM *)NULL;  
  }

  /* no packet to dequeue */
  if(g_ts==p_ts==-1) {
    return (mblk_t *)NULL;
  }
  
  /* return the packet with the smallest timestamp */
  if(g_ts<=p_ts) {

    to_be_removed=TABLE_QUEUE(entry);
    return (mblk_t *)FIFO_DATA(g);

  } else {
    
    to_be_removed=p_queue;
    return (mblk_t *)FIFO_DATA(p);
    
  }
}

int wfq_remove() {

  if(to_be_removed) 
    return fifo_remove(to_be_removed);
  else
    return 0;
}


#endif

Signalling Protocol (RSVP)

The main goal of this implementation is to make a networking device that can schedule IP traffic on existing network interfaces. Currently scheduling of packets is conducted according to a scheme known as Weighted Fair Queuing [2]. The device understands the RSVP protocol [3], interprets it and uses information from the protocol to set parameters in the WFQ scheduler. In order to make clear which terms refer to the architecture the following diagram should be used as reference

                RSVP (or protocol)                      
                                                        user
                ------------------------------------
                                                       kernel
                Scheduler (or Traffic Control Module)

In this text we use RSVP and protocol interchangeably, likewise so for scheduler and Traffic control module. The boundary between the protocol and the scheduler is also evident in their separation across user and kernel spaces respectively. Interactions between the two modules is discussed later. An important secondary goal is to without any modifications to the device driver. Other queuing schedulers [1] have been written but require modifications to the driver or the IP stack in some way. This report will start with a basic introduction to the system demonstrated with a simple 3 entity system, 2 end systems and a single router. In subsequent sections the internals of the scheduler are explained followed by some details about the protocol. Some examples of how to use the system are given followed by how to extend/change and if need be debug the system.

Basic architecture

The basic architecture of the system is as shown in the illustration below, all components run the rsvpd daemon, however only the router is responsible for making requests to the Operating System (OS) for resources. The presence of RSVP daemons on the hosts are to enable the communication with the RSVP on the router. The hosts can also use an application program RTAP supplied with the public domain implementation of RSVP from ISI (Currently the router must run rsvp in order to make reservations on the router).

       HOST                   Router                   HOST
        
        |                                               |
        v                                               v
        RSVP    <-------->     RSVP    <-------->      RSVP    
                                |
                                V      
                              Traffic
                              Control

Example Session

The following is given as to show what an RSVP session can look like driven from a user supplied program (RTAP).

Sender

T1> session udp 192.168.15.141
T1: rapi_session => sid= 1, fd= 3
T1> sender 192.168.15.155 [t 11111 22 ]
rapi_sender() OK
---------------------------------------------------------------
T1: Resv Event --   Session= 192.168.15.141:0
192.168.15.155:*   [G [111(20) p=1K m=200 M=1.5K] R=22 S=1.11K]
---------------------------------------------------------------

Receiver

T1> session udp 192.168.15.141
T1: rapi_session => sid= 1, fd= 3
---------------------------------------------------------------
T1: Path Event --   Session= 192.168.15.141:0
        sicsgen:*        [T [11.1K(22) p=Inf m=0 M=65.5K]]
                 [T [11.1K(22) p=Inf m=0 M=65.5K]]  G={0 0 0 0}
---------------------------------------------------------------
T1> reserve ff 192.168.15.155 [g 22 1111 111 20 1000 200 1500 ]

The user input commands are shown prefixed by T1> and the response by RTAP by T1:. The sender and receiver agree on a “session identifier” which is normally the receivers address and a port number. Thus both enter the session command. Next the sender advertises it’s traffic spec with the sender command and it’s interface address (optionally a port as well if the sender is sending multiple sessions), The receiver should see this as a Path Event, shown between in the receivers session. The receiver make a reservation to the sender specified thus:

  • command reserve
  • style Fixed Filter (FF)
  • type of service, “g” guaranteed
  • traffic spec

Finally the sender may (depends on the network configuration) see a message as a result of this reserve action. More details on how to drive an RSVP session can be found in man 8 rtap. The traffic specifications are explained in more detail in the Integrated Services section.

Scheduler

Platform

The code will as far is as known, will work on all BSD systems. This includes OpenBSD and NetBSD, little effort should be needed to port it to systems which have TCP/IP implementations based on BSD systems. As stated no change has been made to the internal structures of the TCP/IP or the driver so constructing the driver to work with systems that use STREAMS for example should not be difficult.

Duality

The objective to use such a device is it looks like a normal character device to the user space programs and a normal networking device to the kernel code. Therefore the device exhibits a “duality”depending on whether the view is from user space or the kernel, each perspective is described in following sections, where developers familiar with devices will be able to comprehend the devices use and likewise for the kernel and driver developers.

User perspective

Open

For the user space programmer the scheduler looks like a normal device. There are two ways in which to open/close the device, from the RSVP protocol and manual “ifconfig” command.

                           ifconfig       RSVP
                              |            |   
                              |            |       
                              V            V
                        +-----------------------+
                        |         open          | System call
                        +-----------------------+
                        |        wfqopen        | Kernel
                        |                       |
                        +-----------------------+

In order for the user to open the device a command similar to the one shown below should be issued, how to attach a command like this to RSVP is discussed in the Kernel/RSVP interface section.

        
                wfqfd = open("/dev/wfq", RDWR);

A file descriptor is returned by the system as a result of issuing the open call, the path and name of the device should be specified as a string and the flags indicating how the device should be opened. Write is necessary to use the “IO control” (ioctl) system call (man 2 ioctl). Thereafter the wfqfd file descriptor can be used as with normal file operations, read/write3/close. Should the open fail for some reason, then -1 will be returned, this should be checked for and reported to the application program. We use the file descriptor particularly for ioctls allowing parameters to set in the kernel called from the RSVP protocol, described next.

Ioctl

It is intended that most communication will be done via RSVP, however there might be cases where the user would like to control the interface without issuing RSVP commands, i.e. to reset the interface for example, for this see the Stand alone section.

The following code shows how to call the scheduler to perform a certain function, in this case “WFQ_ENABLE” a #defined value that can be matched in the kernel when the ioctl is executed. Additionally a pointer to a structure is passed that contains the data to be passed to the scheduler, for example:

        struct interface {
                char  eth_name[IFNAMSIZ];
                u_int eth_len;
        }
        
        bzero((void *)interface, IFNAMSIZ);
        strcpy(interface.eth_nam, "ep0");
        interface.eth_len = strlen("ep0");    

        if(ioctl(wfqfd, WFQ_ENABLE, &interface) < 0) {
            log(LOG_DEBUG, "Error in setting up scheduler");
            return TC_ERR;
        }
        return TC_OK;

Read

Reading from the device is accomplished through the read system call. Typically a call would look like :

        bytes_read = read(wfqfd, buf, sizeof(buf)); 

The device reads the requested number of bytes (sizeof(bytes)) and places them in an array of characters, named buf in this example. A possible use is to return the number of bytes output in a flow.The following section describes how the kernel implements requests from the user for more background see [4].

Kernel perspective

Described in the following section is an explanation of the code implemented and rationale behind the implementation. 2 flows of control are possible in the kernel, one for handling the protocol and one for the data stream. The protocol is dealt with first in the file system interface description. The skeleton file tc_test.c in the RSVP distribution provides the main interface functions from the user space RSVP daemon to the kernel. The function prototypes are found in rsvp_var.h. If the RSVP daemon is compiled with the SCHEDULE flag the routines in tc_test.c will be called automatically from the daemon (all the calls are done from the rsvp_resv.c file).

File system Interface

The code is loosely based on a network pseudo device that has a character device interface (cdevsw)4. We use the character device interface to pass and obtain information to and from the kernel. Packets do not leave the kernel in the flow of data.

The sequence of events from the user to the kernel is shown in the illustration:

        RSVP            open
        ifconfig ->     read           ->      /dev/wfq0
                        ioctl
                        close                        |  
USER                                                 v    
================================================================
KERNEL                                                          
                        wfqinit
     scheduler   <-     wfqdelete      <-       wfqioctl 
                        etc.

The above shows the flow of control from the RSVP daemon or a user “ifconfig” command to the scheduler. Once interpreted by the system call they are, to the kernel, essentially the same.

The dispatcher on the kernel side is a function called wfqioctl. Character device drivers in UNIX are expected to provide an interface to user space programs which implement common UNIX file system commands, read, write, select etc. The ioctl call is used to control a device and manipulate device parameters of the device and it is this system call that interprets the RSVP protocol.

Once a ioctl is called by a user space process the OS will dispatch it to the correct device (via the file descriptor) and and hence call the ioctl for the device specified.

Init/Delete

Within the kernel each flag in the ioctl system call is replicated so user specified command call can be matched:

    ioctl(wfqfd, WFQ_ENABLE, &interface)                   User
        
    #define WFQ_ENABLE _IOW('t', 94, struct interface)     Kernel

Exact details of how this is done is outside the scope of this report. However in the wfqioctl() function each of requests is matched in a switch statement and a corresponding function is called to carry out this request.

Setting the interface up involves the following steps:

  • Check to see if the interface is already up
  • Substitute the output of the given interface for a scheduling one
  • Globally mark the interface as up
  • Initialise a Best Effort flow (flow 0)

Similarly setting the interface down, either by “ifconfig down” or by killing the RSVP daemon with the “kill” command will close the device by as follows:

  • Check to see if the interface is already down
  • Put back the original output handler
  • Globally mark the interface as down
  • Delete the Best Effort flow (flow 0)

Note: the events above apply only to setting up and down the device. Adding and removing flows of data is the responsibility of the protocol. This process as well as handling the data is described in the next sections.