X-Git-Url: https://unix4lyfe.org/gitweb/buftee/blobdiff_plain/c5cadceead92752d3da1feb7bffdad9f19ea01b1..HEAD:/buftee.c diff --git a/buftee.c b/buftee.c index 8623515..552a333 100644 --- a/buftee.c +++ b/buftee.c @@ -241,16 +241,17 @@ static void wait_until_writable(const int fd) { errx(1, "select() did not return writable fd = %d", fd); } -static int xwrite(const int fd, struct buf* const buf) { - ssize_t write_ret; - int saved_errno; - struct timespec t0; - struct timespec t1; +// Returns zero on EOF, dies on error, retries on short write. +static int write_buf(const int fd, struct buf* const buf) { + char* data = buf->data; + size_t left = (size_t)buf->len; for (;;) { + struct timespec t0; + struct timespec t1; get_mono_time(&t0); - write_ret = write(fd, buf->data, (size_t)buf->len); - saved_errno = errno; + ssize_t write_ret = write(fd, data, left); + int saved_errno = errno; get_mono_time(&t1); warn_time("write()", &t0, &t1); @@ -261,17 +262,21 @@ static int xwrite(const int fd, struct buf* const buf) { wait_until_writable(fd); continue; } - err(1, "write(fd = %d, count = %d) failed", fd, buf->len); + err(1, "write(fd = %d, count = %d) failed", fd, (int)left); } - if (write_ret == 0) + if (write_ret == 0) { return 0; // EOF + } assert(write_ret >= 0); - if (write_ret < buf->len) - err(1, "write(fd = %d, count = %d) stopped short (returned %d)", - fd, buf->len, (int)write_ret); - // FIXME: handle this - assert(write_ret == buf->len); - return (int)write_ret; + if (write_ret < (ssize_t)left) { + nblogx("short write(fd = %d, count = %d): wrote %d", + fd, (int)left, (int)write_ret); + left -= write_ret; + data += write_ret; + continue; + } + assert(write_ret == (ssize_t)left); + return 1; } } @@ -327,18 +332,18 @@ static void* writer_routine(void *arg) { } // Write. - int write_ret = xwrite(my->fd, buf); - if (write_ret == 0) { + if (write_buf(my->fd, buf) == 0) { errx(1, "fd %d hit EOF", my->fd); } - assert(write_ret == buf->len); // Unreference buffer, freeing it if we have to. lock(my->mutex); unref_buf(buf); buf = NULL; } - nblogx("thread exiting cleanly"); + if (arg != logger) { + nblogx("thread exiting cleanly"); + } return NULL; } @@ -509,7 +514,9 @@ int main(int argc, char **argv) { while (!STAILQ_EMPTY(&writers)) { struct writer_thread* writer = STAILQ_FIRST(&writers); STAILQ_REMOVE_HEAD(&writers, entries); - // FIXME: free its queue? + if (!STAILQ_EMPTY(&writer->queue)) { + nblogx("queue for fd %d is not empty", writer->fd); + } free(writer); } @@ -520,6 +527,10 @@ int main(int argc, char **argv) { xpthread_cond_broadcast(logger->cond); unlock(logger->mutex); xpthread_join(logger->thread); + pthread_mutex_destroy(logger->mutex); + free(logger->mutex); + pthread_cond_destroy(logger->cond); + free(logger->cond); free(logger); warnx("exiting cleanly");