Initial revision.
authorEmil Mikulic <emikulic@gmail.com>
Sun, 10 Mar 2013 17:14:23 +0000 (04:14 +1100)
committerEmil Mikulic <emikulic@gmail.com>
Sun, 10 Mar 2013 17:14:23 +0000 (04:14 +1100)
Makefile [new file with mode: 0644]
buftee.c [new file with mode: 0644]
tests/build.sh [new file with mode: 0755]

diff --git a/Makefile b/Makefile
new file mode 100644 (file)
index 0000000..361e7c9
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,3 @@
+CFLAGS+=-std=c99 -pthread
+LDLIBS+=-lrt
+buftee: buftee.c
diff --git a/buftee.c b/buftee.c
new file mode 100644 (file)
index 0000000..4b03eca
--- /dev/null
+++ b/buftee.c
@@ -0,0 +1,390 @@
+/*-
+ * Copyright (c) 2013 Emil Mikulic <emikulic@gmail.com>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ *
+ * [ http://www.openbsd.org/cgi-bin/cvsweb/src/share/misc/license.template ]
+ */
+
+// buftee: like tee(1) but buffers in memory.
+//
+// Read from stdin into memory all the time, so that the writer doesn't
+// block.  Our memory usage is unbounded.
+//
+// Write to a number of file descriptors whenever we're able to.
+// Because write() can take a long time even with O_NONBLOCK set on the
+// fd, we have to do this from a separate kernel scheduling entity (we use
+// pthreads)
+//
+// We use a thread per output filedescriptor so that one slow output (e.g. disk)
+// doesn't block another (e.g. stdout to a terminal) (or vice-versa if you use
+// flow control on the terminal!)
+
+#define _GNU_SOURCE  // for clock_gettime()
+
+#include <sys/queue.h>
+#include <sys/select.h>
+#include <sys/syscall.h>
+
+#include <assert.h>
+#include <err.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+
+#define READ_BUF_SIZE 4096
+#define SLOW_NSEC 4000
+
+// *** GLOBALS *****************************************************************
+
+// All queues are locked through one global mutex.
+static pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+// When a writer runs out of work to do, it sleeps on this global cond.
+static pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
+
+// Asserted on receipt of SIGTERM, SIGINT.
+static volatile int stopping = 0;
+
+// *** (end globals) ***********************************************************
+
+static void signal_handler(int _ignored_ __attribute__((__unused__))) {
+  stopping = 1;
+}
+
+// Reference-counted buffer, contains data that was read in the main thread.
+struct buf {
+  char* data;
+  int len;
+  int refcount;
+};
+
+// Queue of buffers.
+struct buf_queue_elem {
+  struct buf* buf;
+  STAILQ_ENTRY(buf_queue_elem) entries;
+};
+STAILQ_HEAD(buf_queue, buf_queue_elem);  // struct buf_queue
+
+// Context for a writer thread.
+struct writer_thread {
+  pthread_t thread;
+  int fd;
+
+  // Each writer has its own queue.
+  struct buf_queue queue;
+
+  STAILQ_ENTRY(writer_thread) entries;
+};
+
+// A list of writer threads.
+STAILQ_HEAD(writer_thread_list, writer_thread);  // struct writer_thread_list
+
+static struct buf* alloc_buf(const char* const data, const int len) {
+  assert(len > 0);
+  struct buf* buf = malloc(sizeof(*buf));
+  buf->data = malloc((size_t)len);
+  memcpy(buf->data, data, (size_t)len);
+  buf->len = len;
+  buf->refcount = 0;
+  return buf;
+}
+
+static void unref_buf(struct buf* buf) {
+  assert(buf->refcount > 0);
+  if ((--buf->refcount) == 0) {
+    free(buf->data);
+    free(buf);
+  }
+}
+
+static void enqueue(struct buf_queue* restrict queue,
+                    struct buf* restrict buf) {
+  struct buf_queue_elem* elem = malloc(sizeof(*elem));
+  elem->buf = buf;
+  buf->refcount++;
+  STAILQ_INSERT_TAIL(queue, elem, entries);
+}
+
+static struct buf* dequeue(struct buf_queue* const queue) {
+  assert(!STAILQ_EMPTY(queue));
+  struct buf_queue_elem* head;
+  head = STAILQ_FIRST(queue);
+  STAILQ_REMOVE_HEAD(queue, entries);
+  struct buf* buf = head->buf;
+  free(head);
+  return buf;
+}
+
+static void xpthread_create(pthread_t* thread,
+                            void* (*start_routine)(void*),
+                            void* arg) {
+  int ret = pthread_create(thread, NULL, start_routine, arg);
+  if (ret == 0) return;
+  errno = ret;
+  err(1, "pthread_create(%p) failed", thread);
+}
+
+static void set_nonblocking(const int fd) {
+  int flags;
+  if ((flags = fcntl(fd, F_GETFL)) == -1)
+    err(1, "fcntl(fd = %d, F_GETFL) failed", fd);
+  if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
+    err(1, "fcntl(fd = %d, F_SETFL, O_NONBLOCK) failed", fd);
+}
+
+static int make_file(const char* filename) {
+  int fd = open(filename, O_CREAT | O_EXCL | O_NONBLOCK | O_WRONLY, 0666);
+  if (fd == -1)
+    err(1, "failed to open(\"%s\")", filename);
+  return fd;
+}
+
+static void get_mono_time(struct timespec* t) {
+  if (clock_gettime(CLOCK_MONOTONIC, t) == -1)
+    err(1, "clock_gettime(CLOCK_MONOTONIC) failed");
+}
+
+static void time_diff(const struct timespec* restrict start,
+                      const struct timespec* restrict end,
+                      struct timespec* restrict out) {
+  out->tv_sec = end->tv_sec - start->tv_sec;
+  out->tv_nsec = end->tv_nsec - start->tv_nsec;
+  if (out->tv_nsec < 0) {
+    out->tv_sec -= 1;
+    out->tv_nsec += 1000000000;
+  }
+  assert(out->tv_sec >= 0);
+  assert(out->tv_nsec >= 0);
+  assert(out->tv_nsec < 1000000000);
+}
+
+static void warn_time(const char* desc,
+                      const struct timespec* restrict start,
+                      const struct timespec* restrict end) {
+  struct timespec diff;
+  time_diff(start, end, &diff);
+  if (diff.tv_sec > 0 || diff.tv_nsec > SLOW_NSEC) {
+    char buf[128];
+    extern char *__progname;  // This is where glibc stashes argv[0].
+    snprintf(buf, sizeof(buf), "%d: %s: %s took %d.%09d secs\n",
+            (int)getpid(), __progname, desc,
+            (int)diff.tv_sec, (int)diff.tv_nsec);
+    // Best effort write to a non-blocking stderr.
+    (void)write(STDERR_FILENO, buf, strlen(buf));
+  }
+}
+
+static int xread(const int fd, char* const restrict buf, const int count) {
+  ssize_t read_ret;
+  int saved_errno;
+  struct timespec t0;
+  struct timespec t1;
+
+  get_mono_time(&t0);
+  read_ret = read(fd, buf, (size_t)count);
+  saved_errno = errno;
+  get_mono_time(&t1);
+  warn_time("read()", &t0, &t1);
+
+  errno = saved_errno;
+  if (read_ret == -1)
+    err(1, "read(fd = %d, count = %d) failed", fd, count);
+  //FIXME: EAGAIN?
+
+  assert(read_ret >= 0);
+  return (int)read_ret;
+}
+
+static int xwrite(const int fd, struct buf* const buf) {
+  ssize_t write_ret;
+  int saved_errno;
+  struct timespec t0;
+  struct timespec t1;
+
+  get_mono_time(&t0);
+  write_ret = write(fd, buf->data, (size_t)buf->len);
+  saved_errno = errno;
+  get_mono_time(&t1);
+  warn_time("write()", &t0, &t1);
+
+  errno = saved_errno;
+  if (write_ret == -1)
+    err(1, "write(fd = %d, count = %d) failed", fd, buf->len);
+  //FIXME: EAGAIN?
+  if (write_ret == 0)
+    return 0;
+  assert(write_ret >= 0);
+  if (write_ret < buf->len)
+    err(1, "write(fd = %d, count = %d) stopped short (returned %d)",
+        fd, buf->len, (int)write_ret);
+    // FIXME: handle this
+  assert(write_ret == buf->len);
+  return (int)write_ret;
+}
+
+static int max(const int a, const int b) { return (a > b) ? a : b; }
+
+static void wait_until_readable(const int fd) {
+  int select_ret;
+  fd_set read_fds;
+  FD_ZERO(&read_fds);
+  FD_SET(fd, &read_fds);
+  if ((select_ret = select(fd + 1, &read_fds, NULL, NULL, NULL)) == -1)
+    err(1, "select() failed");
+  if (!FD_ISSET(fd, &read_fds))
+    errx(1, "select() did not return readable fd = %d", fd);
+}
+
+static void lock(pthread_mutex_t* mutex) {
+  int ret = pthread_mutex_lock(mutex);
+  if (ret == 0) return;
+  errno = ret;
+  err(1, "pthread_mutex_lock(%p) failed", mutex);
+}
+
+static void unlock(pthread_mutex_t* mutex) {
+  int ret = pthread_mutex_unlock(mutex);
+  if (ret == 0) return;
+  errno = ret;
+  err(1, "pthread_mutex_unlock(%p) failed", mutex);
+}
+
+static void* writer_routine(void *arg) {
+  struct writer_thread* my = arg;
+  for (;;) {
+    // FIXME: less locking
+    struct buf* buf = NULL;
+
+    lock(&shared_queue_mutex);
+    while (!stopping && STAILQ_EMPTY(&my->queue)) {
+      // Sleep.
+      pthread_cond_wait(&shared_wakeup_cond, &shared_queue_mutex);
+    }
+    if (!STAILQ_EMPTY(&my->queue))
+      buf = dequeue(&my->queue);
+    unlock(&shared_queue_mutex);
+
+    if (stopping) break;
+    assert(buf != NULL);
+
+    // Write.
+    int write_ret = xwrite(my->fd, buf);
+    if (write_ret == 0) {
+      errx(1, "fd %d hit EOF", my->fd);
+    }
+    assert(write_ret == buf->len);
+
+    // Unreference buffer, freeing it if we have to.
+    lock(&shared_queue_mutex);
+    unref_buf(buf);
+    unlock(&shared_queue_mutex);
+  }
+  warnx("thread exiting cleanly");
+  return NULL;
+}
+
+static void add_writer_thread(struct writer_thread_list* list, const int fd) {
+  set_nonblocking(fd);
+  struct writer_thread* writer = malloc(sizeof(*writer));
+  writer->fd = fd;
+  STAILQ_INIT(&(writer->queue));
+  STAILQ_INSERT_TAIL(list, writer, entries);
+  xpthread_create(&(writer->thread), writer_routine, writer);
+}
+
+static void xpthread_cond_broadcast(pthread_cond_t* cond) {
+  int ret = pthread_cond_broadcast(cond);
+  if (ret == 0) return;
+  errno = ret;
+  err(1, "pthread_cond_broadcast(%p) failed", cond);
+}
+
+static void xpthread_join(pthread_t thread) {
+  int ret = pthread_join(thread, NULL);
+  if (ret == 0) return;
+  errno = ret;
+  err(1, "pthread_join(%lu) failed", thread);
+}
+
+int main(int argc, char **argv) {
+  struct writer_thread_list writers;
+  STAILQ_INIT(&writers);
+
+  if (signal(SIGINT, signal_handler) == SIG_ERR) err(1, "signal() failed");
+  if (signal(SIGTERM, signal_handler) == SIG_ERR) err(1, "signal() failed");
+  //if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err(1, "signal() failed");
+  set_nonblocking(STDERR_FILENO);
+
+  // On Linux, making STDOUT non-blocking has the side-effect of
+  // also making STDIN nonblocking.
+  add_writer_thread(&writers, STDOUT_FILENO);
+
+  // Process cmdline args.
+  for (int i = 1; i < argc; i++) {
+    add_writer_thread(&writers, make_file(argv[i]));
+  }
+
+  // Reader loop.
+  while (!stopping) {
+    wait_until_readable(STDIN_FILENO);
+    if (stopping) {
+      warnx("stopping after select()");
+      break;
+    }
+
+    // Read.
+    char data[READ_BUF_SIZE];
+    int read_ret = xread(STDIN_FILENO, data, sizeof(data));
+    if (read_ret == 0) {
+      warnx("stdin hit EOF");
+      break;
+    }
+    struct buf* buf = alloc_buf(data, read_ret);
+
+    // Enqueue.
+    lock(&shared_queue_mutex);
+    struct writer_thread* writer;
+    STAILQ_FOREACH(writer, &writers, entries) enqueue(&(writer->queue), buf);
+    xpthread_cond_broadcast(&shared_wakeup_cond);
+    unlock(&shared_queue_mutex);
+  }
+
+  // Wake and join threads.
+  stopping = 1;
+  lock(&shared_queue_mutex);
+  xpthread_cond_broadcast(&shared_wakeup_cond);
+  unlock(&shared_queue_mutex);
+  {
+    struct writer_thread* writer;
+    STAILQ_FOREACH(writer, &writers, entries) xpthread_join(writer->thread);
+  }
+
+  // Free writer list.
+  while (!STAILQ_EMPTY(&writers)) {
+    struct writer_thread* writer = STAILQ_FIRST(&writers);
+    STAILQ_REMOVE_HEAD(&writers, entries);
+    // FIXME: free its queue?
+    free(writer);
+  }
+
+  warnx("exiting cleanly");
+  return 0;
+}
+// vim:set ts=2 sw=2 tw=80 et:
diff --git a/tests/build.sh b/tests/build.sh
new file mode 100755 (executable)
index 0000000..d0cef60
--- /dev/null
@@ -0,0 +1,61 @@
+#!/bin/bash
+# Test building with different compilers.
+# Copyright (c) 2013 Emil Mikulic <emikulic@gmail.com>
+#
+BOLD=$(tput bold)
+NORMAL=$(tput sgr0)
+RED=$(tput setf 9)
+GRAY=$(tput setf 8)
+RULER=${GRAY}$(for i in $(seq $(tput cols)); do echo -n -; done)${NORMAL}
+notice() { echo "${BOLD}==> $*${NORMAL}"; }
+fatal() { echo "${RED}==> FATAL: $*${NORMAL}" >&2; exit 1; }
+report() {
+  local RET=$?
+  echo -n "${BOLD}==> $*: "
+  if [[ $RET = 0 ]]; then
+    echo success
+  else
+    echo "${RED}FAILURE"
+  fi
+  echo "${RULER}"
+}
+
+# Local hack:
+MY_CLANG=$HOME/llvm/install/bin/clang
+if [[ -z $CLANG ]]; then
+  # No CLANG env var set, try to guess.
+  if [[ -e $MY_CLANG ]]; then
+    CLANG=$MY_CLANG
+  else
+    CLANG=clang
+  fi
+fi
+if ! which $CLANG >/dev/null; then
+  fatal "can't find clang as [$CLANG]"
+fi
+notice "clang is [$CLANG]"
+
+RUNDIR=$PWD
+notice "[$0] run from [$RUNDIR]"
+cd $RUNDIR || exit 1
+SRCDIR=$(cd $(dirname $0)/..; pwd)
+notice "src dir is [$SRCDIR]"
+cd $SRCDIR || exit 1
+
+notice build: standard
+rm -f buftee
+make
+report build: standard
+
+notice build: gcc with all the warnings
+rm -f buftee
+CFLAGS="-fdiagnostics-show-option --all-warnings --extra-warnings -O"
+env CC=gcc "CFLAGS=$CFLAGS" make
+report build: gcc with all the warnings
+
+notice build: clang with all the warnings
+rm -f buftee
+env CC=$CLANG "CFLAGS=-Weverything" make
+report build: clang
+
+notice finished