/*
   async.c
*/

#include "videoport.h"

/*
 * SPEW(x): debug-trace hook.  As defined here it expands to nothing, so
 * whatever statement is passed as x is discarded by the preprocessor and
 * is never compiled or executed.
 */
#define SPEW(x)

/* diskio ------------------------------------------------------ */

/*
 * states for diskio.state (read and written only by the main-thread
 * functions below; the sub-thread never touches it).
 *
 * RUNME:    idle.  diskio_issue() asserts this state on entry, and
 *           diskio_ready() reports true only in this state.
 * NEEDP:    transient.  diskio_issue() sets it right after the V on
 *           startsema and replaces it with RUNME or PPENDING before it
 *           returns.
 * PPENDING: the P on donesema did not succeed immediately.
 *           diskio_setfdset() adds donesemafd to the read set and
 *           diskio_checkfdset() moves the state back to RUNME once that
 *           fd is reported readable.
 */
#define RUNME 0           /* main thread can V startsema now */
#define NEEDP 1           /* main thread must P on donesema */
#define PPENDING 2        /* main thread must select on donesema */

/*
 * iovfunc: pointer to a vectored-i/o routine taking (fd, iov, iovcnt).
 * main() stores either readv or writev in diskio.func, and the sub-thread
 * calls through it.
 */
typedef ssize_t (*iovfunc)(int, const struct iovec *, int); /* readv/writev */

/*
 * _diskio: everything belonging to one asynchronous disk-i/o channel --
 * the request the caller fills in, the results the sub-thread writes back,
 * and the semaphores/state the two threads use to hand work to each other.
 * Created by diskio_open() and destroyed by diskio_close().
 */
typedef struct _diskio /* totally non-encapsulated and proud of it! */
{
  /* === public interface -- write these before issue, read these freely */

  /* descriptor the sub-thread seeks on and passes to func */
  int fd;
  /* absolute position: the sub-thread does lseek64(fd, offset, SEEK_SET) */
  off64_t offset;
  /* vector array passed to func; storage is supplied by the caller */
  struct iovec *iov;
  /* number of entries of iov passed to func */
  int iovcnt;
  iovfunc func; /* readv/writev */

  /* sub_thread fills these in when i/o completes */
  off64_t lseek_rc;
  int lseek_errno; /* not defined if lseek succeeded */
  int rc; /* readv/writev: not defined if lseek failed */
  /*
   * NOTE(review): `errno` is commonly a macro once <errno.h> is in effect,
   * in which case this field name would be macro-expanded -- confirm that
   * this is harmless with the target platform's headers.
   */
  int errno; /* not defined if readv/writev succeeded or lseek failed */

  /* === private interface -- you'll get in trouble if you read these
   * without reading the source code.  use the diskio_ functions instead.
   *
   * each diskio in the main thread starts a sub-thread which it manages.
   * they communicate in roughly this way:
   * sub_thread()
   * {
   *   while (1)
   *      P on startsema (blocks)
   *      exit the loop if dieflag is set
   *      lseek64 to offset, then do the readv/writev
   *      V on donesema to tell main thread we're done (does not block)
   * }
   * main_thread()
   * {
   *   for(;;)
   *     {
   *       fill in fd, offset, iov, iovcnt, func
   *       V on startsema to set sub-thread off (does not block)
   *       P on donesema to wait for sub-thread (pollable semaphore)
   *     }
   * }
   * startsema: P: sub-thread blocks on this as signal to issue i/o
   *                 V: main thread releases this to initiate i/o
   * donesema: P: main thread blocks on this to wait on completion of i/o
   *                   (via the fd of a pollable semaphore)
   *                V: sub-thread releases this when i/o completes
   */
  /* value returned by sproc() in diskio_open(); waited on in diskio_close() */
  int sub_thread_pid;
  /* arena the semaphores were allocated from; needed again to free them */
  usptr_t *arena;
  usema_t *startsema; /* normal semaphore */
  usema_t *donesema;  /* pollable semaphore */
  int donesemafd;     /* fd for donesema */
  /* one of RUNME / NEEDP / PPENDING; maintained by the main thread only */
  int state;

  /* main thread sets this before waking sub-thread to say goodbye */
  int dieflag;
  
} _diskio;

