#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
+#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define SLOW_NSEC 4000
// Asserted on receipt of SIGTERM, SIGINT.
-static volatile int stopping = 0;
+// Written from the signal handler, so it must be volatile sig_atomic_t: that
+// is the only object type ISO C lets a handler store to portably.
+static volatile sig_atomic_t stopping = 0; // Global.
static void sig_stopping(int _ignored_ __attribute__((__unused__))) {
stopping = 1;
// A list of writer threads.
STAILQ_HEAD(writer_thread_list, writer_thread); // struct writer_thread_list
+static struct writer_thread* logger = NULL; // Global.
+static struct timespec logger_start; // Global.
+
static struct buf* alloc_buf(const char* const data, const int len) {
assert(len > 0);
struct buf* buf = malloc(sizeof(*buf));
return (int)syscall(SYS_gettid);
}
+// Forward declaration. The format attribute makes the compiler check the
+// arguments of every later nblogx() call against its format string.
+static void nblogx(const char* format, ...)
+    __attribute__((format(printf, 1, 2)));
+
+// Logs, via nblogx(), any interval from *start to *end (as computed by
+// time_diff()) that lasted a full second or more, or more than SLOW_NSEC
+// nanoseconds. `desc` names the operation that was timed.
static void warn_time(const char* desc,
const struct timespec* restrict start,
const struct timespec* restrict end) {
+  // Skip timing on the logger thread itself -- presumably so that its own slow
+  // writes do not queue yet more log messages. pthread_t is an opaque type,
+  // so compare with pthread_equal() rather than ==.
+  if (pthread_equal(pthread_self(), logger->thread)) {
+    return;
+  }
struct timespec diff;
time_diff(start, end, &diff);
if (diff.tv_sec > 0 || diff.tv_nsec > SLOW_NSEC) {
- char buf[128];
- extern char *__progname; // This is where glibc stashes argv[0].
- snprintf(buf, sizeof(buf), "%s:tid %d: %s took %d.%09d secs\n",
- __progname, gettid(), desc, (int)diff.tv_sec, (int)diff.tv_nsec);
- // Best effort write to a non-blocking stderr.
- (void)write(STDERR_FILENO, buf, strlen(buf));
+ nblogx("%s took %d.%09d secs", desc, (int)diff.tv_sec, (int)diff.tv_nsec);
}
}
errno = saved_errno;
if (write_ret == -1) {
if (errno == EAGAIN) {
- warn("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
+ nblogx("write(fd = %d) got EAGAIN, sleeping and retrying", fd);
wait_until_writable(fd);
continue;
}
lock(my->mutex);
unref_buf(buf);
}
- warnx("thread exiting cleanly");
+ nblogx("thread exiting cleanly");
return NULL;
}
err(1, "pthread_cond_broadcast(%p) failed", cond);
}
+// Creates the global `logger` writer and starts a thread running
+// writer_routine() on it, with its fd set to STDERR_FILENO. Also records
+// `logger_start`, the reference point for the timestamps in log messages.
+// Must be called exactly once, before the first nblog()/nblogx() call.
+// Any allocation or pthread initialisation failure is fatal (exit status 1).
+static void init_logger_thread(void) {
+  assert(logger == NULL);
+  get_mono_time(&logger_start);
+
+  logger = malloc(sizeof(*logger));
+  if (logger == NULL) {
+    err(1, "malloc(logger) failed");
+  }
+  logger->fd = STDERR_FILENO;
+
+  logger->mutex = malloc(sizeof(*logger->mutex));
+  if (logger->mutex == NULL) {
+    err(1, "malloc(logger->mutex) failed");
+  }
+  // pthread functions return the error number instead of setting errno, so
+  // copy it into errno for err() to report.
+  int ret = pthread_mutex_init(logger->mutex, NULL);
+  if (ret != 0) {
+    errno = ret;
+    err(1, "pthread_mutex_init(%p) failed", (void*)logger->mutex);
+  }
+
+  logger->cond = malloc(sizeof(*logger->cond));
+  if (logger->cond == NULL) {
+    err(1, "malloc(logger->cond) failed");
+  }
+  ret = pthread_cond_init(logger->cond, NULL);
+  if (ret != 0) {
+    errno = ret;
+    err(1, "pthread_cond_init(%p) failed", (void*)logger->cond);
+  }
+
+  STAILQ_INIT(&logger->queue);
+  xpthread_create(&logger->thread, writer_routine, logger);
+}
+
+// Folds the return value `n` of an snprintf()-family call, made at offset
+// `len` of a `cap`-byte buffer, into the number of bytes actually stored.
+// snprintf() reports the length it *wanted* to write, so on truncation the
+// result is clamped to cap - 1; a negative `n` (output error) adds nothing.
+// Requires len < cap.
+static size_t nblog_advance(size_t len, size_t cap, int n) {
+  if (n < 0) {
+    return len;
+  }
+  const size_t room = cap - len - 1;  // Excludes the terminating NUL.
+  return ((size_t)n > room) ? cap - 1 : len + (size_t)n;
+}
+
+// Formats one log line and queues it on the logger's queue.
+//
+// The line has the shape
+//   <progname>: tid <tid> at <sec>.<nsec>: <message>[: <strerror> (errno = N)]\n
+// where the time is the interval between `logger_start` and now.
+//
+//   want_errno   non-zero to append the description of `saved_errno`.
+//   saved_errno  errno value to describe; ignored when want_errno is zero.
+//   format, va   printf-style message; the caller owns va_start()/va_end().
+//
+// Lines that do not fit in the local buffer are truncated, but always end in
+// a newline.
+static void _nblog_helper(int want_errno,
+                          int saved_errno,
+                          const char* format,
+                          va_list va) {
+  assert(logger != NULL);
+
+  // Timing.
+  struct timespec now;
+  struct timespec diff;
+  get_mono_time(&now);
+  time_diff(&logger_start, &now, &diff);
+
+  // Prefix.
+  char buf[512];
+  size_t len = 0;
+  extern char *__progname; // This is where glibc stashes argv[0].
+  len = nblog_advance(len, sizeof(buf),
+                      snprintf(buf, sizeof(buf), "%s: tid %d at %d.%09d: ",
+                               __progname, gettid(),
+                               (int)diff.tv_sec, (int)diff.tv_nsec));
+
+  // Format message. `len` never exceeds sizeof(buf) - 1, so the remaining
+  // size handed to each call below cannot wrap around.
+  len = nblog_advance(len, sizeof(buf),
+                      vsnprintf(buf + len, sizeof(buf) - len, format, va));
+
+  if (want_errno) {
+    // NOTE(review): strerror() is not required to be thread-safe, and this
+    // helper can be reached from several threads.
+    len = nblog_advance(len, sizeof(buf),
+                        snprintf(buf + len, sizeof(buf) - len,
+                                 ": %s (errno = %d)",
+                                 strerror(saved_errno), saved_errno));
+  }
+
+  // Terminate the line; when the buffer is full, the newline replaces the
+  // last stored character so that a truncated message still ends a line.
+  if (len == sizeof(buf) - 1) {
+    buf[len - 1] = '\n';
+  } else {
+    buf[len++] = '\n';
+  }
+  buf[len] = '\0';
+  struct buf* b = alloc_buf(buf, (int)len);
+
+  // Enqueue.
+  lock(logger->mutex);
+  enqueue(&logger->queue, b);
+  xpthread_cond_broadcast(logger->cond);
+  unlock(logger->mutex);
+}
+
+// nblog() is like warn() but non-blocking: the formatted message, followed by
+// the description of the errno value current at entry, is queued for the
+// logger rather than written directly.
+// The format attribute lets the compiler check the arguments against `format`.
+static void __attribute__((format(printf, 1, 2)))
+nblog(const char* format, ...) {
+  // Capture errno first, before any call below can overwrite it.
+  const int saved_errno = errno;
+  va_list va;
+  va_start(va, format);
+  _nblog_helper(1, saved_errno, format, va);
+  va_end(va);
+}
+
+// nblogx() is the counterpart of nblog() that appends no errno description
+// (compare warnx() and warn()): the formatted message is queued for the
+// logger rather than written directly.
+// The format attribute lets the compiler check the arguments against `format`.
+static void __attribute__((format(printf, 1, 2)))
+nblogx(const char* format, ...) {
+  va_list va;
+  va_start(va, format);
+  _nblog_helper(0, 0, format, va);
+  va_end(va);
+}
+
static void xpthread_join(pthread_t thread) {
int ret = pthread_join(thread, NULL);
if (ret == 0) return;
}
int main(int argc, char **argv) {
+ init_logger_thread();
+ nblogx("starting");
+
pthread_mutex_t shared_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t shared_wakeup_cond = PTHREAD_COND_INITIALIZER;
while (!stopping) {
wait_until_readable(STDIN_FILENO);
if (stopping) {
- warnx("stopping after select()");
+ nblogx("stopping after select()");
break;
}
char data[READ_BUF_SIZE];
int read_ret = xread(STDIN_FILENO, data, sizeof(data));
if (read_ret == 0) {
- warnx("stdin hit EOF");
+ nblogx("stdin hit EOF");
break;
}
struct buf* buf = alloc_buf(data, read_ret);
free(writer);
}
+ // Clean up logger thread.
+ nblogx("writer threads stopped");
+ lock(logger->mutex);
+ xpthread_cond_broadcast(logger->cond);
+ unlock(logger->mutex);
+ xpthread_join(logger->thread);
+ free(logger);
+
warnx("exiting cleanly");
return 0;
}