/* async.c */
#include "videoport.h"

/* debug tracing: SPEW(x) expands to nothing; define it as `x` to enable */
#define SPEW(x)

/* diskio ------------------------------------------------------ */

/* states for diskio.state */
#define RUNME    0 /* main thread can V startsema now */
#define NEEDP    1 /* main thread must P on donesema */
#define PPENDING 2 /* main thread must select on donesema */

typedef ssize_t (*iovfunc)(int, const struct iovec *, int); /* readv/writev */

typedef struct _diskio /* totally non-encapsulated and proud of it! */
{
  /* === public interface -- write these before issue, read these freely */
  int fd;
  off64_t offset;
  struct iovec *iov;
  int iovcnt;
  iovfunc func; /* readv/writev */

  /* sub_thread fills these in when i/o completes */
  off64_t lseek_rc;
  int lseek_errno; /* not defined if lseek succeeded */
  int rc;          /* readv/writev: not defined if lseek failed */
  /*
   * NOTE(review): a struct member literally named `errno` only compiles
   * while the active <errno.h> does not define errno as an expression
   * macro (C99+ specifies errno as a macro; per-thread errno builds
   * typically expand it to a function call). Renaming it requires
   * updating every reader of diskio->errno at the same time.
   */
  int errno; /* not defined if readv/writev succeeded or lseek failed */

  /* === private interface -- you'll get in trouble if you read these
   * without reading the source code. use the diskio_ functions instead.
   *
   * each diskio in the main thread starts a sub-thread which it manages.
   * they communicate in roughly this way:
   *
   *   sub_thread()
   *   {
   *     while (1)
   *       P on startsema (blocks)
   *       do the readv/writev
   *       V on donesema to tell main thread we're done (does not block)
   *   }
   *
   *   main_thread()
   *   {
   *     for (;;)
   *     {
   *       fill in fd, offset, iov, iovcnt, func
   *       V on startsema to set sub-thread off (does not block)
   *       P on donesema to wait for sub-thread (pollable semaphore)
   *     }
   *   }
   *
   * startsema: P: sub-thread blocks on this as signal to issue i/o
   *            V: main thread releases this to initiate i/o
   * donesema:  P: main thread blocks on this to wait on completion of i/o
   *               (via the fd of a pollable semaphore)
   *            V: sub-thread releases this when i/o completes
   */
  int sub_thread_pid;
  usptr_t *arena;
  usema_t *startsema; /* normal semaphore */
  usema_t *donesema;  /* pollable semaphore */
  int donesemafd;     /* fd for donesema */
  int state;          /* RUNME, NEEDP or PPENDING */

  /* main thread sets this before waking sub-thread to say goodbye */
  int dieflag;
} _diskio;

typedef _diskio *diskio;

/* diskio: sub-thread ---------------------------------------------------- */

/*
 * sumvectors: total byte count described by iov[0 .. iovcnt-1].
 */
int sumvectors(struct iovec *iov, int iovcnt)
{
  int i, ret = 0;

  for (i = 0; i < iovcnt; i++)
    ret += iov[i].iov_len;
  return ret;
}

/*
 * diskio_sub_thread: body of each diskio's sproc()'d sub-thread.
 *
 * repeatedly waits on startsema, seeks me->fd to me->offset, calls
 * me->func (readv/writev), records the results in lseek_rc/lseek_errno/
 * rc/errno, then posts donesema. returns once the main thread has set
 * dieflag and posted startsema.
 */
void diskio_sub_thread(void /*_diskio*/ *arg)
{
  diskio me = (diskio)arg;

  /* make it so I'll just die when my parent does */
  OSIGC(signal(SIGHUP, SIG_DFL));
  OC(prctl(PR_TERMCHILD));

  while (1)
  {
    /* wait for main thread to tell us to go */
    OC(uspsema(me->startsema));
    if (me->dieflag)
      break;

    /* seek to offset */
    me->lseek_rc = lseek64(me->fd, me->offset, SEEK_SET);
    if (me->lseek_rc < 0)
      me->lseek_errno = oserror();
    else
    {
      /* call readv/writev */
      me->rc = (*me->func)(me->fd, me->iov, me->iovcnt);
      if (me->rc < 0)
        me->errno = oserror();
    }

    /* tell main thread we're done */
    OC(usvsema(me->donesema));
  }

  /* getpid() yields a pid_t, not a pointer: %p here was undefined behavior */
  printf("diskio thread %ld dying now\n", (long)getpid());
}