/* handle type used by every diskio_ function: a pointer to a _diskio */
typedef _diskio *diskio;

/* diskio: sub-thread ---------------------------------------------------- */

/*
 * sumvectors: add up the iov_len members of the first iovcnt entries of
 * iov and return the total.  Returns 0 when iovcnt is zero or negative.
 * The total is accumulated in (and returned as) an int.
 */
int sumvectors(struct iovec *iov, int iovcnt)
{
  struct iovec *cur = iov;
  int total = 0;
  int left;

  /* count the remaining entries down while walking the array */
  for (left = iovcnt; left > 0; left--, cur++)
    total += cur->iov_len;

  return total;
}

/*
 * diskio_sub_thread: entry point of the worker started by diskio_open()
 * via sproc().  arg is the diskio this worker serves.
 *
 * Loop:
 *   - P on startsema (blocks until the main thread issues a request or
 *     asks us to die),
 *   - leave the loop if dieflag is set,
 *   - lseek64() to me->offset; on failure record lseek_errno and skip the
 *     transfer (rc and errno are then left untouched),
 *   - otherwise call me->func (readv/writev) and record rc, plus errno
 *     when rc is negative,
 *   - V on donesema so the main thread sees the completion.
 *
 * Returns (ending the worker) only after dieflag has been observed.
 */
void diskio_sub_thread(void /*_diskio*/ *arg)
{
  diskio me = (diskio)arg;
  
  /* make it so I'll just die when my parent does */
  OSIGC(signal(SIGHUP, SIG_DFL));
  OC(prctl(PR_TERMCHILD));

  while(1)
    {
      /* wait for main thread to tell us to go */
      OC(uspsema(me->startsema));
      
      /* diskio_close() sets dieflag before this wake-up */
      if (me->dieflag)
        break;

      /* seek to offset */
      me->lseek_rc = lseek64(me->fd, me->offset, SEEK_SET);
      if (me->lseek_rc < 0)
        me->lseek_errno = oserror();
      else
        {
          /* call readv/writev */
          me->rc = (*me->func)(me->fd, me->iov, me->iovcnt);
          if (me->rc < 0)
            me->errno = oserror();
        }
      
      /* tell main thread we're done */
      OC(usvsema(me->donesema));
    }
  
  /* getpid() yields a pid_t, not a pointer: print it as an integer */
  printf("diskio thread %d dying now\n", (int)getpid());
}

/* diskio: main thread -------------------------------------------------- */

/*
 * diskio_open: build a new diskio.
 *
 * Allocates the structure, creates its two semaphores in arena (both with
 * an initial count of 0, so the sub-thread blocks on startsema and the
 * main thread would block on donesema), opens the pollable semaphore's
 * fd, and starts diskio_sub_thread() with sproc().
 *
 * The returned diskio is in state RUNME with dieflag clear.  fd, offset,
 * iov, iovcnt and func are NOT initialized here; the caller fills them in
 * before diskio_issue().
 */
diskio diskio_open(usptr_t *arena)
{
  diskio me;

  NC(me = malloc(sizeof(_diskio)));

  /* plain bookkeeping: no request outstanding, nobody asked us to die */
  me->arena = arena;
  me->dieflag = FALSE;
  me->state = RUNME;

  /* startsema starts at 0: the sub-thread parks until the first issue */
  ONC(me->startsema = usnewsema(arena,0));

  /* donesema starts at 0: nothing has completed yet */
  ONC(me->donesema = usnewpollsema(arena,0));
  OC(me->donesemafd = usopenpollsema(me->donesema, 0600));

  /* everything the sub-thread reads is in place -- start it */
  OC(me->sub_thread_pid = sproc(diskio_sub_thread, PR_SALL, me));

  return me;
}

