From: Emil Mikulic
Date: Sun, 10 Mar 2013 17:14:23 +0000 (+1100)
Subject: Initial revision.
X-Git-Url: https://unix4lyfe.org/gitweb/buftee/commitdiff_plain/855dedd5f820815ceebc23d3bdaef1a44121bd09

Initial revision.
---

855dedd5f820815ceebc23d3bdaef1a44121bd09
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..361e7c9
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,3 @@
+CFLAGS+=-std=c99 -pthread
+LDLIBS+=-lrt
+buftee: buftee.c
diff --git a/buftee.c b/buftee.c
new file mode 100644
index 0000000..4b03eca
--- /dev/null
+++ b/buftee.c
@@ -0,0 +1,504 @@
+/*-
+ * Copyright (c) 2013 Emil Mikulic
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ *
+ * [ http://www.openbsd.org/cgi-bin/cvsweb/src/share/misc/license.template ]
+ */
+
+// buftee: like tee(1) but buffers in memory.
+//
+// Read from stdin into memory all the time, so that the writer doesn't
+// block. Our memory usage is unbounded.
+//
+// Write to a number of file descriptors whenever we're able to.
+// Because write() can take a long time even with O_NONBLOCK set on the
+// fd, we have to do this from a separate kernel scheduling entity (we use
+// pthreads).
+//
+// We use a thread per output file descriptor so that one slow output (e.g.
+// disk) doesn't block another (e.g. stdout to a terminal) (or vice-versa if
+// you use flow control on the terminal!)
+//
+// When stdin hits EOF, every writer finishes writing what is already queued
+// for it before exiting. On SIGINT or SIGTERM, writers stop as soon as
+// possible and unwritten data is discarded.
+
+#define _GNU_SOURCE // for clock_gettime(), pselect(), pthread_sigmask()
+
+#include <sys/queue.h>
+#include <sys/select.h>
+#include <sys/time.h>
+
+#include <assert.h>
+#include <err.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+
+#define READ_BUF_SIZE 4096
+#define SLOW_NSEC 4000
+
+// How often a writer stuck on a full (EAGAIN) output wakes up to check
+// whether we are stopping.
+#define WRITE_POLL_USEC 100000
+
+// *** GLOBALS *****************************************************************
+
+// All queues are locked through one global mutex.
+static pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+// When a writer runs out of work to do, it sleeps on this global cond.
+static pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
+
+// Asserted on receipt of SIGTERM, SIGINT. Written from a signal handler, so
+// it has to be a volatile sig_atomic_t.
+static volatile sig_atomic_t stopping = 0;
+
+// Set by the reader once it stops producing (EOF or signal), so that a
+// writer with an empty queue exits instead of sleeping. Protected by
+// shared_queue_mutex.
+static int input_done = 0;
+
+// *** (end globals) ***********************************************************
+
+static void signal_handler(int _ignored_ __attribute__((__unused__))) {
+  stopping = 1;
+}
+
+// Reference-counted buffer, contains data that was read in the main thread.
+// refcount is the number of writers that have yet to finish with it (queued
+// or being written); it is protected by shared_queue_mutex.
+struct buf {
+  char* data;
+  int len;
+  int refcount;
+};
+
+// Queue of buffers.
+struct buf_queue_elem {
+  struct buf* buf;
+  STAILQ_ENTRY(buf_queue_elem) entries;
+};
+STAILQ_HEAD(buf_queue, buf_queue_elem); // struct buf_queue
+
+// Context for a writer thread.
+struct writer_thread {
+  pthread_t thread;
+  int fd;
+
+  // Each writer has its own queue.
+  struct buf_queue queue;
+
+  STAILQ_ENTRY(writer_thread) entries;
+};
+
+// A list of writer threads.
+STAILQ_HEAD(writer_thread_list, writer_thread); // struct writer_thread_list
+
+// malloc() that exits instead of returning NULL.
+static void* xmalloc(const size_t size) {
+  void* ptr = malloc(size);
+  if (ptr == NULL)
+    err(1, "malloc(%zu) failed", size);
+  return ptr;
+}
+
+// Returns a new buffer holding a copy of data[0..len), with no references;
+// enqueue() adds them.
+static struct buf* alloc_buf(const char* const data, const int len) {
+  assert(len > 0);
+  struct buf* buf = xmalloc(sizeof(*buf));
+  buf->data = xmalloc((size_t)len);
+  memcpy(buf->data, data, (size_t)len);
+  buf->len = len;
+  buf->refcount = 0;
+  return buf;
+}
+
+// Drops one reference to buf, freeing it when the last one goes away.
+// Caller must hold shared_queue_mutex while writers are running.
+static void unref_buf(struct buf* buf) {
+  assert(buf->refcount > 0);
+  if ((--buf->refcount) == 0) {
+    free(buf->data);
+    free(buf);
+  }
+}
+
+// Appends buf to queue, taking a reference on it.
+// Caller must hold shared_queue_mutex.
+static void enqueue(struct buf_queue* restrict queue,
+                    struct buf* restrict buf) {
+  struct buf_queue_elem* elem = xmalloc(sizeof(*elem));
+  elem->buf = buf;
+  buf->refcount++;
+  STAILQ_INSERT_TAIL(queue, elem, entries);
+}
+
+// Removes and returns the head of a non-empty queue. The queue's reference
+// is handed to the caller, who must unref_buf() it when done.
+// Caller must hold shared_queue_mutex while writers are running.
+static struct buf* dequeue(struct buf_queue* const queue) {
+  assert(!STAILQ_EMPTY(queue));
+  struct buf_queue_elem* head = STAILQ_FIRST(queue);
+  STAILQ_REMOVE_HEAD(queue, entries);
+  struct buf* buf = head->buf;
+  free(head);
+  return buf;
+}
+
+static void xpthread_create(pthread_t* thread,
+                            void* (*start_routine)(void*),
+                            void* arg) {
+  int ret = pthread_create(thread, NULL, start_routine, arg);
+  if (ret == 0) return;
+  errno = ret;
+  err(1, "pthread_create(%p) failed", (void*)thread);
+}
+
+// Sets O_NONBLOCK on fd, keeping its other file status flags.
+static void set_nonblocking(const int fd) {
+  int flags;
+  if ((flags = fcntl(fd, F_GETFL)) == -1)
+    err(1, "fcntl(fd = %d, F_GETFL) failed", fd);
+  if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
+    err(1, "fcntl(fd = %d, F_SETFL, O_NONBLOCK) failed", fd);
+}
+
+// Creates filename for writing; fails if it already exists.
+static int make_file(const char* filename) {
+  int fd = open(filename, O_CREAT | O_EXCL | O_NONBLOCK | O_WRONLY, 0666);
+  if (fd == -1)
+    err(1, "failed to open(\"%s\")", filename);
+  return fd;
+}
+
+static void get_mono_time(struct timespec* t) {
+  if (clock_gettime(CLOCK_MONOTONIC, t) == -1)
+    err(1, "clock_gettime(CLOCK_MONOTONIC) failed");
+}
+
+// Sets out = end - start, normalized so that 0 <= out->tv_nsec < 1e9.
+static void time_diff(const struct timespec* restrict start,
+                      const struct timespec* restrict end,
+                      struct timespec* restrict out) {
+  out->tv_sec = end->tv_sec - start->tv_sec;
+  out->tv_nsec = end->tv_nsec - start->tv_nsec;
+  if (out->tv_nsec < 0) {
+    out->tv_sec -= 1;
+    out->tv_nsec += 1000000000;
+  }
+  assert(out->tv_sec >= 0);
+  assert(out->tv_nsec >= 0);
+  assert(out->tv_nsec < 1000000000);
+}
+
+// Complains on stderr if the interval [start, end] took longer than
+// SLOW_NSEC nanoseconds.
+static void warn_time(const char* desc,
+                      const struct timespec* restrict start,
+                      const struct timespec* restrict end) {
+  struct timespec diff;
+  time_diff(start, end, &diff);
+  if (diff.tv_sec > 0 || diff.tv_nsec > SLOW_NSEC) {
+    char buf[128];
+    extern char *__progname; // This is where glibc stashes argv[0].
+    snprintf(buf, sizeof(buf), "%d: %s: %s took %d.%09d secs\n",
+             (int)getpid(), __progname, desc,
+             (int)diff.tv_sec, (int)diff.tv_nsec);
+    // Best effort write to a non-blocking stderr.
+    (void)write(STDERR_FILENO, buf, strlen(buf));
+  }
+}
+
+// Waits until fd is readable (want_write == 0) or writable (want_write != 0).
+// A negative timeout_usec waits indefinitely. A non-NULL sigmask is installed
+// for the duration of the wait, as with pselect(2).
+// Returns 1 if fd is ready, or 0 if the wait timed out or was interrupted by
+// a signal. Exits on any other error.
+static int wait_for_fd(const int fd, const int want_write,
+                       const long timeout_usec, const sigset_t* sigmask) {
+  fd_set fds;
+  FD_ZERO(&fds);
+  FD_SET(fd, &fds);
+
+  struct timespec timeout;
+  struct timespec* timeout_ptr = NULL;
+  if (timeout_usec >= 0) {
+    timeout.tv_sec = timeout_usec / 1000000;
+    timeout.tv_nsec = (timeout_usec % 1000000) * 1000;
+    timeout_ptr = &timeout;
+  }
+
+  int ret = pselect(fd + 1, want_write ? NULL : &fds,
+                    want_write ? &fds : NULL, NULL, timeout_ptr, sigmask);
+  if (ret == -1) {
+    if (errno == EINTR) return 0;
+    err(1, "pselect() failed");
+  }
+  if (ret == 0) return 0; // Timed out.
+  if (!FD_ISSET(fd, &fds))
+    errx(1, "pselect() did not return %s fd = %d",
+         want_write ? "writable" : "readable", fd);
+  return 1;
+}
+
+// Reads up to count bytes from fd into buf. Returns the number of bytes
+// read, 0 at EOF, or -1 if there was nothing to read after all (EAGAIN on a
+// non-blocking fd, or EINTR) and the caller should wait and try again.
+// Exits on any other error.
+static int xread(const int fd, char* const restrict buf, const int count) {
+  struct timespec t0;
+  struct timespec t1;
+
+  get_mono_time(&t0);
+  ssize_t read_ret = read(fd, buf, (size_t)count);
+  int saved_errno = errno;
+  get_mono_time(&t1);
+  warn_time("read()", &t0, &t1);
+
+  if (read_ret == -1) {
+    if (saved_errno == EAGAIN || saved_errno == EWOULDBLOCK ||
+        saved_errno == EINTR)
+      return -1;
+    errno = saved_errno;
+    err(1, "read(fd = %d, count = %d) failed", fd, count);
+  }
+  assert(read_ret >= 0);
+  return (int)read_ret;
+}
+
+// Writes all of buf to fd, continuing after short writes. fd may be
+// non-blocking: on EAGAIN we wait for it to become writable, waking every
+// WRITE_POLL_USEC to give up if `stopping` was asserted.
+// Returns the number of bytes written, which is less than buf->len only if
+// write() returned 0 or we are stopping. Exits on any other error.
+static int xwrite(const int fd, struct buf* const buf) {
+  int done = 0;
+  while (done < buf->len) {
+    const int count = buf->len - done;
+    struct timespec t0;
+    struct timespec t1;
+
+    get_mono_time(&t0);
+    ssize_t write_ret = write(fd, buf->data + done, (size_t)count);
+    int saved_errno = errno;
+    get_mono_time(&t1);
+    warn_time("write()", &t0, &t1);
+
+    if (write_ret == -1) {
+      if (saved_errno == EINTR)
+        continue;
+      if (saved_errno != EAGAIN && saved_errno != EWOULDBLOCK) {
+        errno = saved_errno;
+        err(1, "write(fd = %d, count = %d) failed", fd, count);
+      }
+      if (stopping)
+        break;
+      (void)wait_for_fd(fd, 1, WRITE_POLL_USEC, NULL);
+      continue;
+    }
+    if (write_ret == 0)
+      break;
+    assert(write_ret <= count);
+    done += (int)write_ret;
+  }
+  return done;
+}
+
+static void lock(pthread_mutex_t* mutex) {
+  int ret = pthread_mutex_lock(mutex);
+  if (ret == 0) return;
+  errno = ret;
+  err(1, "pthread_mutex_lock(%p) failed", (void*)mutex);
+}
+
+static void unlock(pthread_mutex_t* mutex) {
+  int ret = pthread_mutex_unlock(mutex);
+  if (ret == 0) return;
+  errno = ret;
+  err(1, "pthread_mutex_unlock(%p) failed", (void*)mutex);
+}
+
+static void xpthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex) {
+  int ret = pthread_cond_wait(cond, mutex);
+  if (ret == 0) return;
+  errno = ret;
+  err(1, "pthread_cond_wait(%p) failed", (void*)cond);
+}
+
+// Thread body for one output: writes each buffer from its own queue to its
+// fd, in order. Exits as soon as `stopping` is asserted, or once input_done
+// is set and its queue is empty.
+static void* writer_routine(void *arg) {
+  struct writer_thread* my = arg;
+  for (;;) {
+    // FIXME: less locking
+    struct buf* buf = NULL;
+
+    lock(&shared_queue_mutex);
+    while (!stopping && !input_done && STAILQ_EMPTY(&my->queue)) {
+      // Sleep.
+      xpthread_cond_wait(&shared_wakeup_cond, &shared_queue_mutex);
+    }
+    // After EOF, keep draining the queue. After a signal, take nothing more;
+    // main() frees whatever is left.
+    if (!stopping && !STAILQ_EMPTY(&my->queue))
+      buf = dequeue(&my->queue);
+    unlock(&shared_queue_mutex);
+
+    if (buf == NULL) break;
+
+    // Write.
+    int write_ret = xwrite(my->fd, buf);
+    int incomplete = (write_ret < buf->len);
+
+    // Unreference buffer, freeing it if we have to.
+    lock(&shared_queue_mutex);
+    unref_buf(buf);
+    unlock(&shared_queue_mutex);
+
+    if (incomplete) {
+      if (stopping) break;
+      errx(1, "fd %d hit EOF", my->fd);
+    }
+  }
+  warnx("thread exiting cleanly");
+  return NULL;
+}
+
+// Makes fd non-blocking and starts a writer thread for it, appending the
+// writer to list.
+static void add_writer_thread(struct writer_thread_list* list, const int fd) {
+  set_nonblocking(fd);
+  struct writer_thread* writer = xmalloc(sizeof(*writer));
+  writer->fd = fd;
+  STAILQ_INIT(&(writer->queue));
+  STAILQ_INSERT_TAIL(list, writer, entries);
+  xpthread_create(&(writer->thread), writer_routine, writer);
+}
+
+static void xpthread_cond_broadcast(pthread_cond_t* cond) {
+  int ret = pthread_cond_broadcast(cond);
+  if (ret == 0) return;
+  errno = ret;
+  err(1, "pthread_cond_broadcast(%p) failed", (void*)cond);
+}
+
+static void xpthread_join(pthread_t thread) {
+  int ret = pthread_join(thread, NULL);
+  if (ret == 0) return;
+  errno = ret;
+  // pthread_t is opaque, so it can't portably be printed.
+  err(1, "pthread_join() failed");
+}
+
+static void xpthread_sigmask(const int how, const sigset_t* set,
+                             sigset_t* oldset) {
+  int ret = pthread_sigmask(how, set, oldset);
+  if (ret == 0) return;
+  errno = ret;
+  err(1, "pthread_sigmask() failed");
+}
+
+int main(int argc, char **argv) {
+  struct writer_thread_list writers;
+  STAILQ_INIT(&writers);
+
+  if (signal(SIGINT, signal_handler) == SIG_ERR) err(1, "signal() failed");
+  if (signal(SIGTERM, signal_handler) == SIG_ERR) err(1, "signal() failed");
+  //if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) err(1, "signal() failed");
+  set_nonblocking(STDERR_FILENO);
+
+  // Block SIGINT and SIGTERM before spawning the writers, which inherit the
+  // mask. The signals can then only be handled by this thread, inside
+  // pselect() in the reader loop, so one can't slip in between testing
+  // `stopping` and going to sleep.
+  sigset_t stop_sigs;
+  sigset_t orig_sigs;
+  sigemptyset(&stop_sigs);
+  sigaddset(&stop_sigs, SIGINT);
+  sigaddset(&stop_sigs, SIGTERM);
+  xpthread_sigmask(SIG_BLOCK, &stop_sigs, &orig_sigs);
+
+  // On Linux, making STDOUT non-blocking has the side-effect of
+  // also making STDIN nonblocking.
+  add_writer_thread(&writers, STDOUT_FILENO);
+
+  // Process cmdline args.
+  for (int i = 1; i < argc; i++) {
+    add_writer_thread(&writers, make_file(argv[i]));
+  }
+
+  // Reader loop.
+  while (!stopping) {
+    if (!wait_for_fd(STDIN_FILENO, 0, -1, &orig_sigs))
+      continue; // Interrupted by a signal: re-check `stopping`.
+    if (stopping) {
+      warnx("stopping after select()");
+      break;
+    }
+
+    // Read.
+    char data[READ_BUF_SIZE];
+    int read_ret = xread(STDIN_FILENO, data, (int)sizeof(data));
+    if (read_ret == -1)
+      continue; // Nothing there after all; wait again.
+    if (read_ret == 0) {
+      warnx("stdin hit EOF");
+      break;
+    }
+    struct buf* buf = alloc_buf(data, read_ret);
+
+    // Enqueue.
+    lock(&shared_queue_mutex);
+    struct writer_thread* writer;
+    STAILQ_FOREACH(writer, &writers, entries) enqueue(&(writer->queue), buf);
+    xpthread_cond_broadcast(&shared_wakeup_cond);
+    unlock(&shared_queue_mutex);
+  }
+
+  // Unblock the signals again so that one can cut short a slow drain.
+  xpthread_sigmask(SIG_SETMASK, &orig_sigs, NULL);
+
+  // Wake and join threads. After EOF they first finish writing what is
+  // queued; after a signal they stop as soon as possible.
+  lock(&shared_queue_mutex);
+  input_done = 1;
+  xpthread_cond_broadcast(&shared_wakeup_cond);
+  unlock(&shared_queue_mutex);
+  {
+    struct writer_thread* writer;
+    STAILQ_FOREACH(writer, &writers, entries) xpthread_join(writer->thread);
+  }
+
+  // Free writer list, and any buffers a signal left unwritten.
+  while (!STAILQ_EMPTY(&writers)) {
+    struct writer_thread* writer = STAILQ_FIRST(&writers);
+    STAILQ_REMOVE_HEAD(&writers, entries);
+    while (!STAILQ_EMPTY(&writer->queue))
+      unref_buf(dequeue(&writer->queue));
+    free(writer);
+  }
+
+  warnx("exiting cleanly");
+  return 0;
+}
+// vim:set ts=2 sw=2 tw=80 et:
diff --git a/tests/build.sh b/tests/build.sh
new file mode 100755
index 0000000..d0cef60
--- /dev/null
+++ b/tests/build.sh
@@ -0,0 +1,61 @@
+#!/bin/bash
+# Test building with different compilers.
+# Copyright (c) 2013 Emil Mikulic
+#
+BOLD=$(tput bold)
+NORMAL=$(tput sgr0)
+RED=$(tput setf 9)
+GRAY=$(tput setf 8)
+RULER=${GRAY}$(for i in $(seq $(tput cols)); do echo -n -; done)${NORMAL}
+notice() { echo "${BOLD}==> $*${NORMAL}"; }
+fatal() { echo "${RED}==> FATAL: $*${NORMAL}" >&2; exit 1; }
+report() {
+  local RET=$?
+  echo -n "${BOLD}==> $*: "
+  if [[ $RET = 0 ]]; then
+    echo success
+  else
+    echo "${RED}FAILURE"
+  fi
+  echo "${RULER}"
+}
+
+# Local hack:
+MY_CLANG=$HOME/llvm/install/bin/clang
+if [[ -z $CLANG ]]; then
+  # No CLANG env var set, try to guess.
+  if [[ -e $MY_CLANG ]]; then
+    CLANG=$MY_CLANG
+  else
+    CLANG=clang
+  fi
+fi
+if ! command -v "$CLANG" >/dev/null; then
+  fatal "can't find clang as [$CLANG]"
+fi
+notice "clang is [$CLANG]"
+
+RUNDIR=$PWD
+notice "[$0] run from [$RUNDIR]"
+cd "$RUNDIR" || exit 1
+SRCDIR=$(cd "$(dirname "$0")"/..; pwd)
+notice "src dir is [$SRCDIR]"
+cd "$SRCDIR" || exit 1
+
+notice build: standard
+rm -f buftee
+make
+report build: standard
+
+notice build: gcc with all the warnings
+rm -f buftee
+CFLAGS="-fdiagnostics-show-option --all-warnings --extra-warnings -O"
+env CC=gcc "CFLAGS=$CFLAGS" make
+report build: gcc with all the warnings
+
+notice build: clang with all the warnings
+rm -f buftee
+env "CC=$CLANG" "CFLAGS=-Weverything" make
+report build: clang with all the warnings
+
+notice finished