Make STDERR nonblocking again after SIGCONT.
[buftee] / buftee.c
index 0ab9bd4..8e430b6 100644 (file)
--- a/buftee.c
+++ b/buftee.c
@@ -64,7 +64,7 @@ static volatile int stopping = 0;
 
 // *** (end globals) ***********************************************************
 
-static void signal_handler(int _ignored_ __attribute__((__unused__))) {
+static void sig_stopping(int _ignored_ __attribute__((__unused__))) {
   stopping = 1;
 }
 
@@ -215,42 +215,68 @@ static int xread(const int fd, char* const restrict buf, const int count) {
   return (int)read_ret;
 }
 
+static void wait_until_writable(const int fd) {
+  fd_set write_fds;
+  FD_ZERO(&write_fds);
+  FD_SET(fd, &write_fds);
+  int select_ret = select(fd + 1, NULL, &write_fds, NULL, NULL);
+  if (select_ret == -1) {
+    if (errno == EINTR) {
+      assert(stopping);  // that should have been SIGTERM
+      return;
+    }
+    err(1, "select(write fd = %d) failed", fd);
+  }
+  if (!FD_ISSET(fd, &write_fds))
+    errx(1, "select() did not return writable fd = %d", fd);
+}
+
 static int xwrite(const int fd, struct buf* const buf) {
   ssize_t write_ret;
   int saved_errno;
   struct timespec t0;
   struct timespec t1;
 
-  get_mono_time(&t0);
-  write_ret = write(fd, buf->data, (size_t)buf->len);
-  saved_errno = errno;
-  get_mono_time(&t1);
-  warn_time("write()", &t0, &t1);
-
-  errno = saved_errno;
-  if (write_ret == -1)
-    err(1, "write(fd = %d, count = %d) failed", fd, buf->len);
-  //FIXME: EAGAIN?
-  if (write_ret == 0)
-    return 0;
-  assert(write_ret >= 0);
-  if (write_ret < buf->len)
-    err(1, "write(fd = %d, count = %d) stopped short (returned %d)",
-        fd, buf->len, (int)write_ret);
-    // FIXME: handle this
-  assert(write_ret == buf->len);
-  return (int)write_ret;
+  for (;;) {
+    get_mono_time(&t0);
+    write_ret = write(fd, buf->data, (size_t)buf->len);
+    saved_errno = errno;
+    get_mono_time(&t1);
+    warn_time("write()", &t0, &t1);
+
+    errno = saved_errno;
+    if (write_ret == -1) {
+      if (errno == EAGAIN) {
+        warn("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
+        wait_until_writable(fd);
+        continue;
+      }
+      err(1, "write(fd = %d, count = %d) failed", fd, buf->len);
+    }
+    if (write_ret == 0)
+      return 0;  // EOF
+    assert(write_ret >= 0);
+    if (write_ret < buf->len)
+      err(1, "write(fd = %d, count = %d) stopped short (returned %d)",
+          fd, buf->len, (int)write_ret);
+      // FIXME: handle this
+    assert(write_ret == buf->len);
+    return (int)write_ret;
+  }
 }
 
-static int max(const int a, const int b) { return (a > b) ? a : b; }
-
 static void wait_until_readable(const int fd) {
-  int select_ret;
   fd_set read_fds;
   FD_ZERO(&read_fds);
   FD_SET(fd, &read_fds);
-  if ((select_ret = select(fd + 1, &read_fds, NULL, NULL, NULL)) == -1)
-    err(1, "select() failed");
+  int select_ret = select(fd + 1, &read_fds, NULL, NULL, NULL);
+  if (select_ret == -1) {
+    if (errno == EINTR) {
+      assert(stopping);  // that should have been SIGTERM
+      return;
+    }
+    err(1, "select(read fd = %d) failed", fd);
+  }
   if (!FD_ISSET(fd, &read_fds))
     errx(1, "select() did not return readable fd = %d", fd);
 }
@@ -271,11 +297,9 @@ static void unlock(pthread_mutex_t* mutex) {
 
 static void* writer_routine(void *arg) {
   struct writer_thread* my = arg;
+  struct buf* buf = NULL;
+  lock(&shared_queue_mutex);
   for (;;) {
-    // FIXME: less locking
-    struct buf* buf = NULL;
-
-    lock(&shared_queue_mutex);
     while (!stopping && STAILQ_EMPTY(&my->queue)) {
       // Sleep.
       pthread_cond_wait(&shared_wakeup_cond, &shared_queue_mutex);
@@ -297,7 +321,6 @@ static void* writer_routine(void *arg) {
     // Unreference buffer, freeing it if we have to.
     lock(&shared_queue_mutex);
     unref_buf(buf);
-    unlock(&shared_queue_mutex);
   }
   warnx("thread exiting cleanly");
   return NULL;
@@ -326,14 +349,19 @@ static void xpthread_join(pthread_t thread) {
   err(1, "pthread_join(%lu) failed", thread);
 }
 
+static void sig_continue(int _ignored_ __attribute__((__unused__))) {
+  set_nonblocking(STDERR_FILENO);
+}
+
 int main(int argc, char **argv) {
   struct writer_thread_list writers;
   STAILQ_INIT(&writers);
 
-  if (signal(SIGINT, signal_handler) == SIG_ERR) err(1, "signal() failed");
-  if (signal(SIGTERM, signal_handler) == SIG_ERR) err(1, "signal() failed");
+  if (signal(SIGINT, sig_stopping) == SIG_ERR) err(1, "signal() failed");
+  if (signal(SIGTERM, sig_stopping) == SIG_ERR) err(1, "signal() failed");
+  if (signal(SIGCONT, sig_continue) == SIG_ERR) err(1, "signal() failed");
   //if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err(1, "signal() failed");
-  set_nonblocking(STDERR_FILENO);
+  sig_continue(0);
 
   // On Linux, making STDOUT non-blocking has the side-effect of
   // also making STDIN nonblocking.