/*
 * diskio_close: shut down and free a diskio.
 *
 * Must be called with no request outstanding (state RUNME, asserted).
 * Sets dieflag, wakes the sub-thread so it can see the flag and exit,
 * waits for it with waitpid(), then releases both semaphores and the
 * structure itself.  me is invalid after this call.
 */
void diskio_close(diskio me)
{
  assert(me->state == RUNME);

  /* raise the flag first, then wake the sub-thread so it sees it */
  me->dieflag = TRUE;
  OC(usvsema(me->startsema));

  /* block until the sub-thread is really gone; result is not examined */
  (void) waitpid(me->sub_thread_pid, NULL, 0);

  /* nobody else can be using the semaphores now -- release them */
  OC(usclosepollsema(me->donesema)); /* also closes fd */
  OC(usfreepollsema(me->donesema, me->arena));
  usfreesema(me->startsema, me->arena);

  /* scrub the stale handles before the structure goes away */
  me->donesema = NULL;
  me->donesemafd = -1;
  me->startsema = NULL;

  free(me);
}

/*
 * diskio_ready: nonzero when a new request may be issued on this diskio,
 * i.e. it has not been told to die and its state is RUNME.
 *
 * a freshly opened diskio is ready.
 *
 * after a request has been issued, the diskio can become ready again
 * either inside diskio_issue() itself or inside diskio_checkfdset().
 * either way a select() loop still works (see diskio_setfdset()).
 */
int diskio_ready(diskio me)
{
  if (me->dieflag)
    return 0;

  return me->state == RUNME;
}

/*
 * diskio_setfdset: call before the select() in your main loop.
 *
 * - state PPENDING: adds the done-semaphore's fd to readset so select()
 *   wakes when the sub-thread finishes.
 * - state RUNME: sets *wakeust to 0 so that the caller's timeout
 *   computation makes select() return immediately.
 * - any other state: does nothing.
 *
 * writeset and exceptset are accepted but not used here.
 */
void diskio_setfdset(diskio me, 
                     fd_set *readset, fd_set *writeset, fd_set *exceptset,
                     stamp_t *wakeust)
{
  switch (me->state)
    {
    case PPENDING:
      /* completion will show up as readability of the semaphore fd */
      FD_SET(me->donesemafd, readset);
      break;
    case RUNME:
      /* ready to run -- wake up immediately */
      *wakeust = 0;
      break;
    default:
      break;
    }
}

/*
 * diskio_checkfdset: call after select() returns.
 *
 * if this diskio was waiting on its done-semaphore (state PPENDING) and
 * select() reported the semaphore's fd in readset, the i/o has completed
 * and the state goes back to RUNME.  otherwise nothing changes.
 *
 * writeset and exceptset are accepted but not used here.
 */
void diskio_checkfdset(diskio me, 
                       fd_set *readset, fd_set *writeset, fd_set *exceptset)
{
  /* only a pending diskio has anything to collect */
  if (me->state != PPENDING)
    return;

  /* still pending: the semaphore fd was not reported */
  if (!FD_ISSET(me->donesemafd, readset))
    return;

  /* yay -- i/o completed. */
  me->state = RUNME;
}

/*
 * diskio_issue: start one i/o on a ready diskio.
 *
 * when diskio_ready() returns true, you:
 * - fill in fd, offset, iov, iovcnt, and func (readv/writev)
 * - call diskio_issue()
 *
 * the i/o is done when diskio_ready() next returns true.  at that point
 * diskio->lseek_rc, diskio->lseek_errno, diskio->rc and diskio->errno
 * hold the return values and error numbers.
 *
 * on return the state is either PPENDING (caller must select on the done
 * semaphore's fd) or RUNME (the sub-thread had already finished).
 */
