Handle short writes; route diagnostics through a non-blocking logger thread.
[buftee] / buftee.c
index feb213b..552a333 100644 (file)
--- a/buftee.c
+++ b/buftee.c
@@ -42,6 +42,7 @@
 #include <fcntl.h>
 #include <pthread.h>
 #include <signal.h>
+#include <stdarg.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #define READ_BUF_SIZE 4096
 #define SLOW_NSEC 4000
 
-// *** GLOBALS *****************************************************************
-
-// All queues are locked through one global mutex.
-static pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-// When a writer runs out of work to do, it sleeps on this global cond.
-static pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
-
 // Asserted on receipt of SIGTERM, SIGINT.
-static volatile int stopping = 0;
-
-// *** (end globals) ***********************************************************
+static volatile sig_atomic_t stopping = 0;  // Global; set by sig_stopping().
 
-static void signal_handler(int _ignored_ __attribute__((__unused__))) {
+static void sig_stopping(int _ignored_ __attribute__((__unused__))) {
   stopping = 1;
 }
 
@@ -90,12 +81,24 @@ struct writer_thread {
   // Each writer has its own queue.
   struct buf_queue queue;
 
+  // Guards queue and running. Shared by output writers; logger has its own.
+  pthread_mutex_t* mutex;
+
+  // Broadcast when work is queued or running is cleared; idle writers wait.
+  pthread_cond_t* cond;
+
+  // Cleared (under *mutex) to stop: the thread drains its queue, then exits.
+  volatile int running;
+
   STAILQ_ENTRY(writer_thread) entries;
 };
 
 // A list of writer threads.
 STAILQ_HEAD(writer_thread_list, writer_thread);  // struct writer_thread_list
 
+static struct writer_thread* logger = NULL;  // Global; writes log to stderr.
+static struct timespec logger_start;  // Global; time origin for log lines.
+
 static struct buf* alloc_buf(const char* const data, const int len) {
   assert(len > 0);
   struct buf* buf = malloc(sizeof(*buf));
@@ -179,18 +182,29 @@ static int gettid(void) {
   return (int)syscall(SYS_gettid);
 }
 
+// Formats a message and queues it for the logger thread. The format attribute
+// makes the compiler check arguments against format like printf's.
+static void _nblog_helper(int want_errno,
+                          int line,
+                          const char* func,
+                          const char* format,
+                          ...) __attribute__((format(printf, 4, 5)));
+
+//#define nblog(fmt...) _nblog_helper(1, __LINE__, __FUNCTION__, fmt)
+#define nblogx(fmt...) _nblog_helper(0, __LINE__, __FUNCTION__, fmt)
+
 static void warn_time(const char* desc,
                       const struct timespec* restrict start,
                       const struct timespec* restrict end) {
+  // Never report from the logger thread itself: that would log about writing
+  // the log, feeding its own queue. Also bail if there is no logger yet.
+  if (logger == NULL || pthread_equal(pthread_self(), logger->thread)) {
+    return;
+  }
   struct timespec diff;
   time_diff(start, end, &diff);
   if (diff.tv_sec > 0 || diff.tv_nsec > SLOW_NSEC) {
-    char buf[128];
-    extern char *__progname;  // This is where glibc stashes argv[0].
-    snprintf(buf, sizeof(buf), "%s:tid %d: %s took %d.%09d secs\n",
-            __progname, gettid(), desc, (int)diff.tv_sec, (int)diff.tv_nsec);
-    // Best effort write to a non-blocking stderr.
-    (void)write(STDERR_FILENO, buf, strlen(buf));
+    nblogx("%s took %d.%09d secs", desc, (int)diff.tv_sec, (int)diff.tv_nsec);
   }
 }
 
@@ -231,37 +241,52 @@ static void wait_until_writable(const int fd) {
     errx(1, "select() did not return writable fd = %d", fd);
 }
 
-static int xwrite(const int fd, struct buf* const buf) {
-  ssize_t write_ret;
-  int saved_errno;
-  struct timespec t0;
-  struct timespec t1;
+// Writes all buf->len bytes of buf to fd. Returns 1 once everything is
+// written, 0 if write() returns 0 (treated as EOF); dies via err() on any
+// other failure. Retries short writes, and EAGAIN after waiting for fd to
+// become writable. On the logger thread the retry diagnostics are skipped so
+// a congested stderr does not keep feeding the logger's own queue.
+static int write_buf(const int fd, struct buf* const buf) {
+  const int quiet =
+      logger != NULL && pthread_equal(pthread_self(), logger->thread);
+  char* data = buf->data;
+  size_t left = (size_t)buf->len;
 
   for (;;) {
+    struct timespec t0;
+    struct timespec t1;
     get_mono_time(&t0);
-    write_ret = write(fd, buf->data, (size_t)buf->len);
-    saved_errno = errno;
+    ssize_t write_ret = write(fd, data, left);
+    int saved_errno = errno;
     get_mono_time(&t1);
     warn_time("write()", &t0, &t1);
 
     errno = saved_errno;
     if (write_ret == -1) {
       if (errno == EAGAIN) {
-        warn("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
+        if (!quiet) {
+          nblogx("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
+        }
         wait_until_writable(fd);
         continue;
       }
-      err(1, "write(fd = %d, count = %d) failed", fd, buf->len);
+      err(1, "write(fd = %d, count = %d) failed", fd, (int)left);
     }
-    if (write_ret == 0)
+    if (write_ret == 0) {
       return 0;  // EOF
+    }
     assert(write_ret >= 0);
-    if (write_ret < buf->len)
-      err(1, "write(fd = %d, count = %d) stopped short (returned %d)",
-          fd, buf->len, (int)write_ret);
-      // FIXME: handle this
-    assert(write_ret == buf->len);
-    return (int)write_ret;
+    if (write_ret < (ssize_t)left) {
+      if (!quiet) {
+        nblogx("short write(fd = %d, count = %d): wrote %d",
+               fd, (int)left, (int)write_ret);
+      }
+      left -= (size_t)write_ret;
+      data += write_ret;
+      continue;
+    }
+    assert(write_ret == (ssize_t)left);
+    return 1;
   }
 }
 
@@ -298,41 +313,63 @@ static void unlock(pthread_mutex_t* mutex) {
+// Thread body shared by the output writers and the logger. Pops buffers off
+// my->queue and writes each to my->fd, sleeping on my->cond while the queue
+// is empty. Exits once my->running is cleared and the queue is drained.
 static void* writer_routine(void *arg) {
   struct writer_thread* my = arg;
   struct buf* buf = NULL;
-  lock(&shared_queue_mutex);
+
+  nblogx("writer thread for fd %d starting", my->fd);
+  lock(my->mutex);
   for (;;) {
-    while (!stopping && STAILQ_EMPTY(&my->queue)) {
+    while (my->running && STAILQ_EMPTY(&my->queue)) {
       // Sleep.
-      pthread_cond_wait(&shared_wakeup_cond, &shared_queue_mutex);
+      pthread_cond_wait(my->cond, my->mutex);
     }
-    if (!STAILQ_EMPTY(&my->queue))
+    if (!STAILQ_EMPTY(&my->queue)) {
       buf = dequeue(&my->queue);
-    unlock(&shared_queue_mutex);
+    }
+    unlock(my->mutex);
 
-    if (stopping) break;
-    assert(buf != NULL);
+    if (buf == NULL) {
+      assert(!my->running);
+      break;
+    }
 
     // Write.
-    int write_ret = xwrite(my->fd, buf);
-    if (write_ret == 0) {
+    if (write_buf(my->fd, buf) == 0) {
       errx(1, "fd %d hit EOF", my->fd);
     }
-    assert(write_ret == buf->len);
 
     // Unreference buffer, freeing it if we have to.
-    lock(&shared_queue_mutex);
+    lock(my->mutex);
     unref_buf(buf);
+    buf = NULL;
+  }
+  // The logger's own queue is no longer being drained at this point.
+  if (my != logger) {
+    nblogx("thread exiting cleanly");
   }
-  warnx("thread exiting cleanly");
   return NULL;
 }
 
-static void add_writer_thread(struct writer_thread_list* list, const int fd) {
-  set_nonblocking(fd);
+// Creates a writer for fd (switched to non-blocking mode), appends it to
+// list, and starts its thread. Writers on one list should share the same
+// mutex and cond: main() enqueues to every writer under a single lock.
+static void add_writer_thread(struct writer_thread_list* list,
+                              int fd,
+                              pthread_mutex_t* shared_queue_mutex,
+                              pthread_cond_t* shared_wakeup_cond) {
   struct writer_thread* writer = malloc(sizeof(*writer));
+  if (writer == NULL) {
+    err(1, "malloc() failed");
+  }
+  set_nonblocking(fd);
   writer->fd = fd;
-  STAILQ_INIT(&(writer->queue));
+  writer->mutex = shared_queue_mutex;
+  writer->cond = shared_wakeup_cond;
+  writer->running = 1;
+  STAILQ_INIT(&writer->queue);
   STAILQ_INSERT_TAIL(list, writer, entries);
-  xpthread_create(&(writer->thread), writer_routine, writer);
+  xpthread_create(&writer->thread, writer_routine, writer);
 }
 
 static void xpthread_cond_broadcast(pthread_cond_t* cond) {
@@ -342,6 +369,91 @@ static void xpthread_cond_broadcast(pthread_cond_t* cond) {
   err(1, "pthread_cond_broadcast(%p) failed", cond);
 }
 
+// Sets up the global logger: a writer thread that drains its own queue to
+// stderr under a private mutex/cond (not the one shared by output writers).
+// Must run before the first nblogx(), which dereferences logger.
+static void init_logger_thread(void) {
+  assert(logger == NULL);
+  get_mono_time(&logger_start);
+  logger = malloc(sizeof(*logger));
+  if (logger == NULL) {
+    err(1, "malloc() failed");
+  }
+  logger->fd = STDERR_FILENO;
+  logger->mutex = malloc(sizeof(*logger->mutex));
+  logger->cond = malloc(sizeof(*logger->cond));
+  if (logger->mutex == NULL || logger->cond == NULL) {
+    err(1, "malloc() failed");
+  }
+  pthread_mutex_init(logger->mutex, NULL);
+  pthread_cond_init(logger->cond, NULL);
+  logger->running = 1;
+  STAILQ_INIT(&logger->queue);
+  xpthread_create(&logger->thread, writer_routine, logger);
+}
+
+// Returns the length of a log buffer of capacity cap after appending an
+// snprintf()-style result n to the len bytes already in it. The result is
+// clamped to cap - 2 so a truncated line still has room for "\n" and NUL;
+// a negative n (output error) appends nothing.
+static size_t nblog_advance(size_t len, int n, size_t cap) {
+  if (n > 0) {
+    len += (size_t)n;
+  }
+  return len < cap - 2 ? len : cap - 2;
+}
+
+// Formats one line, "prog:LINE:FUNC(): tid T at S.NNNNNNNNN: MESSAGE\n"
+// (with ": strerror (errno = N)" before the newline when want_errno), and
+// queues it for the logger thread. Lines that overflow the 512-byte buffer
+// are truncated but still end in a newline.
+static void _nblog_helper(int want_errno,
+                          int line,
+                          const char* func,
+                          const char* format,
+                          ...) {
+  int saved_errno = errno;
+
+  // Timing, relative to init_logger_thread().
+  struct timespec now;
+  struct timespec diff;
+  get_mono_time(&now);
+  time_diff(&logger_start, &now, &diff);
+
+  // Prefix.
+  char buf[512];
+  size_t len = 0;
+  extern char *__progname;  // This is where glibc stashes argv[0].
+  int n = snprintf(buf, sizeof(buf), "%s:%d:%s(): tid %d at %d.%09d: ",
+                   __progname, line, func, gettid(),
+                   (int)diff.tv_sec, (int)diff.tv_nsec);
+  len = nblog_advance(len, n, sizeof(buf));
+
+  // Format message.
+  va_list va;
+  va_start(va, format);
+  n = vsnprintf(buf + len, sizeof(buf) - len, format, va);
+  va_end(va);
+  len = nblog_advance(len, n, sizeof(buf));
+
+  if (want_errno) {
+    n = snprintf(buf + len, sizeof(buf) - len, ": %s (errno = %d)",
+                 strerror(saved_errno), saved_errno);
+    len = nblog_advance(len, n, sizeof(buf));
+  }
+
+  // Always newline-terminated; nblog_advance() reserved room for it.
+  buf[len++] = '\n';
+  buf[len] = '\0';
+  struct buf* b = alloc_buf(buf, (int)len);
+
+  // Enqueue.
+  lock(logger->mutex);
+  enqueue(&logger->queue, b);
+  xpthread_cond_broadcast(logger->cond);
+  unlock(logger->mutex);
+}
+
 static void xpthread_join(pthread_t thread) {
   int ret = pthread_join(thread, NULL);
   if (ret == 0) return;
@@ -349,29 +431,54 @@ static void xpthread_join(pthread_t thread) {
   err(1, "pthread_join(%lu) failed", thread);
 }
 
+// SIGCONT handler; main() also calls it once at startup. Re-applies
+// non-blocking mode to stderr (presumably it can be reset while we are
+// stopped -- TODO confirm). errno is preserved so interrupted code, e.g.
+// write_buf() between write() and reading errno, still sees its own value.
+static void sig_continue(int _ignored_ __attribute__((__unused__))) {
+  int saved_errno = errno;
+  set_nonblocking(STDERR_FILENO);
+  errno = saved_errno;
+}
+
 int main(int argc, char **argv) {
+  init_logger_thread();
+  nblogx("starting");
+
+  // static: POSIX documents PTHREAD_*_INITIALIZER for statically allocated
+  // objects. Writer threads keep pointers to these until they are joined.
+  static pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+  static pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
+
   struct writer_thread_list writers;
   STAILQ_INIT(&writers);
 
-  if (signal(SIGINT, signal_handler) == SIG_ERR) err(1, "signal() failed");
-  if (signal(SIGTERM, signal_handler) == SIG_ERR) err(1, "signal() failed");
+  if (signal(SIGINT, sig_stopping) == SIG_ERR) err(1, "signal() failed");
+  if (signal(SIGTERM, sig_stopping) == SIG_ERR) err(1, "signal() failed");
+  if (signal(SIGCONT, sig_continue) == SIG_ERR) err(1, "signal() failed");
   //if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err(1, "signal() failed");
-  set_nonblocking(STDERR_FILENO);
+  sig_continue(0);
 
   // On Linux, making STDOUT non-blocking has the side-effect of
   // also making STDIN nonblocking.
-  add_writer_thread(&writers, STDOUT_FILENO);
+  add_writer_thread(&writers,
+                    STDOUT_FILENO,
+                    &shared_queue_mutex,
+                    &shared_wakeup_cond);
 
   // Process cmdline args.
   for (int i = 1; i < argc; i++) {
-    add_writer_thread(&writers, make_file(argv[i]));
+    add_writer_thread(&writers,
+                      make_file(argv[i]),
+                      &shared_queue_mutex,
+                      &shared_wakeup_cond);
   }
 
   // Reader loop.
   while (!stopping) {
     wait_until_readable(STDIN_FILENO);
     if (stopping) {
-      warnx("stopping after select()");
+      nblogx("stopping after select()");
       break;
     }
 
@@ -379,7 +478,7 @@ int main(int argc, char **argv) {
     char data[READ_BUF_SIZE];
     int read_ret = xread(STDIN_FILENO, data, sizeof(data));
     if (read_ret == 0) {
-      warnx("stdin hit EOF");
+      nblogx("stdin hit EOF");
       break;
     }
     struct buf* buf = alloc_buf(data, read_ret);
@@ -387,29 +486,53 @@ int main(int argc, char **argv) {
     // Enqueue.
     lock(&shared_queue_mutex);
     struct writer_thread* writer;
-    STAILQ_FOREACH(writer, &writers, entries) enqueue(&(writer->queue), buf);
+    STAILQ_FOREACH(writer, &writers, entries) {
+      enqueue(&(writer->queue), buf);
+    }
     xpthread_cond_broadcast(&shared_wakeup_cond);
     unlock(&shared_queue_mutex);
   }
 
-  // Wake and join threads.
-  stopping = 1;
+  nblogx("stopping: draining writer threads");
   lock(&shared_queue_mutex);
+  {  // Each writer exits once running is cleared and its queue is drained.
+    struct writer_thread* writer;
+    STAILQ_FOREACH(writer, &writers, entries) {
+      writer->running = 0;
+    }
+  }
   xpthread_cond_broadcast(&shared_wakeup_cond);
   unlock(&shared_queue_mutex);
   {
     struct writer_thread* writer;
-    STAILQ_FOREACH(writer, &writers, entries) xpthread_join(writer->thread);
+    STAILQ_FOREACH(writer, &writers, entries) {
+      xpthread_join(writer->thread);
+    }
   }
 
   // Free writer list.
   while (!STAILQ_EMPTY(&writers)) {
     struct writer_thread* writer = STAILQ_FIRST(&writers);
     STAILQ_REMOVE_HEAD(&writers, entries);
-    // FIXME: free its queue?
+    if (!STAILQ_EMPTY(&writer->queue)) {  // Not expected after the join.
+      nblogx("queue for fd %d is not empty", writer->fd);
+    }
     free(writer);
   }
 
+  // Stop the logger last, so messages logged above still reach stderr.
+  nblogx("writer threads stopped, stopping logger");
+  lock(logger->mutex);
+  logger->running = 0;
+  xpthread_cond_broadcast(logger->cond);
+  unlock(logger->mutex);
+  xpthread_join(logger->thread);
+  pthread_mutex_destroy(logger->mutex);
+  free(logger->mutex);
+  pthread_cond_destroy(logger->cond);
+  free(logger->cond);
+  free(logger);
+
   warnx("exiting cleanly");
   return 0;
 }