From: Emil Mikulic
Date: Sun, 24 Mar 2013 14:16:47 +0000 (+1100)
Subject: Use a separate thread for logging to stderr.
X-Git-Url: https://unix4lyfe.org/gitweb/buftee/commitdiff_plain/b796cc222cabf6849c7f20b7f2194d7ae5ed7e08

Use a separate thread for logging to stderr.
---

NOTE(review): the header names on the #include lines of the first hunk were
lost when this page was extracted; the added line is presumably <stdarg.h>
(va_list is introduced by this change) -- confirm against the repository.
Indentation and blank lines had also been collapsed and were re-created from
the hunk line counts, so apply with whitespace tolerance.

diff --git a/buftee.c b/buftee.c
index 1f22a6a..651e6f3 100644
--- a/buftee.c
+++ b/buftee.c
@@ -42,6 +42,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -52,7 +53,7 @@
 #define SLOW_NSEC 4000
 
 // Asserted on receipt of SIGTERM, SIGINT.
-static volatile int stopping = 0;
+static volatile int stopping = 0; // Global.
 
 static void sig_stopping(int _ignored_ __attribute__((__unused__))) {
   stopping = 1;
@@ -92,6 +93,9 @@ struct writer_thread {
 // A list of writer threads.
 STAILQ_HEAD(writer_thread_list, writer_thread); // struct writer_thread_list
 
+static struct writer_thread* logger = NULL; // Global.
+static struct timespec logger_start; // Global.
+
 static struct buf* alloc_buf(const char* const data, const int len) {
   assert(len > 0);
   struct buf* buf = malloc(sizeof(*buf));
@@ -175,18 +179,21 @@ static int gettid(void) {
   return (int)syscall(SYS_gettid);
 }
 
+static void nblogx(const char* format, ...)
+    __attribute__((format(printf, 1, 2)));  // Forward.
+
 static void warn_time(const char* desc,
                       const struct timespec* restrict start,
                       const struct timespec* restrict end) {
+  // Skip when there is no logger yet, or when running on the logger thread
+  // (which would otherwise queue messages to itself).
+  if (logger == NULL || pthread_equal(pthread_self(), logger->thread)) {
+    return;
+  }
   struct timespec diff;
   time_diff(start, end, &diff);
   if (diff.tv_sec > 0 || diff.tv_nsec > SLOW_NSEC) {
-    char buf[128];
-    extern char *__progname; // This is where glibc stashes argv[0].
-    snprintf(buf, sizeof(buf), "%s:tid %d: %s took %d.%09d secs\n",
-             __progname, gettid(), desc, (int)diff.tv_sec, (int)diff.tv_nsec);
-    // Best effort write to a non-blocking stderr.
-    (void)write(STDERR_FILENO, buf, strlen(buf));
+    nblogx("%s took %d.%09d secs", desc, (int)diff.tv_sec, (int)diff.tv_nsec);
   }
 }
 
@@ -243,7 +250,7 @@ static int xwrite(const int fd, struct buf* const buf) {
     errno = saved_errno;
     if (write_ret == -1) {
       if (errno == EAGAIN) {
-        warn("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
+        nblogx("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
         wait_until_writable(fd);
         continue;
       }
@@ -318,7 +325,7 @@ static void* writer_routine(void *arg) {
     lock(my->mutex);
     unref_buf(buf);
   }
-  warnx("thread exiting cleanly");
+  nblogx("thread exiting cleanly");
   return NULL;
 }
 
@@ -343,6 +350,97 @@ static void xpthread_cond_broadcast(pthread_cond_t* cond) {
   err(1, "pthread_cond_broadcast(%p) failed", cond);
 }
 
+// Starts the logger: a thread running writer_routine() on its own queue,
+// with fd set to stderr. Call once, before the first nblog() or nblogx().
+static void init_logger_thread(void) {
+  assert(logger == NULL);
+  get_mono_time(&logger_start);
+  logger = malloc(sizeof(*logger));
+  if (logger == NULL) err(1, "malloc(logger) failed");
+  logger->fd = STDERR_FILENO;
+  logger->mutex = malloc(sizeof(*logger->mutex));
+  if (logger->mutex == NULL) err(1, "malloc(logger->mutex) failed");
+  int ret = pthread_mutex_init(logger->mutex, NULL);
+  if (ret != 0) errx(1, "pthread_mutex_init failed: %s", strerror(ret));
+  logger->cond = malloc(sizeof(*logger->cond));
+  if (logger->cond == NULL) err(1, "malloc(logger->cond) failed");
+  ret = pthread_cond_init(logger->cond, NULL);
+  if (ret != 0) errx(1, "pthread_cond_init failed: %s", strerror(ret));
+  STAILQ_INIT(&logger->queue);
+  xpthread_create(&logger->thread, writer_routine, logger);
+}
+
+// Converts a (v)snprintf() return value into the number of characters that
+// were really stored when `avail` (>= 1) bytes were offered: the return
+// value is the untruncated length, or negative on error.
+static size_t nblog_stored(int ret, size_t avail) {
+  if (ret < 0) return 0;
+  if ((size_t)ret >= avail) return avail - 1;
+  return (size_t)ret;
+}
+
+// Formats one log line, "<progname>: tid <tid> at <time since start>: <msg>"
+// (plus the strerror() text and errno value when want_errno is set), and
+// queues it for the logger thread. Over-long lines are truncated to fit.
+static void _nblog_helper(int want_errno,
+                          int saved_errno,
+                          const char* format,
+                          va_list va) {
+  // Timing.
+  struct timespec now;
+  struct timespec diff;
+  get_mono_time(&now);
+  time_diff(&logger_start, &now, &diff);
+
+  // Prefix.
+  char buf[512];
+  extern char *__progname; // This is where glibc stashes argv[0].
+  int n = snprintf(buf, sizeof(buf), "%s: tid %d at %d.%09d: ",
+      __progname, gettid(), (int)diff.tv_sec, (int)diff.tv_nsec);
+  size_t len = nblog_stored(n, sizeof(buf));
+
+  // Format message. len stays <= sizeof(buf) - 1, so the remaining space
+  // computed below is always at least one byte and never wraps around.
+  n = vsnprintf(buf + len, sizeof(buf) - len, format, va);
+  len += nblog_stored(n, sizeof(buf) - len);
+
+  if (want_errno) {
+    n = snprintf(buf + len, sizeof(buf) - len, ": %s (errno = %d)",
+        strerror(saved_errno), saved_errno);
+    len += nblog_stored(n, sizeof(buf) - len);
+  }
+
+  // End with a newline; if truncation filled the buffer, it replaces the
+  // last stored character instead of being written out of bounds.
+  if (len == sizeof(buf) - 1) len--;
+  buf[len++] = '\n';
+  struct buf* b = alloc_buf(buf, len);
+
+  // Enqueue.
+  lock(logger->mutex);
+  enqueue(&logger->queue, b);
+  xpthread_cond_broadcast(logger->cond);
+  unlock(logger->mutex);
+}
+
+// nblog() is like warn() but non-blocking.
+__attribute__((format(printf, 1, 2)))
+static void nblog(const char* format, ...) {
+  int saved_errno = errno;
+  va_list va;
+  va_start(va, format);
+  _nblog_helper(1, saved_errno, format, va);
+  va_end(va);
+}
+
+// nblogx() is the variant of nblog() that appends no errno text.
+static void nblogx(const char* format, ...) {
+  va_list va;
+  va_start(va, format);
+  _nblog_helper(0, 0, format, va);
+  va_end(va);
+}
+
 static void xpthread_join(pthread_t thread) {
   int ret = pthread_join(thread, NULL);
   if (ret == 0) return;
@@ -355,6 +453,9 @@ static void sig_continue(int _ignored_ __attribute__((__unused__))) {
 }
 
 int main(int argc, char **argv) {
+  init_logger_thread();
+  nblogx("starting");
+
   pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
   pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
 
@@ -386,7 +487,7 @@ int main(int argc, char **argv) {
   while (!stopping) {
     wait_until_readable(STDIN_FILENO);
     if (stopping) {
-      warnx("stopping after select()");
+      nblogx("stopping after select()");
       break;
     }
 
@@ -394,7 +495,7 @@ int main(int argc, char **argv) {
     char data[READ_BUF_SIZE];
     int read_ret = xread(STDIN_FILENO, data, sizeof(data));
     if (read_ret == 0) {
-      warnx("stdin hit EOF");
+      nblogx("stdin hit EOF");
       break;
     }
     struct buf* buf = alloc_buf(data, read_ret);
@@ -429,6 +530,20 @@ int main(int argc, char **argv) {
     free(writer);
   }
 
+  // Clean up logger thread.
+  nblogx("writer threads stopped");
+  lock(logger->mutex);
+  xpthread_cond_broadcast(logger->cond);
+  unlock(logger->mutex);
+  xpthread_join(logger->thread);
+  // The logger thread has been joined: release what init_logger_thread()
+  // allocated, and clear the global so later code sees "no logger".
+  pthread_mutex_destroy(logger->mutex);
+  pthread_cond_destroy(logger->cond);
+  free(logger->mutex);
+  free(logger->cond);
+  free(logger);
+  logger = NULL;
+
   warnx("exiting cleanly");
   return 0;
 }