Use a separate thread for logging to stderr.
authorEmil Mikulic <emikulic@gmail.com>
Sun, 24 Mar 2013 14:16:47 +0000 (01:16 +1100)
committerEmil Mikulic <emikulic@gmail.com>
Sun, 24 Mar 2013 17:27:14 +0000 (04:27 +1100)
buftee.c

index 1f22a6a..651e6f3 100644 (file)
--- a/buftee.c
+++ b/buftee.c
@@ -42,6 +42,7 @@
 #include <fcntl.h>
 #include <pthread.h>
 #include <signal.h>
+#include <stdarg.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -52,7 +53,7 @@
 #define SLOW_NSEC 4000
 
 // Asserted on receipt of SIGTERM, SIGINT.
-static volatile int stopping = 0;
+static volatile int stopping = 0;  // Global.
 
 static void sig_stopping(int _ignored_ __attribute__((__unused__))) {
   stopping = 1;
@@ -92,6 +93,9 @@ struct writer_thread {
 // A list of writer threads.
 STAILQ_HEAD(writer_thread_list, writer_thread);  // struct writer_thread_list
 
+static struct writer_thread* logger = NULL;  // Global.
+static struct timespec logger_start;  // Global.
+
 static struct buf* alloc_buf(const char* const data, const int len) {
   assert(len > 0);
   struct buf* buf = malloc(sizeof(*buf));
@@ -175,18 +179,18 @@ static int gettid(void) {
   return (int)syscall(SYS_gettid);
 }
 
+static void nblogx(const char* format, ...);  // Forward.
+
 static void warn_time(const char* desc,
                       const struct timespec* restrict start,
                       const struct timespec* restrict end) {
+  if (pthread_self() == logger->thread) {
+    return;
+  }
   struct timespec diff;
   time_diff(start, end, &diff);
   if (diff.tv_sec > 0 || diff.tv_nsec > SLOW_NSEC) {
-    char buf[128];
-    extern char *__progname;  // This is where glibc stashes argv[0].
-    snprintf(buf, sizeof(buf), "%s:tid %d: %s took %d.%09d secs\n",
-            __progname, gettid(), desc, (int)diff.tv_sec, (int)diff.tv_nsec);
-    // Best effort write to a non-blocking stderr.
-    (void)write(STDERR_FILENO, buf, strlen(buf));
+    nblogx("%s took %d.%09d secs", desc, (int)diff.tv_sec, (int)diff.tv_nsec);
   }
 }
 
@@ -243,7 +247,7 @@ static int xwrite(const int fd, struct buf* const buf) {
     errno = saved_errno;
     if (write_ret == -1) {
       if (errno == EAGAIN) {
-        warn("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
+        nblogx("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
         wait_until_writable(fd);
         continue;
       }
@@ -318,7 +322,7 @@ static void* writer_routine(void *arg) {
     lock(my->mutex);
     unref_buf(buf);
   }
-  warnx("thread exiting cleanly");
+  nblogx("thread exiting cleanly");
   return NULL;
 }
 
@@ -343,6 +347,70 @@ static void xpthread_cond_broadcast(pthread_cond_t* cond) {
   err(1, "pthread_cond_broadcast(%p) failed", cond);
 }
 
+static void init_logger_thread(void) {
+  assert(logger == NULL);
+  get_mono_time(&logger_start);
+  logger = malloc(sizeof(*logger));
+  logger->fd = STDERR_FILENO;
+  logger->mutex = malloc(sizeof(*logger->mutex));
+  pthread_mutex_init(logger->mutex, NULL);
+  logger->cond = malloc(sizeof(*logger->cond));
+  pthread_cond_init(logger->cond, NULL);
+  STAILQ_INIT(&logger->queue);
+  xpthread_create(&logger->thread, writer_routine, logger);
+}
+
+static void _nblog_helper(int want_errno,
+                          int saved_errno,
+                          const char* format,
+                          va_list va) {
+  // Timing.
+  struct timespec now;
+  struct timespec diff;
+  get_mono_time(&now);
+  time_diff(&logger_start, &now, &diff);
+
+  // Prefix.
+  char buf[512];
+  size_t len;
+  extern char *__progname;  // This is where glibc stashes argv[0].
+  len = snprintf(buf, sizeof(buf), "%s: tid %d at %d.%09d: ",
+                 __progname, gettid(), (int)diff.tv_sec, (int)diff.tv_nsec);
+
+  // Format message.
+  len += vsnprintf(buf + len, sizeof(buf) - len, format, va);
+
+  if (want_errno) {
+    len += snprintf(buf + len, sizeof(buf) - len, ": %s (errno = %d)",
+                    strerror(saved_errno), saved_errno);
+  }
+
+  len += snprintf(buf + len, sizeof(buf) - len, "\n");
+  struct buf* b = alloc_buf(buf, len);
+
+  // Enqueue.
+  lock(logger->mutex);
+  enqueue(&logger->queue, b);
+  xpthread_cond_broadcast(logger->cond);
+  unlock(logger->mutex);
+}
+
+// nblog() is like warn() but non-blocking.
+static void nblog(const char* format, ...) {
+  int saved_errno = errno;
+  va_list va;
+  va_start(va, format);
+  _nblog_helper(1, saved_errno, format, va);
+  va_end(va);
+}
+
+static void nblogx(const char* format, ...) {
+  va_list va;
+  va_start(va, format);
+  _nblog_helper(0, 0, format, va);
+  va_end(va);
+}
+
 static void xpthread_join(pthread_t thread) {
   int ret = pthread_join(thread, NULL);
   if (ret == 0) return;
@@ -355,6 +423,9 @@ static void sig_continue(int _ignored_ __attribute__((__unused__))) {
 }
 
 int main(int argc, char **argv) {
+  init_logger_thread();
+  nblogx("starting");
+
   pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
   pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
 
@@ -386,7 +457,7 @@ int main(int argc, char **argv) {
   while (!stopping) {
     wait_until_readable(STDIN_FILENO);
     if (stopping) {
-      warnx("stopping after select()");
+      nblogx("stopping after select()");
       break;
     }
 
@@ -394,7 +465,7 @@ int main(int argc, char **argv) {
     char data[READ_BUF_SIZE];
     int read_ret = xread(STDIN_FILENO, data, sizeof(data));
     if (read_ret == 0) {
-      warnx("stdin hit EOF");
+      nblogx("stdin hit EOF");
       break;
     }
     struct buf* buf = alloc_buf(data, read_ret);
@@ -429,6 +500,14 @@ int main(int argc, char **argv) {
     free(writer);
   }
 
+  // Clean up logger thread.
+  nblogx("writer threads stopped");
+  lock(logger->mutex);
+  xpthread_cond_broadcast(logger->cond);
+  unlock(logger->mutex);
+  xpthread_join(logger->thread);
+  free(logger);
+
   warnx("exiting cleanly");
   return 0;
 }