/* disk: main thread ---------------------------------------------------- */

/*
 * diskio_open: allocate a diskio, create its two semaphores in `arena`
 * and start its sub-thread. the returned diskio is ready (state RUNME);
 * the caller still fills in the public fields (fd, iov, ...) itself.
 */
diskio diskio_open(usptr_t *arena)
{
  diskio me;

  /* calloc: fields the sub-thread has not written yet read as 0, not garbage */
  NC(me = calloc(1, sizeof *me));
  me->dieflag = FALSE;

  /* create semaphores */
  me->arena = arena;
  /* initially: stop sub-thread */
  ONC(me->startsema = usnewsema(arena, 0));
  /* initially: stop main thread */
  ONC(me->donesema = usnewpollsema(arena, 0));
  OC(me->donesemafd = usopenpollsema(me->donesema, 0600));

  /* initially we have nothing to do. set this before sproc() so the
   * struct is fully initialized by the time it is shared. */
  me->state = RUNME;

  /* make a thread */
  OC(me->sub_thread_pid = sproc(diskio_sub_thread, PR_SALL, me));

  return me;
}

/*
 * diskio_close: stop the sub-thread, free both semaphores, free the diskio.
 * must only be called while no i/o is outstanding (state RUNME).
 * me->fd and me->iov are not touched; they remain the caller's to release.
 */
void diskio_close(diskio me)
{
  assert(me->state == RUNME);

  /* V sub-thread with dieflag set. it will exit. */
  me->dieflag = TRUE;
  OC(usvsema(me->startsema));

  /* wait for sub-thread to really die */
  waitpid(me->sub_thread_pid, NULL, 0);

  /* thread is dead. hose semaphores. */
  OC(usclosepollsema(me->donesema)); /* also closes fd */
  OC(usfreepollsema(me->donesema, me->arena));
  me->donesema = NULL;
  me->donesemafd = -1;
  usfreesema(me->startsema, me->arena);
  me->startsema = NULL;
  free(me);
}

/* returns TRUE if you can issue a disk I/O on this diskio now.
 *
 * a diskio will be ready when you first open it.
 *
 * a diskio could become ready to read/write within diskio_checkfdset()
 * OR diskio_issue(). fortunately, you can still select() in all cases
 * (see diskio_setfdset()).
 */
int diskio_ready(diskio me)
{
  return !me->dieflag && me->state == RUNME;
}

/*
 * you must call this function before the select() in your main loop.
 * it will fill in fd's that it wants to select on. if it needs to
 * wake up at a UST which is less than *wakeust, it will modify *wakeust
 * to that UST.
 *
 * if you call this function when the diskio is ready, it will use
 * wakeust to cause your select() to wake immediately.
 */
void diskio_setfdset(diskio me, fd_set *readset, fd_set *writeset,
                     fd_set *exceptset, stamp_t *wakeust)
{
  if (me->state == PPENDING)
    FD_SET(me->donesemafd, readset);
  else if (me->state == RUNME)
    *wakeust = 0; /* ready to run -- wake up immediately */
}

