Handle short writes.
[buftee] / buftee.c
index b69b318..552a333 100644 (file)
--- a/buftee.c
+++ b/buftee.c
@@ -87,6 +87,9 @@ struct writer_thread {
   // When a writer runs out of work to do, it sleeps on a shared condition.
   pthread_cond_t* cond;
 
+  // The writer thread exits when it's not running and its queue is empty.
+  volatile int running;
+
   STAILQ_ENTRY(writer_thread) entries;
 };
 
@@ -185,7 +188,7 @@ static void _nblog_helper(int want_errno,
                           const char* format,
                           ...);  // Forward.
 
-#define nblog(fmt...) _nblog_helper(1, __LINE__, __FUNCTION__, fmt)
+//#define nblog(fmt...) _nblog_helper(1, __LINE__, __FUNCTION__, fmt)
 #define nblogx(fmt...) _nblog_helper(0, __LINE__, __FUNCTION__, fmt)
 
 static void warn_time(const char* desc,
@@ -238,16 +241,17 @@ static void wait_until_writable(const int fd) {
     errx(1, "select() did not return writable fd = %d", fd);
 }
 
-static int xwrite(const int fd, struct buf* const buf) {
-  ssize_t write_ret;
-  int saved_errno;
-  struct timespec t0;
-  struct timespec t1;
+// Returns zero on EOF, dies on error, retries on short write.
+static int write_buf(const int fd, struct buf* const buf) {
+  char* data = buf->data;
+  size_t left = (size_t)buf->len;
 
   for (;;) {
+    struct timespec t0;
+    struct timespec t1;
     get_mono_time(&t0);
-    write_ret = write(fd, buf->data, (size_t)buf->len);
-    saved_errno = errno;
+    ssize_t write_ret = write(fd, data, left);
+    int saved_errno = errno;
     get_mono_time(&t1);
     warn_time("write()", &t0, &t1);
 
@@ -258,17 +262,21 @@ static int xwrite(const int fd, struct buf* const buf) {
         wait_until_writable(fd);
         continue;
       }
-      err(1, "write(fd = %d, count = %d) failed", fd, buf->len);
+      err(1, "write(fd = %d, count = %d) failed", fd, (int)left);
     }
-    if (write_ret == 0)
+    if (write_ret == 0) {
       return 0;  // EOF
+    }
     assert(write_ret >= 0);
-    if (write_ret < buf->len)
-      err(1, "write(fd = %d, count = %d) stopped short (returned %d)",
-          fd, buf->len, (int)write_ret);
-      // FIXME: handle this
-    assert(write_ret == buf->len);
-    return (int)write_ret;
+    if (write_ret < (ssize_t)left) {
+      nblogx("short write(fd = %d, count = %d): wrote %d",
+             fd, (int)left, (int)write_ret);
+      left -= write_ret;
+      data += write_ret;
+      continue;
+    }
+    assert(write_ret == (ssize_t)left);
+    return 1;
   }
 }
 
@@ -305,31 +313,37 @@ static void unlock(pthread_mutex_t* mutex) {
 static void* writer_routine(void *arg) {
   struct writer_thread* my = arg;
   struct buf* buf = NULL;
+
+  nblogx("writer thread for fd %d starting", my->fd);
   lock(my->mutex);
   for (;;) {
-    while (!stopping && STAILQ_EMPTY(&my->queue)) {
+    while (my->running && STAILQ_EMPTY(&my->queue)) {
       // Sleep.
       pthread_cond_wait(my->cond, my->mutex);
     }
-    if (!STAILQ_EMPTY(&my->queue))
+    if (!STAILQ_EMPTY(&my->queue)) {
       buf = dequeue(&my->queue);
+    }
     unlock(my->mutex);
 
-    if (stopping) break;
-    assert(buf != NULL);
+    if (buf == NULL) {
+      assert(!my->running);
+      break;
+    }
 
     // Write.
-    int write_ret = xwrite(my->fd, buf);
-    if (write_ret == 0) {
+    if (write_buf(my->fd, buf) == 0) {
       errx(1, "fd %d hit EOF", my->fd);
     }
-    assert(write_ret == buf->len);
 
     // Unreference buffer, freeing it if we have to.
     lock(my->mutex);
     unref_buf(buf);
+    buf = NULL;
+  }
+  if (arg != logger) {
+    nblogx("thread exiting cleanly");
   }
-  nblogx("thread exiting cleanly");
   return NULL;
 }
 
@@ -342,6 +356,7 @@ static void add_writer_thread(struct writer_thread_list* list,
   writer->fd = fd;
   writer->mutex = shared_queue_mutex;
   writer->cond = shared_wakeup_cond;
+  writer->running = 1;
   STAILQ_INIT(&writer->queue);
   STAILQ_INSERT_TAIL(list, writer, entries);
   xpthread_create(&writer->thread, writer_routine, writer);
@@ -363,6 +378,7 @@ static void init_logger_thread(void) {
   pthread_mutex_init(logger->mutex, NULL);
   logger->cond = malloc(sizeof(*logger->cond));
   pthread_cond_init(logger->cond, NULL);
+  logger->running = 1;
   STAILQ_INIT(&logger->queue);
   xpthread_create(&logger->thread, writer_routine, logger);
 }
@@ -372,10 +388,7 @@ static void _nblog_helper(int want_errno,
                           const char* func,
                           const char* format,
                           ...) {
-  int saved_errno;
-  if (want_errno)
-    saved_errno = errno;
-
+  int saved_errno = errno;
   va_list va;
   va_start(va, format);
 
@@ -387,7 +400,7 @@ static void _nblog_helper(int want_errno,
 
   // Prefix.
   char buf[512];
-  size_t len;
+  int len;
   extern char *__progname;  // This is where glibc stashes argv[0].
   len = snprintf(buf, sizeof(buf), "%s:%d:%s(): tid %d at %d.%09d: ",
                  __progname, line, func, gettid(),
@@ -480,9 +493,14 @@ int main(int argc, char **argv) {
     unlock(&shared_queue_mutex);
   }
 
-  // Wake and join threads.
+  nblogx("stopping: draining writer threads");
   lock(&shared_queue_mutex);
-  stopping = 1;
+  {
+    struct writer_thread* writer;
+    STAILQ_FOREACH(writer, &writers, entries) {
+      writer->running = 0;
+    }
+  }
   xpthread_cond_broadcast(&shared_wakeup_cond);
   unlock(&shared_queue_mutex);
   {
@@ -496,16 +514,23 @@ int main(int argc, char **argv) {
   while (!STAILQ_EMPTY(&writers)) {
     struct writer_thread* writer = STAILQ_FIRST(&writers);
     STAILQ_REMOVE_HEAD(&writers, entries);
-    // FIXME: free its queue?
+    if (!STAILQ_EMPTY(&writer->queue)) {
+      nblogx("queue for fd %d is not empty", writer->fd);
+    }
     free(writer);
   }
 
   // Clean up logger thread.
-  nblogx("writer threads stopped");
+  nblogx("writer threads stopped, stopping logger");
   lock(logger->mutex);
+  logger->running = 0;
   xpthread_cond_broadcast(logger->cond);
   unlock(logger->mutex);
   xpthread_join(logger->thread);
+  pthread_mutex_destroy(logger->mutex);
+  free(logger->mutex);
+  pthread_cond_destroy(logger->cond);
+  free(logger->cond);
   free(logger);
 
   warnx("exiting cleanly");