X-Git-Url: https://unix4lyfe.org/gitweb/buftee/blobdiff_plain/a79b289569a193985f754d8394b9e1f470d83cd7..HEAD:/buftee.c diff --git a/buftee.c b/buftee.c index 8e430b6..552a333 100644 --- a/buftee.c +++ b/buftee.c @@ -42,6 +42,7 @@ #include #include #include +#include #include #include #include @@ -51,18 +52,8 @@ #define READ_BUF_SIZE 4096 #define SLOW_NSEC 4000 -// *** GLOBALS ***************************************************************** - -// All queues are locked through one global mutex. -static pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER; - -// When a writer runs out of work to do, it sleeps on this global cond. -static pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER; - // Asserted on receipt of SIGTERM, SIGINT. -static volatile int stopping = 0; - -// *** (end globals) *********************************************************** +static volatile int stopping = 0; // Global. static void sig_stopping(int _ignored_ __attribute__((__unused__))) { stopping = 1; @@ -90,12 +81,24 @@ struct writer_thread { // Each writer has its own queue. struct buf_queue queue; + // Pointer to a shared mutex which protects all queues. + pthread_mutex_t* mutex; + + // When a writer runs out of work to do, it sleeps on a shared condition. + pthread_cond_t* cond; + + // The writer thread exits when it's not running and its queue is empty. + volatile int running; + STAILQ_ENTRY(writer_thread) entries; }; // A list of writer threads. STAILQ_HEAD(writer_thread_list, writer_thread); // struct writer_thread_list +static struct writer_thread* logger = NULL; // Global. +static struct timespec logger_start; // Global. + static struct buf* alloc_buf(const char* const data, const int len) { assert(len > 0); struct buf* buf = malloc(sizeof(*buf)); @@ -179,18 +182,25 @@ static int gettid(void) { return (int)syscall(SYS_gettid); } +static void _nblog_helper(int want_errno, + int line, + const char* func, + const char* format, + ...); // Forward. + +//#define nblog(fmt...) _nblog_helper(1, __LINE__, __FUNCTION__, fmt) +#define nblogx(fmt...) _nblog_helper(0, __LINE__, __FUNCTION__, fmt) + static void warn_time(const char* desc, const struct timespec* restrict start, const struct timespec* restrict end) { + if (pthread_self() == logger->thread) { + return; + } struct timespec diff; time_diff(start, end, &diff); if (diff.tv_sec > 0 || diff.tv_nsec > SLOW_NSEC) { - char buf[128]; - extern char *__progname; // This is where glibc stashes argv[0]. - snprintf(buf, sizeof(buf), "%s:tid %d: %s took %d.%09d secs\n", - __progname, gettid(), desc, (int)diff.tv_sec, (int)diff.tv_nsec); - // Best effort write to a non-blocking stderr. - (void)write(STDERR_FILENO, buf, strlen(buf)); + nblogx("%s took %d.%09d secs", desc, (int)diff.tv_sec, (int)diff.tv_nsec); } } @@ -231,37 +241,42 @@ static void wait_until_writable(const int fd) { errx(1, "select() did not return writable fd = %d", fd); } -static int xwrite(const int fd, struct buf* const buf) { - ssize_t write_ret; - int saved_errno; - struct timespec t0; - struct timespec t1; +// Returns zero on EOF, dies on error, retries on short write. +static int write_buf(const int fd, struct buf* const buf) { + char* data = buf->data; + size_t left = (size_t)buf->len; for (;;) { + struct timespec t0; + struct timespec t1; get_mono_time(&t0); - write_ret = write(fd, buf->data, (size_t)buf->len); - saved_errno = errno; + ssize_t write_ret = write(fd, data, left); + int saved_errno = errno; get_mono_time(&t1); warn_time("write()", &t0, &t1); errno = saved_errno; if (write_ret == -1) { if (errno == EAGAIN) { - warn("write(fd = %d) got EAGAIN, sleeping and retrying", fd); + nblogx("write(fd = %d) got EAGAIN, sleeping and retrying", fd); wait_until_writable(fd); continue; } - err(1, "write(fd = %d, count = %d) failed", fd, buf->len); + err(1, "write(fd = %d, count = %d) failed", fd, (int)left); } - if (write_ret == 0) + if (write_ret == 0) { return 0; // EOF + } assert(write_ret >= 0); - if (write_ret < buf->len) - err(1, "write(fd = %d, count = %d) stopped short (returned %d)", - fd, buf->len, (int)write_ret); - // FIXME: handle this - assert(write_ret == buf->len); - return (int)write_ret; + if (write_ret < (ssize_t)left) { + nblogx("short write(fd = %d, count = %d): wrote %d", + fd, (int)left, (int)write_ret); + left -= write_ret; + data += write_ret; + continue; + } + assert(write_ret == (ssize_t)left); + return 1; } } @@ -298,41 +313,53 @@ static void unlock(pthread_mutex_t* mutex) { static void* writer_routine(void *arg) { struct writer_thread* my = arg; struct buf* buf = NULL; - lock(&shared_queue_mutex); + + nblogx("writer thread for fd %d starting", my->fd); + lock(my->mutex); for (;;) { - while (!stopping && STAILQ_EMPTY(&my->queue)) { + while (my->running && STAILQ_EMPTY(&my->queue)) { // Sleep. - pthread_cond_wait(&shared_wakeup_cond, &shared_queue_mutex); + pthread_cond_wait(my->cond, my->mutex); } - if (!STAILQ_EMPTY(&my->queue)) + if (!STAILQ_EMPTY(&my->queue)) { buf = dequeue(&my->queue); - unlock(&shared_queue_mutex); + } + unlock(my->mutex); - if (stopping) break; - assert(buf != NULL); + if (buf == NULL) { + assert(!my->running); + break; + } // Write. - int write_ret = xwrite(my->fd, buf); - if (write_ret == 0) { + if (write_buf(my->fd, buf) == 0) { errx(1, "fd %d hit EOF", my->fd); } - assert(write_ret == buf->len); // Unreference buffer, freeing it if we have to. - lock(&shared_queue_mutex); + lock(my->mutex); unref_buf(buf); + buf = NULL; + } + if (arg != logger) { + nblogx("thread exiting cleanly"); } - warnx("thread exiting cleanly"); return NULL; } -static void add_writer_thread(struct writer_thread_list* list, const int fd) { - set_nonblocking(fd); +static void add_writer_thread(struct writer_thread_list* list, + int fd, + pthread_mutex_t* shared_queue_mutex, + pthread_cond_t* shared_wakeup_cond) { struct writer_thread* writer = malloc(sizeof(*writer)); + set_nonblocking(fd); writer->fd = fd; - STAILQ_INIT(&(writer->queue)); + writer->mutex = shared_queue_mutex; + writer->cond = shared_wakeup_cond; + writer->running = 1; + STAILQ_INIT(&writer->queue); STAILQ_INSERT_TAIL(list, writer, entries); - xpthread_create(&(writer->thread), writer_routine, writer); + xpthread_create(&writer->thread, writer_routine, writer); } static void xpthread_cond_broadcast(pthread_cond_t* cond) { @@ -342,6 +369,61 @@ static void xpthread_cond_broadcast(pthread_cond_t* cond) { err(1, "pthread_cond_broadcast(%p) failed", cond); } +static void init_logger_thread(void) { + assert(logger == NULL); + get_mono_time(&logger_start); + logger = malloc(sizeof(*logger)); + logger->fd = STDERR_FILENO; + logger->mutex = malloc(sizeof(*logger->mutex)); + pthread_mutex_init(logger->mutex, NULL); + logger->cond = malloc(sizeof(*logger->cond)); + pthread_cond_init(logger->cond, NULL); + logger->running = 1; + STAILQ_INIT(&logger->queue); + xpthread_create(&logger->thread, writer_routine, logger); +} + +static void _nblog_helper(int want_errno, + int line, + const char* func, + const char* format, + ...) { + int saved_errno = errno; + va_list va; + va_start(va, format); + + // Timing. + struct timespec now; + struct timespec diff; + get_mono_time(&now); + time_diff(&logger_start, &now, &diff); + + // Prefix. + char buf[512]; + int len; + extern char *__progname; // This is where glibc stashes argv[0]. + len = snprintf(buf, sizeof(buf), "%s:%d:%s(): tid %d at %d.%09d: ", + __progname, line, func, gettid(), + (int)diff.tv_sec, (int)diff.tv_nsec); + + // Format message. + len += vsnprintf(buf + len, sizeof(buf) - len, format, va); + + if (want_errno) { + len += snprintf(buf + len, sizeof(buf) - len, ": %s (errno = %d)", + strerror(saved_errno), saved_errno); + } + + len += snprintf(buf + len, sizeof(buf) - len, "\n"); + struct buf* b = alloc_buf(buf, len); + + // Enqueue. + lock(logger->mutex); + enqueue(&logger->queue, b); + xpthread_cond_broadcast(logger->cond); + unlock(logger->mutex); +} + static void xpthread_join(pthread_t thread) { int ret = pthread_join(thread, NULL); if (ret == 0) return; @@ -354,6 +436,12 @@ static void sig_continue(int _ignored_ __attribute__((__unused__))) { } int main(int argc, char **argv) { + init_logger_thread(); + nblogx("starting"); + + pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER; + struct writer_thread_list writers; STAILQ_INIT(&writers); @@ -365,18 +453,24 @@ int main(int argc, char **argv) { // On Linux, making STDOUT non-blocking has the side-effect of // also making STDIN nonblocking. - add_writer_thread(&writers, STDOUT_FILENO); + add_writer_thread(&writers, + STDOUT_FILENO, + &shared_queue_mutex, + &shared_wakeup_cond); // Process cmdline args. for (int i = 1; i < argc; i++) { - add_writer_thread(&writers, make_file(argv[i])); + add_writer_thread(&writers, + make_file(argv[i]), + &shared_queue_mutex, + &shared_wakeup_cond); } // Reader loop. while (!stopping) { wait_until_readable(STDIN_FILENO); if (stopping) { - warnx("stopping after select()"); + nblogx("stopping after select()"); break; } @@ -384,7 +478,7 @@ int main(int argc, char **argv) { char data[READ_BUF_SIZE]; int read_ret = xread(STDIN_FILENO, data, sizeof(data)); if (read_ret == 0) { - warnx("stdin hit EOF"); + nblogx("stdin hit EOF"); break; } struct buf* buf = alloc_buf(data, read_ret); @@ -392,29 +486,53 @@ int main(int argc, char **argv) { // Enqueue. lock(&shared_queue_mutex); struct writer_thread* writer; - STAILQ_FOREACH(writer, &writers, entries) enqueue(&(writer->queue), buf); + STAILQ_FOREACH(writer, &writers, entries) { + enqueue(&(writer->queue), buf); + } xpthread_cond_broadcast(&shared_wakeup_cond); unlock(&shared_queue_mutex); } - // Wake and join threads. - stopping = 1; + nblogx("stopping: draining writer threads"); lock(&shared_queue_mutex); + { + struct writer_thread* writer; + STAILQ_FOREACH(writer, &writers, entries) { + writer->running = 0; + } + } xpthread_cond_broadcast(&shared_wakeup_cond); unlock(&shared_queue_mutex); { struct writer_thread* writer; - STAILQ_FOREACH(writer, &writers, entries) xpthread_join(writer->thread); + STAILQ_FOREACH(writer, &writers, entries) { + xpthread_join(writer->thread); + } } // Free writer list. while (!STAILQ_EMPTY(&writers)) { struct writer_thread* writer = STAILQ_FIRST(&writers); STAILQ_REMOVE_HEAD(&writers, entries); - // FIXME: free its queue? + if (!STAILQ_EMPTY(&writer->queue)) { + nblogx("queue for fd %d is not empty", writer->fd); + } free(writer); } + // Clean up logger thread. + nblogx("writer threads stopped, stopping logger"); + lock(logger->mutex); + logger->running = 0; + xpthread_cond_broadcast(logger->cond); + unlock(logger->mutex); + xpthread_join(logger->thread); + pthread_mutex_destroy(logger->mutex); + free(logger->mutex); + pthread_cond_destroy(logger->cond); + free(logger->cond); + free(logger); + warnx("exiting cleanly"); return 0; }