X-Git-Url: https://unix4lyfe.org/gitweb/buftee/blobdiff_plain/b796cc222cabf6849c7f20b7f2194d7ae5ed7e08..HEAD:/buftee.c diff --git a/buftee.c b/buftee.c index 651e6f3..552a333 100644 --- a/buftee.c +++ b/buftee.c @@ -87,6 +87,9 @@ struct writer_thread { // When a writer runs out of work to do, it sleeps on a shared condition. pthread_cond_t* cond; + // The writer thread exits when it's not running and its queue is empty. + volatile int running; + STAILQ_ENTRY(writer_thread) entries; }; @@ -179,7 +182,14 @@ static int gettid(void) { return (int)syscall(SYS_gettid); } -static void nblogx(const char* format, ...); // Forward. +static void _nblog_helper(int want_errno, + int line, + const char* func, + const char* format, + ...); // Forward. + +//#define nblog(fmt...) _nblog_helper(1, __LINE__, __FUNCTION__, fmt) +#define nblogx(fmt...) _nblog_helper(0, __LINE__, __FUNCTION__, fmt) static void warn_time(const char* desc, const struct timespec* restrict start, @@ -231,16 +241,17 @@ static void wait_until_writable(const int fd) { errx(1, "select() did not return writable fd = %d", fd); } -static int xwrite(const int fd, struct buf* const buf) { - ssize_t write_ret; - int saved_errno; - struct timespec t0; - struct timespec t1; +// Returns zero on EOF, dies on error, retries on short write. +static int write_buf(const int fd, struct buf* const buf) { + char* data = buf->data; + size_t left = (size_t)buf->len; for (;;) { + struct timespec t0; + struct timespec t1; get_mono_time(&t0); - write_ret = write(fd, buf->data, (size_t)buf->len); - saved_errno = errno; + ssize_t write_ret = write(fd, data, left); + int saved_errno = errno; get_mono_time(&t1); warn_time("write()", &t0, &t1); @@ -251,17 +262,21 @@ static int xwrite(const int fd, struct buf* const buf) { wait_until_writable(fd); continue; } - err(1, "write(fd = %d, count = %d) failed", fd, buf->len); + err(1, "write(fd = %d, count = %d) failed", fd, (int)left); } - if (write_ret == 0) + if (write_ret == 0) { return 0; // EOF + } assert(write_ret >= 0); - if (write_ret < buf->len) - err(1, "write(fd = %d, count = %d) stopped short (returned %d)", - fd, buf->len, (int)write_ret); - // FIXME: handle this - assert(write_ret == buf->len); - return (int)write_ret; + if (write_ret < (ssize_t)left) { + nblogx("short write(fd = %d, count = %d): wrote %d", + fd, (int)left, (int)write_ret); + left -= write_ret; + data += write_ret; + continue; + } + assert(write_ret == (ssize_t)left); + return 1; } } @@ -298,31 +313,37 @@ static void unlock(pthread_mutex_t* mutex) { static void* writer_routine(void *arg) { struct writer_thread* my = arg; struct buf* buf = NULL; + + nblogx("writer thread for fd %d starting", my->fd); lock(my->mutex); for (;;) { - while (!stopping && STAILQ_EMPTY(&my->queue)) { + while (my->running && STAILQ_EMPTY(&my->queue)) { // Sleep. pthread_cond_wait(my->cond, my->mutex); } - if (!STAILQ_EMPTY(&my->queue)) + if (!STAILQ_EMPTY(&my->queue)) { buf = dequeue(&my->queue); + } unlock(my->mutex); - if (stopping) break; - assert(buf != NULL); + if (buf == NULL) { + assert(!my->running); + break; + } // Write. - int write_ret = xwrite(my->fd, buf); - if (write_ret == 0) { + if (write_buf(my->fd, buf) == 0) { errx(1, "fd %d hit EOF", my->fd); } - assert(write_ret == buf->len); // Unreference buffer, freeing it if we have to. lock(my->mutex); unref_buf(buf); + buf = NULL; + } + if (arg != logger) { + nblogx("thread exiting cleanly"); } - nblogx("thread exiting cleanly"); return NULL; } @@ -335,6 +356,7 @@ static void add_writer_thread(struct writer_thread_list* list, writer->fd = fd; writer->mutex = shared_queue_mutex; writer->cond = shared_wakeup_cond; + writer->running = 1; STAILQ_INIT(&writer->queue); STAILQ_INSERT_TAIL(list, writer, entries); xpthread_create(&writer->thread, writer_routine, writer); @@ -356,14 +378,20 @@ static void init_logger_thread(void) { pthread_mutex_init(logger->mutex, NULL); logger->cond = malloc(sizeof(*logger->cond)); pthread_cond_init(logger->cond, NULL); + logger->running = 1; STAILQ_INIT(&logger->queue); xpthread_create(&logger->thread, writer_routine, logger); } static void _nblog_helper(int want_errno, - int saved_errno, + int line, + const char* func, const char* format, - va_list va) { + ...) { + int saved_errno = errno; + va_list va; + va_start(va, format); + // Timing. struct timespec now; struct timespec diff; @@ -372,10 +400,11 @@ static void _nblog_helper(int want_errno, // Prefix. char buf[512]; - size_t len; + int len; extern char *__progname; // This is where glibc stashes argv[0]. - len = snprintf(buf, sizeof(buf), "%s: tid %d at %d.%09d: ", - __progname, gettid(), (int)diff.tv_sec, (int)diff.tv_nsec); + len = snprintf(buf, sizeof(buf), "%s:%d:%s(): tid %d at %d.%09d: ", + __progname, line, func, gettid(), + (int)diff.tv_sec, (int)diff.tv_nsec); // Format message. len += vsnprintf(buf + len, sizeof(buf) - len, format, va); @@ -395,22 +424,6 @@ static void _nblog_helper(int want_errno, unlock(logger->mutex); } -// nblog() is like warn() but non-blocking. -static void nblog(const char* format, ...) { - int saved_errno = errno; - va_list va; - va_start(va, format); - _nblog_helper(1, saved_errno, format, va); - va_end(va); -} - -static void nblogx(const char* format, ...) { - va_list va; - va_start(va, format); - _nblog_helper(0, 0, format, va); - va_end(va); -} - static void xpthread_join(pthread_t thread) { int ret = pthread_join(thread, NULL); if (ret == 0) return; @@ -480,9 +493,14 @@ int main(int argc, char **argv) { unlock(&shared_queue_mutex); } - // Wake and join threads. + nblogx("stopping: draining writer threads"); lock(&shared_queue_mutex); - stopping = 1; + { + struct writer_thread* writer; + STAILQ_FOREACH(writer, &writers, entries) { + writer->running = 0; + } + } xpthread_cond_broadcast(&shared_wakeup_cond); unlock(&shared_queue_mutex); { @@ -496,16 +514,23 @@ int main(int argc, char **argv) { while (!STAILQ_EMPTY(&writers)) { struct writer_thread* writer = STAILQ_FIRST(&writers); STAILQ_REMOVE_HEAD(&writers, entries); - // FIXME: free its queue? + if (!STAILQ_EMPTY(&writer->queue)) { + nblogx("queue for fd %d is not empty", writer->fd); + } free(writer); } // Clean up logger thread. - nblogx("writer threads stopped"); + nblogx("writer threads stopped, stopping logger"); lock(logger->mutex); + logger->running = 0; xpthread_cond_broadcast(logger->cond); unlock(logger->mutex); xpthread_join(logger->thread); + pthread_mutex_destroy(logger->mutex); + free(logger->mutex); + pthread_cond_destroy(logger->cond); + free(logger->cond); free(logger); warnx("exiting cleanly");