void diskio_issue(diskio me)
{
  int rc;

  assert(me->state == RUNME);

  /* release the sub-thread so it starts the i/o */
  OC(usvsema(me->startsema));

  /* from here on we owe a P on donesema */
  me->state = NEEDP;

  /*
   * try to take the pollable "done" semaphore right away; the i/o may
   * already be finished by the time we get here.
   */
  OC(rc = uspsema(me->donesema)); 

  if (rc == 1)
    {
      /* hurray--sub-thread is done */
      me->state = RUNME;
    }
  else if (rc == 0)
    {
      /* semaphore not available yet--we must let user select on it */
      me->state = PPENDING;
    }
  else
    {
      assert(rc == 0 || rc == 1);
    }

  assert(me->state != NEEDP);
}

/* main ------------------------------------------------------------ */

/*
 * getarena: create the shared arena that all diskio semaphores live in.
 *
 * Picks a temporary file name under /usr/tmp with prefix ".vda",
 * configures the arena type and lock type with usconfig() (results
 * deliberately ignored), and initializes the arena with usinit().
 *
 * Returns the arena handle.  A failed tempnam() or usinit() is handed to
 * the ONC check rather than passed along as a NULL pointer.
 *
 * NOTE(review): the string returned by tempnam() is never freed.  Whether
 * usinit() keeps a reference to it is not visible from here, so it is
 * left allocated; confirm before adding a free().
 */
usptr_t *getarena()
{
  usptr_t *arena;
  char *fname;

  /* tempnam() can fail; don't hand a NULL name to usinit() */
  ONC(fname = (char*) tempnam("/usr/tmp", ".vda"));

  /*
   * make sure that temp files are unlinked automatically
   */
  (void) usconfig(CONF_ARENATYPE, US_SHAREDONLY);
  
  /*
   * use fastest form of semaphores
   */
  (void) usconfig(CONF_LOCKTYPE, US_NODEBUG);
  
  ONC(arena = usinit(fname));

  return arena;
}

/*
 * async_spew: print a one-line picture of the diskio pool to stdout --
 * one character per diskio ('.' if its diskio_available entry is nonzero,
 * 'x' otherwise) followed by a single space.  No newline is written.
 */
void async_spew(int *diskio_available, int ndiskios)
{
  int *flag = diskio_available;
  int left;

  for (left = ndiskios; left > 0; left--, flag++)
    printf("%c", *flag ? '.' : 'x');

  printf(" ");
}

/*
 * main: stream video fields between a videoport and a file using a pool
 * of asynchronous diskios.
 *
 * usage: async {v|m} filename
 *   argv[1] beginning with 'v' or 'V' selects VIDEOPORT_DIRECTION_VM
 *           (input video is written to the file); anything else selects
 *           VIDEOPORT_DIRECTION_MV (the file is read for video output).
 *   argv[2] is the file name.
 *
 * Each diskio gets its own descriptor on the file (opened O_DIRECT) and
 * transfers nvectors fields per readv/writev.  The main loop alternates
 * between issuing requests while both an idle diskio and nvectors fields
 * are available, and select()ing until that is true again.  The loop
 * never terminates on its own; errors leave via perror_exit/error_exit.
 */