/*
 * you must call this after your select() in your main loop.
*/ void diskio_checkfdset(diskio me, fd_set *readset, fd_set *writeset, fd_set *exceptset) { if (FD_ISSET(me->donesemafd, readset) && me->state == PPENDING) { /* yay -- write completed. */ me->state = RUNME; } } /* * when diskio_ready() returns true, you: * - fill in fd, offset, iov, iovcnt, and func (readv/writev) * - call diskio_issue() * * I/O will be done when diskio_ready() next returns true. * at this time you can examine diskio->lseek_rc, diskio_lseek_errno, * diskio->rc, and diskio->errno to get return values and errnos. */ void diskio_issue(diskio me) { int rc; assert(me->state == RUNME); /* v sub-thread so it starts I/O */ OC(usvsema(me->startsema)); /* * now we'll need to test for when this is done. * thanks to the ANNOYING pollable semaphores, it could be done now! */ me->state = NEEDP; /* try and acquire "done" semaphore */ OC(rc = uspsema(me->donesema)); switch(rc) { case 0: /* semaphore not available yet--we must let user select on it */ me->state = PPENDING; break; case 1: /* hurray--sub-thread is done */ me->state = RUNME; break; default: assert(rc == 0 || rc == 1); break; } assert(me->state != NEEDP); } /* main ------------------------------------------------------------ */ usptr_t *getarena() { usptr_t *arena; char *fname = (char*) tempnam("/usr/tmp", ".vda"); /* * make sure that temp files are unlinked automatically */ (void) usconfig(CONF_ARENATYPE, US_SHAREDONLY); /* * use fastest form of semaphores */ (void) usconfig(CONF_LOCKTYPE, US_NODEBUG); ONC(arena = usinit(fname)); return arena; } void async_spew(int *diskio_available, int ndiskios) { int d; for(d=0; d < ndiskios; d++) printf("%c", diskio_available[d] ? '.' : 'x'); printf(" "); } int main(int argc, char **argv) { usptr_t *arena; videoport p; int direction = (tolower(*argv[1]) == 'v') ? 
VIDEOPORT_DIRECTION_VM : VIDEOPORT_DIRECTION_MV; char *filename = argv[2]; int issue_count; int complete_count; diskperformance diskperf = diskperformance_open(); int nbytes, totalbytes; int d,f; struct dioattr dioinfo; int vector_chunksize; int vector_memalign; int ndiskios = 4; /* number of readvs/writevs at a time */ int ndiskios_available; int nfields_available; int nvectors = 10; /* number of fields per readv/writev */ int *diskio_available; diskio *diskios; videofield ***fields; NC(p = videoport_open(direction, 30, VIDEOPORT_ADVISE_NOACCESS)); if (direction == VIDEOPORT_DIRECTION_VM) { printf("writing input video to file %s\n", filename); } else /* direction == VIDEOPORT_DIRECTION_MV */ { printf("reading video from file %s for output\n", filename); } arena = getarena(); NC(diskio_available = malloc(ndiskios * sizeof(int))); NC(diskios = malloc(ndiskios * sizeof(diskio))); NC(fields = malloc(ndiskios * sizeof(videofield **))); for(d=0; d < ndiskios; d++) { NC(diskios[d] = diskio_open(arena)); NC(diskios[d]->iov = malloc(nvectors * sizeof(struct iovec))); NC(fields[d] = malloc(nvectors * sizeof(videofield *))); if (direction == VIDEOPORT_DIRECTION_VM) { OC(diskios[d]->fd = open(filename, O_RDWR|O_TRUNC|O_CREAT|O_DIRECT, 0644)); } else /* direction == VIDEOPORT_DIRECTION_MV */ { OC(diskios[d]->fd = open(filename, O_RDONLY|O_DIRECT, 0644)); } } nbytes = p->max_valid_bytes_per_field; totalbytes = nbytes * nvectors; /* get direct I/O constraints for these fds */ OC(fcntl(diskios[0]->fd, F_DIOINFO, &dioinfo)); vector_chunksize = max(dioinfo.d_miniosz, getpagesize()); vector_memalign = max(dioinfo.d_mem, getpagesize()); assert(totalbytes >= vector_chunksize); assert(totalbytes <= dioinfo.d_maxiosz); assert((nbytes % vector_chunksize) == 0); issue_count = 0; complete_count = 0; diskperformance_datapoint(diskperf, 0, p); for(d=0; d < ndiskios; d++) diskio_available[d] = 1; ndiskios_available = ndiskios; nfields_available = videoport_available_field_count(p); for(;;) { 
/* * while we have idle diskios and enough video fields to * make a whole diskio (nvectors), issue diskio requests. */ while(ndiskios_available > 0 && nfields_available >= nvectors) { /* find a free diskio */ for(d=0; d < ndiskios; d++) if (diskio_available[d]) break; assert(d != ndiskios); /* issue nvectors fields on that diskio */ diskios[d]->offset = ((off64_t)issue_count) * totalbytes; issue_count++; if (direction == VIDEOPORT_DIRECTION_VM) diskios[d]->func = writev; else diskios[d]->func = readv; diskios[d]->iovcnt = nvectors; for(f=0; f < nvectors; f++) { fields[d][f] = videoport_get_one_field(p); assert((((uintptr_t)fields[d][f]->pixels) % vector_memalign) == 0); diskios[d]->iov[f].iov_base = fields[d][f]->pixels; diskios[d]->iov[f].iov_len = nbytes; } diskio_issue(diskios[d]); /* the diskio may have already completed, but we'll catch * this after the select() below. */ diskio_available[d] = 0; ndiskios_available--; nfields_available -= nvectors; SPEW(async_spew(diskio_available, ndiskios)); SPEW(printf("%d issue (%d avail)\n", d, ndiskios_available)); } /* * wait for one or more diskio and nvectors or more fields * to become available. */ while (ndiskios_available == 0 || nfields_available < nvectors) { fd_set readset, writeset, exceptset; stamp_t wakeust; struct timeval tv, *tvp; FD_ZERO(&readset); FD_ZERO(&writeset); FD_ZERO(&exceptset); wakeust = LONGLONG_MAX; /* * only select on something if we don't already have it. * otherwise, we'll hard spin. 
*/ if (nfields_available < nvectors) { videoport_setfillpoint(p, nvectors); videoport_setfdset(p, &readset, &writeset, &exceptset, &wakeust); } for(d=0; d < ndiskios; d++) if (!diskio_available[d]) diskio_setfdset(diskios[d], &readset, &writeset, &exceptset, &wakeust); if (wakeust == LONGLONG_MAX) tvp = NULL; else { stamp_t now, time_till_wakeup; dmGetUST((unsigned long long *)(&now)); time_till_wakeup = wakeust - now; if (time_till_wakeup < 0) { tv.tv_sec = 0; tv.tv_usec = 0; } else { tv.tv_sec = (long)(time_till_wakeup / 1000000000LL); time_till_wakeup -= tv.tv_sec * 1000000000LL; tv.tv_usec = (long)(time_till_wakeup / 1000); } tvp = &tv; } OC(select(getdtablehi(), &readset, &writeset, &exceptset, tvp)); if (nfields_available < nvectors) { videoport_checkfdset(p, &readset, &writeset, &exceptset); } for(d=0; d < ndiskios; d++) if (!diskio_available[d]) diskio_checkfdset(diskios[d], &readset, &writeset, &exceptset); /* see how many fields we have now */ nfields_available = videoport_available_field_count(p); /* handle the diskios which have completed */ { for(d=0; d < ndiskios; d++) if (!diskio_available[d] && diskio_ready(diskios[d])) { /* a diskio completed */ if (diskios[d]->lseek_rc < 0) { setoserror(diskios[d]->lseek_errno); perror_exit("lseek64"); } if (diskios[d]->rc < 0) { setoserror(diskios[d]->errno); perror_exit("readv/writev"); } else if (diskios[d]->rc != totalbytes) { error_exit("EOF or out of disk space"); } for(f=0; f < nvectors; f++) videoport_put_one_field(p, fields[d][f]); diskio_available[d] = 1; ndiskios_available++; SPEW(async_spew(diskio_available, ndiskios)); SPEW(printf("%d complete (%d avail)\n", d, ndiskios_available)); complete_count++; diskperformance_datapoint(diskperf, ((off64_t)complete_count) * totalbytes, p); } } } } }