int main(int argc, char **argv)
{
  usptr_t *arena;
  videoport p;
  int direction;
  char *filename;
  int issue_count;
  int complete_count;
  diskperformance diskperf;
  int nbytes, totalbytes;
  int d,f;
  struct dioattr dioinfo;
  int vector_chunksize;
  int vector_memalign;
  int ndiskios = 4; /* number of readvs/writevs at a time */ 
  int ndiskios_available;
  int nfields_available;
  int nvectors = 10; /* number of fields per readv/writev  */
  int *diskio_available;
  diskio *diskios;
  videofield ***fields;

  /* both arguments are dereferenced below -- refuse to run without them */
  if (argc < 3)
    {
      error_exit("usage: async {v|m} filename");
      return 1;
    }

  /* tolower() needs a value representable as unsigned char */
  direction = (tolower((unsigned char)*argv[1]) == 'v') ?
    VIDEOPORT_DIRECTION_VM : VIDEOPORT_DIRECTION_MV;
  filename = argv[2];
  diskperf = diskperformance_open();

  NC(p = videoport_open(direction, 30, VIDEOPORT_ADVISE_NOACCESS));

  if (direction == VIDEOPORT_DIRECTION_VM)
    {
      printf("writing input video to file %s\n", filename);
    }
  else /* direction == VIDEOPORT_DIRECTION_MV */
    {
      printf("reading video from file %s for output\n", filename);
    }

  arena = getarena();

  NC(diskio_available = malloc(ndiskios * sizeof(int)));
  NC(diskios = malloc(ndiskios * sizeof(diskio)));
  NC(fields = malloc(ndiskios * sizeof(videofield **)));
      
  /*
   * set up each diskio: its sub-thread, its iovec array, the list of
   * fields it currently holds, and its own descriptor on the file.
   * (in the VM direction every open uses O_TRUNC; all opens happen
   * before any i/o is issued.)
   */
  for(d=0; d < ndiskios; d++)
    {
      NC(diskios[d] = diskio_open(arena));
      NC(diskios[d]->iov = malloc(nvectors * sizeof(struct iovec)));
      NC(fields[d] = malloc(nvectors * sizeof(videofield *)));

      if (direction == VIDEOPORT_DIRECTION_VM)
        {
          OC(diskios[d]->fd = 
             open(filename, O_RDWR|O_TRUNC|O_CREAT|O_DIRECT, 0644));
        }
      else /* direction == VIDEOPORT_DIRECTION_MV */
        {
          OC(diskios[d]->fd = 
             open(filename, O_RDONLY|O_DIRECT, 0644));
        }
    }
  
  /* one request moves nvectors fields of nbytes each */
  nbytes = p->max_valid_bytes_per_field;
  totalbytes = nbytes * nvectors;
  
  /* get direct I/O constraints for these fds */
  OC(fcntl(diskios[0]->fd, F_DIOINFO, &dioinfo));
  vector_chunksize = max(dioinfo.d_miniosz, getpagesize());
  vector_memalign = max(dioinfo.d_mem, getpagesize());
  assert(totalbytes >= vector_chunksize);
  assert(totalbytes <= dioinfo.d_maxiosz);
  assert((nbytes % vector_chunksize) == 0);
  
  issue_count = 0;
  complete_count = 0;
  diskperformance_datapoint(diskperf, 0, p);

  /* every diskio starts out idle */
  for(d=0; d < ndiskios; d++)
    diskio_available[d] = 1;
  ndiskios_available = ndiskios;

  nfields_available = videoport_available_field_count(p);

  for(;;)
    {
      /*
       * while we have idle diskios and enough video fields to
       * make a whole diskio (nvectors), issue diskio requests.
       */
      while(ndiskios_available > 0 && nfields_available >= nvectors)
        {
          /* find a free diskio */
          for(d=0; d < ndiskios; d++)
            if (diskio_available[d])
              break;
          assert(d != ndiskios);
          
          /* issue nvectors fields on that diskio */
          diskios[d]->offset = ((off64_t)issue_count) * totalbytes;
          issue_count++;
          if (direction == VIDEOPORT_DIRECTION_VM)
            diskios[d]->func = writev;
          else
            diskios[d]->func = readv;
          diskios[d]->iovcnt = nvectors;
          for(f=0; f < nvectors; f++)
            {
              fields[d][f] = videoport_get_one_field(p);
              /* direct i/o needs buffers aligned to vector_memalign */
              assert((((uintptr_t)fields[d][f]->pixels) % vector_memalign)
                     == 0);
              diskios[d]->iov[f].iov_base = fields[d][f]->pixels;
              diskios[d]->iov[f].iov_len = nbytes;
            }
          diskio_issue(diskios[d]);
          /* the diskio may have already completed, but we'll catch
           * this after the select() below.
           */
          diskio_available[d] = 0;
          ndiskios_available--;
          nfields_available -= nvectors;
          SPEW(async_spew(diskio_available, ndiskios));
          SPEW(printf("%d issue (%d avail)\n", 
                      d, ndiskios_available));
        }
      
      /*
       * wait for one or more diskio and nvectors or more fields 
       * to become available.
       */
      while (ndiskios_available == 0 || nfields_available < nvectors)
        {
          fd_set readset, writeset, exceptset;
          stamp_t wakeust;
          struct timeval tv, *tvp;
          
          FD_ZERO(&readset);
          FD_ZERO(&writeset);
          FD_ZERO(&exceptset);
          /* LONGLONG_MAX means "nobody asked for a timed wake-up" */
          wakeust = LONGLONG_MAX;
          
          /* 
           * only select on something if we don't already have it.
           * otherwise, we'll hard spin.
           */
          if (nfields_available < nvectors)
            {
              videoport_setfillpoint(p, nvectors);
              videoport_setfdset(p, &readset, &writeset, &exceptset,
                                 &wakeust);
            }
          for(d=0; d < ndiskios; d++)
            if (!diskio_available[d])
              diskio_setfdset(diskios[d], &readset, &writeset, &exceptset,
                              &wakeust);
          
          /* turn the requested wake-up UST into a select() timeout */
          if (wakeust == LONGLONG_MAX)
            tvp = NULL;
          else
            {
              stamp_t now, time_till_wakeup;
              dmGetUST((unsigned long long *)(&now));
              time_till_wakeup = wakeust - now;
              if (time_till_wakeup < 0)
                { tv.tv_sec = 0;  tv.tv_usec = 0; }
              else
                {
                  /* split the interval into whole seconds + microseconds */
                  tv.tv_sec = (long)(time_till_wakeup /  1000000000LL);
                  time_till_wakeup -= tv.tv_sec * 1000000000LL;
                  tv.tv_usec = (long)(time_till_wakeup / 1000);
                }
              tvp = &tv;
            }
          
          OC(select(getdtablehi(), &readset, &writeset, &exceptset, tvp));
          
          if (nfields_available < nvectors)
            {
              videoport_checkfdset(p, &readset, &writeset, &exceptset);
            }
          for(d=0; d < ndiskios; d++)
            if (!diskio_available[d])
              diskio_checkfdset(diskios[d], &readset, &writeset, &exceptset);

          /* see how many fields we have now */
          nfields_available = videoport_available_field_count(p);

          /* handle the diskios which have completed */
          {
            for(d=0; d < ndiskios; d++)
              if (!diskio_available[d] &&
                  diskio_ready(diskios[d]))
                {
                  /* a diskio completed */
                  if (diskios[d]->lseek_rc < 0)
                    { setoserror(diskios[d]->lseek_errno); 
                      perror_exit("lseek64"); }
                  if (diskios[d]->rc < 0)
                    { setoserror(diskios[d]->errno); 
                      perror_exit("readv/writev"); }
                  else if (diskios[d]->rc != totalbytes)
                    { error_exit("EOF or out of disk space"); }
                  
                  /* hand the fields of this request back to the port */
                  for(f=0; f < nvectors; f++)
                    videoport_put_one_field(p, fields[d][f]);
                  diskio_available[d] = 1;
                  ndiskios_available++;
                  SPEW(async_spew(diskio_available, ndiskios));
                  SPEW(printf("%d complete (%d avail)\n",
                              d, ndiskios_available));
                  complete_count++;
                  diskperformance_datapoint(diskperf, 
                                            ((off64_t)complete_count) *
                                            totalbytes, p);
                }
          }
        }
    }
}

  

